The Power of .NET Dataflow

RiCo 技術農場
RiCosNote
18 min read · May 2, 2024

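In an earlier post on building a simple data processing pipeline, I showed that the TPL alone is enough to set up a pipeline. Developers do not have to deal with thread locks, queues, or deadlocks and can concentrate on the business logic of each processing step. The TPL also ships a dedicated dataflow library (System.Threading.Tasks.Dataflow).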

TPL Dataflow

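TPL Dataflow is built on message passing. It offers high throughput and low latency, which makes it a good fit for applications that are heavy on CPU and I/O.

A dataflow is a processing pipeline composed of multiple blocks. The library defines three kinds of block: sources (which data can be read from), targets (which receive data), and propagators (which act as both a source and a target). This lets developers design and build a pipeline that fits their requirements more easily, connecting the blocks by calling LinkTo(). The three interfaces are: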

System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>
System.Threading.Tasks.Dataflow.ITargetBlock<TInput>
System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput>
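
As a rough illustration of how the three interfaces fit together, the sketch below types one block as each interface and connects them with LinkTo(). The variable names (source, doubler, sink, results) are made up for this example and are not part of the original post.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var results = new List<int>();

// BufferBlock<T> is a propagator; here it is used only as the source end.
var buffer = new BufferBlock<int>();
ISourceBlock<int> source = buffer;

// TransformBlock<TInput,TOutput> is both a target and a source.
IPropagatorBlock<int, int> doubler = new TransformBlock<int, int>(n => n * 2);

// ActionBlock<TInput> is a target only. It processes one item at a time by default.
ITargetBlock<int> sink = new ActionBlock<int>(n => results.Add(n));

// PropagateCompletion forwards Complete() down the chain.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
source.LinkTo(doubler, linkOptions);
doubler.LinkTo(sink, linkOptions);

for (int i = 1; i <= 3; i++)
{
    buffer.Post(i);
}
buffer.Complete();

// Wait until the last block has drained.
sink.Completion.Wait();
Console.WriteLine(string.Join(", ", results)); // 2, 4, 6
```

Typing the variables as interfaces is only for demonstration; in everyday code the concrete block types are normally used directly.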

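Predefined dataflow block types

  • Buffering blocks: hold data for use by data consumers.

BufferBlock<T>: stores a first-in, first-out (FIFO) queue of messages that can be written to by multiple sources and read by multiple targets. A message is removed from the queue once a target receives it.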

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(bufferBlock.Receive());
}

/* Output:
0
1
2
*/

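BroadcastBlock<T>: useful when you want to broadcast a message to multiple components. A value is not removed from the BroadcastBlock<T> object after it has been read, so the same value can be read many times. The block keeps only the most recent value it was given.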

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
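
The snippet above reads the same value three times from one block, so it does not show the "multiple components" part. A minimal sketch of that case, assuming two unbounded ActionBlock consumers (consumerA and consumerB are illustrative names), might look like this:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var receivedA = new List<string>();
var receivedB = new List<string>();

// The cloning function returns the same instance, which is safe for immutable strings.
var broadcaster = new BroadcastBlock<string>(msg => msg);

// Each consumer processes one message at a time, so a plain List is enough here.
var consumerA = new ActionBlock<string>(msg => receivedA.Add(msg));
var consumerB = new ActionBlock<string>(msg => receivedB.Add(msg));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
broadcaster.LinkTo(consumerA, linkOptions);
broadcaster.LinkTo(consumerB, linkOptions);

broadcaster.Post("first");
broadcaster.Post("second");
broadcaster.Complete();

await Task.WhenAll(consumerA.Completion, consumerB.Completion);

Console.WriteLine($"A received {receivedA.Count}, B received {receivedB.Count}");
```

With unbounded consumers like these, each one should see every message. If a consumer had a BoundedCapacity and were busy, the BroadcastBlock<T> could overwrite a value before that consumer took it, so this pattern is not a guaranteed-delivery fan-out.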

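WriteOnceBlock<T>: similar to BroadcastBlock<T>, except that it can be written to only once. It is useful when you want to propagate only the first of several messages.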

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
    () => writeOnceBlock.Post("Message 1"),
    () => writeOnceBlock.Post("Message 2"),
    () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
Message 2
*/
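  • Execution blocks: call a user-provided delegate for each piece of data they receive.

ActionBlock<TInput>: think of it as a delegate that runs asynchronously whenever data becomes available. The delegate itself can process each input item either synchronously (Action<TInput>) or asynchronously (Func<TInput, Task>).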

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
    actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
0
10
20
*/
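
The example above uses a synchronous delegate. As a sketch of the asynchronous form, the block below passes an async lambda instead; Task.Delay stands in for real I/O, and the names asyncBlock and processed are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Thread-safe collection, because up to three items are processed concurrently.
var processed = new ConcurrentQueue<int>();

var asyncBlock = new ActionBlock<int>(async n =>
{
    await Task.Delay(50);      // placeholder for an asynchronous call
    processed.Enqueue(n);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

for (int i = 0; i < 3; i++)
{
    asyncBlock.Post(i);
}

// Complete() stops intake; awaiting Completion waits for in-flight items.
asyncBlock.Complete();
await asyncBlock.Completion;

Console.WriteLine(processed.Count); // 3
```

Awaiting Completion instead of calling Wait() keeps the calling thread free, which matters most in UI or ASP.NET code.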

TransformBlock<TInput,TOutput>: similar to ActionBlock<TInput>, but it acts as both a source and a target, so the delegate returns an output value for each input.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(transformBlock.Receive());
}

/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/

TransformManyBlock<TInput,TOutput>: similar to TransformBlock<TInput,TOutput>, but each input value produces zero or more output values instead of exactly one.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
    s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
    Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
H
e
l
l
o
W
o
r
l
d
*/
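  • Grouping blocks: combine data from one or more sources under various constraints.

BatchBlock<T>: combines sets of input data into arrays of output data. It runs in greedy mode by default, which typically performs better than non-greedy mode (enabled by setting Greedy = false) because it needs less processing overhead.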

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
    batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
    batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
    batchBlock.Receive().Sum());

/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/

JoinBlock<T1,T2> and JoinBlock<T1,T2,T3>: collect input elements and propagate System.Tuple objects that contain those elements.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
    var data = joinBlock.Receive();
    switch (data.Item3)
    {
        case '+':
            Console.WriteLine("{0} + {1} = {2}",
                data.Item1, data.Item2, data.Item1 + data.Item2);
            break;
        case '-':
            Console.WriteLine("{0} - {1} = {2}",
                data.Item1, data.Item2, data.Item1 - data.Item2);
            break;
        default:
            Console.WriteLine("Unknown operator '{0}'.", data.Item3);
            break;
    }
}

/* Output:
3 + 5 = 8
6 - 4 = 2
*/

BatchedJoinBlock<T1,T2>: think of it as a combination of BatchBlock<T> and JoinBlock<T1,T2>.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
    if (n < 0)
        throw new ArgumentOutOfRangeException();
    return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
    try
    {
        // Post the result of the worker to the
        // first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i));
    }
    catch (ArgumentOutOfRangeException e)
    {
        // If an error occurred, post the Exception to the
        // second target of the block.
        batchedJoinBlock.Target2.Post(e);
    }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the successful results.
foreach (int n in results.Item1)
{
    Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
    Console.WriteLine(e.Message);
}

/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/

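I will reuse the requirements from the earlier simple data processing pipeline post, but this time build the pipeline with TPL Dataflow. The code is as follows: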

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var cts = new CancellationTokenSource();

// Block 1: clean the input string (strip punctuation, lowercase).
var sourceBlock = new TransformBlock<string, string>(text =>
{
    var cleanText = new string(text.Where(c => !char.IsPunctuation(c)).ToArray());
    return cleanText.ToLower();
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 2 });

// Block 2: count how often each character occurs.
var wordCounterBlock = new TransformBlock<string, Dictionary<string, int>>(cleanText =>
{
    var wordFrequency = new Dictionary<string, int>();
    foreach (var ch in cleanText)
    {
        var key = ch.ToString();
        if (string.IsNullOrWhiteSpace(key))
            continue;

        wordFrequency[key] = wordFrequency.TryGetValue(key, out var count) ? count + 1 : 1;
    }
    return wordFrequency;
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 4 });

// Block 3: summarize the three most frequent characters.
var textSummarizerBlock = new TransformBlock<Dictionary<string, int>, string>(wordFrequency =>
{
    var topWords = wordFrequency
        .OrderByDescending(c => c.Value)
        .Take(3)
        .Select(kv => kv.Key);

    return $"Top Words: {string.Join(", ", topWords)}";
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token, MaxDegreeOfParallelism = 4 });

// Block 4: print the summary.
var printResultBlock = new ActionBlock<string>(textReport =>
{
    Console.WriteLine(textReport);
    Console.WriteLine("workflow done.");
});

// Connect the dataflow blocks to create the data pipeline.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
sourceBlock.LinkTo(wordCounterBlock, linkOptions);
wordCounterBlock.LinkTo(textSummarizerBlock, linkOptions);
textSummarizerBlock.LinkTo(printResultBlock, linkOptions);

// Process input (a Chinese test sentence that contains punctuation).
var input = "這是一個測試,包含標點符號,測試整段字串;要能分析出前三高頻率字元";
await sourceBlock.SendAsync(input);
// Signal the completion of the sourceBlock.
sourceBlock.Complete();

// Wait for all blocks to complete.
await Task.WhenAll(sourceBlock.Completion, wordCounterBlock.Completion,
    textSummarizerBlock.Completion, printResultBlock.Completion);

Console.ReadLine();
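
The pipeline above hands cts.Token to its blocks but never actually cancels. As a hedged sketch of what a cancellation would look like, the standalone example below cancels a single slow block; slowBlock and the 200 ms delay are assumptions made only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var cts = new CancellationTokenSource();

var slowBlock = new TransformBlock<int, int>(async n =>
{
    await Task.Delay(200);     // simulated slow work
    return n;
}, new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

slowBlock.Post(1);

// Request cancellation: the block stops accepting input and drops buffered data.
cts.Cancel();

// A canceled block declines new messages, so Post returns false.
bool acceptedAfterCancel = slowBlock.Post(2);

try
{
    await slowBlock.Completion;
}
catch (OperationCanceledException)
{
    Console.WriteLine("The block was canceled.");
}

Console.WriteLine($"Completion canceled: {slowBlock.Completion.IsCanceled}");
Console.WriteLine($"Accepted after cancel: {acceptedAfterCancel}");
```

In the full pipeline, the same exception would surface from the Task.WhenAll call, so wrapping that await in a try/catch is one reasonable way to handle a canceled run.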

