知识储备[SETNX]
抢占锁标记位置 SETNX key value
将 key
的值设为 value
,当且仅当 key
不存在。
若给定的 key
已经存在,则 SETNX 不做任何动作。
SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
设置成功,返回 1
。
设置失败,返回 0
。
这里我们用这样一个redis操作解决了抢占标记位置的问题,当一个程序的线程设置该标记位置的值后,其他的程序将不能在设置该标记位置。
do{ opt = _lock();//抢占锁表记位置,返回值现在我们只考虑两种OPT_WAIT[抢占失败进入等待],OPT_LOCK[抢占成功] if(opt==OPT_WAIT){ //如果抢占标记位置为失败进入休眠 sleep(timeout); }}while (opt!=OPT_LOCK);//抢占成功退出抢占操作,进入加锁代码块
抢占锁标记失败该怎么办
我们应当让程序进入等待,Java中有很多方式让程序进入等待
Thread.sleep(long time);TimeUnit.[UNIT].sleep(long time);LockSupport.park(boolean isAbsolute, long time);//这个也许比较少见,有机会再写
抢占锁标志位成功的程序线程如何退出且通知其他抢占线程
上文中我们讲到了;没有抢占到标记位置的程序会进入等待,然后开始下一次循环,假设我们,抢占失败的程序休眠10s;而抢占成功的程序在它休眠的第5s中释放了所操作,我们应当怎么办;当然上面代码我们可以看到程序会在休眠结束后继续循环,再次调用抢占代码,但这5s中就相当于白白浪费了。所以我们需要有一个操作可以叫醒休眠的程序。换言之就是程序怎么去获取锁释放消息。
获取锁标记释放消息[PSUBSCRIBE与PUBLISH]
这里涉及到订阅与发布模式,就像你在淘宝看中一样东西,但是价格你暂时觉得有点偏高,那怎么办了,有种方式就是你每天去看一次,就是我们上面的循环,还有一种就是淘宝上面的降价通知,你勾选后;当商品降价的时候会发送一条消息给你。
这个操作就是我们这里要讲的。
SUBSCRIBE channel [channel ...]
订阅给定的一个或多个频道的信息。就相当于你勾选了一个或者多个商品的降价通知。这里我们可能会使用 PSUBSCRIBE pattern [pattern ...] 来简化订阅操作
PUBLISH channel message
将信息 message
发送到指定的频道 channel
。就相当于淘宝给你发送降价通知。
更多命令详情请参考
namespace = "...";//在程序一开始我们定义一个命名空间;这个底下用来存放锁相关信息notifier = new Thread(new Runnable() { //因为订阅时阻塞的。所以程序启动时建立一个通知线程专门来订阅消息 public void run() { try(Jedis resource = jedisPool.getResource()) { resource.psubscribe(new LockPubSub(), namespace + '*');//订阅这个命名空间下所有消息 } }});private int _lock() { Jedis resource = jedisPool.getResource(); try { Long result = resource.setnx(lockKey, holder);//尝试抢占所标记 if (Long.valueOf(1).equals(result)) { //抢占成功 resource.incr(countKey);//锁计数器加一 return OPT_LOCK;//返回加锁成功 } else { String realHolder = resource.get(lockKey);//获取当前加锁的线程标记 if (holder.equals(realHolder)) { //如果自己持有的锁标记和当前加锁的线程标记一致;则是重入 resource.incr(countKey);//重入计数加一 return OPT_LOCK;//返回加锁成功 } else { resource.rpush(queueKey, holder);//加锁标记不一致,尝试将自己加入释放通知队列末尾 if (resource.exists(lockKey)) { //如果放入队列之后加锁的线程还在操作;则进入真正的等待 return OPT_WAIT; } else { if(holder.equals(resource.lindex(queueKey,0))) { //否则判断自己是不是等待队列的第一个元素 resource.lrem(queueKey, 1, holder);//如果是第一个;则重试一次; return OPT_RETRY; }else{ return OPT_WAIT;//否则进入真正的等待 } } } } } finally { resource.close(); }}public void unLock() { Jedis resource = jedisPool.getResource(); try { String realHolder = resource.get(lockKey); if(holder.equals(realHolder)){ //判断锁标记是否与持有者一致;否则应当是程序异常;当抛出失败 if(resource.decr(countKey)==0){ //锁计数减一;如果为0;锁释放完成; resource.del(lockKey);//释放锁标记位置 String first = resource.lpop(queueKey);//向最先订阅释放消息的程序发布释放消息 if(first!=null) { resource.publish(lockKey, first); } } }else{ throw new IllegalStateException("holder|excepted [" + realHolder + "] | actual [" + holder + "]"); } } finally { resource.close(); }}//holder为线程标志private void sleep(long timeout){ waitMap.put(holder,Thread.currentThread());//当程序休眠时把添加到等待队列中去 try { TimeUnit.MILLISECONDS.sleep(timeout);//程序休眠 } catch (InterruptedException e) { //wake 唤醒;响应中断消息 } finally { waitMap.remove(holder);//休眠结束;将线程移除出等待对列 }}//用来建立锁订阅通知class LockPubSub extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { Thread first = waitMap.get(message);//message就是上文的holder if(first!=null){ first.interrupt();//触发线程中断;打破休眠状态 } }}
完整代码
/** *博客园地址:https://home.cnblogs.com/u/dev-lluo/ *个人博客地址:https://little.beer/ */ import java.util.concurrent.TimeoutException;public interface Lock { void lock(); void tryLock(int count) throws TimeoutException; void unLock();}public interface LockBuilder { Lock newLock(LockToken lockToken);}public interface LockToken { String getLockToken();}import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPubSub;import java.lang.management.ManagementFactory;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class DistributedLockBuilder implements LockBuilder { private Thread notifier; private JedisPool jedisPool; private final String namespace; private final String HOLDER_PRE; private final int OPT_LOCK, OPT_RETRY, OPT_WAIT; private final Map waitMap; private int timeout; { notifier = new Thread(new Runnable() { public void run() { try (Jedis resource = jedisPool.getResource()) { resource.psubscribe(new LockPubSub(), namespace + '*'); } } }); waitMap = Collections.synchronizedMap(new HashMap()); Map env = System.getenv(); HOLDER_PRE = new StringBuilder(env.get("COMPUTERNAME")).append('[').append(env.get("USERNAME")).append(']').append(ManagementFactory.getRuntimeMXBean().getName()).toString(); OPT_LOCK = 0; OPT_RETRY = 1; OPT_WAIT = 2; } public DistributedLockBuilder(JedisPool jedisPool) { this(jedisPool, "__dtd_lock__:jedis"); } public DistributedLockBuilder(JedisPool jedisPool, String namespace) { this(jedisPool, namespace, 10); } public DistributedLockBuilder(JedisPool jedisPool, String namespace, int timeout) { this.jedisPool = jedisPool; this.namespace = namespace; this.timeout = timeout; notifier.start(); } public Lock newLock(LockToken lockToken) { return new Lock(lockToken); } public class Lock implements com.outlook.lluo.dev.lock.api.Lock { private LockToken lockToken; private String lockKey; private String countKey; private String queueKey; private String holder; private Lock(LockToken lockToken) { this.lockToken = lockToken; this.lockKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":lock").toString(); this.countKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":count").toString(); this.queueKey = new StringBuilder(namespace).append(':').append(lockToken.getLockToken()).append(":queue").toString(); this.holder = new StringBuilder(HOLDER_PRE).append('(').append(Thread.currentThread().getId()).append(")@").append(Thread.currentThread().getName()).toString(); } public void lock() { int opt; do { opt = _lock(); if (opt == OPT_WAIT) { sleep(timeout); } } while (opt != OPT_LOCK); } public void tryLock(int count) throws TimeoutException { int opt; do { opt = _lock(); if (count == 0) { Jedis resource = jedisPool.getResource(); try { resource.lrem(queueKey, 1, holder); throw new TimeoutException(holder); } finally { resource.close(); } } if (opt == OPT_WAIT) { sleep(10); count--; } } while (opt != OPT_LOCK); } private void sleep(long timeout) { waitMap.put(holder, Thread.currentThread()); try { TimeUnit.MILLISECONDS.sleep(timeout); } catch (InterruptedException e) { //wake } finally { waitMap.remove(holder); } } private int _lock() { Jedis resource = jedisPool.getResource(); try { Long result = resource.setnx(lockKey, holder); if (Long.valueOf(1).equals(result)) { resource.incr(countKey); return OPT_LOCK; } else { String realHolder = resource.get(lockKey); if (holder.equals(realHolder)) { resource.incr(countKey); return OPT_LOCK; } else { resource.rpush(queueKey, holder); if (resource.exists(lockKey)) { return OPT_WAIT; } else { if (holder.equals(resource.lindex(queueKey, 0))) { resource.lrem(queueKey, 1, holder); return OPT_RETRY; } else { return OPT_WAIT; } } } } } finally { resource.close(); } } public void unLock() { Jedis resource = jedisPool.getResource(); try { String realHolder = resource.get(lockKey); if (holder.equals(realHolder)) { if (resource.decr(countKey) == 0) { resource.del(lockKey); String first = resource.lpop(queueKey); if (first != null) { resource.publish(lockKey, first); } } } else { throw new IllegalStateException("holder|excepted [" + realHolder + "] | actual [" + holder + "]"); } } finally { resource.close(); } } @Override public String toString() { return "Lock{" + "lockToken=" + lockToken.getLockToken() + ", holder='" + holder + '\'' + '}'; } } class LockPubSub extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { Thread first = waitMap.get(message); if (first != null) { first.interrupt(); } } }}