namespace WatsonTcp
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Builds <see cref="WatsonMessage"/> instances, either from caller-supplied values
    /// or by reading a serialized header from a stream, and produces the header bytes
    /// for an outgoing message.
    /// </summary>
    internal class WatsonMessageBuilder
    {
        #region Internal-Members

        /// <summary>
        /// Serialization helper used to serialize and deserialize message headers.
        /// Cannot be set to null.
        /// </summary>
        /// <exception cref="ArgumentNullException">Thrown when the assigned value is null.</exception>
        internal ISerializationHelper SerializationHelper
        {
            get => _SerializationHelper;
            set
            {
                if (value == null) throw new ArgumentNullException(nameof(SerializationHelper));
                _SerializationHelper = value;
            }
        }

        /// <summary>
        /// Read stream buffer size.  Must be one or greater.  Default is 65536.
        /// This value is only stored here; no method in this class consumes it.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when the assigned value is less than one.</exception>
        internal int ReadStreamBuffer
        {
            get => _ReadStreamBuffer;
            set
            {
                if (value < 1) throw new ArgumentOutOfRangeException(nameof(ReadStreamBuffer));
                _ReadStreamBuffer = value;
            }
        }

        #endregion

        #region Private-Members

        private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();
        private int _ReadStreamBuffer = 65536;

        #endregion

        #region Constructors-and-Factories

        /// <summary>
        /// Instantiate the message builder with the default serialization helper.
        /// </summary>
        internal WatsonMessageBuilder()
        {
        }

        #endregion

        #region Internal-Methods

        /// <summary>
        /// Construct a new message to send.
        /// </summary>
        /// <param name="contentLength">The number of bytes included in the stream.</param>
        /// <param name="stream">The stream containing the data.</param>
        /// <param name="syncRequest">Indicate if the message is a synchronous message request.</param>
        /// <param name="syncResponse">Indicate if the message is a synchronous message response.</param>
        /// <param name="expirationUtc">The UTC time at which the message should expire (only valid for synchronous message requests).</param>
        /// <param name="metadata">Metadata to attach to the message.</param>
        /// <returns>The populated message.</returns>
        /// <exception cref="ArgumentException">
        /// Thrown when <paramref name="contentLength"/> is negative, or when it is positive and
        /// <paramref name="stream"/> is null or not readable.
        /// </exception>
        internal WatsonMessage ConstructNew(
            long contentLength,
            Stream stream,
            bool syncRequest = false,
            bool syncResponse = false,
            DateTime? expirationUtc = null,
            Dictionary<string, object> metadata = null)
        {
            if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");

            // A stream is only required when there is content to read from it.
            if (contentLength > 0)
            {
                if (stream == null || !stream.CanRead)
                {
                    throw new ArgumentException("Cannot read from supplied stream.");
                }
            }

            WatsonMessage msg = new WatsonMessage();
            msg.ContentLength = contentLength;
            msg.DataStream = stream;
            msg.SyncRequest = syncRequest;
            msg.SyncResponse = syncResponse;
            msg.ExpirationUtc = expirationUtc;
            msg.Metadata = metadata;
            return msg;
        }

        /// <summary>
        /// Read from a stream and construct a message.
        /// The header is read up to and including the <c>\r\n\r\n</c> delimiter and deserialized;
        /// the stream itself is attached to the message as its data stream.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="token">Cancellation token.</param>
        /// <returns>The message described by the header.</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="stream"/> is null.</exception>
        /// <exception cref="ArgumentException">Thrown when <paramref name="stream"/> is not readable.</exception>
        /// <exception cref="IOException">
        /// Thrown when the stream ends before a complete header is received, or when the
        /// trailing four header bytes are all zero.
        /// </exception>
        internal async Task<WatsonMessage> BuildFromStream(Stream stream, CancellationToken token = default)
        {
            if (stream == null) throw new ArgumentNullException(nameof(stream));
            if (!stream.CanRead) throw new ArgumentException("Cannot read from stream.");

            // Smallest possible header: {"len":0,"s":"Normal"}\r\n\r\n
            const int minimumHeaderLength = 24;

            byte[] initial = new byte[minimumHeaderLength];
            int readTotal = 0;

            #region Retrieve-First-24-Bytes

            while (readTotal < minimumHeaderLength)
            {
                int read = await stream.ReadAsync(initial, readTotal, minimumHeaderLength - readTotal, token).ConfigureAwait(false);

                // A zero-length read means end of stream; without this check the loop would spin forever.
                if (read < 1) throw new IOException("Stream ended before a complete header was received; peer disconnected.");

                readTotal += read;
            }

            #endregion

            #region Read-Byte-by-Byte

            // Accumulate in a growable list so appending one byte does not copy the whole header.
            List<byte> header = new List<byte>(minimumHeaderLength * 4);
            header.AddRange(initial);

            byte[] single = new byte[1];

            while (true)
            {
                int count = header.Count;
                byte b0 = header[count - 4];
                byte b1 = header[count - 3];
                byte b2 = header[count - 2];
                byte b3 = header[count - 1];

                if (b0 == 0 && b1 == 0 && b2 == 0 && b3 == 0)
                {
                    throw new IOException("Null header data indicates peer disconnected.");
                }

                if (b0 == 13 && b1 == 10 && b2 == 13 && b3 == 10)
                {
                    // delimiter reached
                    break;
                }

                int read = await stream.ReadAsync(single, 0, 1, token).ConfigureAwait(false);

                // End of stream before the delimiter: fail instead of looping indefinitely.
                if (read < 1) throw new IOException("Stream ended before a complete header was received; peer disconnected.");

                header.Add(single[0]);
            }

            #endregion

            // The delimiter bytes are passed to the deserializer along with the JSON, as before.
            WatsonMessage msg = _SerializationHelper.DeserializeJson<WatsonMessage>(header.ToArray());
            msg.DataStream = stream;
            return msg;
        }

        /// <summary>
        /// Retrieve header bytes for a message.
        /// </summary>
        /// <param name="msg">Watson message.</param>
        /// <returns>Header bytes: the serialized message followed by <c>\r\n\r\n</c>.</returns>
        internal byte[] GetHeaderBytes(WatsonMessage msg)
        {
            byte[] jsonBytes = _SerializationHelper.SerializeJson(msg, false);
            byte[] end = Encoding.UTF8.GetBytes("\r\n\r\n");
            byte[] final = WatsonCommon.AppendBytes(jsonBytes, end);
            return final;
        }

        #endregion

        #region Private-Methods

        #endregion
    }
}