本文介绍了如何制作一个只能订阅一次的轻量级`Replay`运算符?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在各种情况下,我都希望RxReplay
操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay
运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例:
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(500))
.SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
.Take(800_000)
.Do(x =>
{
if (x % 100_000 == 0) Console.WriteLine(
$"{DateTime.Now:HH:mm:ss.fff} > " +
$"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
})
.Replay()
.AutoConnect(0);
await Task.Delay(2200);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");
// First subscription
await observable.Do(x =>
{
if (x % 100_000 == 0)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});
// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");
可观测对象总共生成800,000个值。Replay
机制立即连接到源,并且在其完成之前订阅了一半。
输出:
16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000
订阅后内存使用量持续增长。这是意料之中的,因为所有值都被缓冲,并在重放的可观察对象的整个生命周期内保持缓冲。理想的行为是在订阅后内存使用量骤降。在传播缓冲值之后,应该丢弃该缓冲区,因为在订阅之后它就没有用处了。另外,第二个订阅(await observable.Count()
)应该失败,并显示InvalidOperationException
。在可观察对象失去Replay
功能后,我不想再次订阅它。
下面是我试图实现的自定义ReplayOnce
操作符的存根。有谁知道如何实施它吗?
public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
return source.Replay(); // TODO: enforce the subscribe-once policy
}
有一个相关的问题here,是关于如何使Replay
运算符具有可按需偶尔清空的缓冲区的。我的问题不同,因为我希望在订阅后完全禁用缓冲区,并且不再开始增长。推荐答案
我想出了ReplayOnce
运算符的实现,它基于组播自定义ReplayOnceSubject<T>
。此主题最初由ReplaySubject<T>
支持,在第一次(且仅允许)订阅期间使用正常的Subject<T>
进行切换:
public static IConnectableObservable<T> ReplayOnce<T>(
this IObservable<T> source)
{
return source.Multicast(new ReplayOnceSubject<T>());
}
private class ReplayOnceSubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value)
{
lock (_locker) _subject.OnNext(value);
}
public void OnError(Exception error)
{
lock (_locker) _subject.OnError(error);
}
public void OnCompleted()
{
lock (_locker) _subject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
_subject = subject;
return subscription;
}
else
throw new InvalidOperationException("Already subscribed.");
}
}
}
replaySubject.Subscribe(subject)
行确保不仅将缓冲值传播到观察者,而且还将传播任何可能的OnError
/OnCompleted
通知。订阅后,ReplaySubject
不再被引用,并且有资格进行垃圾回收。
这篇关于如何制作一个只能订阅一次的轻量级`Replay`运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!