using EventBus; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Text; using System.Threading.Tasks; namespace TcpEventBus { public class TcpEventData :BaseResultSubscripData, IEventData { private AutoResetEvent _WaitResult = new AutoResetEvent(false); [AllowNull] private T _Result = default; public override void SetResultValue(byte[] result) { _Result = MsgTool.GetValue(result); _WaitResult.Set(); } public override void Subscrip(byte[] data, Properties properties) { throw new NotSupportedException(); } public override void SubscripData(byte[] data, Properties properties) { lock (_list) { TData t = MsgTool.GetValue(data); var temp = _list.Where(x => x.Value.Properties.FilterRule(properties) && x.Value.HasDelegate).Select(x => x.Value).ToList(); if (temp.Count == 0) return; var tempevent = temp.First(); T result = tempevent.Invoke(this, t, this); var bytes = MsgTool.GetMsg(this, result, properties,true).GetBytes() ; _Socket.Send(bytes); } } public override bool IsAnonymous => false; ConcurrentDictionary> _list = new ConcurrentDictionary>(); private ISocket _Socket; internal TcpEventData(ISocket socket,string eventName,string hash) { _Socket =socket; EventName = eventName; Hash = hash; } public override string EventName { get; } public string Hash { get; } public void Clear() { lock (_list) { _list.Clear(); } } public T Publish(object sender, TData data, Properties? properties = null) { if (data == null || _Socket == null) return default; _Socket.Send(this.GetMsg(data, properties).GetBytes()); _WaitResult.Reset(); if (!_WaitResult.WaitOne(_Socket.ReceiveTimeout)) { return default; } else return _Result; } public Task PublishAsync(object sender, TData data, Properties? properties = null) { return Task.Run(()=> { return Publish(sender, data, properties); }); } public List PublishList(object sender, TData data, Properties? properties = null) { throw new NotImplementedException(); } public Task> PublishListAsync(object sender, TData data, Properties? properties = null) { throw new NotImplementedException(); } public Guid Subscrip(Func, T> func, Properties? properties = null) { lock (_list) { ArgumentNullException.ThrowIfNull(func); properties ??= Properties.Default; var temp = new FuncValue(func, properties); _list[temp.IntPtr] = temp; return temp.IntPtr; } } public Guid SubscripList(Func, List> func, Properties? properties = null) { throw new NotImplementedException(); } public void UnSubscrip(Guid guid) { lock (_list) { if (guid == Guid.Empty) return; _list.TryRemove(guid, out _); } } } }