using EventBus;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// NOTE(review): several declarations in this file appear to have lost their generic
// type arguments (the text between angle brackets) - e.g. the class header, the
// `_list` field, and the Func/Task/List signatures. They are reproduced exactly as
// found; restore them from the original source before compiling.
namespace TcpEventBus
{
    /// <summary>
    /// Anonymous event that publishes requests through an <c>ISocket</c> and blocks until
    /// a result is delivered via <see cref="SetResultValue"/>, and that dispatches incoming
    /// payloads to locally registered handlers, sending the handler's result back over the
    /// same socket.
    /// </summary>
    public class TcpAnonymousEventData :BaseResultSubscripData, IAnonymousEventData
    {
        // Signalled by SetResultValue when a result for the pending Publish call is stored.
        private readonly AutoResetEvent _WaitResult = new AutoResetEvent(false);

        // Last result decoded by SetResultValue; read by Publish after the wait succeeds.
        [AllowNull]
        private T _Result = default;

        // Socket used both for outgoing publish messages and for handler replies.
        private readonly ISocket _Socket;

        // Registered handlers, keyed by the Guid each FuncValue exposes through IntPtr.
        // The dictionary instance is also used as the monitor for all handler operations.
        readonly ConcurrentDictionary> _list = new ConcurrentDictionary>();

        /// <summary>
        /// Creates the event bound to the given socket.
        /// </summary>
        /// <param name="socket">Socket that messages are sent through.</param>
        /// <param name="eventName">Value exposed by <see cref="EventName"/>.</param>
        /// <param name="hash">Value exposed by <see cref="Hash"/>.</param>
        public TcpAnonymousEventData(ISocket socket,string eventName,string hash)
        {
            _Socket = socket;
            EventName = eventName;
            Hash = hash;
        }

        /// <summary>Name of the event, as supplied to the constructor.</summary>
        public override string EventName { get; }

        /// <summary>Hash string supplied to the constructor.</summary>
        public string Hash { get; }

        /// <summary>Always <c>true</c> for this type.</summary>
        public override bool IsAnonymous => true;

        /// <summary>
        /// Decodes <paramref name="result"/> with <c>MsgTool.GetValue</c>, stores it and
        /// releases a caller blocked in <see cref="Publish"/>.
        /// </summary>
        /// <param name="result">Raw bytes of the result.</param>
        public override void SetResultValue(byte[] result)
        {
            _Result = MsgTool.GetValue(result);
            _WaitResult.Set();
        }

        /// <summary>Not supported by this type.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public override void Subscrip(byte[] data, Properties properties)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Decodes <paramref name="data"/>, invokes the first registered handler whose
        /// properties pass <c>FilterRule(properties)</c> and that has a delegate, and sends
        /// the handler's result over the socket. Does nothing when no handler matches.
        /// </summary>
        /// <param name="data">Raw payload, decoded with <c>MsgTool.GetAnonyDatas</c>.</param>
        /// <param name="properties">Properties used to filter handlers and to build the reply.</param>
        public override void SubscripData(byte[] data, Properties properties)
        {
            lock (_list)
            {
                // Decode first, as the original did, so a malformed payload surfaces
                // even when no handler is registered.
                var t = MsgTool.GetAnonyDatas(data);

                var matches = _list
                    .Where(x => x.Value.Properties.FilterRule(properties) && x.Value.HasDelegate)
                    .Select(x => x.Value)
                    .ToList();
                if (matches.Count == 0)
                {
                    return;
                }

                // Only the first matching handler is invoked; its result is the reply.
                var handler = matches[0];
                T result = handler.Invoke(this, t, this);
                var bytes = MsgTool.GetMsg(this, result, properties, true).GetBytes();
                _Socket.Send(bytes);
            }
        }

        /// <summary>Removes every registered handler.</summary>
        public void Clear()
        {
            lock (_list)
            {
                _list.Clear();
            }
        }

        /// <summary>
        /// Sends <paramref name="data"/> over the socket and blocks until
        /// <see cref="SetResultValue"/> delivers a result or the socket's
        /// <c>ReceiveTimeout</c> elapses.
        /// </summary>
        /// <param name="sender">Not used by this implementation.</param>
        /// <param name="properties">Optional properties passed to <c>GetAnonyMsg</c>.</param>
        /// <param name="data">Arguments to publish.</param>
        /// <returns>
        /// The stored result, or <c>default</c> when <paramref name="data"/> or the socket
        /// is <c>null</c>, or when the wait times out.
        /// </returns>
        public T Publish(object sender, Properties? properties = null, params object[] data)
        {
            if (data == null || _Socket == null)
            {
                return default;
            }

            // Clear any stale signal BEFORE sending. Resetting after Send (as the code
            // previously did) could erase the signal of a reply that arrived between
            // Send and Reset, making this call time out although a result was received.
            _WaitResult.Reset();
            _Socket.Send(this.GetAnonyMsg(data, properties).GetBytes());

            if (!_WaitResult.WaitOne(_Socket.ReceiveTimeout))
            {
                return default;
            }

            return _Result;
        }

        /// <summary>
        /// Runs <see cref="Publish"/> on the thread pool via <c>Task.Run</c>.
        /// </summary>
        public Task PublishAsync(object sender, Properties? properties = null, params object[] data)
        {
            return Task.Run(() => Publish(sender, properties, data));
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public List PublishList(object sender, Properties? properties = null, params object[] data)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public Task> PublishListAsync(object sender, Properties? properties = null, params object[] data)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Registers <paramref name="func"/> as a handler.
        /// </summary>
        /// <param name="func">Handler delegate; must not be <c>null</c>.</param>
        /// <param name="properties">Handler properties; <c>Properties.Default</c> when omitted.</param>
        /// <returns>The Guid under which the handler is stored; pass it to <see cref="UnSubscrip"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="func"/> is <c>null</c>.</exception>
        public Guid Subscrip(Func, T> func, Properties? properties = null)
        {
            // Validation touches no shared state, so it does not need the lock.
            ArgumentNullException.ThrowIfNull(func);
            properties ??= Properties.Default;

            lock (_list)
            {
                var entry = new FuncValue(func, properties);
                _list[entry.IntPtr] = entry;
                return entry.IntPtr;
            }
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public Guid SubscripList(Func, List> func, Properties? properties = null)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Removes the handler stored under <paramref name="guid"/>, if any.
        /// <c>Guid.Empty</c> is ignored.
        /// </summary>
        /// <param name="guid">Key returned by <c>Subscrip</c>.</param>
        public void UnSubscrip(Guid guid)
        {
            if (guid == Guid.Empty)
            {
                return;
            }

            lock (_list)
            {
                _list.TryRemove(guid, out _);
            }
        }
    }
}