ActiveMQPubSub.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. using Apache.NMS;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Runtime.CompilerServices;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace EasyMQ
  10. {
  11. internal sealed class ActiveMQPubSub : MQPubSub
  12. {
  13. private string clientid = "";
  14. private bool isclosed = false;
  15. private ISession _Session;
  16. private ConcurrentDictionary<string,IMessageProducer> _Producers= new ConcurrentDictionary<string,IMessageProducer>();
  17. private ConcurrentDictionary<string,List<Apache.NMS.IMessageConsumer>> _Consumers = new ConcurrentDictionary<string, List<IMessageConsumer>> ();
  18. public ActiveMQPubSub(Apache.NMS.ActiveMQ.Connection connection) : base()
  19. {
  20. _Session = connection.CreateSession();
  21. clientid = connection.ConnectionId.Value;
  22. // _Session.RequestTimeout = TimeSpan.FromSeconds(100);
  23. }
  24. public override Task PublishAsync<T>(T message, Dictionary<string, string>? properties = null)
  25. {
  26. lock (_Session)
  27. {
  28. if (message == null) throw new ArgumentNullException(nameof(message));
  29. string topicname = typeof(T).FullName!;
  30. if (isclosed) return Task.Delay(1);
  31. if (_Producers.TryGetValue(topicname, out var producer) && producer != null)
  32. {
  33. }
  34. else
  35. {
  36. var topic = _Session.GetTopic("topic://PubSub." + topicname);
  37. producer = _Session.CreateProducer(topic);
  38. _Producers.TryAdd(topicname, producer);
  39. }
  40. IBytesMessage _BytesMessage = _Session.CreateBytesMessage(MessageConverter.Default.Serializer.Serialize(ref message));
  41. if (properties != null && properties.Count > 0)
  42. {
  43. foreach (var val in properties)
  44. {
  45. if (!string.IsNullOrEmpty(val.Key) && !string.IsNullOrEmpty(val.Value))
  46. {
  47. _BytesMessage.Properties.SetString(val.Key, val.Value);
  48. }
  49. }
  50. }
  51. _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY,clientid);
  52. return producer.SendAsync(_BytesMessage);
  53. }
  54. }
  55. public override Task PublishAsync(string topicname, ref byte message, uint byteslength, Dictionary<string, string>? properties = null)
  56. {
  57. lock (_Session)
  58. {
  59. if (isclosed) return Task.Delay(1);
  60. if (byteslength == 0) throw new ArgumentNullException(nameof(message));
  61. if (_Producers.TryGetValue(topicname, out var producer) && producer != null)
  62. {
  63. }
  64. else
  65. {
  66. var topic = _Session.GetTopic("topic://PubSub." + topicname);
  67. producer = _Session.CreateProducer(topic);
  68. _Producers.TryAdd(topicname, producer);
  69. }
  70. Byte[] temp = new Byte[byteslength];
  71. Unsafe.CopyBlock(ref temp[0], ref message, byteslength);
  72. IBytesMessage _BytesMessage = _Session.CreateBytesMessage(temp);
  73. if (properties != null && properties.Count > 0)
  74. {
  75. foreach (var val in properties)
  76. {
  77. if (!string.IsNullOrEmpty(val.Key) && !string.IsNullOrEmpty(val.Value))
  78. {
  79. _BytesMessage.Properties.SetString(val.Key, val.Value);
  80. }
  81. }
  82. }
  83. _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY, clientid);
  84. return producer.SendAsync(_BytesMessage);
  85. }
  86. }
  87. public override void Subscribe<T>(Action<T, IPrimitiveMap> onMessage, string? selector = "")
  88. {
  89. IMessageConsumer? consumer = null;
  90. lock (_Session)
  91. {
  92. if (isclosed) return;
  93. if (onMessage == null) throw new ArgumentNullException(nameof(onMessage));
  94. string topicname = typeof(T).FullName!;
  95. if (string.IsNullOrEmpty(selector)) selector = null;
  96. if (_Consumers.TryGetValue(topicname, out var consumers) && consumers != null && consumers.Any(x => string.Equals(x.MessageSelector, selector)))
  97. {
  98. consumer = consumers.First(x => x.MessageSelector == selector);
  99. }
  100. else
  101. {
  102. if (consumers == null)
  103. {
  104. consumers = new List<IMessageConsumer>();
  105. _Consumers.GetOrAdd(topicname, consumers);
  106. }
  107. var topic = _Session.GetTopic("topic://PubSub." + topicname);
  108. consumer = string.IsNullOrEmpty(selector) ? _Session.CreateConsumer(topic) : _Session.CreateConsumer(topic, selector);
  109. consumers.Add(consumer);
  110. }
  111. }
  112. if (consumer == null) return;
  113. consumer.Listener += (sender) =>
  114. {
  115. if (sender is IBytesMessage msg)
  116. {
  117. onMessage?.Invoke(MessageConverter.Default.Serializer.Deserialize<T>(msg.Content), msg.Properties);
  118. }
  119. };
  120. }
  121. public override void Subscribe(string topicname, OnMessageHandle onMessage, string? selector = "")
  122. {
  123. IMessageConsumer? consumer = null;
  124. lock (_Session)
  125. {
  126. if (isclosed) return;
  127. if (onMessage == null) throw new ArgumentNullException(nameof(onMessage));
  128. if (string.IsNullOrEmpty(selector)) selector = null;
  129. if (_Consumers.TryGetValue(topicname, out var consumers) && consumers != null && consumers.Any(x => string.Equals(x.MessageSelector, selector)))
  130. {
  131. consumer = consumers.First(x => x.MessageSelector == selector);
  132. }
  133. else
  134. {
  135. if (consumers == null)
  136. {
  137. consumers = new List<IMessageConsumer>();
  138. _Consumers.GetOrAdd(topicname, consumers);
  139. }
  140. var topic = _Session.GetTopic("topic://" + topicname);
  141. consumer = string.IsNullOrEmpty(selector) ? _Session.CreateConsumer(topic) : _Session.CreateConsumer(topic, selector);
  142. consumers.Add(consumer);
  143. }
  144. }
  145. if (consumer == null) return;
  146. consumer.Listener += (sender) =>
  147. {
  148. if (sender is IBytesMessage msg)
  149. {
  150. Dictionary<string, object> properties = new Dictionary<string, object>();
  151. if (msg.Properties != null && msg.Properties.Count > 0)
  152. {
  153. foreach (var val in msg.Properties.Keys)
  154. {
  155. if (val == null || (val != null && string.IsNullOrEmpty(val.ToString()))) continue;
  156. properties[val!.ToString()!] = msg.Properties[val.ToString()];
  157. }
  158. }
  159. onMessage?.Invoke(ref msg.Content[0], (uint)msg.Content.Length, properties);
  160. }
  161. };
  162. }
  163. public override void DisposeManaged()
  164. {
  165. lock (_Session)
  166. {
  167. base.DisposeManaged();
  168. foreach (var val in _Producers.Values)
  169. {
  170. val?.Dispose();
  171. }
  172. _Producers.Clear();
  173. var vals = _Consumers.Values.SelectMany(x => x).ToList();
  174. foreach (var val in vals)
  175. {
  176. val?.Dispose();
  177. }
  178. _Consumers.Clear();
  179. _Session?.Dispose();
  180. isclosed = true;
  181. }
  182. }
  183. }
  184. }