ActiveMQRPC.cs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.Util;
  11. namespace EasyMQ
  12. {
  13. internal sealed class ActiveMQRPC : MQRPC
  14. {
  15. private string clientid = "";
  16. private bool isclosed = false;
  17. private Apache.NMS.ISession _session;
  18. private class RespondHandle
  19. {
  20. public IMessageProducer Producer { get; set; }
  21. public List<(IMessageConsumer, string)> Consumer { get; set; } = new List<(IMessageConsumer, string)>();
  22. }
  23. private ConcurrentDictionary<TypeHandle, MQHandle> _RequestHandles = new ConcurrentDictionary<TypeHandle, MQHandle>();
  24. private ConcurrentDictionary<TypeHandle, RespondHandle> _ResponseHandles = new ConcurrentDictionary<TypeHandle, RespondHandle>();
  25. private IBytesMessage _BytesMessage;
  26. public ActiveMQRPC( Apache.NMS.ActiveMQ.Connection connection):base()
  27. {
  28. _session = connection.CreateSession();
  29. _session.RequestTimeout = TimeSpan.FromSeconds(100);
  30. _BytesMessage = _session.CreateBytesMessage();
  31. clientid = connection.ConnectionId.Value;
  32. }
  33. public override Task<TResponse> RequestAsync<TRequest, TResponse>(TRequest request, Dictionary<string, string>? properties = null, CancellationToken token = default, uint timeout = 1000)
  34. {
  35. return Task<TResponse>.Factory.StartNew(() =>
  36. {
  37. bool result = false;
  38. return Request<TRequest, TResponse>(request,ref result, properties, token, timeout);
  39. });
  40. }
  41. public override TResponse Request<TRequest, TResponse>(TRequest request, ref bool istimeout, Dictionary<string, string>? properties = null, CancellationToken token = default, UInt32 timeout = 1000)
  42. {
  43. if (isclosed) return default(TResponse);
  44. istimeout = false;
  45. string guid = Guid.NewGuid().ToString();
  46. MQHandle? handle = null;
  47. TypeHandle typeHandle = TypeHandle.GetHandle<TRequest, TResponse>();
  48. if (_RequestHandles.TryGetValue(typeHandle, out handle!))
  49. {
  50. }
  51. if (handle == null)
  52. {
  53. var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
  54. Apache.NMS.IMessageProducer producer = _session.CreateProducer(destination);
  55. producer.Priority = MsgPriority.VeryHigh;
  56. Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='out'");
  57. handle = new MQHandle(producer, consumer);
  58. _RequestHandles.TryAdd(typeHandle, handle);
  59. }
  60. if (handle == null || handle.Consumer == null || handle.Producer == null)
  61. {
  62. throw new Exception("Handle Not Created");
  63. }
  64. _BytesMessage.ClearBody();
  65. _BytesMessage.ClearProperties();
  66. _BytesMessage.Content = MessageConverter.Default.Serializer.Serialize(ref request);
  67. _BytesMessage.Properties.SetString("direction", "in");
  68. _BytesMessage.Properties.SetString("guid", guid);
  69. if(properties!=null)
  70. {
  71. foreach(var property in properties)
  72. {
  73. if (property.Key == "direction" || property.Key == "guid") continue;
  74. _BytesMessage.Properties.SetString(property.Key, property.Value);
  75. }
  76. }
  77. _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY, clientid);
  78. handle.Producer.Send(_BytesMessage);
  79. if (token.IsCancellationRequested)
  80. {
  81. CancellationTokenSource source = new CancellationTokenSource();
  82. token = source.Token;
  83. }
  84. DateTime dateTime = DateTime.Now;
  85. while (!token.IsCancellationRequested)
  86. {
  87. if ((DateTime.Now - dateTime).TotalMilliseconds >= timeout && timeout > 0) break;
  88. var resultmsg = handle.Consumer.Receive(TimeSpan.FromMilliseconds(10)) as IBytesMessage;
  89. if (resultmsg == null || resultmsg.Properties.GetString("guid") != guid) continue;
  90. return MessageConverter.Default.Serializer.Deserialize<TResponse>(resultmsg.Content);
  91. }
  92. istimeout = true;
  93. return default;
  94. }
  95. public override void Respond<TRequest, TResponse>(Func<TRequest, IPrimitiveMap, TResponse> responder, string selector = "")
  96. {
  97. if (isclosed) return;
  98. if (responder == null) throw new ArgumentNullException(nameof(responder));
  99. RespondHandle? handle = null;
  100. TypeHandle typeHandle = TypeHandle.GetHandle<TRequest, TResponse>();
  101. if (_ResponseHandles.TryGetValue(typeHandle, out handle!))
  102. {
  103. }
  104. if (handle == null)
  105. {
  106. var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
  107. Apache.NMS.IMessageProducer producer = _session.CreateProducer(destination);
  108. producer.Priority = MsgPriority.VeryHigh;
  109. Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='in'" + (string.IsNullOrEmpty(selector) ? "" : " and " + selector));
  110. handle = new RespondHandle()
  111. {
  112. Producer = producer,
  113. Consumer = new List<(IMessageConsumer, string)> { (consumer, selector) }
  114. };
  115. _ResponseHandles.TryAdd(typeHandle, handle);
  116. consumer.Listener += (sender) =>
  117. {
  118. if (sender is IBytesMessage msg && sender.Properties.Keys.Cast<string>().Contains("guid"))
  119. {
  120. string guid = msg.Properties.GetString("guid");
  121. var resultdata = responder(MessageConverter.Default.Serializer.Deserialize<TRequest>(msg.Content), sender.Properties);
  122. var resultmsg = producer.CreateBytesMessage();
  123. resultmsg.Content = MessageConverter.Default.Serializer.Serialize(ref resultdata);
  124. resultmsg.Properties.SetString("direction", "out");
  125. resultmsg.Properties.SetString("guid", guid);
  126. producer.Send(resultmsg);
  127. }
  128. };
  129. }
  130. else
  131. {
  132. if (handle.Consumer.Any(x => x.Item2 == selector)) return;
  133. var destination = SessionUtil.GetDestination(_session, $"topic://RPC.{typeof(TRequest).FullName}+{typeof(TResponse).FullName}", Apache.NMS.DestinationType.Topic);
  134. Apache.NMS.IMessageConsumer consumer = _session.CreateConsumer(destination, "direction='in'" + (string.IsNullOrEmpty(selector) ? "" : " and " + selector));
  135. handle.Consumer.Add((consumer, selector));
  136. consumer.Listener += (sender) =>
  137. {
  138. if (sender is IBytesMessage msg && sender.Properties.Keys.Cast<string>().Contains("guid"))
  139. {
  140. string guid = msg.Properties.GetString("guid");
  141. var resultdata = responder(MessageConverter.Default.Serializer.Deserialize<TRequest>(msg.Content), sender.Properties);
  142. var resultmsg = handle.Producer.CreateBytesMessage();
  143. resultmsg.Content = MessageConverter.Default.Serializer.Serialize(ref resultdata);
  144. resultmsg.Properties.SetString("direction", "out");
  145. resultmsg.Properties.SetString("guid", guid);
  146. handle.Producer.Send(resultmsg);
  147. }
  148. };
  149. }
  150. }
  151. public override void DisposeManaged()
  152. {
  153. lock (_session)
  154. {
  155. base.DisposeManaged();
  156. foreach (var val in _RequestHandles.Values)
  157. {
  158. val.Consumer?.Dispose();
  159. val.Producer?.Dispose();
  160. }
  161. foreach (var val in _ResponseHandles.Values)
  162. {
  163. val.Consumer.ForEach(x => x.Item1?.Dispose());
  164. val.Producer?.Dispose();
  165. }
  166. _RequestHandles.Clear();
  167. _ResponseHandles.Clear();
  168. _session?.Dispose();
  169. isclosed = true;
  170. }
  171. }
  172. public override void DisposeUnmanaged()
  173. {
  174. base.DisposeUnmanaged();
  175. }
  176. }
  177. }