using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Utils;
namespace NetMQ
{
///
/// Send and Receive extensions for sockets with group capability (ServerSocket)
///
public static class GroupSocketExtensions
{
#region Sending Byte Array
#region Blocking
///
/// Transmit a byte-array of data over this socket, block until message is sent.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
public static void Send(this IGroupOutSocket socket, string group, byte[] data)
{
Send(socket, group, data, data.Length);
}
///
/// Transmit a byte-array of data over this socket, block until frame is sent.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
/// the number of bytes to send from .
public static void Send(this IGroupOutSocket socket, string group, byte[] data, int length)
{
var msg = new Msg();
msg.InitPool(length);
msg.Group = group;
data.Slice(0, length).CopyTo(msg);
socket.Send(ref msg);
msg.Close();
}
#endregion
#region Timeout
///
/// Attempt to transmit a byte-array of data on .
/// If message cannot be sent within , return false.
///
/// the socket to transmit on
/// The maximum period of time to try to send a message.
/// The group to send the message to.
/// the byte-array of data to send
/// the number of bytes to send from .
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data, int length)
{
var msg = new Msg();
msg.InitPool(length);
msg.Group = group;
data.CopyTo(msg);
if (!socket.TrySend(ref msg, timeout))
{
msg.Close();
return false;
}
msg.Close();
return true;
}
///
/// Attempt to transmit a byte-array of data on .
/// If message cannot be sent within , return false.
///
/// the socket to transmit on
/// The maximum period of time to try to send a message.
/// The group to send the message to.
/// the byte-array of data to send
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data)
{
return TrySend(socket, timeout, group, data, data.Length);
}
#endregion
#region Immediate
///
/// Attempt to transmit a byte-array of data on .
/// If message cannot be sent immediately, return false.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data)
{
return TrySend(socket, TimeSpan.Zero, group, data);
}
///
/// Attempt to transmit a single frame on .
/// If message cannot be sent immediately, return false.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
/// the number of bytes to send from .
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data, int length)
{
return TrySend(socket, TimeSpan.Zero, group, data, length);
}
#endregion
#region Async
///
/// Transmit a byte-array of data over this socket asynchronously.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data)
{
if (socket.TrySend(group, data))
return new ValueTask();
return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data),
TaskCreationOptions.LongRunning));
}
///
/// Transmit a byte-array of data over this socket asynchronously.
///
/// the socket to transmit on
/// The group to send the message to.
/// the byte-array of data to send
/// the number of bytes to send from .
public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data, int length)
{
if (socket.TrySend(group, data, length))
return new ValueTask();
return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data, length),
TaskCreationOptions.LongRunning));
}
#endregion
#endregion
#region Sending Strings
#region Blocking
///
/// Transmit a string over this socket, block until message is sent.
///
/// the socket to transmit on
/// The group to send the message to.
/// the string to send
public static void Send(this IGroupOutSocket socket, string group, string message)
{
var msg = new Msg();
// Count the number of bytes required to encode the string.
// Note that non-ASCII strings may not have an equal number of characters
// and bytes. The encoding must be queried for this answer.
// With this number, request a buffer from the pool.
msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));
msg.Group = group;
// Encode the string into the buffer
SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);
socket.Send(ref msg);
msg.Close();
}
#endregion
#region Timeout
///
/// Attempt to transmit a single string frame on .
/// If message cannot be sent within , return false.
///
/// the socket to transmit on
/// The maximum period of time to try to send a message.
/// The group to send the message to.
/// the string to send
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, string message)
{
var msg = new Msg();
// Count the number of bytes required to encode the string.
// Note that non-ASCII strings may not have an equal number of characters
// and bytes. The encoding must be queried for this answer.
// With this number, request a buffer from the pool.
msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));
msg.Group = group;
// Encode the string into the buffer
SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);
if (!socket.TrySend(ref msg, timeout))
{
msg.Close();
return false;
}
msg.Close();
return true;
}
#endregion
#region Immediate
///
/// Attempt to transmit a single string frame on .
/// If message cannot be sent immediately, return false.
///
/// the socket to transmit on
/// The group to send the message to.
/// the string to send
/// true if a message was available, otherwise false.
public static bool TrySend(this IGroupOutSocket socket, string group, string message)
{
return TrySend(socket, TimeSpan.Zero, group, message);
}
#endregion
#region Async
///
/// Transmit a string over this socket asynchronously.
///
/// the socket to transmit on
/// The group to send the message to.
/// the string to send
public static ValueTask SendAsync(this IGroupOutSocket socket, string group, string message)
{
if (socket.TrySend(group, message))
return new ValueTask();
return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, message),
TaskCreationOptions.LongRunning));
}
#endregion
#endregion
#region Receiving byte array
#region Blocking
///
/// Receive a bytes from , blocking until one arrives.
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// Tuple of group and received bytes
/// The token has had cancellation requested.
public static (string, byte[]) ReceiveBytes(this IGroupInSocket socket,
CancellationToken cancellationToken = default)
{
var msg = new Msg();
msg.InitEmpty();
try
{
socket.Receive(ref msg, cancellationToken);
var data = msg.CloneData();
var group = msg.Group;
return (group, data);
}
finally
{
msg.Close();
}
}
#endregion
#region Immediate
///
/// Attempt to receive a byte-array .
/// If no message is immediately available, return false.
///
/// The socket to receive from.
/// The message group
/// The content of the received message, or null if no message was available.
/// true if a message was available, otherwise false.
public static bool TryReceiveBytes(this IGroupInSocket socket,
[NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out byte[]? bytes)
{
return socket.TryReceiveBytes(TimeSpan.Zero, out group, out bytes);
}
#endregion
#region Timeout
///
/// Attempt to receive a byte-array .
/// If no message is available within , return false.
///
/// The socket to receive from.
/// The maximum period of time to wait for a message to become available.
/// The message group.
/// The content of the received message, or null if no message was available.
/// The token to monitor for cancellation requests. The default value is .
/// true if a message was available, otherwise false.
/// The method would return false if cancellation has had requested.
public static bool TryReceiveBytes(this IGroupInSocket socket, TimeSpan timeout,
[NotNullWhen(returnValue: true)] out string? group,
[NotNullWhen(returnValue: true)] out byte[]? bytes, CancellationToken cancellationToken = default)
{
var msg = new Msg();
msg.InitEmpty();
if (!socket.TryReceive(ref msg, timeout, cancellationToken))
{
msg.Close();
bytes = null;
group = null;
return false;
}
bytes = msg.CloneData();
group = msg.Group;
msg.Close();
return true;
}
#endregion
#region Async
///
/// Receive a bytes from asynchronously.
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// Tuple of group and received bytes
/// The token has had cancellation requested.
public static ValueTask<(string, byte[])> ReceiveBytesAsync(this IGroupInSocket socket,
CancellationToken cancellationToken = default)
{
if (TryReceiveBytes(socket, out var group, out var bytes))
return new ValueTask<(string, byte[])>((group, bytes));
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
return new ValueTask<(string, byte[])>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken),
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
}
#endregion
#region AsyncEnumerable
#if NETSTANDARD2_1
///
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// An IAsyncEnumerable that receive and returns messages from the socket.
/// The token has had cancellation requested.
public static async IAsyncEnumerable<(string, byte[])> ReceiveBytesAsyncEnumerable(
this IGroupInSocket socket,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
yield return await socket.ReceiveBytesAsync(cancellationToken);
}
}
#endif
#endregion
#endregion
#region Receiving string
#region Blocking
///
/// Receive a string from , blocking until one arrives, and decode using .
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// Tuple of group and the content of the received message as a string.
/// The token has had cancellation requested.
public static (string, string) ReceiveString(this IGroupInSocket socket,
CancellationToken cancellationToken = default)
{
return socket.ReceiveString(SendReceiveConstants.DefaultEncoding, cancellationToken);
}
///
/// Receive a string from , blocking until one arrives, and decode using .
///
/// The socket to receive from.
/// The encoding used to convert the data to a string.
/// The token to monitor for cancellation requests. The default value is .
/// Tuple of group and the content of the received message as a string.
/// The token has had cancellation requested.
public static (string, string) ReceiveString(this IGroupInSocket socket, Encoding encoding,
CancellationToken cancellationToken = default)
{
var msg = new Msg();
msg.InitEmpty();
try
{
socket.Receive(ref msg, cancellationToken);
var group = msg.Group;
var str = msg.Size > 0
? msg.GetString(encoding)
: string.Empty;
return (group, str);
}
finally
{
msg.Close();
}
}
#endregion
#region Immediate
///
/// Attempt to receive a string from , and decode using .
/// If no message is immediately available, return false.
///
/// The socket to receive from.
/// The message group.
/// The content of the received message as a string, or null if no message was available.
/// true if a message was available, otherwise false.
public static bool TryReceiveString(this IGroupInSocket socket,
[NotNullWhen(returnValue: true)] out string? group,
[NotNullWhen(returnValue: true)] out string? str)
{
return socket.TryReceiveString(TimeSpan.Zero, SendReceiveConstants.DefaultEncoding, out group, out str);
}
///
/// Attempt to receive a string from , and decode using .
/// If no message is immediately available, return false.
///
/// The socket to receive from.
/// The encoding used to convert the data to a string.
/// The message group.
/// The content of the received message as a string, or null if no message was available.
/// true if a message was available, otherwise false.
public static bool TryReceiveString(this IGroupInSocket socket, Encoding encoding,
[NotNullWhen(returnValue: true)] out string? group,
[NotNullWhen(returnValue: true)] out string? str)
{
return socket.TryReceiveString(TimeSpan.Zero, encoding, out group, out str);
}
#endregion
#region Timeout
///
/// Attempt to receive a string from , and decode using .
/// If no message is available within , return false.
///
/// The socket to receive from.
/// The maximum period of time to wait for a message to become available.
/// The message group
/// The content of the received message as a string, or null if no message was available.
/// The token to monitor for cancellation requests. The default value is .
/// true if a message was available, otherwise false.
/// The method would return false if cancellation has had requested.
public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout,
[NotNullWhen(returnValue: true)] out string? group,
[NotNullWhen(returnValue: true)] out string? str,
CancellationToken cancellationToken = default)
{
return socket.TryReceiveString(timeout, SendReceiveConstants.DefaultEncoding, out group, out str,
cancellationToken);
}
///
/// Attempt to receive a string from , and decode using .
/// If no message is available within , return false.
///
/// The socket to receive from.
/// The maximum period of time to wait for a message to become available.
/// The encoding used to convert the data to a string.
/// The message group
/// The content of the received message as a string, or null if no message was available.
/// The token to monitor for cancellation requests. The default value is .
/// true if a message was available, otherwise false.
/// The method would return false if cancellation has had requested.
public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout,
Encoding encoding,
[NotNullWhen(returnValue: true)] out string? group,
[NotNullWhen(returnValue: true)] out string? str,
CancellationToken cancellationToken = default)
{
var msg = new Msg();
msg.InitEmpty();
if (socket.TryReceive(ref msg, timeout, cancellationToken))
{
group = msg.Group;
try
{
str = msg.Size > 0
? msg.GetString(encoding)
: string.Empty;
return true;
}
finally
{
msg.Close();
}
}
str = null;
group = null;
msg.Close();
return false;
}
#endregion
#region Async
///
/// Receive a string from asynchronously.
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// Tuple of group and a string
/// The token has had cancellation requested.
public static ValueTask<(string, string)> ReceiveStringAsync(this IGroupInSocket socket,
CancellationToken cancellationToken = default)
{
if (TryReceiveString(socket, out var group, out var msg))
return new ValueTask<(string, string)>((group, msg));
// TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
return new ValueTask<(string, string)>(Task.Factory.StartNew(() => socket.ReceiveString(cancellationToken),
cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
}
#endregion
#region AsyncEnumerable
#if NETSTANDARD2_1
///
/// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
///
/// The socket to receive from.
/// The token to monitor for cancellation requests. The default value is .
/// An IAsyncEnumerable that receive and returns messages from the socket.
/// The token has had cancellation requested.
public static async IAsyncEnumerable<(string, string)> ReceiveStringAsyncEnumerable(
this IGroupInSocket socket,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
yield return await socket.ReceiveStringAsync(cancellationToken);
}
}
#endif
#endregion
#endregion
}
}