SendThreadSafeSocketExtensions.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. using System;
  2. using System.Threading.Tasks;
  3. using NetMQ.Utils;
  4. namespace NetMQ
  5. {
  6. /// <summary>
  7. /// Send extension methods for thread-safe sockets that support sending
  8. /// </summary>
  9. public static class SendThreadSafeSocketExtensions
  10. {
  11. #region Sending Byte Array
  12. #region Blocking
  /// <summary>
  /// Send the full contents of a byte-array over this socket, blocking until the message is sent.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array to send; every byte of it is transmitted</param>
  public static void Send(this IThreadSafeOutSocket socket, byte[] data)
      => Send(socket, data, data.Length); // forward to the length-aware overload with the whole array
  /// <summary>
  /// Send the leading <paramref name="length"/> bytes of a byte-array over this socket,
  /// blocking until the message is sent.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array holding the bytes to send</param>
  /// <param name="length">the number of bytes to send, counted from the start of <paramref name="data"/>.</param>
  public static void Send(this IThreadSafeOutSocket socket, byte[] data, int length)
  {
      // Ask the pool for a buffer of exactly `length` bytes.
      var frame = new Msg();
      frame.InitPool(length);

      // Copy only the requested prefix of the caller's array into the message.
      var payload = data.Slice(0, length);
      payload.CopyTo(frame);

      socket.Send(ref frame);
      frame.Close();
  }
  36. #endregion
  37. #region Timeout
  /// <summary>
  /// Attempt to transmit the leading <paramref name="length"/> bytes of a byte-array on <paramref name="socket"/>.
  /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="timeout">The maximum period of time to try to send a message.</param>
  /// <param name="data">the byte-array of data to send</param>
  /// <param name="length">the number of bytes to send, counted from the start of <paramref name="data"/>.</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, TimeSpan timeout, byte[] data, int length)
  {
      var msg = new Msg();
      msg.InitPool(length);

      // Copy exactly `length` bytes, as the blocking Send overload does. The message
      // buffer is initialised for `length` bytes, so copying the whole array would not
      // match the buffer whenever `length` differs from the array's size.
      data.Slice(0, length).CopyTo(msg);

      // The message is closed whether or not the send succeeded.
      bool sent = socket.TrySend(ref msg, timeout);
      msg.Close();
      return sent;
  }
  /// <summary>
  /// Attempt to transmit the full contents of a byte-array on <paramref name="socket"/>.
  /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="timeout">The maximum period of time to try to send a message.</param>
  /// <param name="data">the byte-array to send; every byte of it is transmitted</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, TimeSpan timeout, byte[] data)
      => TrySend(socket, timeout, data, data.Length); // whole array via the length-aware overload
  72. #endregion
  73. #region Immediate
  /// <summary>
  /// Attempt to transmit the full contents of a byte-array on <paramref name="socket"/> without waiting.
  /// If message cannot be sent immediately, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array to send; every byte of it is transmitted</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, byte[] data)
      => TrySend(socket, TimeSpan.Zero, data); // a zero timeout means "now or not at all"
  /// <summary>
  /// Attempt to transmit the leading <paramref name="length"/> bytes of a byte-array on
  /// <paramref name="socket"/> without waiting.
  /// If message cannot be sent immediately, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array of data to send</param>
  /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, byte[] data, int length)
      => TrySend(socket, TimeSpan.Zero, data, length); // a zero timeout means "now or not at all"
  97. #endregion
  98. #region Async
  /// <summary>
  /// Transmit a byte-array of data over this socket asynchronously.
  /// </summary>
  /// <remarks>
  /// An immediate send is attempted first; if it succeeds an already-completed
  /// <see cref="ValueTask"/> is returned. Otherwise the blocking send runs as a long-running
  /// task on the default scheduler. <paramref name="data"/> is not copied until that task
  /// runs, so it should not be modified before the returned task completes.
  /// </remarks>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array of data to send</param>
  /// <returns>a task that completes once the message has been sent</returns>
  public static ValueTask SendAsync(this IThreadSafeOutSocket socket, byte[] data)
  {
      // Fast path: the message went out without blocking.
      if (socket.TrySend(data))
      {
          return new ValueTask();
      }

      // Pin the fallback to the default scheduler. Without an explicit scheduler,
      // StartNew uses TaskScheduler.Current, which would queue the blocking send onto
      // whatever custom scheduler the caller happens to be running under.
      var blockingSend = Task.Factory.StartNew(
          () => Send(socket, data),
          System.Threading.CancellationToken.None,
          TaskCreationOptions.LongRunning,
          TaskScheduler.Default);

      return new ValueTask(blockingSend);
  }
  /// <summary>
  /// Transmit the leading <paramref name="length"/> bytes of a byte-array over this socket asynchronously.
  /// </summary>
  /// <remarks>
  /// An immediate send is attempted first; if it succeeds an already-completed
  /// <see cref="ValueTask"/> is returned. Otherwise the blocking send runs as a long-running
  /// task on the default scheduler. <paramref name="data"/> is not copied until that task
  /// runs, so it should not be modified before the returned task completes.
  /// </remarks>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="data">the byte-array of data to send</param>
  /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  /// <returns>a task that completes once the message has been sent</returns>
  public static ValueTask SendAsync(this IThreadSafeOutSocket socket, byte[] data, int length)
  {
      // Fast path: the message went out without blocking.
      if (socket.TrySend(data, length))
      {
          return new ValueTask();
      }

      // Pin the fallback to the default scheduler. Without an explicit scheduler,
      // StartNew uses TaskScheduler.Current, which would queue the blocking send onto
      // whatever custom scheduler the caller happens to be running under.
      var blockingSend = Task.Factory.StartNew(
          () => Send(socket, data, length),
          System.Threading.CancellationToken.None,
          TaskCreationOptions.LongRunning,
          TaskScheduler.Default);

      return new ValueTask(blockingSend);
  }
  122. #endregion
  123. #endregion
  124. #region Sending Strings
  125. #region Blocking
  /// <summary>
  /// Encode a string with the default encoding and send it over this socket,
  /// blocking until the message is sent.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="message">the string to send</param>
  public static void Send(this IThreadSafeOutSocket socket, string message)
  {
      var frame = new Msg();
      var encoding = SendReceiveConstants.DefaultEncoding;

      // The buffer size must come from the encoding, not from message.Length:
      // a non-ASCII string can take more bytes than it has characters.
      frame.InitPool(encoding.GetByteCount(message));

      // Write the encoded bytes straight into the pooled buffer.
      encoding.GetBytes(message, frame);

      socket.Send(ref frame);
      frame.Close();
  }
  144. #endregion
  145. #region Timeout
  /// <summary>
  /// Attempt to transmit a single string, encoded with the default encoding, on <paramref name="socket"/>.
  /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="timeout">The maximum period of time to try to send a message.</param>
  /// <param name="message">the string to send</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, TimeSpan timeout, string message)
  {
      var frame = new Msg();
      var encoding = SendReceiveConstants.DefaultEncoding;

      // The buffer size must come from the encoding, not from message.Length:
      // a non-ASCII string can take more bytes than it has characters.
      frame.InitPool(encoding.GetByteCount(message));

      // Write the encoded bytes straight into the pooled buffer.
      encoding.GetBytes(message, frame);

      // The message is closed whether or not the send succeeded.
      bool sent = socket.TrySend(ref frame, timeout);
      frame.Close();
      return sent;
  }
  172. #endregion
  173. #region Immediate
  /// <summary>
  /// Attempt to transmit a single string on <paramref name="socket"/> without waiting.
  /// If message cannot be sent immediately, return <c>false</c>.
  /// </summary>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="message">the string to send</param>
  /// <returns><c>true</c> if the message was sent, otherwise <c>false</c>.</returns>
  public static bool TrySend(this IThreadSafeOutSocket socket, string message)
      => TrySend(socket, TimeSpan.Zero, message); // a zero timeout means "now or not at all"
  185. #endregion
  186. #region Async
  /// <summary>
  /// Transmit a string over this socket asynchronously.
  /// </summary>
  /// <remarks>
  /// An immediate send is attempted first; if it succeeds an already-completed
  /// <see cref="ValueTask"/> is returned. Otherwise the blocking send runs as a long-running
  /// task on the default scheduler.
  /// </remarks>
  /// <param name="socket">the socket to transmit on</param>
  /// <param name="message">the string to send</param>
  /// <returns>a task that completes once the message has been sent</returns>
  public static ValueTask SendAsync(this IThreadSafeOutSocket socket, string message)
  {
      // Fast path: the message went out without blocking.
      if (socket.TrySend(message))
      {
          return new ValueTask();
      }

      // Pin the fallback to the default scheduler. Without an explicit scheduler,
      // StartNew uses TaskScheduler.Current, which would queue the blocking send onto
      // whatever custom scheduler the caller happens to be running under.
      var blockingSend = Task.Factory.StartNew(
          () => Send(socket, message),
          System.Threading.CancellationToken.None,
          TaskCreationOptions.LongRunning,
          TaskScheduler.Default);

      return new ValueTask(blockingSend);
  }
  198. #endregion
  199. #endregion
  200. }
  201. }