hashmap是否线程安全?

一、糟糕的面试

面试官:小王,你说说HashMap的是线程安全的吗?

小王:HashMap不安全,在多线程下,会出现线程安全问题。他兄弟HashTable

线程是安全的,但是出于性能考虑,我们往往会选择ConcurrentHashMap。

面试官:HashMap线程不安全的原因是什么?

小王:这个...暂时忘记了

面试官:为什么HashTable线程安全,为什么性能低?

小王:这个...

面试官:ConcurrentHashMap是怎么实现线程安全的?性能为什么较高?

小王:...

面试官:回答的很不错,回去等通知吧。

hashmap是否线程安全?

二、hashMap

2.1 暴露问题

大家都知道,HashMap在多线程下会存在线程安全问题,如下:

public class Demo2 {    public static void main(String[] args) {        //shift+ctrl+alt+u        HashMap<String, String> map = new HashMap<>();        Thread t1 =   new Thread(new Runnable() {            @Override            public void run() {                for (int i = 0; i <= 10; i++) {                    map.put(i+"",i+"");                }            }        });        Thread t2 =   new Thread(new Runnable() {            @Override            public void run() {                for (int i = 11; i <= 20; i++) {                    map.put(i+"",i+"");                }            }        });        t1.start();        t2.start();        //确保两个子线程执行完毕之后,主线程再来打印hashmap        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        //遍历hashMap        for (int i = 1; i <= 20; i++) {            System.out.println(map.get(i + ""));        }    }}

控制台:

null2nullnullnull678910nullnull13nullnullnull17181920

以上例子证明了,HashMap确实存在线程安全问题。

2.2 源码追踪

翻阅源码(1.8)如下:

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,               boolean evict) {    Node<K,V>[] tab; Node<K,V> p; int n, i;    if ((tab = table) == null || (n = tab.length) == 0)        n = (tab = resize()).length;    //此处线程不安全    if ((p = tab[i = (n - 1) & hash]) == null)        tab[i] = newNode(hash, key, value, null);    else {        Node<K,V> e; K k;        if (p.hash == hash &&            ((k = p.key) == key || (key != null && key.equals(k))))            e = p;        else if (p instanceof TreeNode)            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);        else {            for (int binCount = 0; ; ++binCount) {                if ((e = p.next) == null) {                    p.next = newNode(hash, key, value, null);                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st                        treeifyBin(tab, hash);                    break;                }                if (e.hash == hash &&                    ((k = e.key) == key || (key != null && key.equals(k))))                    break;                p = e;            }        }        if (e != null) { // existing mapping for key            V oldValue = e.value;            if (!onlyIfAbsent || oldValue == null)                e.value = value;            afterNodeAccess(e);            return oldValue;        }    }    ++modCount;    //此处线程不安全。    if (++size > threshold)        resize();    afterNodeInsertion(evict);    return null;}

(1)代码一

if ((p = tab[i = (n - 1) & hash]) == null)        tab[i] = newNode(hash, key, value, null);

是否Hash冲突,没冲突就直接赋值给数组当前索引。

线程A判断通过,进入方法,切换B线程,判断通过,进入方法,切换A线程,赋值成功,切换B线程赋值成功,B线程的值覆盖了A线程的值,发生了数据覆盖,用户感受到是数据丢失。

(2) 代码二

if (++size > threshold)        resize();

当元素个数size大于扩容阈值,则扩容,这里会有两个问题。

成员的size变量没有保证原子性,因此多线程下size自增是存在原子性问题。即添加了两个元素,但是size只增加了1。两个线程如果都通过上面阈值的判断,就会发生扩容两次的情况,这也是一种安全问题。

三、HashTable

3.1 线程安全演示

我们可以使用HashTable来解决上面的安全问题。

看下面的代码:

public class Demo2 {    public static void main(String[] args) {        Hashtable<String, String> map = new Hashtable<>();        Thread t1 =   new Thread(new Runnable() {            @Override            public void run() {                for (int i = 0; i <= 10; i++) {                    map.put(i+"",i+"");//{i,i}                }            }        });        //20,20  21,21 ... 39,39        Thread t2 =   new Thread(new Runnable() {            @Override            public void run() {                for (int i = 11; i <= 20; i++) {                    map.put(i+"",i+"");//{i,i}                }            }        });        t1.start();        t2.start();        //确保两个子线程执行完毕之后,主线程再来打印hashmap        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        for (int i = 0; i <= 20; i++) {            System.out.println(map.get(i + ""));        }    }}

控制台:

01234567891011121314151617181920

以上代码,说明了Hashtable的确是线程安全的。

3.2 翻看源码

Hashtable源码:

public synchronized V put(K key, V value) {    // Make sure the value is not null    if (value == null) {        throw new NullPointerException();    }    // Makes sure the key is not already in the hashtable.    Entry<?,?> tab[] = table;    int hash = key.hashCode();    int index = (hash & 0x7FFFFFFF) % tab.length;    @SuppressWarnings("unchecked")    Entry<K,V> entry = (Entry<K,V>)tab[index];    for(; entry != null ; entry = entry.next) {        if ((entry.hash == hash) && entry.key.equals(key)) {            V old = entry.value;            entry.value = value;            return old;        }    }    addEntry(hash, key, value, index);    return null;}    public synchronized V get(Object key) {        Entry<?,?> tab[] = table;        int hash = key.hashCode();        int index = (hash & 0x7FFFFFFF) % tab.length;        for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {            if ((e.hash == hash) && e.key.equals(key)) {                return (V)e.value;            }        }        return null;    }    public synchronized int size() {        return count;    }

通过阅读源码可以发现,Hashtable每个操作数据的方法,都是使用了重量级锁synchronized。线程A在操作数据的时候,线程B只能阻塞。保证了整个Hash表只能线程串行化执行,从而解决了多线程产生的安全问题。

因为Hashtable是对整个哈希表进行加锁,加锁粒度过大,发生线程阻塞的概率非常大,虽然synchronized有自己的锁优化机制,但是也很快就会升级成重量级锁。而当synchronized成为了重量级锁,就会请求底层系统锁,跳出jvm级别,频繁涉及用户态和内核态的切换,性能开销比较大。

所以在今天已经不推荐使用HashTable了。

四、ConcurrentHashMap

4.1 线程安全演示

以上两个章节我们发现,在Map集合中HashMap是最常用的集合对象。但是多线程操作HashMap会有线程安全问题,解决方式就是使用HashTable,但是HashTable会全表加锁性能牺牲很大。

JDK1.5以后所提供了ConcurrentHashMap,使用它既能解决线程安全问题,性能又比HashTable高很多,所以这是目前主流的折中方案。

代码如下:

public class Demo3 {    public static void main(String[] args) {        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();        Thread t1 = new Thread(new Runnable() {            @Override            public void run() {                for (int i = 0; i <= 10; i++) {                    map.put(i + "", i + "");//{i,i}                }            }        });        //20,20  21,21 ... 39,39        Thread t2 = new Thread(new Runnable() {            @Override            public void run() {                for (int i = 11; i <= 20; i++) {                    map.put(i + "", i + "");//{i,i}                }            }        });        t1.start();        t2.start();        //确保两个子线程执行完毕之后,主线程再来打印hashmap        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        for (int i = 0; i <= 20; i++) {            System.out.println(map.get(i + ""));        }    }}

控制台:

01234567891011121314151617181920

线程安全得到保证。

总结 :

1 ,HashMap是线程不安全的。多线程环境下会有数据安全问题

2 ,Hashtable是线程安全的,但是会将整张表锁起来,效率低下

3,ConcurrentHashMap也是线程安全的,效率较高。 在JDK7和JDK8中,底层原理不一样。

4.2 jdk7原理解析

1.ConcurrentHashMap集合底层是一个默认长度为16,加载因子为0.75的大数组 Segment数组。大数组通常是创建之后长度就固定的,而扩容是指小数组扩容。

2.默认情况下还会创建一个长度为2的小数组,把地址赋值给0索引处,其他索引此时的元素仍为null。

(Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]。)

如下图

hashmap是否线程安全?

3.调用put方法时,此时会根据key的哈希值来计算出在大数组中存储的索引位置。

如果这个索引此时为null,则按照0素引的模板小数组来创建小数组。创建完毕后会二次哈希计算出key在小数组中存储的位置,然后把键值对对象存储小数组的该索引位。

如下图,先根据key的哈希算出来在大数组的4索引,创建小数组挂在4索引。接着继续使用key的hash算出存在小数组的0素引。

hashmap是否线程安全?

4.调用put方法时,此时会根据key的哈希值来计算出在大数组中存储的索引位置。

如果该位置不为null,就会根据记录的地址值找到小数组。二次哈希计算出小数组的索引位置。

如果需要扩容就把小数组扩容2倍。

如果不需要扩容,则会判断小数组该索引是否有元素

如果没有元素,就直接存

如果有元素,就调用equals方法比较key是否相同

比较发现没有重复,就会在小数组上挂链表。

如下图

hashmap是否线程安全?

线程一来访问索引4,此时就会对索引4的Segment进行加锁。其他线程访问索引4就会阻塞,访问其他索引就可以访问,这种技术叫分段锁,将数据拆成一段一段的进行加锁。

在当前例子中,我们没有指定大数组的长度,因此长度默认是 16。在理想情况下,最多可以支持16个线程同时操作不同的segment对象,达到了并发的目的。但是如果多个线程同时操作同一个segment,就会阻塞,串行化执行。

关键字:分段锁、二次哈希、Segment数组不能扩容、HashEntry数组可以扩容

总结

ConcurrentHashMap1.7使用Segment+HashEntry数组实现的。本质上是一个 Segment 数组,Segment 继承 ReentrantLock ,同时具备了加锁和释放锁的功能。每个Segment都线程安全,全局也就安全了。把Hashtable的锁全表,变成了锁一段段的数据,粒度细提高性能。

补充:ConcurrentHashMap1.8则完全不同,放弃了Segment。数据结构使用synchronized+CAS+红黑树。锁的粒度也从段锁缩小为结点锁,粒度更细,同时数组支持扩容,并发能力更高。使用synchronized其实也是因为1.6jdk对synchronized的优化有关。

4.3 jdk8 原理解析

在1.8中,ConcurrentHashMap可以说发生翻天覆地的变化,底层数据结构不再采用segment数组,也不再采用分段锁。而是采用 数组+链表+红黑树来实现,锁也从分段锁提升成了节点锁,粒度更细。使用CAS+synchronized来保证线程安全。

底层结构:数组+链表+红黑树

CAS + synchronized同步代码块 保证线程安全

hashmap是否线程安全?

初始化数组源码如下:

//假设多线程来扩容,concurrentHashMap为了线程安全,只能让一个线程成功初始化数组,其他线程均失败。private final Node<K,V>[] initTable() {        Node<K,V>[] tab; int sc;        //所有线程进入循环,去抢着初始化数组        while ((tab = table) == null || tab.length == 0) {            if ((sc = sizeCtl) < 0)                Thread.yield(); //让线程让出cpu,以至于把cpu更多的可能让给初始化操作的线程            //CAS操作,保证一个线程进入下面的逻辑,其他线程最终只能执行 Thread.yield();            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {                try {                    if ((tab = table) == null || tab.length == 0) {                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;                        @SuppressWarnings("unchecked")                        //这个就是初始化数组,默认长度n为16                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];                        table = tab = nt;                        sc = n - (n >>> 2);                    }                } finally {                    sizeCtl = sc;                }                break;            }        }        return tab;    }

通过源码发现,这里使用了 自旋+CAS+线程让出cpu。

其中 自旋+Cas:初始化操作必须且只会由一个线程执行一次,不会初始化多个数组。

线程让出cpu:提高性能,让初始化操作更快执行

put操作源码如下:

   final V putVal(K key, V value, boolean onlyIfAbsent) {        if (key == null || value == null) throw new NullPointerException();        int hash = spread(key.hashCode());        int binCount = 0;        for (Node<K,V>[] tab = table;;) {            Node<K,V> f; int n, i, fh;            //如果数组是null,说明第一次put,那就初始化数组            if (tab == null || (n = tab.length) == 0)                tab = initTable();            //根据key找到索引,如果此索引为null,则使用CAS将node直接赋给当前索引            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                if (casTabAt(tab, i, null,                             new Node<K,V>(hash, key, value, null)))                    break;                   // no lock when adding to empty bin            }            //如果当前索引位置的元素的hash==MOVED说明此时正在发生数组扩容的数据迁移操作,            //当前线程帮助完成数据迁移            else if ((fh = f.hash) == MOVED)                tab = helpTransfer(tab, f);            //当前索引既不是null,也没有在数据迁移。此时索引位置存储的要么链表要么红黑树            else {                V oldVal = null;                //对头结点进行加结点锁,保证同索引下的结点线程串行化执行                synchronized (f) {                    if (tabAt(tab, i) == f) {                        if (fh >= 0) {                            binCount = 1;                            for (Node<K,V> e = f;; ++binCount) {                                K ek;                                if (e.hash == hash &&                                    ((ek = e.key) == key ||                                     (ek != null && key.equals(ek)))) {                                    oldVal = e.val;                                    if (!onlyIfAbsent)                                        e.val = value;                                    break;                                }                                Node<K,V> pred = e;                                if ((e = e.next) == null) {                                    pred.next = new Node<K,V>(hash, key,                                                              value, null);                                    break;                                }                            }                        }                        else if (f instanceof TreeBin) {                            Node<K,V> p;                            binCount = 2;                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                                           value)) != null) {                                oldVal = p.val;                                if (!onlyIfAbsent)                                    p.val = value;                            }                        }                    }                }                if (binCount != 0) {                    if (binCount >= TREEIFY_THRESHOLD)                        treeifyBin(tab, i);                    if (oldVal != null)                        return oldVal;                    break;                }            }        }        addCount(1L, binCount);        return null;    }

通过源码发现:

put操作如果没有发生hash冲突,则CAS直接赋值到索引

如果发生了hash冲突,判断此时是否正在扩容数据迁移,是就加入帮忙数据迁移

如果此时是链表或者红黑树,就加节点锁,保证当前索引操作的线程串行化执行。

扩容源码如下:

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        int n = tab.length, stride;        //计算步长,算法的目的是,cpu核数越多,步长越小。        //步长最少为16,最多为数组的长度        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)            stride = MIN_TRANSFER_STRIDE; // subdivide range        if (nextTab == null) {            // initiating            try {                @SuppressWarnings("unchecked")                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];                nextTab = nt;            } catch (Throwable ex) {      // try to cope with OOME                sizeCtl = Integer.MAX_VALUE;                return;            }            nextTable = nextTab;            transferIndex = n;        }        int nextn = nextTab.length;        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        boolean advance = true;        boolean finishing = false; // to ensure sweep before committing nextTab        for (int i = 0, bound = 0;;) {            Node<K,V> f; int fh;            while (advance) {                int nextIndex, nextBound;                if (--i >= bound || finishing)                    advance = false;                else if ((nextIndex = transferIndex) <= 0) {                    i = -1;                    advance = false;                }                else if (U.compareAndSwapInt                         (this, TRANSFERINDEX, nextIndex,                          nextBound = (nextIndex > stride ?                                       nextIndex - stride : 0))) {                    bound = nextBound;                    i = nextIndex - 1;                    advance = false;                }            }            if (i < 0 || i >= n || i + n >= nextn) {                int sc;                if (finishing) {                    nextTable = null;                    table = nextTab;                    sizeCtl = (n << 1) - (n >>> 1);                    return;                }                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)                        return;                    finishing = advance = true;                    i = n; // recheck before commit                }            }            else if ((f = tabAt(tab, i)) == null)                advance = casTabAt(tab, i, null, fwd);            else if ((fh = f.hash) == MOVED)                advance = true; // already processed            else {                synchronized (f) {                    if (tabAt(tab, i) == f) {                        Node<K,V> ln, hn;                        if (fh >= 0) {                            int runBit = fh & n;                            Node<K,V> lastRun = f;                            for (Node<K,V> p = f.next; p != null; p = p.next) {                                int b = p.hash & n;                                if (b != runBit) {                                    runBit = b;                                    lastRun = p;                                }                            }                            if (runBit == 0) {                                ln = lastRun;                                hn = null;                            }                            else {                                hn = lastRun;                                ln = null;                            }                            for (Node<K,V> p = f; p != lastRun; p = p.next) {                                int ph = p.hash; K pk = p.key; V pv = p.val;                                if ((ph & n) == 0)                                    ln = new Node<K,V>(ph, pk, pv, ln);                                else                                    hn = new Node<K,V>(ph, pk, pv, hn);                            }                            setTabAt(nextTab, i, ln);                            setTabAt(nextTab, i + n, hn);                            setTabAt(tab, i, fwd);                            advance = true;                        }                        else if (f instanceof TreeBin) {                            TreeBin<K,V> t = (TreeBin<K,V>)f;                            TreeNode<K,V> lo = null, loTail = null;                            TreeNode<K,V> hi = null, hiTail = null;                            int lc = 0, hc = 0;                            for (Node<K,V> e = t.first; e != null; e = e.next) {                                int h = e.hash;                                TreeNode<K,V> p = new TreeNode<K,V>                                    (h, e.key, e.val, null, null);                                if ((h & n) == 0) {                                    if ((p.prev = loTail) == null)                                        lo = p;                                    else                                        loTail.next = p;                                    loTail = p;                                    ++lc;                                }                                else {                                    if ((p.prev = hiTail) == null)                                        hi = p;                                    else                                        hiTail.next = p;                                    hiTail = p;                                    ++hc;                                }                            }                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :                                (hc != 0) ? new TreeBin<K,V>(lo) : t;                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :                                (lc != 0) ? new TreeBin<K,V>(hi) : t;                            setTabAt(nextTab, i, ln);                            setTabAt(nextTab, i + n, hn);                            setTabAt(tab, i, fwd);                            advance = true;                        }                    }                }            }        }    }

通过源码发现:

ConcurrentHashMap采用的是多线程扩容,来提高扩容的效率。总体思想是,cpu核数越多线程越多,每个线程分得数据迁移任务量越小。

步长:单个线程负责迁移的桶数量。

下面做一个模拟扩容的流程:

假设现在数组长度有512,cpu核数2,步长32。

线程一:负责迁移索引(512-步长-1,512-1),即数组[479] - 数组[511]

线程二:负责迁移索引 数组[446] 数组[478]

如果线程一和线程二都执行完毕,两个线程就会通过CAS去抢下一个任务

如果线程二抢到了,数组[413] 数组[445]

线程一失败了,自旋继续CAS抢,抢到了

线程一 数组[381] 数组[412]

4.4 对比区别

(1)1.7用的是segment+hashentry数组实现的分段锁。只要线程没同时访问同一个分段数组,就可以并行访问默认长度16,segment数组不可以扩容(大数组),hashentry数组可以扩容。

(2)1.8用的是CAS+synchronized+voletile 实现的,底层是数组+链表+红黑树。对比1.7 锁的粒度更细,锁到了节点级别。

(3)1.8为什么synchronized替换segment?ReentrantLock:park unpark 用户态 - 内核态 性能开销比较大。synchronized:1.6之后优化了,偏向锁 轻量级锁 。此时加锁粒度非常小,比1.7小。转成重量级锁概率极小。

五、总结

通过上面的学习得知,hashmap在多线程情况下初始化数组和扩容的时候均会出现线程安全问题。我们可以通过HashTable来解决,HashTable是对整个hash表加锁,相当于线程串行化操作hash表,在解决问题的同时也会导致性能极低。最终我们可以使用ConcurrentHashMap将锁的粒度控制到最小,将性能影响控制到最低,同时扩容的时候ConcurrentHashMap还支持多线程扩容。可以说ConcurrentHashMap是多线程操作hashmap场景的不二之选,比如SpringCache框架中就使用了ConcurrentHashMap来作为本地缓存。

本文来自投稿,不代表重蔚自留地立场,如若转载,请注明出处https://www.cwhello.com/262807.html

如有侵犯您的合法权益请发邮件951076433@qq.com联系删除

(0)
黑马程序员黑马程序员订阅用户
上一篇 2023年5月12日 10:25
下一篇 2023年5月12日

相关推荐

联系我们

QQ:951076433

在线咨询:点击这里给我发消息邮件:951076433@qq.com工作时间:周一至周五,9:30-18:30,节假日休息