Skip to content

Instantly share code, notes, and snippets.

@sodablue
Last active July 24, 2020 17:44
Show Gist options
  • Select an option

  • Save sodablue/d7f4a63dc5e81d93cb594bdd1819918d to your computer and use it in GitHub Desktop.

Select an option

Save sodablue/d7f4a63dc5e81d93cb594bdd1819918d to your computer and use it in GitHub Desktop.
using RabbitMQ.Client;
using System;
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
internal class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection("RPCClient"))
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"RPC Client [init]");
Task t = InvokeAsync(channel, 1.ToString());
t.Wait();
}
Console.WriteLine(" Press [enter] to start.");
Console.ReadLine();
var length = 1000;
for (int i = 0; i < length; i++)
{
try
{
using (var channel = connection.CreateModel())
{
Console.WriteLine($"RPC Client [{i}]");
Task t = InvokeAsync(channel, "2");
t.Wait();
Thread.Sleep(1);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
connection.Close();
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
System.GC.Collect(2, GCCollectionMode.Forced, true, true); // Just force it
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static async Task InvokeAsync(IModel channel, string n)
{
var rnd = new Random(Guid.NewGuid().GetHashCode());
var rpcClient = new RpcClient(channel);
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n.ToString());
Console.WriteLine(" [.] Got '{0}'", response);
}
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class RpcClient
{
private const string QUEUE_NAME = "rpc_queue";
private readonly IModel _channel;
private readonly string _replyQueueName;
private readonly EventingBasicConsumer _consumer;
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper =
new ConcurrentDictionary<string, TaskCompletionSource<string>>();
public RpcClient(IModel channel)
{
_channel = channel;
_replyQueueName = _channel.QueueDeclare().QueueName;
_consumer = new EventingBasicConsumer(_channel);
_consumer.Received += (model, ea) =>
{
if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs))
{
return;
}
var response = Encoding.UTF8.GetString(ea.Body.ToArray());
tcs.TrySetResult(response);
var consumer = (EventingBasicConsumer)model;
consumer.Model.BasicCancel(ea.ConsumerTag);
};
}
public Task<string> CallAsync(string message, CancellationToken cancellationToken = default(CancellationToken))
{
IBasicProperties props = _channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = _replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
_callbackMapper.TryAdd(correlationId, tcs);
_channel.BasicPublish(
exchange: "",
routingKey: QUEUE_NAME,
basicProperties: props,
body: messageBytes);
_channel.BasicConsume(
consumer: _consumer,
queue: _replyQueueName,
autoAck: true);
cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp));
return tcs.Task;
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.1.0" />
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment