Java多线程之锁的强化学习
刘架构 人气:0首先强调一点:Java多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁。同时,类锁也是对象锁,类是Class对象
Java8锁
核心思想
关键字在实例方法上,锁为当前实例
关键字在静态方法上,锁为当前Class对象
关键字在代码块上,锁为括号里面的对象
在进行线程执行顺序的时候,如果添加了线程睡眠,那么就要看锁的对象是谁,同一把锁 / 非同一把锁是不一样的
Synchronized
synchronized 是Java提供的关键字,用来保证原子性的
synchronized的作用域如下
- 作用在普通方法上,此方法为原子方法:也就是说同一个时刻只有一个线程可以进入,其他线程必须在方法外等待,此时锁是对象
- 作用在静态方法上,此方法为原子方法:也就是说同一个时刻只有一个线程可以进入,其他线程必须在方法外等待,此时锁是当前的Class对象
- 作用在代码块上,此代码块是原子操作:也就是说同一个时刻只有线程可以进入,其他线程必须在方法外等待,锁是 synchronized(XXX) 里面的 XXX
先看一段简单的代码
public class SynchronizedTest { public static void main(String[] args) { test1(); test2(); } // 使用synchronized修饰的方法 public synchronized static void test1() { System.out.println("SynchronizedTest.test1"); } // 使用synchronized修饰的代码块 public static void test2() { synchronized (SynchronizedTest.class) { System.out.println("SynchronizedTest.test2"); } } }
执行之后,对其进行执行javap -v命令反编译
// 省略啰嗦的代码 public class cn.zq.sync.SynchronizedTest minor version: 0 major version: 52 flags: ACC_PUBLIC, ACC_SUPER { // 源码 public cn.zq.sync.SynchronizedTest(); descriptor: ()V flags: ACC_PUBLIC // main 方法 public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC // synchronized 修饰的静态方法 test1() public static synchronized void test1(); descriptor: ()V // 在这里我们可以看到 flags 中有一个 ACC_SYNCHRONIZED // 这个就是一个标记符这是 保证原子性的关键 // 当方法调用的时候,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标记符是否被设置 // 如果设置了,线程将先获取 monitor,获取成功之后才会执行方法体,方法执行之后,释放monitor // 在方法执行期间,其他任何线程都无法在获得一个 monitor 对象,本质上没区别。 flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #5 // String SynchronizedTest.test1 5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return LineNumberTable: line 17: 0 line 18: 8 // 代码块使用的 synchronized public static void test2(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=2, args_size=0 0: ldc #7 // class cn/zq/sync/SynchronizedTest 2: dup 3: astore_0 // 这个 monitorenter 是一个指令 // 每个对象都有一个监视器锁(monitor),当monitor被占用的时候就会处于锁定状态 // 线程执行monitorenter的时候,尝试获取monitor的锁。过程如下 // 1.任何monitor进入数为0,则线程进入并设置为1,此线程就是monitor的拥有者 // 2.如果线程已经占用,当前线程再次进入的时候,会将monitor的次数+1 // 3.如何其他的线程已经占用了monitor,则线程进阻塞状态,直到monitor的进入数为0 // 4.此时其他线程才能获取当前代码块的执行权 4: monitorenter 5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #8 // String SynchronizedTest.test2 10: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_0 // 执行monitorexit这条指令的线程必须是拥有monitor的 // 执行的之后,monitor的进入数-1.如果为0,那么线程就退出 monitor,不再是此代码块的执行者 // 此时再由其他的线程获得所有权 // 其实 wait/notify 等方法也依赖于monitor对象, // 所以只有在同步方法或者同步代码块中才可以使用,否则会报错 java.lang.IllegalMonitorstateException 异常 14: monitorexit 15: goto 23 18: astore_1 19: aload_0 20: monitorexit 21: aload_1 22: athrow 23: return Exception table: from to target type 5 15 18 any 18 21 18 any LineNumberTable: line 21: 0 line 22: 5 line 23: 13 line 24: 23 StackMapTable: number_of_entries = 2 frame_type = 255 /* full_frame */ offset_delta = 18 locals = [ class java/lang/Object ] stack = [ class java/lang/Throwable ] frame_type = 250 /* chop */ offset_delta = 4 } SourceFile: "SynchronizedTest.java"
总结:
使用synchronized修饰的同步方法
- 通过反编译我们可以看到,被synchronized修饰的方法,其中的 flags中有一个标记:ACC_SYNCHRONIZED
- 当线程执行方法的时候,会先去检查是否有这样的一个标记,如果有的话,说明就是一个同步方法,此时会为当前线程设置 monitor ,获取成功之后才会去执行方法体,执行完毕之后释放monitor
使用synchronized修饰的代码块
- 通过反编译我们看到,在代码块的两侧有JVM指令,在进入代码块之前指令是 monitorenter
- 当线程执行到代码块的时候,会先拿到monitor(初始值为0),然后线程将其设置为1,此时当前线程独占monitor
- 如果当前持有monitor的线程再次进入monitor,则monitor的值+1,当其退出的时候,monitor的次数-1
- 当线程线程退出一次monitor的时候,会执行monitorexit指令,但是只有持有monitor的线程才能获取并执行monitorexit指令,当当前线程monitor为0的时候,当前线程退出持有锁
- 此时其他线程再来争抢
- 但是为什么要有两个 monitorexit呢?
这个时候我们会发现synchronized是可重入锁,其实现原理就是monitor的个数增加和减少
同时wait / notify方法的执行也会依赖 monitor,所以wait和notify方法必须放在同步代码块中,否则会报错 java.lang.IllegalMonitorstateException
因为方法区域很大,所以设置一个标记,现在执行完判断之后,就全部锁起来,而代码块不确定大小,就需要细化monitor的范围
ReentrantLock
ReentrantLock是Lock接口的一个实现类
在ReentrantLock内部有一个抽象静态内部类Sync
其中一个是 NonfairSync(非公平锁),另外一个是 FairSync (公平锁),二者都实现了此抽象内部类Sync,ReentrantLock默认使用的是 非公平锁 ,我们看一下源码:
public class ReentrantLock implements Lock, java.io.Serializable { // 锁的类型 private final Sync sync; // 抽象静态类Sync继承了AbstractQueueSynchroniser [这个在下面进行解释] abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; // 抽象加锁方法 abstract void lock(); // 不公平的 tryLock 也就是不公平的尝试获取 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程对象 final Thread current = Thread.currentThread(); // 获取线程的状态 int c = getState(); // 根据线程的不同状态执行不同的逻辑 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 获取独占模式的线程的当前锁的状态 else if (current == getExclusiveOwnerThread()) { // 获取新的层级大小 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置锁的状态 setState(nextc); return true; } return false; } // 尝试释放方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 返回当前线程是不是独占的 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } // 返回 ConditionObject 对象 final ConditionObject newCondition() { return new ConditionObject(); } // 获得独占的线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 获得独占线程的状态 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // 判断是否是加锁的 final boolean isLocked() { return getState() != 0; } // 序列化 private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); } } // 非公平锁继承了Sync static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 加锁操作 final void lock() { // 判断是不是第一次加锁 底层调用 Unsafe的compareAndSwapInt()方法 if (compareAndSetState(0, 1)) // 设置为独占锁 setExclusiveOwnerThread(Thread.currentThread()); // 如果不是第一次加锁,则调用 acquire 方法在加一层锁 else acquire(1); } // 返回尝试加锁是否成功 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } // 公平锁 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 加锁操作,直接设置为1 final void lock() { acquire(1); } // 尝试加锁 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } }
Lock接口
public interface Lock { // 加锁 void lock(); // 不断加锁 void lockInterruptibly() throws InterruptedException; // 尝试加锁 boolean tryLock(); // 尝试加锁,具有超时时间 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 释放锁 void unlock(); // Condition 对象 Condition newCondition(); }
Condition接口
public interface Condition { // 等待 void await() throws InterruptedException; // 超时等待 boolean await(long time, TimeUnit unit) throws InterruptedException; // 超时纳秒等待 long awaitNanos(long nanosTimeout) throws InterruptedException; // 可中断等待 void awaitUninterruptibly(); // 等待死亡 boolean awaitUntil(Date deadline) throws InterruptedException; // 指定唤醒 void signal(); // 唤醒所有 void signalAll(); }
为什么官方提供的是非公平锁,因为如果是公平锁,假如一个线程需要执行很久,那执行效率会大大降低
ReentrantLock的其他方法
public class ReentrantLock implements Lock, java.io.Serializable { // 锁的类型 private final Sync sync; // 默认是非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // 有参构造,可以设置锁的类型 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } // 加锁 public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } // 解锁 调用release() 因为是重入锁,所以需要减少重入的层数 public void unlock() { sync.release(1); } // 返回Condition对象 ,用来执行线程的唤醒等待等操作 public Condition newCondition() { return sync.newCondition(); } // 获取锁的层数 public int getHoldCount() { return sync.getHoldCount(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } // 是否加锁 public boolean isLocked() { return sync.isLocked(); } // 是否是公平锁 public final boolean isFair() { return sync instanceof FairSync; } // 获取独占锁 protected Thread getOwner() { return sync.getOwner(); } // 查询是否有任何线程正在等待获取此锁 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // 查询给定线程是否正在等待获取此锁 public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } // 获取队列的长度 public final int getQueueLength() { return sync.getQueueLength(); } // 返回一个包含可能正在等待获取该锁的线程的集合 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } // 判断是否等待 public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } // 获得等待队列的长度 public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } // 获取正在等待的线程集合 protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } // toString() public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } }
总结:
1.ReentrantLock是独占锁
2.ReentrantLock是可重入锁
3.底层使用AbstractQueuedSynchronizer实现
4.synchronized 和 ReentrantLock的区别
- synchronized是是关键字,可以作用在静态方法、普通方法、静态代码块,底层使用monitor实现,synchronized是内置锁,是悲观锁,其发生异常会中断锁,所以不会发生死锁。是非中断锁
- ReentrantLock是类,作用在方法中,其比synchronized更加灵活,但是必须手动加锁释放锁,是乐观锁,发生异常不会中断锁,必须在finally中释放锁,是可中断的,使用Lock的读锁可以提供效率
AQS
AQS:AbstractQueueSynchronizer => 抽象队列同步器
AQS定义了一套多线程访问共享资源的同步器框架,很多同步器的实现都依赖AQS。如ReentrantLock、Semaphore、CountDownLatch …
首先看一下AQS队列的框架
它维护了一个volatile int state (代表共享资源)和一个FIFO线程等待队列(多线程争抢资源被阻塞的时候会先进进入此队列),这里的volatile是核心。在下个部分进行讲解~
state的访问方式有三种
- getState()
- setState()
- compareAndSetState()
AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程可以执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore、CountdownLatch)
不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只需要实现共享资源的获取和释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队)AQS在顶层已经实现好了。
自定义同步器时需要实现以下方法即可
- isHeldExclusively():该线程是否正在独占资源。只有用的Condition才需要去实现它
- tryAcquire(int):独占方式。尝试获取资源,成功返回true,否则返回false
- tryRelease(int):独占方式。尝试释放资源,成功返回true,否则返回false
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败,0表示成功但没有剩余可用资源,正数表示成功,且还有剩余资源
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回fasle
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁,然后将state+1,此后其他线程在调用tryAcquire()就会失败,直到A线程unlock()到state为0为止,其他线程才有机会获取该锁。当前在A释放锁之前,A线程是可以重复获取此锁的(state)会累加。这就是可重入,但是获取多少次,就要释放多少次。
再和CountdownLock为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程的个数一致)。这N个子线程是并行执行的,每个子线程执行完之后countDown一次。state会CAS-1。等到所有的子线程都执行完后(即state=0),会upark()主调用线程,然后主调用线程就会从await()函数返回,继续剩余动作
一般来说,自定义同步器要么是独占方法,要么是共享方式,也只需要实现tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一组即可,但是AQS也支持自定义同步器同时实现独占锁和共享锁两种方式,如:ReentrantReadWriteLock
AQS的源码
AbstractQueueSynchronizer 继承了 AbstractOwnableSynchronizer
AbstractOwnableSynchronizer类
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } // 独占模式当前的拥有者 private transient Thread exclusiveOwnerThread; // 设置独占模式当前的拥有者 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } // 得到独占模式当前的拥有者 protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractQueueSynchronizer类
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } // AbstractQueueSynchronizer 中的静态内部类 Node 节点 static final class Node { // 指示节点正在以共享模式等待的标记 static final Node SHARED = new Node(); // 指示节点正在以独占模式等待的标记 static final Node EXCLUSIVE = null; // 表示线程已经取消 static final int CANCELLED = 1; // 表示线程之后需要释放 static final int SIGNAL = -1; // 表示线程正在等待条件 static final int CONDITION = -2; // 指示下一个 acquireShared 应该无条件传播 static final int PROPAGATE = -3; // 状态标记 volatile int waitStatus; // 队列的前一个节点 volatile Node prev; // 队列的后一个节点 volatile Node next; // 线程 volatile Thread thread; // 下一个正在等待的节点 Node nextWaiter; // 判断是否时共享的 final boolean isShared() { return nextWaiter == SHARED; } // 返回上一个节点,不能为null,为null抛出空指针异常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 构造 Node() { // Used to establish initial head or SHARED marker } // 有参构造,用来添加线程的队列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 有参构造,根据等待条件使用 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } // 头节点 private transient volatile Node head; // 尾节点 private transient volatile Node tail; // 状态 private volatile int state; // 获取当前的状态 protected final int getState() { return state; } //设置当前的状态 protected final void setState(int newState) { state = newState; } // 比较设置当前的状态 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // 纳秒数,使之更快的旋转 static final long spinForTimeoutThreshold = 1000L; // 将节点插入队列 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } // 加一个等待节点 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } // 设置头节点 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // 如果存在后继节点,就唤醒 private void unparkSuccessor(Node node) { // 获得节点的状态 int ws = node.waitStatus; // 如果为负数,就执行比较并设置方法设置状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 唤醒后面的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } // 共享模式的释放动作,并且向后继节点发出信号 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // 设置队列的头,并检查后继者能否在共享模式下等待,如果可以,就是否传播设置为>0或者propagate状态 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } // 取消正在进行的尝试 private void cancelAcquire(Node node) { // 节点为null,直接返回 if (node == null) return; node.thread = null; // 跳过已经取消的前一个节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } // 还有好多方法... 其实本质就是基于 队列的判断和操作,AQS提供了独占锁和共享锁的设计 // 在AQS中,使用到了Unsafe类,所以AQS其实就是基于CAS算法的, // AQS的一些方法就是直接调用 Unsafe 的方法 如下所示 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } // 比较并设置头 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } // 比较并设置尾 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // 比较并设置状态 private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } // 比较并设置下一个节点 private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } // 除此之外 AQS 还有一个实现了Condition的类 如下 public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 条件队列的第一个节点 private transient Node firstWaiter; // 条件队列的最后一个节点 private transient Node lastWaiter; public ConditionObject() { } // 在等待队列中添加一个新的节点 private Node addConditionWaiter() { // 获取最后一个节点 Node t = lastWaiter; // 如果最后一个节点被取消了,就清除它 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } // 删除并转移节点直到它没有取消或者不为null private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 删除所有的节点 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } // 取消节点的连接 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // 将等待最长的线程,唤醒 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 唤醒所有的等待线程 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } // 实现不间断的条件等待 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } // 模式意味着在退出等待时重新中断 private static final int REINTERRUPT = 1; // 模式的含义是在退出等待时抛出InterruptedException异常 private static final int THROW_IE = -1; // 检查中断,如果在信号通知之前被中断,则返回THROW_IE; // 如果在信号通知之后,则返回REINTERRUPT;如果未被中断,则返回 0 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } // 抛出InterruptedException,重新中断当前线程, // 或不执行任何操作,具体取决于模式。 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } // 实现不可中断的条件等待 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 纳秒级别的等待 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } // 绝对定时等待 public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // 超时等待 public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // 判断是不是独占的 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } // 返回是否有正在等待的 protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } // 获得等待队列的长度 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } // 获取所有正在等待的线程集合 protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } } }
总结:
1.AQS为我们提供了很多实现。AQS内部有两个内部类,ConditionObject和Node节点
2.和开头说的一样,其维护了一个state和一个队列,也提供了独占和共享的实现
3.总结一下流程
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功就直接返回
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式
- acquireQueued()使得线程在队列中休息,有机会(轮到自己,会被unpark())会去尝试获取资源。获取到资源之后才会返回。如果在整个等待过程中被中断过,就返回true,否则返回false
- 如果线程在等待过程中被中断过,它不是响应的。只是获取资源之后才再进行自我中断selfInterrupt(),将中断补上
4.release() 是独占模式下线程共享资源的底层入口,它会释放指定量的资源,如果彻底释放了(state = 0)
5.如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?
这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!
6.获取锁的线程在什么情形下会release抛出异常呢 ?
- 线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;
- 线程被interupt了?线程在运行态是不响应中断的,所以也不会抛出异常;
7.acquireShared()的流程
- tryAcquireShared()尝试获取资源,成功则直接返回;
- 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
8.releaseShared()
释放掉资源之后,唤醒和后继
7.不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
volatile
volatile是Java提供的关键字,是轻量级的同步机制 JSR133提出,Java5增强了语义
volatile关键字有三个重要的特点
- 保证内存可见性
- 不保证原子性
- 禁止指令重排序
提到volatile,就要提到JMM - 什么是JMM
JMM:Java Memory Model
本身就是一种抽象的概念,并不真实存在,它描述的是一组规范和规则,通过这种规则定义了程序的各个变量(包括实例字段、静态字段、和构造数组对象的元素)的访问方式
JMM关于同步的规定
- 线程解锁前,必须把共享变量的值刷新到主内存
- 线程加锁前,必须读取主内存的最新的值到自己的工作内存
- 加锁和解锁必须是同一把锁
happens-before 规则
前一个操作对下一个操作是完全可见的,如果下一个操作对下下一个操作完全可见,那么前一个操作也对下下个操作可见
重排序
JVM对指令的执行,会进行优化重新排序,可以发生在编译重排序、CPU重排序
什么是内存屏障?
内存屏障分为2种
- 读屏障(LoadBarrier)
- 写屏障(Store Barrier)
内存屏障的作用
- 阻止屏障两侧的指令重排序
- 强制把缓冲区 / 高速缓存中的脏数据写回主内存,或者让缓存中相应的的数据失效
编译器生成字节码的时候,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。编译器选择了一个比较保守的JMM内存屏障插入策略,这样就可以保证在任何处理器平台,任何程序中都有正确的volatile语义
- 在每个volatile写操作之前插入一个StoreStore屏障
- 在每个volatile写操作之后入一个StoreLoad屏障
- 在每个volatile读操作之前插入一个LoadLoad屏障
- 在每个volatile读操作之前插入一个LoadStore屏障
原子性
- 问:i++为什么不是线程安全的?
- 因为 i++ 不是原子操作,i++有三个操作
如何解决?
- 使用 synchronized
- 使用AtomicInteger [JUC下的原子类]
有序性
1.计算机在执行程序的时候,为了提高性能,编译器和处理器通常会对指令重排序,一般分为3种-
- 源代码 -> 编译器优化的重排 -> 指令并行的重排 -> 内存系统的重排 -> 最终执行的指令
- 单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致
- 处理器在执行重排序之前必须考虑指令之间的数据依赖性
- 多线程环境种线程交替执行,由于编译器优化重排序的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测
2.指令重排序
多线程环境种线程交替执行,由于编译器优化重排序的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测此时使用volatile禁用指令重排序,就可以解决这个问题
volatile的使用
单例设计模式中的 安全的双重检查锁
volatile的底层实现
根据JMM,所有线程拿到的都是主内存的副本,然后存储到各自线程的空间,当某一线程修改之后,立即修改主内存,然后主内存通知其他线程修改
Java代码 instance = new Singleton();//instance 是 volatile 变量 汇编代码:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 变量修饰的共享变量进行写操作的时候会多第二行汇编代码,通过查 IA-32 架构软件开发者手册可知,lock 前缀的指令在多核处理器下会引发了两件事情。将当前处理器缓存行的数据会写回到系统内存。这个写回内存的操作会引起在其他 CPU 里缓存了该内存地址的数据无效。
如果对声明了volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。
自旋锁 ,自旋锁的其他种类
CAS 自旋锁
- CAS(Compare And Swap)比较并替换,是线程并发运行时用到的一种技术
- CAS是原子操作,保证并发安全,而不能保证并发同步
- CAS是CPU的一个指令(需要JNI调用Native方法,才能调用CPU的指令)
- CAS是非阻塞的、轻量级的乐观锁
我们可以实现通过手写代码完成CAS自旋锁
CAS包括三个操作数
- 内存位置 - V
- 期望值- A
- 新值 - B
如果内存位置的值与期望值匹配,那么处理器会自动将该位置的值设置为新值,否则不做改变。无论是哪种情况,都会在CAS指令之前返回该位置的值。
public class Demo { volatile static int count = 0; public static void request() throws Exception { TimeUnit.MILLISECONDS.sleep(5); // 表示期望值 int expectedCount; while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) { } } public static synchronized boolean compareAndSwap(int expectedCount, int newValue) { if (expectedCount == getCount()) { count = newValue; return true; } return false; } public static int getCount() { return count; } public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); } countDownLatch.await(); long end = System.currentTimeMillis(); System.out.println("count :" + count + " 耗时:" + (end - start)); } }
上述是我们自己书写的CAS自旋锁,但是JDK已经提供了响应的方法
Java提供了 CAS 的支持,在 sun.misc.Unsafe 类中,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
参数说明
- var1:表示要操作的对象
- var2:表示要操作对象中属性地址的偏移量
- var4:表示需要修改数据的期望的值
- var5:表示需要修改为的新值
CAS的实现原理
CAS通过调用JNI的代码实现,JNI:Java Native Interface ,允许Java调用其他语言
而CompareAndSwapXxx系列的方法就是借助“C语言”CPU底层指令实现的
以常用的 Inter x86来说,最后映射到CPU的指令为“cmpxchg”,这个是一个原子指令,CPU执行此命令的时候,实现比较并替换的操作
cmpxchg 如何保证多核心下的线程安全
系统底层进行CAS操作的时候,会判断当前操作系统是否为多核心,如果是,就给“总线”加锁,只有一个线程对总线加锁,保证只有一个线程进行操作,加锁之后会执行CAS操作,也就是说CAS的原子性是平台级别的
CAS这么强,有没有什么问题?
高并发情况下,CAS会一直重试,会损耗性能
CAS的ABA问题
CAS需要在操作值得时候检查下值有没有变化,如果没有发生变化就更新,但是如果原来一个值为A,经过一轮的操作之后,变成了B,然后又是一轮的操作,又变成了A,此时这个位置有没有发生改变?改变了的,因为不是一直是A,这就是ABA问题
如何解决ABA问题?
解决ABA问题就是给值增加一个修改版本号,每次值的变化,都会修改它的版本号,CAS在操作的时候都会去对比此版本号。
下面给出一个ABA的案例
public class CasAbaDemo { public static AtomicInteger a = new AtomicInteger(1); public static void main(String[] args) { Thread main = new Thread(() -> { System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get()); try { int executedNum = a.get(); int newNum = executedNum + 1; TimeUnit.SECONDS.sleep(3); boolean isCasSuccess = a.compareAndSet(executedNum, newNum); System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess); } catch (InterruptedException e) { e.printStackTrace(); } }, "主线程"); Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); a.incrementAndGet(); System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get()); a.decrementAndGet(); System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get()); } catch (Exception e) { e.printStackTrace(); } }, "干扰线程"); main.start(); thread.start(); } }
Java中ABA解决办法(AtomicStampedReference)
AtomicStampedReference 主要包含一个引用对象以及一个自动更新的整数 “stamp”的pair对象来解决ABA问题
public class AtomicStampedReference<V> { private static class Pair<T> { // 数据引用 final T reference; // 版本号 final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair; /** * 期望引用 * @param expectedReference the expected value of the reference * 新值引用 * @param newReference the new value for the reference * 期望引用的版本号 * @param expectedStamp the expected value of the stamp * 新值的版本号 * @param newStamp the new value for the stamp * @return {@code true} if successful */ public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return // 期望引用与当前引用一致 expectedReference == current.reference && // 期望版本与当前版本一致 expectedStamp == current.stamp && // 数据一致 ((newReference == current.reference && newStamp == current.stamp) || // 数据不一致 casPair(current, Pair.of(newReference, newStamp))); } }
修改之后完成ABA问题
public class CasAbaDemo02 { public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1); public static void main(String[] args) { Thread main = new Thread(() -> { System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference()); try { Integer executedReference = a.getReference(); Integer newReference = executedReference + 1; Integer expectStamp = a.getStamp(); Integer newStamp = expectStamp + 1; TimeUnit.SECONDS.sleep(3); boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp); System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess); } catch (InterruptedException e) { e.printStackTrace(); } }, "主线程"); Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1); System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference()); a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1); System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference()); } catch (Exception e) { e.printStackTrace(); } }, "干扰线程"); main.start(); thread.start(); } }
加载全部内容