基于zookeeper实现的分布式锁

A distributed lock base on zookeeper.

zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hive, pig等, 其实他们都是动物, 所以叫zookeeper(本人歪歪).

zookeeper其实是集群中每个节点都维护着一棵相同的树, 树的结构跟linux的目录结构的概念差不多, 以/为跟节点, 下边可以扩展任意的节点和叶子节点, 每个节点都可以写入数据. 基于zookeeper的分布式锁的实现, 其实是得益于zookeeper同步文件的强大性, 我们相信每时每刻我们访问zookeeper的树时, 相同节点返回的数据都是一致的. 这要靠zookeeper内部的一些算法来实现. 特别是leader的选举算法, 这里就不说了, 感兴趣的话可以去搜索一下看看.

我们知道了zookeeper集群的每个节点的数据都是一致的, 那么我们可以通过这些节点来作为锁的标志.

首先给锁设置一下API, 至少要包含, lock(锁住), unlock(解锁), isLocked(是否锁住)三个方法

然后我们可以创建一个工厂(LockFactory), 用来专门生产锁.

锁的创建过程如下描述:

前提:每个锁都需要一个路径来指定(如:/jiacheo/lock)

1.根据指定的路径, 查找zookeeper集群下的这个节点是否存在.(说明已经有锁了)

2. 如果存在, 根据查询者的一些特征数据(如ip地址/hostname), 当前的锁是不是查询者的

3. 如果不是查询者的锁, 则返回null, 说明创建锁失败

4. 如果是查询者的锁, 则把这个锁返回给查询者

5. 如果这个节点不存在, 说明当前没有锁, 那么创建一个临时节点, 并将查询者的特征信息写入这个节点的数据中, 然后返回这个锁.

根据以上5部, 一个分布式的锁就可以创建了.

创建的锁有三种状态:

1. 创建失败(null), 说明该锁被其他查询者使用了.'

2. 创建成功, 但当前没有锁住(unlocked), 可以使用

3. 创建成功, 但当前已经锁住(locked)了, 不能继续加锁.

distrib lock

如图, 如果我们getLock("/jiacheo/lock1","192.168.0.100"), 想要获取/jiacheo/lock1这个锁的话, 我们先判断这个节点是否存在, 存在的话获取他的数据(data), 然后通过解析data, 我们可以知道这个节点是不是我们查询者创建的(通过ip地址写入节点数据中), 然后就可以返回一个锁了.

具体的java实现(implementation)代码如下:
1. Lock.java

package org.jiacheo.zkdl.lock;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 类名:<b>Lock</b> <br/>
 * <p>
 * 类描述: 
 * </p>
 * 创建人:jiacheo <br/>
 * 创建时间:2011-1-27 上午01:30:25  <br/>  
 * @version 2011-1-27  
 *
 */
public class Lock {
	private String path;
	private ZooKeeper zooKeeper;
	public Lock(String path){
		this.path = path;
	}
	
	/**
	 * <p>
	 * 方法描述: 上锁 lock it
	 * </p>
	 * 创建人:jiacheo <br/>
	 * 创建时间:2011-1-27 上午01:30:50  <br/>
	 * @throws Exception
	 */
	public synchronized void lock() throws Exception{
		Stat stat = zooKeeper.exists(path, true);
		String data = InetAddress.getLocalHost().getHostAddress()+":lock";
		zooKeeper.setData(path, data.getBytes(), stat.getVersion());
	}
	
	/**
	 * <p>
	 * 方法描述:开锁 unlock it
	 * </p>
	 * 创建人:jiacheo <br/>
	 * 创建时间:2011-1-27 上午01:31:20  <br/>
	 * @throws Exception
	 */
	public synchronized void unLock() throws Exception{
		Stat stat = zooKeeper.exists(path, true);
		String data = InetAddress.getLocalHost().getHostAddress()+":unlock";
		zooKeeper.setData(path, data.getBytes(), stat.getVersion());
	}
	
	/**
	 * <p>
	 * 方法描述:是否锁住了, isLocked?
	 * </p>
	 * 创建人:jiacheo <br/>
	 * 创建时间:2011-1-27 上午01:31:43  <br/>
	 * @return
	 */
	public synchronized boolean isLock(){
		try {
			Stat stat = zooKeeper.exists(path, true);
			String data = InetAddress.getLocalHost().getHostAddress()+":lock";
			String nodeData = new String(zooKeeper.getData(path, true, stat));
			if(data.equals(nodeData)){
//				lock = true;
				return true;
			}
		} catch (UnknownHostException e) {
			// ignore it
		} catch (KeeperException e) {
			//TODO use log system and throw a new exception
		} catch (InterruptedException e) {
			// TODO use log system and throw a new exception
		}
		return false;
	}

	public String getPath() {
		return path;
	}

	public void setPath(String path) {
		this.path = path;
	}

	public void setZooKeeper(ZooKeeper zooKeeper) {
		this.zooKeeper = zooKeeper;
	}
	
	
}

2.LockFactory.java

package org.jiacheo.zkdl.lock;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class LockFactory {
	
	public static final ZooKeeper DEFAULT_ZOOKEEPER = getDefaultZookeeper();
	//data格式:  ip:stat  如: 10.232.35.70:lock 10.232.35.70:unlock
	public static synchronized Lock getLock(String path,String ip) throws Exception{
		if(DEFAULT_ZOOKEEPER != null){
			Stat stat = null;
			try{
				stat = DEFAULT_ZOOKEEPER.exists(path, true);
			}catch (Exception e) {
				// TODO: use log system and throw new exception
			}
			if(stat!=null){
				byte[] data = DEFAULT_ZOOKEEPER.getData(path, null, stat);
				String dataStr = new String(data);
				String[] ipv = dataStr.split(":");
				if(ip.equals(ipv[0])){
					Lock lock = new Lock(path);
					lock.setZooKeeper(DEFAULT_ZOOKEEPER);
					return lock;
				}
				//is not your lock, return null
				else{
					return null;
				}
			}
			//no lock created yet, you can get it
			else{
				createZnode(path);
				Lock lock = new Lock(path);
				lock.setZooKeeper(DEFAULT_ZOOKEEPER);
				return lock;
			}
		}
		return null;
	}
	
	private static ZooKeeper getDefaultZookeeper() {
		try {
			ZooKeeper zooKeeper = new ZooKeeper("10.232.35.72", 10*1000, new Watcher(){
				public void process(WatchedEvent event) {
					//节点的事件处理. you can do something when the node's data change
//					System.out.println("event " + event.getType() + " has happened!");
				}
			});
			return zooKeeper;
		} catch (IOException e) {
			e.printStackTrace();
		}
		return null;
	}

	private static void createZnode(String path) throws Exception{
		
		if(DEFAULT_ZOOKEEPER!=null){
			InetAddress address = InetAddress.getLocalHost();
			String data = address.getHostAddress()+":unlock";
			DEFAULT_ZOOKEEPER.create(path, data.getBytes(),Collections.singletonList(new ACL(Perms.ALL,Ids.ANYONE_ID_UNSAFE)) , CreateMode.EPHEMERAL);
		}
	}
}
分享到:

7 条评论

昵称

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

  1. Pingback: C# + zookeeper开发和安装 | 排名说官方博客

  2. Pingback: 靠谱的分布式锁实现 | jiacheo杂谈

  3. knightliao
    1. jiacheo

      是的,这里实现想太复杂了, 后来我们另外的线上系统用的是类似官方的版本, 不过官方的有问题, 就是连接创建太多会导致问题, 我们采用了单链接的形式..

  4. Pingback: HIVE-HQL | 萝莉

  5. fabius

    个人觉得unlock方法里应该是执行这条命令:
    zooKeeper.delete(path, -1);
    因为LZ是通过stat==null来判断能否建锁的.要是unlock方法里仅仅是把数据改成ip:unlock,那后面的客户端里的getLock
    永远都只能返回NULL。。。

    1. jiacheo

      呵呵, 我只是简单用几台机测试过, 还真没遇到这个问题... 看来我还需继续深入研究一下.