Skip to content

Instantly share code, notes, and snippets.

@nukopy
Created November 7, 2025 13:06
Show Gist options
  • Select an option

  • Save nukopy/daf08b3c70a6cf297c4b3b5fa3730c00 to your computer and use it in GitHub Desktop.

Select an option

Save nukopy/daf08b3c70a6cf297c4b3b5fa3730c00 to your computer and use it in GitHub Desktop.
電脳ノマチ:NATS.Net 実装例
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;
using NATS.Client.Core;
using NATS.Net;
namespace Dnm
{
/// <summary>
/// NATS subject names used by this sample. All subjects share the <c>dnm.</c> prefix.
/// </summary>
public static class Subjects
{
// Default subscription filter. In NATS, a trailing ">" wildcard matches one or
// more remaining tokens, so this filter covers every subject declared below.
public const string SubscribeFilter = "dnm.>";
// Session lifecycle and conversation subjects.
public const string SessionStarted = "dnm.session.started";
public const string SessionConversationStarted = "dnm.session.conversation.started";
public const string SessionConversationEnded = "dnm.session.conversation.ended";
public const string SessionConversationContent = "dnm.session.conversation.content";
public const string SessionEnded = "dnm.session.ended";
// Robot subjects. EventResolver maps these to EventType values, but no payload
// type is declared for them in this file.
public const string RobotsStatusChanged = "dnm.robots.status.changed";
public const string RobotsRawPosition = "dnm.robots.raw.position";
public const string RobotsRawBattery = "dnm.robots.raw.battery";
}
/// <summary>
/// Kind of event carried by a message, one member per constant in <c>Subjects</c>
/// (excluding the subscription filter).
/// </summary>
public enum EventType
{
// Zero value, so it is also default(EventType). Returned by
// EventResolver.Subject2EventType for null, empty, or unrecognized subjects.
Unknown = 0,
SessionStarted,
SessionConversationStarted,
SessionConversationEnded,
SessionConversationContent,
SessionEnded,
RobotsStatusChanged,
RobotsRawPosition,
RobotsRawBattery
}
/// <summary>
/// Translates NATS subject strings into <see cref="EventType"/> values.
/// </summary>
public static class EventResolver
{
    // Subject -> event type table. The comparer makes lookups case-insensitive
    // (ordinal), so "DNM.Session.Started" resolves the same as "dnm.session.started".
    private static readonly Dictionary<string, EventType> SubjectTable =
        new Dictionary<string, EventType>(StringComparer.OrdinalIgnoreCase)
        {
            [Subjects.SessionStarted] = EventType.SessionStarted,
            [Subjects.SessionConversationStarted] = EventType.SessionConversationStarted,
            [Subjects.SessionConversationEnded] = EventType.SessionConversationEnded,
            [Subjects.SessionConversationContent] = EventType.SessionConversationContent,
            [Subjects.SessionEnded] = EventType.SessionEnded,
            [Subjects.RobotsStatusChanged] = EventType.RobotsStatusChanged,
            [Subjects.RobotsRawPosition] = EventType.RobotsRawPosition,
            [Subjects.RobotsRawBattery] = EventType.RobotsRawBattery
        };

