Created
September 20, 2012 19:48
-
-
Save naraga/3757947 to your computer and use it in GitHub Desktop.
Linear stream proccessing with Parallel.Foreach
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class Program | |
| { | |
| private const int RecordsCount = 3000; | |
| private static int _recordsLoaded; | |
| private static bool _finishedLoading; | |
| private static int _recordsProccessed; | |
| private static bool _wasError; | |
| private static readonly Stopwatch Stopwatch = new Stopwatch(); | |
| private const string InputDataFile = | |
| "C:\\Users\\Boris\\Documents\\Visual Studio 2012\\Projects\\DataStreamProcessingInParallel\\LinearStreamProccessingWithParallelForeach\\DataFile1.txt"; | |
| static void Main(string[] args) | |
| { | |
| GenerateFakeDataFile(); | |
| Stopwatch.Start(); | |
| Parallel.ForEach(GetRecordsFromFile(), ProccessRecord); | |
| WaitForJobsToFinish(); | |
| Console.WriteLine("Stats: elapsed={0}, loaded={1}, proccessed={2}", Stopwatch.Elapsed, _recordsLoaded, _recordsProccessed); | |
| Console.ReadLine(); | |
| } | |
| private async static void ProccessRecord(string s, ParallelLoopState state, long arg3) | |
| { | |
| // perform "compute-boud" operations | |
| var val = Regex.Match(s, "x(\\d+)x").Groups[1].Value; | |
| if (val == "666666666") | |
| { | |
| _wasError = true; | |
| state.Break(); // stop when critical condition is found | |
| } | |
| // perform "io bound operation" (async makes sense onyl here) | |
| await StoreIntoDbAsync(val); | |
| Interlocked.Increment(ref _recordsProccessed); | |
| } | |
| private async static Task StoreIntoDbAsync(string dataToStore) | |
| { | |
| await Task.Delay(1000); // call DB (1s = network latency + proccessing time) | |
| } | |
| static IEnumerable<string> GetRecordsFromFile() | |
| { | |
| using (var streamReader = new StreamReader(InputDataFile, Encoding.ASCII)) | |
| { | |
| string str; | |
| while ((str = streamReader.ReadLine()) != null) | |
| { | |
| yield return str; | |
| _recordsLoaded++; | |
| } | |
| _finishedLoading = true; | |
| } | |
| } | |
| static void GenerateFakeDataFile() | |
| { | |
| File.WriteAllLines(InputDataFile, Enumerable.Range(0, RecordsCount).Select(x => string.Format("x{0}x", x))); | |
| } | |
| private static void WaitForJobsToFinish() | |
| { | |
| while (!_wasError && (!_finishedLoading || _recordsLoaded > _recordsProccessed)) | |
| { | |
| Thread.Sleep(1000); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment