AsyncReceiveExtensions.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. #if NETSTANDARD2_0 || NETSTANDARD2_1 || NET47
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace NetMQ
  8. {
  9. /// <summary>
  10. /// Provides extension methods for the <see cref="NetMQSocket"/>,
  11. /// via which messages may be received asynchronously.
  12. /// </summary>
  13. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "MemberCanBePrivate.Global")]
  14. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "UnusedMember.Global")]
  15. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "UnusedMethodReturnValue.Global")]
  16. public static class AsyncReceiveExtensions
  17. {
  18. static Task<bool> s_trueTask = Task.FromResult(true);
  19. static Task<bool> s_falseTask = Task.FromResult(false);
  20. #region Receiving frames as a multipart message
  21. /// <summary>
  22. /// Receive a single frame from <paramref name="socket"/>, asynchronously.
  23. /// </summary>
  24. /// <param name="socket">The socket to receive from.</param>
  25. /// <param name="expectedFrameCount">Specifies the initial capacity of the <see cref="List{T}"/> used
  26. /// to buffer results. If the number of frames is known, set it here. If more frames arrive than expected,
  27. /// an extra allocation will occur, but the result will still be correct.</param>
  28. /// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
  29. /// <returns>The content of the received message.</returns>
  30. public static async Task<NetMQMessage> ReceiveMultipartMessageAsync(
  31. this NetMQSocket socket,
  32. int expectedFrameCount = 4,
  33. CancellationToken cancellationToken = default(CancellationToken))
  34. {
  35. var message = new NetMQMessage(expectedFrameCount);
  36. while (true)
  37. {
  38. (byte[] bytes, bool more) = await socket.ReceiveFrameBytesAsync(cancellationToken);
  39. message.Append(bytes);
  40. if (!more)
  41. {
  42. break;
  43. }
  44. }
  45. return message;
  46. }
  47. #endregion
  48. #region Receiving a frame as a byte array
  49. /// <summary>
  50. /// Receive a single frame from <paramref name="socket"/>, asynchronously.
  51. /// </summary>
  52. /// <param name="socket">The socket to receive from.</param>
  53. /// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
  54. /// <returns>The content of the received message frame and boolean indicate if another frame of the same message follows.</returns>
  55. public static Task<(byte[], bool)> ReceiveFrameBytesAsync(
  56. this NetMQSocket socket,
  57. CancellationToken cancellationToken = default(CancellationToken)
  58. )
  59. {
  60. if (NetMQRuntime.Current == null)
  61. throw new InvalidOperationException("NetMQRuntime must be created before calling async functions");
  62. socket.AttachToRuntime();
  63. var msg = new Msg();
  64. msg.InitEmpty();
  65. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  66. {
  67. var data = msg.CloneData();
  68. bool more = msg.HasMore;
  69. msg.Close();
  70. return Task.FromResult((data, more));
  71. }
  72. TaskCompletionSource<(byte[], bool)> source = new TaskCompletionSource<(byte[], bool)>();
  73. CancellationTokenRegistration? registration = null;
  74. if (cancellationToken.CanBeCanceled)
  75. {
  76. registration = cancellationToken.Register(PropagateCancel);
  77. }
  78. void Listener(object sender, NetMQSocketEventArgs args)
  79. {
  80. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  81. {
  82. var data = msg.CloneData();
  83. bool more = msg.HasMore;
  84. msg.Close();
  85. socket.ReceiveReady -= Listener;
  86. registration?.Dispose();
  87. source.TrySetResult((data, more));
  88. }
  89. }
  90. void PropagateCancel()
  91. {
  92. socket.ReceiveReady -= Listener;
  93. registration?.Dispose();
  94. source.TrySetCanceled();
  95. }
  96. socket.ReceiveReady += Listener;
  97. return source.Task;
  98. }
  99. #endregion
  100. #region Receiving a frame as a string
  101. /// <summary>
  102. /// Receive a single frame from <paramref name="socket"/>, asynchronously, and decode as a string using <see cref="SendReceiveConstants.DefaultEncoding"/>.
  103. /// </summary>
  104. /// <param name="socket">The socket to receive from.</param>
  105. /// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
  106. /// <returns>The content of the received message frame as a string and a boolean indicate if another frame of the same message follows.</returns>
  107. public static Task<(string, bool)> ReceiveFrameStringAsync(
  108. this NetMQSocket socket,
  109. CancellationToken cancellationToken = default(CancellationToken)
  110. )
  111. {
  112. return socket.ReceiveFrameStringAsync(SendReceiveConstants.DefaultEncoding, cancellationToken);
  113. }
  114. /// <summary>
  115. /// Receive a single frame from <paramref name="socket"/>, asynchronously, and decode as a string using <paramref name="encoding"/>.
  116. /// </summary>
  117. /// <param name="socket">The socket to receive from.</param>
  118. /// <param name="encoding">The encoding used to convert the frame's data to a string.</param>
  119. /// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
  120. /// <returns>The content of the received message frame as a string and boolean indicate if another frame of the same message follows..</returns>
  121. public static Task<(string, bool)> ReceiveFrameStringAsync(
  122. this NetMQSocket socket,
  123. Encoding encoding,
  124. CancellationToken cancellationToken = default(CancellationToken))
  125. {
  126. if (NetMQRuntime.Current == null)
  127. throw new InvalidOperationException("NetMQRuntime must be created before calling async functions");
  128. socket.AttachToRuntime();
  129. var msg = new Msg();
  130. msg.InitEmpty();
  131. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  132. {
  133. var str = msg.Size > 0
  134. ? msg.GetString(encoding)
  135. : string.Empty;
  136. msg.Close();
  137. return Task.FromResult((str, msg.HasMore));
  138. }
  139. TaskCompletionSource<(string, bool)> source = new TaskCompletionSource<(string,bool)>();
  140. CancellationTokenRegistration? registration = null;
  141. if (cancellationToken.CanBeCanceled)
  142. {
  143. registration = cancellationToken.Register(PropagateCancel);
  144. }
  145. void Listener(object sender, NetMQSocketEventArgs args)
  146. {
  147. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  148. {
  149. var str = msg.Size > 0
  150. ? msg.GetString(encoding)
  151. : string.Empty;
  152. bool more = msg.HasMore;
  153. msg.Close();
  154. socket.ReceiveReady -= Listener;
  155. registration?.Dispose();
  156. source.TrySetResult((str, more));
  157. }
  158. }
  159. void PropagateCancel()
  160. {
  161. socket.ReceiveReady -= Listener;
  162. registration?.Dispose();
  163. source.TrySetCanceled();
  164. }
  165. socket.ReceiveReady += Listener;
  166. return source.Task;
  167. }
  168. #endregion
  169. #region Skipping a message
  170. /// <summary>
  171. /// Receive a single frame from <paramref name="socket"/>, asynchronously, then ignore its content.
  172. /// </summary>
  173. /// <param name="socket">The socket to receive from.</param>
  174. /// <param name="cancellationToken">The token used to propagate notification that this operation should be canceled.</param>
  175. /// <returns>Boolean indicate if another frame of the same message follows</returns>
  176. public static Task<bool> SkipFrameAsync(
  177. this NetMQSocket socket,
  178. CancellationToken cancellationToken = default(CancellationToken)
  179. )
  180. {
  181. if (NetMQRuntime.Current == null)
  182. throw new InvalidOperationException("NetMQRuntime must be created before calling async functions");
  183. socket.AttachToRuntime();
  184. var msg = new Msg();
  185. msg.InitEmpty();
  186. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  187. {
  188. bool more = msg.HasMore;
  189. msg.Close();
  190. return more ? s_trueTask : s_falseTask;
  191. }
  192. TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
  193. CancellationTokenRegistration? registration = null;
  194. if (cancellationToken.CanBeCanceled)
  195. {
  196. registration = cancellationToken.Register(PropagateCancel);
  197. }
  198. void Listener(object sender, NetMQSocketEventArgs args)
  199. {
  200. if (socket.TryReceive(ref msg, TimeSpan.Zero))
  201. {
  202. bool more = msg.HasMore;
  203. msg.Close();
  204. socket.ReceiveReady -= Listener;
  205. registration?.Dispose();
  206. source.TrySetResult(more);
  207. }
  208. }
  209. void PropagateCancel()
  210. {
  211. socket.ReceiveReady -= Listener;
  212. registration?.Dispose();
  213. source.TrySetCanceled();
  214. }
  215. socket.ReceiveReady += Listener;
  216. return source.Task;
  217. }
  218. #endregion
  219. #region Skipping all frames of a multipart message
  220. /// <summary>
  221. /// Receive all frames of the next message from <paramref name="socket"/>, asynchronously, then ignore their contents.
  222. /// </summary>
  223. /// <param name="socket">The socket to receive from.</param>
  224. public static async Task SkipMultipartMessageAsync(this NetMQSocket socket)
  225. {
  226. while (true)
  227. {
  228. bool more = await socket.SkipFrameAsync();
  229. if (!more)
  230. break;
  231. }
  232. }
  233. #endregion
  234. #region Receiving a routing key
  235. /// <summary>
  236. /// Receive a routing-key from <paramref name="socket"/>, blocking until one arrives.
  237. /// </summary>
  238. /// <param name="socket">The socket to receive from.</param>
  239. /// <returns>The routing key and a boolean indicate if another frame of the same message follows.</returns>
  240. public static async Task<(RoutingKey, bool)> ReceiveRoutingKeyAsync(this NetMQSocket socket)
  241. {
  242. var (bytes, more) = await socket.ReceiveFrameBytesAsync();
  243. return (new RoutingKey(bytes), more);
  244. }
  245. #endregion
  246. }
  247. }
  248. #endif