WatsonCommon.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. namespace WatsonTcp
  2. {
  3. using System;
  4. using System.IO;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. internal static class WatsonCommon
  9. {
  10. internal static async Task<MemoryStream> DataStreamToMemoryStream(long contentLength, Stream stream, int bufferLen, CancellationToken token)
  11. {
  12. if (contentLength <= 0) return new MemoryStream(Array.Empty<byte>());
  13. if (bufferLen <= 0) throw new ArgumentException("Buffer must be greater than zero bytes.");
  14. byte[] buffer = new byte[bufferLen];
  15. int read = 0;
  16. long bytesRemaining = contentLength;
  17. MemoryStream ms = new MemoryStream();
  18. while (bytesRemaining > 0)
  19. {
  20. if (bufferLen > bytesRemaining) buffer = new byte[bytesRemaining];
  21. read = await stream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
  22. if (read > 0)
  23. {
  24. await ms.WriteAsync(buffer, 0, read, token).ConfigureAwait(false);
  25. bytesRemaining -= read;
  26. }
  27. else
  28. {
  29. throw new IOException("Could not read from supplied stream.");
  30. }
  31. }
  32. ms.Seek(0, SeekOrigin.Begin);
  33. return ms;
  34. }
  35. internal static async Task<byte[]> ReadFromStreamAsync(Stream stream, long count, int bufferLen, CancellationToken token)
  36. {
  37. if (count <= 0) return null;
  38. if (bufferLen <= 0) throw new ArgumentException("Buffer must be greater than zero bytes.");
  39. byte[] buffer = new byte[bufferLen];
  40. int read = 0;
  41. long bytesRemaining = count;
  42. using (MemoryStream ms = new MemoryStream())
  43. {
  44. try
  45. {
  46. while (bytesRemaining > 0)
  47. {
  48. if (bufferLen > bytesRemaining) buffer = new byte[bytesRemaining];
  49. read = await stream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
  50. if (read > 0)
  51. {
  52. await ms.WriteAsync(buffer, 0, read, token).ConfigureAwait(false);
  53. bytesRemaining -= read;
  54. }
  55. else
  56. {
  57. throw new IOException("Could not read from supplied stream.");
  58. }
  59. }
  60. }
  61. catch (Exception)
  62. {
  63. // exception may be thrown if a client disconnects
  64. // immediately after sending a message
  65. }
  66. ms.Seek(0, SeekOrigin.Begin);
  67. return ms.ToArray();
  68. }
  69. }
  70. internal static async Task<byte[]> ReadMessageDataAsync(WatsonMessage msg, int bufferLen, CancellationToken token)
  71. {
  72. if (msg == null) throw new ArgumentNullException(nameof(msg));
  73. if (msg.ContentLength == 0) return Array.Empty<byte>();
  74. byte[] msgData = null;
  75. try
  76. {
  77. msgData = await WatsonCommon.ReadFromStreamAsync(msg.DataStream, msg.ContentLength, bufferLen, token).ConfigureAwait(false);
  78. }
  79. catch (Exception)
  80. {
  81. // exception may be thrown if a client disconnects
  82. // immediately after sending a message
  83. }
  84. return msgData;
  85. }
  86. internal static byte[] AppendBytes(byte[] head, byte[] tail)
  87. {
  88. byte[] arrayCombined = new byte[head.Length + tail.Length];
  89. Array.Copy(head, 0, arrayCombined, 0, head.Length);
  90. Array.Copy(tail, 0, arrayCombined, head.Length, tail.Length);
  91. return arrayCombined;
  92. }
  93. internal static string ByteArrayToHex(byte[] data)
  94. {
  95. StringBuilder hex = new StringBuilder(data.Length * 2);
  96. foreach (byte b in data) hex.AppendFormat("{0:x2}", b);
  97. return hex.ToString();
  98. }
  99. internal static void BytesToStream(byte[] data, int start, out int contentLength, out Stream stream)
  100. {
  101. contentLength = 0;
  102. stream = new MemoryStream(Array.Empty<byte>());
  103. if (data != null && data.Length > 0)
  104. {
  105. contentLength = (data.Length - start);
  106. stream = new MemoryStream();
  107. stream.Write(data, start, contentLength);
  108. stream.Seek(0, SeekOrigin.Begin);
  109. }
  110. }
  111. internal static DateTime GetExpirationTimestamp(WatsonMessage msg)
  112. {
  113. //
  114. // TimeSpan will be negative if sender timestamp is earlier than now or positive if sender timestamp is later than now
  115. // Goal #1: if sender has a later timestamp, decrease expiration by the difference between sender time and our time
  116. // Goal #2: if sender has an earlier timestamp, increase expiration by the difference between sender time and our time
  117. //
  118. // E.g. If sender time is 10:40 and receiver time is 10:45 and expiration is 1 minute, so 10:41.
  119. // ts = 10:45 - 10:40 = 5 minutes
  120. // expiration = 10:41 + 5 = 10:46 which is 1 minute later than when receiver received the message
  121. //
  122. TimeSpan ts = DateTime.UtcNow - msg.TimestampUtc;
  123. return msg.ExpirationUtc.Value.AddMilliseconds(ts.TotalMilliseconds);
  124. }
  125. }
  126. }