// NetMQActor.cs (12 KB)
  1. using System;
  2. using System.Threading;
  3. using NetMQ.Sockets;
  4. namespace NetMQ
  5. {
  6. #region IShimHandler
  /// <summary>
  /// Contract for the code that services the shim end of a <see cref="NetMQActor"/>'s pipe.
  /// The actor invokes <see cref="Run"/> on its dedicated shim thread.
  /// </summary>
  public interface IShimHandler
  {
      /// <summary>
      /// Execute whatever work this <c>IShimHandler</c> represents, using the given shim socket.
      /// </summary>
      /// <param name="shim">the <see cref="PairSocket"/> that is the handler's end of the pipe</param>
      void Run(PairSocket shim);
  }
  18. #endregion
  19. #region NetMQActorEventArgs
  /// <summary>
  /// Event data that identifies the <see cref="NetMQActor"/> an event relates to.
  /// </summary>
  public class NetMQActorEventArgs : EventArgs
  {
      /// <summary>
      /// Create a new NetMQActorEventArgs that refers to the given NetMQActor.
      /// </summary>
      /// <param name="actor">the NetMQActor these event args refer to</param>
      public NetMQActorEventArgs(NetMQActor actor) => Actor = actor;

      /// <summary>
      /// Get the NetMQActor that these event args refer to.
      /// </summary>
      public NetMQActor Actor { get; }
  }
  38. #endregion
  39. #region Delegates
  /// <summary>
  /// The action an actor executes against its shim socket.
  /// </summary>
  /// <param name="shim">the <see cref="PairSocket"/> that is the shim the action operates on</param>
  public delegate void ShimAction(PairSocket shim);
  /// <summary>
  /// The action an actor executes against its shim socket, together with a
  /// caller-supplied state object.
  /// </summary>
  /// <typeparam name="T">the type of the state object</typeparam>
  /// <param name="shim">the <see cref="PairSocket"/> that is the shim the action operates on</param>
  /// <param name="state">the state object made available to the action</param>
  public delegate void ShimAction<in T>(PairSocket shim, T state);
  52. #endregion
  /// <summary>
  /// The Actor represents one end of a two-way pipe between two <see cref="PairSocket"/>s.
  /// Messages passed to the actor are sent to the other end of the pipe, which is called
  /// the "shim" and is serviced by an <see cref="IShimHandler"/> on a dedicated thread.
  /// </summary>
  public class NetMQActor : IOutgoingSocket, IReceivingSocket, ISocketPollable, IDisposable
  {
      /// <summary>
      /// The terminate-shim command.
      /// This is just the literal string "endPipe"; disposing the actor sends it to the shim.
      /// </summary>
      public const string EndShimMessage = "endPipe";

      #region Action shim handlers

      /// <summary>
      /// Adapts a <see cref="ShimAction{T}"/> plus its state object to <see cref="IShimHandler"/>.
      /// </summary>
      private sealed class ActionShimHandler<T> : IShimHandler
      {
          private readonly ShimAction<T> m_action;
          private readonly T m_state;

          /// <summary>
          /// Create a new ActionShimHandler that will invoke the given action with the given state.
          /// </summary>
          /// <param name="action">a ShimAction of type T that comprises the action to perform</param>
          /// <param name="state">the state-information passed to the action</param>
          public ActionShimHandler(ShimAction<T> action, T state)
          {
              m_action = action;
              m_state = state;
          }

          /// <summary>
          /// Perform the action upon the given shim, using our state-information.
          /// </summary>
          /// <param name="shim">a <see cref="PairSocket"/> that is the shim to perform the action upon</param>
          public void Run(PairSocket shim) => m_action(shim, m_state);
      }

      /// <summary>
      /// Adapts a stateless <see cref="ShimAction"/> to <see cref="IShimHandler"/>.
      /// </summary>
      private sealed class ActionShimHandler : IShimHandler
      {
          private readonly ShimAction m_action;

          /// <summary>
          /// Create a new ActionShimHandler that will invoke the given action.
          /// </summary>
          /// <param name="action">a ShimAction that comprises the action to perform</param>
          public ActionShimHandler(ShimAction action)
          {
              m_action = action;
          }

          /// <summary>
          /// Perform the action upon the given shim.
          /// </summary>
          /// <param name="shim">a <see cref="PairSocket"/> that is the shim to perform the action upon</param>
          public void Run(PairSocket shim) => m_action(shim);
      }

      #endregion

      private readonly PairSocket m_self;
      private readonly PairSocket m_shim;
      private readonly Thread m_shimThread;
      private readonly IShimHandler m_shimHandler;
      private readonly EventDelegator<NetMQActorEventArgs> m_receiveEvent;
      private readonly EventDelegator<NetMQActorEventArgs> m_sendEvent;

      // 0 while live, 1 once Dispose has been entered; only written via Interlocked.
      private int m_isDisposed;

      #region Creating Actor

      /// <summary>
      /// Wire up both ends of the pipe, start the shim thread, and wait for the shim's
      /// initial signal before returning.
      /// </summary>
      /// <param name="self">the socket this actor uses as its own end of the pipe</param>
      /// <param name="shim">the socket handed to the shim handler as the other end of the pipe</param>
      /// <param name="shimHandler">the handler that is run on the shim thread</param>
      private NetMQActor(PairSocket self, PairSocket shim, IShimHandler shimHandler)
      {
          m_shimHandler = shimHandler;
          m_self = self;
          m_shim = shim;

          // One args instance is created here and passed with every event this actor fires.
          var args = new NetMQActorEventArgs(this);

          // Forwarders that re-raise the socket's events as actor events. The delegator
          // fields are assigned just below, hence the null-forgiving operator.
          void OnReceive(object sender, NetMQSocketEventArgs e) => m_receiveEvent!.Fire(this, args);
          void OnSend(object sender, NetMQSocketEventArgs e) => m_sendEvent!.Fire(this, args);

          // Each delegator receives a pair of callbacks that attach/detach the forwarder
          // to the corresponding event of our own socket.
          m_receiveEvent = new EventDelegator<NetMQActorEventArgs>(
              () => m_self.ReceiveReady += OnReceive,
              () => m_self.ReceiveReady -= OnReceive);

          m_sendEvent = new EventDelegator<NetMQActorEventArgs>(
              () => m_self.SendReady += OnSend,
              () => m_self.SendReady -= OnSend);

          var random = new Random();

          // Bind and connect pipe ends
          string actorName;
          string endPoint;

          while (true)
          {
              try
              {
                  actorName = $"NetMQActor-{random.Next(0, 10000)}-{random.Next(0, 10000)}";
                  endPoint = $"inproc://{actorName}";
                  m_self.Bind(endPoint);
                  break;
              }
              catch (AddressAlreadyInUseException)
              {
                  // Loop around and try another random address
              }
          }

          m_shim.Connect(endPoint);

          // The shim thread carries the same name as the inproc endpoint.
          m_shimThread = new Thread(RunShim) { Name = actorName };
          m_shimThread.Start();

          // Mandatory handshake for new actor so that constructor returns only
          // when actor has also initialised. This eliminates timing issues at
          // application start up.
          m_self.ReceiveSignal();
      }

      /// <summary>
      /// Create a new <see cref="NetMQActor"/> with the given shimHandler.
      /// </summary>
      /// <param name="shimHandler">an <c>IShimHandler</c> that provides the Run method</param>
      /// <returns>the newly-created <c>NetMQActor</c></returns>
      /// <exception cref="ArgumentNullException"><paramref name="shimHandler"/> is <c>null</c>.</exception>
      public static NetMQActor Create(IShimHandler shimHandler)
      {
          // Fail on the caller's thread; a null handler would otherwise only fault later
          // on the shim thread while the constructor waits for the handshake signal.
          if (shimHandler is null)
              throw new ArgumentNullException(nameof(shimHandler));

          return new NetMQActor(new PairSocket(), new PairSocket(), shimHandler);
      }

      /// <summary>
      /// Create a new <see cref="NetMQActor"/> with the action, and state-information.
      /// </summary>
      /// <typeparam name="T">the type of the state-information object</typeparam>
      /// <param name="action">a <c>ShimAction</c> - delegate for the action to perform</param>
      /// <param name="state">the state-information - of the generic type T</param>
      /// <returns>the newly-created <c>NetMQActor</c></returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
      public static NetMQActor Create<T>(ShimAction<T> action, T state)
      {
          // Only the delegate is validated; the state object is passed through as given.
          if (action is null)
              throw new ArgumentNullException(nameof(action));

          return new NetMQActor(new PairSocket(), new PairSocket(), new ActionShimHandler<T>(action, state));
      }

      /// <summary>
      /// Create a new <see cref="NetMQActor"/> with the given <see cref="ShimAction"/>.
      /// </summary>
      /// <param name="action">a <c>ShimAction</c> - delegate for the action to perform</param>
      /// <returns>the newly-created <c>NetMQActor</c></returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
      public static NetMQActor Create(ShimAction action)
      {
          if (action is null)
              throw new ArgumentNullException(nameof(action));

          return new NetMQActor(new PairSocket(), new PairSocket(), new ActionShimHandler(action));
      }

      #endregion

      /// <summary>
      /// Thread entry point: execute the shim handler's Run method, signal ok and then
      /// dispose of the shim.
      /// </summary>
      private void RunShim()
      {
          try
          {
              m_shimHandler.Run(m_shim);
          }
          catch (TerminatingException)
          {
              // Deliberately ignored; fall through to signalling and cleanup.
          }

          // Do not block, if the other end of the pipe is already deleted
          m_shim.TrySignalOK();

          m_shim.Dispose();
      }

      /// <summary>
      /// Transmit the given Msg over this actor's own socket.
      /// </summary>
      /// <param name="msg">the <c>Msg</c> to transmit</param>
      /// <param name="timeout">The maximum length of time to try and send a message. If <see cref="TimeSpan.Zero"/>, no
      /// wait occurs.</param>
      /// <param name="more">Indicate if another frame is expected after this frame</param>
      /// <returns><c>true</c> if a message was sent, otherwise <c>false</c>.</returns>
      /// <exception cref="TerminatingException">The socket has been stopped.</exception>
      /// <exception cref="FaultException"><paramref name="msg"/> is not initialised.</exception>
      public bool TrySend(ref Msg msg, TimeSpan timeout, bool more)
      {
          return m_self.TrySend(ref msg, timeout, more);
      }

      #region IReceivingSocket

      /// <summary>
      /// Attempt to receive a message on this actor's own socket for the specified period of
      /// time, returning true if successful or false if it times-out.
      /// </summary>
      /// <param name="msg">a <c>Msg</c> to write the received message into</param>
      /// <param name="timeout">a <c>TimeSpan</c> specifying how long to block, waiting for a message, before timing out</param>
      /// <returns>true only if a message was indeed received</returns>
      public bool TryReceive(ref Msg msg, TimeSpan timeout)
      {
          return m_self.TryReceive(ref msg, timeout);
      }

      #endregion

      #region Events Handling

      /// <summary>
      /// This event occurs when at least one message may be received from the socket without blocking.
      /// </summary>
      public event EventHandler<NetMQActorEventArgs> ReceiveReady
      {
          add => m_receiveEvent.Event += value;
          remove => m_receiveEvent.Event -= value;
      }

      /// <summary>
      /// This event occurs when a message is ready to be transmitted from the socket.
      /// </summary>
      public event EventHandler<NetMQActorEventArgs> SendReady
      {
          add => m_sendEvent.Event += value;
          remove => m_sendEvent.Event -= value;
      }

      /// <summary>
      /// The socket exposed for polling: this actor's own end of the pipe.
      /// </summary>
      NetMQSocket ISocketPollable.Socket => m_self;

      #endregion

      #region Disposing

      /// <inheritdoc />
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Release any contained resources. Only the first call has any effect.
      /// </summary>
      /// <param name="disposing">true if managed resources are to be released</param>
      protected virtual void Dispose(bool disposing)
      {
          // Atomically flip the flag so that exactly one caller proceeds past this point.
          if (Interlocked.CompareExchange(ref m_isDisposed, 1, 0) != 0)
              return;

          if (!disposing)
              return;

          // send destroy message to pipe; wait for a signal back only if the
          // message was actually sent
          if (m_self.TrySendFrame(EndShimMessage))
              m_self.ReceiveSignal();

          // Wait for the shim thread to finish before tearing down our own socket.
          m_shimThread.Join();
          m_self.Dispose();
          m_sendEvent.Dispose();
          m_receiveEvent.Dispose();
      }

      /// <inheritdoc />
      public bool IsDisposed => m_isDisposed != 0;

      #endregion
  }
  278. }