Skip to content

Instantly share code, notes, and snippets.

@naraga
Created September 20, 2012 20:06
Show Gist options
  • Select an option

  • Save naraga/3758039 to your computer and use it in GitHub Desktop.

Select an option

Save naraga/3758039 to your computer and use it in GitHub Desktop.
Linear stream proccessing with TPL Dataflow
class Flow
{
private readonly TransformBlock<string, string> _controlAndConvertBlock;
private readonly ActionBlock<string> _acquisitionBlock;
public Flow()
{
_controlAndConvertBlock = new TransformBlock<string, string>(rec =>
{
// perform "compute-boud" operations
var val =
Regex.Match(rec, "x(\\d+)x").Groups
[1].Value;
if (val == "666666666")
{
return null;
// stop when critical condition is found
}
return val;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
_acquisitionBlock = new ActionBlock<string>(async delegate(string s)
{
// perform "io bound operation" (async makes sense onyl here)
await Task.Delay(1000);
Console.WriteLine("stored {0}", s);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
_controlAndConvertBlock.LinkTo(_acquisitionBlock, new DataflowLinkOptions{PropagateCompletion = true}, s => s != null);
}
public bool Post(string record)
{
return _controlAndConvertBlock.Post(record);
}
public void Complete()
{
_controlAndConvertBlock.Complete();
}
public void Wait()
{
_acquisitionBlock.Completion.Wait();
}
}
class Program
{
private const int RecordsCount = 3000;
private static readonly Stopwatch Stopwatch = new Stopwatch();
private const string InputDataFile =
"C:\\Users\\Boris\\Documents\\Visual Studio 2012\\Projects\\DataStreamProcessingInParallel\\DatastreamProcessingWithTdf\\DataFile1.txt";
static void Main(string[] args)
{
GenerateFakeDataFile();
Stopwatch.Start();
var flow = new Flow();
foreach (var record in GetRecordsFromFile())
{
flow.Post(record);
}
flow.Complete();
flow.Wait();
Console.WriteLine("Stats: elapsed={0}", Stopwatch.Elapsed);
Console.ReadLine();
}
static IEnumerable<string> GetRecordsFromFile()
{
using (var streamReader = new StreamReader(InputDataFile, Encoding.ASCII))
{
string str;
while ((str = streamReader.ReadLine()) != null)
{
yield return str;
}
}
}
static void GenerateFakeDataFile()
{
File.WriteAllLines(InputDataFile, Enumerable.Range(0, RecordsCount).Select(x => string.Format("x{0}x", x)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment