123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- using Apache.NMS;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Runtime.CompilerServices;
- using System.Text;
- using System.Threading.Tasks;
- namespace EasyMQ
- {
- internal sealed class ActiveMQPubSub : MQPubSub
- {
- private string clientid = "";
- private bool isclosed = false;
- private ISession _Session;
- private ConcurrentDictionary<string,IMessageProducer> _Producers= new ConcurrentDictionary<string,IMessageProducer>();
- private ConcurrentDictionary<string,List<Apache.NMS.IMessageConsumer>> _Consumers = new ConcurrentDictionary<string, List<IMessageConsumer>> ();
-
- public ActiveMQPubSub(Apache.NMS.ActiveMQ.Connection connection) : base()
- {
- _Session = connection.CreateSession();
- clientid = connection.ConnectionId.Value;
- // _Session.RequestTimeout = TimeSpan.FromSeconds(100);
- }
- public override Task PublishAsync<T>(T message, Dictionary<string, string>? properties = null)
- {
- lock (_Session)
- {
- if (message == null) throw new ArgumentNullException(nameof(message));
- string topicname = typeof(T).FullName!;
- if (isclosed) return Task.Delay(1);
- if (_Producers.TryGetValue(topicname, out var producer) && producer != null)
- {
- }
- else
- {
- var topic = _Session.GetTopic("topic://PubSub." + topicname);
- producer = _Session.CreateProducer(topic);
- _Producers.TryAdd(topicname, producer);
- }
- IBytesMessage _BytesMessage = _Session.CreateBytesMessage(MessageConverter.Default.Serializer.Serialize(ref message));
- if (properties != null && properties.Count > 0)
- {
- foreach (var val in properties)
- {
- if (!string.IsNullOrEmpty(val.Key) && !string.IsNullOrEmpty(val.Value))
- {
- _BytesMessage.Properties.SetString(val.Key, val.Value);
- }
- }
- }
- _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY,clientid);
- return producer.SendAsync(_BytesMessage);
- }
- }
- public override Task PublishAsync(string topicname, ref byte message, uint byteslength, Dictionary<string, string>? properties = null)
- {
- lock (_Session)
- {
- if (isclosed) return Task.Delay(1);
- if (byteslength == 0) throw new ArgumentNullException(nameof(message));
- if (_Producers.TryGetValue(topicname, out var producer) && producer != null)
- {
- }
- else
- {
- var topic = _Session.GetTopic("topic://PubSub." + topicname);
- producer = _Session.CreateProducer(topic);
- _Producers.TryAdd(topicname, producer);
- }
- Byte[] temp = new Byte[byteslength];
- Unsafe.CopyBlock(ref temp[0], ref message, byteslength);
- IBytesMessage _BytesMessage = _Session.CreateBytesMessage(temp);
- if (properties != null && properties.Count > 0)
- {
- foreach (var val in properties)
- {
- if (!string.IsNullOrEmpty(val.Key) && !string.IsNullOrEmpty(val.Value))
- {
- _BytesMessage.Properties.SetString(val.Key, val.Value);
- }
- }
- }
- _BytesMessage.Properties.SetString(ActiveHutch.CLIENT_ID_KEY, clientid);
- return producer.SendAsync(_BytesMessage);
- }
- }
- public override void Subscribe<T>(Action<T, IPrimitiveMap> onMessage, string? selector = "")
- {
- IMessageConsumer? consumer = null;
- lock (_Session)
- {
- if (isclosed) return;
- if (onMessage == null) throw new ArgumentNullException(nameof(onMessage));
- string topicname = typeof(T).FullName!;
- if (string.IsNullOrEmpty(selector)) selector = null;
- if (_Consumers.TryGetValue(topicname, out var consumers) && consumers != null && consumers.Any(x => string.Equals(x.MessageSelector, selector)))
- {
- consumer = consumers.First(x => x.MessageSelector == selector);
- }
- else
- {
- if (consumers == null)
- {
- consumers = new List<IMessageConsumer>();
- _Consumers.GetOrAdd(topicname, consumers);
- }
- var topic = _Session.GetTopic("topic://PubSub." + topicname);
- consumer = string.IsNullOrEmpty(selector) ? _Session.CreateConsumer(topic) : _Session.CreateConsumer(topic, selector);
- consumers.Add(consumer);
- }
- }
- if (consumer == null) return;
- consumer.Listener += (sender) =>
- {
- if (sender is IBytesMessage msg)
- {
- onMessage?.Invoke(MessageConverter.Default.Serializer.Deserialize<T>(msg.Content), msg.Properties);
- }
- };
- }
- public override void Subscribe(string topicname, OnMessageHandle onMessage, string? selector = "")
- {
- IMessageConsumer? consumer = null;
- lock (_Session)
- {
- if (isclosed) return;
- if (onMessage == null) throw new ArgumentNullException(nameof(onMessage));
- if (string.IsNullOrEmpty(selector)) selector = null;
- if (_Consumers.TryGetValue(topicname, out var consumers) && consumers != null && consumers.Any(x => string.Equals(x.MessageSelector, selector)))
- {
- consumer = consumers.First(x => x.MessageSelector == selector);
- }
- else
- {
- if (consumers == null)
- {
- consumers = new List<IMessageConsumer>();
- _Consumers.GetOrAdd(topicname, consumers);
- }
- var topic = _Session.GetTopic("topic://" + topicname);
- consumer = string.IsNullOrEmpty(selector) ? _Session.CreateConsumer(topic) : _Session.CreateConsumer(topic, selector);
- consumers.Add(consumer);
- }
- }
- if (consumer == null) return;
- consumer.Listener += (sender) =>
- {
- if (sender is IBytesMessage msg)
- {
- Dictionary<string, object> properties = new Dictionary<string, object>();
- if (msg.Properties != null && msg.Properties.Count > 0)
- {
- foreach (var val in msg.Properties.Keys)
- {
- if (val == null || (val != null && string.IsNullOrEmpty(val.ToString()))) continue;
- properties[val!.ToString()!] = msg.Properties[val.ToString()];
- }
- }
- onMessage?.Invoke(ref msg.Content[0], (uint)msg.Content.Length, properties);
- }
- };
- }
- public override void DisposeManaged()
- {
- lock (_Session)
- {
- base.DisposeManaged();
- foreach (var val in _Producers.Values)
- {
- val?.Dispose();
- }
- _Producers.Clear();
- var vals = _Consumers.Values.SelectMany(x => x).ToList();
- foreach (var val in vals)
- {
- val?.Dispose();
- }
- _Consumers.Clear();
- _Session?.Dispose();
- isclosed = true;
- }
- }
- }
- }
|