NetMQProactor.cs 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. using System;
  2. using NetMQ.Sockets;
  3. namespace NetMQ
  4. {
  /// <summary>
  /// Class to quickly handle incoming messages of a socket.
  /// A new thread is created to handle the messages. Call <see cref="Dispose"/> to stop the thread.
  /// The provided socket will not be disposed by this class.
  /// </summary>
  public class NetMQProactor : IDisposable
  {
      private readonly NetMQActor m_actor;
      private readonly NetMQSocket m_receiveSocket;
      private readonly Action<NetMQSocket, NetMQMessage> m_handler;

      // Assigned inside Run; stays null until Run has created the poller.
      private NetMQPoller? m_poller;

      /// <summary>
      /// Create a NetMQProactor and start a dedicated thread to handle incoming messages.
      /// </summary>
      /// <param name="receiveSocket">Socket to handle messages from.</param>
      /// <param name="handler">Handler invoked with the socket and each received multipart message.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="receiveSocket"/> or <paramref name="handler"/> is <c>null</c>.
      /// </exception>
      public NetMQProactor(NetMQSocket receiveSocket, Action<NetMQSocket, NetMQMessage> handler)
      {
          // Validate up front so a bad argument fails here, on the caller's thread,
          // instead of surfacing later as a NullReferenceException inside Run/OnSocketReady.
          m_receiveSocket = receiveSocket ?? throw new ArgumentNullException(nameof(receiveSocket));
          m_handler = handler ?? throw new ArgumentNullException(nameof(handler));

          // Must come last: Run reads the fields assigned above.
          m_actor = NetMQActor.Create(Run);
      }

      /// <summary>
      /// Stop the proactor. The provided socket will not be disposed.
      /// </summary>
      public void Dispose()
      {
          m_actor.Dispose();

          // The poller only exists once Run has assigned it, hence the null-conditional call.
          m_poller?.Dispose();
      }

      /// <summary>
      /// Actor entry point: wires up the event handlers, creates the poller over the
      /// receive socket and the shim, signals OK on the shim and runs the poller.
      /// </summary>
      /// <param name="shim">The actor's shim socket, on which commands are received.</param>
      private void Run(PairSocket shim)
      {
          shim.ReceiveReady += OnShimReady;
          m_receiveSocket.ReceiveReady += OnSocketReady;

          try
          {
              m_poller = new NetMQPoller { m_receiveSocket, shim };

              shim.SignalOK();
              m_poller.Run();
          }
          finally
          {
              // The receive socket belongs to the caller and outlives this class, so always
              // detach our handler from it, even when the poller loop exits with an exception.
              m_receiveSocket.ReceiveReady -= OnSocketReady;
          }
      }

      /// <summary>
      /// Handles a command arriving on the shim; stops the poller when the command
      /// equals <see cref="NetMQActor.EndShimMessage"/>.
      /// </summary>
      private void OnShimReady(object sender, NetMQSocketEventArgs e)
      {
          Assumes.NotNull(m_poller);

          string command = e.Socket.ReceiveFrameString();

          if (command == NetMQActor.EndShimMessage)
          {
              m_poller.Stop();
          }
      }

      /// <summary>
      /// Receives one multipart message from the receive socket and passes it,
      /// together with the socket, to the user-supplied handler.
      /// </summary>
      private void OnSocketReady(object sender, NetMQSocketEventArgs e)
      {
          NetMQMessage message = m_receiveSocket.ReceiveMultipartMessage();
          m_handler(m_receiveSocket, message);
      }
  }
  59. }