我正在編寫一個批處理管道,它每Y秒處理X個未完成的操作。感覺System.Reactive
很適合這樣做,但我無法讓訂閱服務(wù)器并行執(zhí)行。我的代碼如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
是否有一種優(yōu)雅的方式以并發(fā)方式讀取這個緩沖區(qū)Subject
?
Rx訂閱代碼始終是同步的。您需要做的是從
Subscribe
委托中刪除處理代碼,并使其成為可觀察序列的side-effect。以下是如何做到這一點:Select
+Observable.Defer
+Observable.Start
組合將源序列轉(zhuǎn)換為IObservable<IObservable<Unit>>
。它是一個嵌套序列,每個內(nèi)部序列表示一個item
的處理。當(dāng)Observable.Start
內(nèi)的委托完成時,內(nèi)部序列發(fā)出一個Unit
值,然后完成。包裝Defer
操作符對內(nèi)部序列進行“冷”處理,以便在訂閱之前不會啟動它們。然后跟隨Merge
操作符,它將外部嵌套序列展開為平面IObservable<Unit>
序列。maxConcurrent
參數(shù)配置將同時訂閱多少內(nèi)部序列。每當(dāng)Merge
運算符訂閱內(nèi)部序列時,相應(yīng)的Observable.Start
委托開始在ThreadPool
thread上運行。如果將
maxConcurrent
設(shè)置得太高,ThreadPool
可能會耗盡工作線程(換句話說,它可能會飽和),代碼的并發(fā)性將取決于ThreadPool
的可用性。如果愿意,可以使用ThreadPool.SetMinThreads
方法增加ThreadPool
根據(jù)需要立即創(chuàng)建的工人數(shù)量。但是如果您的工作負載為CPU-bound,并且您將workerthreads增加到Environment.ProcessorCount
值之上,那么您的CPU很可能會飽和。如果您的工作負載是異步的,那么可以用
Observable.FromAsync
操作符替換Observable.Defer
+Observable.Start
組合,如下所示。