Created
September 10, 2012 15:17
-
-
Save keithbloom/3691448 to your computer and use it in GitHub Desktop.
0MQ and threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gather every file under the example directory, sub-directories included.
var fileList = EnumerateDirectory(@"C:\example\directory", "*.*", SearchOption.AllDirectories);
var fileCount = fileList.Length;

// Hand the file names to the ventilator, then ask the sink for one result per file.
ventilator.Run(fileList);
var result = sink.Run(fileCount);

// Report the file count, the stopwatch reading and the value returned by the sink.
var summary = string.Format(
    "Found the length of {0} files in {1} milliseconds.\nDirectory size is {2}",
    fileCount, stopWatch.ElapsedMilliseconds, result);
Console.WriteLine(summary);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// One ZeroMQ context is created and handed to both the ventilator and the sink.
var context = ZmqContext.Create();
var ventilator = new Ventilator(context);
var sink = new Sink(context);

// Start() creates each component's socket and binds its inproc endpoint.
// NOTE(review): older libzmq versions require an inproc endpoint to be bound
// before any peer connects to it - confirm the workers are started after this.
ventilator.Start();
sink.Start();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Number of worker threads to launch.
const int workersCount = 4;
var workers = new Thread[workersCount];

for (var index = 0; index < workersCount; index++)
{
    // Each thread builds its own TaskWorker on the shared context and runs it.
    var worker = new Thread(() => new TaskWorker(context).Run());
    workers[index] = worker;
    worker.Start();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Creates the PULL socket on the context and binds it to the sink endpoint.
/// </summary>
public void Start()
{
    const string endpoint = "inproc://sink";

    _receiver = _context.CreateSocket(SocketType.PULL);
    _receiver.Bind(endpoint);
}
/// <summary>
/// Receives <paramref name="length"/> messages from the receiver socket and
/// adds up those that parse as a 64-bit integer.
/// </summary>
/// <param name="length">Number of messages to receive.</param>
/// <returns>The sum of all successfully parsed message values.</returns>
public Int64 Run(int length)
{
    long total = 0;

    for (var pending = length; pending > 0; pending--)
    {
        var message = _receiver.Receive(Encoding.Unicode);

        long parsed;
        if (!Int64.TryParse(message, out parsed))
        {
            // Messages that are not a number contribute nothing to the total.
            continue;
        }

        total += parsed;
    }

    return total;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Creates the PUSH socket on the context and binds it to the ventilator endpoint.
/// </summary>
public void Start()
{
    const string endpoint = "inproc://ventilator";

    _ventilator = _context.CreateSocket(SocketType.PUSH);
    _ventilator.Bind(endpoint);
}
/// <summary>
/// Sends every entry of <paramref name="fileList"/> over the ventilator socket,
/// one Unicode-encoded message per file name, in array order.
/// </summary>
/// <param name="fileList">File names to send.</param>
public void Run(string[] fileList)
{
    for (var index = 0; index < fileList.Length; index++)
    {
        _ventilator.Send(fileList[index], Encoding.Unicode);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Worker loop: connects a PULL socket to the ventilator endpoint and a PUSH
/// socket to the sink endpoint, then polls forever, handling each incoming
/// job in <c>RecieverPollInHandler</c>.
/// </summary>
/// <remarks>
/// The loop has no exit condition, so this method does not return normally.
/// </remarks>
public void Run()
{
    using (ZmqSocket ventilator = _context.CreateSocket(SocketType.PULL),
                     sink = _context.CreateSocket(SocketType.PUSH))
    {
        ventilator.Connect("inproc://ventilator");
        sink.Connect("inproc://sink");

        // Process one job each time the ventilator socket has a message ready.
        ventilator.ReceiveReady += (socket, events) =>
        {
            RecieverPollInHandler(ventilator, sink);
        };

        // Only the ventilator socket is polled. The previous version also listed
        // a "controller" socket that is never declared or given a handler here.
        var poller = new Poller(new[] { ventilator });
        while (true)
        {
            poller.Poll();
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Handles one job: receives a file name from <paramref name="reciever"/>,
/// measures the file's length and sends the result to <paramref name="sender"/>.
/// </summary>
/// <param name="reciever">Socket the file name is read from.</param>
/// <param name="sender">Socket the measured length is written to.</param>
private void RecieverPollInHandler(ZmqSocket reciever, ZmqSocket sender)
{
    // NOTE(review): fixed 100 ms pause kept from the original; presumably it
    // simulates a slower job - confirm before removing.
    Thread.Sleep(100);

    // Pull the job from the Ventilator
    var fileToMeasure = reciever.Receive(Encoding.Unicode);

    Int64 fileLength = 0;
    try
    {
        using (var fs = File.OpenRead(fileToMeasure))
        {
            fileLength = fs.Length;
        }
    }
    catch (IOException)
    {
        // Best effort: a file that cannot be opened is reported as length 0.
    }
    catch (UnauthorizedAccessException)
    {
        // Same best-effort handling for files we are not permitted to read, so
        // that a result is still sent for this job.
    }

    Console.Write(".");

    // Push the result to the Sink
    sender.Send(fileLength.ToString(), Encoding.Unicode);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment