java开发非公平锁不可打断源码示例解析
Emanon 人气:0非公平锁不可打断调试代码
package test; import java.util.concurrent.locks.ReentrantLock; public class TestReenTrantLock { public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); new Thread(() -> { System.out.println("1 start"); lock.lock(); System.out.println("1 entry"); try { Thread.sleep(1000 * 60 * 10); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } },"t1").start(); Thread.sleep(2000 ); new Thread(() -> { System.out.println("2 start"); lock.lock(); System.out.println("2 entry"); try { } finally { lock.unlock(); } },"t2").start(); } }
保证线程1先获取到锁,睡眠10分钟,因为需要打断点,线程2再去获取锁。
非公平锁不可打断加锁源码
lock
public void lock() { sync.lock(); }
final void lock() { //首先用线程1使用 cas 尝试将 state 从 0 改为 1,如果成功表示获得了锁 //因为线程1获取到了锁state现在等于1,所以此时线程2获取锁失败。 //线程2执行acquire(1); //非公平的体现:上来就加锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
acquire
//arg = 1 public final void acquire(int arg) { //线程2执行tryAcquire(arg),返回false代表锁获取失败,!tryAcquire(arg) ==true //由于是&&判断 //所以线程2调用addWaiter做尾部入队操作 //线程2接着调用acquireQueued进入park阻塞 if (!tryAcquire(arg) && //addWaiter(Node.EXCLUSIVE) 返回的是 线程2的所在的Node节点 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ //acquireQueued方法返回的是打断标志 如果阻塞状态或者运行状态被打断 //返回true 那么会执行selfInterrupt自我打断 //selfInterrupt方法只有1句代码:Thread.currentThread().interrupt(); selfInterrupt(); } }
tryAcquire:尝试加锁&判断锁重入
//acquires=1 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
//此时 线程2累计尝试2次加锁 final boolean nonfairTryAcquire(int acquires) { //acquires=1 final Thread current = Thread.currentThread(); int c = getState(); //如果线程1已经释放锁 此时c==0满足 会再次使用cas尝试加锁 //这里线程1仍然持有锁 条件不满足 if (c == 0) { // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列 // 非公平锁可以提高并发度,但是会导致饥饿,可以使用超时时间解决饥饿 // 线程切换的开销,其实就是非公平锁效率高于公平锁的原因 // 因为非公平锁减少了线程挂起的几率,后来的线程有一定几率节省被挂起的开销 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); //代表加锁成功 return true; } } // 判断是否锁重入 else if (current == getExclusiveOwnerThread()) { //使用原来的state + acquires,这里acquires = 1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //返回false代表线程2获取锁失败 return false; }
acquireQueued:加入同步队列
addWaiter方法的第一个参数是mode,Node.EXCLUSIVE是个null。
static final Node EXCLUSIVE = null;
acquireQueued方法的第一个参数是node,其实就是线程2所在的Node节点。第二个参数是1:代表了本次state加锁成功累加的数量。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
由于acquireQueued方法的参数是addWaiter方法的返回值,因此先看addWaiter方法
//node是Node.EXCLUSIVE 默认是null //enq方法创建的队列如下:头结点->尾结点(线程2所在节点) //后续的节点都在addWaiter方法中入队,不再进入enq:头结点->尾结点(线程2所在节点)->尾结点(线程3所在节点) private Node addWaiter(Node mode) { //static final Node EXCLUSIVE = null; //node为持有当前线程的node //mode为null 可以看到赋值给了 nextWaiter //也就是线程2所在节点的next指针指向了null //注意:nextWaiter是等待队列中的指针 /*** Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } ***/ Node node = new Node(Thread.currentThread(), mode); //获取同步队列的尾部节点 Node pred = tail; //此时同步队列的tail是null,因为到目前为止并没有执行过enq方法 //如果tail不为null:使用cas尝试将Node对象插入队列尾部,作为新的尾结点 if (pred != null) { //将当前node节点的前一个节点指向原tail节点 node.prev = pred; //将当前node节点作为新的尾节点 if (compareAndSetTail(pred, node)) { //原来的尾节点作为当前节点的下一个节点 pred.next = node; return node; } } //因为tail节点是null 尝试将Node加入队列 enq(node); //返回线程2节点 return node; }
//下面解释中的当前节点指的是Thread-2所在的节点 //enq相当于是初始化头尾结点和第一个入队的节点 //只有第1个入队的节点才会进入该方法 //后续的线程都会直接执行enq(node)之前的代码加入尾节点 //enq方法构造了1个双向队列:头结点->尾结点(线程2所在节点) private Node enq(final Node node) { for (;;) { Node t = tail; //第一次进入循环 tail是尾节点为null if (t == null) { //第一次进入循环:设置头结点为哨兵节点也叫哑结点:因为没有对应的线程与之关联 // head节点的属性:thread=null if (compareAndSetHead(new Node())) //第一次进入循环:将头结点赋值给尾节点 此时头和尾是同一个节点 这点很重要 tail = head; } else { //第二次进入循环:此处的t就是head,将当前节点的前置指针指向头节点 node.prev = t; //第二次进入循环:使用cas将尾节点设置为当前节点 //第二次进入循环:此时头结点是哨兵节点(哑结点),尾节点即Thread-2所在的线程的节点 if (compareAndSetTail(t, node)) { //第二次进入循环:将head.next指向当前节点那么这个链表是双向链表 t.next = node; //循环结束 return t; } } } }
//node是 线程2的节点 //arg = 1 //node.predecessor():获取当前节点的上一个节点 //node.predecessor()和node.prev不同的是: //node.prev如果是null不会抛出异常 //node.predecessor()中如果 node.prev是 null 会抛出异常 //acquireQueued方法返回的是打断状态 final boolean acquireQueued(final Node node, int arg) { //node即Thread-2所在的线程的节点 boolean failed = true; try { boolean interrupted = false; //死循环开始 for (;;) { //p是Thread-2所在的线程的节点的前置节点即头结点 final Node p = node.predecessor(); //p == head 即Thread-2所在的线程的节点的前置节点是头结点 //tryAcquire(arg) 使用cas再次尝试获取锁 获取锁失败 代码不进入if向下执行 //此时累计尝试3次 if (p == head && tryAcquire(arg)) { //如果获取锁成功将当前节点设置为头结点并将当前节点的thread属性和prev属性设置为null //也就是当前节点的prev和原来的头节点断开 //因为当前节点获取锁成功,意味着线程1已经释放锁,此时需要和代表线程1的原来的头结点断开。 setHead(node); //将原来的头节点断开和当前节点的连接 相当于原来的节点出队 p.next = null; // help GC failed = false; //注意这是在死循环里 //如果interrupted返回的是true 将会执行 selfInterrupt(); 自我中断 // if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){selfInterrupt();} //即:获取锁阻塞的过程中被打断,也要重新进入死循环一直等到获取锁才能执行打断,这就是不可打断。 //可打断是指在等待锁的过程中,其它线程可以用interrupt方法终止等待,synchronized锁是不可打断的。 //我们要想在等锁的过程中被打断,就要使用lockInterruptibly()方法对lock对象加锁,而不是lock()方法。 return interrupted; } //第一次进入shouldParkAfterFailedAcquire //将当前节点的前置节点即头结点改为-1 返回false (累计尝试4次) //如果当前节点的前置节点以及更前面的节点有取消的节点 //要断开这些节点 包括当前节点的前置节点 //第二次进入shouldParkAfterFailedAcquire //如果当前节点的前置节点是-1 返回true //shouldParkAfterFailedAcquire 返回true时 //会进入parkAndCheckInterrupt()方法中,然后会park当前线程 //Thread-2所在node被阻塞,然后等待唤醒,此时node的waitStatus=0 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ //在不可打断模式中 //线程park在parkAndCheckInterrupt方法里 //如果线程被打断,parkAndCheckInterrupt方法返回true //执行以下代码 //Interrupted = true //Interrupted = true 代表被阻塞期间打断过 //然后继续进入死循环直到获取锁 //获取锁后返回Interrupted = true //最后返回到acquire方法 //进入selfInterrupt();执行Thread.currentThread().interrupt(); //在可打断模式中 //线程park在parkAndCheckInterrupt方法里 //如果线程被打断,parkAndCheckInterrupt方法返回true //执行以下代码 //throw new InterruptedException(); interrupted = true; } }//死循环结束 } finally { /** 这里的failed 什么时候变成true的? 默认的failed=true 在死循环一直都是true!!!因为一直没有获取锁成功!! 除非是获取到了锁才被赋值为false 1.try代码块抛出异常 ***/ if (failed) cancelAcquire(node); } }
//node是当前节点 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
shouldParkAfterFailedAcquire:判断是否需要park
//p是Thread-2所在节点的前置节点即头结点 //node是 Thread-2所在节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //p是Thread-2所在节点的前置节点即头结点 //头结点的waitStatus=0 int ws = pred.waitStatus; //第一次进入 ws=0 修改waitStatus为-1 //第二次进入ws=-1 Node.SIGNAL=-1 代表等待唤醒 返回true if (ws == Node.SIGNAL){ // 上一个节点=-1 都在阻塞, 那么自己也阻塞好了 //返回true代表要park return true; } //如果当前节点的前置节点的waitStatus>0 //说明当前节点D的前置节点C被取消,那么要把当前节点D的前置节点重新设置为[当前节点的前置节点C的前置节点B] //B<---C<---D //假如B节点被取消,此时需要断开C那么直接将D指向B即可 //A<---B<---C<---D //假如BC节点被取消,此时需要断开BC那么直接将D指向A即可 if (ws > 0) { do { //首先做 do //获取当前节点的前置节点的前置节点pred.prev //因为当前节点的前置节点pred的status大于0 说明当前节点是被取消的 需要断开 //继续往前找当前节点的前置节点的前置节点pred.prev //如果当前节点的前置节点的前置节点pred.prev的status还是大于0 说明也是被取消的 //那么继续往前找 //一直到将当前节点的前置节点以及当前节点的前置节点之前被取消的节点都断开 //看看代码是怎么做的 //获取当前节点的前置节点的前置节点作为当前节点的前置节点 pred = pred.prev; //然后将当前节点的前置指针指向当前节点的前置节点的前置节点 node.prev = pred; } while (pred.waitStatus > 0); //断开的是当前节点的前置节点 以及 当前节点的前置节点之前被取消的节点 //从后往前断开的 pred.next = node; } else { //将当前节点的前置节点即头结点改为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //第一次进入返回false 因为是死循环 等第二次进入的时候 // 符合 ws == Node.SIGNAL 会返回true return false; }
parkAndCheckInterrupt:park并检查中断
//代码块10 private final boolean parkAndCheckInterrupt() { //此处park LockSupport.park(this); //当前线程被unpark唤醒时,当前方法返回true或者false都要重新进入死循环然后陷入阻塞,一直等获取到锁才能被打断 //不同的是 //parkAndCheckInterrupt:返回true //会执行interrupted = true; //再次进入死循环,再次执行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()然后阻塞 //parkAndCheckInterrupt:返回false //会直接进入死循环再次执行shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt()然后阻塞 //这里为什么要调用interrupted() 而不是isInterrupted() ? //interrupted会重置打断标记为false 而isInterrupted只是返回打断标记 //当park的线程在被调用interrupt方法时,会把中断状态设置为true。 //然后park方法会去判断中断状态,如果为true,就直接返回,然后往下继续执行,如果为false继续阻塞 return Thread.interrupted(); }
注意 是否需要进入park阻塞是由当前节点的前驱节点的waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定。
目前头结点的waitStatus==Node.SIGNAL==-1,线程2所在节点的waitStatus==0。
判断前置节点waitstatus是否是SIGNAL即-1阻塞等待唤醒,如果前置节点是-1那么自己也进入阻塞
如果前置节点的waitstatus是大于0,说明节点已经被取消,递归断开这些节点返回false。
继续进入死循环判断前置节点状态,此时前置节点的waitstatus是0,将当前节点的前置节点即头结点改为-1,返回false。
继续进入死循环判断前置节点状态,此时前置节点的waitstatus是-1,那么自己也进入阻塞返回true
加锁是从当前节点往前找,如果前置节点已经被取消,那么继续往前找,找到一个没有被取消的节点为止。
解锁是从当前节点往后找,如果后置节点已经被取消,那么继续从后往前找,找到一个没有被取消的节点为止。
cancelAcquire:出队
出队是有条件的:必须抛出异常。只有在打断模式下才会抛出异常进入finally调用cancelAcquire方法出队。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //省略代码 }finally { /** 这里的failed 什么时候变成true的? 默认的failed=true 在死循环一直都是true!!!因为一直没有获取锁成功!! 除非是获取到了锁才被赋值为false 1.try代码块抛出异常 ***/ if (failed) cancelAcquire(node); }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
获取锁或者阻塞过程中,线程宕掉(系统异常或手动kill线程) 。
则会进入到acquireQueued的finally代码里,并判断failed是否为true,若为true则执行cancelAcquire方法放弃获取锁。
我们一般都说这个方法是用来中断线程的,那么这个中断应该怎么理解呢? 就是说把当前正在执行的线程中断掉,不让它继续往下执行吗?
其实,不然。 此处,说的中断仅仅是给线程设置一个中断的标识(设置为true),线程还是会继续往下执行的。而线程怎么停止,则需要由我们自己去处理。 一会儿会用代码来说明这个。
下面的示例代码说明当1个线程在park状态下被interrupt()方法打断或者被stop,会从之前阻塞的代码处唤醒并继续往下执行代码,而不是我们想象的直接跳出代码。
//示例代码1 public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //此处可以用 thread.stop(); 不推荐 thread.interrupt(); }
但是有1个问题,为什么跳不出来循环呢?
原来当调用interrupt方法时,会把中断状态设置为true,然后park方法会去判断中断状态,如果为true,就直接返回,然后往下继续执行,并不会抛出异常。
注意,这里并不会清除中断标志。
参考: https:
此时我们想到使用Thread.interrupted();方法重置打断标记为false
//示例代码2 static volatile Boolean flag = false; public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); if (flag){ Thread.interrupted(); } } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //此处可以用 thread.stop(); 不推荐 thread.interrupt(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } flag=true; }
发现上面的代码还是跳不出循环,而是被park阻塞。这个时候我们尝试使用抛出异常。
//示例代码3 static volatile Boolean flag = false; public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); if (flag){ throw new RuntimeException(); } } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //此处可以用 thread.stop(); 不推荐 thread.interrupt(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } flag=true; }
抛出异常成功终止循环并执行了finally。
其实上面的示例2就是不可打断模式的原理,示例2是可打断模式的原理。
非公平锁不可打断解锁源码
unlock
// 解锁实现 public void unlock() { sync.release(1); }
release
// AQS 继承过来的方法, 方便阅读, 放在此处 public final boolean release(int arg) { // 如果所有的锁释放成功即state=0 if (tryRelease(arg)) { // 队列头节点 Node h = head; // 头结点不为null 且 waitStatus不等于0 才需要唤醒头结点的后置节点 // h != null 说明有等待队列 // h.waitStatus != 0 说明头结点后面有节点在等待锁 // 假设头结点的下一个节点还没来得及修改h.waitStatus= -1 会有问题吗? // 不会 因为如果h.waitStatus=0,此时头结点的下一个节点还会再尝试一次获取锁 // 因为锁在这里已经被释放 所以头结点的下一个节点必定能获取到锁 if (h != null && h.waitStatus != 0) { // h是队列头节点 // unpark AQS 中等待的线程, 进入 ㈡ unparkSuccessor(h); } return true; } return false; }
tryRelease
// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, state 减为 0, 表明所有的锁都释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //设置state为c,c不一定等于0 setState(c); //返回锁标志位 return free; }
unparkSuccessor
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处 //node是头结点 private void unparkSuccessor(Node node) { // 此处的node节点为头结点 // 如果头节点的状态小于0 尝试重置头节点的状态为0 //改为0的意义在于:在下面的代码中:头结点的下一个节点被唤醒时会再次尝试加锁 //在shouldParkAfterFailedAcquire 方法中有1个判断 //if (ws == Node.SIGNAL) { return true; } //返回true代表获取锁失败进入parkAndCheckInterrupt方法阻塞 //这里改为0以后 那么头结点的下一个节点会在被unpark的时候再一次尝试加锁 //如果不改为0 那么头结点的下一个节点会直接进入死循环被park 陷入了死循环无解了。 int ws = node.waitStatus; if (ws < 0) { //配合唤醒线程 再一次尝试加锁 //配合唤醒线程 再一次尝试加锁 //配合唤醒线程 再一次尝试加锁 compareAndSetWaitStatus(node, ws, 0); } //获取头结点的下一个节点 Node s = node.next; //node是头节点 //如果头结点的后置节点为空或被取消 //那么从队列的末尾从后往前找,找到最前面一个需要unpark的节点 //如果头结点的后置节点不为空且没被取消 //那么就唤醒头节点的下一个节点 //这里也是非公平的体现 if (s == null || s.waitStatus > 0) { s = null; //循环遍历从 AQS 队列从队列的末尾从后往前找,找到最前面一个需要unpark的节点 //注意这里做了判断t不等于null且t不等于头结点且t.waitStatus <= 0 //也就是找到的节点必定是有效的 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0){ s = t; } } //唤醒头结点的下一个节点 或者 从后往前找到的第1个t.waitStatus<= 0的节点 if (s != null) //唤醒线程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次尝试加锁 //唤醒线程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次尝试加锁 //唤醒线程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次尝试加锁 LockSupport.unpark(s.thread); } }
非公平锁可重入源码
getExclusiveOwnerThread
static final class NonfairSync extends Sync { // Sync 继承过来的方法, 方便阅读, 放在此处 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // Sync 继承过来的方法, 方便阅读, 放在此处 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } }
加载全部内容