Created
November 7, 2025 13:06
-
-
Save nukopy/daf08b3c70a6cf297c4b3b5fa3730c00 to your computer and use it in GitHub Desktop.
電脳ノマチ:NATS.Net 実装例
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Text.Json; | |
| using System.Text.Json.Serialization; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| using UnityEngine; | |
| using NATS.Client.Core; | |
| using NATS.Net; | |
| namespace Dnm | |
| { | |
/// <summary>
/// NATS subject names used by this sample. Every subject lives under the
/// <c>dnm</c> root, which is also what <see cref="SubscribeFilter"/> matches.
/// </summary>
public static class Subjects
{
    // Shared prefixes; the public constants below are compile-time concatenations.
    private const string Root = "dnm";
    private const string Session = Root + ".session";
    private const string Conversation = Session + ".conversation";
    private const string Robots = Root + ".robots";

    /// <summary>Wildcard filter covering every subject below the root.</summary>
    public const string SubscribeFilter = Root + ".>";

    public const string SessionStarted = Session + ".started";
    public const string SessionEnded = Session + ".ended";

    public const string SessionConversationStarted = Conversation + ".started";
    public const string SessionConversationContent = Conversation + ".content";
    public const string SessionConversationEnded = Conversation + ".ended";

    public const string RobotsStatusChanged = Robots + ".status.changed";
    public const string RobotsRawPosition = Robots + ".raw.position";
    public const string RobotsRawBattery = Robots + ".raw.battery";
}
/// <summary>
/// Kinds of event a NATS subject can resolve to.
/// <see cref="Unknown"/> is the zero value, so it is also <c>default(EventType)</c>.
/// </summary>
public enum EventType
{
    Unknown = 0,

    // Session lifecycle and conversation events.
    SessionStarted = 1,
    SessionConversationStarted = 2,
    SessionConversationEnded = 3,
    SessionConversationContent = 4,
    SessionEnded = 5,

    // Robot telemetry events.
    RobotsStatusChanged = 6,
    RobotsRawPosition = 7,
    RobotsRawBattery = 8
}
/// <summary>
/// Maps NATS subject strings onto <see cref="EventType"/> values.
/// </summary>
public static class EventResolver
{
    // Lookup table keyed by subject. The comparer is ordinal and ignores case.
    private static readonly Dictionary<string, EventType> SubjectTable =
        new Dictionary<string, EventType>(StringComparer.OrdinalIgnoreCase)
        {
            [Subjects.SessionStarted] = EventType.SessionStarted,
            [Subjects.SessionConversationStarted] = EventType.SessionConversationStarted,
            [Subjects.SessionConversationEnded] = EventType.SessionConversationEnded,
            [Subjects.SessionConversationContent] = EventType.SessionConversationContent,
            [Subjects.SessionEnded] = EventType.SessionEnded,
            [Subjects.RobotsStatusChanged] = EventType.RobotsStatusChanged,
            [Subjects.RobotsRawPosition] = EventType.RobotsRawPosition,
            [Subjects.RobotsRawBattery] = EventType.RobotsRawBattery,
        };

    /// <summary>
    /// Resolves a subject to its event type.
    /// </summary>
    /// <param name="subject">Subject of the received message; may be null or empty.</param>
    /// <returns>
    /// The matching <see cref="EventType"/>, or <see cref="EventType.Unknown"/> when the
    /// subject is null, empty, or not one of the subjects declared in <see cref="Subjects"/>.
    /// </returns>
    public static EventType Subject2EventType(string subject)
    {
        // A null key would make the dictionary lookup throw, so reject it up front.
        if (string.IsNullOrEmpty(subject))
        {
            return EventType.Unknown;
        }

        return SubjectTable.TryGetValue(subject, out EventType resolved)
            ? resolved
            : EventType.Unknown;
    }
}
/// <summary>
/// Robot descriptor as it appears inside event payloads.
/// JSON keys: <c>id</c>, <c>name</c>, <c>cubeId</c>.
/// </summary>
[Serializable]
public struct Robot
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>name</c> key.</summary>
    [JsonPropertyName("name")]
    public string Name { get; set; }

    /// <summary>Value of the <c>cubeId</c> key.</summary>
    [JsonPropertyName("cubeId")]
    public string CubeId { get; set; }
}
/// <summary>
/// Payload deserialized for <c>dnm.session.started</c> messages.
/// JSON keys: <c>id</c>, <c>sessionId</c>, <c>robots</c>, <c>timestamp</c>.
/// </summary>
[Serializable]
public struct EventSessionStarted
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>sessionId</c> key.</summary>
    [JsonPropertyName("sessionId")]
    public string SessionId { get; set; }

    /// <summary>Robots listed under <c>robots</c>; null when the key is absent.</summary>
    [JsonPropertyName("robots")]
    public List<Robot> Robots { get; set; }

    /// <summary>Numeric <c>timestamp</c> value (unit is defined by the publisher).</summary>
    [JsonPropertyName("timestamp")]
    public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for <c>dnm.session.conversation.started</c> messages.
