// WatsonTcpClient.cs (50 KB)
  1. namespace WatsonTcp
  2. {
  3. using System;
  4. using System.Collections.Generic;
  5. using System.IO;
  6. using System.Net;
  7. using System.Net.Security;
  8. using System.Net.Sockets;
  9. using System.Runtime.InteropServices;
  10. using System.Security.Authentication;
  11. using System.Security.Cryptography.X509Certificates;
  12. using System.Text;
  13. using System.Threading;
  14. using System.Threading.Tasks;
  15. /// <summary>
  16. /// Watson TCP client, with or without SSL.
  17. /// </summary>
  18. public class WatsonTcpClient : IDisposable
  19. {
  20. #region Public-Members
  /// <summary>
  /// Settings used by this client.
  /// Assigning null installs a new default <see cref="WatsonTcpClientSettings"/> instance instead of storing null.
  /// </summary>
  public WatsonTcpClientSettings Settings
  {
      get => _Settings;
      set => _Settings = value ?? new WatsonTcpClientSettings();
  }
  /// <summary>
  /// Events raised by this client.
  /// Assigning null installs a new default <see cref="WatsonTcpClientEvents"/> instance instead of storing null.
  /// </summary>
  public WatsonTcpClientEvents Events
  {
      get => _Events;
      set => _Events = value ?? new WatsonTcpClientEvents();
  }
  /// <summary>
  /// Callbacks invoked by this client.
  /// Assigning null installs a new default <see cref="WatsonTcpClientCallbacks"/> instance instead of storing null.
  /// </summary>
  public WatsonTcpClientCallbacks Callbacks
  {
      get => _Callbacks;
      set => _Callbacks = value ?? new WatsonTcpClientCallbacks();
  }
  /// <summary>
  /// Statistics for this client (read-only; the instance is replaced each time Connect is called).
  /// </summary>
  public WatsonTcpStatistics Statistics => _Statistics;
  /// <summary>
  /// TCP keepalive settings.
  /// Assigning null installs a new default <see cref="WatsonTcpKeepaliveSettings"/> instance instead of storing null.
  /// </summary>
  public WatsonTcpKeepaliveSettings Keepalive
  {
      get => _Keepalive;
      set => _Keepalive = value ?? new WatsonTcpKeepaliveSettings();
  }
  /// <summary>
  /// SSL configuration for this client.
  /// Assigning null installs a new default <see cref="WatsonTcpClientSslConfiguration"/> instance instead of storing null.
  /// </summary>
  public WatsonTcpClientSslConfiguration SslConfiguration
  {
      get => _SslConfiguration;
      set => _SslConfiguration = value ?? new WatsonTcpClientSslConfiguration();
  }
  /// <summary>
  /// Serialization helper used by this client.  The same instance is also handed to the internal message builder.
  /// </summary>
  /// <exception cref="ArgumentNullException">Thrown when the assigned value is null.</exception>
  public ISerializationHelper SerializationHelper
  {
      get => _SerializationHelper;
      set
      {
          // The throw-expression is evaluated before the store, so a null value leaves both fields untouched.
          _SerializationHelper = value ?? throw new ArgumentNullException(nameof(SerializationHelper));
          _MessageBuilder.SerializationHelper = value;
      }
  }
  /// <summary>
  /// True while the client considers itself connected to the server.
  /// The value is only changed internally (for example by Connect and Disconnect).
  /// </summary>
  public bool Connected { get; private set; } = false;
  126. #endregion
  127. #region Private-Members
  // Prefix prepended to every log message emitted by this client.
  private string _Header = "[WatsonTcpClient] ";

  // Builds outgoing messages and parses incoming ones from the data stream.
  private WatsonMessageBuilder _MessageBuilder = new WatsonMessageBuilder();

  // Backing fields for the public configuration properties.
  private WatsonTcpClientSettings _Settings = new WatsonTcpClientSettings();
  private WatsonTcpClientEvents _Events = new WatsonTcpClientEvents();
  private WatsonTcpClientCallbacks _Callbacks = new WatsonTcpClientCallbacks();
  private WatsonTcpStatistics _Statistics = new WatsonTcpStatistics();
  private WatsonTcpKeepaliveSettings _Keepalive = new WatsonTcpKeepaliveSettings();
  private WatsonTcpClientSslConfiguration _SslConfiguration = new WatsonTcpClientSslConfiguration();
  private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();

  // Transport mode and TLS version; both are chosen by the constructor that was used.
  private Mode _Mode = Mode.Tcp;
  private TlsVersion _TlsVersion = TlsVersion.Tls12;

  // Local endpoint (filled in by Connect) and remote endpoint (supplied to the constructor).
  private string _SourceIp = null;
  private int _SourcePort = 0;
  private string _ServerIp = null;
  private int _ServerPort = 0;

  // Socket and streams.  _DataStream refers to _TcpStream in TCP mode and to _SslStream in SSL mode.
  private TcpClient _Client = null;
  private Stream _DataStream = null;
  private NetworkStream _TcpStream = null;
  private SslStream _SslStream = null;

  // Client certificate material used during the TLS handshake.
  private X509Certificate2 _SslCertificate = null;
  private X509Certificate2Collection _SslCertificateCollection = null;

  // Single-entry semaphores; _ReadLock is awaited by the receive loop before reading a message.
  // NOTE(review): _WriteLock is presumably taken by the send path, which is declared elsewhere in the class - confirm.
  private SemaphoreSlim _WriteLock = new SemaphoreSlim(1, 1);
  private SemaphoreSlim _ReadLock = new SemaphoreSlim(1, 1);

  // Cancellation for the background tasks; Connect creates a fresh source and token each time.
  private CancellationTokenSource _TokenSource = new CancellationTokenSource();
  private CancellationToken _Token;

  // Background tasks started by Connect.
  private Task _DataReceiver = null;
  private Task _IdleServerMonitor = null;

  // Timestamp of the most recent received message; reset by Connect.
  private DateTime _LastActivity = DateTime.UtcNow;

  // Reset to false by Connect.  NOTE(review): presumably set by the idle-server monitor - confirm.
  private bool _IsTimeout = false;

  // Every constructor allocates this buffer using Settings.StreamBufferSize, so nothing is allocated here.
  private byte[] _SendBuffer = null;

  // NOTE(review): used by the synchronous request/response path declared elsewhere in the class - confirm.
  private readonly object _SyncResponseLock = new object();
  private event EventHandler<SyncResponseReceivedEventArgs> _SyncResponseReceived;
  160. #endregion
  161. #region Constructors-and-Factories
  /// <summary>
  /// Initialize the Watson TCP client without SSL.  Call Connect() afterward to connect to the server.
  /// </summary>
  /// <param name="serverIp">The IP address or hostname of the server.</param>
  /// <param name="serverPort">The TCP port on which the server is listening (0 through 65535).</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="serverIp"/> is null or empty.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="serverPort"/> is outside the valid TCP port range.</exception>
  public WatsonTcpClient(
      string serverIp,
      int serverPort)
  {
      if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));

      // Reject ports above 65535 here rather than letting the connect call fail later.
      if (serverPort < IPEndPoint.MinPort || serverPort > IPEndPoint.MaxPort) throw new ArgumentOutOfRangeException(nameof(serverPort));

      _Mode = Mode.Tcp;
      _ServerIp = serverIp;
      _ServerPort = serverPort;
      _SendBuffer = new byte[_Settings.StreamBufferSize];

      SerializationHelper.InstantiateConverter(); // Unity fix
  }
  /// <summary>
  /// Initialize the Watson TCP client with SSL, optionally loading a client certificate from a file.
  /// Call Connect() afterward to connect to the server.
  /// </summary>
  /// <param name="serverIp">The IP address or hostname of the server.</param>
  /// <param name="serverPort">The TCP port on which the server is listening (0 through 65535).</param>
  /// <param name="pfxCertFile">The file containing the SSL certificate.  When null or empty, no client certificate is loaded.</param>
  /// <param name="pfxCertPass">The password for the SSL certificate.  When null or empty, the file is opened without a password.</param>
  /// <param name="tlsVersion">The TLS version used for this connection.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="serverIp"/> is null or empty.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="serverPort"/> is outside the valid TCP port range.</exception>
  public WatsonTcpClient(
      string serverIp,
      int serverPort,
      string pfxCertFile,
      string pfxCertPass,
      TlsVersion tlsVersion = TlsVersion.Tls12)
  {
      if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));

      // Reject ports above 65535 here rather than letting the connect call fail later.
      if (serverPort < IPEndPoint.MinPort || serverPort > IPEndPoint.MaxPort) throw new ArgumentOutOfRangeException(nameof(serverPort));

      _Mode = Mode.Ssl;
      _TlsVersion = tlsVersion;
      _ServerIp = serverIp;
      _ServerPort = serverPort;
      _SendBuffer = new byte[_Settings.StreamBufferSize];

      // The collection always exists; it stays empty when no certificate file was supplied.
      _SslCertificateCollection = new X509Certificate2Collection();

      if (!String.IsNullOrEmpty(pfxCertFile))
      {
          _SslCertificate = String.IsNullOrEmpty(pfxCertPass)
              ? new X509Certificate2(pfxCertFile)
              : new X509Certificate2(pfxCertFile, pfxCertPass);

          _SslCertificateCollection.Add(_SslCertificate);
      }

      SerializationHelper.InstantiateConverter(); // Unity fix
  }
  /// <summary>
  /// Initialize the Watson TCP client with SSL using an already-loaded certificate.
  /// Call Connect() afterward to connect to the server.
  /// </summary>
  /// <param name="serverIp">The IP address or hostname of the server.</param>
  /// <param name="serverPort">The TCP port on which the server is listening (0 through 65535).</param>
  /// <param name="cert">The SSL certificate.  Note that disposing the client also disposes this certificate.</param>
  /// <param name="tlsVersion">The TLS version used for this connection.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="serverIp"/> is null or empty, or <paramref name="cert"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="serverPort"/> is outside the valid TCP port range.</exception>
  public WatsonTcpClient(
      string serverIp,
      int serverPort,
      X509Certificate2 cert,
      TlsVersion tlsVersion = TlsVersion.Tls12)
  {
      if (String.IsNullOrEmpty(serverIp)) throw new ArgumentNullException(nameof(serverIp));

      // Reject ports above 65535 here rather than letting the connect call fail later.
      if (serverPort < IPEndPoint.MinPort || serverPort > IPEndPoint.MaxPort) throw new ArgumentOutOfRangeException(nameof(serverPort));

      if (cert == null) throw new ArgumentNullException(nameof(cert));

      _Mode = Mode.Ssl;
      _TlsVersion = tlsVersion;
      _SslCertificate = cert;
      _ServerIp = serverIp;
      _ServerPort = serverPort;
      _SendBuffer = new byte[_Settings.StreamBufferSize];

      _SslCertificateCollection = new X509Certificate2Collection();
      _SslCertificateCollection.Add(_SslCertificate);

      SerializationHelper.InstantiateConverter(); // Unity fix
  }
  250. #endregion
  251. #region Public-Methods
  /// <summary>
  /// Tear down the client: runs the disposing path of <see cref="Dispose(bool)"/> and then
  /// tells the garbage collector that finalization is no longer needed.
  /// The object must not be used again once this has been called.
  /// </summary>
  public void Dispose()
  {
      this.Dispose(disposing: true);
      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Connect to the server.
  /// Opens the socket (waiting at most Settings.ConnectTimeoutSeconds), performs the TLS handshake when the
  /// client was constructed in SSL mode, sends the client registration message, and then starts the
  /// background receive and idle-monitor tasks.
  /// If any step fails, the socket and streams opened by this call are closed again and
  /// <see cref="Connected"/> is left false before the exception propagates.
  /// </summary>
  /// <exception cref="InvalidOperationException">Thrown when already connected, or when neither the message nor the stream received event is in use.</exception>
  /// <exception cref="TimeoutException">Thrown when the connection is not established within the configured timeout.</exception>
  /// <exception cref="AuthenticationException">Thrown in SSL mode when the stream is not encrypted, not authenticated, or required mutual authentication failed.</exception>
  /// <exception cref="ArgumentException">Thrown when the server rejects the registration, or the mode is unknown.</exception>
  /// <exception cref="AggregateException">May be thrown when sending the registration message fails, because that send is waited on synchronously.</exception>
  public void Connect()
  {
      if (Connected) throw new InvalidOperationException("Already connected to the server.");

      // Validate configuration before a socket is created (and possibly bound to a local port),
      // so that a misconfigured client does not leave a TcpClient behind.
      if (!_Events.IsUsingMessages && !_Events.IsUsingStreams)
          throw new InvalidOperationException("One of either 'MessageReceived' or 'StreamReceived' events must first be set.");

      if (_Mode != Mode.Tcp && _Mode != Mode.Ssl)
          throw new ArgumentException("Unknown mode: " + _Mode.ToString());

      if (_Settings.LocalPort == 0)
      {
          _Client = new TcpClient();
      }
      else
      {
          _Client = new TcpClient(new IPEndPoint(IPAddress.Any, _Settings.LocalPort));
      }

      try
      {
          _Client.NoDelay = _Settings.NoDelay;
          _Statistics = new WatsonTcpStatistics();

          _Settings.Logger?.Invoke(
              Severity.Info,
              _Header + (_Mode == Mode.Ssl ? "connecting with SSL to " : "connecting to ") + _ServerIp + ":" + _ServerPort);

          _Client.LingerState = new LingerOption(true, 0);
          IAsyncResult asyncResult = _Client.BeginConnect(_ServerIp, _ServerPort, null, null);

          // The wait handle is deliberately not closed, see
          // https://social.msdn.microsoft.com/Forums/en-US/313cf28c-2a6d-498e-8188-7a0639dbd552/tcpclientbeginconnect-issue?forum=netfxnetcom
          WaitHandle waitHandle = asyncResult.AsyncWaitHandle;

          try
          {
              bool connectSuccess = waitHandle.WaitOne(TimeSpan.FromSeconds(_Settings.ConnectTimeoutSeconds), false);
              if (!connectSuccess)
              {
                  _Client.Close();
                  _Settings.Logger?.Invoke(Severity.Error, _Header + "timeout connecting to " + _ServerIp + ":" + _ServerPort);
                  throw new TimeoutException("Timeout connecting to " + _ServerIp + ":" + _ServerPort);
              }

              _Client.EndConnect(asyncResult);

              IPEndPoint localEndpoint = (IPEndPoint)_Client.Client.LocalEndPoint;
              _SourceIp = localEndpoint.Address.ToString();
              _SourcePort = localEndpoint.Port;

              if (_Mode == Mode.Tcp)
              {
                  _TcpStream = _Client.GetStream();
                  _DataStream = _TcpStream;
                  _SslStream = null;
              }
              else
              {
                  // The custom validation/selection callbacks are only attached when invalid certificates are accepted.
                  if (_Settings.AcceptInvalidCertificates)
                      _SslStream = new SslStream(_Client.GetStream(), false, _SslConfiguration.ServerCertificateValidationCallback, _SslConfiguration.ClientCertificateSelectionCallback);
                  else
                      _SslStream = new SslStream(_Client.GetStream(), false);

                  // Certificate revocation is checked unless invalid certificates are accepted.
                  _SslStream.AuthenticateAsClient(_ServerIp, _SslCertificateCollection, _TlsVersion.ToSslProtocols(), !_Settings.AcceptInvalidCertificates);

                  if (!_SslStream.IsEncrypted)
                  {
                      _Settings.Logger?.Invoke(Severity.Error, _Header + "stream to " + _ServerIp + ":" + _ServerPort + " is not encrypted");
                      throw new AuthenticationException("Stream is not encrypted");
                  }

                  if (!_SslStream.IsAuthenticated)
                  {
                      _Settings.Logger?.Invoke(Severity.Error, _Header + "stream to " + _ServerIp + ":" + _ServerPort + " is not authenticated");
                      throw new AuthenticationException("Stream is not authenticated");
                  }

                  if (_Settings.MutuallyAuthenticate && !_SslStream.IsMutuallyAuthenticated)
                  {
                      _Settings.Logger?.Invoke(Severity.Error, _Header + "mutual authentication with " + _ServerIp + ":" + _ServerPort + " failed");
                      throw new AuthenticationException("Mutual authentication failed");
                  }

                  _DataStream = _SslStream;
              }

              if (_Keepalive.EnableTcpKeepAlives) EnableKeepalives();
              Connected = true;
          }
          catch (Exception e)
          {
              _Settings.Logger?.Invoke(Severity.Error, _Header + "exception encountered: " + e.Message);
              _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
              throw;
          }

          // Register with the server before the receive loop is started.
          WatsonMessage msg = new WatsonMessage();
          msg.Status = MessageStatus.RegisterClient;
          if (!SendInternalAsync(msg, 0, null, default(CancellationToken)).Result)
          {
              _Settings.Logger?.Invoke(Severity.Alert, _Header + "unable to register GUID " + _Settings.Guid + " with the server");
              throw new ArgumentException("Server rejected GUID " + _Settings.Guid);
          }
      }
      catch
      {
          // Any failure after the socket was created: do not report a connection that has no
          // receive loop behind it, and release everything this attempt opened.
          Connected = false;
          _SslStream?.Close();
          _TcpStream?.Close();
          _Client?.Close();
          _SslStream = null;
          _TcpStream = null;
          _DataStream = null;
          _Client = null;
          throw;
      }

      _TokenSource = new CancellationTokenSource();
      _Token = _TokenSource.Token;
      _LastActivity = DateTime.UtcNow;
      _IsTimeout = false;

      _DataReceiver = Task.Run(() => DataReceiver(_Token), _Token);
      _IdleServerMonitor = Task.Run(() => IdleServerMonitor(_Token), _Token);

      _Events.HandleServerConnected(this, new ConnectionEventArgs());
      _Settings.Logger?.Invoke(Severity.Info, _Header + "connected to " + _ServerIp + ":" + _ServerPort);
  }
  /// <summary>
  /// Disconnect from the server.
  /// Optionally sends a shutdown notice, then cancels the background tasks, closes the streams and the
  /// socket, waits for the background tasks to finish, and marks the client as disconnected.
  /// The teardown also runs when sending the notice fails; the failure is still propagated to the caller afterwards.
  /// </summary>
  /// <param name="sendNotice">Flag to indicate whether the server should be notified of the disconnect. This message will not be sent until other send requests have been handled.</param>
  /// <exception cref="InvalidOperationException">Thrown when the client is not connected.</exception>
  public void Disconnect(bool sendNotice = true)
  {
      if (!Connected) throw new InvalidOperationException("Not connected to the server.");

      _Settings.Logger?.Invoke(Severity.Info, _Header + "disconnecting from " + _ServerIp + ":" + _ServerPort);

      try
      {
          if (sendNotice)
          {
              WatsonMessage msg = new WatsonMessage();
              msg.Status = MessageStatus.Shutdown;
              SendInternalAsync(msg, 0, null, default(CancellationToken)).Wait();
          }
      }
      finally
      {
          // Runs even if the notice could not be sent, so a broken connection cannot leave
          // the client stuck in the connected state with its socket and tasks still alive.
          if (_TokenSource != null && !_TokenSource.IsCancellationRequested)
          {
              // stop background tasks
              _TokenSource.Cancel();
              _TokenSource.Dispose();
              _Token = default(CancellationToken);
          }

          _SslStream?.Close();
          _TcpStream?.Close();
          _Client?.Close();

          // Poll rather than Wait() on the tasks: polling never rethrows a task's exception.
          while (_DataReceiver?.IsCompleted == false)
          {
              Task.Delay(10).Wait();
          }

          while (_IdleServerMonitor?.IsCompleted == false)
          {
              Task.Delay(10).Wait();
          }

          Connected = false;
          _Settings.Logger?.Invoke(Severity.Info, _Header + "disconnected from " + _ServerIp + ":" + _ServerPort);
      }
  }
  /// <summary>
  /// Send a pre-shared key to the server to authenticate.
  /// </summary>
  /// <param name="presharedKey">Pre-shared key; must be exactly 16 characters long.  It is sent UTF-8 encoded.</param>
  /// <param name="token">Cancellation token to cancel the request.</param>
  /// <returns>Task that completes once the authentication message has been handed to the send path.</returns>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="presharedKey"/> is null or empty.</exception>
  /// <exception cref="ArgumentException">Thrown when <paramref name="presharedKey"/> is not 16 characters long.</exception>
  public async Task AuthenticateAsync(string presharedKey, CancellationToken token = default)
  {
      if (String.IsNullOrEmpty(presharedKey)) throw new ArgumentNullException(nameof(presharedKey));

      // NOTE(review): the check counts characters while the message speaks of bytes; the two only
      // coincide for single-byte (ASCII) keys - confirm which one the server expects.
      if (presharedKey.Length != 16) throw new ArgumentException("Preshared key length must be 16 bytes.");

      WatsonMessage msg = new WatsonMessage();
      msg.Status = MessageStatus.AuthRequested;
      msg.PresharedKey = Encoding.UTF8.GetBytes(presharedKey);

      await SendInternalAsync(msg, 0, null, token).ConfigureAwait(false);
  }
  462. #region SendAsync
  /// <summary>
  /// Send data and metadata to the server asynchronously.
  /// The string is sent UTF-8 encoded; a null or empty string is sent as a zero-length payload.
  /// </summary>
  /// <param name="data">String containing data.</param>
  /// <param name="metadata">Dictionary containing metadata.</param>
  /// <param name="token">Cancellation token to cancel the request.  When left at its default, the client's own token is used.</param>
  /// <returns>Boolean indicating if the message was sent successfully.</returns>
  public async Task<bool> SendAsync(string data, Dictionary<string, object> metadata = null, CancellationToken token = default)
  {
      if (token == default(CancellationToken)) token = _Token;

      // Both the empty and the non-empty case take the same path so that the caller's token is always forwarded.
      byte[] payload = String.IsNullOrEmpty(data) ? Array.Empty<byte>() : Encoding.UTF8.GetBytes(data);

      return await SendAsync(payload, metadata, 0, token).ConfigureAwait(false);
  }
  /// <summary>
  /// Asynchronously send a byte array, with optional metadata, to the server.
  /// A null array is treated as an empty payload.
  /// </summary>
  /// <param name="data">Byte array containing data.</param>
  /// <param name="metadata">Dictionary containing metadata.</param>
  /// <param name="start">Start position within the supplied array.</param>
  /// <param name="token">Cancellation token to cancel the request.  When left at its default, the client's own token is used.</param>
  /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
  public async Task<bool> SendAsync(byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  {
      CancellationToken effectiveToken = (token == default(CancellationToken)) ? _Token : token;
      byte[] payload = data ?? Array.Empty<byte>();

      WatsonCommon.BytesToStream(payload, start, out int length, out Stream source);

      return await SendAsync(length, source, metadata, effectiveToken).ConfigureAwait(false);
  }
  /// <summary>
  /// Asynchronously send the contents of a stream, with optional metadata, to the server.
  /// A null stream is replaced by an empty in-memory stream.
  /// </summary>
  /// <param name="contentLength">The number of bytes to send.</param>
  /// <param name="stream">The stream containing the data.</param>
  /// <param name="metadata">Dictionary containing metadata.</param>
  /// <param name="token">Cancellation token to cancel the request.  When left at its default, the client's own token is used.</param>
  /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
  /// <exception cref="ArgumentException">Thrown when <paramref name="contentLength"/> is negative.</exception>
  public async Task<bool> SendAsync(long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
  {
      if (contentLength < 0)
      {
          throw new ArgumentException("Content length must be zero or greater.");
      }

      CancellationToken effectiveToken = (token == default(CancellationToken)) ? _Token : token;
      Stream source = stream ?? new MemoryStream(Array.Empty<byte>());

      WatsonMessage outgoing = _MessageBuilder.ConstructNew(contentLength, source, false, false, null, metadata);

      return await SendInternalAsync(outgoing, contentLength, source, effectiveToken).ConfigureAwait(false);
  }
  507. #endregion
  508. #region SendAndWaitAsync
  /// <summary>
  /// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
  /// The string is sent UTF-8 encoded; a null or empty string is sent as a zero-length payload.
  /// </summary>
  /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.  Must be at least 1000.</param>
  /// <param name="data">Data to send.</param>
  /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  /// <param name="start">Start position within the encoded byte array.</param>
  /// <param name="token">Cancellation token to cancel the request.</param>
  /// <returns>SyncResponse.</returns>
  /// <exception cref="ArgumentException">Thrown when <paramref name="timeoutMs"/> is less than 1000.</exception>
  public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, string data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  {
      if (timeoutMs < 1000) throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");

      // One code path for empty and non-empty input, so neither captures the caller's synchronization context.
      byte[] payload = String.IsNullOrEmpty(data) ? Array.Empty<byte>() : Encoding.UTF8.GetBytes(data);

      return await SendAndWaitAsync(timeoutMs, payload, metadata, start, token).ConfigureAwait(false);
  }
  /// <summary>
  /// Send a byte array and wait up to the given number of milliseconds for the server's response.
  /// A TimeoutException will be thrown if a response is not received.  A null array is treated as an empty payload.
  /// </summary>
  /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.  Must be at least 1000.</param>
  /// <param name="data">Data to send.</param>
  /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  /// <param name="start">Start position within the supplied array.</param>
  /// <param name="token">Cancellation token to cancel the request.</param>
  /// <returns>SyncResponse.</returns>
  /// <exception cref="ArgumentException">Thrown when <paramref name="timeoutMs"/> is less than 1000.</exception>
  public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  {
      if (timeoutMs < 1000)
      {
          throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
      }

      byte[] payload = data ?? Array.Empty<byte>();
      WatsonCommon.BytesToStream(payload, start, out int length, out Stream source);

      return await SendAndWaitAsync(timeoutMs, length, source, metadata, token).ConfigureAwait(false);
  }
  /// <summary>
  /// Send the contents of a stream and wait up to the given number of milliseconds for the server's response.
  /// A TimeoutException will be thrown if a response is not received.  A null stream is replaced by an empty in-memory stream.
  /// </summary>
  /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.  Must be at least 1000.</param>
  /// <param name="contentLength">The number of bytes to send from the supplied stream.</param>
  /// <param name="stream">Stream containing data.</param>
  /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  /// <param name="token">Cancellation token to cancel the request.</param>
  /// <returns>SyncResponse.</returns>
  /// <exception cref="ArgumentException">Thrown when <paramref name="contentLength"/> is negative or <paramref name="timeoutMs"/> is less than 1000.</exception>
  public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
  {
      if (contentLength < 0)
      {
          throw new ArgumentException("Content length must be zero or greater.");
      }

      if (timeoutMs < 1000)
      {
          throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
      }

      Stream source = stream ?? new MemoryStream(Array.Empty<byte>());

      // The request carries its own expiry time, derived from the timeout.
      DateTime expiresAt = DateTime.UtcNow.AddMilliseconds(timeoutMs);
      WatsonMessage request = _MessageBuilder.ConstructNew(contentLength, source, true, false, expiresAt, metadata);

      return await SendAndWaitInternalAsync(request, timeoutMs, contentLength, source, token).ConfigureAwait(false);
  }
  558. #endregion
  559. #endregion
  560. #region Private-Methods
  /// <summary>
  /// Disconnect the client and dispose of background workers.
  /// Do not reuse the object after disposal.
  /// Calling this more than once is safe: later calls find nothing left to release.
  /// Note that the SSL certificate is disposed here, including one that was passed in through a constructor.
  /// </summary>
  /// <param name="disposing">Indicate if resources should be disposed.  When false, nothing is done.</param>
  protected virtual void Dispose(bool disposing)
  {
      if (!disposing) return;

      // _Settings is null once the object has been disposed; the null-conditional keeps a repeated call from throwing.
      _Settings?.Logger?.Invoke(Severity.Info, _Header + "disposing");

      try
      {
          if (Connected) Disconnect();
      }
      finally
      {
          // Release everything even when the disconnect itself failed.
          _SslCertificate?.Dispose();
          _WriteLock?.Dispose();
          _ReadLock?.Dispose();

          _Settings = null;
          _Events = null;
          _Callbacks = null;
          _Statistics = null;
          _Keepalive = null;
          _SslConfiguration = null;

          _SourceIp = null;
          _ServerIp = null;

          _Client = null;
          _DataStream = null;
          _TcpStream = null;
          _SslStream = null;

          _SslCertificate = null;
          _SslCertificateCollection = null;
          _WriteLock = null;
          _ReadLock = null;

          _DataReceiver = null;
      }
  }
  597. #region Connection
  /// <summary>
  /// Apply the configured TCP keepalive values to the connected socket.
  /// What is applied depends on the compilation target; on targets with no matching branch nothing is set.
  /// If applying the options throws, the failure is logged and keepalives are switched off in the settings.
  /// </summary>
  private void EnableKeepalives()
  {
      // issues with definitions: https://github.com/dotnet/sdk/issues/14540

      try
      {
  #if NET6_0_OR_GREATER

          Socket socket = _Client.Client;

          socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
          socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, _Keepalive.TcpKeepAliveTime);
          socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, _Keepalive.TcpKeepAliveInterval);

          // The retry count is only applied on Windows 10 version 1703 or later.
          bool retryCountSupported =
              RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
              && Environment.OSVersion.Version >= new Version(10, 0, 15063);

          if (retryCountSupported)
          {
              socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, _Keepalive.TcpKeepAliveRetryCount);
          }

  #elif NETFRAMEWORK

          // .NET Framework expects values in milliseconds, packed as three consecutive
          // 32-bit values: the value 1, the keepalive time, and the keepalive interval.
          uint timeMs = (uint)(_Keepalive.TcpKeepAliveTime * 1000);
          uint intervalMs = (uint)(_Keepalive.TcpKeepAliveInterval * 1000);

          byte[] packed = new byte[12];
          Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, packed, 0, 4);
          Buffer.BlockCopy(BitConverter.GetBytes(timeMs), 0, packed, 4, 4);
          Buffer.BlockCopy(BitConverter.GetBytes(intervalMs), 0, packed, 8, 4);

          _Client.Client.IOControl(IOControlCode.KeepAliveValues, packed, null);

  #elif NETSTANDARD

  #endif
      }
      catch (Exception)
      {
          _Settings.Logger?.Invoke(Severity.Error, _Header + "keepalives not supported on this platform, disabled");
          _Keepalive.EnableTcpKeepAlives = false;
      }
  }
  629. #endregion
  630. #region Read
  /// <summary>
  /// Receive loop for the client. Reads one message at a time from the data stream while holding
  /// the read lock, dispatches it by status and type, and keeps going until the connection is
  /// found to be down, the server signals a disconnect, or an exception is caught.
  /// On exit, sets <c>Connected</c> to false and raises the server-disconnected event with the
  /// reason that ended the loop.
  /// </summary>
  /// <param name="token">Checked at the top of every iteration and passed to the awaited read operations.</param>
  private async Task DataReceiver(CancellationToken token)
  {
      DisconnectReason reason = DisconnectReason.Normal;

      while (true)
      {
          // Set only once _ReadLock has actually been acquired in this iteration, so the finally
          // block never releases a semaphore that was not taken (cancellation or a detected
          // disconnect both leave the try block before the lock is acquired).
          bool readLockHeld = false;

          try
          {
              token.ThrowIfCancellationRequested();

              #region Check-for-Connection

              if (_Client == null || !_Client.Connected)
              {
                  _Settings?.Logger?.Invoke(Severity.Debug, _Header + "disconnect detected");
                  break;
              }

              #endregion

              #region Read-Message

              await _ReadLock.WaitAsync(token).ConfigureAwait(false);
              readLockHeld = true;

              WatsonMessage msg = await _MessageBuilder.BuildFromStream(_DataStream, token).ConfigureAwait(false);
              if (msg == null)
              {
                  // Nothing was built; pause briefly before trying again
                  await Task.Delay(30, token).ConfigureAwait(false);
                  continue;
              }

              _LastActivity = DateTime.UtcNow;

              #endregion

              #region Process-by-Status

              if (msg.Status == MessageStatus.Removed)
              {
                  _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to server-side removal");
                  reason = DisconnectReason.Removed;
                  break;
              }
              else if (msg.Status == MessageStatus.Shutdown)
              {
                  _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to server shutdown");
                  reason = DisconnectReason.Shutdown;
                  break;
              }
              else if (msg.Status == MessageStatus.Timeout)
              {
                  _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to timeout");
                  reason = DisconnectReason.Timeout;
                  break;
              }
              else if (msg.Status == MessageStatus.AuthSuccess)
              {
                  _Settings.Logger?.Invoke(Severity.Debug, _Header + "authentication successful");
                  Task unawaited = Task.Run(() => _Events.HandleAuthenticationSucceeded(this, EventArgs.Empty), token);
                  continue;
              }
              else if (msg.Status == MessageStatus.AuthFailure)
              {
                  _Settings.Logger?.Invoke(Severity.Error, _Header + "authentication failed");
                  reason = DisconnectReason.AuthFailure;
                  Task unawaited = Task.Run(() => _Events.HandleAuthenticationFailure(this, EventArgs.Empty), token);
                  break;
              }
              else if (msg.Status == MessageStatus.AuthRequired)
              {
                  _Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key");
                  string psk = _Callbacks.HandleAuthenticationRequested();
                  if (!String.IsNullOrEmpty(psk)) await AuthenticateAsync(psk, token).ConfigureAwait(false);
                  continue;
              }

              #endregion

              #region Process-Message

              if (msg.SyncRequest)
              {
                  _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request received: " + msg.ConversationGuid.ToString());

                  DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
                  byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);

                  if (DateTime.UtcNow < expiration)
                  {
                      // The callback and the response send run in the background so the
                      // receive loop can continue with the next message.
                      Task unawaited = Task.Run(async () =>
                      {
                          SyncRequest syncReq = new SyncRequest(
                              null,
                              msg.ConversationGuid,
                              msg.ExpirationUtc.Value,
                              msg.Metadata,
                              msgData);

                          SyncResponse syncResp = null;

  #pragma warning disable CS0618 // Type or member is obsolete
                          if (_Callbacks.SyncRequestReceivedAsync != null)
                          {
                              syncResp = await _Callbacks.HandleSyncRequestReceivedAsync(syncReq);
                          }
                          else if (_Callbacks.SyncRequestReceived != null)
                          {
                              syncResp = _Callbacks.HandleSyncRequestReceived(syncReq);
                          }
  #pragma warning restore CS0618 // Type or member is obsolete

                          if (syncResp != null)
                          {
                              WatsonCommon.BytesToStream(syncResp.Data, 0, out int contentLength, out Stream stream);
                              WatsonMessage respMsg = _MessageBuilder.ConstructNew(
                                  contentLength,
                                  stream,
                                  false,
                                  true,
                                  msg.ExpirationUtc.Value,
                                  syncResp.Metadata);

                              // The response carries the request's conversation GUID
                              respMsg.ConversationGuid = msg.ConversationGuid;
                              await SendInternalAsync(respMsg, contentLength, stream, token).ConfigureAwait(false);
                          }
                      }, _Token);
                  }
                  else
                  {
                      _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded");
                  }
              }
              else if (msg.SyncResponse)
              {
                  // No need to amend message expiration; it is copied from the request, which was set by this node
                  _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous response received: " + msg.ConversationGuid.ToString());
                  byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);

                  if (DateTime.UtcNow < msg.ExpirationUtc.Value)
                  {
                      lock (_SyncResponseLock)
                      {
                          _SyncResponseReceived?.Invoke(this, new SyncResponseReceivedEventArgs(msg, msgData));
                      }
                  }
                  else
                  {
                      _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous response received and discarded");
                  }
              }
              else
              {
                  byte[] msgData = null;

                  if (_Events.IsUsingMessages)
                  {
                      msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
                      MessageReceivedEventArgs args = new MessageReceivedEventArgs(null, msg.Metadata, msgData);
                      await Task.Run(() => _Events.HandleMessageReceived(this, args)).ConfigureAwait(false);
                  }
                  else if (_Events.IsUsingStreams)
                  {
                      StreamReceivedEventArgs sr = null;
                      WatsonStream ws = null;

                      if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
                      {
                          // Large payload: the handler is given msg.DataStream itself and is invoked
                          // inline (presumably so it finishes consuming the stream before the next read).
                          ws = new WatsonStream(msg.ContentLength, msg.DataStream);
                          sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
                          _Events.HandleStreamReceived(this, sr);
                      }
                      else
                      {
                          // Small payload: copy into memory first, then hand off in the background
                          MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false);
                          ws = new WatsonStream(msg.ContentLength, ms);
                          sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
                          Task unawaited = Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
                      }
                  }
                  else
                  {
                      _Settings.Logger?.Invoke(Severity.Error, _Header + "event handler not set for either MessageReceived or StreamReceived");
                      break;
                  }
              }

              #endregion

              _Statistics.IncrementReceivedMessages();
              _Statistics.AddReceivedBytes(msg.ContentLength);
          }
          catch (ObjectDisposedException ode)
          {
              _Settings?.Logger?.Invoke(Severity.Debug, _Header + "object disposed exception encountered");
              _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ode));
              break;
          }
          catch (TaskCanceledException tce)
          {
              _Settings?.Logger?.Invoke(Severity.Debug, _Header + "task canceled exception encountered");
              _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(tce));
              break;
          }
          catch (OperationCanceledException oce)
          {
              _Settings?.Logger?.Invoke(Severity.Debug, _Header + "operation canceled exception encountered");
              _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(oce));
              break;
          }
          catch (IOException ioe)
          {
              _Settings?.Logger?.Invoke(Severity.Debug, _Header + "IO exception encountered");
              _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ioe));
              break;
          }
          catch (Exception e)
          {
              _Settings?.Logger?.Invoke(Severity.Error,
                  _Header + "data receiver exception for " + _ServerIp + ":" + _ServerPort + ": " + e.Message + Environment.NewLine);
              _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
              break;
          }
          finally
          {
              // Runs on every exit from the try block (including break and continue);
              // release only what this iteration acquired.
              if (readLockHeld) _ReadLock?.Release();
          }
      }

      Connected = false;

      // A timeout flagged elsewhere overrides whatever reason ended the loop
      if (_IsTimeout) reason = DisconnectReason.Timeout;

      _Settings?.Logger?.Invoke(Severity.Debug, _Header + "data receiver terminated for " + _ServerIp + ":" + _ServerPort);
      _Events?.HandleServerDisconnected(this, new DisconnectionEventArgs(null, reason));
  }
  839. #endregion
  840. #region Send
  /// <summary>
  /// Sends a message to the server: headers first, then <paramref name="contentLength"/> bytes
  /// copied from <paramref name="stream"/>, all while holding the write lock.
  /// </summary>
  /// <param name="msg">Message whose headers are written.</param>
  /// <param name="contentLength">Number of payload bytes to send.</param>
  /// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is greater than zero.</param>
  /// <param name="token">Cancellation token; when the default token is supplied, the client's own token is used instead.</param>
  /// <returns>
  /// True when the message was written; false when the client is not connected, the operation
  /// was canceled during the write, or the write failed.
  /// </returns>
  /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
  /// <exception cref="ArgumentException">A payload is expected but <paramref name="stream"/> is null or unreadable.</exception>
  private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength, Stream stream, CancellationToken token)
  {
      if (msg == null) throw new ArgumentNullException(nameof(msg));
      if (!Connected) return false;

      if (contentLength > 0 && (stream == null || !stream.CanRead))
      {
          throw new ArgumentException("Cannot read from supplied stream.");
      }

      // The default token can never be canceled, so linking it with _Token is equivalent to
      // using _Token directly. Doing so avoids creating a linked CancellationTokenSource that
      // would never be disposed.
      if (token == default(CancellationToken))
      {
          token = _Token;
      }

      if (_Client == null || !_Client.Connected)
      {
          return false;
      }

      bool disconnectDetected = false;

      await _WriteLock.WaitAsync(token).ConfigureAwait(false);

      try
      {
          await SendHeadersAsync(msg, token).ConfigureAwait(false);
          await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false);
          _Statistics.IncrementSentMessages();
          _Statistics.AddSentBytes(contentLength);
          return true;
      }
      catch (OperationCanceledException)
      {
          // Also covers TaskCanceledException, which derives from OperationCanceledException
          return false;
      }
      catch (Exception e)
      {
          _Settings.Logger?.Invoke(Severity.Error,
              _Header + "failed to write message to " + _ServerIp + ":" + _ServerPort + ":" +
              Environment.NewLine +
              e.ToString() +
              Environment.NewLine);

          // Any other write failure is treated as a lost connection
          disconnectDetected = true;
          return false;
      }
      finally
      {
          _WriteLock.Release();

          if (disconnectDetected)
          {
              Connected = false;
              Dispose();
          }
      }
  }
  /// <summary>
  /// Sends a synchronous request to the server and waits up to <paramref name="timeoutMs"/>
  /// milliseconds for a response carrying the same conversation GUID.
  /// </summary>
  /// <param name="msg">Request message; its conversation GUID is used to match the response.</param>
  /// <param name="timeoutMs">Maximum time, in milliseconds, to wait for the response after the request is written.</param>
  /// <param name="contentLength">Number of payload bytes to send.</param>
  /// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is greater than zero.</param>
  /// <param name="token">Cancellation token applied to acquiring the write lock and to the write itself.</param>
  /// <returns>The matching response, or null when the write was canceled.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
  /// <exception cref="InvalidOperationException">The client is not connected.</exception>
  /// <exception cref="ArgumentException">A payload is expected but <paramref name="stream"/> is null or unreadable.</exception>
  /// <exception cref="TimeoutException">No matching response arrived within the timeout window.</exception>
  private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token)
  {
      if (msg == null) throw new ArgumentNullException(nameof(msg));
      if (!Connected) throw new InvalidOperationException("Client is not connected to the server.");
      if (contentLength > 0 && (stream == null || !stream.CanRead))
          throw new ArgumentException("Cannot read from supplied stream.");

      if (_Client == null || !_Client.Connected)
      {
          throw new InvalidOperationException("Client is not connected to the server.");
      }

      await _WriteLock.WaitAsync(token).ConfigureAwait(false);

      // Completed by the handler below when the matching response arrives. Continuations run
      // asynchronously so they never execute inside the thread that raises the event.
      TaskCompletionSource<SyncResponse> responded =
          new TaskCompletionSource<SyncResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

      // Create a new handler specially for this conversation
      EventHandler<SyncResponseReceivedEventArgs> handler = (sender, e) =>
      {
          if (e.Message.ConversationGuid == msg.ConversationGuid)
          {
              responded.TrySetResult(
                  new SyncResponse(msg.ConversationGuid, e.Message.ExpirationUtc.Value, e.Message.Metadata, e.Data));
          }
      };

      // Subscribe before sending so a fast response cannot be missed
      _SyncResponseReceived += handler;

      try
      {
          bool disconnectDetected = false;

          try
          {
              await SendHeadersAsync(msg, token).ConfigureAwait(false);
              await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false);
              _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request sent: " + msg.ConversationGuid);
              _Statistics.IncrementSentMessages();
              _Statistics.AddSentBytes(contentLength);
          }
          catch (OperationCanceledException)
          {
              // Also covers TaskCanceledException, which derives from OperationCanceledException
              return null;
          }
          catch (Exception e)
          {
              _Settings.Logger?.Invoke(Severity.Error, _Header + "failed to write message to " + _ServerIp + ":" + _ServerPort + ": " + e.Message);

              // Any other write failure is treated as a lost connection
              disconnectDetected = true;
              throw;
          }
          finally
          {
              _WriteLock.Release();

              if (disconnectDetected)
              {
                  Connected = false;
                  Dispose();
              }
          }

          // Wait for the handler to complete the task, without blocking a thread.
          // The delay is canceled afterwards so its timer does not outlive the wait.
          using (CancellationTokenSource timeoutCts = new CancellationTokenSource())
          {
              Task timeout = Task.Delay(timeoutMs, timeoutCts.Token);
              await Task.WhenAny(responded.Task, timeout).ConfigureAwait(false);
              timeoutCts.Cancel();
          }

          // Check the response task itself, so a response that arrived right at the deadline still counts
          if (responded.Task.Status == TaskStatus.RanToCompletion)
          {
              return await responded.Task.ConfigureAwait(false);
          }

          _Settings.Logger?.Invoke(Severity.Error, _Header + "synchronous response not received within the timeout window");
          throw new TimeoutException("A response to a synchronous request was not received within the timeout window.");
      }
      finally
      {
          // Unsubscribe on every exit path: success, timeout, cancellation, and failure
          _SyncResponseReceived -= handler;
      }
  }
  /// <summary>
  /// Stamps the message with this client's GUID, obtains its header bytes from the
  /// message builder, and writes them to the data stream followed by a flush.
  /// </summary>
  /// <param name="msg">Message whose headers are written; its SenderGuid is overwritten.</param>
  /// <param name="token">Cancellation token passed to the write and the flush.</param>
  private async Task SendHeadersAsync(WatsonMessage msg, CancellationToken token)
  {
      msg.SenderGuid = _Settings.Guid;

      byte[] header = _MessageBuilder.GetHeaderBytes(msg);
      int headerLength = header.Length;

      await _DataStream.WriteAsync(header, 0, headerLength, token).ConfigureAwait(false);
      await _DataStream.FlushAsync(token).ConfigureAwait(false);
  }
  /// <summary>
  /// Copies exactly <paramref name="contentLength"/> bytes from <paramref name="stream"/> to the
  /// data stream in chunks of at most <c>_Settings.StreamBufferSize</c> bytes, then flushes.
  /// Does nothing when <paramref name="contentLength"/> is zero or negative.
  /// </summary>
  /// <param name="contentLength">Number of bytes to copy.</param>
  /// <param name="stream">Source of the payload bytes.</param>
  /// <param name="token">Cancellation token passed to every read, write, and the final flush.</param>
  /// <exception cref="EndOfStreamException">
  /// <paramref name="stream"/> ended before <paramref name="contentLength"/> bytes could be read.
  /// </exception>
  private async Task SendDataStreamAsync(long contentLength, Stream stream, CancellationToken token)
  {
      if (contentLength <= 0) return;

      long bytesRemaining = contentLength;

      // One buffer for the whole transfer, no larger than the payload itself
      _SendBuffer = new byte[Math.Min(bytesRemaining, _Settings.StreamBufferSize)];
      byte[] buffer = _SendBuffer;

      while (bytesRemaining > 0)
      {
          // Never read past the declared content length
          int bytesToRead = (int)Math.Min(bytesRemaining, buffer.Length);
          int bytesRead = await stream.ReadAsync(buffer, 0, bytesToRead, token).ConfigureAwait(false);

          if (bytesRead <= 0)
          {
              // A zero-byte read means the source is exhausted; without this check the loop
              // would spin forever waiting for bytes that will never arrive.
              throw new EndOfStreamException(
                  "Supplied stream ended with " + bytesRemaining + " of " + contentLength + " bytes remaining to be sent.");
          }

          await _DataStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
          bytesRemaining -= bytesRead;
      }

      await _DataStream.FlushAsync(token).ConfigureAwait(false);
  }
  999. #endregion
  1000. #region Tasks
  /// <summary>
  /// Background loop that wakes every <c>_Settings.IdleServerEvaluationIntervalMs</c> milliseconds
  /// and disconnects when the time since <c>_LastActivity</c> exceeds
  /// <c>_Settings.IdleServerTimeoutMs</c>. A timeout setting of zero disables the check.
  /// Runs until cancellation is requested on <paramref name="token"/>.
  /// </summary>
  /// <param name="token">Token that ends the loop and interrupts the delay.</param>
  private async Task IdleServerMonitor(CancellationToken token)
  {
      while (!token.IsCancellationRequested)
      {
          try
          {
              await Task.Delay(_Settings.IdleServerEvaluationIntervalMs, token).ConfigureAwait(false);

              if (_Settings.IdleServerTimeoutMs != 0)
              {
                  DateTime deadline = _LastActivity.AddMilliseconds(_Settings.IdleServerTimeoutMs);
                  bool idleTooLong = DateTime.UtcNow > deadline;

                  if (idleTooLong)
                  {
                      _Settings.Logger?.Invoke(Severity.Warn, _Header + "disconnecting from " + _ServerIp + ":" + _ServerPort + " due to timeout");

                      // Flag the timeout before disconnecting
                      _IsTimeout = true;
                      Disconnect();
                  }
              }
          }
          catch (OperationCanceledException)
          {
              // Cancellation (including TaskCanceledException) is ignored here;
              // the loop condition decides whether to continue.
          }
          catch (Exception e)
          {
              _Settings.Logger?.Invoke(Severity.Warn, _Header + "exception encountered while monitoring for idle server connection: " + e.Message);
              _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
          }
      }
  }
  1030. #endregion
  1031. #endregion
  1032. }
  1033. }