重入锁
小L要努力吖 人气:0一,ReentrantLock和synchronized的异同点
-
1.ReentrantLock和synchronized都是独占锁,只允许线程互斥的访问临界区。但是实现上两者不同:synchronized加锁解锁的过程是隐式的,用户不用手动操作,优点是操作简单,但显得不够灵活。一般并发场景使用synchronized的就够了;ReentrantLock需要手动加锁和解锁,且解锁的操作尽量要放在finally代码块中,保证线程正确释放锁。ReentrantLock操作较为复杂,但是因为可以手动控制加锁和解锁过程,在复杂的并发场景中能派上用场。
-
2.ReentrantLock和synchronized都是可重入的。synchronized因为可重入因此可以放在被递归执行的方法上,且不用担心线程最后能否正确释放锁;而ReentrantLock在重入时要却确保重复获取锁的次数必须和重复释放锁的次数一样,否则可能导致其他线程无法获得该锁。
- synchronized是基于JVM层面实现的,而Lock是基于JDK层面实现的。曾经反复的找过synchronized的实现,可惜最终无果。但Lock却是基于JDK实现的,我们可以通过阅读JDK的源码来理解Lock的实现。
在底层分析中我们可知ReentrantLock实现Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子类,所有的同步操作都是依靠AbstractQueuedSynchronizer(队列同步器)实现。
下面是Reetrantlock锁的方法调用图:
可以结合上图对底下代码做分析:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; private final Sync sync;
//AbstractQueuedSynchronizer 是一个抽象类,所以在使用这个同步器的时候,需要通过自己实现预期的逻辑,Sync、FairSync和NonfairSync都是ReentrantLock为了实现自己的需求而实现的内部类,
//之所以做成内部类,我认为是只在ReentrantLock使用上述几个类,在外部没有使用到。
abstract static class Sync extends AbstractQueuedSynchronizer { //静态内部类Sync private static final long serialVersionUID = -5179523762034025860L; abstract void lock();//由于子类锁有公平和非公平之分,所以此方法需待子类来重写 final boolean nonfairTryAcquire(int acquires) {//父类中只实现了非公平性的获取锁 final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } ………………………………略 }
***************************************************************************
static final class NonfairSync extends Sync {//静态内部类NonfairSync非公平锁 private static final long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);//直接调用父类的方法 } }
*********************************************************************
static final class FairSync extends Sync {//静态内部类FairSync公平锁 private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
****************************************************************************************** public ReentrantLock() {//调用默认的构造函数返回来的是非公平锁 sync = new NonfairSync(); } public ReentrantLock(boolean fair) {//掉用带参的构造函数时,参数为true返回公平锁,参数为false返回非公平锁 sync = fair ? new FairSync() : new NonfairSync(); }
//动态调用,由于在方法体内调用的是父类的抽象方法,但子类已重写了此方法,所以又动态的调用子类重写后的方法
public void lock() { sync.lock(); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } }
//补充方法
//从上面的代码中,我们可以观察到不论是公平还是非公平锁,在其方法体内最终都会调用acquire方法,该方法是继承的AQS机制的。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1 final boolean nonfairTryAcquire(int acquires) //非公平获取 2 { final Thread current = Thread.currentThread(); //获取当前线程 3 int c = getState(); //获取state值,state=0表示锁当前是空闲状态,state>0代表当前有state个线程在等待 4 if (c == 0) { //表示锁空闲 5 if (compareAndSetState(0, acquires)) {//如果当前线程获取锁成功 6 setExclusiveOwnerThread(current); //则将当前线程定义成锁的占用者 7 return true; 8 } 9 } else if (current == getExclusiveOwnerThread()) { //判断当前请求的线程是否就是持锁的线程(保证重入性) 10 int nextc = c + acquires; //累加 11 if (nextc < 0) throw new Error("Maximum lock count exceeded"); 12 setState(nextc);//更新state值 13 return true; 14 } 15 return false; 16 }
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。如果没有重新获取到锁或者锁的占用线程和当前线程不是一个线程,方法返回false。那么这时候通过看acquire方法可知,需把获取锁失败的线程添加到同步队列中,调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,那接下来我们先看看addWaiter()方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//将当前线程创建成一个新结点 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//true表示同步队列已经被初始化过了,此时只需将当前结点的线程添加到同步队列中 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//false表示同步队列未被初始化,此时只需先初始化队列
return node;
}
在enq方法中主要做两个动作,一个是创建一个虚拟结点,用来表示当前正占用锁的那个线程的结点(因为之前这个线程直接获取锁成功,所以未进过队列,因此也没有机会new结点),第二个是将(在addWaiter中创建,通过参数传过来)当前线程创建的结点在此方法中连接在队列的末尾,这也就是在Node方法中存在一个死循环的目的:执行第二个动作。如果想详细了解该方法的执行过程可以看下这篇博客:https://blog.csdn.net/java_lyvee/articlehttps://img.qb5200.com/download-x/details/98966684
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
接下来就是acquireQueued()方法,在该方法中先是判断当前结点的前一个结点是不是头结点,如果是头结点,在次尝试获取下锁(万一在你new结点的过程中,前一个节点刚好执行完释放锁了呢),如果成功,那皆大欢喜,获取锁并将当前线程的结点设置为头结点,因为头结点一般都存的都是当前正占用锁的线程。
此处是做Node节点线程的自旋过程,自旋过程主要检查当前节点是不是head节点的next节点,如果是,则尝试获取锁,如果获取成功,那么释放当前节点,同时返回。如果不是,则一直循环做自旋过程。
如果这里一直不断的循环检查,其实是很耗费性能的,JDK的实现肯定不会这么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,这两个方法就实现了线程的等待从而避免无限的轮询:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//前一个结点的 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
首先,检查一下当前Node的前一个节点pred是否是SIGNAL,如果是SIGNAL,那么证明前置Node的线程已经Park了。(参考上图)如果ws>0,那么表示当前节点已经等待超时或者中断,那么需要不断调整当前节点的前置节点,将已经Concel的和已经中断的线程移除队列。如果waitStatus<0,那么设置前一个结点的waitStatus为SIGNAL,因为调用shouldParkAfterFailedAcquire的方法为死循环调用,所以终将返回true。接下来看parkAndCheckInterrupt方法,当shouldParkAfterFailedAcquire返回True的时候执行parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
1 protected final boolean tryRelease(int releases) { 2 int c = getState() - releases; //计算出state的新值 3 if (Thread.currentThread() != getExclusiveOwnerThread())//判断当前线程是否为持锁的线程 4 throw new IllegalMonitorStateException(); //如果不是,就抛异常 5 boolean free = false; 6 if (c == 0) { //当state的值为0,说明当前在无线程占用锁,即锁可以被释放掉 7 free = true; //修改free值 8 setExclusiveOwnerThread(null); 9 } 10 setState(c); //更新state值 11 return free; //返回true表示锁被释放,返回false表示还有线程占用着锁 12 }
true
创建公平锁,如果传入的是false
或没传参数则创建的是非公平锁。ReentrantLock lock = new ReentrantLock(true);1 final boolean nonfairTryAcquire(int acquires) //非公平获取 2 { final Thread current = Thread.currentThread(); //获取当前线程 3 int c = getState(); //获取state值,state=0表示锁当前是空闲状态,state>0代表当前有state个线程在等待 4 if (c == 0) { //表示锁空闲 5 if (compareAndSetState(0, acquires)) {//如果当前线程获取锁成功 6 setExclusiveOwnerThread(current); //则将当前线程定义成锁的占用者 7 return true; 8 } 9 } else if (current == getExclusiveOwnerThread()) { //判断当前请求的线程是否就是持锁的线程(保证重入性) 10 int nextc = c + acquires; //累加 11 if (nextc < 0) throw new Error("Maximum lock count exceeded"); 12 setState(nextc);//更新state值 13 return true; 14 } 15 return false; 16 }
1 protected final boolean tryAcquire(int acquires) { //公平获取 2 final Thread current = Thread.currentThread(); //获取当前线程 3 int c = getState(); //计算新的state值 4 if (c == 0) { //当前锁是空闲状态 5 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 } else if (current == getExclusiveOwnerThread()) { //当前线程是持有锁的线程 10 int nextc = c + acquires; 11 if (nextc < 0) throw new Error("Maximum lock count exceeded"); 12 setState(nextc); 13 return true; 14 } 15 return false; 16 }
该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即在获取锁前,先判断加入了同步队列中当前节点是否有前驱节点,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。可参考:https://www.cnblogs.com/ljl150/p/12585076.html
【代码演示】:公平性获取
1 import java.util.concurrent.locks.Lock; 2 import java.util.concurrent.locks.ReentrantLock; 3 class ThreadDome implements Runnable{ 4 String name; 5 public ThreadDome(String name){ 6 this.name=name; 7 } 8 Lock lock=new ReentrantLock(true);//公平性 9 @Override 10 public void run() { 11 for (int i = 0; i <2; i++){ 12 lock.lock(); 13 try{ 14 Thread.sleep(1000); 15 System.out.println(this.name); 16 }catch (Exception e){ 17 e.getMessage(); 18 }finally { 19 lock.unlock(); 20 } 21 } 22 } 23 } 24 public class ReentrantLockTest { 25 public static void main(String[] args) { 26 for (int i = 0; i <5; i++) { 27 new Thread(new ThreadDome(""+i)).start(); 28 } 29 } 30 }
【代码演示】:非公平性获取
与公平性的代码仅在这句代码:Lock lock=new ReentrantLock(false);//非公平性
运行结果:
在代码运行结果图中可知,公平性锁每次都是从同步队列中的第一个节点获取到锁,所以每次运行的线程都不一样,而非公平性锁出现了一个线程连续获取锁的情况。
为什么会出现线程连续获取锁的情况呢?回顾nonfairTryAcquire(int acquires)方法,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。这也就是非公平性锁会可能造成使线程“饥饿”的原因,既然这样,它又为什么被设定成默认的实现呢?因为非公平性锁的开销更小。如果把每次不同线程获取到锁定义为1次切换,公平性锁在测试中进行了10次切换,而非公平性锁只有5次切换。
3.ReentrantLock可响应中断
当使用synchronized实现锁时,阻塞在锁上的线程除非获得锁否则将一直等待下去,也就是说这种无限等待获取锁的行为无法被中断。而ReentrantLock给我们提供了一个可以响应中断的获取锁的方法lockInterruptibly()
。该方法可以用来解决死锁问题。
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public final void acquireInterruptibly(int arg)throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
- getHoldCount() 查询当前线程保持此锁的次数,也就是执行此线程执行lock方法的次数
- getQueueLength()返回正等待获取此锁的线程估计数,比如启动10个线程,1个线程获得锁,此时返回的是9
- getWaitQueueLength(Condition condition)返回等待与此锁相关的给定条件的线程估计数。比如10个线程,用同一个condition对象,并且此时这10个线程都执行了condition对象的await方法,那么此时执行此方法返回10
- hasWaiters(Condition condition)查询是否有线程等待与此锁有关的给定条件(condition),对于指定contidion对象,有多少线程执行了condition.await方法
- hasQueuedThread(Thread thread)查询给定线程是否等待获取此锁
- hasQueuedThreads()是否有线程等待此锁
- isFair()该锁是否公平锁
- isHeldByCurrentThread() 当前线程是否保持锁锁定,线程的执行lock方法的前后分别是false和true
- isLock()此锁是否有任意线程占用
- lockInterruptibly()如果当前线程未被中断,获取锁
- tryLock()尝试获得锁,仅在调用时锁未被线程占用,获得锁
- tryLock(long timeout TimeUnit unit)如果锁在给定等待时间内没有被另一个线程保持,则获取该锁
加载全部内容