TcpEventBus.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. using EventBus;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Diagnostics.CodeAnalysis;
  5. using System.Linq;
  6. using System.Runtime.CompilerServices;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace TcpEventBus
  10. {
  11. public sealed class TcpEventBus : IEventBroker
  12. {
  /// <summary>
  /// Raised with this bus as sender and <see cref="EventArgs.Empty"/> when the
  /// underlying socket invokes its <c>OnConnected</c> callback.
  /// </summary>
  public event EventHandler Connected;

  /// <summary>
  /// Raised with this bus as sender and <see cref="EventArgs.Empty"/> when the
  /// underlying socket invokes its <c>OnDisconnected</c> callback.
  /// </summary>
  public event EventHandler Disconnected;
  /// <summary>
  /// Produces an identifier string from the tick count of the current local time.
  /// </summary>
  /// <remarks>
  /// NOTE(review): two calls that observe the same clock tick return the same string;
  /// confirm that callers do not depend on the value being unique.
  /// </remarks>
  /// <returns>The decimal text of <c>DateTime.Now.Ticks</c>.</returns>
  private string GetHash() => DateTime.Now.Ticks.ToString();
  // Registered events, keyed by the FullName of the concrete event-data type.
  // Each value holds the registrations created for that type. The GetEvent
  // overloads take lock (_list) before reading or changing it.
  private readonly Dictionary<string, List<IBaseEventData>> _list = new();
  // Transport used by this bus and handed to every event-data object it creates.
  // It is assigned exactly once, in the constructor, which rejects null; readonly
  // makes that single assignment explicit and prevents accidental replacement.
  // [AllowNull] is kept from the original declaration so nullable analysis of
  // existing usages (for example socket?.Disconnect()) is unchanged.
  [AllowNull]
  private readonly ISocket socket;
  /// <summary>
  /// Wraps <paramref name="tcp"/>, connects its callbacks to this bus and starts it.
  /// </summary>
  /// <param name="tcp">Socket used as the transport; must not be null.</param>
  /// <exception cref="ArgumentNullException"><paramref name="tcp"/> is null.</exception>
  private TcpEventBus(ISocket tcp)
  {
      ArgumentNullException.ThrowIfNull(tcp);
      socket = tcp;

      // All callbacks are attached before Start() is called below.
      tcp.OnData = payload => Events_DataReceived(payload);
      tcp.OnConnected = () => Connected?.Invoke(this, EventArgs.Empty);

      // Registered events are left in place on disconnect (a former
      // _list.Clear() call here was commented out in the original).
      tcp.OnDisconnected += () => Disconnected?.Invoke(this, EventArgs.Empty);

      tcp.Start();

      // NOTE(review): fixed 50 ms blocking pause after Start(); presumably it gives
      // the socket time to come up before the caller uses the bus — confirm.
      Thread.Sleep(50);
  }
  /// <summary>
  /// Disconnects the underlying socket, if there is one.
  /// </summary>
  public void Close() => socket?.Disconnect();
  /// <summary>
  /// Handles one inbound payload: deserializes it to a <c>DataMsg</c> and hands it
  /// to the matching registered event.
  /// </summary>
  /// <remarks>
  /// The registrations are looked up in <c>_list</c> by <c>msg.EventType</c>. When the
  /// first registration is anonymous, the target is the registration whose
  /// <c>EventName</c> equals <c>msg.EventName</c>; otherwise the target is the first
  /// registration. A message with no target is dropped. Result messages go to
  /// <c>SetResultValue</c>; other messages go to <c>SubscripData</c> (result-capable
  /// registrations) or <c>Subscrip</c>. No exception propagates to the socket
  /// callback; failures are reported through <see cref="System.Diagnostics.Trace"/>.
  /// </remarks>
  /// <param name="e">Raw bytes received from the socket.</param>
  private void Events_DataReceived(ArraySegment<byte> e)
  {
      try
      {
          var msg = MessagePack.MessagePackSerializer.Deserialize<DataMsg>(e);
          if (msg == null)
          {
              return;
          }

          // The GetEvent overloads mutate _list under lock (_list). Take the same
          // lock for the read, but only long enough to copy the registrations so
          // that subscriber callbacks run outside the lock.
          ISubscripData[] subs;
          lock (_list)
          {
              if (!_list.TryGetValue(msg.EventType, out var registered)
                  || registered == null
                  || registered.Count == 0)
              {
                  return;
              }

              subs = registered.Cast<ISubscripData>().ToArray();
          }

          // Anonymous buckets hold several named registrations; typed buckets are
          // represented by their first entry.
          ISubscripData? target = subs[0].IsAnonymous
              ? Array.Find(subs, s => s.EventName == msg.EventName)
              : subs[0];
          if (target == null)
          {
              // Nothing registered under that name: drop the message.
              return;
          }

          if (msg.IsResultMsg)
          {
              // Only result-capable registrations can accept a result value.
              if (target is BaseResultSubscripData pending)
              {
                  pending.SetResultValue(msg.Data);
              }

              return;
          }

          var properties = Properties.GetProperties(msg.Properties);
          if (target is BaseResultSubscripData resultSub)
          {
              resultSub.SubscripData(msg.Data, properties);
          }
          else
          {
              target.Subscrip(msg.Data, properties);
          }
      }
      catch (Exception ex)
      {
          // Best effort: a malformed payload or a failing subscriber must not take
          // down the receive path, but the failure should still be visible.
          System.Diagnostics.Trace.TraceError($"TcpEventBus: failed to handle received data: {ex}");
      }
  }
  /// <summary>
  /// Starts the underlying socket by calling its <c>Start()</c> method.
  /// The constructor already calls it once after wiring the callbacks.
  /// </summary>
  public void Start()
  {
      socket.Start();
  }
  // Explicit static constructor with an empty body; it performs no type initialization.
  static TcpEventBus() { }
  /// <summary>IP address passed to the factory method that created this bus; empty until set.</summary>
  public string IP { get; private set; } = "";

  /// <summary>Port passed to the factory method that created this bus.</summary>
  public int Port { get; private set; }

  /// <summary>Value of the underlying socket's <c>IsConnect</c> property.</summary>
  public bool IsConnected
  {
      get { return socket.IsConnect; }
  }

  /// <summary>Value of the underlying socket's <c>IsService</c> property.</summary>
  public bool IsService
  {
      get { return socket.IsService; }
  }
  /// <summary>
  /// Creates a bus on top of a socket obtained from <c>ISocket.SocketTool.CreateService</c>.
  /// </summary>
  /// <param name="ip">Recorded in <see cref="IP"/>; it is not passed to the socket factory.</param>
  /// <param name="port">Port passed to the socket factory and recorded in <see cref="Port"/>.</param>
  /// <param name="timeout">Forwarded unchanged to the socket factory (unit not visible here; presumably milliseconds — confirm).</param>
  /// <returns>A bus whose socket has already been started by the constructor.</returns>
  public static TcpEventBus CreateService(string ip, int port, int timeout = 1000)
  {
      var serviceSocket = ISocket.SocketTool.CreateService(port, timeout);
      var bus = new TcpEventBus(serviceSocket);
      bus.IP = ip;
      bus.Port = port;
      return bus;
  }
  /// <summary>
  /// Creates a bus on top of a socket obtained from <c>ISocket.SocketTool.CreateClient</c>.
  /// </summary>
  /// <param name="ip">Address passed to the socket factory and recorded in <see cref="IP"/>.</param>
  /// <param name="port">Port passed to the socket factory and recorded in <see cref="Port"/>.</param>
  /// <param name="timeout">Forwarded unchanged to the socket factory (unit not visible here; presumably milliseconds — confirm).</param>
  /// <returns>A bus whose socket has already been started by the constructor.</returns>
  public static TcpEventBus CreateClient(string ip, int port, int timeout = 1000)
  {
      var clientSocket = ISocket.SocketTool.CreateClient(ip, port, timeout);
      var bus = new TcpEventBus(clientSocket);
      bus.IP = ip;
      bus.Port = port;
      return bus;
  }
  /// <summary>
  /// Returns the event for payload type <typeparamref name="TData"/>, creating and
  /// registering it on first use.
  /// </summary>
  /// <remarks>
  /// Registrations are keyed by the full name of <c>TcpEventData&lt;TData&gt;</c>, so
  /// repeated calls with the same <typeparamref name="TData"/> return the same instance.
  /// </remarks>
  /// <typeparam name="TData">Payload type carried by the event.</typeparam>
  /// <returns>The registered event for <typeparamref name="TData"/>.</returns>
  public IEventData<TData> GetEvent<TData>()
  {
      string key = typeof(TcpEventData<TData>).FullName!;

      lock (_list)
      {
          if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
          {
              // Store the bucket back into the dictionary; a bucket that only lives
              // in a local variable would lose the registration made below.
              bucket = new List<IBaseEventData>();
              _list[key] = bucket;
          }

          if (bucket.Count == 0)
          {
              // The same generated value is passed for both trailing constructor
              // arguments, exactly as before.
              string hash = GetHash();
              bucket.Add(new TcpEventData<TData>(socket, hash, hash));
          }

          return (IEventData<TData>)bucket[0];
      }
  }
  /// <summary>
  /// Returns the event for the type pair <typeparamref name="TData"/> /
  /// <typeparamref name="T"/>, creating and registering it on first use.
  /// </summary>
  /// <remarks>
  /// Registrations are keyed by the full name of <c>TcpEventData&lt;TData, T&gt;</c>, so
  /// repeated calls with the same type arguments return the same instance.
  /// </remarks>
  /// <typeparam name="TData">First type argument of the event.</typeparam>
  /// <typeparam name="T">Second type argument of the event.</typeparam>
  /// <returns>The registered event for the given type arguments.</returns>
  public IEventData<TData, T> GetEvent<TData, T>()
  {
      string key = typeof(TcpEventData<TData, T>).FullName!;

      lock (_list)
      {
          if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
          {
              // Store the bucket back into the dictionary; a bucket that only lives
              // in a local variable would lose the registration made below.
              bucket = new List<IBaseEventData>();
              _list[key] = bucket;
          }

          if (bucket.Count == 0)
          {
              // The same generated value is passed for both trailing constructor
              // arguments, exactly as before.
              string hash = GetHash();
              bucket.Add(new TcpEventData<TData, T>(socket, hash, hash));
          }

          return (IEventData<TData, T>)bucket[0];
      }
  }
  /// <summary>
  /// Returns the anonymous event registered for <paramref name="eventName"/>,
  /// creating and registering it when no existing registration matches.
  /// </summary>
  /// <remarks>
  /// NOTE(review): an existing registration matches when its <c>EventName</c>
  /// <em>contains</em> <paramref name="eventName"/> (substring test, kept from the
  /// original), whereas <c>Events_DataReceived</c> compares names with <c>==</c>.
  /// Confirm whether exact equality was intended here.
  /// </remarks>
  /// <param name="eventName">Name of the event; must not be null or empty.</param>
  /// <returns>The matching or newly created anonymous event.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is null.</exception>
  /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
  public IAnonymousEventData GetEvent(string eventName)
  {
      // Validate before taking the lock; no shared state is needed for the check.
      ArgumentException.ThrowIfNullOrEmpty(eventName);

      string key = typeof(TcpAnonymousEventData).FullName!;

      lock (_list)
      {
          if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
          {
              // Store the bucket back into the dictionary so the registration
              // made below is not lost.
              bucket = new List<IBaseEventData>();
              _list[key] = bucket;
          }

          // Single pass: return the first existing registration that matches.
          foreach (IBaseEventData candidate in bucket)
          {
              if (candidate is TcpAnonymousEventData existing && existing.EventName.Contains(eventName))
              {
                  return existing;
              }
          }

          var created = new TcpAnonymousEventData(socket, eventName, GetHash());
          bucket.Add(created);
          return created;
      }
  }
  /// <summary>
  /// Returns the anonymous event of type <typeparamref name="T"/> registered for
  /// <paramref name="eventName"/>, creating and registering it when no existing
  /// registration matches.
  /// </summary>
  /// <remarks>
  /// NOTE(review): an existing registration matches when its <c>EventName</c>
  /// <em>contains</em> <paramref name="eventName"/> (substring test, kept from the
  /// original), whereas <c>Events_DataReceived</c> compares names with <c>==</c>.
  /// Confirm whether exact equality was intended here.
  /// </remarks>
  /// <typeparam name="T">Type argument of the anonymous event.</typeparam>
  /// <param name="eventName">Name of the event; must not be null or empty.</param>
  /// <returns>The matching or newly created anonymous event.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is null.</exception>
  /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
  public IAnonymousEventData<T> GetEvent<T>(string eventName)
  {
      // Validate before taking the lock; no shared state is needed for the check.
      ArgumentException.ThrowIfNullOrEmpty(eventName);

      string key = typeof(TcpAnonymousEventData<T>).FullName!;

      lock (_list)
      {
          if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
          {
              // Store the bucket back into the dictionary so the registration
              // made below is not lost.
              bucket = new List<IBaseEventData>();
              _list[key] = bucket;
          }

          // Single pass: return the first existing registration that matches.
          foreach (IBaseEventData candidate in bucket)
          {
              if (candidate is TcpAnonymousEventData<T> existing && existing.EventName.Contains(eventName))
              {
                  return existing;
              }
          }

          var created = new TcpAnonymousEventData<T>(socket, eventName, GetHash());
          bucket.Add(created);
          return created;
      }
  }
  191. }
  192. }