123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- using EventBus;
- using SuperSimpleTcp;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Runtime.CompilerServices;
- using System.Text;
- using System.Threading.Tasks;
- namespace TcpEventBus
- {
/// <summary>
/// Event broker that exchanges MessagePack-encoded <c>DataMsg</c> messages over an
/// <see cref="ISocket"/>. Instances are created through <see cref="CreateService"/> or
/// <see cref="CreateClient"/>; the socket is started as part of construction.
/// </summary>
public sealed class TcpEventBus : IEventBroker
{
    // Last value handed out by GetHash(); shared by every bus in the process.
    private static long s_lastHashTicks;

    // Registered events, keyed by the FullName of the concrete event-data type.
    // Also used as the lock object guarding every read and write of the map.
    private readonly Dictionary<string, List<IBaseEventData>> _list = new Dictionary<string, List<IBaseEventData>>();

    private readonly ISocket _socket;

    /// <summary>Forwarded from the underlying socket's <c>Connected</c> event (sender and args are passed through unchanged).</summary>
    public event EventHandler<ConnectionEventArgs> Connected;

    /// <summary>Forwarded from the underlying socket's <c>Disconnected</c> event (sender and args are passed through unchanged).</summary>
    public event EventHandler<ConnectionEventArgs> Disconnected;

    /// <summary>
    /// Wires the bus to <paramref name="tcp"/> and starts the socket.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="tcp"/> is <c>null</c>.</exception>
    private TcpEventBus(ISocket tcp, string ip, int port)
    {
        _socket = tcp ?? throw new ArgumentNullException(nameof(tcp));
        IP = ip;
        Port = port;

        // Subscribe BEFORE starting the socket: anything raised while Start() runs
        // (or immediately after) would otherwise be lost.
        tcp.Events.DataReceived += Events_DataReceived;
        tcp.Events.Connected += (sender, args) => Connected?.Invoke(sender, args);
        tcp.Events.Disconnected += (sender, args) => Disconnected?.Invoke(sender, args);

        tcp.Start();
    }

    /// <summary>Address passed to the factory method that created this bus.</summary>
    public string IP { get; private set; } = "";

    /// <summary>Port passed to the factory method that created this bus.</summary>
    public int Port { get; private set; }

    /// <summary>Mirrors <c>ISocket.IsConnect</c> of the underlying socket.</summary>
    public bool IsConnected => _socket.IsConnect;

    /// <summary>Mirrors <c>ISocket.IsService</c> of the underlying socket.</summary>
    public bool IsService => _socket.IsService;

    /// <summary>
    /// Creates a bus on top of <c>SocketTool.CreateService(port, timeout)</c>.
    /// </summary>
    /// <param name="ip">Only recorded in <see cref="IP"/>; it is not passed to the socket factory.</param>
    /// <param name="port">Port handed to the socket factory and recorded in <see cref="Port"/>.</param>
    /// <param name="timeout">Timeout handed to the socket factory.</param>
    public static TcpEventBus CreateService(string ip, int port, int timeout = 1000)
    {
        return new TcpEventBus(SocketTool.CreateService(port, timeout), ip, port);
    }

    /// <summary>
    /// Creates a bus on top of <c>SocketTool.CreateClient(ip, port, timeout)</c>.
    /// </summary>
    /// <param name="ip">Address handed to the socket factory and recorded in <see cref="IP"/>.</param>
    /// <param name="port">Port handed to the socket factory and recorded in <see cref="Port"/>.</param>
    /// <param name="timeout">Timeout handed to the socket factory.</param>
    public static TcpEventBus CreateClient(string ip, int port, int timeout = 1000)
    {
        return new TcpEventBus(SocketTool.CreateClient(ip, port, timeout), ip, port);
    }

    /// <summary>Calls <c>Start()</c> on the underlying socket.</summary>
    public void Start() => _socket.Start();

    /// <summary>Calls <c>Disconnect()</c> on the underlying socket.</summary>
    public void Close()
    {
        _socket?.Disconnect();
    }

    /// <summary>
    /// Returns the event for <typeparamref name="TData"/>, creating and registering it on first use.
    /// </summary>
    public IEventData<TData> GetEvent<TData>()
    {
        string key = typeof(TcpEventData<TData>).FullName!;
        lock (_list)
        {
            List<IBaseEventData> bucket = GetOrAddBucket(key);
            if (bucket.Count == 0)
            {
                // The same generated id is passed for both trailing constructor arguments.
                string hash = GetHash();
                bucket.Add(new TcpEventData<TData>(_socket, hash, hash));
            }

            return (IEventData<TData>)bucket[0];
        }
    }

    /// <summary>
    /// Returns the event for the <typeparamref name="TData"/>/<typeparamref name="T"/> pair,
    /// creating and registering it on first use.
    /// </summary>
    public IEventData<TData, T> GetEvent<TData, T>()
    {
        string key = typeof(TcpEventData<TData, T>).FullName!;
        lock (_list)
        {
            List<IBaseEventData> bucket = GetOrAddBucket(key);
            if (bucket.Count == 0)
            {
                // The same generated id is passed for both trailing constructor arguments.
                string hash = GetHash();
                bucket.Add(new TcpEventData<TData, T>(_socket, hash, hash));
            }

            return (IEventData<TData, T>)bucket[0];
        }
    }

    /// <summary>
    /// Returns the anonymous event registered for <paramref name="eventName"/>, creating it when
    /// no registered event matches.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
    public IAnonymousEventData GetEvent(string eventName)
    {
        // Validate before taking the lock or generating an id.
        ArgumentException.ThrowIfNullOrEmpty(eventName);

        string key = typeof(TcpAnonymousEventData).FullName!;
        lock (_list)
        {
            List<IBaseEventData> bucket = GetOrAddBucket(key);

            // NOTE(review): matching is by substring (Contains), so "foo" also matches an
            // already-registered "foobar". Kept as-is because how TcpAnonymousEventData builds
            // EventName is not visible from this file; confirm whether equality was intended.
            IBaseEventData? match = bucket.FirstOrDefault(
                x => x is TcpAnonymousEventData candidate && candidate.EventName.Contains(eventName));

            if (match is null)
            {
                match = new TcpAnonymousEventData(_socket, eventName, GetHash());
                bucket.Add(match);
            }

            return (IAnonymousEventData)match;
        }
    }

    /// <summary>
    /// Returns the anonymous event of result type <typeparamref name="T"/> registered for
    /// <paramref name="eventName"/>, creating it when no registered event matches.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="eventName"/> is empty.</exception>
    public IAnonymousEventData<T> GetEvent<T>(string eventName)
    {
        // Validate before taking the lock or generating an id.
        ArgumentException.ThrowIfNullOrEmpty(eventName);

        string key = typeof(TcpAnonymousEventData<T>).FullName!;
        lock (_list)
        {
            List<IBaseEventData> bucket = GetOrAddBucket(key);

            // NOTE(review): substring match, see the non-generic overload; confirm intent.
            IBaseEventData? match = bucket.FirstOrDefault(
                x => x is TcpAnonymousEventData<T> candidate && candidate.EventName.Contains(eventName));

            if (match is null)
            {
                match = new TcpAnonymousEventData<T>(_socket, eventName, GetHash());
                bucket.Add(match);
            }

            return (IAnonymousEventData<T>)match;
        }
    }

    /// <summary>
    /// Produces an id based on the current tick count that is strictly greater than every id
    /// previously returned in this process, so two calls within one clock tick never collide.
    /// </summary>
    private static string GetHash()
    {
        long now = DateTime.Now.Ticks;
        long previous;
        long next;
        do
        {
            previous = System.Threading.Interlocked.Read(ref s_lastHashTicks);
            // Fall back to previous + 1 when the clock has not advanced (or stepped back).
            next = now > previous ? now : previous + 1;
        }
        while (System.Threading.Interlocked.CompareExchange(ref s_lastHashTicks, next, previous) != previous);

        return next.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>
    /// Returns the list stored under <paramref name="key"/>, creating and storing an empty one
    /// when the key is missing or maps to <c>null</c>. Callers must hold the lock on <c>_list</c>.
    /// </summary>
    private List<IBaseEventData> GetOrAddBucket(string key)
    {
        if (!_list.TryGetValue(key, out List<IBaseEventData>? bucket) || bucket is null)
        {
            bucket = new List<IBaseEventData>();
            _list[key] = bucket;
        }

        return bucket;
    }

    /// <summary>
    /// Deserializes an incoming payload and hands it to the first event registered under the
    /// message's <c>EventType</c>. Failures are traced and never propagate to the socket.
    /// </summary>
    private void Events_DataReceived(object? sender, DataReceivedEventArgs e)
    {
        try
        {
            var msg = MessagePack.MessagePackSerializer.Deserialize<DataMsg>(e.Data);
            if (msg == null || string.IsNullOrEmpty(msg.EventType)) return;

            // Look the target up under the same lock GetEvent uses; Dictionary is not safe for
            // a read that overlaps a write. Dispatch happens outside the lock so subscriber
            // code never runs while the map is locked.
            IBaseEventData first;
            lock (_list)
            {
                if (!_list.TryGetValue(msg.EventType, out var registered) || registered is null || registered.Count == 0)
                {
                    return;
                }

                first = registered[0];
            }

            if (first is not ISubscripData subscriber) return;

            if (msg.IsResultMsg)
            {
                if (subscriber is BaseResultSubscripData resultTarget)
                {
                    resultTarget.SetResultValue(msg.Data);
                }
                else
                {
                    // Previously an InvalidCastException that was silently swallowed.
                    System.Diagnostics.Trace.TraceWarning(
                        $"TcpEventBus: result message for '{msg.EventType}' ignored; the registered event cannot accept results.");
                }
            }
            else if (subscriber is BaseResultSubscripData requestTarget)
            {
                requestTarget.SubscripData(msg.Data, Properties.GetProperties(msg.Properties));
            }
            else
            {
                subscriber.Subscrip(msg.Data, Properties.GetProperties(msg.Properties));
            }
        }
        catch (Exception ex)
        {
            // Best effort: a bad payload or a throwing subscriber must not take the receive
            // path down, but the failure is reported instead of being discarded.
            System.Diagnostics.Trace.TraceError($"TcpEventBus: failed to process incoming message: {ex}");
        }
    }
}
- }
|