问题描述
我有一个IObservable<int>
序列,它在前9次订阅时发出单个项目,在进一步订阅时不发出任何内容并立即完成:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
现在我想重复这个序列,直到它完成为止。所以我使用了Repeat
运算符:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
问题是该查询永远不会完成。Repeat
将一次又一次地订阅source
序列。更糟糕的是,当source
停止生成元素时,查询进入一个无情的死循环,劫持了CPU的一个核心(我的四核机器报告连续的CPU利用率为25%)。以下是上述代码的输出:
1
2
3
4
5
6
7
8
9
我想要的是Repeat
运算符的一个变体,它在source
停止生成元素时停止重复source
。通过搜索内置的Rx运算符,我可以看到RepeatWhen
运算符,但显然这只能用于更快地启动下一次重复,而不是完全停止重复:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
我不能100%确定,因为handler
参数的描述相当模糊,所以我可能遗漏了一些东西:
为每个观察者调用的函数,并获取可观察序列对象。它应该返回任意项的可观测项,响应于从源可观测项接收到完成信号,应该用信号通知该任意项。如果此可观察到的信号为终端事件,则序列将改为使用该信号终止。
我的问题是:如何实现重复source
序列直到它为空的RepeatUntilEmpty
运算符?是否可以基于上述RepeatWhen
运算符实现?如果不是,我是否应该从低级(Observable.Create
)开始重新实现基本的Repeat
功能?或者,我可以使用Materialize
运算符,以某种方式将其与现有的Repeat
结合使用吗?我现在没有主意了。我愿意接受任何一种解决方案,无论是高水平还是低水平。
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
将我的原始代码中的Repeat
替换为RepeatUntilEmpty
,应该可以在发出9
元素后立即完成查询。
推荐答案
您可以使用Materialize()
/Dematerialize()
根据从Repeat()
语句收到的通知构建您自己的notifications序列。通知序列如下所示:
1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...
因此,我们寻找两个连续的OnCompleted
通知。如果没有找到,我们仍然返回收到的OnNext
通知,否则返回OnCompleted
通知。代码可能如下所示:
public static void Main(string[] args)
{
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
Console.WriteLine($"counter is now: {counter}");
if (counter > 20) {
System.Environment.Exit(1);
}
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
source
.RepeatUntilEmpty()
.Subscribe(x => {
System.Threading.Thread.Sleep(10);
Console.WriteLine($"SUBSCRIBE: {x}");
}, () => Console.WriteLine("SUBSCRIBE:Completed"));
System.Threading.Thread.Sleep(10000);
Console.WriteLine("Main thread terminated");
}
RepeatUntilEmpty()
方法如下:
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
return source
.Materialize()
.Repeat()
.StartWith((Notification<T>)null)
.Buffer(2, 1)
.Select(it => {
Console.WriteLine($"Buffer content: {String.Join(",", it)}");
if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
return it[1];
}
// it[1] is OnCompleted, check the previous one
if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
// not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
return null;
}
// okay, we have two consecutive OnCompleted, stop this observable.
return it[1];
})
.Where(it => it != null) // remove the NULL marker
.Dematerialize();
}
这将生成以下输出:
counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated
我还没有测试该代码如何处理OnError()
通知,因此您可能需要检查一下。此外,我还遇到了一些问题,即source.Materialize().Repeat()
部分将从原始源读取更多数据,尽管它后来决定停止可观测数据。特别是使用Do().Wait()
语句时,我有时会收到其他输出,如:
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14
这可能也是您的问题,因为Repeat()
部分仍在尝试读取/连接空的可观察对象。
这篇关于如何重复一个可观察到的序列,直到它为空?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!