123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- using Apache.NMS;
- using Apache.NMS.Util;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Net.Http.Headers;
- using System.Text;
- using System.Threading.Tasks;
- namespace EasyMQ
- {
- internal sealed class ActiveMQRPC : MQRPC
- {
- private string clientid = "";
- private bool isclosed = false;
- private Apache.NMS.ISession _session;
- private class RespondHandle
- {
- public IMessageProducer Producer { get; set; }
- public List<(IMessageConsumer, string)> Consumer { get; set; } = new List<(IMessageConsumer, string)>();
- }
- private ConcurrentDictionary<TypeHandle, MQHandle> _RequestHandles = new ConcurrentDictionary<TypeHandle, MQHandle>();
- private ConcurrentDictionary<TypeHandle, RespondHandle> _ResponseHandles = new ConcurrentDictionary<TypeHandle, RespondHandle>();
- private IBytesMessage _BytesMessage;
- public ActiveMQRPC( Apache.NMS.ActiveMQ.Connection connection):base()
- {
- _session = connection.CreateSession();
- _session.RequestTimeout = TimeSpan.FromSeconds(100);
- _BytesMessage = _session.CreateBytesMessage();
- clientid = connection.ConnectionId.Value;
- }
- public override Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest request, Dictionary<string, string>? properties = null, CancellationToken token = default, uint timeout = 1000)
- {
- return Task<TResponse>.Factory.StartNew(() =>
- {
- bool result = false;
- return Request<TRequest, TResponse>(request,ref result, properties, token, timeout);
- });
- }
-
- public override TResponse Request<TRequest, TResponse>(TRequest request, ref bool istimeout, Dictionary<string, string>? properties = null, CancellationToken token = default, UInt32 timeout = 1000)
- {
- if (isclosed) return default(TResponse);
- istimeout = false;
- string guid = Guid.NewGuid().ToString();
- MQHandle? handle = null;
- TypeHandle typeHandle = TypeHandle.GetHandle<TRequest, TResponse>();
- if (_RequestHandles.TryGetValue(typeHandle, out handle!))
- {
- }
- if (handle == null)
- {
- var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
- Apache.NMS.IMessageProducer producer = _session.CreateProducer(destination);
- producer.Priority = MsgPriority.VeryHigh;
- Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='out'");
- handle = new MQHandle(producer, consumer);
- _RequestHandles.TryAdd(typeHandle, handle);
- }
- if (handle == null || handle.Consumer == null || handle.Producer == null)
- {
- throw new Exception("Handle Not Created");
- }
- _BytesMessage.ClearBody();
- _BytesMessage.ClearProperties();
- _BytesMessage.Content = MessageConverter.Default.Serializer.Serialize(ref request);
- _BytesMessage.Properties.SetString("direction", "in");
- _BytesMessage.Properties.SetString("guid", guid);
- if(properties!=null)
- {
- foreach(var property in properties)
- {
- if (property.Key == "direction" || property.Key == "guid") continue;
- _BytesMessage.Properties.SetString(property.Key, property.Value);
- }
- }
- _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY, clientid);
- handle.Producer.Send(_BytesMessage);
- if (token.IsCancellationRequested)
- {
- CancellationTokenSource source = new CancellationTokenSource();
- token = source.Token;
- }
- DateTime dateTime = DateTime.Now;
- while (!token.IsCancellationRequested)
- {
- if ((DateTime.Now - dateTime).TotalMilliseconds >= timeout && timeout > 0) break;
- var resultmsg = handle.Consumer.Receive(TimeSpan.FromMilliseconds(10)) as IBytesMessage;
- if (resultmsg == null || resultmsg.Properties.GetString("guid") != guid) continue;
- return MessageConverter.Default.Serializer.Deserialize<TResponse>(resultmsg.Content);
- }
- istimeout = true;
- return default;
- }
- public override void Respond<TRequest, TResponse>(Func<TRequest, IPrimitiveMap, TResponse> responder, string selector = "")
- {
- if (isclosed) return;
- if (responder == null) throw new ArgumentNullException(nameof(responder));
- RespondHandle? handle = null;
- TypeHandle typeHandle = TypeHandle.GetHandle<TRequest, TResponse>();
- if (_ResponseHandles.TryGetValue(typeHandle, out handle!))
- {
- }
- if (handle == null)
- {
- var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
- Apache.NMS.IMessageProducer producer = _session.CreateProducer(destination);
- producer.Priority = MsgPriority.VeryHigh;
- Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='in'" + (string.IsNullOrEmpty(selector) ? "" : " and " + selector));
- handle = new RespondHandle()
- {
- Producer = producer,
- Consumer = new List<(IMessageConsumer, string)> { (consumer, selector) }
- };
- _ResponseHandles.TryAdd(typeHandle, handle);
- consumer.Listener += (sender) =>
- {
- if (sender is IBytesMessage msg && sender.Properties.Keys.Cast<string>().Contains("guid"))
- {
- string guid = msg.Properties.GetString("guid");
- var resultdata = responder(MessageConverter.Default.Serializer.Deserialize<TRequest>(msg.Content), sender.Properties);
- var resultmsg = producer.CreateBytesMessage();
- resultmsg.Content = MessageConverter.Default.Serializer.Serialize(ref resultdata);
- resultmsg.Properties.SetString("direction", "out");
- resultmsg.Properties.SetString("guid", guid);
- producer.Send(resultmsg);
- }
- };
- }
- else
- {
- if (handle.Consumer.Any(x => x.Item2 == selector)) return;
- var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
- Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='in'" + (string.IsNullOrEmpty(selector) ? "" : " and " + selector));
- handle.Consumer.Add((consumer, selector));
- consumer.Listener += (sender) =>
- {
- if (sender is IBytesMessage msg && sender.Properties.Keys.Cast<string>().Contains("guid"))
- {
- string guid = msg.Properties.GetString("guid");
- var resultdata = responder(MessageConverter.Default.Serializer.Deserialize<TRequest>(msg.Content), sender.Properties);
- var resultmsg = handle.Producer.CreateBytesMessage();
- resultmsg.Content = MessageConverter.Default.Serializer.Serialize(ref resultdata);
- resultmsg.Properties.SetString("direction", "out");
- resultmsg.Properties.SetString("guid", guid);
- handle.Producer.Send(resultmsg);
- }
- };
- }
- }
- public override void DisposeManaged()
- {
- lock (_session)
- {
- base.DisposeManaged();
- foreach (var val in _RequestHandles.Values)
- {
- val.Consumer?.Dispose();
- val.Producer?.Dispose();
- }
- foreach (var val in _ResponseHandles.Values)
- {
- val.Consumer.ForEach(x => x.Item1?.Dispose());
- val.Producer?.Dispose();
- }
- _RequestHandles.Clear();
- _ResponseHandles.Clear();
- _session?.Dispose();
- isclosed = true;
- }
- }
- public override void DisposeUnmanaged()
- {
- base.DisposeUnmanaged();
- }
- }
- }
|