博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Redis搭建简单的Java分布式锁
阅读量:6531 次
发布时间:2019-06-24

本文共 10569 字,大约阅读时间需要 35 分钟。

知识储备[SETNX]

抢占锁标记位置 SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

设置成功,返回 1 。

设置失败,返回 0 。

这里我们用这样一个redis操作解决了抢占标记位置的问题,当一个程序的线程设置该标记位置的值后,其他的程序将不能在设置该标记位置。

do{         opt = _lock();//抢占锁表记位置,返回值现在我们只考虑两种OPT_WAIT[抢占失败进入等待],OPT_LOCK[抢占成功]   if(opt==OPT_WAIT){
//如果抢占标记位置为失败进入休眠 sleep(timeout); }}while (opt!=OPT_LOCK);//抢占成功退出抢占操作,进入加锁代码块

抢占锁标记失败该怎么办

我们应当让程序进入等待,Java中有很多方式让程序进入等待

Thread.sleep(long time);TimeUnit.[UNIT].sleep(long time);LockSupport.park(boolean isAbsolute, long time);//这个也许比较少见,有机会再写

 

抢占锁标志位成功的程序线程如何退出且通知其他抢占线程

上文中我们讲到了;没有抢占到标记位置的程序会进入等待,然后开始下一次循环,假设我们,抢占失败的程序休眠10s;而抢占成功的程序在它休眠的第5s中释放了所操作,我们应当怎么办;当然上面代码我们可以看到程序会在休眠结束后继续循环,再次调用抢占代码,但这5s中就相当于白白浪费了。所以我们需要有一个操作可以叫醒休眠的程序。换言之就是程序怎么去获取锁释放消息。

获取锁标记释放消息[PSUBSCRIBE与PUBLISH]

这里涉及到订阅与发布模式,就像你在淘宝看中一样东西,但是价格你暂时觉得有点偏高,那怎么办了,有种方式就是你每天去看一次,就是我们上面的循环,还有一种就是淘宝上面的降价通知,你勾选后;当商品降价的时候会发送一条消息给你。

这个操作就是我们这里要讲的。

SUBSCRIBE channel [channel ...]

订阅给定的一个或多个频道的信息。就相当于你勾选了一个或者多个商品的降价通知。这里我们可能会使用 PSUBSCRIBE pattern [pattern ...] 来简化订阅操作

 PUBLISH channel message

将信息 message 发送到指定的频道 channel 。就相当于淘宝给你发送降价通知。

更多命令详情请参考     

namespace = "...";//在程序一开始我们定义一个命名空间;这个底下用来存放锁相关信息notifier = new Thread(new Runnable() {
//因为订阅时阻塞的。所以程序启动时建立一个通知线程专门来订阅消息 public void run() { try(Jedis resource = jedisPool.getResource()) { resource.psubscribe(new LockPubSub(), namespace + '*');//订阅这个命名空间下所有消息 } }});private int _lock() { Jedis resource = jedisPool.getResource(); try { Long result = resource.setnx(lockKey, holder);//尝试抢占所标记 if (Long.valueOf(1).equals(result)) {
//抢占成功 resource.incr(countKey);//锁计数器加一 return OPT_LOCK;//返回加锁成功 } else { String realHolder = resource.get(lockKey);//获取当前加锁的线程标记 if (holder.equals(realHolder)) {
//如果自己持有的锁标记和当前加锁的线程标记一致;则是重入 resource.incr(countKey);//重入计数加一 return OPT_LOCK;//返回加锁成功 } else { resource.rpush(queueKey, holder);//加锁标记不一致,尝试将自己加入释放通知队列末尾 if (resource.exists(lockKey)) {
//如果放入队列之后加锁的线程还在操作;则进入真正的等待 return OPT_WAIT; } else { if(holder.equals(resource.lindex(queueKey,0))) {
//否则判断自己是不是等待队列的第一个元素 resource.lrem(queueKey, 1, holder);//如果是第一个;则重试一次; return OPT_RETRY; }else{ return OPT_WAIT;//否则进入真正的等待 } } } } } finally { resource.close(); }}public void unLock() { Jedis resource = jedisPool.getResource(); try { String realHolder = resource.get(lockKey); if(holder.equals(realHolder)){
//判断锁标记是否与持有者一致;否则应当是程序异常;当抛出失败 if(resource.decr(countKey)==0){
//锁计数减一;如果为0;锁释放完成; resource.del(lockKey);//释放锁标记位置 String first = resource.lpop(queueKey);//向最先订阅释放消息的程序发布释放消息 if(first!=null) { resource.publish(lockKey, first); } } }else{ throw new IllegalStateException("holder|excepted [" + realHolder + "] | actual [" + holder + "]"); } } finally { resource.close(); }}//holder为线程标志private void sleep(long timeout){ waitMap.put(holder,Thread.currentThread());//当程序休眠时把添加到等待队列中去 try { TimeUnit.MILLISECONDS.sleep(timeout);//程序休眠 } catch (InterruptedException e) { //wake 唤醒;响应中断消息 } finally { waitMap.remove(holder);//休眠结束;将线程移除出等待对列 }}//用来建立锁订阅通知class LockPubSub extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { Thread first = waitMap.get(message);//message就是上文的holder if(first!=null){ first.interrupt();//触发线程中断;打破休眠状态 } }}

完整代码

/**   *博客园地址:https://home.cnblogs.com/u/dev-lluo/  *个人博客地址:https://little.beer/ */ import java.util.concurrent.TimeoutException;public interface Lock {    void lock();    void tryLock(int count) throws TimeoutException;    void unLock();}public interface LockBuilder {    Lock newLock(LockToken lockToken);}public interface LockToken {    String getLockToken();}import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPubSub;import java.lang.management.ManagementFactory;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class DistributedLockBuilder implements LockBuilder {    private Thread notifier;    private JedisPool jedisPool;    private final String namespace;    private final String HOLDER_PRE;    private final int OPT_LOCK, OPT_RETRY, OPT_WAIT;    private final Map waitMap;    private int timeout;    {        notifier = new Thread(new Runnable() {            public void run() {                try (Jedis resource = jedisPool.getResource()) {                    resource.psubscribe(new LockPubSub(), namespace + '*');                }            }        });        waitMap = Collections.synchronizedMap(new HashMap());        Map env = System.getenv();        HOLDER_PRE = new StringBuilder(env.get("COMPUTERNAME")).append('[').append(env.get("USERNAME")).append(']').append(ManagementFactory.getRuntimeMXBean().getName()).toString();        OPT_LOCK = 0;        OPT_RETRY = 1;        OPT_WAIT = 2;    }    public DistributedLockBuilder(JedisPool jedisPool) {        this(jedisPool, "__dtd_lock__:jedis");    }    public DistributedLockBuilder(JedisPool jedisPool, String namespace) {        this(jedisPool, namespace, 10);    }    public DistributedLockBuilder(JedisPool jedisPool, String namespace, int timeout) {        this.jedisPool = jedisPool;        this.namespace = namespace;        this.timeout = timeout;        notifier.start();    }    public Lock newLock(LockToken lockToken) {        return new Lock(lockToken);    }    public class Lock implements com.outlook.lluo.dev.lock.api.Lock {        private LockToken lockToken;        private String lockKey;        private String countKey;        private String queueKey;        private String holder;        private Lock(LockToken lockToken) {            this.lockToken = lockToken;            this.lockKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":lock").toString();            this.countKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":count").toString();            this.queueKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":queue").toString();            this.holder = new StringBuilder(HOLDER_PRE).append('(').append(Thread.currentThread().getId()).append(")@").append(Thread.currentThread().getName()).toString();        }        public void lock() {            int opt;            do {                opt = _lock();                if (opt == OPT_WAIT) {                    sleep(timeout);                }            } while (opt != OPT_LOCK);        }        public void tryLock(int count) throws TimeoutException {            int opt;            do {                opt = _lock();                if (count == 0) {                    Jedis resource = jedisPool.getResource();                    try {                        resource.lrem(queueKey, 1, holder);                        throw new TimeoutException(holder);                    } finally {                        resource.close();                    }                }                if (opt == OPT_WAIT) {                    sleep(10);                    count--;                }            } while (opt != OPT_LOCK);        }        private void sleep(long timeout) {            waitMap.put(holder, Thread.currentThread());            try {                TimeUnit.MILLISECONDS.sleep(timeout);            } catch (InterruptedException e) {                //wake            } finally {                waitMap.remove(holder);            }        }        private int _lock() {            Jedis resource = jedisPool.getResource();            try {                Long result = resource.setnx(lockKey, holder);                if (Long.valueOf(1).equals(result)) {                    resource.incr(countKey);                    return OPT_LOCK;                } else {                    String realHolder = resource.get(lockKey);                    if (holder.equals(realHolder)) {                        resource.incr(countKey);                        return OPT_LOCK;                    } else {                        resource.rpush(queueKey, holder);                        if (resource.exists(lockKey)) {                            return OPT_WAIT;                        } else {                            if (holder.equals(resource.lindex(queueKey, 0))) {                                resource.lrem(queueKey, 1, holder);                                return OPT_RETRY;                            } else {                                return OPT_WAIT;                            }                        }                    }                }            } finally {                resource.close();            }        }        public void unLock() {            Jedis resource = jedisPool.getResource();            try {                String realHolder = resource.get(lockKey);                if (holder.equals(realHolder)) {                    if (resource.decr(countKey) == 0) {                        resource.del(lockKey);                        String first = resource.lpop(queueKey);                        if (first != null) {                            resource.publish(lockKey, first);                        }                    }                } else {                    throw new IllegalStateException("holder|excepted [" + realHolder + "] | actual [" + holder + "]");                }            } finally {                resource.close();            }        }        @Override        public String toString() {            return "Lock{" +                    "lockToken=" + lockToken.getLockToken() +                    ", holder='" + holder + '\'' +                    '}';        }    }    class LockPubSub extends JedisPubSub {        @Override        public void onPMessage(String pattern, String channel, String message) {            Thread first = waitMap.get(message);            if (first != null) {                first.interrupt();            }        }    }}

 

转载于:https://www.cnblogs.com/dev-lluo/p/10161266.html

你可能感兴趣的文章
div布局小技巧
查看>>
OCP 12c最新考试原题及答案(071-4)
查看>>
MHA故障切换和在线手工切换原理
查看>>
JAVA并发,同步锁性能测试
查看>>
Python版本切换和Pip安装
查看>>
SilverLigth学习笔记--控制 Silverlight控件样式(转)
查看>>
poj3262
查看>>
第四十天笔记
查看>>
4、动态代理
查看>>
Loj #6073.「2017 山东一轮集训 Day5」距离
查看>>
我的TCP/IP学习笔记
查看>>
shell--字符串的截取变量子串串
查看>>
Cas_个人理解
查看>>
UISearchController
查看>>
梦断代码阅读笔记02
查看>>
轮毂电机光电增量编码器的ABZ信号详解
查看>>
TextBox Template
查看>>
Linux MySQL 储存中文失败简单解决办法
查看>>
求最大值及其下标
查看>>
洛谷——P1330 封锁阳光大学
查看>>