本文介绍了使用TPL数据流对持续时间或阈值进行批处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。
TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发?
例如,一旦消息从队列中拉出,当前实现将发布该消息。
postedSuccessfully = targetBuffer.Post(msg.Value);
推荐答案
已经可以通过System.Reactive特别是Buffer运算符按计数和持续时间进行缓冲。缓冲区收集传入事件,直到达到所需计数或其时间跨度过期。
数据流数据块旨在与System.Reactive配合使用。使用DataflowBlock.AsObservable()和AsObserver()扩展方法将can be converted块传递给可观测对象和观察者。这使得构建缓冲挡路变得非常容易:
public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();
var outObserver=outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);
return DataflowBlock.Encapsulate(inBlock, outBlock);
}
此方法使用两个缓冲块来缓冲输入和输出。Buffer()
在批处理已满或时间跨度过期时,从输入挡路(可观察对象)读取并写入输出挡路(观察者)。
默认情况下,Rx在当前线程上工作。通过调用ObserveOn(TaskPoolScheduler.Default)
,我们告诉它在任务池线程上处理数据。
示例
此代码创建5个项目或1秒的挡路缓冲区。它从发布7个项目开始,等待1.1秒,然后发布另外7个项目。每批都与线程ID一起写入控制台:
static async Task Main(string[] args)
{
//Build the pipeline
var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
bufferBlock.LinkTo(printBlock, options);
//Start the messages
Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
for (int i=0;i<7;i++)
{
bufferBlock.Post(i.ToString());
}
await Task.Delay(1100);
for (int i=7; i < 14; i++)
{
bufferBlock.Post(i.ToString());
}
bufferBlock.Complete();
Console.WriteLine($"Finishing");
await bufferBlock.Completion;
Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
Console.ReadKey();
}
static void printOut(IEnumerable<string> items)
{
var line = String.Join(",", items);
Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
输出为:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
这篇关于使用TPL数据流对持续时间或阈值进行批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!