Skip to content

Instantly share code, notes, and snippets.

@AdjWang
Created March 16, 2023 08:10
Show Gist options
  • Select an option

  • Save AdjWang/0ad2c46bb4efd806839396aeddbff5a8 to your computer and use it in GitHub Desktop.

Select an option

Save AdjWang/0ad2c46bb4efd806839396aeddbff5a8 to your computer and use it in GitHub Desktop.
Message passing using shared log API
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
namespace MessageBus
{
/// <summary>
/// Base class for a named participant that communicates through a
/// <see cref="MessageBroker"/>.
/// </summary>
public abstract class MessageEndpoint
{
    /// <summary>
    /// Initializes the endpoint with its identifier.
    /// </summary>
    /// <param name="id">Value stored in <see cref="Id"/>.</param>
    // Protected rather than public: an abstract type can only be constructed
    // through a derived class, so a public constructor is misleading.
    protected MessageEndpoint(string id)
    {
        Id = id;
    }

    /// <summary>
    /// Binds this endpoint to <paramref name="broker"/>. A later call replaces
    /// the previously attached broker.
    /// </summary>
    /// <param name="broker">Broker to use for subsequent operations.</param>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="broker"/> is <c>null</c>.
    /// </exception>
    public void Attach(MessageBroker broker)
    {
        // Debug.Assert is removed from Release builds, so the argument is
        // validated explicitly to keep the check in every configuration.
        this.broker = broker ?? throw new System.ArgumentNullException(nameof(broker));
    }

    /// <summary>Identifier supplied at construction time.</summary>
    public readonly string Id;

    /// <summary>
    /// Broker set by <see cref="Attach"/>; <c>null</c> until the first call.
    /// </summary>
    protected MessageBroker? broker;
}
/// <summary>
/// Endpoint that publishes payloads of type <typeparamref name="T"/> to the
/// attached broker.
/// </summary>
/// <typeparam name="T">Payload type; must be non-nullable.</typeparam>
public class MessageSource<T> : MessageEndpoint
    where T : notnull
{
    /// <summary>Creates a source identified by <paramref name="id"/>.</summary>
    public MessageSource(string id) : base(id) { }

    /// <summary>
    /// Appends <paramref name="payload"/> to the attached broker, tagged with
    /// every entry of <paramref name="targets"/>.
    /// </summary>
    /// <param name="targets">Tags to attach to the appended entry.</param>
    /// <param name="payload">Message to append.</param>
    /// <returns>The value returned by the broker's <c>Append</c> call.</returns>
    /// <exception cref="System.InvalidOperationException">
    /// No broker has been attached to this endpoint yet.
    /// </exception>
    public int Propose(string[] targets, T payload)
    {
        // Debug.Assert disappears in Release builds, where an unattached source
        // would otherwise fail with an uninformative NullReferenceException.
        MessageBroker attached = broker
            ?? throw new System.InvalidOperationException(
                $"Endpoint '{Id}' is not attached to a broker.");

        return attached.Append(targets, payload);
    }
}
/// <summary>
/// Endpoint that reads payloads of type <typeparamref name="T"/> from the
/// attached broker, using its own <c>Id</c> as the tag to look for.
/// </summary>
/// <typeparam name="T">Payload type; must be non-nullable.</typeparam>
public class MessageSink<T> : MessageEndpoint
    where T : notnull
{
    /// <summary>Creates a sink identified by <paramref name="id"/>.</summary>
    public MessageSink(string id) : base(id) { }

    /// <summary>
    /// Drains every entry tagged with this sink's <c>Id</c> that has not been
    /// returned by an earlier call, in log order.
    /// </summary>
    /// <returns>
    /// The newly read payloads; an empty array when there is nothing new.
    /// </returns>
    /// <exception cref="System.InvalidOperationException">
    /// No broker has been attached to this endpoint yet.
    /// </exception>
    public T[] Sync()
    {
        // Debug.Assert disappears in Release builds, where an unattached sink
        // would otherwise fail with an uninformative NullReferenceException.
        MessageBroker attached = broker
            ?? throw new System.InvalidOperationException(
                $"Endpoint '{Id}' is not attached to a broker.");

        List<T> received = new();
        while (true)
        {
            (int pos, object? obj) = attached.ReadNext(Id, _nextPos);
            if (obj is null)
            {
                // The broker reports "no further entry" with a null payload.
                break;
            }

            // NOTE(review): a matching entry whose payload is not a T makes this
            // cast throw; entries already consumed in this call are then not
            // returned to the caller.
            received.Add((T)obj);

            // Resume right after the entry just consumed.
            _nextPos = pos + 1;
        }

        return received.ToArray();
    }

    // Log position at which the next ReadNext scan starts.
    private int _nextPos = 0;
}
/// <summary>
/// In-memory, append-only log of payloads. Every entry carries a set of string
/// tags and is addressed by its zero-based position in the log.
/// </summary>
public class MessageBroker
{
    /// <summary>
    /// Appends <paramref name="payload"/> to the end of the log.
    /// </summary>
    /// <param name="tags">Tags for the entry; duplicates are collapsed.</param>
    /// <param name="payload">Value to store.</param>
    /// <returns>Zero-based position of the new entry.</returns>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="tags"/> or <paramref name="payload"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="System.ArgumentException">
    /// <paramref name="tags"/> is empty.
    /// </exception>
    public int Append(string[] tags, object payload)
    {
        if (tags is null)
        {
            throw new System.ArgumentNullException(nameof(tags));
        }

        // ReadNext signals "nothing found" with a null payload, so a stored null
        // would be indistinguishable from the end of the log.
        if (payload is null)
        {
            throw new System.ArgumentNullException(nameof(payload));
        }

        // Previously a Debug.Assert, which is compiled out of Release builds.
        // An entry without tags can never be found by CheckTail or ReadNext.
        if (tags.Length == 0)
        {
            throw new System.ArgumentException("At least one tag is required.", nameof(tags));
        }

        _log.Add(new Entry(new HashSet<string>(tags), payload));
        return _log.Count - 1;
    }

    /// <summary>
    /// Finds the last entry in the log that carries <paramref name="tag"/>.
    /// </summary>
    /// <param name="tag">Tag to look for.</param>
    /// <returns>
    /// The position of the last matching entry and <c>true</c>, or
    /// <c>(-1, false)</c> when no entry carries the tag.
    /// </returns>
    public (int, bool) CheckTail(string tag)
    {
        // Search from the tail so the scan stops at the first hit instead of
        // always walking the whole log.
        int lastPos = _log.FindLastIndex(entry => entry.Tags.Contains(tag));
        return (lastPos, lastPos != -1);
    }

    /// <summary>
    /// Finds the first entry at position <paramref name="min"/> or later that
    /// carries <paramref name="tag"/>.
    /// </summary>
    /// <param name="tag">Tag to look for.</param>
    /// <param name="min">First position to inspect; may be past the end.</param>
    /// <returns>
    /// The position and payload of the matching entry, or <c>(-1, null)</c>
    /// when there is none.
    /// </returns>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// <paramref name="min"/> is negative.
    /// </exception>
    public (int, object?) ReadNext(string tag, int min)
    {
        // A negative start used to fail inside the list indexer with the same
        // exception type; reject it up front with the offending parameter named.
        if (min < 0)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(min), min, "Start position must not be negative.");
        }

        // The loop condition already covers min >= _log.Count.
        for (int i = min; i < _log.Count; i++)
        {
            Entry entry = _log[i];
            if (entry.Tags.Contains(tag))
            {
                return (i, entry.Payload);
            }
        }

        return (-1, null);
    }

    // One log record: the tags it is addressed by and the stored value.
    private record Entry(HashSet<string> Tags, object Payload);

    // The log itself; an entry's index is its position.
    private readonly List<Entry> _log = new();
}
}
using NUnit.Framework;
namespace MessageBus
{
/// <summary>
/// Unit tests for the position and lookup semantics of <see cref="MessageBroker"/>.
/// </summary>
public class TestMessageBroker
{
    [SetUp]
    public void Setup()
    {
    }

    /// <summary>Append returns consecutive zero-based positions.</summary>
    [Test]
    public void TestAppend()
    {
        var log = new MessageBroker();

        int first = log.Append(new[] { "a" }, 123);
        Assert.That(first, Is.EqualTo(0));

        int second = log.Append(new[] { "b" }, 321);
        Assert.That(second, Is.EqualTo(1));

        int third = log.Append(new[] { "a" }, 456);
        Assert.That(third, Is.EqualTo(2));
    }

    /// <summary>CheckTail reports the last position carrying a tag.</summary>
    [Test]
    public void TestCheckTail()
    {
        var log = new MessageBroker();

        // Nothing appended yet: the tag is unknown.
        var (_, foundOnEmpty) = log.CheckTail("a");
        Assert.That(foundOnEmpty, Is.False);

        log.Append(new[] { "a", "b" }, 123);
        var (tailAfterFirst, foundAfterFirst) = log.CheckTail("a");
        Assert.That(foundAfterFirst, Is.True);
        Assert.That(tailAfterFirst, Is.EqualTo(0));

        log.Append(new[] { "a" }, 123);
        var (tailAfterSecond, foundAfterSecond) = log.CheckTail("a");
        Assert.That(foundAfterSecond, Is.True);
        Assert.That(tailAfterSecond, Is.EqualTo(1));
    }

    /// <summary>ReadNext only returns entries at or after the start position.</summary>
    [Test]
    public void TestReadNext()
    {
        var log = new MessageBroker();

        // Empty log, start position past the end.
        var (_, fromEmpty) = log.ReadNext("a", 1);
        Assert.That(fromEmpty, Is.Null);

        log.Append(new[] { "a" }, 123);

        // The only entry sits at 0, so starting at 1 finds nothing.
        var (_, pastEnd) = log.ReadNext("a", 1);
        Assert.That(pastEnd, Is.Null);

        var (hitPos, hitPayload) = log.ReadNext("a", 0);
        Assert.That(hitPayload, Is.Not.Null);
        Assert.That(hitPos, Is.EqualTo(0));
    }
}
/// <summary>
/// End-to-end test of <see cref="MessageSource{T}"/> and
/// <see cref="MessageSink{T}"/> sharing one <see cref="MessageBroker"/>.
/// </summary>
public class TestMessageEndpoint
{
    /// <summary>
    /// Messages reach exactly the sinks named as targets, each sink sees a
    /// message only once, and the delivered payloads match what was proposed.
    /// </summary>
    [Test]
    public void TestProposeSync()
    {
        MessageBroker broker = new();
        MessageSource<string> source = new("s");
        source.Attach(broker);
        MessageSink<string> sinka = new("a");
        sinka.Attach(broker);
        MessageSink<string> sinkb = new("b");
        sinkb.Attach(broker);

        // Nothing proposed yet.
        Assert.That(sinka.Sync(), Is.Empty);
        Assert.That(sinkb.Sync(), Is.Empty);

        // Addressed to "a" only. The returned position is checked as well.
        int firstPos = source.Propose(new string[] { "a" }, "msg");
        Assert.That(firstPos, Is.EqualTo(0));
        // Asserting the payloads, not just the count, catches wrong or
        // misrouted content.
        Assert.That(sinka.Sync(), Is.EqualTo(new[] { "msg" }));
        Assert.That(sinkb.Sync(), Is.Empty);

        // Addressed to both sinks; "a" must not receive the first message again.
        int secondPos = source.Propose(new string[] { "a", "b" }, "msg again");
        Assert.That(secondPos, Is.EqualTo(1));
        Assert.That(sinka.Sync(), Is.EqualTo(new[] { "msg again" }));
        Assert.That(sinkb.Sync(), Is.EqualTo(new[] { "msg again" }));

        // Everything has been consumed.
        Assert.That(sinka.Sync(), Is.Empty);
        Assert.That(sinkb.Sync(), Is.Empty);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment