using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; using NetMQ.Utils; namespace NetMQ { /// /// Send and Receive extensions for sockets with group capability (ServerSocket) /// public static class GroupSocketExtensions { #region Sending Byte Array #region Blocking /// /// Transmit a byte-array of data over this socket, block until message is sent. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send public static void Send(this IGroupOutSocket socket, string group, byte[] data) { Send(socket, group, data, data.Length); } /// /// Transmit a byte-array of data over this socket, block until frame is sent. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send /// the number of bytes to send from . public static void Send(this IGroupOutSocket socket, string group, byte[] data, int length) { var msg = new Msg(); msg.InitPool(length); msg.Group = group; data.Slice(0, length).CopyTo(msg); socket.Send(ref msg); msg.Close(); } #endregion #region Timeout /// /// Attempt to transmit a byte-array of data on . /// If message cannot be sent within , return false. /// /// the socket to transmit on /// The maximum period of time to try to send a message. /// The group to send the message to. /// the byte-array of data to send /// the number of bytes to send from . /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data, int length) { var msg = new Msg(); msg.InitPool(length); msg.Group = group; data.CopyTo(msg); if (!socket.TrySend(ref msg, timeout)) { msg.Close(); return false; } msg.Close(); return true; } /// /// Attempt to transmit a byte-array of data on . /// If message cannot be sent within , return false. /// /// the socket to transmit on /// The maximum period of time to try to send a message. /// The group to send the message to. /// the byte-array of data to send /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data) { return TrySend(socket, timeout, group, data, data.Length); } #endregion #region Immediate /// /// Attempt to transmit a byte-array of data on . /// If message cannot be sent immediately, return false. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data) { return TrySend(socket, TimeSpan.Zero, group, data); } /// /// Attempt to transmit a single frame on . /// If message cannot be sent immediately, return false. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send /// the number of bytes to send from . /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data, int length) { return TrySend(socket, TimeSpan.Zero, group, data, length); } #endregion #region Async /// /// Transmit a byte-array of data over this socket asynchronously. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data) { if (socket.TrySend(group, data)) return new ValueTask(); return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data), TaskCreationOptions.LongRunning)); } /// /// Transmit a byte-array of data over this socket asynchronously. /// /// the socket to transmit on /// The group to send the message to. /// the byte-array of data to send /// the number of bytes to send from . public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data, int length) { if (socket.TrySend(group, data, length)) return new ValueTask(); return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data, length), TaskCreationOptions.LongRunning)); } #endregion #endregion #region Sending Strings #region Blocking /// /// Transmit a string over this socket, block until message is sent. /// /// the socket to transmit on /// The group to send the message to. /// the string to send public static void Send(this IGroupOutSocket socket, string group, string message) { var msg = new Msg(); // Count the number of bytes required to encode the string. // Note that non-ASCII strings may not have an equal number of characters // and bytes. The encoding must be queried for this answer. // With this number, request a buffer from the pool. msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message)); msg.Group = group; // Encode the string into the buffer SendReceiveConstants.DefaultEncoding.GetBytes(message, msg); socket.Send(ref msg); msg.Close(); } #endregion #region Timeout /// /// Attempt to transmit a single string frame on . /// If message cannot be sent within , return false. /// /// the socket to transmit on /// The maximum period of time to try to send a message. /// The group to send the message to. /// the string to send /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, string message) { var msg = new Msg(); // Count the number of bytes required to encode the string. // Note that non-ASCII strings may not have an equal number of characters // and bytes. The encoding must be queried for this answer. // With this number, request a buffer from the pool. msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message)); msg.Group = group; // Encode the string into the buffer SendReceiveConstants.DefaultEncoding.GetBytes(message, msg); if (!socket.TrySend(ref msg, timeout)) { msg.Close(); return false; } msg.Close(); return true; } #endregion #region Immediate /// /// Attempt to transmit a single string frame on . /// If message cannot be sent immediately, return false. /// /// the socket to transmit on /// The group to send the message to. /// the string to send /// true if a message was available, otherwise false. public static bool TrySend(this IGroupOutSocket socket, string group, string message) { return TrySend(socket, TimeSpan.Zero, group, message); } #endregion #region Async /// /// Transmit a string over this socket asynchronously. /// /// the socket to transmit on /// The group to send the message to. /// the string to send public static ValueTask SendAsync(this IGroupOutSocket socket, string group, string message) { if (socket.TrySend(group, message)) return new ValueTask(); return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, message), TaskCreationOptions.LongRunning)); } #endregion #endregion #region Receiving byte array #region Blocking /// /// Receive a bytes from , blocking until one arrives. /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// Tuple of group and received bytes /// The token has had cancellation requested. public static (string, byte[]) ReceiveBytes(this IGroupInSocket socket, CancellationToken cancellationToken = default) { var msg = new Msg(); msg.InitEmpty(); try { socket.Receive(ref msg, cancellationToken); var data = msg.CloneData(); var group = msg.Group; return (group, data); } finally { msg.Close(); } } #endregion #region Immediate /// /// Attempt to receive a byte-array . /// If no message is immediately available, return false. /// /// The socket to receive from. /// The message group /// The content of the received message, or null if no message was available. /// true if a message was available, otherwise false. public static bool TryReceiveBytes(this IGroupInSocket socket, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out byte[]? bytes) { return socket.TryReceiveBytes(TimeSpan.Zero, out group, out bytes); } #endregion #region Timeout /// /// Attempt to receive a byte-array . /// If no message is available within , return false. /// /// The socket to receive from. /// The maximum period of time to wait for a message to become available. /// The message group. /// The content of the received message, or null if no message was available. /// The token to monitor for cancellation requests. The default value is . /// true if a message was available, otherwise false. /// The method would return false if cancellation has had requested. public static bool TryReceiveBytes(this IGroupInSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out byte[]? bytes, CancellationToken cancellationToken = default) { var msg = new Msg(); msg.InitEmpty(); if (!socket.TryReceive(ref msg, timeout, cancellationToken)) { msg.Close(); bytes = null; group = null; return false; } bytes = msg.CloneData(); group = msg.Group; msg.Close(); return true; } #endregion #region Async /// /// Receive a bytes from asynchronously. /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// Tuple of group and received bytes /// The token has had cancellation requested. public static ValueTask<(string, byte[])> ReceiveBytesAsync(this IGroupInSocket socket, CancellationToken cancellationToken = default) { if (TryReceiveBytes(socket, out var group, out var bytes)) return new ValueTask<(string, byte[])>((group, bytes)); // TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously return new ValueTask<(string, byte[])>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)); } #endregion #region AsyncEnumerable #if NETSTANDARD2_1 /// /// Provides a consuming IAsyncEnumerable for receiving messages from the socket. /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// An IAsyncEnumerable that receive and returns messages from the socket. /// The token has had cancellation requested. public static async IAsyncEnumerable<(string, byte[])> ReceiveBytesAsyncEnumerable( this IGroupInSocket socket, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (true) { yield return await socket.ReceiveBytesAsync(cancellationToken); } } #endif #endregion #endregion #region Receiving string #region Blocking /// /// Receive a string from , blocking until one arrives, and decode using . /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// Tuple of group and the content of the received message as a string. /// The token has had cancellation requested. public static (string, string) ReceiveString(this IGroupInSocket socket, CancellationToken cancellationToken = default) { return socket.ReceiveString(SendReceiveConstants.DefaultEncoding, cancellationToken); } /// /// Receive a string from , blocking until one arrives, and decode using . /// /// The socket to receive from. /// The encoding used to convert the data to a string. /// The token to monitor for cancellation requests. The default value is . /// Tuple of group and the content of the received message as a string. /// The token has had cancellation requested. public static (string, string) ReceiveString(this IGroupInSocket socket, Encoding encoding, CancellationToken cancellationToken = default) { var msg = new Msg(); msg.InitEmpty(); try { socket.Receive(ref msg, cancellationToken); var group = msg.Group; var str = msg.Size > 0 ? msg.GetString(encoding) : string.Empty; return (group, str); } finally { msg.Close(); } } #endregion #region Immediate /// /// Attempt to receive a string from , and decode using . /// If no message is immediately available, return false. /// /// The socket to receive from. /// The message group. /// The content of the received message as a string, or null if no message was available. /// true if a message was available, otherwise false. public static bool TryReceiveString(this IGroupInSocket socket, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out string? str) { return socket.TryReceiveString(TimeSpan.Zero, SendReceiveConstants.DefaultEncoding, out group, out str); } /// /// Attempt to receive a string from , and decode using . /// If no message is immediately available, return false. /// /// The socket to receive from. /// The encoding used to convert the data to a string. /// The message group. /// The content of the received message as a string, or null if no message was available. /// true if a message was available, otherwise false. public static bool TryReceiveString(this IGroupInSocket socket, Encoding encoding, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out string? str) { return socket.TryReceiveString(TimeSpan.Zero, encoding, out group, out str); } #endregion #region Timeout /// /// Attempt to receive a string from , and decode using . /// If no message is available within , return false. /// /// The socket to receive from. /// The maximum period of time to wait for a message to become available. /// The message group /// The content of the received message as a string, or null if no message was available. /// The token to monitor for cancellation requests. The default value is . /// true if a message was available, otherwise false. /// The method would return false if cancellation has had requested. public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out string? str, CancellationToken cancellationToken = default) { return socket.TryReceiveString(timeout, SendReceiveConstants.DefaultEncoding, out group, out str, cancellationToken); } /// /// Attempt to receive a string from , and decode using . /// If no message is available within , return false. /// /// The socket to receive from. /// The maximum period of time to wait for a message to become available. /// The encoding used to convert the data to a string. /// The message group /// The content of the received message as a string, or null if no message was available. /// The token to monitor for cancellation requests. The default value is . /// true if a message was available, otherwise false. /// The method would return false if cancellation has had requested. public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout, Encoding encoding, [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out string? str, CancellationToken cancellationToken = default) { var msg = new Msg(); msg.InitEmpty(); if (socket.TryReceive(ref msg, timeout, cancellationToken)) { group = msg.Group; try { str = msg.Size > 0 ? msg.GetString(encoding) : string.Empty; return true; } finally { msg.Close(); } } str = null; group = null; msg.Close(); return false; } #endregion #region Async /// /// Receive a string from asynchronously. /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// Tuple of group and a string /// The token has had cancellation requested. public static ValueTask<(string, string)> ReceiveStringAsync(this IGroupInSocket socket, CancellationToken cancellationToken = default) { if (TryReceiveString(socket, out var group, out var msg)) return new ValueTask<(string, string)>((group, msg)); // TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously return new ValueTask<(string, string)>(Task.Factory.StartNew(() => socket.ReceiveString(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)); } #endregion #region AsyncEnumerable #if NETSTANDARD2_1 /// /// Provides a consuming IAsyncEnumerable for receiving messages from the socket. /// /// The socket to receive from. /// The token to monitor for cancellation requests. The default value is . /// An IAsyncEnumerable that receive and returns messages from the socket. /// The token has had cancellation requested. public static async IAsyncEnumerable<(string, string)> ReceiveStringAsyncEnumerable( this IGroupInSocket socket, [EnumeratorCancellation] CancellationToken cancellationToken = default) { while (true) { yield return await socket.ReceiveStringAsync(cancellationToken); } } #endif #endregion #endregion } }