博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ZooKeeper编程 基础教程
阅读量:2392 次
发布时间:2019-05-10

本文共 20749 字,大约阅读时间需要 69 分钟。

介绍

在本教程中,我们将使用ZooKeeper来显示简单的barriers和生产者-消费者队列的实现。我们将各自的类称为Barrier和Queue。这些示例假设您至少运行了一个ZooKeeper服务器。

两个都使用下面的代码来实现:

static ZooKeeper zk = null;    static Integer mutex;    String root;    SyncPrimitive(String address) {        if(zk == null){            try {                System.out.println("Starting ZK:");                zk = new ZooKeeper(address, 3000, this);                mutex = new Integer(-1);                System.out.println("Finished starting ZK: " + zk);            } catch (IOException e) {                System.out.println(e.toString());                zk = null;            }        }    }    synchronized public void process(WatchedEvent event) {        synchronized (mutex) {            mutex.notify();        }    }

继承SyncPrimitive。通过这种方式,我们执行SyncPrimitive的构造函数中。保持简单的例子,我们创建一个ZooKeeper对象我们第一次实例化barrier对象或队列对象,我们声明一个静态变量,是对该对象的引用。随后的屏障和队列检查是否一个ZooKeeper对象的实例存在。或者,我们可以应用程序创建一个ZooKeeper并将其传递给构造函数的屏障和队列。我们process()方法来处理通知触发。在接下来的讨论中,我们目前的代码集watches。watches是内部结构,使Zookeeper能够通知客户更改一个节点。举个例子,如果一个客户正在等待其他客户留下一个barriers,那么它可以设置一个watches,等待修改某个节点,从而表明它是结束等待。这一点变得很清晰。

A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node “/b1”. Each process “p” then creates a node “/b1/p”. Once enough processes have created their corresponding nodes, joined processes can start the computation.

In this example, each process instantiates a Barrier object, and its constructor takes as parameters:

the address of a ZooKeeper server (e.g., “zoo1.foo.com:2181”)

the path of the barrier node on ZooKeeper (e.g., “/b1”)

the size of the group of processes

The constructor of Barrier passes the address of the Zookeeper server to the constructor of the parent class. The parent class creates a ZooKeeper instance if one does not exist. The constructor of Barrier then creates a barrier node on ZooKeeper, which is the parent node of all process nodes, and we call root (Note: This is not the ZooKeeper root “/”).

Barriers
发布Barriers 一个障碍是一个原始, 打开集群过程同步进程的开始和结束。这个实现的总体想法是实现一个barrier节点的目的是为父节点为个体过程节点。假设有一个Barriers节点“/b1”。每个进程“p”然后创建一个节点“/b1/p”。一旦有进程创建了相应的节点, 加入过程可以开始计算。在这个例子中,每个流程实例化一个Barriers物体,及其构造函数参数:zookeeper服务器的地址(例如,zoo1.foo.com:2181)zookeeper障碍的路径节点(如“/b1”)组的大小的过程Barriers的构造函数将zookeeper服务器的地址传递给父类的构造函数。父类创建一个Zookeeper实例如果不存在。屏障的构造函数节点在动物园管理员,然后创建一个Barriers是所有流程节点的父节点,和我们称之为根(注:这不是Zookeeper根“/”)。

