// NetMQBeacon.cs
  1. using System;
  2. using System.Diagnostics.CodeAnalysis;
  3. using System.Net;
  4. using System.Net.Sockets;
  5. using System.Runtime.InteropServices;
  6. using System.Text;
  7. using System.Threading;
  8. using NetMQ.Sockets;
  9. namespace NetMQ
  10. {
  /// <summary>
  /// Event data that carries a reference to a <see cref="NetMQBeacon"/>.
  /// </summary>
  public class NetMQBeaconEventArgs : EventArgs
  {
      /// <summary>
      /// Initialises the event data with the beacon it should expose.
      /// </summary>
      /// <param name="beacon">The beacon to expose through <see cref="Beacon"/>.</param>
      public NetMQBeaconEventArgs(NetMQBeacon beacon) => Beacon = beacon;

      /// <summary>
      /// The beacon supplied when this instance was created.
      /// </summary>
      public NetMQBeacon Beacon { get; }
  }
  29. /// <summary>
  30. /// NetMQBeacon implements a peer-to-peer discovery service for local networks.
  31. /// </summary>
  32. /// <remarks>
  33. /// A beacon can broadcast and/or capture service announcements using UDP messages on the local area network.
  34. /// You can define the format of your outgoing beacons, and set a filter that validates incoming beacons.
  35. /// Beacons are sent and received asynchronously in the background.
  36. ///
  37. /// We can use the NetMQBeacon to discover and connect to other NetMQ/CZMQ services in the network automatically
  38. /// without central configuration. Please note that to use NetMQBeacon your infrastructure must support broadcast.
  39. /// Most cloud providers don't support broadcast.
  40. /// </remarks>
  41. public sealed class NetMQBeacon : IDisposable, ISocketPollable
  42. {
  // Command names exchanged over the actor pipe between the public API and the shim.
  private const string ConfigureCommand = "CONFIGURE";
  private const string PublishCommand = "PUBLISH";
  private const string SilenceCommand = "SILENCE";
  private const string SubscribeCommand = "SUBSCRIBE";
  private const string UnsubscribeCommand = "UNSUBSCRIBE";

  // Size, in bytes, of the buffer used to receive a single UDP beacon.
  private const int UdpFrameMax = 255;
  49. #region Nested class: Shim
  50. private sealed class Shim : IShimHandler
  51. {
  // Shim end of the actor pipe and the poller that drives it; both are set in Run.
  private NetMQSocket? m_pipe;
  private NetMQPoller? m_poller;

  // Timer that triggers re-sending of the published beacon.
  private NetMQTimer? m_pingTimer;

  // UDP state, (re)created by Configure.
  private Socket? m_udpSocket;
  private int m_udpPort;
  private EndPoint? m_broadcastAddress;

  // Beacon currently being published (null when silent) and the subscription prefix filter (null when unsubscribed).
  private NetMQFrame? m_transmit;
  private NetMQFrame? m_filter;
  /// <summary>
  /// (Re)creates the UDP socket, binds it as requested by <paramref name="interfaceName"/> and reports
  /// the result back over the pipe.
  /// </summary>
  /// <remarks>
  /// Exactly one frame is sent on the pipe: the address the socket was bound to, or an empty string when
  /// nothing was bound (no interface matched, or <paramref name="interfaceName"/> is not a valid IP address).
  /// </remarks>
  /// <param name="interfaceName">
  /// <c>"*"</c> for all interfaces, <c>"loopback"</c> for the loopback address, the IP address of a
  /// specific interface, or null/empty to take the first interface that is enumerated.
  /// </param>
  /// <param name="port">UDP port used for binding and as the broadcast destination port.</param>
  private void Configure(string interfaceName, int port)
  {
      Assumes.NotNull(m_poller);
      Assumes.NotNull(m_pipe);

      // In case the beacon was configured twice, drop the previous socket first
      if (m_udpSocket != null)
      {
          m_poller.Remove(m_udpSocket);
  #if NET35
          m_udpSocket.Close();
  #else
          m_udpSocket.Dispose();
  #endif
      }

      m_udpPort = port;
      m_udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
      m_poller.Add(m_udpSocket, OnUdpReady);

      // Ask operating system for broadcast permissions on socket
      m_udpSocket.EnableBroadcast = true;

      // Allow multiple owners to bind to socket; incoming
      // messages will replicate to each owner
      m_udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

      IPAddress? bindTo = null;
      IPAddress? sendTo = null;

      if (interfaceName == "*")
      {
          bindTo = IPAddress.Any;
          sendTo = IPAddress.Broadcast;
      }
      else if (interfaceName == "loopback")
      {
          bindTo = IPAddress.Loopback;
          sendTo = IPAddress.Broadcast;
      }
      else
      {
          // An empty name means "take the first interface"; otherwise the name must be an IP address.
          // TryParse is used instead of Parse so that a malformed name ends in the regular "not bound"
          // reply below rather than in a FormatException thrown before any reply is sent.
          // NOTE(review): presumably the caller of the public Configure would otherwise wait for a
          // reply frame that never arrives - confirm against NetMQActor.
          bool matchAny = string.IsNullOrEmpty(interfaceName);
          IPAddress? interfaceAddress = null;

          if (matchAny || IPAddress.TryParse(interfaceName, out interfaceAddress))
          {
              foreach (var @interface in new InterfaceCollection())
              {
                  if (!matchAny && !@interface.Address.Equals(interfaceAddress))
                      continue;

                  sendTo = @interface.BroadcastAddress;

                  // because windows and unix differ in how they handle broadcast addressing this needs to be platform specific
                  // on windows any interface can receive broadcast by requesting to enable broadcast on the socket
                  // on linux to receive broadcast you must bind to the broadcast address specifically
  #if NET45 || NET47
                  bool bindToBroadcast = Environment.OSVersion.Platform == PlatformID.Unix;
  #else
                  bool bindToBroadcast = RuntimeInformation.IsOSPlatform(OSPlatform.Linux);
  #endif
                  bindTo = bindToBroadcast ? @interface.BroadcastAddress : @interface.Address;
                  break;
              }
          }
      }

      if (bindTo != null)
      {
          m_broadcastAddress = new IPEndPoint(sendTo, m_udpPort);
          m_udpSocket.Bind(new IPEndPoint(bindTo, m_udpPort));
      }

      // Always answer, so the requester learns the bound address ("" when not bound)
      m_pipe.SendFrame(bindTo?.ToString() ?? "");
  }
  /// <summary>
  /// Tells whether the first <paramref name="size"/> bytes of the buffers of two frames are equal.
  /// </summary>
  /// <param name="a">First frame.</param>
  /// <param name="b">Second frame.</param>
  /// <param name="size">Number of leading bytes to compare.</param>
  /// <returns><c>false</c> at the first differing byte; <c>true</c> if none differs.</returns>
  private static bool Compare(NetMQFrame a, NetMQFrame b, int size)
  {
      var index = 0;

      while (index < size)
      {
          if (a.Buffer[index] != b.Buffer[index])
          {
              return false;
          }

          index++;
      }

      return true;
  }
  /// <summary>
  /// Shim entry point: hooks up the pipe and the ping timer, then runs a poller over both until it is stopped.
  /// </summary>
  /// <param name="shim">The socket used to talk to the owner of the shim.</param>
  public void Run(PairSocket shim)
  {
      m_pipe = shim;
      shim.SignalOK();
      m_pipe.ReceiveReady += OnPipeReady;

      // The timer starts disabled; the publish command sets its interval and enables it.
      m_pingTimer = new NetMQTimer(interval: TimeSpan.Zero);
      m_pingTimer.Elapsed += PingElapsed;
      m_pingTimer.Enable = false;

      m_poller = new NetMQPoller { m_pipe, m_pingTimer };
      try
      {
          m_poller.Run();
      }
      finally
      {
          m_poller.Dispose();
      }

      // The UDP socket only exists if the beacon was configured at least once.
  #if NET35
      m_udpSocket?.Close();
  #else
      m_udpSocket?.Dispose();
  #endif
  }
  /// <summary>
  /// Ping timer callback: sends the beacon that is currently being published.
  /// </summary>
  private void PingElapsed(object sender, NetMQTimerEventArgs e)
  {
      var beacon = m_transmit;
      Assumes.NotNull(beacon);

      SendUdpFrame(beacon);
  }
  /// <summary>
  /// Poller callback for the UDP socket: reads one beacon and forwards it over the pipe when it passes
  /// the subscription filter and is not a copy of the beacon being published.
  /// </summary>
  /// <param name="socket">The ready socket (unused; the datagram is read from the shim's own UDP socket).</param>
  private void OnUdpReady(Socket socket)
  {
      Assumes.NotNull(m_pipe);

      if (!TryReceiveUdpFrame(out NetMQFrame? frame, out string? peerName))
      {
          return;
      }

      // A beacon is only accepted when a filter is set and the beacon starts with it
      var filter = m_filter;
      if (filter == null || frame.MessageSize < filter.MessageSize || !Compare(frame, filter, filter.MessageSize))
      {
          return;
      }

      // Discard our own broadcasts, which UDP echoes to us
      var own = m_transmit;
      if (own != null && frame.MessageSize == own.MessageSize && Compare(frame, own, own.MessageSize))
      {
          return;
      }

      // Still valid: hand peer name and payload on to the API
      m_pipe.SendMoreFrame(peerName).SendFrame(frame.Buffer, frame.MessageSize);
  }
  /// <summary>
  /// Pipe callback: reads one multipart command from the pipe and executes it.
  /// </summary>
  /// <exception cref="ArgumentOutOfRangeException">The first frame is not a known command.</exception>
  private void OnPipeReady(object sender, NetMQSocketEventArgs e)
  {
      Assumes.NotNull(m_pipe);
      Assumes.NotNull(m_pingTimer);
      Assumes.NotNull(m_poller);

      NetMQMessage request = m_pipe.ReceiveMultipartMessage();
      string command = request.Pop().ConvertToString();

      if (command == ConfigureCommand)
      {
          // Frames: interface name, port
          string interfaceName = request.Pop().ConvertToString();
          int port = request.Pop().ConvertToInt32();
          Configure(interfaceName, port);
      }
      else if (command == PublishCommand)
      {
          // Frames: beacon payload, interval; the first beacon goes out right away
          m_transmit = request.Pop();
          m_pingTimer.Interval = request.Pop().ConvertToInt32();
          m_pingTimer.Enable = true;
          SendUdpFrame(m_transmit);
      }
      else if (command == SilenceCommand)
      {
          m_transmit = null;
          m_pingTimer.Enable = false;
      }
      else if (command == SubscribeCommand)
      {
          m_filter = request.Pop();
      }
      else if (command == UnsubscribeCommand)
      {
          m_filter = null;
      }
      else if (command == NetMQActor.EndShimMessage)
      {
          m_poller.Stop();
      }
      else
      {
          throw new ArgumentOutOfRangeException();
      }
  }
  /// <summary>
  /// Sends the content of <paramref name="frame"/> as one UDP datagram to the configured broadcast end point.
  /// </summary>
  /// <param name="frame">The beacon to send.</param>
  private void SendUdpFrame(NetMQFrame frame)
  {
      Assumes.NotNull(m_udpSocket);

      try
      {
          var payload = frame.Buffer;
          var length = frame.MessageSize;
          m_udpSocket.SendTo(payload, 0, length, SocketFlags.None, m_broadcastAddress);
      }
      catch (SocketException ex) when (ex.SocketErrorCode == SocketError.AddressNotAvailable)
      {
          // Deliberately ignored; this frame is simply dropped.
          // After a sudden network change, Windows (7 or 10) may still hold the previous IP address
          // instead of the new one, and sending to that unavailable address would otherwise crash.
          // This would be the place to re-create the UDP socket.
      }
  }
  /// <summary>
  /// Reads one datagram from the UDP socket.
  /// </summary>
  /// <param name="frame">The received bytes wrapped in a frame, or null when nothing was accepted.</param>
  /// <param name="peerName">String form of the sender's end point, or null when nothing was accepted.</param>
  /// <returns>
  /// <c>true</c> when a datagram was read; <c>false</c> when the receive failed with
  /// <see cref="SocketError.MessageSize"/>.
  /// </returns>
  private bool TryReceiveUdpFrame([NotNullWhen(returnValue: true)] out NetMQFrame? frame, [NotNullWhen(returnValue: true)] out string? peerName)
  {
      Assumes.NotNull(m_udpSocket);

      var datagram = new byte[UdpFrameMax];
      EndPoint sender = new IPEndPoint(IPAddress.Any, 0);
      int received;

      try
      {
          received = m_udpSocket.ReceiveFrom(datagram, ref sender);
      }
      catch (SocketException ex) when (ex.SocketErrorCode == SocketError.MessageSize)
      {
          // Receive reported a message-size error: report "no frame" instead of failing
          frame = null;
          peerName = null;
          return false;
      }

      peerName = sender.ToString();
      frame = new NetMQFrame(datagram, received);
      return true;
  }
  259. }
  260. #endregion
  // Actor hosting the shim, and the delegator that re-raises its ReceiveReady as a beacon event.
  private readonly NetMQActor m_actor;
  private readonly EventDelegator<NetMQBeaconEventArgs> m_receiveEvent;

  // 0 until Dispose has run, then 1.
  private int m_isDisposed;

  // Address reported by the last Configure call, and the host name cached for it (null = not resolved yet).
  private string? m_boundTo;
  private string? m_hostName;
  /// <summary>
  /// Creates a beacon together with the actor that runs its shim.
  /// </summary>
  public NetMQBeacon()
  {
      m_actor = NetMQActor.Create(new Shim());

      m_receiveEvent = new EventDelegator<NetMQBeaconEventArgs>(
          () => m_actor.ReceiveReady += OnReceive,
          () => m_actor.ReceiveReady -= OnReceive);

      // Re-raises the actor's ReceiveReady as this beacon's own event.
      void OnReceive(object sender, NetMQActorEventArgs e)
      {
          m_receiveEvent!.Fire(this, new NetMQBeaconEventArgs(this));
      }
  }
  /// <summary>
  /// Get the host name this beacon is bound to.
  /// </summary>
  /// <remarks>
  /// This may involve a reverse DNS lookup which can take a second or two. The result is cached until
  /// the beacon is configured again.
  /// <para/>
  /// <c>null</c> is returned if the beacon has not been configured yet.
  /// <para/>
  /// An empty string is returned if:
  /// <list type="bullet">
  /// <item>the beacon is not bound,</item>
  /// <item>the beacon is bound to all interfaces,</item>
  /// <item>an error occurred during reverse DNS lookup.</item>
  /// </list>
  /// </remarks>
  public string? HostName
  {
      get
      {
          if (m_hostName != null)
              return m_hostName;

          // create a copy for thread safety
          var boundTo = m_boundTo;

          // Configure has not been called yet
          if (boundTo == null)
              return null;

          // Configure reports "" when nothing was bound. That case must not reach the DNS lookup,
          // because Dns.GetHostEntry("") resolves the local host and would yield the machine name
          // instead of the documented empty string.
          if (boundTo.Length == 0)
              return m_hostName = string.Empty;

          // Bound to all interfaces: there is no single host to name
          if (IPAddress.Any.ToString() == boundTo || IPAddress.IPv6Any.ToString() == boundTo)
              return m_hostName = string.Empty;

          try
          {
              return m_hostName = Dns.GetHostEntry(boundTo).HostName;
          }
          catch
          {
              // Best effort: any lookup failure is reported as "no host name"
              return m_hostName = string.Empty;
          }
      }
  }
  /// <summary>
  /// The address reported by the most recent configuration, or <c>null</c> if the beacon was never configured.
  /// </summary>
  public string? BoundTo
  {
      get { return m_boundTo; }
  }
  /// <summary>
  /// The socket exposed by the contained actor.
  /// </summary>
  NetMQSocket ISocketPollable.Socket
  {
      get
      {
          var pollable = (ISocketPollable)m_actor;
          return pollable.Socket;
      }
  }
  /// <summary>
  /// Raised when at least one message may be received from the socket without blocking.
  /// </summary>
  public event EventHandler<NetMQBeaconEventArgs> ReceiveReady
  {
      add
      {
          m_receiveEvent.Event += value;
      }
      remove
      {
          m_receiveEvent.Event -= value;
      }
  }
  /// <summary>
  /// Configures the beacon for <paramref name="port"/> on all interfaces.
  /// </summary>
  /// <remarks>Blocks until the bind operation completes.</remarks>
  /// <param name="port">The UDP port to bind to.</param>
  public void ConfigureAllInterfaces(int port) => Configure(port, interfaceName: "*");
  /// <summary>
  /// Configures the beacon for <paramref name="port"/> and, optionally, a specific interface.
  /// </summary>
  /// <remarks>Blocks until the bind operation completes.</remarks>
  /// <param name="port">The UDP port to bind to.</param>
  /// <param name="interfaceName">IP address of the interface to bind to. Pass empty string (the default value) to use the default interface.</param>
  public void Configure(int port, string interfaceName = "")
  {
      // Request frames: command, interface name, port
      var request = new NetMQMessage();
      request.Append(ConfigureCommand);
      request.Append(interfaceName);
      request.Append(port);
      m_actor.SendMultipartMessage(request);

      // The shim answers with the address it bound to
      string reply = m_actor.ReceiveFrameString();
      m_boundTo = reply;

      // Forget the host name cached for the previous binding
      m_hostName = null;
  }
  /// <summary>
  /// Publishes a text beacon immediately and keeps publishing it each time <paramref name="interval"/> elapses.
  /// </summary>
  /// <param name="transmit">Beacon to transmit.</param>
  /// <param name="interval">Interval between transmissions.</param>
  /// <param name="encoding">Encoding for <paramref name="transmit"/>. Defaults to <see cref="Encoding.UTF8"/>.</param>
  public void Publish(string transmit, TimeSpan interval, Encoding? encoding = null)
  {
      Encoding codec = encoding ?? Encoding.UTF8;
      byte[] payload = codec.GetBytes(transmit);

      Publish(payload, interval);
  }
  /// <summary>
  /// Publishes a binary beacon immediately and keeps publishing it each time <paramref name="interval"/> elapses.
  /// </summary>
  /// <param name="transmit">Beacon to transmit.</param>
  /// <param name="interval">Interval between transmissions; passed on in whole milliseconds.</param>
  public void Publish(byte[] transmit, TimeSpan interval)
  {
      int intervalMilliseconds = (int)interval.TotalMilliseconds;

      // Request frames: command, payload, interval
      var request = new NetMQMessage();
      request.Append(PublishCommand);
      request.Append(transmit);
      request.Append(intervalMilliseconds);

      m_actor.SendMultipartMessage(request);
  }
  /// <summary>
  /// Publishes a text beacon immediately and keeps publishing it every second.
  /// </summary>
  /// <param name="transmit">Beacon to transmit.</param>
  /// <param name="encoding">Encoding for <paramref name="transmit"/>. Defaults to <see cref="Encoding.UTF8"/>.</param>
  public void Publish(string transmit, Encoding? encoding = null)
  {
      TimeSpan everySecond = TimeSpan.FromSeconds(1);
      Publish(transmit, everySecond, encoding);
  }
  /// <summary>
  /// Publishes a binary beacon immediately and keeps publishing it every second.
  /// </summary>
  /// <param name="transmit">Beacon to transmit.</param>
  public void Publish(byte[] transmit)
  {
      TimeSpan everySecond = TimeSpan.FromSeconds(1);
      Publish(transmit, everySecond);
  }
  /// <summary>
  /// Stops publishing beacons by sending the silence command to the actor.
  /// </summary>
  public void Silence() => m_actor.SendFrame(SilenceCommand);
  /// <summary>
  /// Subscribes to beacons that start with <paramref name="filter"/>.
  /// </summary>
  /// <remarks>
  /// Beacons must prefix-match with <paramref name="filter"/>.
  /// A new subscription replaces the previous one.
  /// </remarks>
  /// <param name="filter">The prefix incoming beacons are matched against.</param>
  public void Subscribe(string filter)
  {
      // Request frames: command, filter
      var request = m_actor.SendMoreFrame(SubscribeCommand);
      request.SendFrame(filter);
  }
  /// <summary>
  /// Removes the current subscription by sending the unsubscribe command to the actor.
  /// </summary>
  public void Unsubscribe() => m_actor.SendFrame(UnsubscribeCommand);
  /// <summary>
  /// Receives the next beacon message, blocking until it arrives.
  /// </summary>
  /// <returns>The beacon payload together with the address of the peer that sent it.</returns>
  public BeaconMessage Receive()
  {
      // Frame order on the pipe: peer address first, then the payload
      string sender = m_actor.ReceiveFrameString();
      byte[] payload = m_actor.ReceiveFrameBytes();

      return new BeaconMessage(payload, sender);
  }
  /// <summary>
  /// Receives the next beacon message if one becomes available before <paramref name="timeout"/> expires.
  /// </summary>
  /// <param name="timeout">The maximum amount of time to wait for the next beacon message.</param>
  /// <param name="message">The received beacon message, or the default value when none was received.</param>
  /// <returns><c>true</c> if a beacon message was received, otherwise <c>false</c>.</returns>
  public bool TryReceive(TimeSpan timeout, out BeaconMessage message)
  {
      // The timeout applies to the first frame (peer address) only
      if (m_actor.TryReceiveFrameString(timeout, out string? peerName))
      {
          byte[] payload = m_actor.ReceiveFrameBytes();
          message = new BeaconMessage(payload, peerName);
          return true;
      }

      message = default(BeaconMessage);
      return false;
  }
  /// <inheritdoc />
  public void Dispose()
  {
      // Only the first call disposes; the flag is flipped atomically from 0 to 1
      bool alreadyDisposed = Interlocked.Exchange(ref m_isDisposed, 1) != 0;
      if (alreadyDisposed)
      {
          return;
      }

      m_actor.Dispose();
      m_receiveEvent.Dispose();
  }
  /// <inheritdoc />
  public bool IsDisposed
  {
      get { return m_isDisposed != 0; }
  }
  455. }
  /// <summary>
  /// A beacon received from a peer: its payload plus the address it came from.
  /// </summary>
  public struct BeaconMessage
  {
      internal BeaconMessage(byte[] bytes, string peerAddress) : this()
      {
          PeerAddress = peerAddress;
          Bytes = bytes;
      }

      /// <summary>
      /// The beacon content as a byte array.
      /// </summary>
      public byte[] Bytes { get; }

      /// <summary>
      /// The address of the peer that sent this message. Includes host name and port number.
      /// </summary>
      public string PeerAddress { get; }

      /// <summary>
      /// The beacon content as a string.
      /// </summary>
      /// <remarks>Decoded using <see cref="Encoding.UTF8"/>. Other encodings may be used with <see cref="Bytes"/> directly.</remarks>
      public string String
      {
          get { return Encoding.UTF8.GetString(Bytes); }
      }

      /// <summary>
      /// The host name of the peer that sent this message.
      /// </summary>
      /// <remarks>This is <see cref="PeerAddress"/> up to, but not including, its first colon.</remarks>
      public string PeerHost
      {
          get
          {
              int colon = PeerAddress.IndexOf(':');
              if (colon < 0)
              {
                  return PeerAddress;
              }

              return PeerAddress.Substring(0, colon);
          }
      }
  }
  492. }