亲宝软件园·资讯

展开

ConcurrentHashMap如何保证线程安全

AntzUhl 人气:3

HashMap的put,get,size等方法都不是线程安全的,而HashTable虽然保证了线程安全,但却是用了效率极低的方法,在put,get,size等方法上加上了synchronized,这就导致所有的并发进程都要竞争同一把锁,一个线程在进行同步操作时,其他线程都需要等待。

为了保证集合的线程安全性,Jdk提供了同步包装器,比如说Collections.synchronizedMap,不过它们都是利用粗粒度的同步方式,高并发情况下性能较差。

可以看看Collections.synchronizedMap的实现。

与并发安全有关的代码都使用了synchronized关键字进行了修饰,使用的锁mutex可以在构造方法中看到就是this。

虽然说不在是用synchronized来修饰方法,但是还是使用了this做mutex锁,治标不治本。

更加普遍的选择是利用并发包提供的线程安全容器类,它提供了各种并发容器,比如ConcurrentHashMap、CopyOnWriteArrayList。

各种线程安全队列,如ArrayBlockingQueue、SynchronousQueue。

ConcurrentHashMap分析

ConcurrentHashMap和HashMap一样,在Java8时结构上发生了很大变化。

之前的ConcurrentHashMap的实现是基于分离锁,也就是内部进行分段,每一个段中都拥有HashEntry数组,hashcode相同的key也是按照链表的方式存储的。

HashEntry内部使用volatile修饰的value字段来保证可见性,也利用了不可变对象的机制以改进利用Unsafe提供的底层能力,比如volatile access,去直接完成部分操作,以最优化性能,毕竟Unsafe中的很多操作都是JVM intrinsic优化过的。

Segment的初始大小由DEFAULT_CONCURRENCY_LEVEL来确定,默认是16,在构造方法中可以自定义大小,不过必须是2的幂,如果输入7,会自动创建8。

对于get方法来说,只需要保证可见性,无需其他同步操作。

关键是put方法,首先是通过二次哈希避免哈希冲突,然后以Unsafe调用方式,直接获取相应的Segment,然后进行线程安全的put操作。

 public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 二次哈希,以保证数据的分散性,避免哈希冲突
    int hash = hash(key.hashCode());
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
    (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
 }

put的核心实现如下。

fnal V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // scanAndLockForPut会去查找是否有key相同Node
    // 无论如何,确保获取锁
    HashEntry<K,V> node = tryLock() ? null :
    scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> frs = entryAt(tab, index);
        for (HashEntry<K,V> e = frs;;) {
            if (e != null) {
                K k;
                // 更新已有value...
            } else {
            // 放置HashEntry到特定位置,如果超过阈值,进行rehash
            // ...
            }
        }
    } fnally {
        unlock();
    }
    return oldValue;
}

首先也是通过 Key 的 Hash 定位到具体的 Segment,在 put 之前会进行一次扩容校验。这里比 HashMap 要好的一点是:HashMap 是插入元素之后再看是否需要扩容,有可能扩容之后后续就没有插入就浪费了本次扩容(扩容非常消耗性能)。而 ConcurrentHashMap 不一样,它是在将数据插入之前检查是否需要扩容,之后再做插入操作。

ConcurrentHashMap会获取再入锁,以保证数据一致性,Segment本身就是基于ReentrantLock的扩展实现,所以,在并发修改期间,相应Segment是被锁定的。

在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突
是ConcurrentHashMap的常见技巧。

另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。
试想,如果不进行同步,简单的计算所有Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有Segment进行计算,就会变得非常昂贵。

所以,ConcurrentHashMap的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对
比Segment.modCount),就直接返回,否则获取锁进行操作。

每个Segment都有一个volatile修饰的全局变量count,求整个 ConcurrentHashMap的size时很明显就是将所有的count累加即可。但是volatile修饰的变量却不能保证多线程的原子性,所有直接累加很容易出现并发问题。

通过尝试两次将count累加,如果容器的count发生了变化再加锁来统计size,可以有效的避免加锁的问题。

以上是JDK1.7及之前的ConcurrentHashMap的结构,在1.8之后,ConcurrentHashMap结构发生了很大变化,由之前的分段锁变为了CAS+synchronized。

我在网上找到了一幅结构图。

总体结构上,它的内部存储和HashMap结构非常相似,同样是大的桶(bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要更细致一些。

其内部仍然有Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。

因为不再使用Segment,初始化操作大大简化,修改为lazy-load形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。

将1.7中存放数据的HashEntry改为Node,但作用都是相同的。其中的val next 都用了volatile修饰,保证了可见性。

 satic class Node<K,V> implements Map.Entry<K,V> {
    fnal int hash;
    fnal K key;
    volatile V val;
    volatile Node<K,V> next;
    // …
 }

在Node中,key被定义为了final,因为照常理来说,key是不会改变了,而是value会发生改变,因此在val中添加了volatile来修饰。

来看看它的put操作。

fnal V putVal(K key, V value, boolean onlyIfAbsent) { 
    if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 利用CAS去进行无锁线程安全操作,如果bin是空的
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;
            }
            else if ((fh = f.hash) == MOVED)
 tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // 不加锁,进行检查
 && fh == hash
 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
 && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                synchronized (f) {
                // 细粒度的同步修改操作...
                }
            }
            // Bin超过阈值,进行树化
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

具体流程如下:

  1. 根据key计算出hashcode 。
  2. 判断是否需要进行初始化initTable。
  3. f即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
  4. 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
  5. 如果都不满足,则利用 synchronized 锁写入数据。
  6. 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

可以发现1.8以后的锁的颗粒度,是加在链表头上的,这是一个很重要的突破。

1.8中不依赖与segment加锁,segment数量与桶数量一致。

首先判断容器是否为空,为空则进行初始化利用volatile的sizeCtl作为互斥手段,如果发现竞争性的初始化,就暂停在那里,等待条件恢复,否则利用CAS设置排他标志(U.compareAndSwapInt(this, SIZECTL, sc, -1)),否则重试
对key hash计算得到该key存放的桶位置,判断该桶是否为空,为空则利用CAS设置新节点,否则使用synchronize加锁,遍历桶中数据,替换或新增加点到桶中。最后判断是否需要转为红黑树,转换之前判断是否需要扩容。

最后来看看1.8 ConcurrentHashMap的size方法实现。


在sumCount方法中使用到了CounterCell,对于CounterCell的操作,是基于java.util.concurrent.atomic.LongAdder进行的,是一种JVM利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。

// 一种用于分配计数的填充单元。改编自LongAdder和Striped64。请查看他们的内部文档进行解释。
@sun.misc.Contended 
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

加载全部内容

相关教程
猜你喜欢
用户评论