System.Reactive中的并發(fā)訂閱服務(wù)器執(zhí)行

我正在編寫一個批處理管道,它每Y秒處理X個未完成的操作。感覺System.Reactive很適合這樣做,但我無法讓訂閱服務(wù)器并行執(zhí)行。我的代碼如下所示:

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

是否有一種優(yōu)雅的方式以并發(fā)方式讀取這個緩沖區(qū)Subject

? 最佳回答:

Rx訂閱代碼始終是同步的。您需要做的是從Subscribe委托中刪除處理代碼,并使其成為可觀察序列的side-effect。以下是如何做到這一點:

Subject<int> subject = new();
int concurrentCount = 0;

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(item => Observable.Defer(() => Observable.Start(() =>
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty()
    .ToTask();

Parallel.For(0, 1_000_000, new() { MaxDegreeOfParallelism = 2 }, i =>
{
    subject.OnNext(i);
});
subject.OnCompleted();

processor.Wait();

Select+Observable.Defer+Observable.Start組合將源序列轉(zhuǎn)換為IObservable<IObservable<Unit>>。它是一個嵌套序列,每個內(nèi)部序列表示一個item的處理。當(dāng)Observable.Start內(nèi)的委托完成時,內(nèi)部序列發(fā)出一個Unit值,然后完成。包裝Defer操作符對內(nèi)部序列進行“冷”處理,以便在訂閱之前不會啟動它們。然后跟隨Merge操作符,它將外部嵌套序列展開為平面IObservable<Unit>序列。maxConcurrent參數(shù)配置將同時訂閱多少內(nèi)部序列。每當(dāng)Merge運算符訂閱內(nèi)部序列時,相應(yīng)的Observable.Start委托開始在ThreadPoolthread上運行。

如果將maxConcurrent設(shè)置得太高,ThreadPool可能會耗盡工作線程(換句話說,它可能會飽和),代碼的并發(fā)性將取決于ThreadPool的可用性。如果愿意,可以使用ThreadPool.SetMinThreads方法增加ThreadPool根據(jù)需要立即創(chuàng)建的工人數(shù)量。但是如果您的工作負載為CPU-bound,并且您將workerthreads增加到Environment.ProcessorCount值之上,那么您的CPU很可能會飽和。

如果您的工作負載是異步的,那么可以用Observable.FromAsync操作符替換Observable.Defer+Observable.Start組合,如下所示。

主站蜘蛛池模板: 亚洲区精品久久一区二区三区| 午夜视频在线观看一区二区 | 国产亚洲日韩一区二区三区| 亚洲精品国产suv一区88| 国产亚洲欧洲Aⅴ综合一区| 性色av无码免费一区二区三区 | 国产精品va一区二区三区| 无码国产精品一区二区免费式直播 | 亚洲综合色一区二区三区| 亚洲bt加勒比一区二区| 久久精品一区二区东京热| 日韩人妻无码一区二区三区久久99| 一区二区三区免费在线视频| 日本免费一区尤物| 国产精品无码一区二区三区免费| 99热门精品一区二区三区无码| 国产AV一区二区三区无码野战| 国产精品被窝福利一区 | 国产亚洲一区二区三区在线不卡| 国产成人AV区一区二区三| 日本一区二区三区在线观看视频| 中文字幕精品一区二区| 夜夜高潮夜夜爽夜夜爱爱一区| 色欲AV蜜桃一区二区三| 亚洲av综合av一区二区三区| 在线|一区二区三区| 一区二区三区内射美女毛片| 精品久久国产一区二区三区香蕉 | 日韩在线不卡免费视频一区| 无码少妇一区二区三区浪潮AV| 久久久久人妻精品一区二区三区 | 在线视频亚洲一区| 国产一区二区三区露脸| 一区二区三区免费高清视频| 久久综合一区二区无码| 国产亚洲一区二区手机在线观看| 国产激情视频一区二区三区| 精品黑人一区二区三区| 夜精品a一区二区三区| 精品日韩在线视频一区二区三区| 中文字幕一区在线|