访问 ConcurrentHashMap<Element, Boolean> 的每

Scalable way to access every element of ConcurrentHashMaplt;Element, Booleangt; exactly once(访问 ConcurrentHashMaplt;Element, Booleangt; 的每个元素的可扩展方式恰好一次)
本文介绍了访问 ConcurrentHashMap<Element, Boolean> 的每个元素的可扩展方式恰好一次的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 32 个机器线程和一个 ConcurrentHashMap<Key,Value>map,其中包含很多键.Key 定义了一个公共方法 visit().我想visit() 使用我可用的处理能力以及可能的某种线程池,只对 map 的每个元素进行一次.

I have 32 machine threads and one ConcurrentHashMap<Key,Value> map, which contains a lot of keys. Key has defined a public method visit(). I want to visit() every element of map exactly once using the processing power I have available and possibly some sort of thread pooling.

我可以尝试的事情:

  • 我可以使用 map.keys() 方法.生成的 Enumeration 可以使用 nextElement() 进行迭代,但是由于对 key.visit() 的调用非常简短,因此我无法使线程保持忙碌.枚举本质上是单线程的.
  • 我可以使用同步的 HashSet<Key> 代替,调用方法 toArray() 并将数组上的工作分成所有 32 个线程.我严重怀疑这个解决方案,因为方法 toArray() 很可能是单线程瓶颈.
  • 我可以尝试从 ConcurrentHashMap 继承,掌握其内部 Segment<K,V> 的实例,尝试将它们分成 32 个组并工作分别在每组上.不过,这听起来像是一种硬核方法.
  • 或使用 Enumeration<Key> 的类似魔法.
  • I could use the method map.keys(). The resulting Enumeration<Key> could be iterated over using nextElement(), but since a call to key.visit() is very brief I won't manage to keep threads busy. The Enumeration is inherently single-threaded.
  • I could use a synchronised HashSet<Key> instead, invoke a method toArray() and split the work on the array into all 32 threads. I seriously doubt in this solution, since the method toArray() will likely be a single-thread bottle-neck.
  • I could try to inherit from ConcurrentHashMap, get my hands on the instances of its inner Segment<K,V>, try to group them into 32 groups and work on each group separately. This sounds like a hardcore approach though.
  • or similar magic with Enumeration<Key>.

理想情况下:

  • 理想情况下,ConcurrentHashMap<Key, Value> 将定义一个方法 keysEnumerator(intapproxPosition),这可能会使我丢失大约前 1/32 个元素的枚举器,即map.keysEnumerator(map.size()/32)
  • Ideally a ConcurrentHashMap<Key, Value> would define a method keysEnumerator(int approximatePosition), which could drop me an enumerator missing approximately first 1/32 elements, i.e. map.keysEnumerator(map.size()/32)

我错过了什么明显的东西吗?有没有人遇到过类似的问题?

Am I missing anything obvious? Has anybody run into similar problem before?

编辑

我已经进行了分析,看看这个问题是否真的会影响实践中的性能.由于目前我无法访问集群,因此我使用笔记本电脑并尝试将结果外推到更大的数据集.在我的机器上,我可以创建一个 200 万个键 ConcurrentHashMap,并且在每个键上调用 visit() 方法来迭代它大约需要 1 秒.该程序应该扩展到 8500 万键(及以上).集群的处理器稍微快一些,但它仍然需要大约 40 秒来迭代整个地图.现在谈谈程序的逻辑流程.呈现的逻辑是顺序的,即在上一步中的所有线程都完成之前,不允许任何线程进行下一步:

I've had a go at profiling to see whether this problem is actually going to affect the performance in practice. As I don't have access to the cluster at the moment I used my laptop and tried to extrapolate the results to a bigger dataset. On my machine I can create a 2 million keys ConcurrentHashMap and it takes about 1 second to iterate over it invoking the visit() method on every key. The program is supposed to scale to 85 million keys (and over). The cluster's processor is slightly faster, but it still should take about 40 seconds to iterate over entire map. Now a few words about the logic flow of the program. The logic presented is sequential, i.e. it is not allowed for any thread to proceed to the next step until all the threads in the previous step are finished:

  1. 创建哈希映射,创建键并填充哈希映射
  2. 遍历访问所有键的整个哈希映射.
  3. 进行一些数据混洗,即并行插入和删除.
  4. 将第 2 步和第 3 步重复数百次.
  1. Create the hash map, create the keys and populate the hash map
  2. Iterate over entire hash map visiting all the keys.
  3. Do some data shuffling which is parallel insertions and deletions.
  4. Repeat step 2 and 3 a few hundred times.

这个逻辑流程意味着一个 40 秒的迭代将被重复几百次,比如 100 次.这让我们在访问节点上花费了 一个多小时.使用一组 32 个并行迭代器,它可以缩短到几分钟,这是一个显着的性能改进.

That logic flow means that a 40 second iteration is going to be repeated a few hundred times, say 100. Which gives us a bit over an hour spent just in visiting the nodes. With a set of 32 parallel iterators it could go down to just a few minutes, which is a significant performance improvement.

现在谈谈 ConcurrentHashMap 是如何工作的(或者我认为它是如何工作的).每个 ConcurrentHashMap 都由段组成(默认为 16 个).对哈希映射的每次写入都会在相关段上同步.假设我们正在尝试将两个新键 k1 和 k2 写入哈希映射,并且它们将被解析为属于同一段,例如 s1.如果尝试同时写入它们,则其中一个将首先获取锁,然后再添加另一个.两个元素被解析为属于同一段的机会是多少?如果我们有一个好的散列函数和 16 个段,那么它就是 1/16.

Now a few words on how ConcurrentHashMap works (Or how I believe it works). Every ConcurrentHashMap consists of segments (by default 16). Every write to a hash map is synchronised on a relevant segment. So say we're trying to write two new keys k1 and k2 to the hash map and that they would be resolved to belong to the same segment, say s1. If they are attempted to be written simultaneously, one of them is going to acquire the lock first and be added earlier then the other. What is the chance of two elements to be resolved to belong to the same segment? In case we have got a good hash function and 16 segements it is 1/16.

我相信 ConcurrentHashMap 应该有一个方法 concurrentKeys(),它将返回一个枚举数组,每个段一个.我有一些想法如何通过继承将它添加到 ConcurrentHashMap,如果我成功了,我会告诉你的.就目前而言,解决方案似乎是创建一个 ConcurrentHashMaps 数组并预​​先散列每个键以解析为此类数组的一个成员.准备好后,我也会分享该代码.

I belive that ConcurrentHashMap should have a method concurrentKeys(), which would return an array of Enumerations, one per each segment. I have got a few ideas how to add it to ConcurrentHashMap through inheritance and I'll let you know if I succeed. As for now the solution seems to be to create an array of ConcurrentHashMaps and pre-hashing every key to resolve to one member of such array. I'll share that code as well, once it's ready.

编辑

这是不同语言的相同问题:

This is the same problem in a different language:

并行迭代器

推荐答案

我最终会采用的解决方案是一个 ConcurrentHashMaps 数组,而不是一个 ConcurrentHashMap.这是临时的,但似乎与我的用例有关.我不在乎第二步的速度很慢,因为它不会影响我的代码的性能.解决办法是:

The solution I will eventually go for is an array of ConcurrentHashMaps instead of one ConcurrentHashMap. This is ad hoc, but seems to be relevant for my usecase. I don't care about the second step being slow as it doesn't affect my code's performance. The solution is:

对象创建:

  1. 创建一个大小为 t 的 ConcurrentHashMaps 数组,其中 t 是线程数.
  2. 创建一个大小为 t 的 Runnables 数组.
  1. Create an array of size t of ConcurrentHashMaps, where t is a number of threads.
  2. Create an array of Runnables, also of size t.

数组填充(单线程,不是问题):

Array Population (single threaded, not an issue):

  1. 创建键并应用 pre-hash 函数,它将返回 0 ... t-1 范围内的 int.在我的情况下,只需模 t.
  2. 通过访问数组中的适当条目,将键放入哈希图中.例如.如果预哈希导致索引为 4,则使用 hashArray[4].put(key)
  1. Create the keys and apply pre-hash function, which will return an int in the range 0 ... t-1. In my case simply modulo t.
  2. Put the key in the hashmap, by accessing appropriate entry in the array. E.g. if the pre-hash has resulted in index 4, then go for hashArray[4].put(key)

数组迭代(很好的多线程,性能提升):

Array Iteration (nicely multithreaded, performance gain):

  1. 为 Runnables 数组中的每个线程分配一个使用相应索引遍历 hashmap 的作业.与单线程相比,这应该使迭代时间缩短 t 倍.
  1. Assign every thread from Runnables array a job of iterating over the hashmap with a corresponding index. This should give give a t times shorter iteration as opposed to single threaded.

要查看概念验证代码(因为它有一些来自项目的依赖项,我无法在此处发布)前往我在 github 上的项目

To see the proof of concept code (as it's got some dependencies from the project I can't post it here) head towards my project on github

编辑

实际上,为我的系统实施上述概念证明已被证明是耗时、容易出错且令人非常失望的.此外,我发现我会错过标准库 ConcurrentHashMap 的许多功能.我最近一直在探索的解决方案是使用 Scala,它看起来不那么特别而且更有希望,它产生的字节码可以与 Java 完全互操作.概念证明依赖于 本文 中描述的令人惊叹的库和 AFAIK考虑到标准库和相应第三方库的当前状态,如果不编写数千行代码,目前在 vanilla Java 中实现相应的解决方案是不可能的.

Actually, implementing the above proof of concept for my system has proven to be time-consuming, bug-prone and grossly disappointing. Additionally I've discovered I would have missed many features of the standard library ConcurrentHashMap. The solution I have been exploring recently, which looks much less ad-hoc and much more promising is to use Scala, which produces bytecode that is fully interoperable with Java. The proof of concept relies on stunning library described in this paper and AFAIK it is currently IMPOSSIBLE to achieve a corresponding solution in vanilla Java without writing thousands lines of code, given the current state of the standard library and corresponding third-party libraries.

import scala.collection.parallel.mutable.ParHashMap

class Node(value: Int, id: Int){
    var v = value
    var i = id
    override def toString(): String = v toString
}

object testParHashMap{
    def visit(entry: Tuple2[Int, Node]){
        entry._2.v += 1
    }
    def main(args: Array[String]){
        val hm = new ParHashMap[Int, Node]()
        for (i <- 1 to 10){
            var node = new Node(0, i)
            hm.put(node.i, node)
        }

        println("========== BEFORE ==========")
        hm.foreach{println}

        hm.foreach{visit}

        println("========== AFTER ==========")
        hm.foreach{println}

    }
}

这篇关于访问 ConcurrentHashMap&lt;Element, Boolean&gt; 的每个元素的可扩展方式恰好一次的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

How can create a producer using Spring Cloud Kafka Stream 3.1(如何使用Spring Cloud Kafka Stream 3.1创建制片人)
Insert a position in a linked list Java(在链接列表中插入位置Java)
Did I write this constructor properly?(我是否正确地编写了这个构造函数?)
Head value set to null but tail value still gets displayed(Head值设置为空,但仍显示Tail值)
printing nodes from a singly-linked list(打印单链接列表中的节点)
Control namespace prefixes in web services?(控制Web服务中的命名空间前缀?)