// WatsonMessageBuilder.cs (5.7 KB)
  1. namespace WatsonTcp
  2. {
  3. using System;
  4. using System.Collections.Generic;
  5. using System.IO;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. internal class WatsonMessageBuilder
  11. {
  12. #region Internal-Members
  /// <summary>
  /// Serialization helper used by this builder to serialize and deserialize message headers.
  /// Assigning null throws an <see cref="ArgumentNullException"/>.
  /// </summary>
  internal ISerializationHelper SerializationHelper
  {
      get
      {
          return _SerializationHelper;
      }
      set
      {
          // Reject null so the header methods always have a helper to call.
          _SerializationHelper = value ?? throw new ArgumentNullException(nameof(SerializationHelper));
      }
  }
  /// <summary>
  /// Read stream buffer size (presumably in bytes; this class only stores the value).
  /// Must be one or greater; smaller values throw an <see cref="ArgumentOutOfRangeException"/>.
  /// </summary>
  internal int ReadStreamBuffer
  {
      get
      {
          return _ReadStreamBuffer;
      }
      set
      {
          if (value <= 0)
          {
              throw new ArgumentOutOfRangeException(nameof(ReadStreamBuffer));
          }

          _ReadStreamBuffer = value;
      }
  }
  31. #endregion
  32. #region Private-Members
  // Backing field for SerializationHelper; starts out as the default helper implementation.
  private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();

  // Backing field for ReadStreamBuffer; defaults to 65536.
  private int _ReadStreamBuffer = 65536;
  35. #endregion
  36. #region Constructors-and-Factories
  /// <summary>
  /// Instantiate the builder. No work is done here; members keep the values from their field initializers.
  /// </summary>
  internal WatsonMessageBuilder() { }
  40. #endregion
  41. #region Internal-Methods
  /// <summary>
  /// Build a new outbound message from the supplied content and options.
  /// </summary>
  /// <param name="contentLength">The number of bytes included in the stream; must be zero or greater.</param>
  /// <param name="stream">The stream containing the data. Must be non-null and readable when <paramref name="contentLength"/> is greater than zero.</param>
  /// <param name="syncRequest">Indicate if the message is a synchronous message request.</param>
  /// <param name="syncResponse">Indicate if the message is a synchronous message response.</param>
  /// <param name="expirationUtc">The UTC time at which the message should expire (only valid for synchronous message requests).</param>
  /// <param name="metadata">Metadata to attach to the message.</param>
  /// <returns>A message populated with the supplied values.</returns>
  /// <exception cref="ArgumentException">Thrown when the content length is negative, or when content is expected but the stream is null or unreadable.</exception>
  internal WatsonMessage ConstructNew(
      long contentLength,
      Stream stream,
      bool syncRequest = false,
      bool syncResponse = false,
      DateTime? expirationUtc = null,
      Dictionary<string, object> metadata = null)
  {
      if (contentLength < 0)
      {
          throw new ArgumentException("Content length must be zero or greater.");
      }

      // The stream is only validated when there is content to read from it.
      if (contentLength > 0 && (stream == null || !stream.CanRead))
      {
          throw new ArgumentException("Cannot read from supplied stream.");
      }

      return new WatsonMessage
      {
          ContentLength = contentLength,
          DataStream = stream,
          SyncRequest = syncRequest,
          SyncResponse = syncResponse,
          ExpirationUtc = expirationUtc,
          Metadata = metadata
      };
  }
  /// <summary>
  /// Read a message header from a stream and construct a message from it.
  /// Bytes are read until the header ends with the four-byte delimiter \r\n\r\n. The bytes read
  /// (delimiter included) are passed to the serialization helper, and the supplied stream is
  /// attached to the resulting message as its data stream.
  /// </summary>
  /// <param name="stream">Stream.</param>
  /// <param name="token">Cancellation token.</param>
  /// <returns>The message deserialized from the header, with its data stream set to <paramref name="stream"/>.</returns>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="stream"/> is null.</exception>
  /// <exception cref="ArgumentException">Thrown when <paramref name="stream"/> cannot be read.</exception>
  /// <exception cref="IOException">Thrown when the stream ends before a complete header is received, or when the header data ends in four null bytes.</exception>
  internal async Task<WatsonMessage> BuildFromStream(Stream stream, CancellationToken token = default)
  {
      if (stream == null) throw new ArgumentNullException(nameof(stream));
      if (!stream.CanRead) throw new ArgumentException("Cannot read from stream.");

      // Number of bytes read up front before looking for the delimiter.
      // Example header: {"len":0,"s":"Normal"}\r\n\r\n
      const int initialHeaderLength = 24;

      // Growable buffer; doubling avoids allocating a new array for every byte appended.
      // NOTE(review): header growth is unbounded until the delimiter arrives — confirm whether a cap is wanted.
      byte[] headerBytes = new byte[256];
      int headerLength = 0;

      while (headerLength < initialHeaderLength)
      {
          int read = await stream.ReadAsync(headerBytes, headerLength, initialHeaderLength - headerLength, token).ConfigureAwait(false);

          // A zero-byte read with a non-zero count means end of stream; looping again would spin forever.
          if (read < 1) throw new IOException("End of stream reached before a complete header was received.");

          headerLength += read;
      }

      while (true)
      {
          if (HeaderEndsWith(headerBytes, headerLength, 0, 0, 0, 0))
          {
              throw new IOException("Null header data indicates peer disconnected.");
          }

          if (HeaderEndsWith(headerBytes, headerLength, 13, 10, 13, 10))
          {
              // delimiter reached
              break;
          }

          if (headerLength == headerBytes.Length)
          {
              Array.Resize(ref headerBytes, headerBytes.Length * 2);
          }

          // Read one byte at a time so nothing beyond the delimiter is consumed from the stream.
          int read = await stream.ReadAsync(headerBytes, headerLength, 1, token).ConfigureAwait(false);
          if (read < 1) throw new IOException("End of stream reached before a complete header was received.");

          headerLength += read;
      }

      // Trim to the exact header size; the delimiter stays in the data given to the deserializer.
      Array.Resize(ref headerBytes, headerLength);

      WatsonMessage msg = _SerializationHelper.DeserializeJson<WatsonMessage>(headerBytes);
      msg.DataStream = stream;
      return msg;
  }

  /// <summary>
  /// Determine whether the last four of the first <paramref name="count"/> bytes in a buffer equal the supplied values, in order.
  /// </summary>
  /// <param name="buffer">Buffer holding the header data.</param>
  /// <param name="count">Number of valid bytes in the buffer; must be four or greater.</param>
  /// <param name="first">Expected value of the fourth-from-last byte.</param>
  /// <param name="second">Expected value of the third-from-last byte.</param>
  /// <param name="third">Expected value of the second-from-last byte.</param>
  /// <param name="fourth">Expected value of the last byte.</param>
  /// <returns>True if the trailing four bytes match.</returns>
  private static bool HeaderEndsWith(byte[] buffer, int count, byte first, byte second, byte third, byte fourth)
  {
      return buffer[count - 4] == first
          && buffer[count - 3] == second
          && buffer[count - 2] == third
          && buffer[count - 1] == fourth;
  }
  /// <summary>
  /// Produce the header bytes for a message: the message serialized by the serialization helper,
  /// followed by the \r\n\r\n delimiter encoded as UTF-8.
  /// </summary>
  /// <param name="msg">Watson message.</param>
  /// <returns>Header bytes.</returns>
  internal byte[] GetHeaderBytes(WatsonMessage msg)
  {
      // Arguments are evaluated left to right: serialize first, then build the delimiter.
      return WatsonCommon.AppendBytes(
          _SerializationHelper.SerializeJson(msg, false),
          Encoding.UTF8.GetBytes("\r\n\r\n"));
  }
  142. #endregion
  143. #region Private-Methods
  144. #endregion
  145. }
  146. }