/// JSON keys: <c>id</c>, <c>sessionId</c>, <c>robots</c>, <c>timestamp</c>.
/// </summary>
[Serializable]
public struct EventSessionConversationStarted
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>sessionId</c> key.</summary>
    [JsonPropertyName("sessionId")]
    public string SessionId { get; set; }

    /// <summary>Robots listed under <c>robots</c>; null when the key is absent.</summary>
    [JsonPropertyName("robots")]
    public List<Robot> Robots { get; set; }

    /// <summary>Numeric <c>timestamp</c> value (unit is defined by the publisher).</summary>
    [JsonPropertyName("timestamp")]
    public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for <c>dnm.session.conversation.ended</c> messages.
/// JSON keys: <c>id</c>, <c>sessionId</c>, <c>robots</c>, <c>timestamp</c>.
/// </summary>
[Serializable]
public struct EventSessionConversationEnded
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>sessionId</c> key.</summary>
    [JsonPropertyName("sessionId")]
    public string SessionId { get; set; }

    /// <summary>Robots listed under <c>robots</c>; null when the key is absent.</summary>
    [JsonPropertyName("robots")]
    public List<Robot> Robots { get; set; }

    /// <summary>Numeric <c>timestamp</c> value (unit is defined by the publisher).</summary>
    [JsonPropertyName("timestamp")]
    public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for <c>dnm.session.conversation.content</c> messages:
/// one piece of content addressed from one robot to another.
/// JSON keys: <c>id</c>, <c>sessionId</c>, <c>from</c>, <c>to</c>, <c>content</c>, <c>timestamp</c>.
/// </summary>
[Serializable]
public struct EventSessionConversationContent
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>sessionId</c> key.</summary>
    [JsonPropertyName("sessionId")]
    public string SessionId { get; set; }

    /// <summary>Robot given under <c>from</c>.</summary>
    [JsonPropertyName("from")]
    public Robot From { get; set; }

    /// <summary>Robot given under <c>to</c>.</summary>
    [JsonPropertyName("to")]
    public Robot To { get; set; }

    /// <summary>Text given under <c>content</c>.</summary>
    [JsonPropertyName("content")]
    public string Content { get; set; }

    /// <summary>Numeric <c>timestamp</c> value (unit is defined by the publisher).</summary>
    [JsonPropertyName("timestamp")]
    public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for <c>dnm.session.ended</c> messages.
/// JSON keys: <c>id</c>, <c>sessionId</c>, <c>robots</c>, <c>timestamp</c>.
/// </summary>
[Serializable]
public struct EventSessionEnded
{
    /// <summary>Value of the <c>id</c> key.</summary>
    [JsonPropertyName("id")]
    public string Id { get; set; }

    /// <summary>Value of the <c>sessionId</c> key.</summary>
    [JsonPropertyName("sessionId")]
    public string SessionId { get; set; }

    /// <summary>Robots listed under <c>robots</c>; null when the key is absent.</summary>
    [JsonPropertyName("robots")]
    public List<Robot> Robots { get; set; }

    /// <summary>Numeric <c>timestamp</c> value (unit is defined by the publisher).</summary>
    [JsonPropertyName("timestamp")]
    public long Timestamp { get; set; }
}
/// <summary>
/// Unity component that connects to a NATS server, subscribes to
/// <c>subjectFilter</c>, queues every received message and handles the queued
/// messages from <c>Update</c>, at most <c>maxMessagesPerFrame</c> per frame.
/// </summary>
public sealed class EventBus : MonoBehaviour
{
    [Header("NATS")][SerializeField] private string serverUrl = "nats://localhost:4222"; // Server URL the listener connects to
    [SerializeField] private string subjectFilter = Subjects.SubscribeFilter;
    [SerializeField, Range(1, 128)] private int maxMessagesPerFrame = 1;

    // Filled by the subscription loop, drained by Update.
    private readonly ConcurrentQueue<InboundMessage> _pendingMessages = new();

    private readonly JsonSerializerOptions _jsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true
    };

    private CancellationTokenSource _cts;
    private NatsClient _client;
    private Task _subscriptionLoop;

    // Message of the most recent failure. Written in the error paths below; not read in this class.
    private string _lastError;

    /// <summary>Subject and text payload of one received message.</summary>
    private readonly struct InboundMessage
    {
        public InboundMessage(string subject, string payload)
        {
            Subject = subject;
            Payload = payload;
        }

        public string Subject { get; }
        public string Payload { get; }
    }

    // Unity message. async void is acceptable here because InitializeAsync handles
    // its own exceptions instead of letting them escape.
    private async void Start()
    {
        Debug.Log("[NATS] Starting app...");
        Application.runInBackground = true;
        await InitializeAsync();
        Debug.Log("[NATS] App started!");
    }

    // Unity message: handles up to maxMessagesPerFrame queued messages per frame.
    private void Update()
    {
        var processed = 0;
        while (processed < maxMessagesPerFrame && _pendingMessages.TryDequeue(out InboundMessage message))
        {
            processed++;
            HandleMessage(message);
        }

        if (processed > 0)
        {
            Debug.Log($"[NATS] Processed {processed} messages");
        }
    }

    /// <summary>
    /// Creates the client, connects and starts the subscription loop.
    /// Does nothing when a client already exists. Failures are logged, recorded in
    /// <c>_lastError</c>, and leave the component in its initial (unconnected) state.
    /// </summary>
    private async Task InitializeAsync()
    {
        if (_client != null)
        {
            return;
        }

        // Keep a local reference: ShutdownAsync may clear and dispose the field
        // while the connection attempt is still awaiting.
        var cts = new CancellationTokenSource();
        _cts = cts;

        try
        {
            Debug.Log($"[NATS] Connecting to {serverUrl}");
            _client = new NatsClient(serverUrl);
            await _client.ConnectAsync();

            if (cts.IsCancellationRequested)
            {
                // Shut down while connecting; ShutdownAsync already released everything.
                return;
            }

            Debug.Log($"[NATS] Connected to {serverUrl}");
            _subscriptionLoop = PumpSubscriptionAsync(cts.Token);
            Debug.Log($"[NATS] Subscribed to '{subjectFilter}' via {serverUrl}");
        }
        catch (Exception ex)
        {
            if (cts.IsCancellationRequested)
            {
                // The failure is a side effect of shutting down, not a connection error.
                return;
            }

            _lastError = ex.Message;
            Debug.LogError($"[NATS] Failed to connect: {ex.Message}");

            // Release the half-initialized client and token source so a later
            // InitializeAsync call is not blocked by the "_client != null" guard.
            await ShutdownAsync();
        }
    }

    /// <summary>
    /// Enqueues every message received on <c>subjectFilter</c> until
    /// <paramref name="token"/> is cancelled. Never throws: cancellation is
    /// swallowed and any other error is logged.
    /// </summary>
    private async Task PumpSubscriptionAsync(CancellationToken token)
    {
        try
        {
            await foreach (NatsMsg<string> msg in _client.SubscribeAsync<string>(subjectFilter, cancellationToken: token))
            {
                string payload = msg.Data ?? string.Empty;
                _pendingMessages.Enqueue(new InboundMessage(msg.Subject, payload));
            }
        }
        catch (OperationCanceledException)
        {
            // expected
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Subscription error: {ex.Message}");
        }
    }

    // Logs the raw subject and payload for anything TryHandleEvent did not handle.
    private void HandleMessage(InboundMessage message)
    {
        if (TryHandleEvent(message))
        {
            return;
        }

        Debug.Log($"[NATS] {message.Subject} :: {message.Payload}");
    }

    /// <summary>
    /// Deserializes and logs the message according to its event type.
    /// </summary>
    /// <returns>
    /// True when the payload was deserialized and logged; false when the subject
    /// has no typed handler or the payload could not be deserialized.
    /// </returns>
    private bool TryHandleEvent(InboundMessage message)
    {
        Debug.Log($"[NATS] Handling event: {message.Subject}");
        switch (EventResolver.Subject2EventType(message.Subject))
        {
            case EventType.SessionStarted:
                if (TryDeserialize<EventSessionStarted>(message.Payload, out var sessionStarted))
                {
                    Debug.Log($"[NATS] SessionStarted id={sessionStarted.Id} session={sessionStarted.SessionId} robots={sessionStarted.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionEnded:
                if (TryDeserialize<EventSessionEnded>(message.Payload, out var sessionEnded))
                {
                    Debug.Log($"[NATS] SessionEnded id={sessionEnded.Id} session={sessionEnded.SessionId} robots={sessionEnded.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationStarted:
                if (TryDeserialize<EventSessionConversationStarted>(message.Payload, out var sessionConversationStarted))
                {
                    Debug.Log($"[NATS] SessionConversationStarted id={sessionConversationStarted.Id} session={sessionConversationStarted.SessionId} robots={sessionConversationStarted.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationEnded:
                if (TryDeserialize<EventSessionConversationEnded>(message.Payload, out var sessionConversationEnded))
                {
                    Debug.Log($"[NATS] SessionConversationEnded id={sessionConversationEnded.Id} session={sessionConversationEnded.SessionId} robots={sessionConversationEnded.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationContent:
                if (TryDeserialize<EventSessionConversationContent>(message.Payload, out var sessionConversationContent))
                {
                    Debug.Log($"[NATS] SessionConversationContent id={sessionConversationContent.Id} session={sessionConversationContent.SessionId} from={sessionConversationContent.From.Name} to={sessionConversationContent.To.Name} content={sessionConversationContent.Content}");
                    return true;
                }
                return false;

            case EventType.RobotsStatusChanged:
            case EventType.RobotsRawPosition:
            case EventType.RobotsRawBattery:
                // Recognized subjects with no typed payload yet: not "unknown", so skip
                // the warning and let HandleMessage log the raw payload.
                return false;

            default:
                Debug.LogWarning($"[NATS] Unknown event: {message.Subject}");
                return false;
        }
    }

    /// <summary>
    /// Deserializes <paramref name="payload"/> as <typeparamref name="T"/>.
    /// Any exception is logged together with the payload and reported as false.
    /// </summary>
    private bool TryDeserialize<T>(string payload, out T result)
    {
        try
        {
            Debug.Log($"[NATS] Deserializing {typeof(T).Name}: {payload}");
            result = JsonSerializer.Deserialize<T>(payload, _jsonOptions);

            // Only meaningful for reference types; a value-type T is never null.
            return result != null;
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Failed to deserialize {typeof(T).Name}: {ex.Message}" + Environment.NewLine + payload);
            result = default;
            return false;
        }
    }

    // Unity message. ShutdownAsync handles its own exceptions, so nothing escapes this async void.
    private async void OnDestroy()
    {
        await ShutdownAsync();
    }

    /// <summary>
    /// Cancels the subscription loop, waits for it to finish, then disposes the
    /// client and the token source. Safe to call repeatedly and never throws.
    /// </summary>
    private async Task ShutdownAsync()
    {
        CancellationTokenSource cts = _cts;
        if (cts == null)
        {
            return;
        }

        // Detach the state before the first await so an overlapping call is a no-op
        // and InitializeAsync can start from a clean slate afterwards.
        Task loop = _subscriptionLoop;
        NatsClient client = _client;
        _cts = null;
        _subscriptionLoop = null;
        _client = null;

        try
        {
            cts.Cancel();

            if (loop != null)
            {
                try
                {
                    await loop;
                }
                catch (OperationCanceledException)
                {
                    // ignored
                }
            }

            if (client != null)
            {
                await client.DisposeAsync();
            }
        }
        catch (Exception ex)
        {
            // Called from async void OnDestroy: log instead of letting the exception escape.
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Failed to shut down cleanly: {ex.Message}");
        }
        finally
        {
            cts.Dispose();
        }
    }
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Publishes a sample conversation (started -> content x2 -> ended) to NATS
# with the `nats` CLI, one message per second.
#
# Environment overrides:
#   NATS_SERVER  server URL        (default nats://127.0.0.1:4222)
#   SESSION_ID   session id        (default session-<epoch seconds>)
#   ROBOT1/2     robot names       (default robot1 / robot2)
set -euo pipefail

SERVER_URL=${NATS_SERVER:-"nats://127.0.0.1:4222"}
SESSION_ID=${SESSION_ID:-"session-$(date +%s)"}
ROBOT1=${ROBOT1:-"robot1"}
ROBOT2=${ROBOT2:-"robot2"}

# publish SUBJECT PAYLOAD: log the subject, then publish the payload to it.
publish() {
  local subject=$1
  local payload=$2
  echo "[$(date -Iseconds)] $subject"
  nats pub --server "$SERVER_URL" "$subject" "$payload"
}

# Print the current Unix time in milliseconds.
timestamp() {
  local ms
  # %3N (first three digits of the nanoseconds) is a GNU date extension.
  ms=$(date +%s%3N)
  if [[ $ms =~ ^[0-9]+$ ]]; then
    echo "$ms"
  else
    # date without %N support prints the specifier literally, which would make
    # the JSON invalid; fall back to whole seconds scaled to milliseconds.
    echo "$(( $(date +%s) * 1000 ))"
  fi
}

started_at() { date -Iseconds; }

STARTED_PAYLOAD=$(cat <<JSON
{
  "id": "conv-${SESSION_ID}",
  "sessionId": "${SESSION_ID}",
  "robots": [
    {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
    {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"}
  ],
  "timestamp": $(timestamp)
}
JSON
)

CONTENT_R1_TO_R2=$(cat <<JSON
{
  "id": "content-${SESSION_ID}-r1",
  "sessionId": "${SESSION_ID}",
  "from": {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
  "to": {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"},
  "content": "Hello from ${ROBOT1}",
  "timestamp": $(timestamp)
}
JSON
)

CONTENT_R2_TO_R1=$(cat <<JSON
{
  "id": "content-${SESSION_ID}-r2",
  "sessionId": "${SESSION_ID}",
  "from": {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"},
  "to": {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
  "content": "Reply from ${ROBOT2}",
  "timestamp": $(timestamp)
}
JSON
)

# Robots carry the same id/name/cubeId fields as in the other payloads.
ENDED_PAYLOAD=$(cat <<JSON
{
  "id": "ended-${SESSION_ID}",
  "sessionId": "${SESSION_ID}",
  "robots": [
    {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
    {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"}
  ],
  "timestamp": $(timestamp)
}
JSON
)

publish "dnm.session.conversation.started" "$STARTED_PAYLOAD"
sleep 1
publish "dnm.session.conversation.content" "$CONTENT_R1_TO_R2"
sleep 1
publish "dnm.session.conversation.content" "$CONTENT_R2_TO_R1"
sleep 1
publish "dnm.session.conversation.ended" "$ENDED_PAYLOAD"
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
環境
動かし方
- NATS サーバーを起動する(localhost:4222)
- Unity のシーン(SampleScene)に EventBus という Empty ゲームオブジェクトを追加
- EventBus.cs を EventBus オブジェクトにアタッチ
- bash send-nats-event.sh を実行する