// TcpEventBus.cs (7.5 KB)
  1. using EventBus;
  2. using SuperSimpleTcp;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Diagnostics.CodeAnalysis;
  6. using System.Linq;
  7. using System.Runtime.CompilerServices;
  8. using System.Text;
  9. using System.Threading.Tasks;
  namespace TcpEventBus
  {
      /// <summary>
      /// Event broker that exchanges events with a remote peer through an <see cref="ISocket"/>.
      /// Incoming payloads are deserialized with MessagePack into <c>DataMsg</c> and forwarded to the
      /// first event object registered under the message's <c>EventType</c> key.
      /// </summary>
      public sealed class TcpEventBus : IEventBroker
      {
          /// <summary>Re-raised whenever the underlying socket reports a new connection.</summary>
          public event EventHandler<ConnectionEventArgs> Connected;

          /// <summary>Re-raised whenever the underlying socket reports a lost connection.</summary>
          public event EventHandler<ConnectionEventArgs> Disconnected;

          // Registered event objects, keyed by the full name of their concrete event type.
          // Every access must hold lock(_events): GetEvent runs on caller threads while
          // Events_DataReceived runs on whatever thread the socket raises its events on.
          private readonly Dictionary<string, List<IBaseEventData>> _events = new Dictionary<string, List<IBaseEventData>>();

          // Never null: the constructor rejects a null socket and the field is not reassigned.
          private readonly ISocket _socket;

          // Last value handed out by GetHash; used to keep the generated ids strictly increasing.
          private long _lastHashTicks;

          /// <summary>
          /// Wraps an already created socket, starts it and hooks up its events.
          /// </summary>
          /// <param name="tcp">Socket used for all traffic of this bus.</param>
          /// <exception cref="ArgumentNullException"><paramref name="tcp"/> is <c>null</c>.</exception>
          private TcpEventBus(ISocket tcp)
          {
              if (tcp == null)
              {
                  throw new ArgumentNullException(nameof(tcp));
              }

              _socket = tcp;

              // NOTE(review): the socket is started before the handlers are attached, exactly as before.
              // Whether ISocket.Events is usable prior to Start() is not visible from this file, so the
              // order is intentionally left untouched.
              tcp.Start();
              tcp.Events.DataReceived += Events_DataReceived;
              tcp.Events.Connected += (sender, args) => Connected?.Invoke(sender, args);

              // Registered events are kept across disconnects; only the notification is forwarded.
              tcp.Events.Disconnected += (sender, args) => Disconnected?.Invoke(sender, args);
          }

          /// <summary>Address this bus was created with.</summary>
          public string IP { get; private set; } = "";

          /// <summary>Port this bus was created with.</summary>
          public int Port { get; private set; }

          /// <summary>Connection state as reported by the underlying socket.</summary>
          public bool IsConnected => _socket.IsConnect;

          /// <summary>Value of the underlying socket's <c>IsService</c> flag.</summary>
          public bool IsService => _socket.IsService;

          /// <summary>
          /// Creates a bus on top of <c>SocketTool.CreateService(port, timeout)</c>.
          /// </summary>
          /// <param name="ip">Stored in <see cref="IP"/>; it is not passed to the socket factory.</param>
          /// <param name="port">Port handed to the socket factory and stored in <see cref="Port"/>.</param>
          /// <param name="timeout">Timeout handed to the socket factory.</param>
          /// <returns>The new bus; its socket has already been started by the constructor.</returns>
          public static TcpEventBus CreateService(string ip, int port, int timeout = 1000)
          {
              return new TcpEventBus(SocketTool.CreateService(port, timeout))
              {
                  IP = ip,
                  Port = port,
              };
          }

          /// <summary>
          /// Creates a bus on top of <c>SocketTool.CreateClient(ip, port, timeout)</c>.
          /// </summary>
          /// <param name="ip">Address handed to the socket factory and stored in <see cref="IP"/>.</param>
          /// <param name="port">Port handed to the socket factory and stored in <see cref="Port"/>.</param>
          /// <param name="timeout">Timeout handed to the socket factory.</param>
          /// <returns>The new bus; its socket has already been started by the constructor.</returns>
          public static TcpEventBus CreateClient(string ip, int port, int timeout = 1000)
          {
              return new TcpEventBus(SocketTool.CreateClient(ip, port, timeout))
              {
                  IP = ip,
                  Port = port,
              };
          }

          /// <summary>Calls <c>Start()</c> on the underlying socket.</summary>
          public void Start() => _socket.Start();

          /// <summary>Calls <c>Disconnect()</c> on the underlying socket.</summary>
          public void Close() => _socket.Disconnect();

          /// <summary>
          /// Returns the event object for <typeparamref name="TData"/>, creating and registering it on first use.
          /// </summary>
          /// <typeparam name="TData">Payload type of the event.</typeparam>
          /// <returns>The single registered <c>TcpEventData&lt;TData&gt;</c> instance.</returns>
          public IEventData<TData> GetEvent<TData>()
          {
              lock (_events)
              {
                  List<IBaseEventData> bucket = GetOrCreateBucket(typeof(TcpEventData<TData>).FullName!);
                  if (bucket.Count == 0)
                  {
                      // The same generated id is passed for both trailing constructor arguments.
                      string hash = GetHash();
                      bucket.Add(new TcpEventData<TData>(_socket, hash, hash));
                  }

                  return (IEventData<TData>)bucket[0];
              }
          }

          /// <summary>
          /// Returns the event object for the <typeparamref name="TData"/>/<typeparamref name="T"/> pair,
          /// creating and registering it on first use.
          /// </summary>
          /// <typeparam name="TData">First type argument of the event.</typeparam>
          /// <typeparam name="T">Second type argument of the event.</typeparam>
          /// <returns>The single registered <c>TcpEventData&lt;TData, T&gt;</c> instance.</returns>
          public IEventData<TData, T> GetEvent<TData, T>()
          {
              lock (_events)
              {
                  List<IBaseEventData> bucket = GetOrCreateBucket(typeof(TcpEventData<TData, T>).FullName!);
                  if (bucket.Count == 0)
                  {
                      // The same generated id is passed for both trailing constructor arguments.
                      string hash = GetHash();
                      bucket.Add(new TcpEventData<TData, T>(_socket, hash, hash));
                  }

                  return (IEventData<TData, T>)bucket[0];
              }
          }

          /// <summary>
          /// Returns the anonymous event registered for <paramref name="eventName"/>, creating it on first use.
          /// </summary>
          /// <param name="eventName">Name of the event; must not be null or empty.</param>
          /// <returns>The matching <c>TcpAnonymousEventData</c> instance.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is <c>null</c>.</exception>
          /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
          public IAnonymousEventData GetEvent(string eventName)
          {
              ArgumentException.ThrowIfNullOrEmpty(eventName);

              lock (_events)
              {
                  List<IBaseEventData> bucket = GetOrCreateBucket(typeof(TcpAnonymousEventData).FullName!);

                  // NOTE(review): Contains() matches when the stored name merely contains eventName, so a
                  // shorter name can resolve to an event registered under a longer one. Kept as-is because
                  // the exact format of EventName is not visible here - confirm whether equality is intended.
                  IBaseEventData? existing = bucket.FirstOrDefault(
                      x => x is TcpAnonymousEventData candidate && candidate.EventName.Contains(eventName));
                  if (existing == null)
                  {
                      existing = new TcpAnonymousEventData(_socket, eventName, GetHash());
                      bucket.Add(existing);
                  }

                  return (IAnonymousEventData)existing;
              }
          }

          /// <summary>
          /// Returns the typed anonymous event registered for <paramref name="eventName"/>, creating it on first use.
          /// </summary>
          /// <typeparam name="T">Type argument of the anonymous event.</typeparam>
          /// <param name="eventName">Name of the event; must not be null or empty.</param>
          /// <returns>The matching <c>TcpAnonymousEventData&lt;T&gt;</c> instance.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is <c>null</c>.</exception>
          /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
          public IAnonymousEventData<T> GetEvent<T>(string eventName)
          {
              ArgumentException.ThrowIfNullOrEmpty(eventName);

              lock (_events)
              {
                  List<IBaseEventData> bucket = GetOrCreateBucket(typeof(TcpAnonymousEventData<T>).FullName!);

                  // NOTE(review): substring match kept from the original implementation; confirm whether
                  // an exact name comparison is what is actually intended.
                  IBaseEventData? existing = bucket.FirstOrDefault(
                      x => x is TcpAnonymousEventData<T> candidate && candidate.EventName.Contains(eventName));
                  if (existing == null)
                  {
                      existing = new TcpAnonymousEventData<T>(_socket, eventName, GetHash());
                      bucket.Add(existing);
                  }

                  return (IAnonymousEventData<T>)existing;
              }
          }

          /// <summary>
          /// Produces an id string from the current tick count. If the clock has not advanced since the
          /// previous call (or moved backwards), the previous value plus one is used instead, so ids
          /// handed out by one bus instance never repeat.
          /// </summary>
          /// <returns>The id as a decimal number string.</returns>
          private string GetHash()
          {
              long now = DateTime.Now.Ticks;
              long previous;
              long next;
              do
              {
                  previous = System.Threading.Interlocked.Read(ref _lastHashTicks);
                  next = now > previous ? now : previous + 1;
              }
              while (System.Threading.Interlocked.CompareExchange(ref _lastHashTicks, next, previous) != previous);

              return next.ToString(System.Globalization.CultureInfo.InvariantCulture);
          }

          /// <summary>
          /// Returns the list stored under <paramref name="key"/>, creating and storing an empty one when
          /// the key is missing or maps to <c>null</c>. The caller must hold <c>lock (_events)</c>.
          /// </summary>
          private List<IBaseEventData> GetOrCreateBucket(string key)
          {
              if (!_events.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket == null)
              {
                  bucket = new List<IBaseEventData>();
                  _events[key] = bucket;
              }

              return bucket;
          }

          /// <summary>
          /// Socket callback: deserializes the payload and hands it to the first event object registered
          /// under the message's <c>EventType</c>. Messages without a matching registration are dropped.
          /// </summary>
          /// <param name="sender">Event source supplied by the socket; not used.</param>
          /// <param name="e">Carries the raw received bytes in <c>Data</c>.</param>
          private void Events_DataReceived(object? sender, DataReceivedEventArgs e)
          {
              try
              {
                  var msg = MessagePack.MessagePackSerializer.Deserialize<DataMsg>(e.Data);
                  if (msg == null || string.IsNullOrEmpty(msg.EventType))
                  {
                      return;
                  }

                  // Pick the target under the lock, but invoke it outside so user callbacks never run
                  // while the registry is locked.
                  IBaseEventData? first;
                  lock (_events)
                  {
                      if (!_events.TryGetValue(msg.EventType, out var bucket) || bucket == null || bucket.Count == 0)
                      {
                          return;
                      }

                      first = bucket[0];
                  }

                  if (first is not ISubscripData subscriber)
                  {
                      return;
                  }

                  if (msg.IsResultMsg)
                  {
                      // A result message is only meaningful for an event type that can accept a result;
                      // anything else is ignored.
                      if (subscriber is BaseResultSubscripData resultTarget)
                      {
                          resultTarget.SetResultValue(msg.Data);
                      }
                  }
                  else if (subscriber is BaseResultSubscripData requestTarget)
                  {
                      requestTarget.SubscripData(msg.Data, Properties.GetProperties(msg.Properties));
                  }
                  else
                  {
                      subscriber.Subscrip(msg.Data, Properties.GetProperties(msg.Properties));
                  }
              }
              catch (Exception ex)
              {
                  // Best effort: a malformed packet or a failing handler must not propagate into the
                  // socket's receive path, but the failure should at least be visible to trace listeners.
                  System.Diagnostics.Trace.TraceWarning($"TcpEventBus: failed to process received data: {ex}");
              }
          }
      }
  }