using System;
using System.Collections.Generic;
using System.Diagnostics;
using NetMQ.Utils;
namespace NetMQ
{
/// <summary>
/// This static class serves to provide extension methods for IOutgoingSocket.
/// </summary>
public static class OutgoingSocketExtensions
{
/// <summary>
/// Send <paramref name="msg"/> on <paramref name="socket"/>, waiting for as long as it takes.
/// </summary>
/// <remarks>
/// The call blocks until the message can be sent and cannot be interrupted.
/// Whether the message can be sent depends on the socket type.
/// </remarks>
/// <param name="socket">The socket to send the message on.</param>
/// <param name="msg">An object with message's data to send.</param>
/// <param name="more">Indicate if another frame is expected after this frame</param>
public static void Send(this IOutgoingSocket socket, ref Msg msg, bool more)
{
    // The infinite timeout is passed, so TrySend is not expected to report failure.
    bool wasSent = socket.TrySend(ref msg, SendReceiveConstants.InfiniteTimeout, more);

    // Debug.Assert calls are compiled out of non-DEBUG builds, which is why the
    // TrySend call is made outside of the assertion.
    Debug.Assert(wasSent);
}
#region Sending Byte Array
#region Blocking
/// <summary>
/// Transmit a byte-array of data over this socket, block until frame is sent.
/// The whole array (<c>data.Length</c> bytes) is sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
public static void SendFrame(this IOutgoingSocket socket, byte[] data, bool more = false)
    => socket.SendFrame(data, data.Length, more);
/// <summary>
/// Transmit a byte-array of data over this socket, block until frame is sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
public static void SendFrame(this IOutgoingSocket socket, byte[] data, int length, bool more = false)
{
    // Build a message of `length` bytes and fill it from the start of `data`.
    Msg frame = new Msg();
    frame.InitPool(length);
    data.Slice(0, length).CopyTo(frame);

    socket.Send(ref frame, more);
    frame.Close();
}
/// <summary>
/// Transmit a byte-array of data over this socket, block until frame is sent.
/// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, byte[] data)
{
    socket.SendFrame(data, more: true);
    return socket;
}
/// <summary>
/// Transmit a byte-array of data over this socket, block until frame is sent.
/// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, byte[] data, int length)
{
    socket.SendFrame(data, length, more: true);
    return socket;
}
#endregion
#region Timeout
/// <summary>
/// Attempt to transmit a single frame on <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, byte[] data, int length, bool more = false)
{
    // Build a message of `length` bytes and fill it from the start of `data`.
    Msg frame = new Msg();
    frame.InitPool(length);
    data.Slice(0, length).CopyTo(frame);

    bool wasSent = socket.TrySend(ref frame, timeout, more);

    // The message is closed whether or not the send succeeded.
    frame.Close();
    return wasSent;
}
/// <summary>
/// Attempt to transmit a single frame on <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// The whole array (<c>data.Length</c> bytes) is sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, byte[] data, bool more = false)
    => socket.TrySendFrame(timeout, data, data.Length, more);
#endregion
#region Immediate
/// <summary>
/// Attempt to transmit a single frame on <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, byte[] data, bool more = false)
    // "Immediately" is expressed as a zero timeout on the timed overload.
    => socket.TrySendFrame(TimeSpan.Zero, data, more);
/// <summary>
/// Attempt to transmit a single frame on <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="data">the byte-array of data to send</param>
/// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, byte[] data, int length, bool more = false)
    // "Immediately" is expressed as a zero timeout on the timed overload.
    => socket.TrySendFrame(TimeSpan.Zero, data, length, more);
#endregion
#endregion
#region Sending a multipart message as byte arrays
#region Blocking
/// <summary>
/// Send multiple frames on <paramref name="socket"/>, blocking until all frames are sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="frames">frames to transmit</param>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static void SendMultipartBytes(this IOutgoingSocket socket, params byte[][] frames)
{
    // The cast selects the IEnumerable<byte[]> overload; without it the call
    // would bind to this params overload again and recurse forever.
    SendMultipartBytes(socket, (IEnumerable<byte[]>)frames);
}
/// <summary>
/// Send multiple frames on <paramref name="socket"/>, blocking until all frames are sent.
/// Every frame except the last is sent with the "more" flag.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="frames">frames to transmit</param>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static void SendMultipartBytes(this IOutgoingSocket socket, IEnumerable<byte[]> frames)
{
    // `using` disposes the enumerator on every exit path, including exceptions.
    using (IEnumerator<byte[]> enumerator = frames.GetEnumerator())
    {
        // Move to the first element; if that fails the sequence is empty.
        if (!enumerator.MoveNext())
        {
            throw new ArgumentException("frames is empty", nameof(frames));
        }

        // Stay one element behind the enumerator: a frame is only sent once we
        // know whether another one follows, so the last frame can be sent
        // without the "more" flag.
        byte[] pending = enumerator.Current;
        while (enumerator.MoveNext())
        {
            socket.SendMoreFrame(pending);
            pending = enumerator.Current;
        }

        // The final frame closes the multipart message.
        socket.SendFrame(pending);
    }
}
#endregion
#region Timeout
/// <summary>
/// Attempt to transmit a multiple frames on <paramref name="socket"/>.
/// If frames cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="frames">frames to transmit</param>
/// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static bool TrySendMultipartBytes(this IOutgoingSocket socket, TimeSpan timeout, params byte[][] frames)
{
    // The cast selects the IEnumerable<byte[]> overload; without it the call
    // would bind to this params overload again and recurse forever.
    return TrySendMultipartBytes(socket, timeout, (IEnumerable<byte[]>)frames);
}
/// <summary>
/// Attempt to transmit a multiple frames on <paramref name="socket"/>.
/// If frames cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <remarks>
/// Only the first frame is sent with <paramref name="timeout"/>. Once it has been
/// accepted, the remaining frames are sent with the blocking send methods.
/// </remarks>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="frames">frames to transmit</param>
/// <returns>
/// <c>true</c> if the frames were sent; <c>false</c> if the first frame could not be
/// sent within <paramref name="timeout"/> (in which case nothing was sent).
/// </returns>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static bool TrySendMultipartBytes(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<byte[]> frames)
{
    // `using` disposes the enumerator on every exit path, including exceptions.
    using (IEnumerator<byte[]> enumerator = frames.GetEnumerator())
    {
        // Move to the first element; if that fails the sequence is empty.
        if (!enumerator.MoveNext())
        {
            throw new ArgumentException("frames is empty", nameof(frames));
        }

        byte[] pending = enumerator.Current;

        // Single-frame message: the only frame is also the last one.
        if (!enumerator.MoveNext())
        {
            return socket.TrySendFrame(timeout, pending);
        }

        // Only the first frame needs to be sent with a timeout.
        if (!socket.TrySendFrame(timeout, pending, true))
        {
            return false;
        }

        // The enumerator is already positioned on the second frame. From here on
        // stay one element behind it so the last frame is sent without "more".
        pending = enumerator.Current;
        while (enumerator.MoveNext())
        {
            socket.SendMoreFrame(pending);
            pending = enumerator.Current;
        }

        // The final frame closes the multipart message.
        socket.SendFrame(pending);
        return true;
    }
}
#endregion
#region Immediate
/// <summary>
/// Attempt to transmit a multiple frames on <paramref name="socket"/>.
/// If frames cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="frames">frames to transmit</param>
/// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static bool TrySendMultipartBytes(this IOutgoingSocket socket, params byte[][] frames)
{
    // "Immediately" is a zero timeout; the cast selects the IEnumerable<byte[]>
    // overload rather than the timed params overload.
    return TrySendMultipartBytes(socket, TimeSpan.Zero, (IEnumerable<byte[]>)frames);
}
/// <summary>
/// Attempt to transmit a multiple frames on <paramref name="socket"/>.
/// If frames cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="frames">frames to transmit</param>
/// <returns><c>true</c> if the frames were sent, otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentException"><paramref name="frames"/> is empty.</exception>
public static bool TrySendMultipartBytes(this IOutgoingSocket socket, IEnumerable<byte[]> frames)
{
    // "Immediately" is expressed as a zero timeout on the timed overload.
    return TrySendMultipartBytes(socket, TimeSpan.Zero, frames);
}
#endregion
#endregion
#region Sending Strings
#region Blocking
/// <summary>
/// Transmit a string over this socket, block until frame is sent.
/// The string is encoded with <c>SendReceiveConstants.DefaultEncoding</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="message">the string to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
public static void SendFrame(this IOutgoingSocket socket, string message, bool more = false)
{
    // A string's encoded size can differ from its character count (non-ASCII
    // text), so the encoding is asked for the exact number of bytes first.
    var encodedLength = SendReceiveConstants.DefaultEncoding.GetByteCount(message);

    // Request a buffer of that size from the pool.
    Msg frame = new Msg();
    frame.InitPool(encodedLength);

    // Encode the string into the buffer.
    SendReceiveConstants.DefaultEncoding.GetBytes(message, frame);

    socket.Send(ref frame, more);
    frame.Close();
}
/// <summary>
/// Transmit a string over this socket, block until frame is sent.
/// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="message">the string to send</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendMoreFrame(this IOutgoingSocket socket, string message)
{
    socket.SendFrame(message, more: true);
    return socket;
}
#endregion
#region Timeout
/// <summary>
/// Attempt to transmit a single string frame on <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// The string is encoded with <c>SendReceiveConstants.DefaultEncoding</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="message">the string to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, string message, bool more = false)
{
    // A string's encoded size can differ from its character count (non-ASCII
    // text), so the encoding is asked for the exact number of bytes first.
    var encodedLength = SendReceiveConstants.DefaultEncoding.GetByteCount(message);

    // Request a buffer of that size from the pool.
    Msg frame = new Msg();
    frame.InitPool(encodedLength);

    // Encode the string into the buffer.
    SendReceiveConstants.DefaultEncoding.GetBytes(message, frame);

    bool wasSent = socket.TrySend(ref frame, timeout, more);

    // The message is closed whether or not the send succeeded.
    frame.Close();
    return wasSent;
}
#endregion
#region Immediate
/// <summary>
/// Attempt to transmit a single string frame on <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="message">the string to send</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, string message, bool more = false)
    // "Immediately" is expressed as a zero timeout on the timed overload.
    => socket.TrySendFrame(TimeSpan.Zero, message, more);
#endregion
#endregion
#region Sending a multipart message as NetMQMessage
#region Blocking
/// <summary>
/// Send the multiple part message on the <paramref name="socket"/>, blocking until the entire message is sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="message">message to transmit</param>
/// <exception cref="ArgumentException"><paramref name="message"/> contains no frames.</exception>
public static void SendMultipartMessage(this IOutgoingSocket socket, NetMQMessage message)
{
    if (message.FrameCount == 0)
    {
        throw new ArgumentException("message is empty", nameof(message));
    }

    // Every frame except the last one is sent with the "more" flag.
    int index = 0;
    while (index < message.FrameCount - 1)
    {
        var frame = message[index];
        socket.SendMoreFrame(frame.Buffer, frame.MessageSize);
        index++;
    }

    // The last frame is sent without "more".
    var lastFrame = message.Last;
    socket.SendFrame(lastFrame.Buffer, lastFrame.MessageSize);
}
#endregion
#region Timeout
/// <summary>
/// Attempt to transmit a multiple message on <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <remarks>
/// Only the first frame is sent with <paramref name="timeout"/>. Once it has been
/// accepted, the remaining frames are sent with the blocking send methods.
/// </remarks>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="message">message to transmit</param>
/// <returns>
/// <c>true</c> if the message was sent; <c>false</c> if the first frame could not be
/// sent within <paramref name="timeout"/>.
/// </returns>
/// <exception cref="ArgumentException"><paramref name="message"/> contains no frames.</exception>
public static bool TrySendMultipartMessage(this IOutgoingSocket socket, TimeSpan timeout, NetMQMessage message)
{
    if (message.FrameCount == 0)
    {
        throw new ArgumentException("message is empty", nameof(message));
    }

    // Single-frame message: the only frame is also the last one.
    if (message.FrameCount == 1)
    {
        return socket.TrySendFrame(timeout, message[0].Buffer, message[0].MessageSize);
    }

    // Only the first frame is subject to the timeout.
    if (!socket.TrySendFrame(timeout, message[0].Buffer, message[0].MessageSize, true))
    {
        return false;
    }

    // Frames between the first and the last are sent with the "more" flag.
    int index = 1;
    while (index < message.FrameCount - 1)
    {
        var frame = message[index];
        socket.SendMoreFrame(frame.Buffer, frame.MessageSize);
        index++;
    }

    // The last frame is sent without "more".
    var lastFrame = message.Last;
    socket.SendFrame(lastFrame.Buffer, lastFrame.MessageSize);
    return true;
}
#endregion
#region Immediate
/// <summary>
/// Attempt to transmit a multiple message on <paramref name="socket"/>.
/// If frames cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="message">message to transmit</param>
/// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
/// <exception cref="ArgumentException"><paramref name="message"/> contains no frames.</exception>
public static bool TrySendMultipartMessage(this IOutgoingSocket socket, NetMQMessage message)
    // "Immediately" is expressed as a zero timeout on the timed overload.
    => TrySendMultipartMessage(socket, TimeSpan.Zero, message);
#endregion
#endregion
#region Sending an empty frame
#region Blocking
/// <summary>
/// Transmit an empty frame over this socket, block until frame is sent.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
public static void SendFrameEmpty(this IOutgoingSocket socket, bool more = false)
    => socket.SendFrame(EmptyArray.Instance, more);
/// <summary>
/// Transmit an empty frame over this socket, block until frame is sent.
/// Send more frame, another frame must be sent after this frame. Use to chain Send methods.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendMoreFrameEmpty(this IOutgoingSocket socket)
{
    socket.SendFrame(EmptyArray.Instance, true);
    return socket;
}
#endregion
#region Timeout
/// <summary>
/// Attempt to transmit an empty frame on <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrameEmpty(this IOutgoingSocket socket, TimeSpan timeout, bool more = false)
    => socket.TrySendFrame(timeout, EmptyArray.Instance, more);
#endregion
#region Immediate
/// <summary>
/// Attempt to transmit an empty frame on <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="more">set this flag to true to signal that you will be immediately sending another frame (optional: default is false)</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrameEmpty(this IOutgoingSocket socket, bool more = false)
    => socket.TrySendFrame(EmptyArray.Instance, more);
#endregion
#endregion
#region Sending Signals
/// <summary>
/// Transmit a status-signal over this socket.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="status">a byte that contains the status signal to send</param>
private static void Signal(this IOutgoingSocket socket, byte status)
{
    // The status byte is added to a fixed 64-bit base value whose lowest byte is zero.
    const long signalBase = 0x7766554433221100L;
    long signalValue = signalBase + status;

    // 8 bytes: room for the 64-bit signal value, written by NetworkOrderBitsConverter.
    Msg frame = new Msg();
    frame.InitPool(8);
    NetworkOrderBitsConverter.PutInt64(signalValue, frame);

    // A signal is a single frame, so "more" is false.
    socket.Send(ref frame, false);
    frame.Close();
}
/// <summary>
/// Attempt to transmit a status-signal over this socket.
/// If signal cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="status">a byte that contains the status signal to send</param>
/// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
private static bool TrySignal(this IOutgoingSocket socket, byte status)
{
    // The status byte is added to a fixed 64-bit base value whose lowest byte is zero.
    const long signalBase = 0x7766554433221100L;
    long signalValue = signalBase + status;

    // 8 bytes: room for the 64-bit signal value, written by NetworkOrderBitsConverter.
    Msg frame = new Msg();
    frame.InitPool(8);
    NetworkOrderBitsConverter.PutInt64(signalValue, frame);

    // Zero timeout: do not wait. A signal is a single frame, so "more" is false.
    bool wasSent = socket.TrySend(ref frame, TimeSpan.Zero, false);

    // The message is closed whether or not the send succeeded.
    frame.Close();
    return wasSent;
}
/// <summary>
/// Transmit a specific status-signal over this socket that indicates OK.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
public static void SignalOK(this IOutgoingSocket socket)
    // Status 0 is the OK signal.
    => Signal(socket, 0);
/// <summary>
/// Attempt to transmit a specific status-signal over this socket that indicates OK.
/// If signal cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
public static bool TrySignalOK(this IOutgoingSocket socket)
    // Status 0 is the OK signal.
    => socket.TrySignal(0);
/// <summary>
/// Transmit a specific status-signal over this socket that indicates there is an error.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
public static void SignalError(this IOutgoingSocket socket)
    // Status 1 is the error signal.
    => Signal(socket, 1);
/// <summary>
/// Attempt to transmit a specific status-signal over this socket that indicates there is an error.
/// If signal cannot be sent immediately, return <c>false</c>.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <returns><c>true</c> if the signal was sent, otherwise <c>false</c>.</returns>
public static bool TrySignalError(this IOutgoingSocket socket)
    // Status 1 is the error signal.
    => TrySignal(socket, 1);
#endregion
#region Sending Routing Key
/// <summary>
/// Send routing key over <paramref name="socket"/>.
/// The key's <c>Bytes</c> are sent as a frame with the "more" flag set.
/// Unlike the other <c>SendMoreFrame</c> overloads, this one returns nothing.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKey">the routing key to send</param>
public static void SendMoreFrame(this IOutgoingSocket socket, RoutingKey routingKey)
{
    var keyBytes = routingKey.Bytes;
    SendMoreFrame(socket, keyBytes);
}
/// <summary>
/// Attempt to transmit routing key over <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKey">the routing key to send</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, RoutingKey routingKey)
{
    var keyBytes = routingKey.Bytes;
    return TrySendFrame(socket, keyBytes, more: true);
}
/// <summary>
/// Attempt to transmit routing key over <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="routingKey">the routing key to send</param>
/// <returns><c>true</c> if the frame was sent, otherwise <c>false</c>.</returns>
public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, RoutingKey routingKey)
{
    var keyBytes = routingKey.Bytes;
    return TrySendFrame(socket, timeout, keyBytes, more: true);
}
#endregion
#region Sending Routing Keys
/// <summary>
/// Send empty list of routing keys over <paramref name="socket"/>: only the empty
/// frame that terminates a list of keys is sent, flagged "more".
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <returns>the value returned by <c>SendMoreFrameEmpty</c>, for chaining further sends</returns>
public static IOutgoingSocket SendEmptyRoutingKeys(this IOutgoingSocket socket)
    => SendMoreFrameEmpty(socket);
/// <summary>
/// Send the given routing keys over <paramref name="socket"/>, append an empty message afterwards.
/// Every frame, including the trailing empty one, is sent with the "more" flag.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, params RoutingKey[] routingKeys)
{
    for (int index = 0; index < routingKeys.Length; index++)
    {
        socket.SendMoreFrame(routingKeys[index]);
    }

    // The empty frame marks the end of the routing keys.
    socket.SendMoreFrameEmpty();
    return socket;
}
/// <summary>
/// Send routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
/// Every frame, including the trailing empty one, is sent with the "more" flag.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns>a reference to this IOutgoingSocket so that method-calls may be chained together</returns>
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
{
    // The element type must be RoutingKey for SendMoreFrame to bind to the
    // routing-key overload.
    foreach (RoutingKey routingKey in routingKeys)
    {
        socket.SendMoreFrame(routingKey);
    }

    // The empty frame marks the end of the routing keys.
    socket.SendMoreFrameEmpty();
    return socket;
}
/// <summary>
/// Attempt to transmit routing keys over <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns><c>true</c> if the routing keys were sent, otherwise <c>false</c>.</returns>
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
{
    // "Immediately" is expressed as a zero timeout on the timed overload.
    return socket.TrySendRoutingKeys(TimeSpan.Zero, routingKeys);
}
/// <summary>
/// Attempt to transmit routing keys over <paramref name="socket"/>, followed by an empty frame.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <remarks>
/// Only the first frame is sent with <paramref name="timeout"/>. Once it has been
/// accepted, the remaining keys and the trailing empty frame are sent with the
/// blocking send methods.
/// </remarks>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns>
/// <c>true</c> if the routing keys were sent; <c>false</c> if the first frame could
/// not be sent within <paramref name="timeout"/>.
/// </returns>
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<RoutingKey> routingKeys)
{
    // `using` disposes the enumerator on every exit path, including exceptions.
    using (IEnumerator<RoutingKey> enumerator = routingKeys.GetEnumerator())
    {
        // Empty collection: just try to send the terminating empty frame.
        if (!enumerator.MoveNext())
        {
            return socket.TrySendFrameEmpty(timeout, true);
        }

        // The first key is the only frame subject to the timeout. It must go
        // through the timed overload; the two-argument overload would ignore
        // `timeout` and only attempt an immediate send.
        if (!socket.TrySendFrame(timeout, enumerator.Current))
        {
            return false;
        }

        // The remaining keys are sent with the blocking overload.
        while (enumerator.MoveNext())
        {
            socket.SendMoreFrame(enumerator.Current);
        }

        // The empty frame marks the end of the routing keys.
        socket.SendMoreFrameEmpty();
        return true;
    }
}
#endregion
}
}