Last active
July 24, 2020 17:44
-
-
Save sodablue/d7f4a63dc5e81d93cb594bdd1819918d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using RabbitMQ.Client; | |
| using System; | |
| using System.Runtime; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| internal class Program | |
| { | |
| public static void Main(string[] args) | |
| { | |
| var factory = new ConnectionFactory() { HostName = "localhost" }; | |
| using (var connection = factory.CreateConnection("RPCClient")) | |
| { | |
| using (var channel = connection.CreateModel()) | |
| { | |
| Console.WriteLine($"RPC Client [init]"); | |
| Task t = InvokeAsync(channel, 1.ToString()); | |
| t.Wait(); | |
| } | |
| Console.WriteLine(" Press [enter] to start."); | |
| Console.ReadLine(); | |
| var length = 1000; | |
| for (int i = 0; i < length; i++) | |
| { | |
| try | |
| { | |
| using (var channel = connection.CreateModel()) | |
| { | |
| Console.WriteLine($"RPC Client [{i}]"); | |
| Task t = InvokeAsync(channel, "2"); | |
| t.Wait(); | |
| Thread.Sleep(1); | |
| } | |
| } | |
| catch (Exception ex) | |
| { | |
| Console.WriteLine(ex.Message); | |
| } | |
| } | |
| connection.Close(); | |
| GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce; | |
| System.GC.Collect(2, GCCollectionMode.Forced, true, true); // Just force it | |
| } | |
| Console.WriteLine(" Press [enter] to exit."); | |
| Console.ReadLine(); | |
| } | |
| private static async Task InvokeAsync(IModel channel, string n) | |
| { | |
| var rnd = new Random(Guid.NewGuid().GetHashCode()); | |
| var rpcClient = new RpcClient(channel); | |
| Console.WriteLine(" [x] Requesting fib({0})", n); | |
| var response = await rpcClient.CallAsync(n.ToString()); | |
| Console.WriteLine(" [.] Got '{0}'", response); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using RabbitMQ.Client; | |
| using RabbitMQ.Client.Events; | |
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| class RpcClient | |
| { | |
| private const string QUEUE_NAME = "rpc_queue"; | |
| private readonly IModel _channel; | |
| private readonly string _replyQueueName; | |
| private readonly EventingBasicConsumer _consumer; | |
| private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = | |
| new ConcurrentDictionary<string, TaskCompletionSource<string>>(); | |
| public RpcClient(IModel channel) | |
| { | |
| _channel = channel; | |
| _replyQueueName = _channel.QueueDeclare().QueueName; | |
| _consumer = new EventingBasicConsumer(_channel); | |
| _consumer.Received += (model, ea) => | |
| { | |
| if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out TaskCompletionSource<string> tcs)) | |
| { | |
| return; | |
| } | |
| var response = Encoding.UTF8.GetString(ea.Body.ToArray()); | |
| tcs.TrySetResult(response); | |
| var consumer = (EventingBasicConsumer)model; | |
| consumer.Model.BasicCancel(ea.ConsumerTag); | |
| }; | |
| } | |
| public Task<string> CallAsync(string message, CancellationToken cancellationToken = default(CancellationToken)) | |
| { | |
| IBasicProperties props = _channel.CreateBasicProperties(); | |
| var correlationId = Guid.NewGuid().ToString(); | |
| props.CorrelationId = correlationId; | |
| props.ReplyTo = _replyQueueName; | |
| var messageBytes = Encoding.UTF8.GetBytes(message); | |
| var tcs = new TaskCompletionSource<string>(); | |
| _callbackMapper.TryAdd(correlationId, tcs); | |
| _channel.BasicPublish( | |
| exchange: "", | |
| routingKey: QUEUE_NAME, | |
| basicProperties: props, | |
| body: messageBytes); | |
| _channel.BasicConsume( | |
| consumer: _consumer, | |
| queue: _replyQueueName, | |
| autoAck: true); | |
| cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp)); | |
| return tcs.Task; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <Project Sdk="Microsoft.NET.Sdk"> | |
| <PropertyGroup> | |
| <OutputType>Exe</OutputType> | |
| <TargetFramework>netcoreapp3.1</TargetFramework> | |
| </PropertyGroup> | |
| <ItemGroup> | |
| <PackageReference Include="RabbitMQ.Client" Version="6.1.0" /> | |
| </ItemGroup> | |
| </Project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment