ReceivingSocketExtensions.cs 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Diagnostics.CodeAnalysis;
  5. using System.Text;
  6. namespace NetMQ
  7. {
  8. /// <summary>
  9. /// Provides extension methods for the <see cref="IReceivingSocket"/> interface,
  10. /// via which messages may be received in several ways.
  11. /// </summary>
  12. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "MemberCanBePrivate.Global")]
  13. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "UnusedMember.Global")]
  14. [System.Diagnostics.CodeAnalysis.SuppressMessage("ReSharper", "UnusedMethodReturnValue.Global")]
  15. public static class ReceivingSocketExtensions
  16. {
  /// <summary>
  /// Receives the next message frame into <paramref name="msg"/>, waiting indefinitely for one to arrive.
  /// </summary>
  /// <remarks>
  /// The wait uses <see cref="SendReceiveConstants.InfiniteTimeout"/> and cannot be interrupted. This is convenient
  /// and safe when a message is known to be available, for example inside a
  /// <see cref="NetMQSocket.ReceiveReady"/> callback.
  /// </remarks>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="msg">The <see cref="Msg"/> that receives the frame's data.</param>
  public static void Receive(this IReceivingSocket socket, ref Msg msg)
  {
      bool received = socket.TryReceive(ref msg, SendReceiveConstants.InfiniteTimeout);

      // With an infinite timeout the attempt is expected to succeed; this is verified in debug builds only.
      Debug.Assert(received);
  }
  31. #region Receiving a frame as a byte array
  32. #region Blocking
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns its content.
  /// The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns>A byte array holding the content of the received frame.</returns>
  public static byte[] ReceiveFrameBytes(this IReceivingSocket socket)
      => socket.ReceiveFrameBytes(out _);
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns its content,
  /// reporting through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns>A byte array holding a copy of the received frame's data.</returns>
  public static byte[] ReceiveFrameBytes(this IReceivingSocket socket, out bool more)
  {
      var frame = new Msg();
      frame.InitEmpty();

      socket.Receive(ref frame);

      // Copy the payload and read the flag before the Msg is closed.
      byte[] payload = frame.CloneData();
      more = frame.HasMore;

      frame.Close();
      return payload;
  }
  59. #endregion
  60. #region Immediate
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting.
  /// The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="bytes">The received frame's content, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameBytes(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out byte[]? bytes)
      => socket.TryReceiveFrameBytes(out bytes, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting (a zero timeout),
  /// reporting through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="bytes">The received frame's content, or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameBytes(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out byte[]? bytes, out bool more)
      => socket.TryReceiveFrameBytes(TimeSpan.Zero, out bytes, out more);
  85. #endregion
  86. #region Timeout
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>.
  /// The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="bytes">The received frame's content, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameBytes(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out byte[]? bytes)
      => socket.TryReceiveFrameBytes(timeout, out bytes, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>,
  /// and reports through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="bytes">A copy of the received frame's data, or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows; <c>false</c> otherwise, including when nothing was received.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameBytes(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out byte[]? bytes, out bool more)
  {
      var frame = new Msg();
      frame.InitEmpty();

      if (socket.TryReceive(ref frame, timeout))
      {
          // Copy the payload and read the flag before the Msg is closed.
          bytes = frame.CloneData();
          more = frame.HasMore;
          frame.Close();
          return true;
      }

      // Nothing arrived in time: close the Msg and report defaults.
      frame.Close();
      bytes = null;
      more = false;
      return false;
  }
  125. #endregion
  126. #endregion
  127. #region Receiving a multipart message as byte arrays
  128. #region Blocking
  /// <summary>
  /// Blocks until a message arrives on <paramref name="socket"/> and returns every frame of it.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="expectedFrameCount">Initial <see cref="List{T}.Capacity"/> of the returned list.</param>
  /// <returns>A newly created list holding the frames of the multipart message; it contains at least one item.</returns>
  public static List<byte[]> ReceiveMultipartBytes(this IReceivingSocket socket, int expectedFrameCount = 4)
  {
      var result = new List<byte[]>(expectedFrameCount);

      // The list is non-null, so the callee clears and fills this same instance.
      socket.ReceiveMultipartBytes(ref result);
      return result;
  }
  /// <summary>
  /// Blocks until a message arrives on <paramref name="socket"/> and stores every frame of it in <paramref name="frames"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="frames">The list that receives the frames. A <c>null</c> reference is replaced by a new list;
  /// an existing list is cleared before being filled.</param>
  /// <param name="expectedFrameCount">Initial <see cref="List{T}.Capacity"/> used when a new list has to be created.</param>
  public static void ReceiveMultipartBytes(this IReceivingSocket socket, [AllowNull] ref List<byte[]> frames, int expectedFrameCount = 4)
  {
      if (frames is null)
      {
          frames = new List<byte[]>(expectedFrameCount);
      }
      else
      {
          frames.Clear();
      }

      var frame = new Msg();
      frame.InitEmpty();

      // Always read at least one frame, then continue for as long as the message has more.
      while (true)
      {
          socket.Receive(ref frame);
          frames.Add(frame.CloneData());

          if (!frame.HasMore)
          {
              break;
          }
      }

      frame.Close();
  }
  163. #endregion
  164. #region Immediate
  /// <summary>
  /// Tries to receive every frame of the next message from <paramref name="socket"/> without waiting
  /// (a zero timeout) for the first frame.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="frames">The list that receives the frames. A <c>null</c> reference is replaced by a new list;
  /// an existing list is cleared before being filled.</param>
  /// <param name="expectedFrameCount">Initial <see cref="List{T}.Capacity"/> used when a new list has to be created.</param>
  /// <returns><c>true</c> when a message was available, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartBytes(this IReceivingSocket socket, ref List<byte[]> frames, int expectedFrameCount = 4)
      => socket.TryReceiveMultipartBytes(TimeSpan.Zero, ref frames, expectedFrameCount);
  176. #endregion
  177. #region Timeout
  /// <summary>
  /// Tries to receive every frame of the next message from <paramref name="socket"/>, waiting at most
  /// <paramref name="timeout"/> for the first frame.
  /// </summary>
  /// <remarks>
  /// Only the first frame is subject to <paramref name="timeout"/>; each remaining frame is read with the
  /// blocking <c>Receive</c> call.
  /// </remarks>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for the first frame to become available.</param>
  /// <param name="frames">The list that receives the frames. When a message is received, a <c>null</c> reference is
  /// replaced by a new list and an existing list is cleared and filled. It is left untouched when no message arrives.</param>
  /// <param name="expectedFrameCount">Initial <see cref="List{T}.Capacity"/> used when a new list has to be created.</param>
  /// <returns><c>true</c> when a message was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartBytes(this IReceivingSocket socket, TimeSpan timeout, ref List<byte[]> frames, int expectedFrameCount = 4)
  {
      var frame = new Msg();
      frame.InitEmpty();

      // The first frame decides whether there is a message at all.
      bool gotFirstFrame = socket.TryReceive(ref frame, timeout);
      if (!gotFirstFrame)
      {
          frame.Close();
          return false;
      }

      // A message is available, so the output container can now be prepared.
      if (frames is null)
      {
          frames = new List<byte[]>(expectedFrameCount);
      }
      else
      {
          frames.Clear();
      }

      // Store the current frame, then fetch the next one while the message continues.
      while (true)
      {
          frames.Add(frame.CloneData());

          if (!frame.HasMore)
          {
              break;
          }

          socket.Receive(ref frame);
      }

      frame.Close();
      return true;
  }
  212. #endregion
  213. #endregion
  214. #region Receiving a frame as a string
  215. #region Blocking
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns it decoded with
  /// <see cref="SendReceiveConstants.DefaultEncoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns>The received frame's content as a string.</returns>
  public static string ReceiveFrameString(this IReceivingSocket socket)
      => socket.ReceiveFrameString(SendReceiveConstants.DefaultEncoding, out _);
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns it decoded with
  /// <see cref="SendReceiveConstants.DefaultEncoding"/>, reporting through <paramref name="more"/>
  /// whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns>The received frame's content as a string.</returns>
  public static string ReceiveFrameString(this IReceivingSocket socket, out bool more)
      => socket.ReceiveFrameString(SendReceiveConstants.DefaultEncoding, out more);
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns it decoded with
  /// <paramref name="encoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <returns>The received frame's content as a string.</returns>
  public static string ReceiveFrameString(this IReceivingSocket socket, Encoding encoding)
      => socket.ReceiveFrameString(encoding, out _);
  /// <summary>
  /// Blocks until a frame arrives on <paramref name="socket"/> and returns it decoded with
  /// <paramref name="encoding"/>, reporting through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns>The received frame's content as a string; <see cref="string.Empty"/> for a zero-length frame.</returns>
  public static string ReceiveFrameString(this IReceivingSocket socket, Encoding encoding, out bool more)
  {
      var frame = new Msg();
      frame.InitEmpty();

      socket.Receive(ref frame);
      more = frame.HasMore;

      try
      {
          // A zero-length frame yields an empty string without calling the decoder.
          if (frame.Size > 0)
          {
              return frame.GetString(encoding);
          }

          return string.Empty;
      }
      finally
      {
          // Close the Msg even when decoding throws.
          frame.Close();
      }
  }
  271. #endregion
  272. #region Immediate
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting (a zero timeout), decoding it
  /// with <see cref="SendReceiveConstants.DefaultEncoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out string? frameString)
      => socket.TryReceiveFrameString(TimeSpan.Zero, SendReceiveConstants.DefaultEncoding, out frameString, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting (a zero timeout), decoding it
  /// with <see cref="SendReceiveConstants.DefaultEncoding"/> and reporting through <paramref name="more"/>
  /// whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out string? frameString, out bool more)
      => socket.TryReceiveFrameString(TimeSpan.Zero, SendReceiveConstants.DefaultEncoding, out frameString, out more);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting (a zero timeout), decoding it
  /// with <paramref name="encoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, Encoding encoding, [NotNullWhen(returnValue: true)] out string? frameString)
      => socket.TryReceiveFrameString(TimeSpan.Zero, encoding, out frameString, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/> without waiting (a zero timeout), decoding it
  /// with <paramref name="encoding"/> and reporting through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, Encoding encoding, [NotNullWhen(returnValue: true)] out string? frameString, out bool more)
      => socket.TryReceiveFrameString(TimeSpan.Zero, encoding, out frameString, out more);
  321. #endregion
  322. #region Timeout
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>, and
  /// decodes it with <see cref="SendReceiveConstants.DefaultEncoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out string? frameString)
      => socket.TryReceiveFrameString(timeout, SendReceiveConstants.DefaultEncoding, out frameString, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>, and
  /// decodes it with <see cref="SendReceiveConstants.DefaultEncoding"/>, reporting through <paramref name="more"/>
  /// whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out string? frameString, out bool more)
      => socket.TryReceiveFrameString(timeout, SendReceiveConstants.DefaultEncoding, out frameString, out more);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>, and
  /// decodes it with <paramref name="encoding"/>. The "more frames follow" indicator is discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <param name="frameString">The received frame as a string, or <c>null</c> when no message was available.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan timeout, Encoding encoding, [NotNullWhen(returnValue: true)] out string? frameString)
      => socket.TryReceiveFrameString(timeout, encoding, out frameString, out _);
  /// <summary>
  /// Tries to receive one frame from <paramref name="socket"/>, waiting at most <paramref name="timeout"/>, and
  /// decodes it with <paramref name="encoding"/>, reporting through <paramref name="more"/> whether the message continues.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="encoding">The encoding that converts the frame's bytes into a string.</param>
  /// <param name="frameString">The received frame as a string (<see cref="string.Empty"/> for a zero-length frame),
  /// or <c>null</c> when no message was available.</param>
  /// <param name="more"><c>true</c> when a further frame of the same message follows; <c>false</c> otherwise, including when nothing was received.</param>
  /// <returns><c>true</c> when a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan timeout, Encoding encoding, [NotNullWhen(returnValue: true)] out string? frameString, out bool more)
  {
      var frame = new Msg();
      frame.InitEmpty();

      if (!socket.TryReceive(ref frame, timeout))
      {
          // Nothing arrived in time: report defaults and close the Msg.
          frameString = null;
          more = false;
          frame.Close();
          return false;
      }

      more = frame.HasMore;

      try
      {
          // A zero-length frame yields an empty string without calling the decoder.
          if (frame.Size > 0)
          {
              frameString = frame.GetString(encoding);
          }
          else
          {
              frameString = string.Empty;
          }

          return true;
      }
      finally
      {
          // Close the Msg even when decoding throws.
          frame.Close();
      }
  }
  395. #endregion
  396. #endregion
  397. #region Receiving a multipart message as strings
  398. #region Blocking
  /// <summary>
  /// Blocks until a message arrives on <paramref name="socket"/> and returns every frame of it, decoded
  /// with <see cref="SendReceiveConstants.DefaultEncoding"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="expectedFrameCount">Initial capacity of the <see cref="List{T}"/> that buffers the results.
  /// Set it to the number of frames when that is known; if more frames arrive than expected an extra
  /// allocation occurs, but the result is still correct.</param>
  /// <returns>A list holding each frame of the received message as a string.</returns>
  public static List<string> ReceiveMultipartStrings(this IReceivingSocket socket, int expectedFrameCount = 4)
      => ReceiveMultipartStrings(socket, SendReceiveConstants.DefaultEncoding, expectedFrameCount);
  /// <summary>
  /// Receive all frames of the next message from <paramref name="socket"/>, blocking until they arrive, and decode
  /// them as strings using <paramref name="encoding"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding used to convert each frame's data to a string.</param>
  /// <param name="expectedFrameCount">Specifies the initial capacity of the <see cref="List{T}"/> used
  /// to buffer results. If the number of frames is known, set it here. If more frames arrive than expected,
  /// an extra allocation will occur, but the result will still be correct.</param>
  /// <returns>A list holding each frame of the received message as a string; a zero-length frame is
  /// represented by <see cref="string.Empty"/>.</returns>
  public static List<string> ReceiveMultipartStrings(this IReceivingSocket socket, Encoding encoding, int expectedFrameCount = 4)
  {
      var frames = new List<string>(expectedFrameCount);

      var msg = new Msg();
      msg.InitEmpty();

      do
      {
          socket.Receive(ref msg);

          // Handle zero-length frames (e.g. delimiter frames) the same way the single-frame
          // string receivers do: yield an empty string without calling the decoder.
          frames.Add(msg.Size > 0
              ? msg.GetString(encoding)
              : string.Empty);
      }
      while (msg.HasMore);

      msg.Close();
      return frames;
  }
  433. #endregion
  434. #region Immediate
  /// <summary>
  /// Tries to receive every frame of the next message from <paramref name="socket"/> without waiting, decoding
  /// the frames with <see cref="SendReceiveConstants.DefaultEncoding"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="frames">The frames of the received message as strings. Untouched if no message was available.</param>
  /// <param name="expectedFrameCount">Initial capacity of the <see cref="List{T}"/> that buffers the results.
  /// Set it to the number of frames when that is known; if more frames arrive than expected an extra
  /// allocation occurs, but the result is still correct.</param>
  /// <returns><c>true</c> when a message was available, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartStrings(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] ref List<string>? frames, int expectedFrameCount = 4)
      => socket.TryReceiveMultipartStrings(SendReceiveConstants.DefaultEncoding, ref frames, expectedFrameCount);
  /// <summary>
  /// Tries to receive every frame of the next message from <paramref name="socket"/> without waiting
  /// (a zero timeout), decoding the frames with <paramref name="encoding"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="encoding">The encoding that converts each frame's bytes into a string.</param>
  /// <param name="frames">The frames of the received message as strings. Untouched if no message was available.</param>
  /// <param name="expectedFrameCount">Initial capacity of the <see cref="List{T}"/> that buffers the results.
  /// Set it to the number of frames when that is known; if more frames arrive than expected an extra
  /// allocation occurs, but the result is still correct.</param>
  /// <returns><c>true</c> when a message was available, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartStrings(this IReceivingSocket socket, Encoding encoding, [NotNullWhen(returnValue: true)] ref List<string>? frames, int expectedFrameCount = 4)
      => socket.TryReceiveMultipartStrings(TimeSpan.Zero, encoding, ref frames, expectedFrameCount);
  464. #endregion
  465. #region Timeout
  /// <summary>
  /// Tries to receive every frame of the next message from <paramref name="socket"/>, waiting at most
  /// <paramref name="timeout"/>, and decodes the frames with <see cref="SendReceiveConstants.DefaultEncoding"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The longest time to wait for a message to become available.</param>
  /// <param name="frames">The frames of the received message as strings. Untouched if no message was available.</param>
  /// <param name="expectedFrameCount">Initial capacity of the <see cref="List{T}"/> that buffers the results.
  /// Set it to the number of frames when that is known; if more frames arrive than expected an extra
  /// allocation occurs, but the result is still correct.</param>
  /// <returns><c>true</c> when a message was available, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartStrings(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] ref List<string>? frames, int expectedFrameCount = 4)
      => socket.TryReceiveMultipartStrings(timeout, SendReceiveConstants.DefaultEncoding, ref frames, expectedFrameCount);
  /// <summary>
  /// Try to read every frame of the next message on <paramref name="socket"/> and decode each frame
  /// to a string with <paramref name="encoding"/>.
  /// Returns <c>false</c> when no message shows up within <paramref name="timeout"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for the first frame of a message.</param>
  /// <param name="encoding">The encoding used to turn each frame's bytes into a string.</param>
  /// <param name="frames">Receives the decoded frames. An existing list is cleared and reused; the reference is left untouched when nothing was received.</param>
  /// <param name="expectedFrameCount">Initial capacity used when a new <see cref="List{T}"/> has to be allocated.
  /// A message with more frames than this still works; the list simply grows.</param>
  /// <returns><c>true</c> if a message was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartStrings(this IReceivingSocket socket, TimeSpan timeout, Encoding encoding, [NotNullWhen(returnValue: true)] ref List<string>? frames, int expectedFrameCount = 4)
  {
      var frame = new Msg();
      frame.InitEmpty();

      // Only the wait for the first frame is bounded by the timeout.
      if (!socket.TryReceive(ref frame, timeout))
      {
          frame.Close();
          return false;
      }

      // Reuse the caller's list when one was supplied, otherwise allocate a fresh one.
      if (frames != null)
          frames.Clear();
      else
          frames = new List<string>(expectedFrameCount);

      // Decode the frame in hand, then fetch the next one for as long as "more" is flagged.
      while (true)
      {
          frames.Add(frame.GetString(encoding));

          if (!frame.HasMore)
              break;

          socket.Receive(ref frame);
      }

      frame.Close();
      return true;
  }
  519. #endregion
  520. #endregion
  521. #region Receiving a multipart message as NetMQMessage
  522. #region Blocking
  /// <summary>
  /// Read every frame of the next message on <paramref name="socket"/>, blocking until the message arrives.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="expectedFrameCount">Capacity hint handed to the <see cref="NetMQMessage"/> constructor.
  /// Set it to the number of frames when known.</param>
  /// <returns>A <see cref="NetMQMessage"/> holding a copy of each received frame's data, in arrival order.</returns>
  public static NetMQMessage ReceiveMultipartMessage(this IReceivingSocket socket, int expectedFrameCount = 4)
  {
      var frame = new Msg();
      frame.InitEmpty();

      var result = new NetMQMessage(expectedFrameCount);

      // At least one frame is always read; keep going while the "more" flag is set.
      while (true)
      {
          socket.Receive(ref frame);
          result.Append(frame.CloneData());

          if (!frame.HasMore)
              break;
      }

      frame.Close();
      return result;
  }
  545. #endregion
  546. #region Immediate
  /// <summary>
  /// Try to read every frame of the next message on <paramref name="socket"/> without waiting.
  /// Returns <c>false</c> when no message is immediately available.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="message">Receives the message. An existing instance is cleared and reused; the reference is left untouched when nothing was received.</param>
  /// <param name="expectedFrameCount">Capacity hint used when a new <see cref="NetMQMessage"/> has to be created.</param>
  /// <returns><c>true</c> if a message was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartMessage(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] ref NetMQMessage? message, int expectedFrameCount = 4)
      => TryReceiveMultipartMessage(socket, TimeSpan.Zero, ref message, expectedFrameCount);
  561. #endregion
  562. #region Timeout
  /// <summary>
  /// Try to read every frame of the next message on <paramref name="socket"/>.
  /// Returns <c>false</c> when no message shows up within <paramref name="timeout"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for the first frame of a message.</param>
  /// <param name="message">Receives the message. An existing instance is cleared and reused; the reference is left untouched when nothing was received.</param>
  /// <param name="expectedFrameCount">Capacity hint used when a new <see cref="NetMQMessage"/> has to be created.</param>
  /// <returns><c>true</c> if a message was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveMultipartMessage(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] ref NetMQMessage? message, int expectedFrameCount = 4)
  {
      var frame = new Msg();
      frame.InitEmpty();

      // Only the wait for the first frame is bounded by the timeout.
      if (!socket.TryReceive(ref frame, timeout))
      {
          frame.Close();
          return false;
      }

      // Reuse the caller's message when one was supplied, otherwise create a new one.
      if (message != null)
          message.Clear();
      else
          message = new NetMQMessage(expectedFrameCount);

      // Copy the frame in hand, then fetch the next one for as long as "more" is flagged.
      while (true)
      {
          message.Append(new NetMQFrame(frame.CloneData()));

          if (!frame.HasMore)
              break;

          socket.Receive(ref frame);
      }

      frame.Close();
      return true;
  }
  600. #endregion
  601. #endregion
  602. #region Receiving a signal
  603. #region Blocking
  /// <summary>
  /// Read messages from <paramref name="socket"/>, blocking until one of them is a valid signal.
  /// Multipart messages and single frames that are not exactly 8 bytes, or that do not carry the
  /// signal signature, are consumed and discarded.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns><c>true</c> if the low byte of the received signal is zero, otherwise <c>false</c>.</returns>
  public static bool ReceiveSignal(this IReceivingSocket socket)
  {
      // The low byte carries the status; the remaining bits (top bit ignored) must match the signature.
      const long signatureMask = 0x7FFFFFFFFFFFFF00L;
      const long signature = 0x7766554433221100L;

      var frame = new Msg();
      frame.InitEmpty();

      for (;;)
      {
          socket.Receive(ref frame);

          // Remember whether this was a multipart message before draining its remaining frames.
          var multipart = frame.HasMore;
          while (frame.HasMore)
          {
              socket.Receive(ref frame);
          }

          if (!multipart && frame.Size == 8)
          {
              var candidate = NetworkOrderBitsConverter.ToInt64(frame);

              if ((candidate & signatureMask) == signature)
              {
                  frame.Close();
                  return (candidate & 255) == 0;
              }
          }
      }
  }
  631. #endregion
  632. #region Immediate
  /// <summary>
  /// Try to read a valid signal from <paramref name="socket"/> without waiting.
  /// Returns <c>false</c> when no message is immediately available.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="signal"><c>true</c> if the low byte of the received signal is zero; <c>false</c> otherwise, including when no signal was received.</param>
  /// <returns><c>true</c> if a valid signal was observed, otherwise <c>false</c>.</returns>
  public static bool TryReceiveSignal(this IReceivingSocket socket, out bool signal)
      => TryReceiveSignal(socket, TimeSpan.Zero, out signal);
  644. #endregion
  645. #region Timeout
  /// <summary>
  /// Attempt to receive a valid signal from <paramref name="socket"/>.
  /// If no valid signal is obtained before <paramref name="timeout"/> is used up and the socket has
  /// nothing further queued, return <c>false</c>.
  /// </summary>
  /// <remarks>
  /// Messages that are not valid signals (multipart messages, frames that are not exactly 8 bytes, or
  /// frames without the signal signature) are consumed and discarded. For a positive
  /// <paramref name="timeout"/>, the time spent on such messages is deducted from the budget of the
  /// next wait, so the total time spent waiting for a first frame does not exceed <paramref name="timeout"/>.
  /// Once the budget is used up the socket is only polled with a zero wait. Non-positive timeouts are
  /// forwarded to the socket unchanged on every attempt.
  /// </remarks>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
  /// <param name="signal"><c>true</c> if the low byte of the received signal is zero, otherwise <c>false</c>. If no signal received, <c>false</c>.</param>
  /// <returns><c>true</c> if a valid signal was observed, otherwise <c>false</c>.</returns>
  public static bool TryReceiveSignal(this IReceivingSocket socket, TimeSpan timeout, out bool signal)
  {
      var msg = new Msg();
      msg.InitEmpty();

      // Tracks how much of the caller's budget has been spent across loop iterations.
      var clock = System.Diagnostics.Stopwatch.StartNew();

      while (true)
      {
          // Shrink the wait by the time already spent; never go below zero so an exhausted budget
          // turns into a non-blocking poll rather than a negative timeout.
          var remaining = timeout;
          if (timeout > TimeSpan.Zero)
          {
              remaining = timeout - clock.Elapsed;
              if (remaining < TimeSpan.Zero)
                  remaining = TimeSpan.Zero;
          }

          if (!socket.TryReceive(ref msg, remaining))
          {
              signal = false;
              msg.Close();
              return false;
          }

          // Remember whether this was a multipart message before draining its remaining frames.
          var isMultiFrame = msg.HasMore;
          while (msg.HasMore)
          {
              socket.Receive(ref msg);
          }

          // A signal is a single frame of exactly 8 bytes.
          if (isMultiFrame || msg.Size != 8)
              continue;

          var signalValue = NetworkOrderBitsConverter.ToInt64(msg);

          // The low byte carries the status; the remaining bits (top bit ignored) must match the signature.
          if ((signalValue & 0x7FFFFFFFFFFFFF00L) == 0x7766554433221100L)
          {
              signal = (signalValue & 255) == 0;
              msg.Close();
              return true;
          }
      }
  }
  683. #endregion
  684. #endregion
  685. #region Skipping a message
  686. #region Blocking
  /// <summary>
  /// Block until a single frame arrives on <paramref name="socket"/>, then throw its content away.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  public static void SkipFrame(this IReceivingSocket socket)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      // Receive into a scratch Msg and close it straight away; the payload is never read.
      socket.Receive(ref discarded);
      discarded.Close();
  }
  /// <summary>
  /// Block until a single frame arrives on <paramref name="socket"/>, then throw its content away.
  /// <paramref name="more"/> reports whether the discarded frame was flagged as having a successor.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows, otherwise <c>false</c>.</param>
  public static void SkipFrame(this IReceivingSocket socket, out bool more)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      socket.Receive(ref discarded);

      // Capture the flag before the Msg is closed.
      more = discarded.HasMore;
      discarded.Close();
  }
  712. #endregion
  713. #region Immediate
  /// <summary>
  /// Try to read a single frame from <paramref name="socket"/> without waiting, then throw its content away.
  /// Returns <c>false</c> when no frame is immediately available.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns><c>true</c> if a frame was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipFrame(this IReceivingSocket socket)
      // Same as the timeout overload with a zero wait.
      => TrySkipFrame(socket, TimeSpan.Zero);
  /// <summary>
  /// Try to read a single frame from <paramref name="socket"/> without waiting, then throw its content away.
  /// Returns <c>false</c> when no frame is immediately available.
  /// <paramref name="more"/> is taken from the scratch frame's "more" flag after the attempt.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns><c>true</c> if a frame was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipFrame(this IReceivingSocket socket, out bool more)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      bool gotFrame = socket.TryReceive(ref discarded, TimeSpan.Zero);

      // The flag is read whether or not the receive succeeded, then the Msg is released.
      more = discarded.HasMore;
      discarded.Close();

      return gotFrame;
  }
  745. #endregion
  746. #region Timeout
  /// <summary>
  /// Try to read a single frame from <paramref name="socket"/>, then throw its content away.
  /// Returns <c>false</c> when no frame shows up within <paramref name="timeout"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for a frame.</param>
  /// <returns><c>true</c> if a frame was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipFrame(this IReceivingSocket socket, TimeSpan timeout)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      bool gotFrame = socket.TryReceive(ref discarded, timeout);

      // The scratch Msg is released on both the success and the failure path.
      discarded.Close();

      return gotFrame;
  }
  /// <summary>
  /// Try to read a single frame from <paramref name="socket"/>, then throw its content away.
  /// Returns <c>false</c> when no frame shows up within <paramref name="timeout"/>.
  /// <paramref name="more"/> reports whether the discarded frame was flagged as having a successor.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for a frame.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows; <c>false</c> otherwise, including when nothing was received.</param>
  /// <returns><c>true</c> if a frame was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipFrame(this IReceivingSocket socket, TimeSpan timeout, out bool more)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      bool gotFrame = socket.TryReceive(ref discarded, timeout);

      // Short-circuit: the flag is only consulted when a frame actually arrived.
      more = gotFrame && discarded.HasMore;

      discarded.Close();
      return gotFrame;
  }
  785. #endregion
  786. #endregion
  787. #region Skipping all frames of a multipart message
  788. #region Blocking
  /// <summary>
  /// Block until the next message arrives on <paramref name="socket"/>, then read and discard every one of its frames.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  public static void SkipMultipartMessage(this IReceivingSocket socket)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      // First frame is unconditional; follow-up frames are read while "more" is flagged.
      socket.Receive(ref discarded);
      while (discarded.HasMore)
      {
          socket.Receive(ref discarded);
      }

      discarded.Close();
  }
  804. #endregion
  805. #region Immediate
  /// <summary>
  /// Attempt to receive all frames of the next message from <paramref name="socket"/>, then ignore their contents.
  /// If no message is immediately available, return <c>false</c>.
  /// </summary>
  /// <remarks>
  /// Delegates to the timeout overload with <see cref="TimeSpan.Zero"/> so that every frame of the
  /// message is consumed. Receiving only the first frame would leave the rest of a multipart
  /// message queued on the socket.
  /// </remarks>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns><c>true</c> if a message was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipMultipartMessage(this IReceivingSocket socket)
  {
      return socket.TrySkipMultipartMessage(TimeSpan.Zero);
  }
  820. #endregion
  821. #region Timeout
  /// <summary>
  /// Try to read every frame of the next message on <paramref name="socket"/>, then throw their contents away.
  /// Returns <c>false</c> when no message shows up within <paramref name="timeout"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for the first frame of a message.</param>
  /// <returns><c>true</c> if a message was received and ignored, otherwise <c>false</c>.</returns>
  public static bool TrySkipMultipartMessage(this IReceivingSocket socket, TimeSpan timeout)
  {
      Msg discarded = new Msg();
      discarded.InitEmpty();

      // Only the wait for the first frame is bounded by the timeout.
      bool gotMessage = socket.TryReceive(ref discarded, timeout);

      if (gotMessage)
      {
          // Drain whatever frames remain in this message.
          while (discarded.HasMore)
          {
              socket.Receive(ref discarded);
          }
      }

      discarded.Close();
      return gotMessage;
  }
  847. #endregion
  848. #endregion
  849. #region Receiving a routing key
  /// <summary>
  /// Block until a frame arrives on <paramref name="socket"/> and wrap its bytes in a <see cref="RoutingKey"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns>The routing key built from the received frame.</returns>
  public static RoutingKey ReceiveRoutingKey(this IReceivingSocket socket)
  {
      // The "more" flag is of no interest to this overload, so it is discarded.
      var keyBytes = socket.ReceiveFrameBytes(out bool _);
      return new RoutingKey(keyBytes);
  }
  /// <summary>
  /// Block until a frame arrives on <paramref name="socket"/> and wrap its bytes in a <see cref="RoutingKey"/>.
  /// <paramref name="more"/> reports whether further frames follow.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows, otherwise <c>false</c>.</param>
  /// <returns>The routing key built from the received frame.</returns>
  public static RoutingKey ReceiveRoutingKey(this IReceivingSocket socket, out bool more)
  {
      var keyBytes = socket.ReceiveFrameBytes(out more);
      return new RoutingKey(keyBytes);
  }
  /// <summary>
  /// Try to read a routing key from <paramref name="socket"/> without waiting.
  /// Returns <c>false</c> when no frame is immediately available.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="routingKey">Overwritten with the received routing key on success; left unchanged otherwise.</param>
  /// <returns><c>true</c> if a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveRoutingKey(this IReceivingSocket socket, ref RoutingKey routingKey)
  {
      // Bail out early so the caller's key is only touched on success.
      if (!socket.TryReceiveFrameBytes(out byte[]? keyBytes))
          return false;

      routingKey = new RoutingKey(keyBytes);
      return true;
  }
  /// <summary>
  /// Try to read a routing key from <paramref name="socket"/> without waiting.
  /// Returns <c>false</c> when no frame is immediately available.
  /// <paramref name="more"/> is set by the underlying frame receive.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows, otherwise <c>false</c>.</param>
  /// <param name="routingKey">Overwritten with the received routing key on success; left unchanged otherwise.</param>
  /// <returns><c>true</c> if a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveRoutingKey(this IReceivingSocket socket, ref RoutingKey routingKey, out bool more)
  {
      // Bail out early so the caller's key is only touched on success.
      if (!socket.TryReceiveFrameBytes(out byte[]? keyBytes, out more))
          return false;

      routingKey = new RoutingKey(keyBytes);
      return true;
  }
  /// <summary>
  /// Try to read a routing key from <paramref name="socket"/>.
  /// Returns <c>false</c> when no frame shows up within <paramref name="timeout"/>.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for a frame.</param>
  /// <param name="routingKey">Overwritten with the received routing key on success; left unchanged otherwise.</param>
  /// <returns><c>true</c> if a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveRoutingKey(this IReceivingSocket socket, TimeSpan timeout, ref RoutingKey routingKey)
  {
      // The "more" flag is of no interest to this overload, so it is discarded.
      if (!socket.TryReceiveFrameBytes(timeout, out byte[]? keyBytes, out bool _))
          return false;

      routingKey = new RoutingKey(keyBytes);
      return true;
  }
  /// <summary>
  /// Try to read a routing key from <paramref name="socket"/>.
  /// Returns <c>false</c> when no frame shows up within <paramref name="timeout"/>.
  /// <paramref name="more"/> is set by the underlying frame receive.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="more"><c>true</c> if another frame of the same message follows, otherwise <c>false</c>.</param>
  /// <param name="timeout">How long to wait for a frame.</param>
  /// <param name="routingKey">Overwritten with the received routing key on success; left unchanged otherwise.</param>
  /// <returns><c>true</c> if a frame was received, otherwise <c>false</c>.</returns>
  public static bool TryReceiveRoutingKey(this IReceivingSocket socket, TimeSpan timeout, ref RoutingKey routingKey, out bool more)
  {
      // Bail out early so the caller's key is only touched on success.
      if (!socket.TryReceiveFrameBytes(timeout, out byte[]? keyBytes, out more))
          return false;

      routingKey = new RoutingKey(keyBytes);
      return true;
  }
  940. #endregion
  941. #region Receiving a routing keys
  /// <summary>
  /// Read routing keys from <paramref name="socket"/>, blocking as needed, until an empty frame
  /// (the delimiter) is received. The delimiter itself is not included in the result.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <returns>The routing keys that preceded the empty delimiter frame, in arrival order.</returns>
  /// <exception cref="InvalidException">A received frame, including the delimiter, was not followed by another frame.</exception>
  public static IEnumerable<RoutingKey> ReceiveRoutingKeys(this IReceivingSocket socket)
  {
      var collected = new List<RoutingKey>();

      for (;;)
      {
          var key = socket.ReceiveRoutingKey(out bool hasNext);

          // Every key frame, and the delimiter too, must be followed by at least one more frame.
          if (!hasNext)
              throw new InvalidException("Malformed multipart message, empty message expected");

          // An empty frame marks the end of the routing keys.
          if (key.Bytes.Length == 0)
              return collected;

          collected.Add(key);
      }
  }
  /// <summary>
  /// Try to read routing keys from <paramref name="socket"/> without waiting for the first one;
  /// the keys are expected to be terminated by an empty frame.
  /// Returns <c>false</c> when no message is immediately available.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="routingKeys">The routing keys of the received message, or <c>null</c> when nothing was received.</param>
  /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
      => socket.TryReceiveRoutingKeys(TimeSpan.Zero, out routingKeys);
  /// <summary>
  /// Try to read routing keys from <paramref name="socket"/>; the keys are expected to be terminated by an empty frame.
  /// Returns <c>false</c> when the first frame does not show up within <paramref name="timeout"/>.
  /// Frames after the first are read with blocking receives.
  /// </summary>
  /// <param name="socket">The socket to receive from.</param>
  /// <param name="timeout">How long to wait for the first frame.</param>
  /// <param name="routingKeys">The routing keys that preceded the empty delimiter frame, or <c>null</c> when nothing was received.</param>
  /// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
  /// <exception cref="InvalidException">A received frame, including the delimiter, was not followed by another frame.</exception>
  public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
  {
      var current = new RoutingKey();

      // Only the first frame is subject to the timeout.
      if (!socket.TryReceiveRoutingKey(timeout, ref current, out bool hasNext))
      {
          routingKeys = null;
          return false;
      }

      if (!hasNext)
          throw new InvalidException("Malformed multipart message, empty message expected");

      var collected = new List<RoutingKey>();
      routingKeys = collected;

      // Keep the key in hand and fetch the next frame until the empty delimiter shows up.
      while (current.Bytes.Length != 0)
      {
          collected.Add(current);

          current = socket.ReceiveRoutingKey(out hasNext);
          if (!hasNext)
              throw new InvalidException("Malformed multipart message, empty message expected");
      }

      return true;
  }
  1006. #endregion
  1007. }
  1008. }