12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221 |
- namespace WatsonTcp
- {
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Net;
- using System.Net.Security;
- using System.Net.Sockets;
- using System.Runtime.InteropServices;
- using System.Security.Authentication;
- using System.Security.Cryptography.X509Certificates;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- /// <summary>
- /// Watson TCP client, with or without SSL.
- /// </summary>
- public class WatsonTcpClient : IDisposable
- {
- #region Public-Members
- /// <summary>
- /// Settings used by this client.
- /// Assigning null installs a newly constructed WatsonTcpClientSettings instead of storing null.
- /// </summary>
- public WatsonTcpClientSettings Settings
- {
-     get => _Settings;
-     set => _Settings = value ?? new WatsonTcpClientSettings();
- }
- /// <summary>
- /// Event container for this client.
- /// Assigning null installs a newly constructed WatsonTcpClientEvents instead of storing null.
- /// </summary>
- public WatsonTcpClientEvents Events
- {
-     get => _Events;
-     set => _Events = value ?? new WatsonTcpClientEvents();
- }
- /// <summary>
- /// Callback container for this client.
- /// Assigning null installs a newly constructed WatsonTcpClientCallbacks instead of storing null.
- /// </summary>
- public WatsonTcpClientCallbacks Callbacks
- {
-     get => _Callbacks;
-     set => _Callbacks = value ?? new WatsonTcpClientCallbacks();
- }
- /// <summary>
- /// Statistics for this client (read-only property).
- /// The underlying object is replaced with a new instance each time Connect() is called.
- /// </summary>
- public WatsonTcpStatistics Statistics => _Statistics;
- /// <summary>
- /// TCP keepalive settings for this client.
- /// Assigning null installs a newly constructed WatsonTcpKeepaliveSettings instead of storing null.
- /// </summary>
- public WatsonTcpKeepaliveSettings Keepalive
- {
-     get => _Keepalive;
-     set => _Keepalive = value ?? new WatsonTcpKeepaliveSettings();
- }
- /// <summary>
- /// SSL configuration for this client; Connect() reads its certificate callbacks when
- /// Settings.AcceptInvalidCertificates is true.
- /// Assigning null installs a newly constructed WatsonTcpClientSslConfiguration instead of storing null.
- /// </summary>
- public WatsonTcpClientSslConfiguration SslConfiguration
- {
-     get => _SslConfiguration;
-     set => _SslConfiguration = value ?? new WatsonTcpClientSslConfiguration();
- }
- /// <summary>
- /// Serialization helper used by this client.
- /// The assigned helper is also handed to the internal message builder so both stay in step.
- /// </summary>
- /// <exception cref="ArgumentNullException">Thrown when null is assigned.</exception>
- public ISerializationHelper SerializationHelper
- {
-     get => _SerializationHelper;
-     set
-     {
-         // The throw-expression rejects null before either field is touched.
-         _SerializationHelper = value ?? throw new ArgumentNullException(nameof(SerializationHelper));
-         _MessageBuilder.SerializationHelper = value;
-     }
- }
- /// <summary>
- /// Indicates whether or not the client is connected to the server.
- /// Set to true by Connect() once the connection (and, in SSL mode, the TLS checks) succeeds;
- /// reset to false by Disconnect(), or by Connect() when sending the registration message fails.
- /// The setter is private, so callers can only observe the value.
- /// </summary>
- public bool Connected { get; private set; }
- #endregion
- #region Private-Members
- // Prefix prepended to every log message emitted by this class.
- private string _Header = "[WatsonTcpClient] ";
- // Builds outgoing messages and parses incoming ones from the data stream.
- private WatsonMessageBuilder _MessageBuilder = new WatsonMessageBuilder();
- // Backing fields for the public Settings/Events/Callbacks/Statistics/Keepalive/SslConfiguration/SerializationHelper properties.
- private WatsonTcpClientSettings _Settings = new WatsonTcpClientSettings();
- private WatsonTcpClientEvents _Events = new WatsonTcpClientEvents();
- private WatsonTcpClientCallbacks _Callbacks = new WatsonTcpClientCallbacks();
- private WatsonTcpStatistics _Statistics = new WatsonTcpStatistics();
- private WatsonTcpKeepaliveSettings _Keepalive = new WatsonTcpKeepaliveSettings();
- private WatsonTcpClientSslConfiguration _SslConfiguration = new WatsonTcpClientSslConfiguration();
- private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();
- // Transport mode and TLS version; both are assigned by the constructors.
- private Mode _Mode = Mode.Tcp;
- private TlsVersion _TlsVersion = TlsVersion.Tls12;
- // Local endpoint of the socket, captured by Connect() after the connection is established.
- private string _SourceIp = null;
- private int _SourcePort = 0;
- // Remote endpoint supplied to the constructor.
- private string _ServerIp = null;
- private int _ServerPort = 0;
- // Underlying socket wrapper; recreated on every Connect().
- private TcpClient _Client = null;
- // Stream used for message I/O: Connect() points it at _TcpStream in TCP mode or _SslStream in SSL mode.
- private Stream _DataStream = null;
- private NetworkStream _TcpStream = null;
- private SslStream _SslStream = null;
- // Client certificate (may be null) and the collection passed to AuthenticateAsClient.
- private X509Certificate2 _SslCertificate = null;
- private X509Certificate2Collection _SslCertificateCollection = null;
- // Semaphores with a single slot each; _ReadLock is awaited by DataReceiver before reading a message.
- // NOTE(review): _WriteLock is presumably taken by the send path, which is not visible here - confirm.
- private SemaphoreSlim _WriteLock = new SemaphoreSlim(1, 1);
- private SemaphoreSlim _ReadLock = new SemaphoreSlim(1, 1);
- // Cancellation for the background tasks; Connect() creates a new source/token pair and Disconnect() cancels it.
- private CancellationTokenSource _TokenSource = new CancellationTokenSource();
- private CancellationToken _Token;
- // Background tasks started by Connect().
- private Task _DataReceiver = null;
- private Task _IdleServerMonitor = null;
- // Timestamp of the last received message (also reset by Connect()).
- private DateTime _LastActivity = DateTime.UtcNow;
- // Reset to false by Connect(). NOTE(review): presumably set by the idle-server monitor - confirm.
- private bool _IsTimeout = false;
- // Replaced in every constructor with a buffer of Settings.StreamBufferSize bytes.
- private byte[] _SendBuffer = new byte[65536];
- // Held while _SyncResponseReceived is raised from the receive loop.
- private readonly object _SyncResponseLock = new object();
- private event EventHandler<SyncResponseReceivedEventArgs> _SyncResponseReceived;
- #endregion
- #region Constructors-and-Factories
- /// <summary>
- /// Create a Watson TCP client that talks to the server over plain TCP (no SSL).
- /// No connection is opened here; call Connect() afterward.
- /// </summary>
- /// <param name="serverIp">The IP address or hostname of the server.</param>
- /// <param name="serverPort">The TCP port on which the server is listening.</param>
- /// <exception cref="ArgumentNullException">serverIp is null or empty.</exception>
- /// <exception cref="ArgumentOutOfRangeException">serverPort is negative.</exception>
- public WatsonTcpClient(
-     string serverIp,
-     int serverPort)
- {
-     if (String.IsNullOrEmpty(serverIp))
-     {
-         throw new ArgumentNullException(nameof(serverIp));
-     }
-     if (serverPort < 0)
-     {
-         throw new ArgumentOutOfRangeException(nameof(serverPort));
-     }
-     _ServerIp = serverIp;
-     _ServerPort = serverPort;
-     _Mode = Mode.Tcp;
-     // Size the send buffer from the (default) settings in effect at construction time.
-     _SendBuffer = new byte[_Settings.StreamBufferSize];
-     SerializationHelper.InstantiateConverter(); // Unity fix
- }
- /// <summary>
- /// Create a Watson TCP client that talks to the server over SSL, optionally loading a client
- /// certificate from a PFX file. No connection is opened here; call Connect() afterward.
- /// </summary>
- /// <param name="serverIp">The IP address or hostname of the server.</param>
- /// <param name="serverPort">The TCP port on which the server is listening.</param>
- /// <param name="pfxCertFile">The file containing the SSL certificate. When null or empty, no client certificate is loaded.</param>
- /// <param name="pfxCertPass">The password for the SSL certificate. When null or empty, the file is loaded without a password.</param>
- /// <param name="tlsVersion">The TLS version used for this connection.</param>
- /// <exception cref="ArgumentNullException">serverIp is null or empty.</exception>
- /// <exception cref="ArgumentOutOfRangeException">serverPort is negative.</exception>
- public WatsonTcpClient(
-     string serverIp,
-     int serverPort,
-     string pfxCertFile,
-     string pfxCertPass,
-     TlsVersion tlsVersion = TlsVersion.Tls12)
- {
-     if (String.IsNullOrEmpty(serverIp))
-     {
-         throw new ArgumentNullException(nameof(serverIp));
-     }
-     if (serverPort < 0)
-     {
-         throw new ArgumentOutOfRangeException(nameof(serverPort));
-     }
-     _ServerIp = serverIp;
-     _ServerPort = serverPort;
-     _TlsVersion = tlsVersion;
-     _Mode = Mode.Ssl;
-     _SendBuffer = new byte[_Settings.StreamBufferSize];
-     if (String.IsNullOrEmpty(pfxCertFile))
-     {
-         // No certificate file supplied: authenticate with an empty client-certificate collection.
-         _SslCertificateCollection = new X509Certificate2Collection();
-     }
-     else
-     {
-         _SslCertificate = String.IsNullOrEmpty(pfxCertPass)
-             ? new X509Certificate2(pfxCertFile)
-             : new X509Certificate2(pfxCertFile, pfxCertPass);
-         X509Certificate2Collection certificates = new X509Certificate2Collection();
-         certificates.Add(_SslCertificate);
-         _SslCertificateCollection = certificates;
-     }
-     SerializationHelper.InstantiateConverter(); // Unity fix
- }
- /// <summary>
- /// Create a Watson TCP client that talks to the server over SSL using an already-loaded
- /// client certificate. No connection is opened here; call Connect() afterward.
- /// </summary>
- /// <param name="serverIp">The IP address or hostname of the server.</param>
- /// <param name="serverPort">The TCP port on which the server is listening.</param>
- /// <param name="cert">The SSL certificate.</param>
- /// <param name="tlsVersion">The TLS version used for this connection.</param>
- /// <exception cref="ArgumentNullException">serverIp is null or empty, or cert is null.</exception>
- /// <exception cref="ArgumentOutOfRangeException">serverPort is negative.</exception>
- public WatsonTcpClient(
-     string serverIp,
-     int serverPort,
-     X509Certificate2 cert,
-     TlsVersion tlsVersion = TlsVersion.Tls12)
- {
-     if (String.IsNullOrEmpty(serverIp))
-     {
-         throw new ArgumentNullException(nameof(serverIp));
-     }
-     if (serverPort < 0)
-     {
-         throw new ArgumentOutOfRangeException(nameof(serverPort));
-     }
-     if (cert == null)
-     {
-         throw new ArgumentNullException(nameof(cert));
-     }
-     _ServerIp = serverIp;
-     _ServerPort = serverPort;
-     _TlsVersion = tlsVersion;
-     _Mode = Mode.Ssl;
-     _SslCertificate = cert;
-     _SendBuffer = new byte[_Settings.StreamBufferSize];
-     X509Certificate2Collection certificates = new X509Certificate2Collection();
-     certificates.Add(_SslCertificate);
-     _SslCertificateCollection = certificates;
-     SerializationHelper.InstantiateConverter(); // Unity fix
- }
- #endregion
- #region Public-Methods
- /// <summary>
- /// Disconnect the client and dispose of background workers.
- /// Do not reuse the object after disposal.
- /// Delegates to Dispose(bool) with disposing set to true.
- /// </summary>
- public void Dispose()
- {
- // Run the managed cleanup path.
- Dispose(true);
- // Cleanup has already run, so ask the GC not to run a finalizer for this instance.
- GC.SuppressFinalize(this);
- }
- /// <summary>
- /// Connect to the server.
- /// Opens the TCP connection (waiting at most Settings.ConnectTimeoutSeconds), performs the TLS
- /// handshake when the client was constructed in SSL mode, sends the client registration message,
- /// and then starts the background receive and idle-monitor tasks.
- /// If connecting, authenticating or registering fails, the socket and SSL stream opened by this
- /// call are closed before the exception propagates.
- /// </summary>
- /// <exception cref="InvalidOperationException">The client is already connected, or neither the MessageReceived nor the StreamReceived event has been set.</exception>
- /// <exception cref="TimeoutException">The server did not accept the connection within the configured timeout.</exception>
- /// <exception cref="AuthenticationException">The SSL stream is not encrypted or authenticated, or mutual authentication was required and failed.</exception>
- /// <exception cref="ArgumentException">The connection mode is unknown, or sending the registration message reported failure.</exception>
- public void Connect()
- {
-     if (Connected) throw new InvalidOperationException("Already connected to the server.");
-     // Validate configuration before allocating the socket so a rejected call leaves nothing to clean up.
-     if (!_Events.IsUsingMessages && !_Events.IsUsingStreams)
-         throw new InvalidOperationException("One of either 'MessageReceived' or 'StreamReceived' events must first be set.");
-     if (_Mode != Mode.Tcp && _Mode != Mode.Ssl)
-         throw new ArgumentException("Unknown mode: " + _Mode.ToString());
-     if (_Settings.LocalPort == 0)
-     {
-         _Client = new TcpClient();
-     }
-     else
-     {
-         // Bind the outgoing socket to the requested local port.
-         IPEndPoint ipe = new IPEndPoint(IPAddress.Any, _Settings.LocalPort);
-         _Client = new TcpClient(ipe);
-     }
-     _Client.NoDelay = _Settings.NoDelay;
-     _Statistics = new WatsonTcpStatistics();
-     if (_Mode == Mode.Tcp)
-         _Settings.Logger?.Invoke(Severity.Info, _Header + "connecting to " + _ServerIp + ":" + _ServerPort);
-     else
-         _Settings.Logger?.Invoke(Severity.Info, _Header + "connecting with SSL to " + _ServerIp + ":" + _ServerPort);
-     try
-     {
-         _Client.LingerState = new LingerOption(true, 0);
-         IAsyncResult asyncResult = _Client.BeginConnect(_ServerIp, _ServerPort, null, null);
-         // The wait handle is deliberately not closed afterwards, see
-         // https://social.msdn.microsoft.com/Forums/en-US/313cf28c-2a6d-498e-8188-7a0639dbd552/tcpclientbeginconnect-issue?forum=netfxnetcom
-         WaitHandle waitHandle = asyncResult.AsyncWaitHandle;
-         bool connectSuccess = waitHandle.WaitOne(TimeSpan.FromSeconds(_Settings.ConnectTimeoutSeconds), false);
-         if (!connectSuccess)
-         {
-             _Client.Close();
-             _Settings.Logger?.Invoke(Severity.Error, _Header + "timeout connecting to " + _ServerIp + ":" + _ServerPort);
-             throw new TimeoutException("Timeout connecting to " + _ServerIp + ":" + _ServerPort);
-         }
-         _Client.EndConnect(asyncResult);
-         IPEndPoint localEndPoint = (IPEndPoint)_Client.Client.LocalEndPoint;
-         _SourceIp = localEndPoint.Address.ToString();
-         _SourcePort = localEndPoint.Port;
-         if (_Mode == Mode.Tcp)
-         {
-             _TcpStream = _Client.GetStream();
-             _DataStream = _TcpStream;
-             _SslStream = null;
-         }
-         else
-         {
-             // The custom certificate callbacks are only wired in when invalid certificates are accepted.
-             if (_Settings.AcceptInvalidCertificates)
-                 _SslStream = new SslStream(_Client.GetStream(), false, _SslConfiguration.ServerCertificateValidationCallback, _SslConfiguration.ClientCertificateSelectionCallback);
-             else
-                 _SslStream = new SslStream(_Client.GetStream(), false);
-             // Certificate revocation is checked unless invalid certificates are accepted.
-             _SslStream.AuthenticateAsClient(_ServerIp, _SslCertificateCollection, _TlsVersion.ToSslProtocols(), !_Settings.AcceptInvalidCertificates);
-             if (!_SslStream.IsEncrypted)
-             {
-                 _Settings.Logger?.Invoke(Severity.Error, _Header + "stream to " + _ServerIp + ":" + _ServerPort + " is not encrypted");
-                 throw new AuthenticationException("Stream is not encrypted");
-             }
-             if (!_SslStream.IsAuthenticated)
-             {
-                 _Settings.Logger?.Invoke(Severity.Error, _Header + "stream to " + _ServerIp + ":" + _ServerPort + " is not authenticated");
-                 throw new AuthenticationException("Stream is not authenticated");
-             }
-             if (_Settings.MutuallyAuthenticate && !_SslStream.IsMutuallyAuthenticated)
-             {
-                 _Settings.Logger?.Invoke(Severity.Error, _Header + "mutual authentication with " + _ServerIp + ":" + _ServerPort + " failed");
-                 throw new AuthenticationException("Mutual authentication failed");
-             }
-             _DataStream = _SslStream;
-         }
-         if (_Keepalive.EnableTcpKeepAlives) EnableKeepalives();
-         Connected = true;
-     }
-     catch (Exception e)
-     {
-         // Release whatever this attempt opened; closing an already-closed client or stream is harmless.
-         _SslStream?.Dispose();
-         _Client.Close();
-         _Settings.Logger?.Invoke(Severity.Error, _Header + "exception encountered: " + e.Message);
-         _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
-         throw;
-     }
-     // Register this client with the server before starting the background tasks.
-     WatsonMessage msg = new WatsonMessage();
-     msg.Status = MessageStatus.RegisterClient;
-     bool registered;
-     try
-     {
-         registered = SendInternalAsync(msg, 0, null, default(CancellationToken)).Result;
-     }
-     catch
-     {
-         // A failed send must not leave the client marked connected with an open socket and no receiver.
-         Connected = false;
-         _SslStream?.Dispose();
-         _Client.Close();
-         throw;
-     }
-     if (!registered)
-     {
-         Connected = false;
-         _SslStream?.Dispose();
-         _Client.Close();
-         _Settings?.Logger?.Invoke(Severity.Alert, _Header + "unable to register GUID " + _Settings.Guid + " with the server");
-         throw new ArgumentException("Server rejected GUID " + _Settings.Guid);
-     }
-     _TokenSource = new CancellationTokenSource();
-     _Token = _TokenSource.Token;
-     _LastActivity = DateTime.UtcNow;
-     _IsTimeout = false;
-     _DataReceiver = Task.Run(() => DataReceiver(_Token), _Token);
-     _IdleServerMonitor = Task.Run(() => IdleServerMonitor(_Token), _Token);
-     _Events.HandleServerConnected(this, new ConnectionEventArgs());
-     _Settings?.Logger?.Invoke(Severity.Info, _Header + "connected to " + _ServerIp + ":" + _ServerPort);
- }
- /// <summary>
- /// Disconnect from the server.
- /// Optionally sends a shutdown notice, then cancels the background tasks, closes the streams
- /// and socket, waits for the background tasks to finish and marks the client as disconnected.
- /// The teardown runs even when sending the notice throws; that exception still propagates afterwards.
- /// </summary>
- /// <param name="sendNotice">Flag to indicate whether the server should be notified of the disconnect. This message will not be sent until other send requests have been handled.</param>
- /// <exception cref="InvalidOperationException">The client is not connected.</exception>
- public void Disconnect(bool sendNotice = true)
- {
-     if (!Connected) throw new InvalidOperationException("Not connected to the server.");
-     _Settings.Logger?.Invoke(Severity.Info, _Header + "disconnecting from " + _ServerIp + ":" + _ServerPort);
-     try
-     {
-         if (sendNotice)
-         {
-             WatsonMessage msg = new WatsonMessage();
-             msg.Status = MessageStatus.Shutdown;
-             SendInternalAsync(msg, 0, null, default(CancellationToken)).Wait();
-         }
-     }
-     finally
-     {
-         // Always tear down, even if the notice could not be sent (e.g. the link is already broken);
-         // otherwise the socket stays open, the tasks keep running and Connected stays true.
-         if (_TokenSource != null)
-         {
-             // stop background tasks
-             if (!_TokenSource.IsCancellationRequested)
-             {
-                 _TokenSource.Cancel();
-                 _TokenSource.Dispose();
-                 _Token = default(CancellationToken);
-             }
-         }
-         _SslStream?.Close();
-         _TcpStream?.Close();
-         _Client?.Close();
-         // Poll rather than Wait() on the tasks so a faulted or cancelled task does not throw here.
-         while (_DataReceiver?.IsCompleted == false)
-         {
-             Task.Delay(10).Wait();
-         }
-         while (_IdleServerMonitor?.IsCompleted == false)
-         {
-             Task.Delay(10).Wait();
-         }
-         Connected = false;
-         _Settings.Logger?.Invoke(Severity.Info, _Header + "disconnected from " + _ServerIp + ":" + _ServerPort);
-     }
- }
- /// <summary>
- /// Send a pre-shared key to the server to authenticate.
- /// </summary>
- /// <param name="presharedKey">Pre-shared key; must be exactly 16 characters long. It is sent UTF-8 encoded.</param>
- /// <param name="token">Cancellation token to cancel the request.</param>
- /// <exception cref="ArgumentNullException">presharedKey is null or empty.</exception>
- /// <exception cref="ArgumentException">presharedKey is not exactly 16 characters long.</exception>
- public async Task AuthenticateAsync(string presharedKey, CancellationToken token = default)
- {
-     if (String.IsNullOrEmpty(presharedKey)) throw new ArgumentNullException(nameof(presharedKey));
-     if (presharedKey.Length != 16) throw new ArgumentException("Preshared key length must be 16 bytes.");
-     WatsonMessage msg = new WatsonMessage();
-     msg.Status = MessageStatus.AuthRequested;
-     msg.PresharedKey = Encoding.UTF8.GetBytes(presharedKey);
-     // Library code: do not capture the caller's synchronization context, matching the other send methods.
-     await SendInternalAsync(msg, 0, null, token).ConfigureAwait(false);
- }
- #region SendAsync
- /// <summary>
- /// Send data and metadata to the server asynchronously.
- /// A null or empty string is sent as a zero-length payload; otherwise the string is UTF-8 encoded.
- /// </summary>
- /// <param name="data">String containing data.</param>
- /// <param name="metadata">Dictionary containing metadata.</param>
- /// <param name="token">Cancellation token to cancel the request. When left at its default, the client's own token is used.</param>
- /// <returns>Boolean indicating if the message was sent successfully.</returns>
- public async Task<bool> SendAsync(string data, Dictionary<string, object> metadata = null, CancellationToken token = default)
- {
-     if (token == default(CancellationToken)) token = _Token;
-     byte[] payload = String.IsNullOrEmpty(data) ? Array.Empty<byte>() : Encoding.UTF8.GetBytes(data);
-     // Forward the caller's token on the empty-data path too; it was previously dropped there.
-     return await SendAsync(payload, metadata, 0, token).ConfigureAwait(false);
- }
- /// <summary>
- /// Send a byte array and metadata to the server asynchronously.
- /// A null array is treated as an empty payload.
- /// </summary>
- /// <param name="data">Byte array containing data.</param>
- /// <param name="metadata">Dictionary containing metadata.</param>
- /// <param name="start">Start position within the supplied array.</param>
- /// <param name="token">Cancellation token to cancel the request. When left at its default, the client's own token is used.</param>
- /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
- public async Task<bool> SendAsync(byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
- {
-     CancellationToken effectiveToken = token == default(CancellationToken) ? _Token : token;
-     byte[] payload = data ?? Array.Empty<byte>();
-     // Wrap the bytes in a stream so the stream-based overload does the actual send.
-     WatsonCommon.BytesToStream(payload, start, out int length, out Stream source);
-     return await SendAsync(length, source, metadata, effectiveToken).ConfigureAwait(false);
- }
-
- /// <summary>
- /// Send data read from a stream, plus metadata, to the server asynchronously.
- /// A null stream is replaced by an empty in-memory stream.
- /// </summary>
- /// <param name="contentLength">The number of bytes to send.</param>
- /// <param name="stream">The stream containing the data.</param>
- /// <param name="metadata">Dictionary containing metadata.</param>
- /// <param name="token">Cancellation token to cancel the request. When left at its default, the client's own token is used.</param>
- /// <returns>Task with Boolean indicating if the message was sent successfully.</returns>
- /// <exception cref="ArgumentException">contentLength is negative.</exception>
- public async Task<bool> SendAsync(long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
- {
-     if (contentLength < 0)
-     {
-         throw new ArgumentException("Content length must be zero or greater.");
-     }
-     CancellationToken effectiveToken = token == default(CancellationToken) ? _Token : token;
-     Stream source = stream ?? new MemoryStream(Array.Empty<byte>());
-     WatsonMessage outbound = _MessageBuilder.ConstructNew(contentLength, source, false, false, null, metadata);
-     return await SendInternalAsync(outbound, contentLength, source, effectiveToken).ConfigureAwait(false);
- }
- #endregion
- #region SendAndWaitAsync
- /// <summary>
- /// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
- /// A null or empty string is sent as a zero-length payload; otherwise the string is UTF-8 encoded.
- /// </summary>
- /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired. Must be 1000 or greater.</param>
- /// <param name="data">Data to send.</param>
- /// <param name="metadata">Metadata dictionary to attach to the message.</param>
- /// <param name="start">Start position within the UTF-8 encoded bytes of the supplied string.</param>
- /// <param name="token">Cancellation token to cancel the request.</param>
- /// <returns>SyncResponse.</returns>
- /// <exception cref="ArgumentException">timeoutMs is less than 1000.</exception>
- public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, string data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
- {
-     if (timeoutMs < 1000) throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
-     byte[] payload = String.IsNullOrEmpty(data) ? Array.Empty<byte>() : Encoding.UTF8.GetBytes(data);
-     // Single exit so both the empty and non-empty paths avoid capturing the caller's context.
-     return await SendAndWaitAsync(timeoutMs, payload, metadata, start, token).ConfigureAwait(false);
- }
- /// <summary>
- /// Send a byte array and wait up to the given number of milliseconds for the response.
- /// A TimeoutException will be thrown if a response is not received. A null array is treated as an empty payload.
- /// </summary>
- /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired. Must be 1000 or greater.</param>
- /// <param name="data">Data to send.</param>
- /// <param name="metadata">Metadata dictionary to attach to the message.</param>
- /// <param name="start">Start position within the supplied array.</param>
- /// <param name="token">Cancellation token to cancel the request.</param>
- /// <returns>SyncResponse.</returns>
- /// <exception cref="ArgumentException">timeoutMs is less than 1000.</exception>
- public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, byte[] data, Dictionary<string, object> metadata = null, int start = 0, CancellationToken token = default)
- {
-     if (timeoutMs < 1000)
-     {
-         throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
-     }
-     byte[] payload = data ?? Array.Empty<byte>();
-     // Wrap the bytes in a stream so the stream-based overload does the actual work.
-     WatsonCommon.BytesToStream(payload, start, out int length, out Stream source);
-     return await SendAndWaitAsync(timeoutMs, length, source, metadata, token).ConfigureAwait(false);
- }
- /// <summary>
- /// Send data from a stream and wait up to the given number of milliseconds for the response.
- /// A TimeoutException will be thrown if a response is not received. A null stream is replaced by an empty in-memory stream.
- /// </summary>
- /// <param name="timeoutMs">Number of milliseconds to wait before considering a request to be expired. Must be 1000 or greater.</param>
- /// <param name="contentLength">The number of bytes to send from the supplied stream.</param>
- /// <param name="stream">Stream containing data.</param>
- /// <param name="metadata">Metadata dictionary to attach to the message.</param>
- /// <param name="token">Cancellation token to cancel the request.</param>
- /// <returns>SyncResponse.</returns>
- /// <exception cref="ArgumentException">contentLength is negative, or timeoutMs is less than 1000.</exception>
- public async Task<SyncResponse> SendAndWaitAsync(int timeoutMs, long contentLength, Stream stream, Dictionary<string, object> metadata = null, CancellationToken token = default)
- {
-     if (contentLength < 0)
-     {
-         throw new ArgumentException("Content length must be zero or greater.");
-     }
-     if (timeoutMs < 1000)
-     {
-         throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
-     }
-     Stream source = stream ?? new MemoryStream(Array.Empty<byte>());
-     // The request carries its own expiry so the receiver can discard it once it is stale.
-     DateTime expiresUtc = DateTime.UtcNow.AddMilliseconds(timeoutMs);
-     WatsonMessage request = _MessageBuilder.ConstructNew(contentLength, source, true, false, expiresUtc, metadata);
-     return await SendAndWaitInternalAsync(request, timeoutMs, contentLength, source, token).ConfigureAwait(false);
- }
- #endregion
- #endregion
- #region Private-Methods
- /// <summary>
- /// Disconnect the client and dispose of background workers.
- /// Do not reuse the object after disposal.
- /// Calling this more than once is safe: every step is null-checked, so later calls do nothing.
- /// </summary>
- /// <param name="disposing">Indicate if resources should be disposed. When false, nothing is done.</param>
- protected virtual void Dispose(bool disposing)
- {
-     if (!disposing) return;
-     // _Settings is cleared below, so it must be null-checked for a repeated Dispose() not to throw.
-     _Settings?.Logger?.Invoke(Severity.Info, _Header + "disposing");
-     if (Connected) Disconnect();
-     _SslCertificate?.Dispose();
-     _WriteLock?.Dispose();
-     _ReadLock?.Dispose();
-     _Settings = null;
-     _Events = null;
-     _Callbacks = null;
-     _Statistics = null;
-     _Keepalive = null;
-     _SslConfiguration = null;
-     _SourceIp = null;
-     _ServerIp = null;
-     _Client = null;
-     _DataStream = null;
-     _TcpStream = null;
-     _SslStream = null;
-     _SslCertificate = null;
-     _SslCertificateCollection = null;
-     _WriteLock = null;
-     _ReadLock = null;
-     _DataReceiver = null;
-     // Drop the second background task reference as well, alongside _DataReceiver.
-     _IdleServerMonitor = null;
- }
- #region Connection
- /// <summary>
- /// Apply the configured TCP keepalive values to the connected socket.
- /// The mechanism depends on the compilation target; on failure the error is logged and
- /// keepalives are switched off in the keepalive settings instead of throwing.
- /// </summary>
- private void EnableKeepalives()
- {
-     // issues with definitions: https://github.com/dotnet/sdk/issues/14540
-     try
-     {
- #if NET6_0_OR_GREATER
-         Socket socket = _Client.Client;
-         socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
-         socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, _Keepalive.TcpKeepAliveTime);
-         socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, _Keepalive.TcpKeepAliveInterval);
-         // Windows 10 version 1703 or later
-         bool retryCountSupported = RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
-             && Environment.OSVersion.Version >= new Version(10, 0, 15063);
-         if (retryCountSupported)
-         {
-             socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, _Keepalive.TcpKeepAliveRetryCount);
-         }
- #elif NETFRAMEWORK
-         // .NET Framework expects values in milliseconds
-         // Three consecutive 4-byte values: on/off flag, keepalive time, keepalive interval.
-         byte[] keepAliveValues = new byte[12];
-         BitConverter.GetBytes((uint)1).CopyTo(keepAliveValues, 0);
-         BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)).CopyTo(keepAliveValues, 4);
-         BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)).CopyTo(keepAliveValues, 8);
-         _Client.Client.IOControl(IOControlCode.KeepAliveValues, keepAliveValues, null);
- #elif NETSTANDARD
- #endif
-     }
-     catch (Exception)
-     {
-         _Settings.Logger?.Invoke(Severity.Error, _Header + "keepalives not supported on this platform, disabled");
-         _Keepalive.EnableTcpKeepAlives = false;
-     }
- }
- #endregion
- #region Read
- private async Task DataReceiver(CancellationToken token)
- {
- DisconnectReason reason = DisconnectReason.Normal;
- while (true)
- {
- try
- {
- token.ThrowIfCancellationRequested();
-
- #region Check-for-Connection
- if (_Client == null || !_Client.Connected)
- {
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "disconnect detected");
- break;
- }
- #endregion
- #region Read-Message
- await _ReadLock.WaitAsync(token);
- WatsonMessage msg = await _MessageBuilder.BuildFromStream(_DataStream, token);
- if (msg == null)
- {
- await Task.Delay(30, token).ConfigureAwait(false);
- continue;
- }
- _LastActivity = DateTime.UtcNow;
- #endregion
- #region Process-by-Status
- if (msg.Status == MessageStatus.Removed)
- {
- _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to server-side removal");
- reason = DisconnectReason.Removed;
- break;
- }
- else if (msg.Status == MessageStatus.Shutdown)
- {
- _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to server shutdown");
- reason = DisconnectReason.Shutdown;
- break;
- }
- else if (msg.Status == MessageStatus.Timeout)
- {
- _Settings?.Logger?.Invoke(Severity.Info, _Header + "disconnect due to timeout");
- reason = DisconnectReason.Timeout;
- break;
- }
- else if (msg.Status == MessageStatus.AuthSuccess)
- {
- _Settings.Logger?.Invoke(Severity.Debug, _Header + "authentication successful");
- Task unawaited = Task.Run(() => _Events.HandleAuthenticationSucceeded(this, EventArgs.Empty), token);
- continue;
- }
- else if (msg.Status == MessageStatus.AuthFailure)
- {
- _Settings.Logger?.Invoke(Severity.Error, _Header + "authentication failed");
- reason = DisconnectReason.AuthFailure;
- Task unawaited = Task.Run(() => _Events.HandleAuthenticationFailure(this, EventArgs.Empty), token);
- break;
- }
- else if (msg.Status == MessageStatus.AuthRequired)
- {
- _Settings.Logger?.Invoke(Severity.Info, _Header + "authentication required by server; please authenticate using pre-shared key");
- string psk = _Callbacks.HandleAuthenticationRequested();
- if (!String.IsNullOrEmpty(psk)) await AuthenticateAsync(psk, token);
- continue;
- }
- #endregion
- #region Process-Message
- if (msg.SyncRequest)
- {
- _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request received: " + msg.ConversationGuid.ToString());
-
- DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
- byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
- if (DateTime.UtcNow < expiration)
- {
- Task unawaited = Task.Run(async () =>
- {
- SyncRequest syncReq = new SyncRequest(
- null,
- msg.ConversationGuid,
- msg.ExpirationUtc.Value,
- msg.Metadata,
- msgData);
- SyncResponse syncResp = null;
- #pragma warning disable CS0618 // Type or member is obsolete
- if (_Callbacks.SyncRequestReceivedAsync != null)
- {
- syncResp = await _Callbacks.HandleSyncRequestReceivedAsync(syncReq);
- }
- else if (_Callbacks.SyncRequestReceived != null)
- {
- syncResp = _Callbacks.HandleSyncRequestReceived(syncReq);
- }
- #pragma warning restore CS0618 // Type or member is obsolete
- if (syncResp != null)
- {
- WatsonCommon.BytesToStream(syncResp.Data, 0, out int contentLength, out Stream stream);
- WatsonMessage respMsg = _MessageBuilder.ConstructNew(
- contentLength,
- stream,
- false,
- true,
- msg.ExpirationUtc.Value,
- syncResp.Metadata);
- respMsg.ConversationGuid = msg.ConversationGuid;
- await SendInternalAsync(respMsg, contentLength, stream, token).ConfigureAwait(false);
- }
- }, _Token);
- }
- else
- {
- _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded");
- }
- }
- else if (msg.SyncResponse)
- {
- // No need to amend message expiration; it is copied from the request, which was set by this node
- // DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
- _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous response received: " + msg.ConversationGuid.ToString());
- byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
- if (DateTime.UtcNow < msg.ExpirationUtc.Value)
- {
- lock (_SyncResponseLock)
- {
- _SyncResponseReceived?.Invoke(this,new SyncResponseReceivedEventArgs(msg,msgData));
- }
- }
- else
- {
- _Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous response received and discarded");
- }
- }
- else
- {
- byte[] msgData = null;
- if (_Events.IsUsingMessages)
- {
- msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
- MessageReceivedEventArgs args = new MessageReceivedEventArgs(null, msg.Metadata, msgData);
- await Task.Run(() => _Events.HandleMessageReceived(this, args));
- }
- else if (_Events.IsUsingStreams)
- {
- StreamReceivedEventArgs sr = null;
- WatsonStream ws = null;
- if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
- {
- ws = new WatsonStream(msg.ContentLength, msg.DataStream);
- sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
- _Events.HandleStreamReceived(this, sr);
- }
- else
- {
- MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false);
- ws = new WatsonStream(msg.ContentLength, ms);
- sr = new StreamReceivedEventArgs(null, msg.Metadata, msg.ContentLength, ws);
- Task unawaited = Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
- }
- }
- else
- {
- _Settings.Logger?.Invoke(Severity.Error, _Header + "event handler not set for either MessageReceived or StreamReceived");
- break;
- }
- }
- #endregion
- _Statistics.IncrementReceivedMessages();
- _Statistics.AddReceivedBytes(msg.ContentLength);
- }
- catch (ObjectDisposedException ode)
- {
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "object disposed exception encountered");
- _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ode));
- break;
- }
- catch (TaskCanceledException tce)
- {
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "task canceled exception encountered");
- _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(tce));
- break;
- }
- catch (OperationCanceledException oce)
- {
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "operation canceled exception encountered");
- _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(oce));
- break;
- }
- catch (IOException ioe)
- {
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "IO exception encountered");
- _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ioe));
- break;
- }
- catch (Exception e)
- {
- _Settings?.Logger?.Invoke(Severity.Error,
- _Header + "data receiver exception for " + _ServerIp + ":" + _ServerPort + ": " + e.Message + Environment.NewLine);
- _Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
- break;
- }
- finally
- {
- if (_ReadLock != null) _ReadLock.Release();
- }
- }
- Connected = false;
- if (_IsTimeout) reason = DisconnectReason.Timeout;
- _Settings?.Logger?.Invoke(Severity.Debug, _Header + "data receiver terminated for " + _ServerIp + ":" + _ServerPort);
- _Events?.HandleServerDisconnected(this, new DisconnectionEventArgs(null, reason));
- }
- #endregion
- #region Send
/// <summary>
/// Sends a message to the server: the headers first, then <paramref name="contentLength"/> bytes
/// read from <paramref name="stream"/>. The write is serialized through <c>_WriteLock</c>.
/// </summary>
/// <param name="msg">Message whose headers are written.</param>
/// <param name="contentLength">Number of payload bytes to copy from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is positive.</param>
/// <param name="token">Cancellation token; when left at its default value the client's own token (<c>_Token</c>) is used.</param>
/// <returns>
/// True when headers and payload were written; false when the client is not connected,
/// the write was canceled, or the write failed (in which case the client is also disposed).
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
/// <exception cref="ArgumentException">A payload is expected but <paramref name="stream"/> is null or unreadable.</exception>
private async Task<bool> SendInternalAsync(WatsonMessage msg, long contentLength, Stream stream, CancellationToken token)
{
    if (msg == null) throw new ArgumentNullException(nameof(msg));
    if (!Connected) return false;

    if (contentLength > 0 && (stream == null || !stream.CanRead))
    {
        throw new ArgumentException("Cannot read from supplied stream.");
    }

    // No caller-supplied token: observe the client token directly. Linking a default
    // (never-canceled) token with _Token is equivalent to _Token itself, and avoids
    // allocating a linked CancellationTokenSource that was never disposed.
    if (token == default(CancellationToken))
    {
        token = _Token;
    }

    if (_Client == null || !_Client.Connected)
    {
        return false;
    }

    bool disconnectDetected = false;

    await _WriteLock.WaitAsync(token).ConfigureAwait(false);
    try
    {
        await SendHeadersAsync(msg, token).ConfigureAwait(false);
        await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false);
        _Statistics.IncrementSentMessages();
        _Statistics.AddSentBytes(contentLength);
        return true;
    }
    catch (OperationCanceledException)
    {
        // Also covers TaskCanceledException, which derives from OperationCanceledException.
        return false;
    }
    catch (Exception e)
    {
        _Settings.Logger?.Invoke(Severity.Error,
            _Header + "failed to write message to " + _ServerIp + ":" + _ServerPort + ":" +
            Environment.NewLine +
            e.ToString() +
            Environment.NewLine);
        disconnectDetected = true;
        return false;
    }
    finally
    {
        // Release before disposing so the lock is never released after teardown.
        _WriteLock.Release();
        if (disconnectDetected)
        {
            Connected = false;
            Dispose();
        }
    }
}
-
/// <summary>
/// Sends a synchronous request and waits, without blocking a thread, for the response that
/// carries the same conversation GUID.
/// </summary>
/// <param name="msg">Request message; its <c>ConversationGuid</c> is used to match the response.</param>
/// <param name="timeoutMs">Maximum time to wait for the response after the request is written, in milliseconds.</param>
/// <param name="contentLength">Number of payload bytes to copy from <paramref name="stream"/>.</param>
/// <param name="stream">Payload source; must be readable when <paramref name="contentLength"/> is positive.</param>
/// <param name="token">Cancellation token observed while acquiring the write lock and writing the request.</param>
/// <returns>The matching response, or null when the write was canceled.</returns>
/// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
/// <exception cref="InvalidOperationException">The client is not connected.</exception>
/// <exception cref="ArgumentException">A payload is expected but <paramref name="stream"/> is null or unreadable.</exception>
/// <exception cref="TimeoutException">No matching response arrived within <paramref name="timeoutMs"/>.</exception>
private async Task<SyncResponse> SendAndWaitInternalAsync(WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token)
{
    if (msg == null) throw new ArgumentNullException(nameof(msg));
    if (!Connected) throw new InvalidOperationException("Client is not connected to the server.");
    if (contentLength > 0 && (stream == null || !stream.CanRead))
        throw new ArgumentException("Cannot read from supplied stream.");
    if (_Client == null || !_Client.Connected)
        throw new InvalidOperationException("Client is not connected to the server.");

    // Completed by the handler below. Continuations run asynchronously so that the code
    // awaiting the response never executes inline on the thread raising the event
    // (the receive loop raises _SyncResponseReceived while holding _SyncResponseLock).
    TaskCompletionSource<SyncResponse> responseSource =
        new TaskCompletionSource<SyncResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Handler dedicated to this conversation; responses for other conversations are ignored.
    EventHandler<SyncResponseReceivedEventArgs> handler = (sender, e) =>
    {
        if (e.Message.ConversationGuid == msg.ConversationGuid)
        {
            responseSource.TrySetResult(
                new SyncResponse(msg.ConversationGuid, e.Message.ExpirationUtc.Value, e.Message.Metadata, e.Data));
        }
    };

    bool disconnectDetected = false;

    await _WriteLock.WaitAsync(token).ConfigureAwait(false);
    _SyncResponseReceived += handler;
    try
    {
        try
        {
            await SendHeadersAsync(msg, token).ConfigureAwait(false);
            await SendDataStreamAsync(contentLength, stream, token).ConfigureAwait(false);
            _Settings.Logger?.Invoke(Severity.Debug, _Header + "synchronous request sent: " + msg.ConversationGuid);
            _Statistics.IncrementSentMessages();
            _Statistics.AddSentBytes(contentLength);
        }
        catch (OperationCanceledException)
        {
            // Also covers TaskCanceledException. The outer finally removes the handler,
            // which previously stayed subscribed on this path.
            return null;
        }
        catch (Exception e)
        {
            _Settings.Logger?.Invoke(Severity.Error, _Header + "failed to write message to " + _ServerIp + ":" + _ServerPort + ": " + e.Message);
            disconnectDetected = true;
            throw;
        }
        finally
        {
            _WriteLock.Release();
            if (disconnectDetected)
            {
                Connected = false;
                Dispose();
            }
        }

        // Wait for either the response or the timeout. The delay is canceled as soon as
        // the wait finishes so its timer does not linger for the rest of the window.
        using (CancellationTokenSource timeoutCts = new CancellationTokenSource())
        {
            Task timeoutTask = Task.Delay(timeoutMs, timeoutCts.Token);
            await Task.WhenAny(responseSource.Task, timeoutTask).ConfigureAwait(false);
            timeoutCts.Cancel();
        }
    }
    finally
    {
        // Always unsubscribe, whichever way the method exits.
        _SyncResponseReceived -= handler;
    }

    if (responseSource.Task.Status == TaskStatus.RanToCompletion)
    {
        return responseSource.Task.Result;
    }

    _Settings.Logger?.Invoke(Severity.Error, _Header + "synchronous response not received within the timeout window");
    throw new TimeoutException("A response to a synchronous request was not received within the timeout window.");
}
/// <summary>
/// Stamps <paramref name="msg"/> with the GUID held in <c>_Settings</c>, serializes its headers
/// through <c>_MessageBuilder</c>, and writes them to <c>_DataStream</c>, flushing afterwards.
/// </summary>
/// <param name="msg">Message whose headers are sent; its <c>SenderGuid</c> is overwritten.</param>
/// <param name="token">Cancellation token passed to the write and the flush.</param>
private async Task SendHeadersAsync(WatsonMessage msg, CancellationToken token)
{
    // The sender identity must be set before the headers are serialized.
    msg.SenderGuid = _Settings.Guid;

    byte[] serializedHeaders = _MessageBuilder.GetHeaderBytes(msg);
    int headerLength = serializedHeaders.Length;

    await _DataStream
        .WriteAsync(serializedHeaders, 0, headerLength, token)
        .ConfigureAwait(false);

    await _DataStream
        .FlushAsync(token)
        .ConfigureAwait(false);
}
-
/// <summary>
/// Copies exactly <paramref name="contentLength"/> bytes from <paramref name="stream"/> to
/// <c>_DataStream</c> in chunks of at most <c>_Settings.StreamBufferSize</c> bytes, then flushes.
/// Does nothing when <paramref name="contentLength"/> is zero or negative.
/// </summary>
/// <param name="contentLength">Number of bytes to copy.</param>
/// <param name="stream">Source of the payload bytes.</param>
/// <param name="token">Cancellation token passed to every read, write and flush.</param>
/// <exception cref="EndOfStreamException">
/// <paramref name="stream"/> ended before <paramref name="contentLength"/> bytes could be read.
/// </exception>
private async Task SendDataStreamAsync(long contentLength, Stream stream, CancellationToken token)
{
    if (contentLength <= 0) return;

    long bytesRemaining = contentLength;

    // Allocate the transfer buffer once instead of once per chunk; it never needs to be
    // larger than the payload itself.
    byte[] buffer = new byte[Math.Min(bytesRemaining, _Settings.StreamBufferSize)];
    _SendBuffer = buffer;

    while (bytesRemaining > 0)
    {
        // Never request more than is still owed, so the final chunk is not over-read.
        int bytesToRead = (int)Math.Min(bytesRemaining, buffer.Length);
        int bytesRead = await stream.ReadAsync(buffer, 0, bytesToRead, token).ConfigureAwait(false);

        if (bytesRead <= 0)
        {
            // A zero-byte read means the source is exhausted. Looping again would spin
            // forever (while the caller holds the write lock), so fail loudly instead.
            throw new EndOfStreamException(
                "Supplied stream ended with " + bytesRemaining + " of " + contentLength + " bytes still unread.");
        }

        await _DataStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
        bytesRemaining -= bytesRead;
    }

    await _DataStream.FlushAsync(token).ConfigureAwait(false);
}
- #endregion
- #region Tasks
/// <summary>
/// Background loop that wakes every <c>_Settings.IdleServerEvaluationIntervalMs</c> milliseconds and
/// calls <c>Disconnect</c> when more than <c>_Settings.IdleServerTimeoutMs</c> milliseconds have
/// passed since <c>_LastActivity</c>. A timeout of zero disables the check.
/// </summary>
/// <param name="token">Cancellation token; the loop ends once cancellation is requested.</param>
private async Task IdleServerMonitor(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(_Settings.IdleServerEvaluationIntervalMs, token).ConfigureAwait(false);

            bool idleCheckEnabled = _Settings.IdleServerTimeoutMs != 0;
            if (idleCheckEnabled)
            {
                DateTime idleDeadline = _LastActivity.AddMilliseconds(_Settings.IdleServerTimeoutMs);
                bool deadlinePassed = idleDeadline < DateTime.UtcNow;

                if (deadlinePassed)
                {
                    _Settings.Logger?.Invoke(Severity.Warn, _Header + "disconnecting from " + _ServerIp + ":" + _ServerPort + " due to timeout");

                    // Flag the timeout before disconnecting so the disconnect reason can reflect it.
                    _IsTimeout = true;
                    Disconnect();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation (including TaskCanceledException) is expected during shutdown;
            // the loop condition ends the monitor on the next check.
        }
        catch (Exception e)
        {
            _Settings.Logger?.Invoke(Severity.Warn, _Header + "exception encountered while monitoring for idle server connection: " + e.Message);
            _Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
        }
    }
}
- #endregion
- #endregion
- }
- }
|