Selector.cs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Net.Sockets;
  5. namespace NetMQ.Core.Utils
  6. {
  7. /// <summary>
  8. /// A SelectItem is a pairing of a (Socket or SocketBase) and a PollEvents value.
  9. /// </summary>
  10. internal sealed class SelectItem
  11. {
  12. public SelectItem(SocketBase socket, PollEvents @event)
  13. {
  14. Socket = socket;
  15. Event = @event;
  16. }
  17. public SelectItem(Socket fileDescriptor, PollEvents @event)
  18. {
  19. FileDescriptor = fileDescriptor;
  20. Event = @event;
  21. }
  22. public Socket FileDescriptor { get; private set; }
  23. public SocketBase Socket { get; private set; }
  24. public PollEvents Event { get; private set; }
  25. public PollEvents ResultEvent { get; set; }
  26. }
  27. /// <summary>
  28. /// A Selector holds three lists of Sockets - for read, write, and error,
  29. /// and provides a Select method.
  30. /// </summary>
  31. internal sealed class Selector
  32. {
  33. private readonly List<Socket> m_checkRead = new List<Socket>();
  34. private readonly List<Socket> m_checkWrite = new List<Socket>();
  35. private readonly List<Socket> m_checkError = new List<Socket>();
  36. /// <summary>
  37. /// </summary>
  38. /// <param name="items"> (must not be null)</param>
  39. /// <param name="itemsCount"></param>
  40. /// <param name="timeout">a time-out period, in milliseconds</param>
  41. /// <returns></returns>
  42. /// <exception cref="FaultException">The internal select operation failed.</exception>
  43. /// <exception cref="ArgumentNullException"><paramref name="items"/> is <c>null</c>.</exception>
  44. /// <exception cref="TerminatingException">The socket has been stopped.</exception>
  45. public bool Select( SelectItem[] items, int itemsCount, long timeout)
  46. {
  47. if (items == null)
  48. throw new ArgumentNullException("items");
  49. if (itemsCount == 0)
  50. {
  51. return false;
  52. }
  53. bool firstPass = true;
  54. int numberOfEvents = 0;
  55. Stopwatch stopwatch = null;
  56. while (true)
  57. {
  58. long currentTimeoutMicroSeconds;
  59. if (firstPass)
  60. {
  61. currentTimeoutMicroSeconds = 0;
  62. }
  63. else if (timeout < 0)
  64. {
  65. // Consider everything below 0 to be infinite
  66. currentTimeoutMicroSeconds = -1;
  67. }
  68. else
  69. {
  70. currentTimeoutMicroSeconds = (timeout - stopwatch.ElapsedMilliseconds) * 1000;
  71. if (currentTimeoutMicroSeconds < 0)
  72. {
  73. currentTimeoutMicroSeconds = 0;
  74. }
  75. else if (currentTimeoutMicroSeconds > int.MaxValue)
  76. {
  77. currentTimeoutMicroSeconds = int.MaxValue;
  78. }
  79. }
  80. m_checkRead.Clear();
  81. m_checkWrite.Clear();
  82. m_checkError.Clear();
  83. for (int i = 0; i < itemsCount; i++)
  84. {
  85. var pollItem = items[i];
  86. if (pollItem.Socket != null)
  87. {
  88. if (pollItem.Event != PollEvents.None && pollItem.Socket.Handle.Connected)
  89. m_checkRead.Add(pollItem.Socket.Handle);
  90. }
  91. else
  92. {
  93. if (pollItem.Event.HasIn())
  94. m_checkRead.Add(pollItem.FileDescriptor);
  95. if (pollItem.Event.HasOut())
  96. m_checkWrite.Add(pollItem.FileDescriptor);
  97. }
  98. }
  99. try
  100. {
  101. SocketUtility.Select(m_checkRead, m_checkWrite, m_checkError, (int)currentTimeoutMicroSeconds);
  102. }
  103. catch (SocketException x)
  104. {
  105. #if DEBUG
  106. string textOfListRead = StringLib.AsString(m_checkRead);
  107. string textOfListWrite = StringLib.AsString(m_checkWrite);
  108. string textOfListError = StringLib.AsString(m_checkError);
  109. string xMsg = string.Format("In Selector.Select, Socket.Select({0}, {1}, {2}, {3}) threw a SocketException: {4}", textOfListRead, textOfListWrite, textOfListError, currentTimeoutMicroSeconds, x.Message);
  110. Debug.WriteLine(xMsg);
  111. throw new FaultException(innerException: x, message: xMsg);
  112. #else
  113. throw new FaultException(innerException: x, message: "Within SocketUtility.Select");
  114. #endif
  115. }
  116. for (int i = 0; i < itemsCount; i++)
  117. {
  118. var selectItem = items[i];
  119. selectItem.ResultEvent = PollEvents.None;
  120. if (selectItem.Socket != null)
  121. {
  122. var events = (PollEvents)selectItem.Socket.GetSocketOption(ZmqSocketOption.Events);
  123. if (selectItem.Event.HasIn() && events.HasIn())
  124. selectItem.ResultEvent |= PollEvents.PollIn;
  125. if (selectItem.Event.HasOut() && events.HasOut())
  126. selectItem.ResultEvent |= PollEvents.PollOut;
  127. }
  128. else
  129. {
  130. if (m_checkRead.Contains(selectItem.FileDescriptor))
  131. selectItem.ResultEvent |= PollEvents.PollIn;
  132. if (m_checkWrite.Contains(selectItem.FileDescriptor))
  133. selectItem.ResultEvent |= PollEvents.PollOut;
  134. }
  135. if (selectItem.ResultEvent != PollEvents.None)
  136. numberOfEvents++;
  137. }
  138. if (timeout == 0)
  139. break;
  140. if (numberOfEvents > 0)
  141. break;
  142. if (timeout < 0)
  143. {
  144. if (firstPass)
  145. firstPass = false;
  146. continue;
  147. }
  148. if (firstPass)
  149. {
  150. stopwatch = Stopwatch.StartNew();
  151. firstPass = false;
  152. continue;
  153. }
  154. // Check also equality as it might frequently occur on 1000Hz clock
  155. if (stopwatch.ElapsedMilliseconds >= timeout)
  156. break;
  157. }
  158. return numberOfEvents > 0;
  159. }
  160. }
  161. }