Skip to content

Instantly share code, notes, and snippets.

@naraga
Created September 20, 2012 19:48
Show Gist options
  • Select an option

  • Save naraga/3757947 to your computer and use it in GitHub Desktop.

Select an option

Save naraga/3757947 to your computer and use it in GitHub Desktop.
Linear stream processing with Parallel.ForEach
/// <summary>
/// Demo: streams records lazily from a text file through <c>Parallel.ForEach</c>.
/// The compute-bound part of each record runs on the parallel loop's worker threads;
/// the I/O-bound part (a simulated DB call) is started asynchronously and its
/// <see cref="Task"/> is collected so the program can wait for every store to
/// complete before printing statistics.
/// </summary>
class Program
{
    private const int RecordsCount = 3000;

    private const string InputDataFile =
        "C:\\Users\\Boris\\Documents\\Visual Studio 2012\\Projects\\DataStreamProcessingInParallel\\LinearStreamProccessingWithParallelForeach\\DataFile1.txt";

    // Instance Match() is thread-safe; caching avoids re-resolving the pattern per record.
    private static readonly Regex RecordPattern = new Regex("x(\\d+)x");

    private static readonly Stopwatch Stopwatch = new Stopwatch();

    // Every asynchronous store started by the loop. Parallel.ForEach cannot await an
    // async delegate, so the tasks are gathered here and awaited after the loop returns.
    // Fully qualified so the block does not depend on an extra using directive.
    // NOTE: holds one Task per record until the run ends.
    private static readonly System.Collections.Concurrent.ConcurrentBag<Task> PendingStores =
        new System.Collections.Concurrent.ConcurrentBag<Task>();

    private static int _recordsLoaded;
    private static int _recordsProccessed;
    private static volatile bool _wasError;

    /// <summary>
    /// Generates the input file, processes it in parallel, waits for all pending
    /// stores and prints run statistics.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        GenerateFakeDataFile();
        Stopwatch.Start();

        Parallel.ForEach(GetRecordsFromFile(), ProccessRecord);
        WaitForJobsToFinish();

        if (_wasError)
        {
            Console.WriteLine("Stopped early: critical record encountered.");
        }

        Console.WriteLine("Stats: elapsed={0}, loaded={1}, proccessed={2}", Stopwatch.Elapsed, _recordsLoaded, _recordsProccessed);
        Console.ReadLine();
    }

    /// <summary>
    /// Loop body for <c>Parallel.ForEach</c>: extracts the numeric payload of a record
    /// and kicks off its asynchronous store.
    /// </summary>
    /// <remarks>
    /// Deliberately NOT <c>async void</c>: an async void body returns to the loop at its
    /// first await, so the loop would finish before the stores do and any exception
    /// thrown by a store would be unobservable.
    /// </remarks>
    /// <param name="s">The raw record line.</param>
    /// <param name="state">Loop state used to stop the loop on a critical record.</param>
    /// <param name="arg3">Index of the record in the source sequence (unused).</param>
    private static void ProccessRecord(string s, ParallelLoopState state, long arg3)
    {
        // perform "compute-bound" operations
        var val = RecordPattern.Match(s).Groups[1].Value;
        if (val == "666666666")
        {
            _wasError = true;
            state.Break(); // stop when critical condition is found
            return;        // do not store the record that triggered the stop
        }

        // perform "io bound operation" (async makes sense only here)
        PendingStores.Add(StoreAndCountAsync(val));
    }

    /// <summary>
    /// Stores one value and, once the store has completed, counts it as processed.
    /// </summary>
    /// <param name="dataToStore">The value to store.</param>
    /// <returns>A task that completes when the value is stored and counted.</returns>
    private static async Task StoreAndCountAsync(string dataToStore)
    {
        await StoreIntoDbAsync(dataToStore);
        Interlocked.Increment(ref _recordsProccessed);
    }

    /// <summary>
    /// Simulates an asynchronous database write.
    /// </summary>
    /// <param name="dataToStore">The value to store (unused by the simulation).</param>
    /// <returns>A task that completes after the simulated latency.</returns>
    private async static Task StoreIntoDbAsync(string dataToStore)
    {
        await Task.Delay(1000); // call DB (1s = network latency + processing time)
    }

    /// <summary>
    /// Lazily reads the input file and yields it one line at a time.
    /// </summary>
    /// <returns>The lines of <see cref="InputDataFile"/>, in file order.</returns>
    static IEnumerable<string> GetRecordsFromFile()
    {
        using (var streamReader = new StreamReader(InputDataFile, Encoding.ASCII))
        {
            string str;
            while ((str = streamReader.ReadLine()) != null)
            {
                // Count before handing the record out so the counter never lags
                // behind the records already given to the loop.
                Interlocked.Increment(ref _recordsLoaded);
                yield return str;
            }
        }
    }

    /// <summary>
    /// Writes <see cref="RecordsCount"/> synthetic records of the form <c>x{n}x</c>
    /// to <see cref="InputDataFile"/>, replacing any existing content.
    /// </summary>
    static void GenerateFakeDataFile()
    {
        File.WriteAllLines(InputDataFile, Enumerable.Range(0, RecordsCount).Select(x => string.Format("x{0}x", x)));
    }

    /// <summary>
    /// Blocks until every store started by the loop has completed.
    /// </summary>
    /// <exception cref="AggregateException">One or more stores failed.</exception>
    private static void WaitForJobsToFinish()
    {
        // Waiting on the tasks replaces sleep-polling of shared flags and surfaces
        // store failures instead of silently losing them.
        Task.WaitAll(PendingStores.ToArray());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment