博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ZooKeeper 实现分布式锁
阅读量:6906 次
发布时间:2019-06-27

本文共 8163 字,大约阅读时间需要 27 分钟。

 

实现互斥锁

package com.zookeeper.lock;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

/**
 * Base building block for a ZooKeeper-backed mutual-exclusion lock.
 *
 * <p>Each contender creates an ephemeral sequential node under {@code basePath}. The contender
 * whose node sorts first holds the lock; every other contender watches the node immediately in
 * front of its own and re-evaluates once that node is deleted.
 */
public class BaseDistributedLock {

    /** Maximum number of times {@link #attemptLock} retries after a {@link ZkNoNodeException}. */
    private static final int MAX_RETRY_COUNT = 10;

    private final ZkClient client;

    /** Path prefix used when creating this lock's sequential nodes: {@code basePath/lockName}. */
    private final String path;

    /** Path of the parent ("locker") node in ZooKeeper that holds all contender nodes. */
    private final String basePath;

    private final String lockName;

    /**
     * @param client   ZooKeeper client used for every operation
     * @param path     parent node under which the contender nodes are created
     * @param lockName name prefix of the sequential contender nodes
     */
    public BaseDistributedLock(ZkClient client, String path, String lockName) {
        this.client = client;
        this.basePath = path;
        this.path = path.concat("/").concat(lockName);
        this.lockName = lockName;
    }

    /** Deletes the given contender node. */
    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    /** Creates an ephemeral sequential node with no data and returns its full path. */
    private String createLockNode(ZkClient client, String path) throws Exception {
        return client.createEphemeralSequential(path, null);
    }

    /**
     * Blocks until our node is the first child of {@code basePath} or the timeout elapses.
     *
     * @param startMillis  wall-clock time (ms) at which the attempt started
     * @param millisToWait remaining budget in milliseconds, or {@code null} to wait indefinitely
     * @param ourPath      full path of the node created for this attempt
     * @return {@code true} if the lock was obtained, {@code false} on timeout
     * @throws Exception any failure while talking to ZooKeeper or while waiting; our node is
     *                   deleted before the exception propagates
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath)
            throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            while (!haveTheLock) {
                // All contender nodes under the lock parent, ordered by sequence suffix.
                List<String> children = getSortedChildren();
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // Position of our node among the contenders; negative means it is gone.
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) {
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                if (ourIndex == 0) {
                    // Nobody is ahead of us: the lock is ours.
                    haveTheLock = true;
                    continue;
                }

                // Somebody is ahead of us: wait for the node directly in front to go away.
                String pathToWatch = children.get(ourIndex - 1);
                String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                final CountDownLatch latch = new CountDownLatch(1);
                final IZkDataListener previousListener = new IZkDataListener() {
                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        latch.countDown();
                    }

                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {
                        // Data changes on the predecessor are irrelevant to lock ownership.
                    }
                };

                try {
                    client.subscribeDataChanges(previousSequencePath, previousListener);

                    // The predecessor may have been deleted between listing the children and
                    // registering the listener. In that case no delete event will ever arrive,
                    // so re-evaluate immediately instead of blocking on the latch.
                    if (!client.exists(previousSequencePath)) {
                        continue;
                    }

                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true; // timed out - delete our node
                            break;
                        }
                        // The budget is tracked in milliseconds, so wait in milliseconds.
                        latch.await(millisToWait, TimeUnit.MILLISECONDS);
                    } else {
                        latch.await();
                    }
                } catch (ZkNoNodeException e) {
                    // Predecessor vanished while subscribing; loop and re-check our position.
                } finally {
                    client.unsubscribeDataChanges(previousSequencePath, previousListener);
                }
            }
        } catch (Exception e) {
            // On any failure our node must not be left behind blocking other contenders.
            doDelete = true;
            throw e;
        } finally {
            if (doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

    /**
     * Extracts the part of a node name that follows the last occurrence of {@code lockName}.
     * Returns {@code str} unchanged when {@code lockName} does not occur in it.
     */
    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }

    /**
     * Returns the children of {@code basePath} sorted by the suffix that follows
     * {@code lockName}. If the parent node does not exist it is created (including missing
     * ancestors) and the lookup is repeated.
     */
    List<String> getSortedChildren() throws Exception {
        try {
            List<String> children = client.getChildren(basePath);
            Collections.sort(children, new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return getLockNodeNumber(lhs, lockName)
                            .compareTo(getLockNodeNumber(rhs, lockName));
                }
            });
            return children;
        } catch (ZkNoNodeException e) {
            client.createPersistent(basePath, true);
            return getSortedChildren();
        }
    }

    /** Releases the lock by deleting the node at {@code lockPath}. */
    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    /**
     * Tries to obtain the lock.
     *
     * @param time maximum time to wait, interpreted in {@code unit}
     * @param unit unit of {@code time}; {@code null} means wait without a time limit
     * @return the full path of the node that holds the lock, or {@code null} if the lock was not
     *         obtained within the time limit
     * @throws Exception if ZooKeeper access fails, or a {@link ZkNoNodeException} keeps
     *                   occurring after {@code MAX_RETRY_COUNT} retries
     */
    protected String attemptLock(long time, TimeUnit unit) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait;
        if (unit != null) {
            millisToWait = unit.toMillis(time);
        } else {
            millisToWait = null;
        }

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        // A ZkNoNodeException (e.g. our node disappeared) is retried with a fresh node.
        while (!isDone) {
            isDone = true;
            try {
                ourPath = createLockNode(client, path);
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
}
View Code

接口类

package com.zookeeper.lock;

import java.util.concurrent.TimeUnit;

/**
 * Contract for a distributed lock.
 */
public interface DistributedLock {

    /**
     * Acquires the lock, waiting if it is not immediately available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    void acquire() throws Exception;

    /**
     * Acquires the lock, waiting at most the given amount of time.
     *
     * @param time the timeout
     * @param unit the unit of {@code time}
     * @return whether the lock was obtained
     * @throws Exception if the attempt fails
     */
    boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if the release fails
     */
    void release() throws Exception;
}
View Code

测试类

package com.zookeeper.lock;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

// NOTE(review): "coom" looks like a typo for "com" - confirm the real package of
// SimpleDistributedLockMutex before changing this import.
import coom.zookeeperdemo.lock.SimpleDistributedLockMutex;

/**
 * Manual demo: two clients contend for the same "/Mutex" lock. Client 1 takes the lock, client 2
 * tries to take it on a second thread, and client 1 releases it after five seconds.
 */
public class TestDistributedLock {

    public static void main(String[] args) {
        final ZkClient zkClientExt1 = new ZkClient("192.168.1.105:2181",
                5000, 5000, new BytesPushThroughSerializer());
        final ZkClient zkClientExt2 = new ZkClient("192.168.1.105:2181",
                5000, 5000, new BytesPushThroughSerializer());

        try {
            final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(
                    zkClientExt1, "/Mutex");
            final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(
                    zkClientExt2, "/Mutex");

            mutex1.acquire();
            System.out.println("Client1 locked");

            Thread client2Thd = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        mutex2.acquire();
                        System.out.println("Client2 locked");
                        mutex2.release();
                        System.out.println("Client2 released lock");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client2Thd.start();

            // Hold the lock for a while so that client 2 has to wait for it.
            Thread.sleep(5000);
            mutex1.release();
            System.out.println("Client1 released lock");

            client2Thd.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs this code.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close both clients on every path so their ZooKeeper sessions do not leak.
            try {
                zkClientExt1.close();
            } finally {
                zkClientExt2.close();
            }
        }
    }
}
View Code

 

原文出处已无从查证。

转载于:https://www.cnblogs.com/newlangwen/p/10143879.html

你可能感兴趣的文章
项目管理培训的一些总结
查看>>
Hibernate 配置属性
查看>>
如何用Beyond Compare设置比较文件夹对齐方式
查看>>
01-HTML基础与进阶-day6-录像280
查看>>
SNMP 实战1
查看>>
linux TCP客户端指定端口号连接服务端
查看>>
RTP协议 Q&A
查看>>
linux下php调用root权限实现方案
查看>>
5.Spring Cloud初相识-------Hystrix熔断器
查看>>
CSS3设置Table奇数行和偶数行样式
查看>>
PHP版本过狗Shell
查看>>
BZOJ 2127 happiness ——网络流
查看>>
N皇后问题
查看>>
JavaScript检测数据类型
查看>>
观察者模式
查看>>
《CLR via C#》读书笔记 之 类型基础
查看>>
EXt js 学习笔记总结
查看>>
Vue---父子组件之间的通信
查看>>
第八章:手工建库
查看>>
JavaScript语法
查看>>