GroupSocketExtensions.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics.CodeAnalysis;
  4. using System.Runtime.CompilerServices;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using NetMQ.Utils;
  9. namespace NetMQ
  10. {
  11. /// <summary>
  12. /// Send and Receive extensions for sockets with group capability (ServerSocket)
  13. /// </summary>
  14. public static class GroupSocketExtensions
  15. {
  16. #region Sending Byte Array
  17. #region Blocking
  18. /// <summary>
  19. /// Transmit a byte-array of data over this socket, block until message is sent.
  20. /// </summary>
  21. /// <param name="socket">the socket to transmit on</param>
  22. /// <param name="group">The group to send the message to.</param>
  23. /// <param name="data">the byte-array of data to send</param>
  24. public static void Send(this IGroupOutSocket socket, string group, byte[] data)
  25. {
  26. Send(socket, group, data, data.Length);
  27. }
  28. /// <summary>
  29. /// Transmit a byte-array of data over this socket, block until frame is sent.
  30. /// </summary>
  31. /// <param name="socket">the socket to transmit on</param>
  32. /// <param name="group">The group to send the message to.</param>
  33. /// <param name="data">the byte-array of data to send</param>
  34. /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  35. public static void Send(this IGroupOutSocket socket, string group, byte[] data, int length)
  36. {
  37. var msg = new Msg();
  38. msg.InitPool(length);
  39. msg.Group = group;
  40. data.Slice(0, length).CopyTo(msg);
  41. socket.Send(ref msg);
  42. msg.Close();
  43. }
  44. #endregion
  45. #region Timeout
  46. /// <summary>
  47. /// Attempt to transmit a byte-array of data on <paramref name="socket"/>.
  48. /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  49. /// </summary>
  50. /// <param name="socket">the socket to transmit on</param>
  51. /// <param name="timeout">The maximum period of time to try to send a message.</param>
  52. /// <param name="group">The group to send the message to.</param>
  53. /// <param name="data">the byte-array of data to send</param>
  54. /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  55. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  56. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data, int length)
  57. {
  58. var msg = new Msg();
  59. msg.InitPool(length);
  60. msg.Group = group;
  61. data.CopyTo(msg);
  62. if (!socket.TrySend(ref msg, timeout))
  63. {
  64. msg.Close();
  65. return false;
  66. }
  67. msg.Close();
  68. return true;
  69. }
  70. /// <summary>
  71. /// Attempt to transmit a byte-array of data on <paramref name="socket"/>.
  72. /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  73. /// </summary>
  74. /// <param name="socket">the socket to transmit on</param>
  75. /// <param name="timeout">The maximum period of time to try to send a message.</param>
  76. /// <param name="group">The group to send the message to.</param>
  77. /// <param name="data">the byte-array of data to send</param>
  78. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  79. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, byte[] data)
  80. {
  81. return TrySend(socket, timeout, group, data, data.Length);
  82. }
  83. #endregion
  84. #region Immediate
  85. /// <summary>
  86. /// Attempt to transmit a byte-array of data on <paramref name="socket"/>.
  87. /// If message cannot be sent immediately, return <c>false</c>.
  88. /// </summary>
  89. /// <param name="socket">the socket to transmit on</param>
  90. /// <param name="group">The group to send the message to.</param>
  91. /// <param name="data">the byte-array of data to send</param>
  92. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  93. public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data)
  94. {
  95. return TrySend(socket, TimeSpan.Zero, group, data);
  96. }
  97. /// <summary>
  98. /// Attempt to transmit a single frame on <paramref name="socket"/>.
  99. /// If message cannot be sent immediately, return <c>false</c>.
  100. /// </summary>
  101. /// <param name="socket">the socket to transmit on</param>
  102. /// <param name="group">The group to send the message to.</param>
  103. /// <param name="data">the byte-array of data to send</param>
  104. /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  105. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  106. public static bool TrySend(this IGroupOutSocket socket, string group, byte[] data, int length)
  107. {
  108. return TrySend(socket, TimeSpan.Zero, group, data, length);
  109. }
  110. #endregion
  111. #region Async
  112. /// <summary>
  113. /// Transmit a byte-array of data over this socket asynchronously.
  114. /// </summary>
  115. /// <param name="socket">the socket to transmit on</param>
  116. /// <param name="group">The group to send the message to.</param>
  117. /// <param name="data">the byte-array of data to send</param>
  118. public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data)
  119. {
  120. if (socket.TrySend(group, data))
  121. return new ValueTask();
  122. return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data),
  123. TaskCreationOptions.LongRunning));
  124. }
  125. /// <summary>
  126. /// Transmit a byte-array of data over this socket asynchronously.
  127. /// </summary>
  128. /// <param name="socket">the socket to transmit on</param>
  129. /// <param name="group">The group to send the message to.</param>
  130. /// <param name="data">the byte-array of data to send</param>
  131. /// <param name="length">the number of bytes to send from <paramref name="data"/>.</param>
  132. public static ValueTask SendAsync(this IGroupOutSocket socket, string group, byte[] data, int length)
  133. {
  134. if (socket.TrySend(group, data, length))
  135. return new ValueTask();
  136. return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, data, length),
  137. TaskCreationOptions.LongRunning));
  138. }
  139. #endregion
  140. #endregion
  141. #region Sending Strings
  142. #region Blocking
  143. /// <summary>
  144. /// Transmit a string over this socket, block until message is sent.
  145. /// </summary>
  146. /// <param name="socket">the socket to transmit on</param>
  147. /// <param name="group">The group to send the message to.</param>
  148. /// <param name="message">the string to send</param>
  149. public static void Send(this IGroupOutSocket socket, string group, string message)
  150. {
  151. var msg = new Msg();
  152. // Count the number of bytes required to encode the string.
  153. // Note that non-ASCII strings may not have an equal number of characters
  154. // and bytes. The encoding must be queried for this answer.
  155. // With this number, request a buffer from the pool.
  156. msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));
  157. msg.Group = group;
  158. // Encode the string into the buffer
  159. SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);
  160. socket.Send(ref msg);
  161. msg.Close();
  162. }
  163. #endregion
  164. #region Timeout
  165. /// <summary>
  166. /// Attempt to transmit a single string frame on <paramref name="socket"/>.
  167. /// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
  168. /// </summary>
  169. /// <param name="socket">the socket to transmit on</param>
  170. /// <param name="timeout">The maximum period of time to try to send a message.</param>
  171. /// <param name="group">The group to send the message to.</param>
  172. /// <param name="message">the string to send</param>
  173. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  174. public static bool TrySend(this IGroupOutSocket socket, TimeSpan timeout, string group, string message)
  175. {
  176. var msg = new Msg();
  177. // Count the number of bytes required to encode the string.
  178. // Note that non-ASCII strings may not have an equal number of characters
  179. // and bytes. The encoding must be queried for this answer.
  180. // With this number, request a buffer from the pool.
  181. msg.InitPool(SendReceiveConstants.DefaultEncoding.GetByteCount(message));
  182. msg.Group = group;
  183. // Encode the string into the buffer
  184. SendReceiveConstants.DefaultEncoding.GetBytes(message, msg);
  185. if (!socket.TrySend(ref msg, timeout))
  186. {
  187. msg.Close();
  188. return false;
  189. }
  190. msg.Close();
  191. return true;
  192. }
  193. #endregion
  194. #region Immediate
  195. /// <summary>
  196. /// Attempt to transmit a single string frame on <paramref name="socket"/>.
  197. /// If message cannot be sent immediately, return <c>false</c>.
  198. /// </summary>
  199. /// <param name="socket">the socket to transmit on</param>
  200. /// <param name="group">The group to send the message to.</param>
  201. /// <param name="message">the string to send</param>
  202. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  203. public static bool TrySend(this IGroupOutSocket socket, string group, string message)
  204. {
  205. return TrySend(socket, TimeSpan.Zero, group, message);
  206. }
  207. #endregion
  208. #region Async
  209. /// <summary>
  210. /// Transmit a string over this socket asynchronously.
  211. /// </summary>
  212. /// <param name="socket">the socket to transmit on</param>
  213. /// <param name="group">The group to send the message to.</param>
  214. /// <param name="message">the string to send</param>
  215. public static ValueTask SendAsync(this IGroupOutSocket socket, string group, string message)
  216. {
  217. if (socket.TrySend(group, message))
  218. return new ValueTask();
  219. return new ValueTask(Task.Factory.StartNew(() => Send(socket, group, message),
  220. TaskCreationOptions.LongRunning));
  221. }
  222. #endregion
  223. #endregion
  224. #region Receiving byte array
  225. #region Blocking
  226. /// <summary>
  227. /// Receive a bytes from <paramref name="socket"/>, blocking until one arrives.
  228. /// </summary>
  229. /// <param name="socket">The socket to receive from.</param>
  230. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  231. /// <returns>Tuple of group and received bytes</returns>
  232. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  233. public static (string, byte[]) ReceiveBytes(this IGroupInSocket socket,
  234. CancellationToken cancellationToken = default)
  235. {
  236. var msg = new Msg();
  237. msg.InitEmpty();
  238. try
  239. {
  240. socket.Receive(ref msg, cancellationToken);
  241. var data = msg.CloneData();
  242. var group = msg.Group;
  243. return (group, data);
  244. }
  245. finally
  246. {
  247. msg.Close();
  248. }
  249. }
  250. #endregion
  251. #region Immediate
  252. /// <summary>
  253. /// Attempt to receive a byte-array <paramref name="socket"/>.
  254. /// If no message is immediately available, return <c>false</c>.
  255. /// </summary>
  256. /// <param name="socket">The socket to receive from.</param>
  257. /// <param name="group">The message group</param>
  258. /// <param name="bytes">The content of the received message, or <c>null</c> if no message was available.</param>
  259. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  260. public static bool TryReceiveBytes(this IGroupInSocket socket,
  261. [NotNullWhen(returnValue: true)] out string? group, [NotNullWhen(returnValue: true)] out byte[]? bytes)
  262. {
  263. return socket.TryReceiveBytes(TimeSpan.Zero, out group, out bytes);
  264. }
  265. #endregion
  266. #region Timeout
  267. /// <summary>
  268. /// Attempt to receive a byte-array <paramref name="socket"/>.
  269. /// If no message is available within <paramref name="timeout"/>, return <c>false</c>.
  270. /// </summary>
  271. /// <param name="socket">The socket to receive from.</param>
  272. /// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
  273. /// <param name="group">The message group.</param>
  274. /// <param name="bytes">The content of the received message, or <c>null</c> if no message was available.</param>
  275. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  276. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  277. /// <remarks>The method would return false if cancellation has had requested.</remarks>
  278. public static bool TryReceiveBytes(this IGroupInSocket socket, TimeSpan timeout,
  279. [NotNullWhen(returnValue: true)] out string? group,
  280. [NotNullWhen(returnValue: true)] out byte[]? bytes, CancellationToken cancellationToken = default)
  281. {
  282. var msg = new Msg();
  283. msg.InitEmpty();
  284. if (!socket.TryReceive(ref msg, timeout, cancellationToken))
  285. {
  286. msg.Close();
  287. bytes = null;
  288. group = null;
  289. return false;
  290. }
  291. bytes = msg.CloneData();
  292. group = msg.Group;
  293. msg.Close();
  294. return true;
  295. }
  296. #endregion
  297. #region Async
  298. /// <summary>
  299. /// Receive a bytes from <paramref name="socket"/> asynchronously.
  300. /// </summary>
  301. /// <param name="socket">The socket to receive from.</param>
  302. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  303. /// <returns>Tuple of group and received bytes</returns>
  304. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  305. public static ValueTask<(string, byte[])> ReceiveBytesAsync(this IGroupInSocket socket,
  306. CancellationToken cancellationToken = default)
  307. {
  308. if (TryReceiveBytes(socket, out var group, out var bytes))
  309. return new ValueTask<(string, byte[])>((group, bytes));
  310. // TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
  311. return new ValueTask<(string, byte[])>(Task.Factory.StartNew(() => socket.ReceiveBytes(cancellationToken),
  312. cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
  313. }
  314. #endregion
  315. #region AsyncEnumerable
  316. #if NETSTANDARD2_1
  317. /// <summary>
  318. /// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
  319. /// </summary>
  320. /// <param name="socket">The socket to receive from.</param>
  321. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  322. /// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
  323. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  324. public static async IAsyncEnumerable<(string, byte[])> ReceiveBytesAsyncEnumerable(
  325. this IGroupInSocket socket,
  326. [EnumeratorCancellation] CancellationToken cancellationToken = default)
  327. {
  328. while (true)
  329. {
  330. yield return await socket.ReceiveBytesAsync(cancellationToken);
  331. }
  332. }
  333. #endif
  334. #endregion
  335. #endregion
  336. #region Receiving string
  337. #region Blocking
  338. /// <summary>
  339. /// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <see cref="SendReceiveConstants.DefaultEncoding"/>.
  340. /// </summary>
  341. /// <param name="socket">The socket to receive from.</param>
  342. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  343. /// <returns>Tuple of group and the content of the received message as a string.</returns>
  344. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  345. public static (string, string) ReceiveString(this IGroupInSocket socket,
  346. CancellationToken cancellationToken = default)
  347. {
  348. return socket.ReceiveString(SendReceiveConstants.DefaultEncoding, cancellationToken);
  349. }
  350. /// <summary>
  351. /// Receive a string from <paramref name="socket"/>, blocking until one arrives, and decode using <paramref name="encoding"/>.
  352. /// </summary>
  353. /// <param name="socket">The socket to receive from.</param>
  354. /// <param name="encoding">The encoding used to convert the data to a string.</param>
  355. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  356. /// <returns>Tuple of group and the content of the received message as a string.</returns>
  357. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  358. public static (string, string) ReceiveString(this IGroupInSocket socket, Encoding encoding,
  359. CancellationToken cancellationToken = default)
  360. {
  361. var msg = new Msg();
  362. msg.InitEmpty();
  363. try
  364. {
  365. socket.Receive(ref msg, cancellationToken);
  366. var group = msg.Group;
  367. var str = msg.Size > 0
  368. ? msg.GetString(encoding)
  369. : string.Empty;
  370. return (group, str);
  371. }
  372. finally
  373. {
  374. msg.Close();
  375. }
  376. }
  377. #endregion
  378. #region Immediate
  379. /// <summary>
  380. /// Attempt to receive a string from <paramref name="socket"/>, and decode using <see cref="SendReceiveConstants.DefaultEncoding"/>.
  381. /// If no message is immediately available, return <c>false</c>.
  382. /// </summary>
  383. /// <param name="socket">The socket to receive from.</param>
  384. /// <param name="group">The message group.</param>
  385. /// <param name="str">The content of the received message as a string, or <c>null</c> if no message was available.</param>
  386. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  387. public static bool TryReceiveString(this IGroupInSocket socket,
  388. [NotNullWhen(returnValue: true)] out string? group,
  389. [NotNullWhen(returnValue: true)] out string? str)
  390. {
  391. return socket.TryReceiveString(TimeSpan.Zero, SendReceiveConstants.DefaultEncoding, out group, out str);
  392. }
  393. /// <summary>
  394. /// Attempt to receive a string from <paramref name="socket"/>, and decode using <paramref name="encoding"/>.
  395. /// If no message is immediately available, return <c>false</c>.
  396. /// </summary>
  397. /// <param name="socket">The socket to receive from.</param>
  398. /// <param name="encoding">The encoding used to convert the data to a string.</param>
  399. /// <param name="group">The message group.</param>
  400. /// <param name="str">The content of the received message as a string, or <c>null</c> if no message was available.</param>
  401. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  402. public static bool TryReceiveString(this IGroupInSocket socket, Encoding encoding,
  403. [NotNullWhen(returnValue: true)] out string? group,
  404. [NotNullWhen(returnValue: true)] out string? str)
  405. {
  406. return socket.TryReceiveString(TimeSpan.Zero, encoding, out group, out str);
  407. }
  408. #endregion
  409. #region Timeout
  410. /// <summary>
  411. /// Attempt to receive a string from <paramref name="socket"/>, and decode using <see cref="SendReceiveConstants.DefaultEncoding"/>.
  412. /// If no message is available within <paramref name="timeout"/>, return <c>false</c>.
  413. /// </summary>
  414. /// <param name="socket">The socket to receive from.</param>
  415. /// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
  416. /// <param name="group">The message group</param>
  417. /// <param name="str">The content of the received message as a string, or <c>null</c> if no message was available.</param>
  418. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  419. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  420. /// <remarks>The method would return false if cancellation has had requested.</remarks>
  421. public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout,
  422. [NotNullWhen(returnValue: true)] out string? group,
  423. [NotNullWhen(returnValue: true)] out string? str,
  424. CancellationToken cancellationToken = default)
  425. {
  426. return socket.TryReceiveString(timeout, SendReceiveConstants.DefaultEncoding, out group, out str,
  427. cancellationToken);
  428. }
  429. /// <summary>
  430. /// Attempt to receive a string from <paramref name="socket"/>, and decode using <paramref name="encoding"/>.
  431. /// If no message is available within <paramref name="timeout"/>, return <c>false</c>.
  432. /// </summary>
  433. /// <param name="socket">The socket to receive from.</param>
  434. /// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
  435. /// <param name="encoding">The encoding used to convert the data to a string.</param>
  436. /// <param name="group">The message group</param>
  437. /// <param name="str">The content of the received message as a string, or <c>null</c> if no message was available.</param>
  438. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  439. /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  440. /// <remarks>The method would return false if cancellation has had requested.</remarks>
  441. public static bool TryReceiveString(this IGroupInSocket socket, TimeSpan timeout,
  442. Encoding encoding,
  443. [NotNullWhen(returnValue: true)] out string? group,
  444. [NotNullWhen(returnValue: true)] out string? str,
  445. CancellationToken cancellationToken = default)
  446. {
  447. var msg = new Msg();
  448. msg.InitEmpty();
  449. if (socket.TryReceive(ref msg, timeout, cancellationToken))
  450. {
  451. group = msg.Group;
  452. try
  453. {
  454. str = msg.Size > 0
  455. ? msg.GetString(encoding)
  456. : string.Empty;
  457. return true;
  458. }
  459. finally
  460. {
  461. msg.Close();
  462. }
  463. }
  464. str = null;
  465. group = null;
  466. msg.Close();
  467. return false;
  468. }
  469. #endregion
  470. #region Async
  471. /// <summary>
  472. /// Receive a string from <paramref name="socket"/> asynchronously.
  473. /// </summary>
  474. /// <param name="socket">The socket to receive from.</param>
  475. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  476. /// <returns>Tuple of group and a string</returns>
  477. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  478. public static ValueTask<(string, string)> ReceiveStringAsync(this IGroupInSocket socket,
  479. CancellationToken cancellationToken = default)
  480. {
  481. if (TryReceiveString(socket, out var group, out var msg))
  482. return new ValueTask<(string, string)>((group, msg));
  483. // TODO: this is a hack, eventually we need kind of IO ThreadPool for thread-safe socket to wait on asynchronously
  484. return new ValueTask<(string, string)>(Task.Factory.StartNew(() => socket.ReceiveString(cancellationToken),
  485. cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
  486. }
  487. #endregion
  488. #region AsyncEnumerable
  489. #if NETSTANDARD2_1
  490. /// <summary>
  491. /// Provides a consuming IAsyncEnumerable for receiving messages from the socket.
  492. /// </summary>
  493. /// <param name="socket">The socket to receive from.</param>
  494. /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
  495. /// <returns>An IAsyncEnumerable that receive and returns messages from the socket.</returns>
  496. /// <exception cref="System.OperationCanceledException">The token has had cancellation requested.</exception>
  497. public static async IAsyncEnumerable<(string, string)> ReceiveStringAsyncEnumerable(
  498. this IGroupInSocket socket,
  499. [EnumeratorCancellation] CancellationToken cancellationToken = default)
  500. {
  501. while (true)
  502. {
  503. yield return await socket.ReceiveStringAsync(cancellationToken);
  504. }
  505. }
  506. #endif
  507. #endregion
  508. #endregion
  509. }
  510. }