    /// <summary>
    /// Resolves the event type for a full subject name.
    /// </summary>
    /// <param name="subject">Subject of the received message; may be null or empty.</param>
    /// <returns>
    /// The matching <see cref="EventType"/>, or <see cref="EventType.Unknown"/> when
    /// <paramref name="subject"/> is null, empty, or not one of the known subjects.
    /// </returns>
    public static EventType Subject2EventType(string subject)
    {
        if (string.IsNullOrEmpty(subject))
        {
            return EventType.Unknown;
        }

        return SubjectTable.TryGetValue(subject, out EventType resolved)
            ? resolved
            : EventType.Unknown;
    }
}
/// <summary>
/// Robot descriptor embedded in event payloads.
/// Bound to the JSON keys <c>id</c>, <c>name</c> and <c>cubeId</c>.
/// </summary>
// NOTE(review): being a struct, a Robot that is absent from the JSON shows up as
// a default value with all-null strings rather than as null.
[Serializable]
public struct Robot
{
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("name")] public string Name { get; set; }
[JsonPropertyName("cubeId")] public string CubeId { get; set; }
}
/// <summary>
/// Payload deserialized for the <c>dnm.session.started</c> subject.
/// </summary>
[Serializable]
public struct EventSessionStarted
{
// Identifier of this event message.
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("sessionId")] public string SessionId { get; set; }
// Null when the "robots" key is missing from the JSON.
[JsonPropertyName("robots")] public List<Robot> Robots { get; set; }
// Presumably Unix epoch milliseconds (the sample publisher script documents
// its timestamp as milliseconds) - confirm with the real producer.
[JsonPropertyName("timestamp")] public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for the <c>dnm.session.conversation.started</c> subject.
/// </summary>
[Serializable]
public struct EventSessionConversationStarted
{
// Identifier of this event message.
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("sessionId")] public string SessionId { get; set; }
// Null when the "robots" key is missing from the JSON.
[JsonPropertyName("robots")] public List<Robot> Robots { get; set; }
// Presumably Unix epoch milliseconds (the sample publisher script documents
// its timestamp as milliseconds) - confirm with the real producer.
[JsonPropertyName("timestamp")] public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for the <c>dnm.session.conversation.ended</c> subject.
/// </summary>
[Serializable]
public struct EventSessionConversationEnded
{
// Identifier of this event message.
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("sessionId")] public string SessionId { get; set; }
// Null when the "robots" key is missing from the JSON.
[JsonPropertyName("robots")] public List<Robot> Robots { get; set; }
// Presumably Unix epoch milliseconds (the sample publisher script documents
// its timestamp as milliseconds) - confirm with the real producer.
[JsonPropertyName("timestamp")] public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for the <c>dnm.session.conversation.content</c> subject:
/// one piece of content addressed from one robot to another.
/// </summary>
[Serializable]
public struct EventSessionConversationContent
{
// Identifier of this event message.
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("sessionId")] public string SessionId { get; set; }
// Sender and recipient. Robot is a struct, so a missing key yields a default
// Robot (null Name) instead of null.
[JsonPropertyName("from")] public Robot From { get; set; }
[JsonPropertyName("to")] public Robot To { get; set; }
[JsonPropertyName("content")] public string Content { get; set; }
// Presumably Unix epoch milliseconds (the sample publisher script documents
// its timestamp as milliseconds) - confirm with the real producer.
[JsonPropertyName("timestamp")] public long Timestamp { get; set; }
}
/// <summary>
/// Payload deserialized for the <c>dnm.session.ended</c> subject.
/// </summary>
[Serializable]
public struct EventSessionEnded
{
// Identifier of this event message.
[JsonPropertyName("id")] public string Id { get; set; }
[JsonPropertyName("sessionId")] public string SessionId { get; set; }
// Null when the "robots" key is missing from the JSON.
[JsonPropertyName("robots")] public List<Robot> Robots { get; set; }
// Presumably Unix epoch milliseconds (the sample publisher script documents
// its timestamp as milliseconds) - confirm with the real producer.
[JsonPropertyName("timestamp")] public long Timestamp { get; set; }
}
/// <summary>
/// Unity component that connects to a NATS server, subscribes to a subject filter
/// and logs every received event.
/// </summary>
/// <remarks>
/// The subscription loop only enqueues raw messages; parsing and logging happen in
/// <c>Update</c>, which drains at most <c>maxMessagesPerFrame</c> messages per frame.
/// The connection is opened in <c>Start</c> and torn down in <c>OnDestroy</c>.
/// </remarks>
public sealed class EventBus : MonoBehaviour
{
    [Header("NATS")][SerializeField] private string serverUrl = "nats://localhost:4222"; // Server URL the listener connects to
    [SerializeField] private string subjectFilter = Subjects.SubscribeFilter;
    [SerializeField, Range(1, 128)] private int maxMessagesPerFrame = 1;

    // Filled by the subscription loop, drained by Update.
    private readonly ConcurrentQueue<InboundMessage> _pendingMessages = new();

    // Lenient parsing: case-insensitive property names, comments and trailing commas allowed.
    private readonly JsonSerializerOptions _jsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true
    };

    private CancellationTokenSource _cts;
    private NatsClient _client;
    private Task _subscriptionLoop;

    // Message of the most recent failure (connect, subscribe, deserialize, shutdown).
    private string _lastError;

    /// <summary>Subject and raw text payload of one received message.</summary>
    private readonly struct InboundMessage
    {
        public InboundMessage(string subject, string payload)
        {
            Subject = subject;
            Payload = payload;
        }

        public string Subject { get; }
        public string Payload { get; }
    }

    /// <summary>
    /// Unity entry point: keeps the player running in the background and starts the
    /// connection. <c>async void</c> is acceptable here because
    /// <see cref="InitializeAsync"/> handles its own exceptions.
    /// </summary>
    private async void Start()
    {
        Debug.Log("[NATS] Starting app...");
        Application.runInBackground = true;
        await InitializeAsync();
        Debug.Log("[NATS] App started!");
    }

    /// <summary>
    /// Drains up to <c>maxMessagesPerFrame</c> queued messages and handles each one.
    /// </summary>
    private void Update()
    {
        var processed = 0;
        while (processed < maxMessagesPerFrame && _pendingMessages.TryDequeue(out InboundMessage message))
        {
            processed++;
            HandleMessage(message);
        }

        if (processed > 0)
        {
            Debug.Log($"[NATS] Processed {processed} messages");
        }
    }

    /// <summary>
    /// Connects to <c>serverUrl</c> and starts the subscription loop. Does nothing
    /// when a client already exists. Failures are logged and recorded in
    /// <c>_lastError</c>; they are never thrown to the caller.
    /// </summary>
    private async Task InitializeAsync()
    {
        if (_client != null)
        {
            return;
        }

        // Keep local references: ShutdownAsync may clear the fields while we await.
        var cts = new CancellationTokenSource();
        _cts = cts;
        NatsClient client = null;
        try
        {
            Debug.Log($"[NATS] Connecting to {serverUrl}");
            client = new NatsClient(serverUrl);
            _client = client;
            await client.ConnectAsync();

            if (!ReferenceEquals(_cts, cts))
            {
                // The component was shut down while connecting; ShutdownAsync has
                // taken ownership of (and disposes) the client and the token source.
                return;
            }

            Debug.Log($"[NATS] Connected to {serverUrl}");
            _subscriptionLoop = PumpSubscriptionAsync(cts.Token);
            Debug.Log($"[NATS] Subscribed to '{subjectFilter}' via {serverUrl}");
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Failed to connect: {ex.Message}");

            // Release what this attempt created, unless ShutdownAsync already took
            // it over. Clearing the fields also allows a later retry.
            if (ReferenceEquals(_cts, cts))
            {
                _cts = null;
                cts.Dispose();

                if (client != null)
                {
                    _client = null;
                    try
                    {
                        await client.DisposeAsync();
                    }
                    catch (Exception disposeEx)
                    {
                        Debug.LogWarning($"[NATS] Failed to dispose client: {disposeEx.Message}");
                    }
                }
            }
        }
    }

    /// <summary>
    /// Subscribes to <c>subjectFilter</c> and enqueues every received message until
    /// <paramref name="token"/> is cancelled or the subscription fails.
    /// </summary>
    /// <param name="token">Cancels the subscription.</param>
    private async Task PumpSubscriptionAsync(CancellationToken token)
    {
        try
        {
            await foreach (NatsMsg<string> msg in _client.SubscribeAsync<string>(subjectFilter, cancellationToken: token))
            {
                // A message without a body is queued as an empty string.
                string payload = msg.Data ?? string.Empty;
                _pendingMessages.Enqueue(new InboundMessage(msg.Subject, payload));
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled during shutdown.
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Subscription error: {ex.Message}");
        }
    }

    /// <summary>
    /// Handles one message; when it cannot be handled as a typed event, the raw
    /// subject and payload are logged instead.
    /// </summary>
    private void HandleMessage(InboundMessage message)
    {
        if (TryHandleEvent(message))
        {
            return;
        }

        Debug.Log($"[NATS] {message.Subject} :: {message.Payload}");
    }

    /// <summary>
    /// Deserializes the payload according to the subject's event type and logs a
    /// summary of it.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the payload was deserialized and logged; <c>false</c> when
    /// deserialization failed or the event type has no handler.
    /// </returns>
    private bool TryHandleEvent(InboundMessage message)
    {
        Debug.Log($"[NATS] Handling event: {message.Subject}");
        switch (EventResolver.Subject2EventType(message.Subject))
        {
            case EventType.SessionStarted:
                if (TryDeserialize<EventSessionStarted>(message.Payload, out var sessionStarted))
                {
                    Debug.Log($"[NATS] SessionStarted id={sessionStarted.Id} session={sessionStarted.SessionId} robots={sessionStarted.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionEnded:
                if (TryDeserialize<EventSessionEnded>(message.Payload, out var sessionEnded))
                {
                    Debug.Log($"[NATS] SessionEnded id={sessionEnded.Id} session={sessionEnded.SessionId} robots={sessionEnded.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationStarted:
                if (TryDeserialize<EventSessionConversationStarted>(message.Payload, out var sessionConversationStarted))
                {
                    Debug.Log($"[NATS] SessionConversationStarted id={sessionConversationStarted.Id} session={sessionConversationStarted.SessionId} robots={sessionConversationStarted.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationEnded:
                if (TryDeserialize<EventSessionConversationEnded>(message.Payload, out var sessionConversationEnded))
                {
                    Debug.Log($"[NATS] SessionConversationEnded id={sessionConversationEnded.Id} session={sessionConversationEnded.SessionId} robots={sessionConversationEnded.Robots?.Count ?? 0}");
                    return true;
                }
                return false;

            case EventType.SessionConversationContent:
                if (TryDeserialize<EventSessionConversationContent>(message.Payload, out var sessionConversationContent))
                {
                    Debug.Log($"[NATS] SessionConversationContent id={sessionConversationContent.Id} session={sessionConversationContent.SessionId} from={sessionConversationContent.From.Name} to={sessionConversationContent.To.Name} content={sessionConversationContent.Content}");
                    return true;
                }
                return false;

            default:
                // Also reached for the Robots* event types, which have no handler yet.
                Debug.LogWarning($"[NATS] Unknown event: {message.Subject}");
                return false;
        }
    }

    /// <summary>
    /// Deserializes <paramref name="payload"/> as JSON into <typeparamref name="T"/>.
    /// </summary>
    /// <param name="payload">JSON text.</param>
    /// <param name="result">The deserialized value, or <c>default</c> on failure.</param>
    /// <returns>
    /// <c>false</c> when deserialization throws (the error is logged and recorded in
    /// <c>_lastError</c>) or yields null; otherwise <c>true</c>.
    /// </returns>
    private bool TryDeserialize<T>(string payload, out T result)
    {
        try
        {
            Debug.Log($"[NATS] Deserializing {typeof(T).Name}: {payload}");
            result = JsonSerializer.Deserialize<T>(payload, _jsonOptions);
            // Only meaningful for reference types; a struct T is never null here.
            return result != null;
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Failed to deserialize {typeof(T).Name}: {ex.Message}" + Environment.NewLine + payload);
            result = default;
            return false;
        }
    }

    /// <summary>
    /// Unity teardown hook. Exceptions are caught here because nothing can observe
    /// an exception escaping an <c>async void</c> method.
    /// </summary>
    private async void OnDestroy()
    {
        try
        {
            await ShutdownAsync();
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            Debug.LogError($"[NATS] Shutdown error: {ex.Message}");
        }
    }

    /// <summary>
    /// Cancels the subscription, waits for the loop to finish, and disposes the
    /// client and the token source. Does nothing when not initialized.
    /// </summary>
    private async Task ShutdownAsync()
    {
        CancellationTokenSource cts = _cts;
        if (cts == null)
        {
            return;
        }

        // Detach everything up front so an InitializeAsync call that is still
        // awaiting the connection can tell that shutdown has taken over.
        NatsClient client = _client;
        Task loop = _subscriptionLoop;
        _cts = null;
        _client = null;
        _subscriptionLoop = null;

        try
        {
            cts.Cancel();
            if (loop != null)
            {
                try
                {
                    await loop;
                }
                catch (OperationCanceledException)
                {
                    // ignored
                }
            }

            if (client != null)
            {
                await client.DisposeAsync();
            }
        }
        finally
        {
            // Dispose the token source even when disposing the client throws.
            cts.Dispose();
        }
    }
}
}
#!/usr/bin/env bash
# Publishes a sample conversation to NATS with the `nats` CLI:
#   conversation.started -> conversation.content (x2) -> conversation.ended
#
# Environment overrides:
#   NATS_SERVER  server URL            (default nats://127.0.0.1:4222)
#   SESSION_ID   session identifier    (default session-<epoch seconds>)
#   ROBOT1/2     robot names           (default robot1 / robot2)
set -euo pipefail

SERVER_URL=${NATS_SERVER:-"nats://127.0.0.1:4222"}
SESSION_ID=${SESSION_ID:-"session-$(date +%s)"}
ROBOT1=${ROBOT1:-"robot1"}
ROBOT2=${ROBOT2:-"robot2"}

# publish SUBJECT PAYLOAD: log the subject with a timestamp, then publish.
publish() {
  local subject=$1
  local payload=$2
  echo "[$(date -Iseconds)] $subject"
  nats pub --server "$SERVER_URL" "$subject" "$payload"
}

# Prints the current Unix time in milliseconds.
timestamp() {
  local ms
  # %3N (nanoseconds truncated to 3 digits) is a GNU date extension.
  ms=$(date +%s%3N 2>/dev/null || true)
  case "$ms" in
    # Other date implementations leave a non-digit residue or print nothing;
    # fall back to second resolution so the value is still a valid JSON number.
    ''|*[!0-9]*) ms=$(( $(date +%s) * 1000 )) ;;
  esac
  printf '%s\n' "$ms"
}

# ISO-8601 timestamp helper (currently unused).
started_at() { date -Iseconds; }

STARTED_PAYLOAD=$(cat <<JSON
{
  "id": "conv-${SESSION_ID}",
  "sessionId": "${SESSION_ID}",
  "robots": [
    {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
    {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"}
  ],
  "timestamp": $(timestamp)
}
JSON
)

CONTENT_R1_TO_R2=$(cat <<JSON
{
  "id": "content-${SESSION_ID}-r1",
  "sessionId": "${SESSION_ID}",
  "from": {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
  "to": {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"},
  "content": "Hello from ${ROBOT1}",
  "timestamp": $(timestamp)
}
JSON
)

CONTENT_R2_TO_R1=$(cat <<JSON
{
  "id": "content-${SESSION_ID}-r2",
  "sessionId": "${SESSION_ID}",
  "from": {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"},
  "to": {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
  "content": "Reply from ${ROBOT2}",
  "timestamp": $(timestamp)
}
JSON
)

# Robots carry the same id/name/cubeId triple as in the started payload.
ENDED_PAYLOAD=$(cat <<JSON
{
  "id": "ended-${SESSION_ID}",
  "sessionId": "${SESSION_ID}",
  "robots": [
    {"id": "id-${ROBOT1}", "name": "${ROBOT1}", "cubeId": "cube-${ROBOT1}"},
    {"id": "id-${ROBOT2}", "name": "${ROBOT2}", "cubeId": "cube-${ROBOT2}"}
  ],
  "timestamp": $(timestamp)
}
JSON
)

publish "dnm.session.conversation.started" "$STARTED_PAYLOAD"
sleep 1
publish "dnm.session.conversation.content" "$CONTENT_R1_TO_R2"
sleep 1
publish "dnm.session.conversation.content" "$CONTENT_R2_TO_R1"
sleep 1
publish "dnm.session.conversation.ended" "$ENDED_PAYLOAD"
@nukopy
Copy link
Author

nukopy commented Nov 7, 2025

環境

  • Unity 2022.3.53f1
  • nats-server v2.10.14
  • NATS CLI v0.1.5

動かし方

  1. nats-server 起動(localhost:4222)
  2. Unity プロジェクト作成(3D)
  3. Unity Editor 起動
  4. デフォルトのシーン(SampleScene)に EventBus という Empty ゲームオブジェクトを追加
  5. EventBus.cs を EventBus オブジェクトにアタッチ
  6. Unity Editor でゲーム再生
  7. bash send-nats-event.sh を実行する
  8. Unity Editor のコンソールでイベントを受信しているログを確認

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment