123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- using EventBus;
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace TcpEventBus
- {
- public class TcpEventData<TData, T> :BaseResultSubscripData, IEventData<TData, T>
- {
- private AutoResetEvent _WaitResult = new AutoResetEvent(false);
- [AllowNull]
- private T _Result = default;
- public override void SetResultValue(byte[] result)
- {
- _Result = MsgTool.GetValue<T>(result);
- _WaitResult.Set();
- }
- public override void Subscrip(byte[] data, Properties properties)
- {
- throw new NotSupportedException();
- }
- public override void SubscripData(byte[] data, Properties properties)
- {
- lock (_list)
- {
- TData t = MsgTool.GetValue<TData>(data);
- var temp = _list.Where(x => x.Value.Properties.FilterRule(properties) && x.Value.HasDelegate).Select(x => x.Value).ToList();
- if (temp.Count == 0) return;
- var tempevent = temp.First();
- T result = tempevent.Invoke(this, t, this);
- var bytes = MsgTool.GetMsg(this, result, properties,true).GetBytes() ;
- _Socket.Send(bytes);
- }
- }
- public override bool IsAnonymous => false;
- ConcurrentDictionary<Guid, FuncValue<TData,T>> _list = new ConcurrentDictionary<Guid, FuncValue<TData,T>>();
- private ISocket _Socket;
- internal TcpEventData(ISocket socket,string eventName,string hash)
- {
- _Socket =socket;
- EventName = eventName;
- Hash = hash;
- }
- public override string EventName { get; }
- public string Hash { get; }
- public void Clear()
- {
- lock (_list)
- {
- _list.Clear();
- }
- }
- public T Publish(object sender, TData data, Properties? properties = null)
- {
- if (data == null || _Socket == null) return default;
- _Socket.Send(this.GetMsg(data, properties).GetBytes());
- _WaitResult.Reset();
- if (!_WaitResult.WaitOne(_Socket.ReceiveTimeout))
- {
- return default;
- }
- else return _Result;
- }
- public Task<T> PublishAsync(object sender, TData data, Properties? properties = null)
- {
- return Task<T>.Run(()=>
- {
- return Publish(sender, data, properties);
- });
- }
- public List<T> PublishList(object sender, TData data, Properties? properties = null)
- {
- throw new NotImplementedException();
- }
- public Task<List<T>> PublishListAsync(object sender, TData data, Properties? properties = null)
- {
- throw new NotImplementedException();
- }
- public Guid Subscrip(Func<object, EventArgs<TData>, T> func, Properties? properties = null)
- {
- lock (_list)
- {
- ArgumentNullException.ThrowIfNull(func);
- properties ??= Properties.Default;
- var temp = new FuncValue<TData, T>(func, properties);
- _list[temp.IntPtr] = temp;
- return temp.IntPtr;
- }
- }
- public Guid SubscripList(Func<object, EventArgs<TData>, List<T>> func, Properties? properties = null)
- {
- throw new NotImplementedException();
- }
- public void UnSubscrip(Guid guid)
- {
- lock (_list)
- {
- if (guid == Guid.Empty) return;
- _list.TryRemove(guid, out _);
- }
- }
- }
- }
|