/**         * Barrier constructor         *         * @param address         * @param root         * @param size         */        Barrier(String address, String root, int size) {            super(address);            this.root = root;            this.size = size;            // Create barrier node            if (zk != null) {                try {                    Stat s = zk.exists(root, false);                    if (s == null) {                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,                                CreateMode.PERSISTENT);                    }                } catch (KeeperException e) {                    System.out                            .println("Keeper exception when instantiating queue: "                                    + e.toString());                } catch (InterruptedException e) {                    System.out.println("Interrupted exception");                }            }            // My node name            try {                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());            } catch (UnknownHostException e) {                System.out.println(e.toString());            }        }

To enter the barrier, a process calls enter(). The process creates a node under the root to represent it, using its host name to form the node name. It then wait until enough processes have entered the barrier. A process does it by checking the number of children the root node has with “getChildren()”, and waiting for notifications in the case it does not have enough. To receive a notification when there is a change to the root node, a process has to set a watch, and does it through the call to “getChildren()”. In the code, we have that “getChildren()” has two parameters. The first one states the node to read from, and the second is a boolean flag that enables the process to set a watch. In the code the flag is true.

/**         * Join barrier         *         * @return         * @throws KeeperException         * @throws InterruptedException         */        boolean enter() throws KeeperException, InterruptedException{            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,                    CreateMode.EPHEMERAL_SEQUENTIAL);            while (true) {                synchronized (mutex) {                    List
list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } }

Note that enter() throws both KeeperException and InterruptedException, so it is the reponsability of the application to catch and handle such exceptions.

Once the computation is finished, a process calls leave() to leave the barrier. First it deletes its corresponding node, and then it gets the children of the root node. If there is at least one child, then it waits for a notification (obs: note that the second parameter of the call to getChildren() is true, meaning that ZooKeeper has to set a watch on the the root node). Upon reception of a notification, it checks once more whether the root node has any child.

/**         * Wait until all reach barrier         *         * @return         * @throws KeeperException         * @throws InterruptedException         */        boolean leave() throws KeeperException, InterruptedException{            zk.delete(root + "/" + name, 0);            while (true) {                synchronized (mutex) {                    List
list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } }

Producer-Consumer Queues

A producer-consumer queue is a distributed data estructure thata group of processes use to generate and consume items. Producer processes create new elements and add them to the queue. Consumer processes remove elements from the list, and process them. In this implementation, the elements are simple integers. The queue is represented by a root node, and to add an element to the queue, a producer process creates a new node, a child of the root node.

The following excerpt of code corresponds to the constructor of the object. As with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, that creates a ZooKeeper object if one doesn’t exist. It then verifies if the root node of the queue exists, and creates if it doesn’t.

/**         * Constructor of producer-consumer queue         *         * @param address         * @param name         */        Queue(String address, String name) {            super(address);            this.root = name;            // Create ZK node name            if (zk != null) {                try {                    Stat s = zk.exists(root, false);                    if (s == null) {                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,                                CreateMode.PERSISTENT);                    }                } catch (KeeperException e) {                    System.out                            .println("Keeper exception when instantiating queue: "                                    + e.toString());                } catch (InterruptedException e) {                    System.out.println("Interrupted exception");                }            }        }

A producer process calls “produce()” to add an element to the queue, and passes an integer as an argument. To add an element to the queue, the method creates a new node using “create()”, and uses the SEQUENCE flag to instruct ZooKeeper to append the value of the sequencer counter associated to the root node. In this way, we impose a total order on the elements of the queue, thus guaranteeing that the oldest element of the queue is the next one consumed.

/**         * Add element to the queue.         *         * @param i         * @return         */        boolean produce(int i) throws KeeperException, InterruptedException{            ByteBuffer b = ByteBuffer.allocate(4);            byte[] value;            // Add child with value i            b.putInt(i);            value = b.array();            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,                        CreateMode.PERSISTENT_SEQUENTIAL);            return true;        }

To consume an element, a consumer process obtains the children of the root node, reads the node with smallest counter value, and returns the element. Note that if there is a conflict, then one of the two contending processes won’t be able to delete the node and the delete operation will throw an exception.

A call to getChildren() returns the list of children in lexicographic order. As lexicographic order does not necessary follow the numerical order of the counter values, we need to decide which element is the smallest. To decide which one has the smallest counter value, we traverse the list, and remove the prefix “element” from each one.

/**         * Remove first element from the queue.         *         * @return         * @throws KeeperException         * @throws InterruptedException         */        int consume() throws KeeperException, InterruptedException{            int retvalue = -1;            Stat stat = null;            // Get the first element available            while (true) {                synchronized (mutex) {                    List
list = zk.getChildren(root, true); if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); //System.out.println("Temporary value: " + tempValue); if(tempValue < min) min = tempValue; } System.out.println("Temporary value: " + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } }

Complete example

In the following section you can find a complete command line application to demonstrate the above mentioned recipes. Use the following command to run it.

ZOOBINDIR=”[path_to_distro]/bin”

. “$ZOOBINDIR”/zkEnv.sh
java SyncPrimitive [Test Type] [ZK server] [No of elements] [Client type]
Queue test
Start a producer to create 100 elements

java SyncPrimitive qTest localhost 100 p

Start a consumer to consume 100 elements

java SyncPrimitive qTest localhost 100 c

Barrier test
Start a barrier with 2 participants (start as many times as many participants you’d like to enter)

java SyncPrimitive bTest localhost 2

Source Listing
SyncPrimitive.Java

import java.io.IOException;import java.net.InetAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.List;import java.util.Random;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null; static Integer mutex; String root; SyncPrimitive(String address) { if(zk == null){ try { System.out.println("Starting ZK:"); zk = new ZooKeeper(address, 3000, this); mutex = new Integer(-1); System.out.println("Finished starting ZK: " + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } //else mutex = new Integer(-1); } synchronized public void process(WatchedEvent event) { synchronized (mutex) { //System.out.println("Process: " + event.getType()); mutex.notify(); } } /** * Barrier */ static public class Barrier extends SyncPrimitive {
int size; String name; /** * Barrier constructor * * @param address * @param root * @param size */ Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; // Create barrier node if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } // My node name try { name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } /** * Join barrier * * @return * @throws KeeperException * @throws InterruptedException */ boolean enter() throws KeeperException, InterruptedException{ zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); while (true) { synchronized (mutex) { List
list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } } /** * Wait until all reach barrier * * @return * @throws KeeperException * @throws InterruptedException */ boolean leave() throws KeeperException, InterruptedException{ zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { List
list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } /** * Producer-Consumer queue */ static public class Queue extends SyncPrimitive {
/** * Constructor of producer-consumer queue * * @param address * @param name */ Queue(String address, String name) { super(address); this.root = name; // Create ZK node name if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("Keeper exception when instantiating queue: " + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception"); } } } /** * Add element to the queue. * * @param i * @return */ boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } /** * Remove first element from the queue. * * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { List
list = zk.getChildren(root, true); if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); String minNode = list.get(0); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); //System.out.println("Temporary value: " + tempValue); if(tempValue < min) { min = tempValue; minNode = s; } } System.out.println("Temporary value: " + root + "/" + minNode); byte[] b = zk.getData(root + "/" + minNode, false, stat); zk.delete(root + "/" + minNode, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String args[]) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } public static void queueTest(String args[]) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input: " + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try{ q.produce(10 + i); } catch (KeeperException e){ } catch (InterruptedException e){ } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) { try{ int r = q.consume(); System.out.println("Item: " + r); } catch (KeeperException e){ i--; } catch (InterruptedException e){ } } } } public static void barrierTest(String args[]) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try{ boolean flag = b.enter(); System.out.println("Entered barrier: " + args[2]); if(!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e){ } catch (InterruptedException e){ } // Generate random integer Random rand = new Random(); int r = rand.nextInt(100); // Loop for rand iterations for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try{ b.leave(); } catch (KeeperException e){ } catch (InterruptedException e){ } System.out.println("Left barrier"); }}

转载地址:http://weqab.baihongyu.com/

你可能感兴趣的文章
exp-00056 exp-00000 导出终止失败的处理
查看>>
Linux中两块device的minor number相同而造成RAC不能启动的问题
查看>>
DM7修改数据库参数
查看>>
Oracle ASM Partnership and Status Table
查看>>
Oracle 12CR2 Oracle Restart - ASM Startup fails with PRCR-1079
查看>>
In-Memory Column Store
查看>>
Oracle 12C ASM asmcmd amdu_extract
查看>>
Oracle AMDU- ASM Metadata Dump Utility
查看>>
C/C++程序中的profile
查看>>
一个更好的Post process结构,三角形代替四边形。
查看>>
利用Vertex shader实现Point Sprites
查看>>
图形处理器历史简介
查看>>
System Memory,AGP Memory and Video Memory in D3D.
查看>>
使用辅助库建立openGL编程环境
查看>>
使用Win32API开始openGL编程
查看>>
使用MFC开始openGL编程
查看>>
关于Gbuffer中的normal存储
查看>>
近距离观察Tone mapping.
查看>>
Physically based shading
查看>>
Color correction
查看>>