WatsonTcpServer.cs 61 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441
  1. namespace WatsonTcp
  2. {
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.IO;
  7. using System.Linq;
  8. using System.Net;
  9. using System.Net.NetworkInformation;
  10. using System.Net.Security;
  11. using System.Net.Sockets;
  12. using System.Runtime.InteropServices;
  13. using System.Security.AccessControl;
  14. using System.Security.Cryptography.X509Certificates;
  15. using System.Text;
  16. using System.Threading;
  17. using System.Threading.Tasks;
  18. /// <summary>
  19. /// Watson TCP server, with or without SSL.
  20. /// </summary>
  21. public class WatsonTcpServer : IDisposable
  22. {
  23. #region Public-Members
  24. /// <summary>
  25. /// Watson TCP server settings.
  26. /// </summary>
  27. public WatsonTcpServerSettings Settings
  28. {
  29. get
  30. {
  31. return _Settings;
  32. }
  33. set
  34. {
  35. if (value == null) _Settings = new WatsonTcpServerSettings();
  36. else _Settings = value;
  37. }
  38. }
  39. /// <summary>
  40. /// Watson TCP server events.
  41. /// </summary>
  42. public WatsonTcpServerEvents Events
  43. {
  44. get
  45. {
  46. return _Events;
  47. }
  48. set
  49. {
  50. if (value == null) _Events = new WatsonTcpServerEvents();
  51. else _Events = value;
  52. }
  53. }
  54. /// <summary>
  55. /// Watson TCP server callbacks.
  56. /// </summary>
  57. public WatsonTcpServerCallbacks Callbacks
  58. {
  59. get
  60. {
  61. return _Callbacks;
  62. }
  63. set
  64. {
  65. if (value == null) _Callbacks = new WatsonTcpServerCallbacks();
  66. else _Callbacks = value;
  67. }
  68. }
  69. /// <summary>
  70. /// Watson TCP statistics.
  71. /// </summary>
  72. public WatsonTcpStatistics Statistics
  73. {
  74. get
  75. {
  76. return _Statistics;
  77. }
  78. }
  79. /// <summary>
  80. /// Watson TCP keepalive settings.
  81. /// </summary>
  82. public WatsonTcpKeepaliveSettings Keepalive
  83. {
  84. get
  85. {
  86. return _Keepalive;
  87. }
  88. set
  89. {
  90. if (value == null) _Keepalive = new WatsonTcpKeepaliveSettings();
  91. else _Keepalive = value;
  92. }
  93. }
  94. /// <summary>
  95. /// Watson TCP server SSL configuration.
  96. /// </summary>
  97. public WatsonTcpServerSslConfiguration SslConfiguration
  98. {
  99. get
  100. {
  101. return _SslConfiguration;
  102. }
  103. set
  104. {
  105. if (value == null) _SslConfiguration = new WatsonTcpServerSslConfiguration();
  106. else _SslConfiguration = value;
  107. }
  108. }
  109. /// <summary>
  110. /// JSON serialization helper.
  111. /// </summary>
  112. public ISerializationHelper SerializationHelper
  113. {
  114. get
  115. {
  116. return _SerializationHelper;
  117. }
  118. set
  119. {
  120. if (value == null) throw new ArgumentNullException(nameof(SerializationHelper));
  121. _SerializationHelper = value;
  122. _MessageBuilder.SerializationHelper = value;
  123. }
  124. }
  125. /// <summary>
  126. /// Retrieve the number of current connected clients.
  127. /// </summary>
  128. public int Connections
  129. {
  130. get
  131. {
  132. return _Connections;
  133. }
  134. }
  135. /// <summary>
  136. /// Flag to indicate if Watson TCP is listening for incoming TCP connections.
  137. /// </summary>
  138. public bool IsListening
  139. {
  140. get
  141. {
  142. return _IsListening;
  143. }
  144. }
  145. #endregion
  146. #region Private-Members
  147. private string _Header = "[WatsonTcpServer] ";
  148. private WatsonMessageBuilder _MessageBuilder = new WatsonMessageBuilder();
  149. private WatsonTcpServerSettings _Settings = new WatsonTcpServerSettings();
  150. private WatsonTcpServerEvents _Events = new WatsonTcpServerEvents();
  151. private WatsonTcpServerCallbacks _Callbacks = new WatsonTcpServerCallbacks();
  152. private WatsonTcpStatistics _Statistics = new WatsonTcpStatistics();
  153. private WatsonTcpKeepaliveSettings _Keepalive = new WatsonTcpKeepaliveSettings();
  154. private WatsonTcpServerSslConfiguration _SslConfiguration = new WatsonTcpServerSslConfiguration();
  155. private ClientMetadataManager _ClientManager = new ClientMetadataManager();
  156. private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();
  157. private int _Connections = 0;
  158. private bool _IsListening = false;
  159. private Mode _Mode;
  160. private TlsVersion _TlsVersion = TlsVersion.Tls12;
  161. private string _ListenerIp;
  162. private int _ListenerPort;
  163. private IPAddress _ListenerIpAddress;
  164. private TcpListener _Listener;
  165. private X509Certificate2 _SslCertificate;
  166. private CancellationTokenSource _TokenSource = new CancellationTokenSource();
  167. private CancellationToken _Token;
  168. private Task _AcceptConnections = null;
  169. private Task _MonitorClients = null;
  170. private readonly object _SyncResponseLock = new object();
  171. private event EventHandler<SyncResponseReceivedEventArgs> _SyncResponseReceived;
  172. #endregion
  173. #region Constructors-and-Factories
  174. /// <summary>
  175. /// Initialize the Watson TCP server without SSL.
  176. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
  177. /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
  178. /// Call Start() afterward to start the server.
  179. /// </summary>
  180. /// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
  181. /// <param name="listenerPort">The TCP port on which the server should listen.</param>
  182. public WatsonTcpServer(
  183. string listenerIp,
  184. int listenerPort)
  185. {
  186. if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
  187. _Mode = Mode.Tcp;
  188. // According to the https://github.com/dotnet/WatsonTcp?tab=readme-ov-file#local-vs-external-connections
  189. if (string.IsNullOrEmpty(listenerIp) || listenerIp.Equals("*") || listenerIp.Equals("+") || listenerIp.Equals("0.0.0.0"))
  190. {
  191. _ListenerIpAddress = IPAddress.Any;
  192. _ListenerIp = _ListenerIpAddress.ToString();
  193. }
  194. else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
  195. {
  196. _ListenerIpAddress = IPAddress.Loopback;
  197. _ListenerIp = _ListenerIpAddress.ToString();
  198. }
  199. else
  200. {
  201. _ListenerIpAddress = IPAddress.Parse(listenerIp);
  202. _ListenerIp = listenerIp;
  203. }
  204. _ListenerPort = listenerPort;
  205. SerializationHelper.InstantiateConverter(); // Unity fix
  206. }
  207. /// <summary>
  208. /// Initialize the Watson TCP server with SSL.
  209. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
  210. /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
  211. /// Call Start() afterward to start the server.
  212. /// </summary>
  213. /// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
  214. /// <param name="listenerPort">The TCP port on which the server should listen.</param>
  215. /// <param name="pfxCertFile">The file containing the SSL certificate.</param>
  216. /// <param name="pfxCertPass">The password for the SSL certificate.</param>
  217. /// <param name="tlsVersion">The TLS version required for client connections.</param>
  218. public WatsonTcpServer(
  219. string listenerIp,
  220. int listenerPort,
  221. string pfxCertFile,
  222. string pfxCertPass,
  223. TlsVersion tlsVersion = TlsVersion.Tls12)
  224. {
  225. if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
  226. if (String.IsNullOrEmpty(pfxCertFile)) throw new ArgumentNullException(nameof(pfxCertFile));
  227. _Mode = Mode.Ssl;
  228. _TlsVersion = tlsVersion;
  229. if (String.IsNullOrEmpty(listenerIp))
  230. {
  231. _ListenerIpAddress = IPAddress.Any;
  232. _ListenerIp = _ListenerIpAddress.ToString();
  233. }
  234. else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
  235. {
  236. _ListenerIpAddress = IPAddress.Loopback;
  237. _ListenerIp = _ListenerIpAddress.ToString();
  238. }
  239. else
  240. {
  241. _ListenerIpAddress = IPAddress.Parse(listenerIp);
  242. _ListenerIp = listenerIp;
  243. }
  244. _SslCertificate = null;
  245. if (String.IsNullOrEmpty(pfxCertPass))
  246. {
  247. _SslCertificate = new X509Certificate2(pfxCertFile);
  248. }
  249. else
  250. {
  251. _SslCertificate = new X509Certificate2(pfxCertFile, pfxCertPass);
  252. }
  253. _ListenerPort = listenerPort;
  254. SerializationHelper.InstantiateConverter(); // Unity fix
  255. }
  256. /// <summary>
  257. /// Initialize the Watson TCP server with SSL.
  258. /// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
  259. /// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
  260. /// Call Start() afterward to start the server.
  261. /// </summary>
  262. /// <param name="listenerIp">The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).</param>
  263. /// <param name="listenerPort">The TCP port on which the server should listen.</param>
  264. /// <param name="cert">The SSL certificate.</param>
  265. /// <param name="tlsVersion">The TLS version required for client connections.</param>
  266. /// <exception cref="ArgumentOutOfRangeException"></exception>
  267. public WatsonTcpServer(
  268. string listenerIp,
  269. int listenerPort,
  270. X509Certificate2 cert,
  271. TlsVersion tlsVersion = TlsVersion.Tls12)
  272. {
  273. if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
  274. if (cert == null) throw new ArgumentNullException(nameof(cert));
  275. _Mode = Mode.Ssl;
  276. _TlsVersion = tlsVersion;
  277. _SslCertificate = cert;
  278. if (String.IsNullOrEmpty(listenerIp))
  279. {
  280. _ListenerIpAddress = IPAddress.Any;
  281. _ListenerIp = _ListenerIpAddress.ToString();
  282. }
  283. else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
  284. {
  285. _ListenerIpAddress = IPAddress.Loopback;
  286. _ListenerIp = _ListenerIpAddress.ToString();
  287. }
  288. else
  289. {
  290. _ListenerIpAddress = IPAddress.Parse(listenerIp);
  291. _ListenerIp = listenerIp;
  292. }
  293. _ListenerPort = listenerPort;
  294. SerializationHelper.InstantiateConverter(); // Unity fix
  295. }
  296. #endregion
  297. #region Public-Methods
  298. /// <summary>
  299. /// Tear down the server and dispose of background workers.
  300. /// Do not reuse the object after disposal.
  301. /// </summary>
  302. public void Dispose()
  303. {
  304. Dispose(true);
  305. GC.SuppressFinalize(this);
  306. }
  307. /// <summary>
  308. /// Start accepting connections.
  309. /// </summary>
  310. public void Start()
  311. {
  312. if (_IsListening) throw new InvalidOperationException("WatsonTcpServer is already running.");
  313. _ClientManager = new ClientMetadataManager();
  314. _TokenSource = new CancellationTokenSource();
  315. _Token = _TokenSource.Token;
  316. _Statistics = new WatsonTcpStatistics();
  317. _Listener = new TcpListener(_ListenerIpAddress, _ListenerPort);
  318. if (!_Events.IsUsingMessages && !_Events.IsUsingStreams)
  319. throw new InvalidOperationException("One of either 'MessageReceived' or 'StreamReceived' events must first be set.");
  320. if (_Mode == Mode.Tcp)
  321. {
  322. _Settings.Logger?.Invoke(Severity.Info, _Header + "starting on " + _ListenerIp + ":" + _ListenerPort);
  323. }
  324. else if (_Mode == Mode.Ssl)
  325. {
  326. _Settings.Logger?.Invoke(Severity.Info, _Header + "starting with SSL on " + _ListenerIp + ":" + _ListenerPort);
  327. }
  328. else
  329. {
  330. throw new ArgumentException("Unknown mode: " + _Mode.ToString());
  331. }
  332. _Listener.Start();
  333. _AcceptConnections = Task.Run(() => AcceptConnections(_Token), _Token); // sets _IsListening
  334. _MonitorClients = Task.Run(() => MonitorForIdleClients(_Token), _Token);
  335. _Events.HandleServerStarted(this, EventArgs.Empty);
  336. }
  337. /// <summary>
  338. /// Stop accepting connections.
  339. /// </summary>
  340. public void Stop()
  341. {
  342. _IsListening = false;
  343. _Listener.Stop();
  344. _TokenSource.Cancel();
  345. _Settings.Logger?.Invoke(Severity.Info, _Header + "stopped");
  346. _Events.HandleServerStopped(this, EventArgs.Empty);
  347. }
  348. #region SendAsync
  349. /// <summary>
  350. /// Send data and metadata to the specified client, asynchronously.
  351. /// </summary>
  352. /// <param name="guid">Globally-unique identifier of the client.</param>
  353. /// <param name="data">String containing data.</param>
  354. /// <param name="metadata">Dictionary containing metadata.</param>
  355. /// <param name="start">Start position within the supplied array.</param>
  356. /// <param name="token">Cancellation token to cancel the request.</param>
  357. /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
  358. public async Task<bool> SendAsync(Guid guid, string data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  359. {
  360. byte[] bytes = Array.Empty<byte>();
  361. if (!String.IsNullOrEmpty(data)) bytes = Encoding.UTF8.GetBytes(data);
  362. return await SendAsync(guid, bytes, metadata, start, token).ConfigureAwait(false);
  363. }
  364. /// <summary>
  365. /// Send data and metadata to the specified client, asynchronously.
  366. /// </summary>
  367. /// <param name="guid">Globally-unique identifier of the client.</param>
  368. /// <param name="data">Byte array containing data.</param>
  369. /// <param name="metadata">Dictionary containing metadata.</param>
  370. /// <param name="start">Start position within the supplied array.</param>
  371. /// <param name="token">Cancellation token to cancel the request.</param>
  372. /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
  373. public async Task<bool> SendAsync(Guid guid, byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  374. {
  375. if (data == null) data = Array.Empty<byte>();
  376. WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream);
  377. return await SendAsync(guid, contentLength, stream, metadata, token).ConfigureAwait(false);
  378. }
  379. /// <summary>
  380. /// Send data and metadata to the specified client using a stream, asynchronously.
  381. /// </summary>
  382. /// <param name="guid">Globally-unique identifier of the client.</param>
  383. /// <param name="contentLength">The number of bytes in the stream.</param>
  384. /// <param name="stream">The stream containing the data.</param>
  385. /// <param name="metadata">Dictionary containing metadata.</param>
  386. /// <param name="token">Cancellation token to cancel the request.</param>
  387. /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
  388. public async Task<bool> SendAsync(Guid guid, long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
  389. {
  390. if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");
  391. if (token == default(CancellationToken)) token = _Token;
  392. ClientMetadata client = _ClientManager.GetClient(guid);
  393. if (client == null)
  394. {
  395. _Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
  396. throw new KeyNotFoundException("Unable to find client " + guid.ToString() + ".");
  397. }
  398. if (stream == null) stream = new MemoryStream(Array.Empty<byte>());
  399. WatsonMessage msg = _MessageBuilder.ConstructNew(contentLength, stream, false, false, null, metadata);
  400. return await SendInternalAsync(client, msg, contentLength, stream, token).ConfigureAwait(false);
  401. }
  402. #endregion
  403. #region SendAndWaitAsync
  404. /// <summary>
  405. /// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
  406. /// </summary>
  407. /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
  408. /// <param name="guid">Globally-unique identifier of the client.</param>
  409. /// <param name="data">Data to send.</param>
  410. /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  411. /// <param name="start">Start position within the supplied array.</param>
  412. /// <param name="token">Cancellation token to cancel the request.</param>
  413. /// <returns>SyncResponse.</returns>
  414. public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, Guid guid, string data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  415. {
  416. byte[] bytes = Array.Empty<byte>();
  417. if (!String.IsNullOrEmpty(data)) bytes = Encoding.UTF8.GetBytes(data);
  418. return await SendAndWaitAsync(timeoutMs, guid, bytes, metadata, start, token);
  419. // SendAndWaitAsync(timeoutMs, guid, bytes, metadata, token).ConfigureAwait(false);
  420. }
  421. /// <summary>
  422. /// Send data and wait for a response for the specified number of milliseconds.
  423. /// </summary>
  424. /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
  425. /// <param name="guid">Globally-unique identifier of the client.</param>
  426. /// <param name="data">Data to send.</param>
  427. /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  428. /// <param name="start">Start position within the supplied array.</param>
  429. /// <param name="token">Cancellation token to cancel the request.</param>
  430. /// <returns>SyncResponse.</returns>
  431. public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, Guid guid, byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
  432. {
  433. if (data == null) data = Array.Empty<byte>();
  434. WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream);
  435. return await SendAndWaitAsync(timeoutMs, guid, contentLength, stream, metadata, token);
  436. }
  437. /// <summary>
  438. /// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
  439. /// </summary>
  440. /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired.</param>
  441. /// <param name="guid">Globally-unique identifier of the client.</param>
  442. /// <param name="contentLength">The number of bytes to send from the supplied stream.</param>
  443. /// <param name="stream">Stream containing data.</param>
  444. /// <param name="metadata">Metadata dictionary to attach to the message.</param>
  445. /// <param name="token">Cancellation token to cancel the request.</param>
  446. /// <returns>SyncResponse.</returns>
  447. public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, Guid guid, long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
  448. {
  449. if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");
  450. if (timeoutMs < 1000) throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
  451. ClientMetadata client = _ClientManager.GetClient(guid);
  452. if (client == null)
  453. {
  454. _Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
  455. throw new KeyNotFoundException("Unable to find client " + guid.ToString() + ".");
  456. }
  457. if (stream == null) stream = new MemoryStream(Array.Empty<byte>());
  458. DateTime expiration = DateTime.UtcNow.AddMilliseconds(timeoutMs);
  459. WatsonMessage msg = _MessageBuilder.ConstructNew(contentLength, stream, true, false, expiration, metadata);
  460. return await SendAndWaitInternalAsync(client, msg, timeoutMs, contentLength, stream, token);
  461. }
  462. #endregion
  463. /// <summary>
  464. /// Determine whether or not the specified client is connected to the server.
  465. /// </summary>
  466. /// <param name="guid">Globally-unique identifier of the client.</param>
  467. /// <returns>Boolean indicating if the client is connected to the server.</returns>
  468. public bool IsClientConnected(Guid guid)
  469. {
  470. return _ClientManager.ExistsClient(guid);
  471. }
  472. /// <summary>
  473. /// Retrieve the client metadata associated with each connected client.
  474. /// </summary>
  475. /// <returns>An enumerable collection of client metadata.</returns>
  476. public IEnumerable<ClientMetadata> ListClients()
  477. {
  478. Dictionary<Guid, ClientMetadata> clients = _ClientManager.AllClients();
  479. if (clients != null && clients.Count > 0)
  480. {
  481. foreach (KeyValuePair<Guid, ClientMetadata> client in clients)
  482. {
  483. yield return client.Value;
  484. }
  485. }
  486. }
  487. /// <summary>
  488. /// Disconnects the specified client.
  489. /// </summary>
  490. /// <param name="guid">Globally-unique identifier of the client.</param>
  491. /// <param name="status">Reason for the disconnect. This is conveyed to the client.</param>
  492. /// <param name="sendNotice">Flag to indicate whether the client should be notified of the disconnect. This message will not be sent until other send requests have been handled.</param>
  493. /// <param name="token">Cancellation token to cancel the request.</param>
  494. public async Task DisconnectClientAsync(Guid guid, MessageStatus status = MessageStatus.Removed, bool sendNotice = true, CancellationToken token = default)
  495. {
  496. ClientMetadata client = _ClientManager.GetClient(guid);
  497. if (client == null)
  498. {
  499. _Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
  500. }
  501. else
  502. {
  503. if (!_ClientManager.ExistsClientTimedout(guid)) _ClientManager.AddClientKicked(guid);
  504. if (sendNotice)
  505. {
  506. WatsonMessage removeMsg = new WatsonMessage();
  507. removeMsg.Status = status;
  508. await SendInternalAsync(client, removeMsg, 0, null, token).ConfigureAwait(false);
  509. }
  510. client.Dispose();
  511. _ClientManager.RemoveClient(guid);
  512. }
  513. }
  514. /// <summary>
  515. /// Disconnects all connected clients.
  516. /// </summary>
  517. /// <param name="status">Reason for the disconnect. This is conveyed to each client.</param>
  518. /// <param name="sendNotice">Flag to indicate whether the client should be notified of the disconnect. This message will not be sent until other send requests have been handled.</param>
  519. /// <param name="token">Cancellation token to cancel the request.</param>
  520. public async Task DisconnectClientsAsync(MessageStatus status = MessageStatus.Removed, bool sendNotice = true, CancellationToken token = default)
  521. {
  522. Dictionary<Guid, ClientMetadata> clients = _ClientManager.AllClients();
  523. if (clients != null && clients.Count > 0)
  524. {
  525. foreach (KeyValuePair<Guid, ClientMetadata> client in clients)
  526. {
  527. await DisconnectClientAsync(client.Key, status, sendNotice, token).ConfigureAwait(false);
  528. }
  529. }
  530. }
  531. #endregion
  532. #region Private-Methods
  533. /// <summary>
  534. /// Tear down the server and dispose of background workers.
  535. /// Do not reuse the object after disposal.
  536. /// </summary>
  537. /// <param name="disposing">Indicate if resources should be disposed.</param>
  538. protected virtual void Dispose(bool disposing)
  539. {
  540. if (disposing)
  541. {
  542. _Settings.Logger?.Invoke(Severity.Info, _Header + "disposing");
  543. if (_IsListening) Stop();
  544. DisconnectClientsAsync(MessageStatus.Shutdown).Wait();
  545. if (_Listener != null)
  546. {
  547. if (_Listener.Server != null)
  548. {
  549. _Listener.Server.Close();
  550. _Listener.Server.Dispose();
  551. }
  552. }
  553. if (_SslCertificate != null)
  554. {
  555. _SslCertificate.Dispose();
  556. }
  557. if (_ClientManager != null)
  558. {
  559. _ClientManager.Dispose();
  560. }
  561. _Settings = null;
  562. _Events = null;
  563. _Callbacks = null;
  564. _Statistics = null;
  565. _Keepalive = null;
  566. _SslConfiguration = null;
  567. _ListenerIp = null;
  568. _ListenerIpAddress = null;
  569. _Listener = null;
  570. _SslCertificate = null;
  571. _TokenSource = null;
  572. _AcceptConnections = null;
  573. _MonitorClients = null;
  574. _IsListening = false;
  575. }
  576. }
  577. #region Connection
  578. private void EnableKeepalives(TcpClient client)
  579. {
  580. // issues with definitions: https://github.com/dotnet/sdk/issues/14540
  581. try
  582. {
  583. #if NET6_0_OR_GREATER
  584. client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
  585. client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, _Keepalive.TcpKeepAliveTime);
  586. client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, _Keepalive.TcpKeepAliveInterval);
  587. // Windows 10 version 1703 or later
  588. if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
  589. && Environment.OSVersion.Version >= new Version(10, 0, 15063))
  590. {
  591. client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, _Keepalive.TcpKeepAliveRetryCount);
  592. }
  593. #elif NETFRAMEWORK
  594. // .NET Framework expects values in milliseconds
  595. byte[] keepAlive = new byte[12];
  596. Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4);
  597. Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4);
  598. Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4);
  599. client.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive, null);
  600. #elif NETSTANDARD
  601. #endif
  602. }
  603. catch (Exception)
  604. {
  605. _Settings.Logger?.Invoke(Severity.Error, _Header + "keepalives not supported on this platform, disabled");
  606. _Keepalive.EnableTcpKeepAlives = false;
  607. }
  608. }
  609. private async Task AcceptConnections(CancellationToken token)
  610. {
  611. _IsListening = true;
  612. while (true)
  613. {
  614. try
  615. {
  616. token.ThrowIfCancellationRequested();
  617. #region Check-for-Maximum-Connections
  618. if (!_IsListening && (_Connections >= _Settings.MaxConnections))
  619. {
  620. await Task.Delay(100);
  621. continue;
  622. }
  623. else if (!_IsListening)
  624. {
  625. _Listener.Start();
  626. _IsListening = true;
  627. }
  628. #endregion
  629. #region Accept-and-Validate
  630. TcpClient tcpClient = await _Listener.AcceptTcpClientAsync().ConfigureAwait(false);
  631. tcpClient.LingerState.Enabled = false;
  632. tcpClient.NoDelay = _Settings.NoDelay;
  633. if (_Keepalive.EnableTcpKeepAlives) EnableKeepalives(tcpClient);
  634. string clientIp = ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString();
  635. if (_Settings.PermittedIPs.Count > 0 && !_Settings.PermittedIPs.Contains(clientIp))
  636. {
  637. _Settings.Logger?.Invoke(Severity.Info, _Header + "rejecting connection from " + clientIp + " (not permitted)");
  638. tcpClient.Close();
  639. continue;
  640. }
  641. if (_Settings.BlockedIPs.Count > 0 && _Settings.BlockedIPs.Contains(clientIp))
  642. {
  643. _Settings.Logger?.Invoke(Severity.Info, _Header + "rejecting connection from " + clientIp + " (blocked)");
  644. tcpClient.Close();
  645. continue;
  646. }
  647. ClientMetadata client = new ClientMetadata(tcpClient);
  648. client.SendBuffer = new byte[_Settings.StreamBufferSize];
  649. _ClientManager.AddClient(client.Guid, client);
  650. _ClientManager.AddClientLastSeen(client.Guid);
  651. CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_Token, client.Token);
  652. #endregion
  653. #region Check-for-Maximum-Connections
  654. Interlocked.Increment(ref _Connections);
  655. if (_Connections >= _Settings.MaxConnections)
  656. {
  657. _Settings.Logger?.Invoke(Severity.Info, _Header + "maximum connections " + _Settings.MaxConnections + " met (currently " + _Connections + " connections), pausing");
  658. _IsListening = false;
  659. _Listener.Stop();
  660. }
  661. #endregion
  662. #region Initialize-Client
  663. Task unawaited = null;
  664. if (_Mode == Mode.Tcp)
  665. {
  666. unawaited = Task.Run(() => FinalizeConnection(client, linkedCts.Token), linkedCts.Token);
  667. }
  668. else if (_Mode == Mode.Ssl)
  669. {
  670. if (_Settings.AcceptInvalidCertificates)
  671. {
  672. client.SslStream = new SslStream(client.NetworkStream, false, _SslConfiguration.ClientCertificateValidationCallback);
  673. }
  674. else
  675. {
  676. client.SslStream = new SslStream(client.NetworkStream, false);
  677. }
  678. unawaited = Task.Run(async () =>
  679. {
  680. bool success = await StartTls(client, linkedCts.Token).ConfigureAwait(false);
  681. if (success)
  682. {
  683. await FinalizeConnection(client, linkedCts.Token).ConfigureAwait(false);
  684. }
  685. else
  686. {
  687. _ClientManager.RemoveClient(client.Guid);
  688. _ClientManager.RemoveClientLastSeen(client.Guid);
  689. client.Dispose();
  690. }
  691. }, linkedCts.Token);
  692. }
  693. else
  694. {
  695. throw new ArgumentException("Unknown mode: " + _Mode.ToString());
  696. }
  697. _Settings.Logger?.Invoke(Severity.Debug, _Header + "accepted connection from " + client.ToString());
  698. #endregion
  699. }
  700. catch (TaskCanceledException)
  701. {
  702. break;
  703. }
  704. catch (ObjectDisposedException)
  705. {
  706. break;
  707. }
  708. catch (Exception e)
  709. {
  710. _Settings.Logger?.Invoke(Severity.Error, _Header + "listener exception: " + e.Message);
  711. _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
  712. break;
  713. }
  714. }
  715. }
  716. private async Task<bool> StartTls(ClientMetadata client, CancellationToken token)
  717. {
  718. try
  719. {
  720. token.ThrowIfCancellationRequested();
  721. await client.SslStream.AuthenticateAsServerAsync(_SslCertificate, _SslConfiguration.ClientCertificateRequired, _TlsVersion.ToSslProtocols(), !_Settings.AcceptInvalidCertificates).ConfigureAwait(false);
  722. if (!client.SslStream.IsEncrypted)
  723. {
  724. _Settings.Logger?.Invoke(Severity.Error, _Header + "stream from " + client.ToString() + " not encrypted");
  725. client.Dispose();
  726. Interlocked.Decrement(ref _Connections);
  727. return false;
  728. }
  729. if (!client.SslStream.IsAuthenticated)
  730. {
  731. _Settings.Logger?.Invoke(Severity.Error, _Header + "stream from " + client.ToString() + " not authenticated");
  732. client.Dispose();
  733. Interlocked.Decrement(ref _Connections);
  734. return false;
  735. }
  736. if (_Settings.MutuallyAuthenticate && !client.SslStream.IsMutuallyAuthenticated)
  737. {
  738. _Settings.Logger?.Invoke(Severity.Error, _Header + $"mutual authentication with {client.ToString()} ({_TlsVersion}) failed");
  739. client.Dispose();
  740. Interlocked.Decrement(ref _Connections);
  741. return false;
  742. }
  743. }
  744. catch (Exception e)
  745. {
  746. _Settings.Logger?.Invoke(Severity.Error, _Header + $"disconnected during SSL/TLS establishment with {client.ToString()} ({_TlsVersion}): " + e.Message);
  747. _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
  748. client.Dispose();
  749. Interlocked.Decrement(ref _Connections);
  750. return false;
  751. }
  752. return true;
  753. }
  754. private async Task FinalizeConnection(ClientMetadata client, CancellationToken token)
  755. {
  756. #region Request-Authentication
  757. if (!String.IsNullOrEmpty(_Settings.PresharedKey))
  758. {
  759. _Settings.Logger?.Invoke(Severity.Debug, _Header + "requesting authentication material from " + client.ToString());
  760. _ClientManager.AddUnauthenticatedClient(client.Guid);
  761. byte[] data = Encoding.UTF8.GetBytes("Authentication required");
  762. WatsonMessage authMsg = new WatsonMessage();
  763. authMsg.Status = MessageStatus.AuthRequired;
  764. await SendInternalAsync(client, authMsg, 0, null, token).ConfigureAwait(false);
  765. }
  766. #endregion
  767. #region Start-Data-Receiver
  768. _Settings.Logger?.Invoke(Severity.Debug, _Header + "starting data receiver for " + client.ToString());
  769. client.DataReceiver = Task.Run(() => DataReceiver(client, token), token);
  770. #endregion
  771. }
  772. private bool IsClientConnected(ClientMetadata client)
  773. {
  774. if (client != null && client.TcpClient != null)
  775. {
  776. var state = IPGlobalProperties.GetIPGlobalProperties()
  777. .GetActiveTcpConnections()
  778. .FirstOrDefault(x =>
  779. x.LocalEndPoint.Equals(client.TcpClient.Client.LocalEndPoint)
  780. && x.RemoteEndPoint.Equals(client.TcpClient.Client.RemoteEndPoint));
  781. if (state == default(TcpConnectionInformation)
  782. || state.State == TcpState.Unknown
  783. || state.State == TcpState.FinWait1
  784. || state.State == TcpState.FinWait2
  785. || state.State == TcpState.Closed
  786. || state.State == TcpState.Closing
  787. || state.State == TcpState.CloseWait)
  788. {
  789. return false;
  790. }
  791. byte[] tmp = new byte[1];
  792. bool success = false;
  793. try
  794. {
  795. client.WriteLock.Wait();
  796. client.TcpClient.Client.Send(tmp, 0, 0);
  797. success = true;
  798. }
  799. catch (SocketException se)
  800. {
  801. if (se.NativeErrorCode.Equals(10035)) success = true;
  802. }
  803. catch (Exception)
  804. {
  805. }
  806. finally
  807. {
  808. if (client != null)
  809. {
  810. client.WriteLock.Release();
  811. }
  812. }
  813. if (success) return true;
  814. try
  815. {
  816. client.WriteLock.Wait();
  817. if ((client.TcpClient.Client.Poll(0, SelectMode.SelectWrite))
  818. && (!client.TcpClient.Client.Poll(0, SelectMode.SelectError)))
  819. {
  820. byte[] buffer = new byte[1];
  821. if (client.TcpClient.Client.Receive(buffer, SocketFlags.Peek) == 0)
  822. {
  823. return false;
  824. }
  825. else
  826. {
  827. return true;
  828. }
  829. }
  830. else
  831. {
  832. return false;
  833. }
  834. }
  835. catch (Exception)
  836. {
  837. return false;
  838. }
  839. finally
  840. {
  841. if (client != null) client.WriteLock.Release();
  842. }
  843. }
  844. else
  845. {
  846. return false;
  847. }
  848. }
  849. #endregion
  850. #region Read
  851. private async Task DataReceiver(ClientMetadata client, CancellationToken token)
  852. {
  853. while (true)
  854. {
  855. try
  856. {
  857. token.ThrowIfCancellationRequested();
  858. if (!IsClientConnected(client)) break;
  859. WatsonMessage msg = await _MessageBuilder.BuildFromStream(client.DataStream);
  860. if (msg == null)
  861. {
  862. await Task.Delay(30, token).ConfigureAwait(false);
  863. continue;
  864. }
  865. if (!String.IsNullOrEmpty(_Settings.PresharedKey))
  866. {
  867. if (_ClientManager.ExistsUnauthenticatedClient(client.Guid))
  868. {
  869. _Settings.Logger?.Invoke(Severity.Debug, _Header + "message received from unauthenticated endpoint " + client.ToString());
  870. byte[] data = null;
  871. WatsonMessage authMsg = null;
  872. int contentLength = 0;
  873. Stream authStream = null;
  874. if (msg.Status == MessageStatus.AuthRequested)
  875. {
  876. // check preshared key
  877. if (msg.PresharedKey != null && msg.PresharedKey.Length > 0)
  878. {
  879. string clientPsk = Encoding.UTF8.GetString(msg.PresharedKey).Trim();
  880. if (_Settings.PresharedKey.Trim().Equals(clientPsk))
  881. {
  882. _Settings.Logger?.Invoke(Severity.Debug, _Header + "accepted authentication for " + client.ToString());
  883. _ClientManager.RemoveUnauthenticatedClient(client.Guid);
  884. _Events.HandleAuthenticationSucceeded(this, new AuthenticationSucceededEventArgs(client));
  885. data = Encoding.UTF8.GetBytes("Authentication successful");
  886. WatsonCommon.BytesToStream(data, 0, out contentLength, out authStream);
  887. authMsg = _MessageBuilder.ConstructNew(contentLength, authStream, false, false, null, null);
  888. authMsg.Status = MessageStatus.AuthSuccess;
  889. await SendInternalAsync(client, authMsg, 0, null, token).ConfigureAwait(false);
  890. continue;
  891. }
  892. else
  893. {
  894. _Settings.Logger?.Invoke(Severity.Warn, _Header + "declined authentication for " + client.ToString());
  895. await DisconnectClientAsync(client.Guid, MessageStatus.AuthFailure, false, token).ConfigureAwait(false);
  896. break;
  897. }
  898. }
  899. }
  900. // decline and terminate
  901. _Settings.Logger?.Invoke(Severity.Warn, _Header + "no authentication material for " + client.ToString());
  902. await DisconnectClientAsync(client.Guid, MessageStatus.AuthFailure, false, token).ConfigureAwait(false);
  903. break;
  904. }
  905. }
  906. if (msg.Status == MessageStatus.Shutdown)
  907. {
  908. _Settings.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " is disconnecting");
  909. break;
  910. }
  911. else if (msg.Status == MessageStatus.Removed)
  912. {
  913. _Settings.Logger?.Invoke(Severity.Debug, _Header + "sent disconnect notice to " + client.ToString());
  914. break;
  915. }
  916. else if (msg.Status == MessageStatus.RegisterClient)
  917. {
  918. _Settings.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " attempting to register GUID " + msg.SenderGuid.ToString());
  919. _ClientManager.ReplaceGuid(client.Guid, msg.SenderGuid);
  920. _Settings.Logger?.Invoke(Severity.Debug, _Header + "updated client GUID from " + client.Guid + " to " + msg.SenderGuid);
  921. client.Guid = msg.SenderGuid;
  922. _Events.HandleClientConnected(this, new ConnectionEventArgs(client));
  923. continue;
  924. }
  925. if (msg.SyncRequest)
  926. {
  927. _Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous request received: " + msg.ConversationGuid.ToString());
  928. DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
  929. byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
  930. if (DateTime.UtcNow < expiration)
  931. {
  932. Task unawaited = Task.Run(async () =>
  933. {
  934. SyncRequest syncReq = new SyncRequest(
  935. client,
  936. msg.ConversationGuid,
  937. msg.ExpirationUtc.Value,
  938. msg.Metadata,
  939. msgData);
  940. SyncResponse syncResp = null;
  941. #pragma warning disable CS0618 // Type or member is obsolete
  942. if (_Callbacks.SyncRequestReceivedAsync != null)
  943. {
  944. syncResp = await _Callbacks.HandleSyncRequestReceivedAsync(syncReq);
  945. }
  946. else if (_Callbacks.SyncRequestReceived != null)
  947. {
  948. syncResp = _Callbacks.HandleSyncRequestReceived(syncReq);
  949. }
  950. #pragma warning restore CS0618 // Type or member is obsolete
  951. if (syncResp != null)
  952. {
  953. WatsonCommon.BytesToStream(syncResp.Data, 0, out int contentLength, out Stream stream);
  954. WatsonMessage respMsg = _MessageBuilder.ConstructNew(
  955. contentLength,
  956. stream,
  957. false,
  958. true,
  959. msg.ExpirationUtc.Value,
  960. syncResp.Metadata);
  961. respMsg.ConversationGuid = msg.ConversationGuid;
  962. await SendInternalAsync(client, respMsg, contentLength, stream, token).ConfigureAwait(false);
  963. }
  964. }, token);
  965. }
  966. else
  967. {
  968. _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded from " + client.ToString());
  969. }
  970. }
  971. else if (msg.SyncResponse)
  972. {
  973. // No need to amend message expiration; it is copied from the request, which was set by this node
  974. // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
  975. _Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous response received: " + msg.ConversationGuid.ToString());
  976. byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
  977. if (DateTime.UtcNow < msg.ExpirationUtc.Value)
  978. {
  979. lock (_SyncResponseLock)
  980. {
  981. _SyncResponseReceived?.Invoke(this, new SyncResponseReceivedEventArgs(msg, msgData));
  982. }
  983. }
  984. else
  985. {
  986. _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous response received and discarded from " + client.ToString());
  987. }
  988. }
  989. else
  990. {
  991. byte[] msgData = null;
  992. if (_Events.IsUsingMessages)
  993. {
  994. msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
  995. MessageReceivedEventArgs mr = new MessageReceivedEventArgs(client, msg.Metadata, msgData);
  996. await Task.Run(() => _Events.HandleMessageReceived(this, mr), token);
  997. }
  998. else if (_Events.IsUsingStreams)
  999. {
  1000. StreamReceivedEventArgs sr = null;
  1001. WatsonStream ws = null;
  1002. if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
  1003. {
  1004. ws = new WatsonStream(msg.ContentLength, msg.DataStream);
  1005. sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws);
  1006. _Events.HandleStreamReceived(this, sr);
  1007. }
  1008. else
  1009. {
  1010. MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false);
  1011. ws = new WatsonStream(msg.ContentLength, ms);
  1012. sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws);
  1013. await Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
  1014. }
  1015. }
  1016. else
  1017. {
  1018. _Settings.Logger?.Invoke(Severity.Error, _Header + "event handler not set for either MessageReceived or StreamReceived");
  1019. break;
  1020. }
  1021. }
  1022. _Statistics.IncrementReceivedMessages();
  1023. _Statistics.AddReceivedBytes(msg.ContentLength);
  1024. _ClientManager.UpdateClientLastSeen(client.Guid, DateTime.UtcNow);
  1025. }
  1026. catch (ObjectDisposedException ode)
  1027. {
  1028. _Settings?.Logger?.Invoke(Severity.Debug, _Header + "object disposed exception encountered");
  1029. _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ode));
  1030. break;
  1031. }
  1032. catch (TaskCanceledException tce)
  1033. {
  1034. _Settings?.Logger?.Invoke(Severity.Debug, _Header + "task canceled exception encountered");
  1035. _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(tce));
  1036. break;
  1037. }
  1038. catch (OperationCanceledException oce)
  1039. {
  1040. _Settings?.Logger?.Invoke(Severity.Debug, _Header + "operation canceled exception encountered");
  1041. _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(oce));
  1042. break;
  1043. }
  1044. catch (IOException ioe)
  1045. {
  1046. _Settings?.Logger?.Invoke(Severity.Debug, _Header + "IO exception encountered");
  1047. _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ioe));
  1048. break;
  1049. }
  1050. catch (Exception e)
  1051. {
  1052. _Settings?.Logger?.Invoke(Severity.Error, _Header + "data receiver exception for " + client.ToString() + ": " + e.Message);
  1053. _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
  1054. break;
  1055. }
  1056. }
  1057. if (_Settings != null && _Events != null)
  1058. {
  1059. DisconnectionEventArgs cd = null;
  1060. if (_ClientManager.ExistsClientKicked(client.Guid)) cd = new DisconnectionEventArgs(client, DisconnectReason.Removed);
  1061. else if (_ClientManager.ExistsClientTimedout(client.Guid)) cd = new DisconnectionEventArgs(client, DisconnectReason.Timeout);
  1062. else cd = new DisconnectionEventArgs(client, DisconnectReason.Normal);
  1063. _Events.HandleClientDisconnected(this, cd);
  1064. _ClientManager.Remove(client.Guid);
  1065. Interlocked.Decrement(ref _Connections);
  1066. _Settings?.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " disconnected");
  1067. client.Dispose();
  1068. }
  1069. }
  1070. #endregion
  1071. #region Send
  1072. private async Task<bool> SendInternalAsync(ClientMetadata client, WatsonMessage msg, long contentLength, Stream stream, CancellationToken token)
  1073. {
  1074. if (client == null) throw new ArgumentNullException(nameof(client));
  1075. if (msg == null) throw new ArgumentNullException(nameof(msg));
  1076. if (contentLength > 0)
  1077. {
  1078. if (stream == null || !stream.CanRead)
  1079. {
  1080. throw new ArgumentException("Cannot read from supplied stream.");
  1081. }
  1082. }
  1083. if (token == default(CancellationToken))
  1084. {
  1085. CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token, _Token);
  1086. token = linkedCts.Token;
  1087. }
  1088. await client.WriteLock.WaitAsync(token).ConfigureAwait(false);
  1089. try
  1090. {
  1091. await SendHeadersAsync(client, msg, token).ConfigureAwait(false);
  1092. await SendDataStreamAsync(client, contentLength, stream, token).ConfigureAwait(false);
  1093. _Statistics.IncrementSentMessages();
  1094. _Statistics.AddSentBytes(contentLength);
  1095. return true;
  1096. }
  1097. catch (TaskCanceledException)
  1098. {
  1099. return false;
  1100. }
  1101. catch (OperationCanceledException)
  1102. {
  1103. return false;
  1104. }
  1105. catch (Exception e)
  1106. {
  1107. _Settings.Logger?.Invoke(Severity.Error, _Header + "failed to write message to " + client.ToString() + ": " + e.Message);
  1108. _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
  1109. return false;
  1110. }
  1111. finally
  1112. {
  1113. if (client != null) client.WriteLock.Release();
  1114. }
  1115. }
  1116. private async Task<SyncResponse> SendAndWaitInternalAsync(ClientMetadata client, WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token)
  1117. {
  1118. if (client == null) throw new ArgumentNullException(nameof(client));
  1119. if (msg == null) throw new ArgumentNullException(nameof(msg));
  1120. if (contentLength > 0)
  1121. {
  1122. if (stream == null || !stream.CanRead)
  1123. {
  1124. throw new ArgumentException("Cannot read from supplied stream.");
  1125. }
  1126. }
  1127. await client.WriteLock.WaitAsync();
  1128. SyncResponse ret = null;
  1129. AutoResetEvent responded = new AutoResetEvent(false);
  1130. // Create a new handler specially for this Conversation.
  1131. EventHandler<SyncResponseReceivedEventArgs> handler = (sender, e) =>
  1132. {
  1133. if (e.Message.ConversationGuid == msg.ConversationGuid)
  1134. {
  1135. ret = new SyncResponse(e.Message.ConversationGuid, e.Message.ExpirationUtc.Value, e.Message.Metadata, e.Data);
  1136. responded.Set();
  1137. }
  1138. };
  1139. // Subscribe
  1140. _SyncResponseReceived += handler;
  1141. try
  1142. {
  1143. await SendHeadersAsync(client, msg, token);
  1144. await SendDataStreamAsync(client, contentLength, stream, token);
  1145. _Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous request sent: " + msg.ConversationGuid);
  1146. _Statistics.IncrementSentMessages();
  1147. _Statistics.AddSentBytes(contentLength);
  1148. }
  1149. catch (Exception e)
  1150. {
  1151. _Settings.Logger?.Invoke(Severity.Error, _Header + client.ToString() + " failed to write message: " + e.Message);
  1152. _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
  1153. _SyncResponseReceived -= handler;
  1154. throw;
  1155. }
  1156. finally
  1157. {
  1158. if (client != null) client.WriteLock.Release();
  1159. }
  1160. // Wait for responded.Set() to be called
  1161. responded.WaitOne(new TimeSpan(0, 0, 0, 0, timeoutMs));
  1162. // Unsubscribe
  1163. _SyncResponseReceived -= handler;
  1164. if (ret != null)
  1165. {
  1166. return ret;
  1167. }
  1168. else
  1169. {
  1170. _Settings.Logger?.Invoke(Severity.Error, _Header + "synchronous response not received within the timeout window");
  1171. throw new TimeoutException("A response to a synchronous request was not received within the timeout window.");
  1172. }
  1173. }
  1174. private async Task SendHeadersAsync(ClientMetadata client, WatsonMessage msg, CancellationToken token)
  1175. {
  1176. byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg);
  1177. await client.DataStream.WriteAsync(headerBytes, 0, headerBytes.Length, token).ConfigureAwait(false);
  1178. await client.DataStream.FlushAsync(token).ConfigureAwait(false);
  1179. }
  1180. private async Task SendDataStreamAsync(ClientMetadata client, long contentLength, Stream stream, CancellationToken token)
  1181. {
  1182. if (contentLength <= 0) return;
  1183. long bytesRemaining = contentLength;
  1184. int bytesRead = 0;
  1185. while (bytesRemaining > 0)
  1186. {
  1187. if (bytesRemaining >= _Settings.StreamBufferSize)
  1188. {
  1189. client.SendBuffer = new byte[_Settings.StreamBufferSize];
  1190. }
  1191. else
  1192. {
  1193. client.SendBuffer = new byte[bytesRemaining];
  1194. }
  1195. bytesRead = await stream.ReadAsync(client.SendBuffer, 0, client.SendBuffer.Length, token).ConfigureAwait(false);
  1196. if (bytesRead > 0)
  1197. {
  1198. await client.DataStream.WriteAsync(client.SendBuffer, 0, bytesRead, token).ConfigureAwait(false);
  1199. bytesRemaining -= bytesRead;
  1200. }
  1201. }
  1202. await client.DataStream.FlushAsync(token).ConfigureAwait(false);
  1203. }
  1204. #endregion
  1205. #region Tasks
  1206. private async Task MonitorForIdleClients(CancellationToken token)
  1207. {
  1208. try
  1209. {
  1210. Dictionary<Guid, DateTime> lastSeen = null;
  1211. while (true)
  1212. {
  1213. token.ThrowIfCancellationRequested();
  1214. await Task.Delay(5000, _Token).ConfigureAwait(false);
  1215. if (_Settings.IdleClientTimeoutSeconds > 0)
  1216. {
  1217. lastSeen = _ClientManager.AllClientsLastSeen();
  1218. if (lastSeen != null && lastSeen.Count > 0)
  1219. {
  1220. DateTime idleTimestamp = DateTime.UtcNow.AddSeconds(-1 * _Settings.IdleClientTimeoutSeconds);
  1221. foreach (KeyValuePair<Guid, DateTime> curr in lastSeen)
  1222. {
  1223. if (curr.Value < idleTimestamp)
  1224. {
  1225. _ClientManager.AddClientTimedout(curr.Key);
  1226. _Settings.Logger?.Invoke(Severity.Debug, _Header + "disconnecting client " + curr.Key + " due to idle timeout");
  1227. await DisconnectClientAsync(curr.Key, MessageStatus.Timeout, true);
  1228. }
  1229. }
  1230. }
  1231. }
  1232. }
  1233. }
  1234. catch (TaskCanceledException)
  1235. {
  1236. }
  1237. catch (OperationCanceledException)
  1238. {
  1239. }
  1240. }
  1241. #endregion
  1242. #endregion
  1243. }
  1244. }