Created
September 20, 2012 20:06
-
-
Save naraga/3758039 to your computer and use it in GitHub Desktop.
Linear stream processing with TPL Dataflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Two-stage linear pipeline built on TPL Dataflow: a synchronous
/// parse/filter stage feeding an asynchronous "store" stage.
/// </summary>
class Flow
{
    // Extracts the run of digits enclosed by the 'x' delimiters of a record.
    private static readonly Regex RecordPattern = new Regex("x(\\d+)x");

    // Parsed value that marks a record as critical; such records are never stored.
    private const string CriticalValue = "666666666";

    private readonly TransformBlock<string, string> _controlAndConvertBlock;
    private readonly ActionBlock<string> _acquisitionBlock;

    /// <summary>
    /// Builds both blocks and wires them together. Completion of the first
    /// stage is propagated to the second one.
    /// </summary>
    public Flow()
    {
        _controlAndConvertBlock = new TransformBlock<string, string>(rec =>
        {
            // perform "compute-bound" operations
            var val = RecordPattern.Match(rec).Groups[1].Value;

            // A critical record is turned into null so the links below can
            // route it away from the acquisition stage. Note that this only
            // drops the record; it does not stop the pipeline.
            return val == CriticalValue ? null : val;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        _acquisitionBlock = new ActionBlock<string>(async s =>
        {
            // perform "io bound operation" (async makes sense only here)
            await Task.Delay(1000);
            Console.WriteLine("stored {0}", s);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

        _controlAndConvertBlock.LinkTo(
            _acquisitionBlock,
            new DataflowLinkOptions { PropagateCompletion = true },
            s => s != null);

        // Without a sink for the filtered-out (null) messages, the first null
        // would stay at the head of the transform block's output buffer,
        // blocking every later record and preventing completion from ever
        // propagating, so Wait() would hang.
        _controlAndConvertBlock.LinkTo(DataflowBlock.NullTarget<string>(), s => s == null);
    }

    /// <summary>Offers a raw record to the first stage of the pipeline.</summary>
    /// <param name="record">Raw input line to parse.</param>
    /// <returns>The result of posting to the first block: true if it accepted the record.</returns>
    public bool Post(string record)
    {
        return _controlAndConvertBlock.Post(record);
    }

    /// <summary>Signals that no more records will be posted.</summary>
    public void Complete()
    {
        _controlAndConvertBlock.Complete();
    }

    /// <summary>
    /// Blocks the calling thread until the acquisition stage has completed.
    /// </summary>
    public void Wait()
    {
        _acquisitionBlock.Completion.Wait();
    }
}
/// <summary>
/// Demo driver: generates a fake data file, streams its lines through a
/// <c>Flow</c> pipeline and prints the elapsed time.
/// </summary>
class Program
{
    private const int RecordsCount = 3000;
    private static readonly Stopwatch Stopwatch = new Stopwatch();

    // Default location of the data file; can be overridden with the first
    // command-line argument.
    private const string InputDataFile =
        "C:\\Users\\Boris\\Documents\\Visual Studio 2012\\Projects\\DataStreamProcessingInParallel\\DatastreamProcessingWithTdf\\DataFile1.txt";

    /// <summary>Entry point.</summary>
    /// <param name="args">
    /// Optional: <c>args[0]</c> is the path of the data file to generate and
    /// process. When omitted, <c>InputDataFile</c> is used.
    /// </param>
    static void Main(string[] args)
    {
        var dataFile = args != null && args.Length > 0 ? args[0] : InputDataFile;

        GenerateFakeDataFile(dataFile);

        Stopwatch.Start();
        var flow = new Flow();
        foreach (var record in GetRecordsFromFile(dataFile))
        {
            // Post returns false once the pipeline refuses input; reading
            // further records would only discard them.
            if (!flow.Post(record))
            {
                break;
            }
        }

        flow.Complete();
        flow.Wait();
        Stopwatch.Stop();

        Console.WriteLine("Stats: elapsed={0}", Stopwatch.Elapsed);
        Console.ReadLine();
    }

    /// <summary>Lazily yields the lines of the data file, one record per line.</summary>
    /// <param name="path">File to read; defaults to <c>InputDataFile</c>.</param>
    /// <returns>A deferred sequence of lines; the file is opened on first enumeration.</returns>
    static IEnumerable<string> GetRecordsFromFile(string path = InputDataFile)
    {
        using (var streamReader = new StreamReader(path, Encoding.ASCII))
        {
            string str;
            while ((str = streamReader.ReadLine()) != null)
            {
                yield return str;
            }
        }
    }

    /// <summary>
    /// Writes <c>RecordsCount</c> lines of the form "x{n}x" (n = 0..RecordsCount-1)
    /// to the data file, replacing any existing content.
    /// </summary>
    /// <param name="path">File to write; defaults to <c>InputDataFile</c>.</param>
    static void GenerateFakeDataFile(string path = InputDataFile)
    {
        File.WriteAllLines(path, Enumerable.Range(0, RecordsCount).Select(x => string.Format("x{0}x", x)));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment