using System;
using System.Collections.Generic;
using System.Diagnostics;
using NetMQ.Utils;

namespace NetMQ
{
    /// <summary>
    /// This static class serves to provide extension methods for IOutgoingSocket.
    /// </summary>
    public static class OutgoingSocketExtensions
    {
        /// <summary>
        /// Block until the message can be sent.
        /// </summary>
        /// <remarks>
        /// The call blocks until the message can be sent and cannot be interrupted.
        /// Whether the message can be sent depends on the socket type.
        /// </remarks>
        /// <param name="socket">The socket to send the message on.</param>
        /// <param name="msg">An object with message's data to send.</param>
        /// <param name="more">Indicate if another frame is expected after this frame</param>
        public static void Send(this IOutgoingSocket socket, ref Msg msg, bool more)
        {
            // With an infinite timeout the send is expected to always report success.
            var result = socket.TrySend(ref msg, SendReceiveConstants.InfiniteTimeout, more);
            Debug.Assert(result);
        }

        #region Sending Byte Array

        #region Blocking

        /// <summary>
        /// Transmit a byte-array of data over this socket, block until frame is sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        public static void SendFrame(this IOutgoingSocket socket, byte[] data, bool more = false)
        {
            SendFrame(socket, data, data.Length, more);
        }

        /// <summary>
        /// Transmit a byte-array of data over this socket, block until frame is sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        public static void SendFrame(this IOutgoingSocket socket, byte[] data, int length, bool more = false)
        {
            var msg = new Msg();
            msg.InitPool(length);

            // Copy the first `length` bytes of the caller's array into the pooled message buffer.
            data.Slice(0, length).CopyTo(msg);

            socket.Send(ref msg, more);
            msg.Close();
        }

        /// <summary>
        /// Transmit a byte-array of data over this socket, block until frame is sent.
        /// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, byte[] data)
        {
            SendFrame(socket, data, true);
            return socket;
        }

        /// <summary>
        /// Transmit a byte-array of data over this socket, block until frame is sent.
        /// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, byte[] data, int length)
        {
            SendFrame(socket, data, length, true);
            return socket;
        }

        #endregion

        #region Timeout

        /// <summary>
        /// Attempt to transmit a single frame on <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, byte[] data, int length, bool more = false)
        {
            var msg = new Msg();
            msg.InitPool(length);
            data.Slice(0, length).CopyTo(msg);

            // The message is closed on both outcomes, so close once and report the result.
            bool sent = socket.TrySend(ref msg, timeout, more);
            msg.Close();
            return sent;
        }

        /// <summary>
        /// Attempt to transmit a single frame on <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, byte[] data, bool more = false)
        {
            return TrySendFrame(socket, timeout, data, data.Length, more);
        }

        #endregion

        #region Immediate

        /// <summary>
        /// Attempt to transmit a single frame on <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, byte[] data, bool more = false)
        {
            return TrySendFrame(socket, TimeSpan.Zero, data, more);
        }

        /// <summary>
        /// Attempt to transmit a single frame on <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="data">the byte-array of data to send</param>
        /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, byte[] data, int length, bool more = false)
        {
            return TrySendFrame(socket, TimeSpan.Zero, data, length, more);
        }

        #endregion

        #endregion

        #region Sending a multipart message as byte arrays

        #region Blocking

        /// <summary>
        /// Send multiple frames on <paramref name="socket"/>, blocking until all frames are sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="frames">frames to transmit</param>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static void SendMultipartBytes(this IOutgoingSocket socket, params byte[][] frames)
        {
            SendMultipartBytes(socket, (IEnumerable<byte[]>)frames);
        }

        /// <summary>
        /// Send multiple frames on <paramref name="socket"/>, blocking until all frames are sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="frames">frames to transmit</param>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static void SendMultipartBytes(this IOutgoingSocket socket, IEnumerable<byte[]> frames)
        {
            using (var enumerator = frames.GetEnumerator())
            {
                // Move to the first element; if there is none, frames is empty.
                if (!enumerator.MoveNext())
                {
                    throw new ArgumentException("frames is empty", nameof(frames));
                }

                var current = enumerator.Current;

                // Stay one item behind the enumerator so that the last frame
                // can be sent without the more flag.
                while (enumerator.MoveNext())
                {
                    socket.SendMoreFrame(current);
                    current = enumerator.Current;
                }

                // Sending the last frame.
                socket.SendFrame(current);
            }
        }

        #endregion

        #region Timeout

        /// <summary>
        /// Attempt to transmit multiple frames on <paramref name="socket"/>.
        /// If frames cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="frames">frames to transmit</param>
        /// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static bool TrySendMultipartBytes(this IOutgoingSocket socket, TimeSpan timeout, params byte[][] frames)
        {
            return TrySendMultipartBytes(socket, timeout, (IEnumerable<byte[]>)frames);
        }

        /// <summary>
        /// Attempt to transmit multiple frames on <paramref name="socket"/>.
        /// If frames cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <remarks>
        /// Only the first frame is sent with the timeout; once it has been accepted,
        /// the remaining frames are sent with the blocking send methods.
        /// </remarks>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="frames">frames to transmit</param>
        /// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static bool TrySendMultipartBytes(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<byte[]> frames)
        {
            using (var enumerator = frames.GetEnumerator())
            {
                // Move to the first element; if there is none, frames is empty.
                if (!enumerator.MoveNext())
                {
                    throw new ArgumentException("frames is empty", nameof(frames));
                }

                var current = enumerator.Current;

                // Only the first frame needs to be sent with a timeout.
                if (!enumerator.MoveNext())
                {
                    // Single frame: it is also the last one, so no more flag.
                    return socket.TrySendFrame(timeout, current);
                }

                if (!socket.TrySendFrame(timeout, current, true))
                {
                    return false;
                }

                // Fetching the second frame.
                current = enumerator.Current;

                // Stay one item behind the enumerator so that the last frame
                // can be sent without the more flag.
                while (enumerator.MoveNext())
                {
                    socket.SendMoreFrame(current);
                    current = enumerator.Current;
                }

                // Sending the last frame.
                socket.SendFrame(current);

                return true;
            }
        }

        #endregion

        #region Immediate

        /// <summary>
        /// Attempt to transmit multiple frames on <paramref name="socket"/>.
        /// If frames cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="frames">frames to transmit</param>
        /// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static bool TrySendMultipartBytes(this IOutgoingSocket socket, params byte[][] frames)
        {
            return TrySendMultipartBytes(socket, TimeSpan.Zero, (IEnumerable<byte[]>)frames);
        }

        /// <summary>
        /// Attempt to transmit multiple frames on <paramref name="socket"/>.
        /// If frames cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="frames">frames to transmit</param>
        /// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
        public static bool TrySendMultipartBytes(this IOutgoingSocket socket, IEnumerable<byte[]> frames)
        {
            return TrySendMultipartBytes(socket, TimeSpan.Zero, frames);
        }

        #endregion

        #endregion

        #region Sending Strings

        #region Blocking

        /// <summary>
        /// Transmit a string over this socket, block until frame is sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="message">the string to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        public static void SendFrame(this IOutgoingSocket socket, string message, bool more = false)
        {
            var msg = new Msg();

            // Count the number of bytes required to encode the string.
            // Note that non-ASCII strings may not have an equal number of characters
            // and bytes. The encoding must be queried for this answer.
            // With this number, request a buffer from the pool.
            msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));

            // Encode the string into the buffer
            SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);

            socket.Send(ref msg, more);
            msg.Close();
        }

        /// <summary>
        /// Transmit a string over this socket, block until frame is sent.
        /// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="message">the string to send</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, string message)
        {
            SendFrame(socket, message, true);
            return socket;
        }

        #endregion

        #region Timeout

        /// <summary>
        /// Attempt to transmit a single string frame on <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="message">the string to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, string message, bool more = false)
        {
            var msg = new Msg();

            // Count the number of bytes required to encode the string.
            // Note that non-ASCII strings may not have an equal number of characters
            // and bytes. The encoding must be queried for this answer.
            // With this number, request a buffer from the pool.
            msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));

            // Encode the string into the buffer
            SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);

            // The message is closed on both outcomes, so close once and report the result.
            bool sent = socket.TrySend(ref msg, timeout, more);
            msg.Close();
            return sent;
        }

        #endregion

        #region Immediate

        /// <summary>
        /// Attempt to transmit a single string frame on <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="message">the string to send</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, string message, bool more = false)
        {
            return TrySendFrame(socket, TimeSpan.Zero, message, more);
        }

        #endregion

        #endregion

        #region Sending a multipart message as NetMQMessage

        #region Blocking

        /// <summary>
        /// Send the multiple part message on the <paramref name="socket"/>, blocking until the entire message is sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="message">message to transmit</param>
        /// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
        public static void SendMultipartMessage(this IOutgoingSocket socket, NetMQMessage message)
        {
            if (message.FrameCount == 0)
            {
                throw new ArgumentException("message is empty", nameof(message));
            }

            // Every frame except the last is sent with the more flag set.
            for (int i = 0; i < message.FrameCount - 1; i++)
            {
                socket.SendMoreFrame(message[i].Buffer, message[i].MessageSize);
            }

            socket.SendFrame(message.Last.Buffer, message.Last.MessageSize);
        }

        #endregion

        #region Timeout

        /// <summary>
        /// Attempt to transmit a multipart message on <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <remarks>
        /// Only the first frame is sent with the timeout; once it has been accepted,
        /// the remaining frames are sent with the blocking send methods.
        /// </remarks>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="message">message to transmit</param>
        /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
        public static bool TrySendMultipartMessage(this IOutgoingSocket socket, TimeSpan timeout, NetMQMessage message)
        {
            if (message.FrameCount == 0)
            {
                throw new ArgumentException("message is empty", nameof(message));
            }

            if (message.FrameCount == 1)
            {
                // Single frame: it is also the last one, so no more flag.
                return TrySendFrame(socket, timeout, message[0].Buffer, message[0].MessageSize);
            }

            if (!TrySendFrame(socket, timeout, message[0].Buffer, message[0].MessageSize, true))
            {
                return false;
            }

            // Middle frames (everything between the first and the last) carry the more flag.
            for (int i = 1; i < message.FrameCount - 1; i++)
            {
                socket.SendMoreFrame(message[i].Buffer, message[i].MessageSize);
            }

            socket.SendFrame(message.Last.Buffer, message.Last.MessageSize);

            return true;
        }

        #endregion

        #region Immediate

        /// <summary>
        /// Attempt to transmit a multipart message on <paramref name="socket"/>.
        /// If frames cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="message">message to transmit</param>
        /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
        /// <exception cref="ArgumentException"><paramref name="message"/> has no frames.</exception>
        public static bool TrySendMultipartMessage(this IOutgoingSocket socket, NetMQMessage message)
        {
            return TrySendMultipartMessage(socket, TimeSpan.Zero, message);
        }

        #endregion

        #endregion

        #region Sending an empty frame

        #region Blocking

        /// <summary>
        /// Transmit an empty frame over this socket, block until frame is sent.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        public static void SendFrameEmpty(this IOutgoingSocket socket, bool more = false)
        {
            SendFrame(socket, EmptyArray<byte>.Instance, more);
        }

        /// <summary>
        /// Transmit an empty frame over this socket, block until frame is sent.
        /// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendMoreFrameEmpty(this IOutgoingSocket socket)
        {
            SendFrame(socket, EmptyArray<byte>.Instance, true);
            return socket;
        }

        #endregion

        #region Timeout

        /// <summary>
        /// Attempt to transmit an empty frame on <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrameEmpty(this IOutgoingSocket socket, TimeSpan timeout, bool more = false)
        {
            return TrySendFrame(socket, timeout, EmptyArray<byte>.Instance, more);
        }

        #endregion

        #region Immediate

        /// <summary>
        /// Attempt to transmit an empty frame on <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
        /// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrameEmpty(this IOutgoingSocket socket, bool more = false)
        {
            return TrySendFrame(socket, EmptyArray<byte>.Instance, more);
        }

        #endregion

        #endregion

        #region Sending Signals

        /// <summary>
        /// Transmit a status-signal over this socket.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="status">a byte that contains the status signal to send</param>
        private static void Signal(this IOutgoingSocket socket, byte status)
        {
            // The status byte is added to a fixed 64-bit constant whose lowest byte is zero,
            // so the status ends up in the lowest byte of the 8-byte signal frame.
            long signalValue = 0x7766554433221100L + status;

            Msg msg = new Msg();
            msg.InitPool(8);
            NetworkOrderBitsConverter.PutInt64(signalValue, msg);

            socket.Send(ref msg, false);
            msg.Close();
        }

        /// <summary>
        /// Attempt to transmit a status-signal over this socket.
        /// If signal cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="status">a byte that contains the status signal to send</param>
        /// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
        private static bool TrySignal(this IOutgoingSocket socket, byte status)
        {
            // Same 8-byte encoding as Signal: fixed constant plus the status in the lowest byte.
            long signalValue = 0x7766554433221100L + status;

            Msg msg = new Msg();
            msg.InitPool(8);
            NetworkOrderBitsConverter.PutInt64(signalValue, msg);

            // The message is closed on both outcomes, so close once and report the result.
            bool sent = socket.TrySend(ref msg, TimeSpan.Zero, false);
            msg.Close();
            return sent;
        }

        /// <summary>
        /// Transmit a specific status-signal over this socket that indicates OK.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        public static void SignalOK(this IOutgoingSocket socket)
        {
            socket.Signal(0);
        }

        /// <summary>
        /// Attempt to transmit a specific status-signal over this socket that indicates OK.
        /// If signal cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
        public static bool TrySignalOK(this IOutgoingSocket socket)
        {
            return TrySignal(socket, 0);
        }

        /// <summary>
        /// Transmit a specific status-signal over this socket that indicates there is an error.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        public static void SignalError(this IOutgoingSocket socket)
        {
            socket.Signal(1);
        }

        /// <summary>
        /// Attempt to transmit a specific status-signal over this socket that indicates there is an error.
        /// If signal cannot be sent immediately, return <c>false</c>.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
        public static bool TrySignalError(this IOutgoingSocket socket)
        {
            return socket.TrySignal(1);
        }

        #endregion

        #region Sending Routing Key

        /// <summary>
        /// Send routing key over <paramref name="socket"/>.
        /// The routing key is sent as a more frame, so another frame must follow.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="routingKey">the routing key to send</param>
        public static void SendMoreFrame(this IOutgoingSocket socket, RoutingKey routingKey)
        {
            socket.SendMoreFrame(routingKey.Bytes);
        }

        /// <summary>
        /// Attempt to transmit routing key over <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// Routing is always sent as more frame.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="routingKey">the routing key to send</param>
        /// <returns><c>true</c> if the routing key was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, RoutingKey routingKey)
        {
            return socket.TrySendFrame(routingKey.Bytes, true);
        }

        /// <summary>
        /// Attempt to transmit routing key over <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// Routing is always sent as more frame.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="routingKey">the routing key to send</param>
        /// <returns><c>true</c> if the routing key was sent, otherwise <c>false</c>.</returns>
        public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, RoutingKey routingKey)
        {
            return socket.TrySendFrame(timeout, routingKey.Bytes, true);
        }

        #endregion

        #region Sending Routing Keys

        /// <summary>
        /// Send empty list of routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendEmptyRoutingKeys(this IOutgoingSocket socket)
        {
            return socket.SendMoreFrameEmpty();
        }

        /// <summary>
        /// Send routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="routingKeys">the routing keys to send</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, params RoutingKey[] routingKeys)
        {
            return SendRoutingKeys(socket, (IEnumerable<RoutingKey>)routingKeys);
        }

        /// <summary>
        /// Send routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="routingKeys">the routing keys to send</param>
        /// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
        public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
        {
            foreach (var routingKey in routingKeys)
            {
                socket.SendMoreFrame(routingKey);
            }

            // The empty frame terminates the list of routing keys.
            socket.SendMoreFrameEmpty();

            return socket;
        }

        /// <summary>
        /// Attempt to transmit routing keys over <paramref name="socket"/>.
        /// If message cannot be sent immediately, return <c>false</c>.
        /// Routing is always sent as more frame.
        /// </summary>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="routingKeys">the routing keys to send</param>
        /// <returns><c>true</c> if the routing keys were sent, otherwise <c>false</c>.</returns>
        public static bool TrySendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
        {
            return socket.TrySendRoutingKeys(TimeSpan.Zero, routingKeys);
        }

        /// <summary>
        /// Attempt to transmit routing keys over <paramref name="socket"/>.
        /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
        /// Routing is always sent as more frame.
        /// </summary>
        /// <remarks>
        /// Only the first frame is sent with the timeout; once it has been accepted,
        /// the remaining keys and the terminating empty frame are sent with the blocking send methods.
        /// </remarks>
        /// <param name="socket">the IOutgoingSocket to transmit on</param>
        /// <param name="timeout">The maximum period of time to try to send a message.</param>
        /// <param name="routingKeys">the routing keys to send</param>
        /// <returns><c>true</c> if the routing keys were sent, otherwise <c>false</c>.</returns>
        public static bool TrySendRoutingKeys(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<RoutingKey> routingKeys)
        {
            using (var enumerator = routingKeys.GetEnumerator())
            {
                // Empty collection, just trying to send the empty message
                if (!enumerator.MoveNext())
                {
                    return socket.TrySendFrameEmpty(timeout, true);
                }

                // The first key must honour the caller's timeout (not the immediate overload).
                if (!socket.TrySendFrame(timeout, enumerator.Current))
                {
                    return false;
                }

                while (enumerator.MoveNext())
                {
                    socket.SendMoreFrame(enumerator.Current);
                }

                // The empty frame terminates the list of routing keys.
                socket.SendMoreFrameEmpty();

                return true;
            }
        }

        #endregion
    }
}