How to get optimal bulk insertion rate in DynamoDb through Executor Framework in Java?


Problem description

I'm doing a POC on bulk writes (around 5.5k items) against a local DynamoDB instance using the DynamoDB SDK for Java. I'm aware that each batch write cannot contain more than 25 write operations, so I am dividing the whole dataset into chunks of 25 items each. Then I'm submitting these chunks as callable actions through the Executor framework. Still, I'm not getting a satisfactory result: the 5.5k records take more than 100 seconds to insert.

I'm not sure how else I can optimize this. While creating the table I provisioned the WriteCapacityUnits as 400 (not sure what the maximum value I can give is) and experimented with it a bit, but it never made any difference. I have also tried changing the number of threads in the executor.
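
As a side note, the write capacity a table actually ended up with can be read back with DescribeTable. Below is a minimal sketch of that check, assuming the same local endpoint, dummy credentials, and Employee table used in the code that follows; it is an illustration only, not part of the original question.

    public static void main(String[] args) throws Exception {
        // Client pointing at the same local DynamoDB endpoint as the question's code.
        AmazonDynamoDB client = new AmazonDynamoDBClient(new BasicAWSCredentials("x", "y"));
        client.setEndpoint("http://localhost:8000");

        // Read back the table definition and print its provisioned write capacity.
        TableDescription description = client.describeTable("Employee").getTable();
        System.out.println("Provisioned WriteCapacityUnits: "
                + description.getProvisionedThroughput().getWriteCapacityUnits());
    }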

This is the main code to perform the bulk write operation:


    public static void main(String[] args) throws Exception {

        // Client pointing at the local DynamoDB instance.
        final AmazonDynamoDB aws = new AmazonDynamoDBClient(new BasicAWSCredentials("x", "y"));
        aws.setEndpoint("http://localhost:8000");

        JSONArray employees = readFromFile();
        Iterator<JSONObject> iterator = employees.iterator();

        List<WriteRequest> batchList = new ArrayList<WriteRequest>();

        ExecutorService service = Executors.newFixedThreadPool(20);

        // Group the items into BatchWriteItemRequests of at most 25 write requests each.
        List<BatchWriteItemRequest> listOfBatchItemsRequest = new ArrayList<>();
        while (iterator.hasNext()) {
            if (batchList.size() == 25) {
                Map<String, List<WriteRequest>> batchTableRequests = new HashMap<String, List<WriteRequest>>();
                batchTableRequests.put("Employee", batchList);
                BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
                batchWriteItemRequest.setRequestItems(batchTableRequests);
                listOfBatchItemsRequest.add(batchWriteItemRequest);
                batchList = new ArrayList<WriteRequest>();
            }
            PutRequest putRequest = new PutRequest();
            putRequest.setItem(ItemUtils.fromSimpleMap((Map) iterator.next()));
            WriteRequest writeRequest = new WriteRequest();
            writeRequest.setPutRequest(putRequest);
            batchList.add(writeRequest);
        }

        StopWatch watch = new StopWatch();
        watch.start();

        // Submit every batch to the thread pool and collect the futures.
        List<Future<BatchWriteItemResult>> futureListOfResults = listOfBatchItemsRequest.stream()
                .map(batchItemsRequest -> service.submit(() -> aws.batchWriteItem(batchItemsRequest)))
                .collect(Collectors.toList());

        service.shutdown();

        // Busy-wait until every submitted batch has completed.
        while (!service.isTerminated());

        watch.stop();
        System.out.println("Total time taken : " + watch.getTotalTimeSeconds());
    }
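
One detail worth keeping in mind with this code is that batchWriteItem may return part of a batch as unprocessed items, which the futures above never look at. The following is a minimal sketch of a retry wrapper for a single batch; it is not from the original question and assumes the same SDK types (AmazonDynamoDB, BatchWriteItemRequest, BatchWriteItemResult) already used above, with a fixed back-off picked arbitrarily for brevity.

    private static BatchWriteItemResult writeBatchWithRetry(AmazonDynamoDB aws,
            BatchWriteItemRequest request) throws InterruptedException {
        BatchWriteItemResult result = aws.batchWriteItem(request);
        // DynamoDB can hand back part of the batch (e.g. under throttling); resubmit it.
        while (!result.getUnprocessedItems().isEmpty()) {
            Thread.sleep(100); // simple fixed back-off; exponential back-off would be more robust
            result = aws.batchWriteItem(
                    new BatchWriteItemRequest().withRequestItems(result.getUnprocessedItems()));
        }
        return result;
    }

On a related note, service.awaitTermination(timeout, unit) would block until the pool drains without the spinning that `while (!service.isTerminated());` does.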

This is the code used to create the DynamoDB table:

    public static void main(String[] args) throws Exception {
        AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");

        DynamoDB dynamoDB = new DynamoDB(client);
        String tableName = "Employee";
        try {
            System.out.println("Creating the table, wait...");
            Table table = dynamoDB.createTable(tableName,
                    Arrays.asList(new KeySchemaElement("ID", KeyType.HASH)),
                    Arrays.asList(new AttributeDefinition("ID", ScalarAttributeType.S)),
                    new ProvisionedThroughput(1000L, 1000L));
            table.waitForActive();
            System.out.println("Table created successfully.  Status: " + table.getDescription().getTableStatus());

        } catch (Exception e) {
            System.err.println("Cannot create the table: ");
            System.err.println(e.getMessage());
        }
    }

Recommended answer

DynamoDB Local is provided as a tool for developers who need to develop offline for DynamoDB, and it is not designed for scale or performance. As such, it is not intended for scale testing; if you need to test bulk loads or other high-velocity workloads, it is best to use a real table. The actual cost incurred from dev testing on a live table is usually quite minimal, as the tables only need to be provisioned for high capacity during the test runs.
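
Following that last point, one way to provision high capacity only for the duration of a test run is to raise the table's write capacity right before the bulk load and scale it back down afterwards. The sketch below uses the same Document API as the table-creation code above, but against the real service rather than DynamoDB Local; the table name and capacity numbers are placeholders, not values from the answer.

    public static void main(String[] args) throws Exception {
        // Standard client pointing at the real DynamoDB service, not the local endpoint.
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        Table table = new DynamoDB(client).getTable("Employee");

        // Raise write capacity only for the duration of the bulk-load test.
        table.updateTable(new ProvisionedThroughput(10L, 1000L)); // (read, write) capacity units
        table.waitForActive();

        // ... run the bulk write test against this table ...

        // Scale back down once the test run is finished to keep the cost minimal.
        table.updateTable(new ProvisionedThroughput(10L, 10L));
        table.waitForActive();
    }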
