123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- using System;
- using System.Diagnostics.CodeAnalysis;
- using System.Net;
- using System.Net.Sockets;
- using System.Runtime.InteropServices;
- using System.Text;
- using System.Threading;
- using NetMQ.Sockets;
- namespace NetMQ
- {
/// <summary>
/// Event data for beacon events: carries the <see cref="NetMQBeacon"/> the event relates to.
/// </summary>
public class NetMQBeaconEventArgs : EventArgs
{
    /// <summary>
    /// Initialises a new <see cref="NetMQBeaconEventArgs"/> wrapping <paramref name="beacon"/>.
    /// </summary>
    /// <param name="beacon">The beacon instance exposed through <see cref="Beacon"/>.</param>
    public NetMQBeaconEventArgs(NetMQBeacon beacon) => Beacon = beacon;

    /// <summary>
    /// Gets the beacon that was supplied when this instance was constructed.
    /// </summary>
    public NetMQBeacon Beacon { get; }
}
/// <summary>
/// NetMQBeacon implements a peer-to-peer discovery service for local networks.
/// </summary>
/// <remarks>
/// A beacon can broadcast and/or capture service announcements using UDP messages on the local area network.
/// You can define the format of your outgoing beacons, and set a filter that validates incoming beacons.
/// Beacons are sent and received asynchronously in the background.
/// <para/>
/// We can use the NetMQBeacon to discover and connect to other NetMQ/CZMQ services in the network automatically
/// without central configuration. Please note that to use NetMQBeacon your infrastructure must support broadcast.
/// Most cloud providers don't support broadcast.
/// </remarks>
public sealed class NetMQBeacon : IDisposable, ISocketPollable
{
    // Size in bytes of the buffer used to receive a single UDP datagram.
    private const int UdpFrameMax = 255;

    // Command names sent as the first frame from the public API to the shim.
    private const string ConfigureCommand = "CONFIGURE";
    private const string PublishCommand = "PUBLISH";
    private const string SilenceCommand = "SILENCE";
    private const string SubscribeCommand = "SUBSCRIBE";
    private const string UnsubscribeCommand = "UNSUBSCRIBE";

    #region Nested class: Shim

    /// <summary>
    /// Background half of the beacon: owns the UDP socket, executes the commands received over the
    /// pipe and forwards matching beacons back over the pipe.
    /// </summary>
    private sealed class Shim : IShimHandler
    {
        // Pipe to the public API; assigned in Run.
        private NetMQSocket? m_pipe;

        // UDP socket used for both sending and receiving; created by Configure.
        private Socket? m_udpSocket;
        private int m_udpPort;

        // Destination of outgoing beacons; null while the beacon is not bound.
        private EndPoint? m_broadcastAddress;

        // Beacon currently being published, or null when silent.
        private NetMQFrame? m_transmit;

        // Prefix incoming beacons must start with, or null when not subscribed.
        private NetMQFrame? m_filter;

        private NetMQTimer? m_pingTimer;
        private NetMQPoller? m_poller;

        /// <summary>
        /// (Re)creates the UDP socket, binds it according to <paramref name="interfaceName"/> and
        /// replies on the pipe with the address bound to (an empty frame when nothing was bound).
        /// </summary>
        /// <param name="interfaceName">
        /// <c>"*"</c> for all interfaces, <c>"loopback"</c> for the loopback address, an empty string for
        /// the first interface enumerated, otherwise the IP address of the interface to use.
        /// </param>
        /// <param name="port">The UDP port to bind to and broadcast on.</param>
        private void Configure(string interfaceName, int port)
        {
            Assumes.NotNull(m_poller);
            Assumes.NotNull(m_pipe);

            // In case the beacon was configured twice
            if (m_udpSocket != null)
            {
                m_poller.Remove(m_udpSocket);
#if NET35
                m_udpSocket.Close();
#else
                m_udpSocket.Dispose();
#endif
            }

            m_udpPort = port;

            // Forget the destination of any previous configuration so that a failed
            // reconfiguration cannot keep broadcasting to a stale address.
            m_broadcastAddress = null;

            m_udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            m_poller.Add(m_udpSocket, OnUdpReady);

            // Ask operating system for broadcast permissions on socket
            m_udpSocket.EnableBroadcast = true;

            // Allow multiple owners to bind to socket; incoming
            // messages will replicate to each owner
            m_udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

            IPAddress? bindTo = null;
            IPAddress? sendTo = null;

            if (interfaceName == "*")
            {
                bindTo = IPAddress.Any;
                sendTo = IPAddress.Broadcast;
            }
            else if (interfaceName == "loopback")
            {
                bindTo = IPAddress.Loopback;
                sendTo = IPAddress.Broadcast;
            }
            else
            {
                // An empty name selects the first interface enumerated. Any other value must parse
                // as an IP address; a malformed value is treated as "no such interface" instead of
                // throwing on the shim thread, which would leave the caller waiting for a reply.
                IPAddress? interfaceAddress = null;
                bool useFirstInterface = string.IsNullOrEmpty(interfaceName);

                if (useFirstInterface || IPAddress.TryParse(interfaceName, out interfaceAddress))
                {
                    foreach (var @interface in new InterfaceCollection())
                    {
                        if (interfaceAddress == null || @interface.Address.Equals(interfaceAddress))
                        {
                            // because windows and unix differ in how they handle broadcast addressing this needs to be platform specific
                            // on windows any interface can receive broadcast by requesting to enable broadcast on the socket
                            // on linux to receive broadcast you must bind to the broadcast address specifically
#if NET45 || NET47
                            if (Environment.OSVersion.Platform == PlatformID.Unix)
#else
                            if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
#endif
                            {
                                bindTo = @interface.BroadcastAddress;
                            }
                            else
                            {
                                bindTo = @interface.Address;
                            }

                            sendTo = @interface.BroadcastAddress;
                            break;
                        }
                    }
                }
            }

            if (bindTo != null)
            {
                // sendTo is always assigned together with bindTo above.
                m_broadcastAddress = new IPEndPoint(sendTo!, m_udpPort);
                m_udpSocket.Bind(new IPEndPoint(bindTo, m_udpPort));
            }

            // Reply to the caller blocked in NetMQBeacon.Configure; empty means "not bound".
            m_pipe.SendFrame(bindTo?.ToString() ?? "");
        }

        /// <summary>
        /// Compares the first <paramref name="size"/> bytes of the buffers of two frames.
        /// </summary>
        /// <returns><c>true</c> if the first <paramref name="size"/> bytes are equal, otherwise <c>false</c>.</returns>
        private static bool Compare(NetMQFrame a, NetMQFrame b, int size)
        {
            for (int i = 0; i < size; i++)
            {
                if (a.Buffer[i] != b.Buffer[i])
                    return false;
            }

            return true;
        }

        /// <summary>
        /// Entry point of the shim: signals readiness on the pipe, then runs a poller over the pipe and
        /// the ping timer until the poller is stopped, and finally releases the UDP socket.
        /// </summary>
        /// <param name="shim">The pipe connecting the shim to the public API.</param>
        public void Run(PairSocket shim)
        {
            m_pipe = shim;
            shim.SignalOK();

            m_pipe.ReceiveReady += OnPipeReady;

            // The timer stays disabled until a PUBLISH command supplies the interval.
            m_pingTimer = new NetMQTimer(interval: TimeSpan.Zero);
            m_pingTimer.Elapsed += PingElapsed;
            m_pingTimer.Enable = false;

            using (m_poller = new NetMQPoller { m_pipe, m_pingTimer })
            {
                m_poller.Run();
            }

            // the beacon might never have been configured
#if NET35
            m_udpSocket?.Close();
#else
            m_udpSocket?.Dispose();
#endif
        }

        /// <summary>
        /// Timer callback: re-sends the beacon currently being published.
        /// </summary>
        private void PingElapsed(object sender, NetMQTimerEventArgs e)
        {
            Assumes.NotNull(m_transmit);

            SendUdpFrame(m_transmit);
        }

        /// <summary>
        /// Poller callback for the UDP socket: receives one datagram and forwards it over the pipe
        /// when it matches the subscription filter and is not an echo of our own beacon.
        /// </summary>
        private void OnUdpReady(Socket socket)
        {
            Assumes.NotNull(m_pipe);

            if (!TryReceiveUdpFrame(out NetMQFrame? frame, out string? peerName))
                return;

            // A beacon is only valid when a filter is set and the beacon starts with it;
            // without a subscription nothing is forwarded.
            var filter = m_filter;
            var isValid = filter != null
                          && frame.MessageSize >= filter.MessageSize
                          && Compare(frame, filter, filter.MessageSize);

            // If valid, discard our own broadcasts, which UDP echoes to us
            if (isValid && m_transmit != null)
            {
                if (frame.MessageSize == m_transmit.MessageSize && Compare(frame, m_transmit, m_transmit.MessageSize))
                {
                    isValid = false;
                }
            }

            // If still a valid beacon, send on to the API
            if (isValid)
            {
                m_pipe.SendMoreFrame(peerName).SendFrame(frame.Buffer, frame.MessageSize);
            }
        }

        /// <summary>
        /// Poller callback for the pipe: reads one multipart command and executes it.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The command frame is not a known command.</exception>
        private void OnPipeReady(object sender, NetMQSocketEventArgs e)
        {
            Assumes.NotNull(m_pipe);
            Assumes.NotNull(m_pingTimer);
            Assumes.NotNull(m_poller);

            NetMQMessage message = m_pipe.ReceiveMultipartMessage();

            string command = message.Pop().ConvertToString();

            switch (command)
            {
                case ConfigureCommand:
                    string interfaceName = message.Pop().ConvertToString();
                    int port = message.Pop().ConvertToInt32();
                    Configure(interfaceName, port);
                    break;
                case PublishCommand:
                    m_transmit = message.Pop();
                    m_pingTimer.Interval = message.Pop().ConvertToInt32();
                    m_pingTimer.Enable = true;
                    // Send the first beacon immediately; the timer takes care of the rest.
                    SendUdpFrame(m_transmit);
                    break;
                case SilenceCommand:
                    m_transmit = null;
                    m_pingTimer.Enable = false;
                    break;
                case SubscribeCommand:
                    m_filter = message.Pop();
                    break;
                case UnsubscribeCommand:
                    m_filter = null;
                    break;
                case NetMQActor.EndShimMessage:
                    m_poller.Stop();
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        /// <summary>
        /// Sends <paramref name="frame"/> to the configured broadcast address.
        /// Does nothing while the beacon is not bound.
        /// </summary>
        private void SendUdpFrame(NetMQFrame frame)
        {
            // No destination means Configure has not bound the socket (never called, or no
            // matching interface). Socket.SendTo rejects a null end point with an
            // ArgumentNullException, which would escape on the shim thread, so skip the send.
            var destination = m_broadcastAddress;
            if (destination == null)
                return;

            Assumes.NotNull(m_udpSocket);

            try
            {
                m_udpSocket.SendTo(frame.Buffer, 0, frame.MessageSize, SocketFlags.None, destination);
            }
            catch (SocketException ex) when (ex.SocketErrorCode == SocketError.AddressNotAvailable)
            {
                // Deliberately ignored: after a sudden network change (seen on Windows 7 and 10) the
                // previous IP address may linger and sending to it fails. Dropping this beacon keeps
                // the shim alive; a later send may succeed once the address is valid again.
            }
        }

        /// <summary>
        /// Receives a single datagram from the UDP socket.
        /// </summary>
        /// <param name="frame">On success, a frame over the receive buffer sized to the bytes read.</param>
        /// <param name="peerName">On success, the string form of the sender's end point.</param>
        /// <returns>
        /// <c>true</c> if a datagram was received; <c>false</c> if the receive failed with
        /// <see cref="SocketError.MessageSize"/>.
        /// </returns>
        private bool TryReceiveUdpFrame([NotNullWhen(returnValue: true)] out NetMQFrame? frame, [NotNullWhen(returnValue: true)] out string? peerName)
        {
            Assumes.NotNull(m_udpSocket);

            var buffer = new byte[UdpFrameMax];
            EndPoint peer = new IPEndPoint(IPAddress.Any, 0);

            int bytesRead;
            try
            {
                bytesRead = m_udpSocket.ReceiveFrom(buffer, ref peer);
            }
            catch (SocketException ex) when (ex.SocketErrorCode == SocketError.MessageSize)
            {
                // The datagram did not fit into the buffer; drop it.
                frame = default;
                peerName = null;
                return false;
            }

            peerName = peer.ToString();
            frame = new NetMQFrame(buffer, bytesRead);
            return true;
        }
    }

    #endregion

    private readonly NetMQActor m_actor;

    private readonly EventDelegator<NetMQBeaconEventArgs> m_receiveEvent;

    // Reply of the last Configure call; null until Configure is called, empty when nothing was bound.
    private string? m_boundTo;

    // Cached result of HostName; reset by Configure.
    private string? m_hostName;

    // 0 while alive, 1 once Dispose has run.
    private int m_isDisposed;

    /// <summary>
    /// Create a new NetMQBeacon.
    /// </summary>
    public NetMQBeacon()
    {
        m_actor = NetMQActor.Create(new Shim());

        // m_receiveEvent is assigned right below, before the handler can be attached.
        void OnReceive(object sender, NetMQActorEventArgs e) => m_receiveEvent!.Fire(this, new NetMQBeaconEventArgs(this));

        m_receiveEvent = new EventDelegator<NetMQBeaconEventArgs>(
            () => m_actor.ReceiveReady += OnReceive,
            () => m_actor.ReceiveReady -= OnReceive);
    }

    /// <summary>
    /// Get the host name this beacon is bound to.
    /// </summary>
    /// <remarks>
    /// This may involve a reverse DNS lookup which can take a second or two.
    /// The result is cached until the beacon is configured again.
    /// <para/>
    /// <c>null</c> is returned if the beacon has not been configured yet.
    /// <para/>
    /// An empty string is returned if:
    /// <list type="bullet">
    ///     <item>the beacon is not bound,</item>
    ///     <item>the beacon is bound to all interfaces,</item>
    ///     <item>an error occurred during reverse DNS lookup.</item>
    /// </list>
    /// </remarks>
    public string? HostName
    {
        get
        {
            if (m_hostName != null)
                return m_hostName;

            // create a copy for thread safety
            var boundTo = m_boundTo;

            if (boundTo == null)
                return null;

            // An empty address means the last configuration did not bind. Dns.GetHostEntry("")
            // would resolve the local host, so answer with the documented empty string instead.
            if (boundTo.Length == 0)
                return m_hostName = string.Empty;

            if (IPAddress.Any.ToString() == boundTo || IPAddress.IPv6Any.ToString() == boundTo)
                return m_hostName = string.Empty;

            try
            {
                return m_hostName = Dns.GetHostEntry(boundTo).HostName;
            }
            catch
            {
                // Lookup failures are reported as an empty host name, as documented above.
                return m_hostName = string.Empty;
            }
        }
    }

    /// <summary>
    /// Get the IP address this beacon is bound to.
    /// </summary>
    /// <remarks>
    /// <c>null</c> until the beacon has been configured; an empty string if the last configuration did not bind.
    /// </remarks>
    public string? BoundTo => m_boundTo;

    /// <summary>
    /// Get the socket of the contained actor.
    /// </summary>
    NetMQSocket ISocketPollable.Socket => ((ISocketPollable)m_actor).Socket;

    /// <summary>
    /// This event occurs when at least one message may be received from the socket without blocking.
    /// </summary>
    public event EventHandler<NetMQBeaconEventArgs> ReceiveReady
    {
        add => m_receiveEvent.Event += value;
        remove => m_receiveEvent.Event -= value;
    }

    /// <summary>
    /// Configure beacon for the specified port on all interfaces.
    /// </summary>
    /// <remarks>Blocks until the bind operation completes.</remarks>
    /// <param name="port">The UDP port to bind to.</param>
    public void ConfigureAllInterfaces(int port)
    {
        Configure(port, "*");
    }

    /// <summary>
    /// Configure beacon for the specified port and, optionally, to a specific interface.
    /// </summary>
    /// <remarks>
    /// Blocks until the bind operation completes. Afterwards <see cref="BoundTo"/> holds the address bound to,
    /// or an empty string if no interface matched <paramref name="interfaceName"/>.
    /// </remarks>
    /// <param name="port">The UDP port to bind to.</param>
    /// <param name="interfaceName">IP address of the interface to bind to. Pass empty string (the default value) to use the default interface.</param>
    public void Configure(int port, string interfaceName = "")
    {
        var message = new NetMQMessage();
        message.Append(ConfigureCommand);
        message.Append(interfaceName);
        message.Append(port);

        m_actor.SendMultipartMessage(message);

        // The shim answers with the bound address once it has processed the command.
        m_boundTo = m_actor.ReceiveFrameString();
        m_hostName = null;
    }

    /// <summary>
    /// Publish beacon immediately and continue to publish when interval elapsed
    /// </summary>
    /// <param name="transmit">Beacon to transmit.</param>
    /// <param name="interval">Interval to transmit beacon</param>
    /// <param name="encoding">Encoding for <paramref name="transmit"/>. Defaults to <see cref="Encoding.UTF8"/>.</param>
    public void Publish(string transmit, TimeSpan interval, Encoding? encoding = null)
    {
        Publish((encoding ?? Encoding.UTF8).GetBytes(transmit), interval);
    }

    /// <summary>
    /// Publish beacon immediately and continue to publish when interval elapsed
    /// </summary>
    /// <param name="transmit">Beacon to transmit</param>
    /// <param name="interval">Interval to transmit beacon; sent to the shim as whole milliseconds.</param>
    public void Publish(byte[] transmit, TimeSpan interval)
    {
        var message = new NetMQMessage();
        message.Append(PublishCommand);
        message.Append(transmit);
        message.Append((int)interval.TotalMilliseconds);

        m_actor.SendMultipartMessage(message);
    }

    /// <summary>
    /// Publish beacon immediately and continue to publish every second
    /// </summary>
    /// <param name="transmit">Beacon to transmit</param>
    /// <param name="encoding">Encoding for <paramref name="transmit"/>. Defaults to <see cref="Encoding.UTF8"/>.</param>
    public void Publish(string transmit, Encoding? encoding = null)
    {
        Publish(transmit, TimeSpan.FromSeconds(1), encoding);
    }

    /// <summary>
    /// Publish beacon immediately and continue to publish every second
    /// </summary>
    /// <param name="transmit">Beacon to transmit</param>
    public void Publish(byte[] transmit)
    {
        Publish(transmit, TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Stop publishing beacons.
    /// </summary>
    public void Silence()
    {
        m_actor.SendFrame(SilenceCommand);
    }

    /// <summary>
    /// Subscribe to beacon messages that match the specified filter.
    /// </summary>
    /// <remarks>
    /// Beacons must prefix-match with <paramref name="filter"/>.
    /// Any previous subscription is replaced by this one.
    /// </remarks>
    /// <param name="filter">Beacon will be filtered by this</param>
    public void Subscribe(string filter)
    {
        m_actor.SendMoreFrame(SubscribeCommand).SendFrame(filter);
    }

    /// <summary>
    /// Unsubscribe from beacon messages.
    /// </summary>
    public void Unsubscribe()
    {
        m_actor.SendFrame(UnsubscribeCommand);
    }

    /// <summary>
    /// Receives the next beacon message, blocking until it arrives.
    /// </summary>
    /// <returns>The received beacon content together with the address of the peer that sent it.</returns>
    public BeaconMessage Receive()
    {
        // The shim sends two frames per beacon: the peer name followed by the payload.
        var peerName = m_actor.ReceiveFrameString();
        var bytes = m_actor.ReceiveFrameBytes();

        return new BeaconMessage(bytes, peerName);
    }

    /// <summary>
    /// Receives the next beacon message if one is available before <paramref name="timeout"/> expires.
    /// </summary>
    /// <param name="timeout">The maximum amount of time to wait for the next beacon message.</param>
    /// <param name="message">The received beacon message, or the default value if none was received.</param>
    /// <returns><c>true</c> if a beacon message was received, otherwise <c>false</c>.</returns>
    public bool TryReceive(TimeSpan timeout, out BeaconMessage message)
    {
        if (!m_actor.TryReceiveFrameString(timeout, out string? peerName))
        {
            message = default(BeaconMessage);
            return false;
        }

        // The timeout only applies to the first frame; the payload frame is read with a blocking receive.
        var bytes = m_actor.ReceiveFrameBytes();

        message = new BeaconMessage(bytes, peerName);
        return true;
    }

    /// <inheritdoc />
    public void Dispose()
    {
        // Only the first caller gets past this point.
        if (Interlocked.CompareExchange(ref m_isDisposed, 1, 0) != 0)
            return;

        m_actor.Dispose();
        m_receiveEvent.Dispose();
    }

    /// <inheritdoc />
    public bool IsDisposed => m_isDisposed != 0;
}
/// <summary>
/// Contents of a message received from a beacon.
/// </summary>
/// <remarks>
/// The default value of this struct has no content: <see cref="Bytes"/> and <see cref="PeerAddress"/>
/// are <c>null</c>, while <see cref="String"/> and <see cref="PeerHost"/> return an empty string.
/// </remarks>
public readonly struct BeaconMessage
{
    /// <summary>
    /// The beacon content as a byte array.
    /// </summary>
    public byte[] Bytes { get; }

    /// <summary>
    /// The address of the peer that sent this message. Includes host name and port number.
    /// </summary>
    public string PeerAddress { get; }

    /// <summary>
    /// Create a new BeaconMessage from the received content and the sender's address.
    /// </summary>
    /// <param name="bytes">The beacon content.</param>
    /// <param name="peerAddress">The address of the peer that sent the beacon.</param>
    internal BeaconMessage(byte[] bytes, string peerAddress)
    {
        Bytes = bytes;
        PeerAddress = peerAddress;
    }

    /// <summary>
    /// The beacon content as a string.
    /// </summary>
    /// <remarks>
    /// Decoded using <see cref="Encoding.UTF8"/>. Other encodings may be used with <see cref="Bytes"/> directly.
    /// Returns an empty string when <see cref="Bytes"/> is <c>null</c>.
    /// </remarks>
    public string String => Bytes == null ? string.Empty : Encoding.UTF8.GetString(Bytes);

    /// <summary>
    /// The host name of the peer that sent this message.
    /// </summary>
    /// <remarks>
    /// This is simply the value of <see cref="PeerAddress"/> up to, but not including, the first <c>':'</c>.
    /// Returns an empty string when <see cref="PeerAddress"/> is <c>null</c>.
    /// </remarks>
    public string PeerHost
    {
        get
        {
            var address = PeerAddress;

            // default(BeaconMessage) carries no address; answer with an empty host rather than throwing.
            if (address == null)
                return string.Empty;

            var separator = address.IndexOf(':');
            return separator == -1 ? address : address.Substring(0, separator);
        }
    }
}
- }
|