Akka.net-如何等待子参与者在停止之前处理所有挂起的消息

Akka.net - How to wait child actor to process all pending messages prior to stop(Akka.net-如何等待子参与者在停止之前处理所有挂起的消息)
本文介绍了Akka.net-如何等待子参与者在停止之前处理所有挂起的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个名为A的集群分片参与者,并且它有多个子参与者使用每实体模式创建的子项,如下所示。 当我们将100条消息从演员B告知D并且演员D花费比方说500毫秒来处理每条消息时,同时,当我们使用Context.Parent.Tell(new钝化(PoisonPill.Instance))将毒丸发送给演员A时; 它会立即停止所有子执行元(包括执行元D),而不处理挂起的消息。

    A
    |
    B    
   / 
  C   D

是否有办法等待执行元D处理所有消息?

推荐答案

https://stackoverflow.com/a/70286526/377476是一个良好的开端;您将需要一个自定义关闭消息。当父执行元终止时,其子级将通过/system消息自动终止,这些消息将取代其队列中任何未处理的/user消息。

因此,您需要做的是确保在父级终止自身之前处理它们的所有/user消息。有一种简单的方法可以将GracefulStop扩展方法与您的自定义停止消息结合使用:

public sealed class ActorA : ReceiveActor{
    private IActorRef _actorB;  
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorA(){
        Receive<StartWork>(w => {
            foreach(var i in Enumerable.Range(0, w.WorkCount)){
                _actorB.Tell(i);
            }
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            _log.Info("Begin shutdown");
            
            // stop child actor B with the same custom message
            await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
            
            // shut ourselves down after child is done
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        _actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
    }
}

public sealed class ActorB : ReceiveActor{
    private IActorRef _actorC;
    private IActorRef _actorD;
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorB(){
        Receive<int>(i => {
            _actorC.Tell(i);
            _actorD.Tell(i);
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            
            _log.Info("Begin shutdown");
            
            // stop both actors in parallel
            var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
            var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
            
            // compose stop Tasks
            var bothStopped = Task.WhenAll(stopC, stopD);
            await bothStopped;
            
            // shut ourselves down immediately
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        var workerProps = Props.Create(() => new WorkerActor());
        _actorC = Context.ActorOf(workerProps, "c");
        _actorD = Context.ActorOf(workerProps, "d");
    }
}

public sealed class WorkerActor : ReceiveActor {
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public WorkerActor(){
        ReceiveAsync<int>(async i => {
            await Task.Delay(10);
            _log.Info("Received {0}", i);
        });
    }
}

我在这里创建了此示例的可运行版本:https://dotnetfiddle.net/xiGyWM-您将看到MyStopMessage在示例开始后不久收到,但C和D完成工作后收到。在此方案中,所有工作都在任何参与者终止之前完成。

这篇关于Akka.net-如何等待子参与者在停止之前处理所有挂起的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

DispatcherQueue null when trying to update Ui property in ViewModel(尝试更新ViewModel中的Ui属性时DispatcherQueue为空)
Drawing over all windows on multiple monitors(在多个监视器上绘制所有窗口)
Programmatically show the desktop(以编程方式显示桌面)
c# Generic Setlt;Tgt; implementation to access objects by type(按类型访问对象的C#泛型集实现)
InvalidOperationException When using Context Injection in ASP.Net Core(在ASP.NET核心中使用上下文注入时发生InvalidOperationException)
LINQ many-to-many relationship, how to write a correct WHERE clause?(LINQ多对多关系,如何写一个正确的WHERE子句?)