123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- using EventBus;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Runtime.CompilerServices;
- using System.Text;
- using System.Threading.Tasks;
- namespace TcpEventBus
- {
- public sealed class TcpEventBus : IEventBroker
- {
- public event EventHandler Connected;
- public event EventHandler Disconnected;
- string GetHash()
- {
- return DateTime.Now.Ticks.ToString() ;
- }
- private readonly Dictionary<string, List<IBaseEventData>> _list = new Dictionary<string, List<IBaseEventData>>();
- [AllowNull]
- private ISocket socket;
- private TcpEventBus(ISocket tcp)
- {
- if (tcp == null) throw new ArgumentNullException(nameof(tcp));
- socket = tcp;
- tcp.OnData = (data)=> Events_DataReceived(data);
- tcp.OnConnected = () => Connected?.Invoke(this, EventArgs.Empty);
- tcp.OnDisconnected+= () =>
- {
- //_list.Clear();
- Disconnected?.Invoke(this, EventArgs.Empty);
- };
- tcp.Start();
- }
- public void Close()
- {
- socket?.Disconnect();
- }
- private void Events_DataReceived(ArraySegment<byte> e)
- {
- try
- {
- var msg = MessagePack.MessagePackSerializer.Deserialize<DataMsg>(e);
- if (msg == null) return;
- if(_list.TryGetValue(msg.EventType,out var baseevent))
- {
- if (baseevent == null || baseevent.Count == 0) return;
- var subs = baseevent.Cast<ISubscripData>();
- if (subs.First().IsAnonymous)
- {
- if (msg.IsResultMsg)
- {
- ((BaseResultSubscripData)subs.First(x => x.EventName == msg.EventName)).SetResultValue(msg.Data);
- }
- else
- {
- if (subs.First(x => x.EventName == msg.EventName) is BaseResultSubscripData sub)
- {
- sub.SubscripData(msg.Data, Properties.GetProperties(msg.Properties));
- }
- else subs.First(x => x.EventName == msg.EventName).Subscrip(msg.Data, Properties.GetProperties(msg.Properties));
- }
- }
- else
- {
- if (msg.IsResultMsg)
- {
- ((BaseResultSubscripData)subs.First()).SetResultValue(msg.Data);
- }
- else
- {
- if (subs.First() is BaseResultSubscripData sub)
- {
- sub.SubscripData(msg.Data, Properties.GetProperties(msg.Properties));
- }
- else subs.First().Subscrip(msg.Data, Properties.GetProperties(msg.Properties));
- }
- }
- }
- else
- {
- }
- }
- catch
- {
- }
- }
- public void Start() => socket.Start();
- static TcpEventBus()
- {
- }
- public string IP { get; private set; }
- public int Port { get; private set; }
- public bool IsConnected => socket.IsConnect;
- public bool IsService => socket.IsService;
- public static TcpEventBus CreateService(string ip, int port,int timeout=1000)
- {
- //return new TcpEventBus(ISocket.SocketTool.CreateService(port, timeout))
- //{
- // IP = ip,
- // Port = port,
- //};
- return new TcpEventBus(new WatsonTcpServer(ip, port, timeout))
- {
- IP = ip,
- Port = port,
- };
- }
- public static TcpEventBus CreateClient(string ip,int port,int timeout=1000)
- {
- //return new TcpEventBus(ISocket.SocketTool.CreateClient(ip, port, timeout))
- //{
- // IP = ip,
- // Port = port,
- //};
- return new TcpEventBus(new WatsonTcpClient(ip, port, timeout))
- {
- IP = ip,
- Port = port,
- };
- }
- public IEventData<TData> GetEvent<TData>()
- {
- lock (_list)
- {
- string hash = GetHash();
- if (_list.TryGetValue(typeof(TcpEventData<TData>).FullName!, out List<IBaseEventData>? baseevent))
- {
- baseevent ??= new List<IBaseEventData>();
- if (baseevent.Count == 0) baseevent.Add(new TcpEventData<TData>(socket,hash, hash));
- return (IEventData<TData>)baseevent.First();
- }
- else
- {
- TcpEventData<TData> eventData = new TcpEventData<TData>(socket,hash,hash);
- _list.Add(typeof(TcpEventData<TData>).FullName!, new List<IBaseEventData>() { eventData });
- return eventData;
- }
- }
- }
- public IEventData<TData, T> GetEvent<TData, T>()
- {
- lock (_list)
- {
- string hash = GetHash();
- if (_list.TryGetValue(typeof(TcpEventData<TData, T>).FullName!, out List<IBaseEventData>? baseevent))
- {
- baseevent ??= new List<IBaseEventData>();
- if (baseevent.Count == 0) baseevent.Add(new TcpEventData<TData,T>(socket, hash, hash));
- return (IEventData<TData,T>)baseevent.First();
- }
- else
- {
- TcpEventData<TData,T> eventData = new TcpEventData<TData,T>(socket, hash, hash);
- _list.Add(typeof(TcpEventData<TData, T>).FullName!, new List<IBaseEventData>() { eventData });
- return eventData;
- }
- }
- }
- public IAnonymousEventData GetEvent(string eventName)
- {
- lock (_list)
- {
- string hash = GetHash();
- ArgumentNullException.ThrowIfNullOrEmpty(eventName);
- if (_list.TryGetValue(typeof(TcpAnonymousEventData).FullName!, out List<IBaseEventData>? baseevent))
- {
- baseevent ??= new List<IBaseEventData>();
- if (!baseevent.Where(x => x is TcpAnonymousEventData && x.EventName.Equals(eventName)).Any()) baseevent.Add(new TcpAnonymousEventData(socket, eventName, GetHash()));
- return (IAnonymousEventData)baseevent.Where(x => x is TcpAnonymousEventData && (x as TcpAnonymousEventData)!.EventName.Equals(eventName)).First();
- }
- else
- {
- TcpAnonymousEventData eventData = new TcpAnonymousEventData(socket, eventName, GetHash());
- _list.Add(typeof(TcpAnonymousEventData).FullName!, new List<IBaseEventData>() { eventData });
- return eventData;
- }
- }
- }
- public IAnonymousEventData<T> GetEvent<T>(string eventName)
- {
- lock (_list)
- {
- string hash = GetHash();
- ArgumentNullException.ThrowIfNullOrEmpty(eventName);
- if (_list.TryGetValue(typeof(TcpAnonymousEventData<T>).FullName!, out List<IBaseEventData>? baseevent))
- {
- baseevent ??= new List<IBaseEventData>();
- if (!baseevent.Where(x => x is TcpAnonymousEventData<T> && x.EventName.Equals(eventName)).Any()) baseevent.Add(new TcpAnonymousEventData<T>(socket, eventName, GetHash()));
- return (IAnonymousEventData<T>)baseevent.Where(x => x is TcpAnonymousEventData<T> && (x as TcpAnonymousEventData<T>)!.EventName.Equals(eventName)).First();
- }
- else
- {
- TcpAnonymousEventData<T> eventData = new TcpAnonymousEventData<T>(socket, eventName, GetHash());
- _list.Add(typeof(TcpAnonymousEventData<T>).FullName!, new List<IBaseEventData>() { eventData });
- return eventData;
- }
- }
- }
- }
- }
|