using EventBus; using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; namespace TcpEventBus { public sealed class TcpEventBus : IEventBroker { public event EventHandler Connected; public event EventHandler Disconnected; string GetHash() { return DateTime.Now.Ticks.ToString() ; } private readonly Dictionary> _list = new Dictionary>(); [AllowNull] private ISocket socket; private TcpEventBus(ISocket tcp) { if (tcp == null) throw new ArgumentNullException(nameof(tcp)); socket = tcp; tcp.OnData = (data)=> Events_DataReceived(data); tcp.OnConnected = () => Connected?.Invoke(this, EventArgs.Empty); tcp.OnDisconnected+= () => { //_list.Clear(); Disconnected?.Invoke(this, EventArgs.Empty); }; tcp.Start(); } public void Close() { socket?.Disconnect(); } private void Events_DataReceived(ArraySegment e) { try { var msg = MessagePack.MessagePackSerializer.Deserialize(e); if (msg == null) return; if(_list.TryGetValue(msg.EventType,out var baseevent)) { if (baseevent == null || baseevent.Count == 0) return; var subs = baseevent.Cast(); if (subs.First().IsAnonymous) { if (msg.IsResultMsg) { ((BaseResultSubscripData)subs.First(x => x.EventName == msg.EventName)).SetResultValue(msg.Data); } else { if (subs.First(x => x.EventName == msg.EventName) is BaseResultSubscripData sub) { sub.SubscripData(msg.Data, Properties.GetProperties(msg.Properties)); } else subs.First(x => x.EventName == msg.EventName).Subscrip(msg.Data, Properties.GetProperties(msg.Properties)); } } else { if (msg.IsResultMsg) { ((BaseResultSubscripData)subs.First()).SetResultValue(msg.Data); } else { if (subs.First() is BaseResultSubscripData sub) { sub.SubscripData(msg.Data, Properties.GetProperties(msg.Properties)); } else subs.First().Subscrip(msg.Data, Properties.GetProperties(msg.Properties)); } } } else { } } catch { } } public void Start() => socket.Start(); static TcpEventBus() { } public string IP { get; private set; } public int Port { get; private set; } public bool IsConnected => socket.IsConnect; public bool IsService => socket.IsService; public static TcpEventBus CreateService(string ip, int port,int timeout=1000) { //return new TcpEventBus(ISocket.SocketTool.CreateService(port, timeout)) //{ // IP = ip, // Port = port, //}; return new TcpEventBus(new Server(ip, port, timeout)) { IP = ip, Port = port, }; } public static TcpEventBus CreateClient(string ip,int port,int timeout=1000) { //return new TcpEventBus(ISocket.SocketTool.CreateClient(ip, port, timeout)) //{ // IP = ip, // Port = port, //}; return new TcpEventBus(new Client(ip, port, timeout)) { IP = ip, Port = port, }; } public IEventData GetEvent() { lock (_list) { string hash = GetHash(); if (_list.TryGetValue(typeof(TcpEventData).FullName!, out List? baseevent)) { baseevent ??= new List(); if (baseevent.Count == 0) baseevent.Add(new TcpEventData(socket,hash, hash)); return (IEventData)baseevent.First(); } else { TcpEventData eventData = new TcpEventData(socket,hash,hash); _list.Add(typeof(TcpEventData).FullName!, new List() { eventData }); return eventData; } } } public IEventData GetEvent() { lock (_list) { string hash = GetHash(); if (_list.TryGetValue(typeof(TcpEventData).FullName!, out List? baseevent)) { baseevent ??= new List(); if (baseevent.Count == 0) baseevent.Add(new TcpEventData(socket, hash, hash)); return (IEventData)baseevent.First(); } else { TcpEventData eventData = new TcpEventData(socket, hash, hash); _list.Add(typeof(TcpEventData).FullName!, new List() { eventData }); return eventData; } } } public IAnonymousEventData GetEvent(string eventName) { lock (_list) { string hash = GetHash(); ArgumentNullException.ThrowIfNullOrEmpty(eventName); if (_list.TryGetValue(typeof(TcpAnonymousEventData).FullName!, out List? baseevent)) { baseevent ??= new List(); if (!baseevent.Where(x => x is TcpAnonymousEventData && x.EventName.Equals(eventName)).Any()) baseevent.Add(new TcpAnonymousEventData(socket, eventName, GetHash())); return (IAnonymousEventData)baseevent.Where(x => x is TcpAnonymousEventData && (x as TcpAnonymousEventData)!.EventName.Equals(eventName)).First(); } else { TcpAnonymousEventData eventData = new TcpAnonymousEventData(socket, eventName, GetHash()); _list.Add(typeof(TcpAnonymousEventData).FullName!, new List() { eventData }); return eventData; } } } public IAnonymousEventData GetEvent(string eventName) { lock (_list) { string hash = GetHash(); ArgumentNullException.ThrowIfNullOrEmpty(eventName); if (_list.TryGetValue(typeof(TcpAnonymousEventData).FullName!, out List? baseevent)) { baseevent ??= new List(); if (!baseevent.Where(x => x is TcpAnonymousEventData && x.EventName.Equals(eventName)).Any()) baseevent.Add(new TcpAnonymousEventData(socket, eventName, GetHash())); return (IAnonymousEventData)baseevent.Where(x => x is TcpAnonymousEventData && (x as TcpAnonymousEventData)!.EventName.Equals(eventName)).First(); } else { TcpAnonymousEventData eventData = new TcpAnonymousEventData(socket, eventName, GetHash()); _list.Add(typeof(TcpAnonymousEventData).FullName!, new List() { eventData }); return eventData; } } } } }