使用TPL数据流对持续时间或阈值进行批处理

Batching on duration or threshold using TPL Dataflow(使用TPL数据流对持续时间或阈值进行批处理)
本文介绍了使用TPL数据流对持续时间或阈值进行批处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。

TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发?

例如,一旦消息从队列中拉出,当前实现将发布该消息。

    postedSuccessfully = targetBuffer.Post(msg.Value);

推荐答案

已经可以通过System.Reactive特别是Buffer运算符按计数和持续时间进行缓冲。缓冲区收集传入事件,直到达到所需计数或其时间跨度过期。

数据流数据块旨在与System.Reactive配合使用。使用DataflowBlock.AsObservable()和AsObserver()扩展方法将can be converted块传递给可观测对象和观察者。

这使得构建缓冲挡路变得非常容易:

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);

}

此方法使用两个缓冲块来缓冲输入和输出。Buffer()在批处理已满或时间跨度过期时,从输入挡路(可观察对象)读取并写入输出挡路(观察者)。

默认情况下,Rx在当前线程上工作。通过调用ObserveOn(TaskPoolScheduler.Default),我们告诉它在任务池线程上处理数据。

示例

此代码创建5个项目或1秒的挡路缓冲区。它从发布7个项目开始,等待1.1秒,然后发布另外7个项目。每批都与线程ID一起写入控制台:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    await bufferBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

输出为:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6

这篇关于使用TPL数据流对持续时间或阈值进行批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

DispatcherQueue null when trying to update Ui property in ViewModel(尝试更新ViewModel中的Ui属性时DispatcherQueue为空)
Drawing over all windows on multiple monitors(在多个监视器上绘制所有窗口)
Programmatically show the desktop(以编程方式显示桌面)
c# Generic Setlt;Tgt; implementation to access objects by type(按类型访问对象的C#泛型集实现)
InvalidOperationException When using Context Injection in ASP.Net Core(在ASP.NET核心中使用上下文注入时发生InvalidOperationException)
LINQ many-to-many relationship, how to write a correct WHERE clause?(LINQ多对多关系,如何写一个正确的WHERE子句?)