// TcpEventBus.cs
  1. using EventBus;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Diagnostics.CodeAnalysis;
  5. using System.Linq;
  6. using System.Runtime.CompilerServices;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace TcpEventBus
  10. {
  /// <summary>
  /// <see cref="IEventBroker"/> implementation that sends and receives events through an
  /// <see cref="ISocket"/>. Instances are obtained from <see cref="CreateService"/> or
  /// <see cref="CreateClient"/>; the constructor is private.
  /// </summary>
  public sealed class TcpEventBus : IEventBroker
  {
      /// <summary>
      /// Registered events, keyed by the full type name of the event implementation.
      /// Incoming messages are routed by looking up their <c>EventType</c> in this map.
      /// The dictionary instance doubles as the lock object for every access to it.
      /// </summary>
      private readonly Dictionary<string, List<IBaseEventData>> _list = new Dictionary<string, List<IBaseEventData>>();

      /// <summary>Transport used by this bus; never null once the constructor has returned.</summary>
      private readonly ISocket _socket;

      /// <summary>Raised from the socket's <c>OnConnected</c> callback.</summary>
      public event EventHandler Connected;

      /// <summary>Raised from the socket's <c>OnDisconnected</c> callback.</summary>
      public event EventHandler Disconnected;

      /// <summary>
      /// Wires the socket callbacks to this bus and starts the socket.
      /// </summary>
      /// <param name="tcp">Transport to use.</param>
      /// <exception cref="ArgumentNullException"><paramref name="tcp"/> is null.</exception>
      private TcpEventBus(ISocket tcp)
      {
          ArgumentNullException.ThrowIfNull(tcp);

          _socket = tcp;
          tcp.OnData = (data) => Events_DataReceived(data);
          tcp.OnConnected = () => Connected?.Invoke(this, EventArgs.Empty);
          // Registered events are deliberately kept (not cleared) when the connection drops.
          tcp.OnDisconnected += () => Disconnected?.Invoke(this, EventArgs.Empty);
          // NOTE(review): the socket is started before any caller can subscribe to Connected,
          // so an early connection notification may go unobserved - confirm this is intended.
          tcp.Start();
      }

      /// <summary>Address passed to the factory method that created this bus.</summary>
      public string IP { get; private set; } = string.Empty;

      /// <summary>Port passed to the factory method that created this bus.</summary>
      public int Port { get; private set; }

      /// <summary>Mirrors the socket's <c>IsConnect</c> flag.</summary>
      public bool IsConnected => _socket.IsConnect;

      /// <summary>Mirrors the socket's <c>IsService</c> flag.</summary>
      public bool IsService => _socket.IsService;

      /// <summary>
      /// Creates a bus backed by a <see cref="WatsonTcpServer"/>.
      /// </summary>
      /// <param name="ip">Address handed to the server socket.</param>
      /// <param name="port">Port handed to the server socket.</param>
      /// <param name="timeout">Timeout forwarded unchanged to the server socket (unit not visible here).</param>
      /// <returns>A started bus with <see cref="IP"/> and <see cref="Port"/> populated.</returns>
      public static TcpEventBus CreateService(string ip, int port, int timeout = 1000)
      {
          return new TcpEventBus(new WatsonTcpServer(ip, port, timeout))
          {
              IP = ip,
              Port = port,
          };
      }

      /// <summary>
      /// Creates a bus backed by a <see cref="WatsonTcpClient"/>.
      /// </summary>
      /// <param name="ip">Address handed to the client socket.</param>
      /// <param name="port">Port handed to the client socket.</param>
      /// <param name="timeout">Timeout forwarded unchanged to the client socket (unit not visible here).</param>
      /// <returns>A started bus with <see cref="IP"/> and <see cref="Port"/> populated.</returns>
      public static TcpEventBus CreateClient(string ip, int port, int timeout = 1000)
      {
          return new TcpEventBus(new WatsonTcpClient(ip, port, timeout))
          {
              IP = ip,
              Port = port,
          };
      }

      /// <summary>Calls <c>Start</c> on the underlying socket.</summary>
      public void Start() => _socket.Start();

      /// <summary>Calls <c>Disconnect</c> on the underlying socket.</summary>
      public void Close() => _socket.Disconnect();

      /// <summary>
      /// Returns the single <see cref="TcpEventData{TData}"/> registered for
      /// <typeparamref name="TData"/>, creating and registering it on first use.
      /// </summary>
      public IEventData<TData> GetEvent<TData>()
      {
          return GetOrAddEvent<TcpEventData<TData>>(
              _ => true,
              () =>
              {
                  string hash = GetHash();
                  return new TcpEventData<TData>(_socket, hash, hash);
              });
      }

      /// <summary>
      /// Returns the single <see cref="TcpEventData{TData, T}"/> registered for this pair of
      /// type arguments, creating and registering it on first use.
      /// </summary>
      public IEventData<TData, T> GetEvent<TData, T>()
      {
          return GetOrAddEvent<TcpEventData<TData, T>>(
              _ => true,
              () =>
              {
                  string hash = GetHash();
                  return new TcpEventData<TData, T>(_socket, hash, hash);
              });
      }

      /// <summary>
      /// Returns the <see cref="TcpAnonymousEventData"/> registered under
      /// <paramref name="eventName"/>, creating and registering it on first use.
      /// </summary>
      /// <param name="eventName">Name identifying the anonymous event.</param>
      /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
      public IAnonymousEventData GetEvent(string eventName)
      {
          // Same check the original reached through ArgumentNullException (inherited static).
          ArgumentException.ThrowIfNullOrEmpty(eventName);

          return GetOrAddEvent<TcpAnonymousEventData>(
              existing => existing.EventName.Equals(eventName),
              () => new TcpAnonymousEventData(_socket, eventName, GetHash()));
      }

      /// <summary>
      /// Returns the <see cref="TcpAnonymousEventData{T}"/> registered under
      /// <paramref name="eventName"/>, creating and registering it on first use.
      /// </summary>
      /// <param name="eventName">Name identifying the anonymous event.</param>
      /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is null.</exception>
      /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
      public IAnonymousEventData<T> GetEvent<T>(string eventName)
      {
          ArgumentException.ThrowIfNullOrEmpty(eventName);

          return GetOrAddEvent<TcpAnonymousEventData<T>>(
              existing => existing.EventName.Equals(eventName),
              () => new TcpAnonymousEventData<T>(_socket, eventName, GetHash()));
      }

      /// <summary>
      /// Produces the identifier handed to newly created events: the current local time in ticks.
      /// </summary>
      /// <remarks>
      /// NOTE(review): two calls within the same tick return the same string. Whether the
      /// event types require uniqueness is not visible from this file - confirm.
      /// </remarks>
      private static string GetHash() => DateTime.Now.Ticks.ToString();

      /// <summary>
      /// Looks up the bucket for <typeparamref name="TEvent"/> and returns the first entry of that
      /// type accepted by <paramref name="isMatch"/>; if there is none, registers and returns the
      /// result of <paramref name="create"/>. Runs entirely under the <see cref="_list"/> lock.
      /// </summary>
      private TEvent GetOrAddEvent<TEvent>(Func<TEvent, bool> isMatch, Func<TEvent> create)
          where TEvent : IBaseEventData
      {
          string key = typeof(TEvent).FullName!;

          lock (_list)
          {
              if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
              {
                  // Store the bucket back so the new event is actually reachable later.
                  bucket = new List<IBaseEventData>();
                  _list[key] = bucket;
              }

              foreach (IBaseEventData candidate in bucket)
              {
                  if (candidate is TEvent typed && isMatch(typed))
                  {
                      return typed;
                  }
              }

              TEvent created = create();
              bucket.Add(created);
              return created;
          }
      }

      /// <summary>
      /// Socket data callback: deserializes a <see cref="DataMsg"/> and forwards it to the matching
      /// registered event. Messages with no matching registration are ignored. Failures are
      /// reported through <see cref="System.Diagnostics.Trace"/> and never propagate to the socket.
      /// </summary>
      /// <param name="e">Raw MessagePack payload received from the socket.</param>
      private void Events_DataReceived(ArraySegment<byte> e)
      {
          try
          {
              var msg = MessagePack.MessagePackSerializer.Deserialize<DataMsg>(e);
              if (msg == null || msg.EventType == null)
              {
                  return;
              }

              // Copy the subscribers under the same lock GetEvent uses, then dispatch outside it
              // so handlers cannot block or re-enter registration while the lock is held.
              ISubscripData[] subscribers;
              lock (_list)
              {
                  if (!_list.TryGetValue(msg.EventType, out var bucket) || bucket == null || bucket.Count == 0)
                  {
                      return;
                  }

                  subscribers = bucket.OfType<ISubscripData>().ToArray();
              }

              if (subscribers.Length == 0)
              {
                  return;
              }

              // Anonymous events share one bucket and are told apart by name;
              // typed events have exactly one registration per bucket.
              ISubscripData first = subscribers[0];
              ISubscripData? target = first.IsAnonymous
                  ? Array.Find(subscribers, s => s.EventName == msg.EventName)
                  : first;
              if (target == null)
              {
                  return;
              }

              if (msg.IsResultMsg)
              {
                  // A result can only be delivered to a result-capable registration.
                  if (target is BaseResultSubscripData resultTarget)
                  {
                      resultTarget.SetResultValue(msg.Data);
                  }

                  return;
              }

              if (target is BaseResultSubscripData requestTarget)
              {
                  requestTarget.SubscripData(msg.Data, Properties.GetProperties(msg.Properties));
              }
              else
              {
                  target.Subscrip(msg.Data, Properties.GetProperties(msg.Properties));
              }
          }
          catch (Exception ex)
          {
              // Best-effort delivery: a bad payload or a throwing handler must not tear down
              // the socket's receive path, but it should not vanish silently either.
              System.Diagnostics.Trace.TraceError($"TcpEventBus: failed to dispatch incoming message: {ex}");
          }
      }
  }
  201. }