问题描述
最近我意识到Rx 结果:未处理的异常(Fiddle)
这个特性(我想称之为一个缺陷)严重限制了 验证:将上述代码片段中的 替代:我也愿意接受一个合理的答案,解释为什么内置 以下是Finally
运算符的行为方式至少对我来说是意想不到的。我的预期是,finallyAction
抛出的任何错误都将传播到操作员的观察者下游。遗憾的是,情况并非如此。实际上,操作符首先将Antecedent序列的完成(或失败)传播给它的观察者,然后然后在无法传播该操作抛出的潜在错误的时间点调用action
。因此,它在ThreadPool
上抛出错误,并使进程崩溃。这不仅出乎意料,而且非常有问题。以下是此行为的最小演示:
Observable
.Timer(TimeSpan.FromMilliseconds(100))
.Finally(() => throw new ApplicationException("Oops!"))
.Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Completed"));
Thread.Sleep(1000);
Finally
lambda引发的异常不会由Subscribe
:onError
处理程序处理,因为它是理想的。Finally
运算符在我看来的用处。从本质上讲,只有当我想要调用一个预计永远不会失败的操作时,我才能使用它,如果它失败了,它将指示应用程序状态的灾难性损坏,而此时无法恢复。例如,我可以使用它来Release
aSemaphoreSlim
(例如,就像我所做的here),只有当我的代码有错误时,它才会失败。在这种情况下,我可以接受我的应用程序崩溃。但我也使用它recently调用调用者提供的未知操作,该操作可能会失败,在这种情况下使应用程序崩溃是不可接受的。相反,错误应该向下游传播。所以我在这里问的是如何实现一个具有相同签名的Finally
变体(让我们称之为FinallySafe
),以及下面指定的行为:public static IObservable<TSource> FinallySafe<TSource>(
this IObservable<TSource> source, Action finallyAction);
finallyAction
应在source
序列发出OnCompleted
或OnError
通知之后调用,但在此通知传播到观察者之前调用。finallyAction
调用成功,则应将原始的OnCompleted
/OnError
通知传播给观察者。finallyAction
调用失败,则应该向观察者传播OnError
通知,其中包含刚刚发生的错误。在这种情况下,应忽略(不传播)前一个错误,即可能导致source
失败的错误。source
完成之前取消订阅FinallySafe
时,也应该调用finallyAction
。当订阅者(观察者)处置订阅时,finallyAction
应该被同步调用,任何错误都应该传播给Dispose
方法的调用者。FinallySafe
被多个观察者订阅,则每个订阅应独立调用一次finallyAction
,遵循上述规则。并发调用正常。finallyAction
永远不应超过一次。Finally
替换为FinallySafe
应会导致程序不会因未处理的异常而崩溃。Finally
运算符的行为优于上面指定的自定义FinallySafe
运算符的行为。推荐答案
FinallySafe
运算符的实现,具有问题中指定的行为:/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
Action finallyAction)
{
return Observable.Create<T>(observer =>
{
var finallyOnce = Disposable.Create(finallyAction);
var subscription = source.Subscribe(observer.OnNext, error =>
{
try { finallyOnce.Dispose(); }
catch (Exception ex) { observer.OnError(ex); return; }
observer.OnError(error);
}, () =>
{
try { finallyOnce.Dispose(); }
catch (Exception ex) { observer.OnError(ex); return; }
observer.OnCompleted();
});
return new CompositeDisposable(subscription, finallyOnce);
});
}
finallyAction
被分配为Disposable.Create
一次性实例的Dispose
操作,以确保该操作最多被调用一次。然后,使用CompositeDisposable
实例将此一次性订阅与source
的一次性订阅组合在一起。
finallyAction
可能的错误。在某些情况下,这可能是可取的,但不幸的是,这是不可能的。首先,也是最重要的是,这样做违反了The Observable Contract文件中的一项准则,即:
当观察者向可观察对象发出取消订阅通知时,该可观察对象将尝试停止向观察者发出通知。但是,不能保证观察者在向观察者发出取消订阅通知后不会向观察者发出通知。
因此,这样的实现将是不一致的。更糟糕的是,Observable.Create
方法通过在处理订阅后立即将observer
设置为静音来实施这一指导原则。它通过将观察者封装在AutoDetachObserver
包装器中来实现这一点。即使我们试图通过从头实现IObservable<T>
类型来绕过这一限制,任何可以附加在不一致的Finally
运算符之后的内置运算符都将使取消订阅后的OnError
通知静音。所以这是不可能的。无法将取消订阅过程中的错误传播到刚刚请求取消订阅的订阅服务器。
这篇关于如何实现更好的最终处方运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!