NetMQSelector.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Net.Sockets;
  5. using NetMQ.Core;
  6. using NetMQ.Core.Utils;
  7. namespace NetMQ
  8. {
  9. /// <summary>
  10. /// For selecting on <see cref="NetMQSocket"/> and regular .NET <see cref="Socket"/> objects.
  11. /// </summary>
  12. /// <remarks>
  13. /// This is for advanced scenarios only.
  14. /// Most use cases are better served by <see cref="NetMQPoller"/>.
  15. /// </remarks>
  16. public sealed class NetMQSelector
  17. {
  18. private readonly List<Socket> m_checkRead = new List<Socket>();
  19. private readonly List<Socket> m_checkWrite = new List<Socket>();
  20. private readonly List<Socket> m_checkError = new List<Socket>();
  21. /// <summary>
  22. /// Selector Item used to hold the NetMQSocket/Socket and PollEvents
  23. /// </summary>
  24. public sealed class Item
  25. {
  26. /// <summary>
  27. /// Create a new item for NetMQSocket
  28. /// </summary>
  29. /// <param name="socket"></param>
  30. /// <param name="event"></param>
  31. public Item(NetMQSocket socket, PollEvents @event)
  32. {
  33. Socket = socket;
  34. Event = @event;
  35. }
  36. /// <summary>
  37. /// Create a new item for regular .net socket
  38. /// </summary>
  39. /// <param name="fileDescriptor"></param>
  40. /// <param name="event"></param>
  41. public Item(Socket fileDescriptor, PollEvents @event)
  42. {
  43. FileDescriptor = fileDescriptor;
  44. Event = @event;
  45. }
  46. /// <summary>
  47. /// Item File Descriptor, regular .net Socket
  48. /// </summary>
  49. public Socket? FileDescriptor { get; }
  50. /// <summary>
  51. /// Item NetMQSocket
  52. /// </summary>
  53. public NetMQSocket? Socket { get; }
  54. /// <summary>
  55. /// Events registered for
  56. /// </summary>
  57. public PollEvents Event { get; }
  58. /// <summary>
  59. /// Resulted events
  60. /// </summary>
  61. public PollEvents ResultEvent { get; set; }
  62. }
  63. /// <summary>
  64. /// Select on NetMQSocket or Socket, similar behavior to Socket.Select.
  65. /// </summary>
  66. /// <param name="items">Items to select on (must not be null)</param>
  67. /// <param name="itemsCount">Number of items in the array to select on</param>
  68. /// <param name="timeout">a time-out period, in milliseconds</param>
  69. /// <returns></returns>
  70. /// <exception cref="FaultException">The internal select operation failed.</exception>
  71. /// <exception cref="ArgumentNullException"><paramref name="items"/> is <c>null</c>.</exception>
  72. /// <exception cref="TerminatingException">The socket has been stopped.</exception>
  73. public bool Select(Item[] items, int itemsCount, long timeout)
  74. {
  75. if (items == null)
  76. throw new ArgumentNullException(nameof(items));
  77. if (itemsCount == 0)
  78. return false;
  79. bool firstPass = true;
  80. int numberOfEvents = 0;
  81. Stopwatch? stopwatch = null;
  82. while (true)
  83. {
  84. long currentTimeoutMicroSeconds;
  85. if (firstPass)
  86. {
  87. currentTimeoutMicroSeconds = 0;
  88. }
  89. else if (timeout < 0)
  90. {
  91. // Consider everything below 0 to be infinite
  92. currentTimeoutMicroSeconds = -1;
  93. }
  94. else
  95. {
  96. currentTimeoutMicroSeconds = (timeout - stopwatch!.ElapsedMilliseconds) * 1000;
  97. if (currentTimeoutMicroSeconds < 0)
  98. {
  99. currentTimeoutMicroSeconds = 0;
  100. }
  101. else if (currentTimeoutMicroSeconds > int.MaxValue)
  102. {
  103. currentTimeoutMicroSeconds = int.MaxValue;
  104. }
  105. }
  106. m_checkRead.Clear();
  107. m_checkWrite.Clear();
  108. m_checkError.Clear();
  109. for (int i = 0; i < itemsCount; i++)
  110. {
  111. var pollItem = items[i];
  112. if (pollItem.Socket != null)
  113. {
  114. if (pollItem.Event != PollEvents.None && pollItem.Socket.SocketHandle.Handle.Connected)
  115. m_checkRead.Add(pollItem.Socket.SocketHandle.Handle);
  116. }
  117. else
  118. {
  119. if (pollItem.Event.HasIn())
  120. m_checkRead.Add(pollItem.FileDescriptor!);
  121. if (pollItem.Event.HasOut())
  122. m_checkWrite.Add(pollItem.FileDescriptor!);
  123. }
  124. }
  125. try
  126. {
  127. SocketUtility.Select(m_checkRead, m_checkWrite, m_checkError, (int)currentTimeoutMicroSeconds);
  128. }
  129. catch (SocketException x)
  130. {
  131. #if DEBUG
  132. string textOfListRead = StringLib.AsString(m_checkRead);
  133. string textOfListWrite = StringLib.AsString(m_checkWrite);
  134. string textOfListError = StringLib.AsString(m_checkError);
  135. string xMsg = $"In Selector.Select, Socket.Select({textOfListRead}, {textOfListWrite}, {textOfListError}, {currentTimeoutMicroSeconds}) threw a SocketException: {x.Message}";
  136. Debug.WriteLine(xMsg);
  137. throw new FaultException(innerException: x, message: xMsg);
  138. #else
  139. throw new FaultException(innerException: x, message: "Within SocketUtility.Select");
  140. #endif
  141. }
  142. for (int i = 0; i < itemsCount; i++)
  143. {
  144. var selectItem = items[i];
  145. selectItem.ResultEvent = PollEvents.None;
  146. if (selectItem.Socket != null)
  147. {
  148. var events = (PollEvents)selectItem.Socket.GetSocketOption(ZmqSocketOption.Events);
  149. if (selectItem.Event.HasIn() && events.HasIn())
  150. selectItem.ResultEvent |= PollEvents.PollIn;
  151. if (selectItem.Event.HasOut() && events.HasOut())
  152. selectItem.ResultEvent |= PollEvents.PollOut;
  153. }
  154. else
  155. {
  156. if (m_checkRead.Contains(selectItem.FileDescriptor!))
  157. selectItem.ResultEvent |= PollEvents.PollIn;
  158. if (m_checkWrite.Contains(selectItem.FileDescriptor!))
  159. selectItem.ResultEvent |= PollEvents.PollOut;
  160. }
  161. if (selectItem.ResultEvent != PollEvents.None)
  162. numberOfEvents++;
  163. }
  164. if (timeout == 0)
  165. break;
  166. if (numberOfEvents > 0)
  167. break;
  168. if (timeout < 0)
  169. {
  170. if (firstPass)
  171. firstPass = false;
  172. continue;
  173. }
  174. if (firstPass)
  175. {
  176. stopwatch = Stopwatch.StartNew();
  177. firstPass = false;
  178. continue;
  179. }
  180. // Check also equality as it might frequently occur on 1000Hz clock
  181. if (stopwatch!.ElapsedMilliseconds >= timeout)
  182. break;
  183. }
  184. return numberOfEvents > 0;
  185. }
  186. }
  187. }