TcpAnonymousEventData{T}.cs 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. using EventBus;
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Diagnostics.CodeAnalysis;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace TcpEventBus
  10. {
  /// <summary>
  /// Anonymous request/response event bound to a socket.
  /// <c>Publish</c> sends the arguments through the socket and blocks until
  /// <c>SetResultValue</c> delivers a reply or the socket's receive timeout elapses;
  /// <c>SubscripData</c> runs a locally registered handler for an incoming request and
  /// sends the handler's return value back through the same socket.
  /// </summary>
  /// <typeparam name="T">Type of the value produced by handlers and returned to publishers.</typeparam>
  public class TcpAnonymousEventData<T> : BaseResultSubscripData, IAnonymousEventData<T>
  {
    // Signalled by SetResultValue once a reply has been decoded; Publish waits on it.
    // NOTE(review): AutoResetEvent is IDisposable and is never disposed here — confirm
    // whether the base type offers a disposal hook this class should use.
    private readonly AutoResetEvent _WaitResult = new AutoResetEvent(false);

    // Single slot holding the most recently decoded reply.
    [AllowNull]
    private T _Result = default;

    private readonly ISocket _Socket;

    // Registered handlers keyed by the Guid exposed as FuncValue.IntPtr.
    private readonly ConcurrentDictionary<Guid, FuncValue<object[], T>> _list = new ConcurrentDictionary<Guid, FuncValue<object[], T>>();

    // Serializes handler dispatch with Subscrip/UnSubscrip/Clear. A dedicated object is
    // used instead of locking on the dictionary instance itself.
    private readonly object _handlerGate = new object();

    // Serializes Publish calls: there is only one result slot and one wait handle, so two
    // overlapping requests could otherwise receive each other's replies.
    private readonly object _publishGate = new object();

    /// <summary>Creates the event for the given socket, event name and hash.</summary>
    /// <param name="socket">Socket used to send requests and replies. <c>Publish</c> returns
    /// <c>default</c> without sending when this is null.</param>
    /// <param name="eventName">Value exposed through <c>EventName</c>.</param>
    /// <param name="hash">Value exposed through <c>Hash</c>.</param>
    public TcpAnonymousEventData(ISocket socket, string eventName, string hash)
    {
      _Socket = socket;
      EventName = eventName;
      Hash = hash;
    }

    /// <summary>Event name supplied to the constructor.</summary>
    public override string EventName { get; }

    /// <summary>Hash supplied to the constructor.</summary>
    public string Hash { get; }

    /// <summary>Always <c>true</c> for this type.</summary>
    public override bool IsAnonymous => true;

    /// <summary>
    /// Decodes a reply payload with <c>MsgTool.GetValue</c>, stores it and wakes the
    /// publisher waiting in <c>Publish</c>.
    /// </summary>
    /// <param name="result">Raw reply bytes.</param>
    public override void SetResultValue(byte[] result)
    {
      // Store first, signal second: the waiter reads _Result only after being released.
      _Result = MsgTool.GetValue<T>(result);
      _WaitResult.Set();
    }

    /// <summary>Not supported by this type.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void Subscrip(byte[] data, Properties properties)
    {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Handles an incoming request: decodes the arguments, invokes the first registered
    /// handler whose properties accept <paramref name="properties"/> and that has a
    /// delegate, then sends the handler's result back through the socket. Does nothing
    /// when no handler matches.
    /// </summary>
    /// <param name="data">Raw request bytes, decoded with <c>MsgTool.GetAnonyDatas</c>.</param>
    /// <param name="properties">Properties of the incoming request, used for filtering and
    /// for building the reply message.</param>
    public override void SubscripData(byte[] data, Properties properties)
    {
      lock (_handlerGate)
      {
        // Decoded before looking for a handler so malformed input fails the same way
        // whether or not anything is subscribed.
        var args = MsgTool.GetAnonyDatas(data);

        // Only one handler answers a request, so stop at the first match instead of
        // materializing every match. Dictionary enumeration order is not defined, so
        // "first" is simply the first match encountered.
        foreach (var entry in _list)
        {
          var handler = entry.Value;
          if (!handler.Properties.FilterRule(properties) || !handler.HasDelegate)
          {
            continue;
          }

          T result = handler.Invoke(this, args, this);
          var bytes = MsgTool.GetMsg(this, result, properties, true).GetBytes();
          _Socket.Send(bytes);
          return;
        }
      }
    }

    /// <summary>Removes every registered handler.</summary>
    public void Clear()
    {
      lock (_handlerGate)
      {
        _list.Clear();
      }
    }

    /// <summary>
    /// Sends <paramref name="data"/> through the socket and blocks until a reply is
    /// delivered via <c>SetResultValue</c> or the socket's <c>ReceiveTimeout</c> elapses.
    /// </summary>
    /// <param name="sender">Not used by this implementation.</param>
    /// <param name="properties">Optional properties passed to <c>GetAnonyMsg</c>.</param>
    /// <param name="data">Arguments for the remote handler.</param>
    /// <returns>The decoded reply, or <c>default</c> when <paramref name="data"/> or the
    /// socket is null, or when no reply arrives in time.</returns>
    public T Publish(object sender, Properties? properties = null, params object[] data)
    {
      if (data is null || _Socket is null)
      {
        return default;
      }

      lock (_publishGate)
      {
        // Clear any stale signal BEFORE sending. Resetting after the send would discard
        // a reply that arrives between the two calls and force a spurious timeout.
        // NOTE(review): this class keeps a single result slot, so a late reply to an
        // earlier request that timed out could still be taken for this one — confirm
        // whether the protocol layer rules that out.
        _WaitResult.Reset();
        _Socket.Send(this.GetAnonyMsg(data, properties).GetBytes());

        if (_WaitResult.WaitOne(_Socket.ReceiveTimeout))
        {
          return _Result;
        }

        return default;
      }
    }

    /// <summary>
    /// Runs <c>Publish</c> on the thread pool and returns a task for its result.
    /// </summary>
    /// <param name="sender">Forwarded to <c>Publish</c>.</param>
    /// <param name="properties">Forwarded to <c>Publish</c>.</param>
    /// <param name="data">Forwarded to <c>Publish</c>.</param>
    /// <returns>A task completing with the value <c>Publish</c> returns.</returns>
    public Task<T> PublishAsync(object sender, Properties? properties = null, params object[] data)
    {
      return Task.Run(() => Publish(sender, properties, data));
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public List<T> PublishList(object sender, Properties? properties = null, params object[] data)
    {
      throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Task<List<T>> PublishListAsync(object sender, Properties? properties = null, params object[] data)
    {
      throw new NotImplementedException();
    }

    /// <summary>Registers a handler for incoming requests.</summary>
    /// <param name="func">Handler to invoke; must not be null.</param>
    /// <param name="properties">Filter properties for the handler;
    /// <c>Properties.Default</c> is used when null.</param>
    /// <returns>The identifier to pass to <c>UnSubscrip</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public Guid Subscrip(Func<object, EventArgs<object[]>, T> func, Properties? properties = null)
    {
      // Validate before taking the lock so an invalid call never contends with dispatch.
      ArgumentNullException.ThrowIfNull(func);

      lock (_handlerGate)
      {
        properties ??= Properties.Default;
        var registration = new FuncValue<object[], T>(func, properties);
        _list[registration.IntPtr] = registration;
        return registration.IntPtr;
      }
    }

    /// <summary>Not implemented.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Guid SubscripList(Func<object, EventArgs<object[]>, List<T>> func, Properties? properties = null)
    {
      throw new NotImplementedException();
    }

    /// <summary>
    /// Removes the handler registered under <paramref name="guid"/>. Does nothing for
    /// <c>Guid.Empty</c> or an unknown identifier.
    /// </summary>
    /// <param name="guid">Identifier returned by <c>Subscrip</c>.</param>
    public void UnSubscrip(Guid guid)
    {
      if (guid == Guid.Empty)
      {
        return;
      }

      lock (_handlerGate)
      {
        _list.TryRemove(guid, out _);
      }
    }
  }
  106. }