Created
March 16, 2023 08:10
-
-
Save AdjWang/0ad2c46bb4efd806839396aeddbff5a8 to your computer and use it in GitHub Desktop.
Message passing using shared log API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Diagnostics; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| namespace MessageBus | |
| { | |
| public abstract class MessageEndpoint | |
| { | |
| public MessageEndpoint(string id) | |
| { | |
| Id = id; | |
| } | |
| public void Attach(MessageBroker broker) | |
| { | |
| Debug.Assert(broker != null); | |
| this.broker = broker; | |
| } | |
| public readonly string Id; | |
| protected MessageBroker? broker; | |
| } | |
| public class MessageSource<T> : MessageEndpoint | |
| where T : notnull | |
| { | |
| public MessageSource(string id) : base(id) { } | |
| public int Propose(string[] targets, T payload) | |
| { | |
| Debug.Assert(broker != null); | |
| return broker.Append(targets, payload); | |
| } | |
| } | |
| public class MessageSink<T> : MessageEndpoint | |
| where T : notnull | |
| { | |
| public MessageSink(string id) : base(id) { } | |
| public T[] Sync() | |
| { | |
| Debug.Assert(broker != null); | |
| List<T> result = new(); | |
| while(true){ | |
| (int pos, object? obj) = broker.ReadNext(Id, _nextPos); | |
| if(obj == null) { break; } | |
| result.Add((T)obj); | |
| _nextPos = pos + 1; | |
| } | |
| return result.ToArray(); | |
| } | |
| private int _nextPos = 0; | |
| } | |
| public class MessageBroker | |
| { | |
| public int Append(string[] tags, object payload) | |
| { | |
| Debug.Assert(tags.Any()); | |
| _log.Add(new(tags.ToHashSet(), payload)); | |
| return _log.Count - 1; | |
| } | |
| public (int, bool) CheckTail(string tag) | |
| { | |
| int lastPos = -1; | |
| for (int i = 0; i < _log.Count; i++) | |
| { | |
| Entry entry = _log[i]; | |
| if (entry.Tags.Contains(tag)) | |
| { | |
| lastPos = i; | |
| } | |
| } | |
| return (lastPos, lastPos != -1); | |
| } | |
| public (int, object?) ReadNext(string tag, int min) | |
| { | |
| if(min >= _log.Count) { return (-1, null); } | |
| for (int i = min; i < _log.Count; i++) | |
| { | |
| Entry entry = _log[i]; | |
| if (entry.Tags.Contains(tag)) | |
| { | |
| return (i, entry.Payload); | |
| } | |
| } | |
| return (-1, null); | |
| } | |
| private record Entry(HashSet<string> Tags, object Payload); | |
| private readonly List<Entry> _log = new(); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using NUnit.Framework; | |
| namespace MessageBus | |
| { | |
| public class TestMessageBroker | |
| { | |
| [SetUp] | |
| public void Setup() | |
| { | |
| } | |
| [Test] | |
| public void TestAppend() | |
| { | |
| MessageBroker broker = new(); | |
| { | |
| int pos = broker.Append(new string[] { "a" }, 123); | |
| Assert.AreEqual(0, pos); | |
| } | |
| { | |
| int pos = broker.Append(new string[] { "b" }, 321); | |
| Assert.AreEqual(1, pos); | |
| } | |
| { | |
| int pos = broker.Append(new string[] { "a" }, 456); | |
| Assert.AreEqual(2, pos); | |
| } | |
| } | |
| [Test] | |
| public void TestCheckTail() | |
| { | |
| MessageBroker broker = new(); | |
| { | |
| (int _, bool ok) = broker.CheckTail("a"); | |
| Assert.IsFalse(ok); | |
| } | |
| { | |
| broker.Append(new string[] { "a", "b" }, 123); | |
| (int pos, bool ok) = broker.CheckTail("a"); | |
| Assert.IsTrue(ok); | |
| Assert.AreEqual(0, pos); | |
| } | |
| { | |
| broker.Append(new string[] { "a" }, 123); | |
| (int pos, bool ok) = broker.CheckTail("a"); | |
| Assert.IsTrue(ok); | |
| Assert.AreEqual(1, pos); | |
| } | |
| } | |
| [Test] | |
| public void TestReadNext() | |
| { | |
| MessageBroker broker = new(); | |
| { | |
| (int _, object? obj) = broker.ReadNext("a", 1); | |
| Assert.IsNull(obj); | |
| } | |
| broker.Append(new string[] { "a" }, 123); | |
| { | |
| (int _, object? obj) = broker.ReadNext("a", 1); | |
| Assert.IsNull(obj); | |
| } | |
| { | |
| (int pos, object? obj) = broker.ReadNext("a", 0); | |
| Assert.IsNotNull(obj); | |
| Assert.AreEqual(0, pos); | |
| } | |
| } | |
| } | |
| public class TestMessageEndpoint | |
| { | |
| [Test] | |
| public void TestProposeSync() | |
| { | |
| MessageBroker broker = new(); | |
| MessageSource<string> source = new("s"); | |
| source.Attach(broker); | |
| MessageSink<string> sinka = new("a"); | |
| sinka.Attach(broker); | |
| MessageSink<string> sinkb = new("b"); | |
| sinkb.Attach(broker); | |
| { | |
| string[] resa = sinka.Sync(); | |
| Assert.AreEqual(0, resa.Length); | |
| string[] resb = sinkb.Sync(); | |
| Assert.AreEqual(0, resb.Length); | |
| } | |
| source.Propose(new string[] { "a" }, "msg"); | |
| { | |
| string[] resa = sinka.Sync(); | |
| Assert.AreEqual(1, resa.Length); | |
| string[] resb = sinkb.Sync(); | |
| Assert.AreEqual(0, resb.Length); | |
| } | |
| source.Propose(new string[] { "a", "b" }, "msg again"); | |
| { | |
| string[] resa = sinka.Sync(); | |
| Assert.AreEqual(1, resa.Length); | |
| string[] resb = sinkb.Sync(); | |
| Assert.AreEqual(1, resb.Length); | |
| } | |
| { | |
| string[] resa = sinka.Sync(); | |
| Assert.AreEqual(0, resa.Length); | |
| string[] resb = sinkb.Sync(); | |
| Assert.AreEqual(0, resb.Length); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment