Proxy.cs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. using System;
  2. using System.Diagnostics;
  3. using System.Threading;
  4. namespace NetMQ
  5. {
  /// <summary>
  /// Forwards messages bidirectionally between two sockets. You can also specify control sockets to which proxied messages will also be sent.
  /// </summary>
  /// <remarks>
  /// This class must be explicitly started by calling <see cref="Start"/>. Unless an external <see cref="INetMQPoller"/> was
  /// passed to the constructor, that call runs an internal <see cref="NetMQPoller"/> and blocks until <see cref="Stop"/> is called.
  /// <para/>
  /// If using an external <see cref="NetMQPoller"/>, ensure the front and back end sockets have been added to it.
  /// <para/>
  /// Users of this class must call <see cref="Stop"/> when messages should no longer be proxied.
  /// </remarks>
  public class Proxy
  {
      private readonly NetMQSocket m_frontend;
      private readonly NetMQSocket m_backend;

      // Receives a copy of every message part travelling frontend -> backend (may be null).
      private readonly NetMQSocket? m_controlIn;

      // Receives a copy of every message part travelling backend -> frontend (may be null).
      private readonly NetMQSocket? m_controlOut;

      // Either the caller-supplied poller, or the internal one that exists only between Start and Stop.
      private INetMQPoller? m_poller;

      // True when m_poller was supplied by the caller; such a poller is never run, stopped or disposed here.
      private readonly bool m_externalPoller;

      // Lifecycle state, one of the State* constants below. Transitions out of
      // StateStopped/StateStarted are claimed with Interlocked.CompareExchange.
      private int m_state = StateStopped;

      private const int StateStopped = 0;
      private const int StateStarting = 1;
      private const int StateStarted = 2;
      private const int StateStopping = 3;

      /// <summary>
      /// Create a new instance of a Proxy (NetMQ.Proxy)
      /// with the given sockets to serve as a front-end, a back-end, and control sockets.
      /// </summary>
      /// <param name="frontend">the socket that messages will be forwarded from</param>
      /// <param name="backend">the socket that messages will be forwarded to</param>
      /// <param name="controlIn">this socket will have incoming messages (frontend to backend) also sent to it - you can set this to null if not needed</param>
      /// <param name="controlOut">this socket will have outgoing messages (backend to frontend) also sent to it - if null, <paramref name="controlIn"/> is used for outgoing messages as well</param>
      /// <param name="poller">an optional external poller to use within this proxy</param>
      /// <exception cref="ArgumentNullException"><paramref name="frontend"/> or <paramref name="backend"/> is <c>null</c>.</exception>
      public Proxy(NetMQSocket frontend, NetMQSocket backend, NetMQSocket? controlIn, NetMQSocket? controlOut, INetMQPoller? poller = null)
      {
          // Fail at construction rather than with a NullReferenceException later in Start.
          m_frontend = frontend ?? throw new ArgumentNullException(nameof(frontend));
          m_backend = backend ?? throw new ArgumentNullException(nameof(backend));

          m_controlIn = controlIn;

          // Without a dedicated outgoing control socket, both directions are mirrored to controlIn.
          m_controlOut = controlOut ?? controlIn;

          m_poller = poller;
          m_externalPoller = poller != null;
      }

      /// <summary>
      /// Create a new instance of a Proxy (NetMQ.Proxy)
      /// with the given sockets to serve as a front-end, a back-end, and a control socket.
      /// </summary>
      /// <param name="frontend">the socket that messages will be forwarded from</param>
      /// <param name="backend">the socket that messages will be forwarded to</param>
      /// <param name="control">this socket will have messages (in both directions) also sent to it - you can set this to null if not needed</param>
      /// <param name="poller">an optional external poller to use within this proxy</param>
      /// <exception cref="ArgumentNullException"><paramref name="frontend"/> or <paramref name="backend"/> is <c>null</c>.</exception>
      public Proxy(NetMQSocket frontend, NetMQSocket backend, NetMQSocket? control = null, INetMQPoller? poller = null)
          : this(frontend, backend, control, null, poller)
      {}

      /// <summary>
      /// Start proxying messages between the front and back ends. Blocks, unless using an external <see cref="NetMQPoller"/>.
      /// </summary>
      /// <exception cref="InvalidOperationException">The proxy has already been started.</exception>
      public void Start()
      {
          // Only one caller can move the proxy out of the stopped state.
          if (Interlocked.CompareExchange(ref m_state, StateStarting, StateStopped) != StateStopped)
              throw new InvalidOperationException("Proxy has already been started");

          m_frontend.ReceiveReady += OnFrontendReady;
          m_backend.ReceiveReady += OnBackendReady;

          if (m_externalPoller)
          {
              // The caller owns and runs the poller; nothing to block on here.
              m_state = StateStarted;
              return;
          }

          NetMQPoller? poller = null;
          try
          {
              poller = new NetMQPoller();
              poller.Add(m_frontend);
              poller.Add(m_backend);
          }
          catch
          {
              // Setup failed: undo everything so the proxy is not left stuck in
              // StateStarting, where neither Start nor Stop could ever succeed again.
              poller?.Dispose();
              m_frontend.ReceiveReady -= OnFrontendReady;
              m_backend.ReceiveReady -= OnBackendReady;
              m_state = StateStopped;
              throw;
          }

          m_poller = poller;
          m_state = StateStarted;

          // Blocks the calling thread while the internal poller runs.
          poller.Run();
      }

      /// <summary>
      /// Stops the proxy. When the proxy runs its own internal <see cref="NetMQPoller"/>, that poller is stopped and disposed;
      /// an external poller is left untouched and only the proxy's socket event handlers are detached.
      /// </summary>
      /// <exception cref="InvalidOperationException">The proxy has not been started.</exception>
      public void Stop()
      {
          // Only one caller can move the proxy out of the started state.
          if (Interlocked.CompareExchange(ref m_state, StateStopping, StateStarted) != StateStarted)
              throw new InvalidOperationException("Proxy has not been started");

          if (!m_externalPoller)
          {
              // The internal poller was created by Start, so it must exist while started.
              Assumes.NotNull(m_poller);
              m_poller.Stop();
              m_poller.Dispose();
              m_poller = null;
          }

          m_frontend.ReceiveReady -= OnFrontendReady;
          m_backend.ReceiveReady -= OnBackendReady;

          m_state = StateStopped;
      }

      // Frontend has a message: forward it to the backend, mirroring to the incoming control socket.
      private void OnFrontendReady(object sender, NetMQSocketEventArgs e) => ProxyBetween(m_frontend, m_backend, m_controlIn);

      // Backend has a message: forward it to the frontend, mirroring to the outgoing control socket.
      private void OnBackendReady (object sender, NetMQSocketEventArgs e) => ProxyBetween(m_backend, m_frontend, m_controlOut);

      /// <summary>
      /// Receives message parts from <paramref name="from"/> and sends each one to <paramref name="to"/>,
      /// continuing until a part without the "more" flag has been forwarded.
      /// </summary>
      /// <param name="from">the socket to receive message parts from</param>
      /// <param name="to">the socket each received part is sent to</param>
      /// <param name="control">if not <c>null</c>, a copy of each part is sent to this socket before it is forwarded</param>
      private static void ProxyBetween(IReceivingSocket from, IOutgoingSocket to, IOutgoingSocket? control)
      {
          var msg = new Msg();
          msg.InitEmpty();

          var copy = new Msg();
          copy.InitEmpty();

          try
          {
              bool more;
              do
              {
                  from.Receive(ref msg);
                  more = msg.HasMore;

                  if (control != null)
                  {
                      // Send a copy to the control socket so the original part can still be forwarded.
                      copy.Copy(ref msg);
                      control.Send(ref copy, more);
                  }

                  to.Send(ref msg, more);
              }
              while (more);
          }
          finally
          {
              // Close both messages even when a receive or send throws.
              copy.Close();
              msg.Close();
          }
      }
  }
  128. }