namespace WatsonTcp
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Security;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Security.AccessControl;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
///
/// Watson TCP server, with or without SSL.
///
public class WatsonTcpServer : IDisposable
{
#region Public-Members
///
/// Watson TCP server settings.
///
public WatsonTcpServerSettings Settings
{
get
{
return _Settings;
}
set
{
if (value == null) _Settings = new WatsonTcpServerSettings();
else _Settings = value;
}
}
///
/// Watson TCP server events.
///
public WatsonTcpServerEvents Events
{
get
{
return _Events;
}
set
{
if (value == null) _Events = new WatsonTcpServerEvents();
else _Events = value;
}
}
///
/// Watson TCP server callbacks.
///
public WatsonTcpServerCallbacks Callbacks
{
get
{
return _Callbacks;
}
set
{
if (value == null) _Callbacks = new WatsonTcpServerCallbacks();
else _Callbacks = value;
}
}
///
/// Watson TCP statistics.
///
public WatsonTcpStatistics Statistics
{
get
{
return _Statistics;
}
}
///
/// Watson TCP keepalive settings.
///
public WatsonTcpKeepaliveSettings Keepalive
{
get
{
return _Keepalive;
}
set
{
if (value == null) _Keepalive = new WatsonTcpKeepaliveSettings();
else _Keepalive = value;
}
}
///
/// Watson TCP server SSL configuration.
///
public WatsonTcpServerSslConfiguration SslConfiguration
{
get
{
return _SslConfiguration;
}
set
{
if (value == null) _SslConfiguration = new WatsonTcpServerSslConfiguration();
else _SslConfiguration = value;
}
}
///
/// JSON serialization helper.
///
public ISerializationHelper SerializationHelper
{
get
{
return _SerializationHelper;
}
set
{
if (value == null) throw new ArgumentNullException(nameof(SerializationHelper));
_SerializationHelper = value;
_MessageBuilder.SerializationHelper = value;
}
}
///
/// Retrieve the number of current connected clients.
///
public int Connections
{
get
{
return _Connections;
}
}
///
/// Flag to indicate if Watson TCP is listening for incoming TCP connections.
///
public bool IsListening
{
get
{
return _IsListening;
}
}
#endregion
#region Private-Members
private string _Header = "[WatsonTcpServer] ";
private WatsonMessageBuilder _MessageBuilder = new WatsonMessageBuilder();
private WatsonTcpServerSettings _Settings = new WatsonTcpServerSettings();
private WatsonTcpServerEvents _Events = new WatsonTcpServerEvents();
private WatsonTcpServerCallbacks _Callbacks = new WatsonTcpServerCallbacks();
private WatsonTcpStatistics _Statistics = new WatsonTcpStatistics();
private WatsonTcpKeepaliveSettings _Keepalive = new WatsonTcpKeepaliveSettings();
private WatsonTcpServerSslConfiguration _SslConfiguration = new WatsonTcpServerSslConfiguration();
private ClientMetadataManager _ClientManager = new ClientMetadataManager();
private ISerializationHelper _SerializationHelper = new DefaultSerializationHelper();
private int _Connections = 0;
private bool _IsListening = false;
private Mode _Mode;
private TlsVersion _TlsVersion = TlsVersion.Tls12;
private string _ListenerIp;
private int _ListenerPort;
private IPAddress _ListenerIpAddress;
private TcpListener _Listener;
private X509Certificate2 _SslCertificate;
private CancellationTokenSource _TokenSource = new CancellationTokenSource();
private CancellationToken _Token;
private Task _AcceptConnections = null;
private Task _MonitorClients = null;
private readonly object _SyncResponseLock = new object();
private event EventHandler _SyncResponseReceived;
#endregion
#region Constructors-and-Factories
///
/// Initialize the Watson TCP server without SSL.
/// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
/// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
/// Call Start() afterward to start the server.
///
/// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).
/// The TCP port on which the server should listen.
public WatsonTcpServer(
string listenerIp,
int listenerPort)
{
if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
_Mode = Mode.Tcp;
// According to the https://github.com/dotnet/WatsonTcp?tab=readme-ov-file#local-vs-external-connections
if (string.IsNullOrEmpty(listenerIp) || listenerIp.Equals("*") || listenerIp.Equals("+") || listenerIp.Equals("0.0.0.0"))
{
_ListenerIpAddress = IPAddress.Any;
_ListenerIp = _ListenerIpAddress.ToString();
}
else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
{
_ListenerIpAddress = IPAddress.Loopback;
_ListenerIp = _ListenerIpAddress.ToString();
}
else
{
_ListenerIpAddress = IPAddress.Parse(listenerIp);
_ListenerIp = listenerIp;
}
_ListenerPort = listenerPort;
SerializationHelper.InstantiateConverter(); // Unity fix
}
///
/// Initialize the Watson TCP server with SSL.
/// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
/// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
/// Call Start() afterward to start the server.
///
/// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).
/// The TCP port on which the server should listen.
/// The file containing the SSL certificate.
/// The password for the SSL certificate.
/// The TLS version required for client connections.
public WatsonTcpServer(
string listenerIp,
int listenerPort,
string pfxCertFile,
string pfxCertPass,
TlsVersion tlsVersion = TlsVersion.Tls12)
{
if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
if (String.IsNullOrEmpty(pfxCertFile)) throw new ArgumentNullException(nameof(pfxCertFile));
_Mode = Mode.Ssl;
_TlsVersion = tlsVersion;
if (String.IsNullOrEmpty(listenerIp))
{
_ListenerIpAddress = IPAddress.Any;
_ListenerIp = _ListenerIpAddress.ToString();
}
else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
{
_ListenerIpAddress = IPAddress.Loopback;
_ListenerIp = _ListenerIpAddress.ToString();
}
else
{
_ListenerIpAddress = IPAddress.Parse(listenerIp);
_ListenerIp = listenerIp;
}
_SslCertificate = null;
if (String.IsNullOrEmpty(pfxCertPass))
{
_SslCertificate = new X509Certificate2(pfxCertFile);
}
else
{
_SslCertificate = new X509Certificate2(pfxCertFile, pfxCertPass);
}
_ListenerPort = listenerPort;
SerializationHelper.InstantiateConverter(); // Unity fix
}
///
/// Initialize the Watson TCP server with SSL.
/// Supply a specific IP address on which to listen. Otherwise, use 'null' for the IP address to listen on any IP address.
/// If you do not specify an IP address, you may have to run WatsonTcp with administrative privileges.
/// Call Start() afterward to start the server.
///
/// The IP address on which the server should listen. If null, listen on any IP address (may require administrative privileges).
/// The TCP port on which the server should listen.
/// The SSL certificate.
/// The TLS version required for client connections.
///
public WatsonTcpServer(
string listenerIp,
int listenerPort,
X509Certificate2 cert,
TlsVersion tlsVersion = TlsVersion.Tls12)
{
if (listenerPort < 1) throw new ArgumentOutOfRangeException(nameof(listenerPort));
if (cert == null) throw new ArgumentNullException(nameof(cert));
_Mode = Mode.Ssl;
_TlsVersion = tlsVersion;
_SslCertificate = cert;
if (String.IsNullOrEmpty(listenerIp))
{
_ListenerIpAddress = IPAddress.Any;
_ListenerIp = _ListenerIpAddress.ToString();
}
else if (listenerIp.Equals("localhost") || listenerIp.Equals("127.0.0.1") || listenerIp.Equals("::1"))
{
_ListenerIpAddress = IPAddress.Loopback;
_ListenerIp = _ListenerIpAddress.ToString();
}
else
{
_ListenerIpAddress = IPAddress.Parse(listenerIp);
_ListenerIp = listenerIp;
}
_ListenerPort = listenerPort;
SerializationHelper.InstantiateConverter(); // Unity fix
}
#endregion
#region Public-Methods
///
/// Tear down the server and dispose of background workers.
/// Do not reuse the object after disposal.
///
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// Start accepting connections.
///
public void Start()
{
if (_IsListening) throw new InvalidOperationException("WatsonTcpServer is already running.");
_ClientManager = new ClientMetadataManager();
_TokenSource = new CancellationTokenSource();
_Token = _TokenSource.Token;
_Statistics = new WatsonTcpStatistics();
_Listener = new TcpListener(_ListenerIpAddress, _ListenerPort);
if (!_Events.IsUsingMessages && !_Events.IsUsingStreams)
throw new InvalidOperationException("One of either 'MessageReceived' or 'StreamReceived' events must first be set.");
if (_Mode == Mode.Tcp)
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "starting on " + _ListenerIp + ":" + _ListenerPort);
}
else if (_Mode == Mode.Ssl)
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "starting with SSL on " + _ListenerIp + ":" + _ListenerPort);
}
else
{
throw new ArgumentException("Unknown mode: " + _Mode.ToString());
}
_Listener.Start();
_AcceptConnections = Task.Run(() => AcceptConnections(_Token), _Token); // sets _IsListening
_MonitorClients = Task.Run(() => MonitorForIdleClients(_Token), _Token);
_Events.HandleServerStarted(this, EventArgs.Empty);
}
///
/// Stop accepting connections.
///
public void Stop()
{
_IsListening = false;
_Listener.Stop();
_TokenSource.Cancel();
_Settings.Logger?.Invoke(Severity.Info, _Header + "stopped");
_Events.HandleServerStopped(this, EventArgs.Empty);
}
#region SendAsync
///
/// Send data and metadata to the specified client, asynchronously.
///
/// Globally-unique identifier of the client.
/// String containing data.
/// Dictionary containing metadata.
/// Start position within the supplied array.
/// Cancellation token to cancel the request.
/// Task with Boolean indicating if the message was sent successfully.
public async Task SendAsync(Guid guid, string data, Dictionary metadata = null, int start = 0, CancellationToken token = default)
{
byte[] bytes = Array.Empty();
if (!String.IsNullOrEmpty(data)) bytes = Encoding.UTF8.GetBytes(data);
return await SendAsync(guid, bytes, metadata, start, token).ConfigureAwait(false);
}
///
/// Send data and metadata to the specified client, asynchronously.
///
/// Globally-unique identifier of the client.
/// Byte array containing data.
/// Dictionary containing metadata.
/// Start position within the supplied array.
/// Cancellation token to cancel the request.
/// Task with Boolean indicating if the message was sent successfully.
public async Task SendAsync(Guid guid, byte[] data, Dictionary metadata = null, int start = 0, CancellationToken token = default)
{
if (data == null) data = Array.Empty();
WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream);
return await SendAsync(guid, contentLength, stream, metadata, token).ConfigureAwait(false);
}
///
/// Send data and metadata to the specified client using a stream, asynchronously.
///
/// Globally-unique identifier of the client.
/// The number of bytes in the stream.
/// The stream containing the data.
/// Dictionary containing metadata.
/// Cancellation token to cancel the request.
/// Task with Boolean indicating if the message was sent successfully.
public async Task SendAsync(Guid guid, long contentLength, Stream stream, Dictionary metadata = null, CancellationToken token = default)
{
if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");
if (token == default(CancellationToken)) token = _Token;
ClientMetadata client = _ClientManager.GetClient(guid);
if (client == null)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
throw new KeyNotFoundException("Unable to find client " + guid.ToString() + ".");
}
if (stream == null) stream = new MemoryStream(Array.Empty());
WatsonMessage msg = _MessageBuilder.ConstructNew(contentLength, stream, false, false, null, metadata);
return await SendInternalAsync(client, msg, contentLength, stream, token).ConfigureAwait(false);
}
#endregion
#region SendAndWaitAsync
///
/// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
///
/// Number of milliseconds to wait before considering a request to be expired.
/// Globally-unique identifier of the client.
/// Data to send.
/// Metadata dictionary to attach to the message.
/// Start position within the supplied array.
/// Cancellation token to cancel the request.
/// SyncResponse.
public async Task SendAndWaitAsync(int timeoutMs, Guid guid, string data, Dictionary metadata = null, int start = 0, CancellationToken token = default)
{
byte[] bytes = Array.Empty();
if (!String.IsNullOrEmpty(data)) bytes = Encoding.UTF8.GetBytes(data);
return await SendAndWaitAsync(timeoutMs, guid, bytes, metadata, start, token);
// SendAndWaitAsync(timeoutMs, guid, bytes, metadata, token).ConfigureAwait(false);
}
///
/// Send data and wait for a response for the specified number of milliseconds.
///
/// Number of milliseconds to wait before considering a request to be expired.
/// Globally-unique identifier of the client.
/// Data to send.
/// Metadata dictionary to attach to the message.
/// Start position within the supplied array.
/// Cancellation token to cancel the request.
/// SyncResponse.
public async Task SendAndWaitAsync(int timeoutMs, Guid guid, byte[] data, Dictionary metadata = null, int start = 0, CancellationToken token = default)
{
if (data == null) data = Array.Empty();
WatsonCommon.BytesToStream(data, start, out int contentLength, out Stream stream);
return await SendAndWaitAsync(timeoutMs, guid, contentLength, stream, metadata, token);
}
///
/// Send data and wait for a response for the specified number of milliseconds. A TimeoutException will be thrown if a response is not received.
///
/// Number of milliseconds to wait before considering a request to be expired.
/// Globally-unique identifier of the client.
/// The number of bytes to send from the supplied stream.
/// Stream containing data.
/// Metadata dictionary to attach to the message.
/// Cancellation token to cancel the request.
/// SyncResponse.
public async Task SendAndWaitAsync(int timeoutMs, Guid guid, long contentLength, Stream stream, Dictionary metadata = null, CancellationToken token = default)
{
if (contentLength < 0) throw new ArgumentException("Content length must be zero or greater.");
if (timeoutMs < 1000) throw new ArgumentException("Timeout milliseconds must be 1000 or greater.");
ClientMetadata client = _ClientManager.GetClient(guid);
if (client == null)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
throw new KeyNotFoundException("Unable to find client " + guid.ToString() + ".");
}
if (stream == null) stream = new MemoryStream(Array.Empty());
DateTime expiration = DateTime.UtcNow.AddMilliseconds(timeoutMs);
WatsonMessage msg = _MessageBuilder.ConstructNew(contentLength, stream, true, false, expiration, metadata);
return await SendAndWaitInternalAsync(client, msg, timeoutMs, contentLength, stream, token);
}
#endregion
///
/// Determine whether or not the specified client is connected to the server.
///
/// Globally-unique identifier of the client.
/// Boolean indicating if the client is connected to the server.
public bool IsClientConnected(Guid guid)
{
return _ClientManager.ExistsClient(guid);
}
///
/// Retrieve the client metadata associated with each connected client.
///
/// An enumerable collection of client metadata.
public IEnumerable ListClients()
{
Dictionary clients = _ClientManager.AllClients();
if (clients != null && clients.Count > 0)
{
foreach (KeyValuePair client in clients)
{
yield return client.Value;
}
}
}
///
/// Disconnects the specified client.
///
/// Globally-unique identifier of the client.
/// Reason for the disconnect. This is conveyed to the client.
/// Flag to indicate whether the client should be notified of the disconnect. This message will not be sent until other send requests have been handled.
/// Cancellation token to cancel the request.
public async Task DisconnectClientAsync(Guid guid, MessageStatus status = MessageStatus.Removed, bool sendNotice = true, CancellationToken token = default)
{
ClientMetadata client = _ClientManager.GetClient(guid);
if (client == null)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "unable to find client " + guid.ToString());
}
else
{
if (!_ClientManager.ExistsClientTimedout(guid)) _ClientManager.AddClientKicked(guid);
if (sendNotice)
{
WatsonMessage removeMsg = new WatsonMessage();
removeMsg.Status = status;
await SendInternalAsync(client, removeMsg, 0, null, token).ConfigureAwait(false);
}
client.Dispose();
_ClientManager.RemoveClient(guid);
}
}
///
/// Disconnects all connected clients.
///
/// Reason for the disconnect. This is conveyed to each client.
/// Flag to indicate whether the client should be notified of the disconnect. This message will not be sent until other send requests have been handled.
/// Cancellation token to cancel the request.
public async Task DisconnectClientsAsync(MessageStatus status = MessageStatus.Removed, bool sendNotice = true, CancellationToken token = default)
{
Dictionary clients = _ClientManager.AllClients();
if (clients != null && clients.Count > 0)
{
foreach (KeyValuePair client in clients)
{
await DisconnectClientAsync(client.Key, status, sendNotice, token).ConfigureAwait(false);
}
}
}
#endregion
#region Private-Methods
///
/// Tear down the server and dispose of background workers.
/// Do not reuse the object after disposal.
///
/// Indicate if resources should be disposed.
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "disposing");
if (_IsListening) Stop();
DisconnectClientsAsync(MessageStatus.Shutdown).Wait();
if (_Listener != null)
{
if (_Listener.Server != null)
{
_Listener.Server.Close();
_Listener.Server.Dispose();
}
}
if (_SslCertificate != null)
{
_SslCertificate.Dispose();
}
if (_ClientManager != null)
{
_ClientManager.Dispose();
}
_Settings = null;
_Events = null;
_Callbacks = null;
_Statistics = null;
_Keepalive = null;
_SslConfiguration = null;
_ListenerIp = null;
_ListenerIpAddress = null;
_Listener = null;
_SslCertificate = null;
_TokenSource = null;
_AcceptConnections = null;
_MonitorClients = null;
_IsListening = false;
}
}
#region Connection
private void EnableKeepalives(TcpClient client)
{
// issues with definitions: https://github.com/dotnet/sdk/issues/14540
try
{
#if NET6_0_OR_GREATER
client.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, _Keepalive.TcpKeepAliveTime);
client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, _Keepalive.TcpKeepAliveInterval);
// Windows 10 version 1703 or later
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
&& Environment.OSVersion.Version >= new Version(10, 0, 15063))
{
client.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, _Keepalive.TcpKeepAliveRetryCount);
}
#elif NETFRAMEWORK
// .NET Framework expects values in milliseconds
byte[] keepAlive = new byte[12];
Buffer.BlockCopy(BitConverter.GetBytes((uint)1), 0, keepAlive, 0, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveTime * 1000)), 0, keepAlive, 4, 4);
Buffer.BlockCopy(BitConverter.GetBytes((uint)(_Keepalive.TcpKeepAliveInterval * 1000)), 0, keepAlive, 8, 4);
client.Client.IOControl(IOControlCode.KeepAliveValues, keepAlive, null);
#elif NETSTANDARD
#endif
}
catch (Exception)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "keepalives not supported on this platform, disabled");
_Keepalive.EnableTcpKeepAlives = false;
}
}
private async Task AcceptConnections(CancellationToken token)
{
_IsListening = true;
while (true)
{
try
{
token.ThrowIfCancellationRequested();
#region Check-for-Maximum-Connections
if (!_IsListening && (_Connections >= _Settings.MaxConnections))
{
await Task.Delay(100);
continue;
}
else if (!_IsListening)
{
_Listener.Start();
_IsListening = true;
}
#endregion
#region Accept-and-Validate
TcpClient tcpClient = await _Listener.AcceptTcpClientAsync().ConfigureAwait(false);
tcpClient.LingerState.Enabled = false;
tcpClient.NoDelay = _Settings.NoDelay;
if (_Keepalive.EnableTcpKeepAlives) EnableKeepalives(tcpClient);
string clientIp = ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString();
if (_Settings.PermittedIPs.Count > 0 && !_Settings.PermittedIPs.Contains(clientIp))
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "rejecting connection from " + clientIp + " (not permitted)");
tcpClient.Close();
continue;
}
if (_Settings.BlockedIPs.Count > 0 && _Settings.BlockedIPs.Contains(clientIp))
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "rejecting connection from " + clientIp + " (blocked)");
tcpClient.Close();
continue;
}
ClientMetadata client = new ClientMetadata(tcpClient);
client.SendBuffer = new byte[_Settings.StreamBufferSize];
_ClientManager.AddClient(client.Guid, client);
_ClientManager.AddClientLastSeen(client.Guid);
CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_Token, client.Token);
#endregion
#region Check-for-Maximum-Connections
Interlocked.Increment(ref _Connections);
if (_Connections >= _Settings.MaxConnections)
{
_Settings.Logger?.Invoke(Severity.Info, _Header + "maximum connections " + _Settings.MaxConnections + " met (currently " + _Connections + " connections), pausing");
_IsListening = false;
_Listener.Stop();
}
#endregion
#region Initialize-Client
Task unawaited = null;
if (_Mode == Mode.Tcp)
{
unawaited = Task.Run(() => FinalizeConnection(client, linkedCts.Token), linkedCts.Token);
}
else if (_Mode == Mode.Ssl)
{
if (_Settings.AcceptInvalidCertificates)
{
client.SslStream = new SslStream(client.NetworkStream, false, _SslConfiguration.ClientCertificateValidationCallback);
}
else
{
client.SslStream = new SslStream(client.NetworkStream, false);
}
unawaited = Task.Run(async () =>
{
bool success = await StartTls(client, linkedCts.Token).ConfigureAwait(false);
if (success)
{
await FinalizeConnection(client, linkedCts.Token).ConfigureAwait(false);
}
else
{
_ClientManager.RemoveClient(client.Guid);
_ClientManager.RemoveClientLastSeen(client.Guid);
client.Dispose();
}
}, linkedCts.Token);
}
else
{
throw new ArgumentException("Unknown mode: " + _Mode.ToString());
}
_Settings.Logger?.Invoke(Severity.Debug, _Header + "accepted connection from " + client.ToString());
#endregion
}
catch (TaskCanceledException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception e)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "listener exception: " + e.Message);
_Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
break;
}
}
}
private async Task StartTls(ClientMetadata client, CancellationToken token)
{
try
{
token.ThrowIfCancellationRequested();
await client.SslStream.AuthenticateAsServerAsync(_SslCertificate, _SslConfiguration.ClientCertificateRequired, _TlsVersion.ToSslProtocols(), !_Settings.AcceptInvalidCertificates).ConfigureAwait(false);
if (!client.SslStream.IsEncrypted)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "stream from " + client.ToString() + " not encrypted");
client.Dispose();
Interlocked.Decrement(ref _Connections);
return false;
}
if (!client.SslStream.IsAuthenticated)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "stream from " + client.ToString() + " not authenticated");
client.Dispose();
Interlocked.Decrement(ref _Connections);
return false;
}
if (_Settings.MutuallyAuthenticate && !client.SslStream.IsMutuallyAuthenticated)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + $"mutual authentication with {client.ToString()} ({_TlsVersion}) failed");
client.Dispose();
Interlocked.Decrement(ref _Connections);
return false;
}
}
catch (Exception e)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + $"disconnected during SSL/TLS establishment with {client.ToString()} ({_TlsVersion}): " + e.Message);
_Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
client.Dispose();
Interlocked.Decrement(ref _Connections);
return false;
}
return true;
}
private async Task FinalizeConnection(ClientMetadata client, CancellationToken token)
{
#region Request-Authentication
if (!String.IsNullOrEmpty(_Settings.PresharedKey))
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "requesting authentication material from " + client.ToString());
_ClientManager.AddUnauthenticatedClient(client.Guid);
byte[] data = Encoding.UTF8.GetBytes("Authentication required");
WatsonMessage authMsg = new WatsonMessage();
authMsg.Status = MessageStatus.AuthRequired;
await SendInternalAsync(client, authMsg, 0, null, token).ConfigureAwait(false);
}
#endregion
#region Start-Data-Receiver
_Settings.Logger?.Invoke(Severity.Debug, _Header + "starting data receiver for " + client.ToString());
client.DataReceiver = Task.Run(() => DataReceiver(client, token), token);
#endregion
}
private bool IsClientConnected(ClientMetadata client)
{
if (client != null && client.TcpClient != null)
{
var state = IPGlobalProperties.GetIPGlobalProperties()
.GetActiveTcpConnections()
.FirstOrDefault(x =>
x.LocalEndPoint.Equals(client.TcpClient.Client.LocalEndPoint)
&& x.RemoteEndPoint.Equals(client.TcpClient.Client.RemoteEndPoint));
if (state == default(TcpConnectionInformation)
|| state.State == TcpState.Unknown
|| state.State == TcpState.FinWait1
|| state.State == TcpState.FinWait2
|| state.State == TcpState.Closed
|| state.State == TcpState.Closing
|| state.State == TcpState.CloseWait)
{
return false;
}
byte[] tmp = new byte[1];
bool success = false;
try
{
client.WriteLock.Wait();
client.TcpClient.Client.Send(tmp, 0, 0);
success = true;
}
catch (SocketException se)
{
if (se.NativeErrorCode.Equals(10035)) success = true;
}
catch (Exception)
{
}
finally
{
if (client != null)
{
client.WriteLock.Release();
}
}
if (success) return true;
try
{
client.WriteLock.Wait();
if ((client.TcpClient.Client.Poll(0, SelectMode.SelectWrite))
&& (!client.TcpClient.Client.Poll(0, SelectMode.SelectError)))
{
byte[] buffer = new byte[1];
if (client.TcpClient.Client.Receive(buffer, SocketFlags.Peek) == 0)
{
return false;
}
else
{
return true;
}
}
else
{
return false;
}
}
catch (Exception)
{
return false;
}
finally
{
if (client != null) client.WriteLock.Release();
}
}
else
{
return false;
}
}
#endregion
#region Read
private async Task DataReceiver(ClientMetadata client, CancellationToken token)
{
while (true)
{
try
{
token.ThrowIfCancellationRequested();
if (!IsClientConnected(client)) break;
WatsonMessage msg = await _MessageBuilder.BuildFromStream(client.DataStream);
if (msg == null)
{
await Task.Delay(30, token).ConfigureAwait(false);
continue;
}
if (!String.IsNullOrEmpty(_Settings.PresharedKey))
{
if (_ClientManager.ExistsUnauthenticatedClient(client.Guid))
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "message received from unauthenticated endpoint " + client.ToString());
byte[] data = null;
WatsonMessage authMsg = null;
int contentLength = 0;
Stream authStream = null;
if (msg.Status == MessageStatus.AuthRequested)
{
// check preshared key
if (msg.PresharedKey != null && msg.PresharedKey.Length > 0)
{
string clientPsk = Encoding.UTF8.GetString(msg.PresharedKey).Trim();
if (_Settings.PresharedKey.Trim().Equals(clientPsk))
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "accepted authentication for " + client.ToString());
_ClientManager.RemoveUnauthenticatedClient(client.Guid);
_Events.HandleAuthenticationSucceeded(this, new AuthenticationSucceededEventArgs(client));
data = Encoding.UTF8.GetBytes("Authentication successful");
WatsonCommon.BytesToStream(data, 0, out contentLength, out authStream);
authMsg = _MessageBuilder.ConstructNew(contentLength, authStream, false, false, null, null);
authMsg.Status = MessageStatus.AuthSuccess;
await SendInternalAsync(client, authMsg, 0, null, token).ConfigureAwait(false);
continue;
}
else
{
_Settings.Logger?.Invoke(Severity.Warn, _Header + "declined authentication for " + client.ToString());
await DisconnectClientAsync(client.Guid, MessageStatus.AuthFailure, false, token).ConfigureAwait(false);
break;
}
}
}
// decline and terminate
_Settings.Logger?.Invoke(Severity.Warn, _Header + "no authentication material for " + client.ToString());
await DisconnectClientAsync(client.Guid, MessageStatus.AuthFailure, false, token).ConfigureAwait(false);
break;
}
}
if (msg.Status == MessageStatus.Shutdown)
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " is disconnecting");
break;
}
else if (msg.Status == MessageStatus.Removed)
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "sent disconnect notice to " + client.ToString());
break;
}
else if (msg.Status == MessageStatus.RegisterClient)
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " attempting to register GUID " + msg.SenderGuid.ToString());
_ClientManager.ReplaceGuid(client.Guid, msg.SenderGuid);
_Settings.Logger?.Invoke(Severity.Debug, _Header + "updated client GUID from " + client.Guid + " to " + msg.SenderGuid);
client.Guid = msg.SenderGuid;
_Events.HandleClientConnected(this, new ConnectionEventArgs(client));
continue;
}
if (msg.SyncRequest)
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous request received: " + msg.ConversationGuid.ToString());
DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
if (DateTime.UtcNow < expiration)
{
Task unawaited = Task.Run(async () =>
{
SyncRequest syncReq = new SyncRequest(
client,
msg.ConversationGuid,
msg.ExpirationUtc.Value,
msg.Metadata,
msgData);
SyncResponse syncResp = null;
#pragma warning disable CS0618 // Type or member is obsolete
if (_Callbacks.SyncRequestReceivedAsync != null)
{
syncResp = await _Callbacks.HandleSyncRequestReceivedAsync(syncReq);
}
else if (_Callbacks.SyncRequestReceived != null)
{
syncResp = _Callbacks.HandleSyncRequestReceived(syncReq);
}
#pragma warning restore CS0618 // Type or member is obsolete
if (syncResp != null)
{
WatsonCommon.BytesToStream(syncResp.Data, 0, out int contentLength, out Stream stream);
WatsonMessage respMsg = _MessageBuilder.ConstructNew(
contentLength,
stream,
false,
true,
msg.ExpirationUtc.Value,
syncResp.Metadata);
respMsg.ConversationGuid = msg.ConversationGuid;
await SendInternalAsync(client, respMsg, contentLength, stream, token).ConfigureAwait(false);
}
}, token);
}
else
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous request received and discarded from " + client.ToString());
}
}
else if (msg.SyncResponse)
{
// No need to amend message expiration; it is copied from the request, which was set by this node
// DateTime expiration = WatsonCommon.GetExpirationTimestamp(msg);
_Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous response received: " + msg.ConversationGuid.ToString());
byte[] msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
if (DateTime.UtcNow < msg.ExpirationUtc.Value)
{
lock (_SyncResponseLock)
{
_SyncResponseReceived?.Invoke(this, new SyncResponseReceivedEventArgs(msg, msgData));
}
}
else
{
_Settings.Logger?.Invoke(Severity.Debug, _Header + "expired synchronous response received and discarded from " + client.ToString());
}
}
else
{
byte[] msgData = null;
if (_Events.IsUsingMessages)
{
msgData = await WatsonCommon.ReadMessageDataAsync(msg, _Settings.StreamBufferSize, token).ConfigureAwait(false);
MessageReceivedEventArgs mr = new MessageReceivedEventArgs(client, msg.Metadata, msgData);
await Task.Run(() => _Events.HandleMessageReceived(this, mr), token);
}
else if (_Events.IsUsingStreams)
{
StreamReceivedEventArgs sr = null;
WatsonStream ws = null;
if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
{
ws = new WatsonStream(msg.ContentLength, msg.DataStream);
sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws);
_Events.HandleStreamReceived(this, sr);
}
else
{
MemoryStream ms = await WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize, token).ConfigureAwait(false);
ws = new WatsonStream(msg.ContentLength, ms);
sr = new StreamReceivedEventArgs(client, msg.Metadata, msg.ContentLength, ws);
await Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
}
}
else
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "event handler not set for either MessageReceived or StreamReceived");
break;
}
}
_Statistics.IncrementReceivedMessages();
_Statistics.AddReceivedBytes(msg.ContentLength);
_ClientManager.UpdateClientLastSeen(client.Guid, DateTime.UtcNow);
}
catch (ObjectDisposedException ode)
{
_Settings?.Logger?.Invoke(Severity.Debug, _Header + "object disposed exception encountered");
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ode));
break;
}
catch (TaskCanceledException tce)
{
_Settings?.Logger?.Invoke(Severity.Debug, _Header + "task canceled exception encountered");
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(tce));
break;
}
catch (OperationCanceledException oce)
{
_Settings?.Logger?.Invoke(Severity.Debug, _Header + "operation canceled exception encountered");
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(oce));
break;
}
catch (IOException ioe)
{
_Settings?.Logger?.Invoke(Severity.Debug, _Header + "IO exception encountered");
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(ioe));
break;
}
catch (Exception e)
{
_Settings?.Logger?.Invoke(Severity.Error, _Header + "data receiver exception for " + client.ToString() + ": " + e.Message);
_Events?.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
break;
}
}
if (_Settings != null && _Events != null)
{
DisconnectionEventArgs cd = null;
if (_ClientManager.ExistsClientKicked(client.Guid)) cd = new DisconnectionEventArgs(client, DisconnectReason.Removed);
else if (_ClientManager.ExistsClientTimedout(client.Guid)) cd = new DisconnectionEventArgs(client, DisconnectReason.Timeout);
else cd = new DisconnectionEventArgs(client, DisconnectReason.Normal);
_Events.HandleClientDisconnected(this, cd);
_ClientManager.Remove(client.Guid);
Interlocked.Decrement(ref _Connections);
_Settings?.Logger?.Invoke(Severity.Debug, _Header + "client " + client.ToString() + " disconnected");
client.Dispose();
}
}
#endregion
#region Send
private async Task SendInternalAsync(ClientMetadata client, WatsonMessage msg, long contentLength, Stream stream, CancellationToken token)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (msg == null) throw new ArgumentNullException(nameof(msg));
if (contentLength > 0)
{
if (stream == null || !stream.CanRead)
{
throw new ArgumentException("Cannot read from supplied stream.");
}
}
if (token == default(CancellationToken))
{
CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token, _Token);
token = linkedCts.Token;
}
await client.WriteLock.WaitAsync(token).ConfigureAwait(false);
try
{
await SendHeadersAsync(client, msg, token).ConfigureAwait(false);
await SendDataStreamAsync(client, contentLength, stream, token).ConfigureAwait(false);
_Statistics.IncrementSentMessages();
_Statistics.AddSentBytes(contentLength);
return true;
}
catch (TaskCanceledException)
{
return false;
}
catch (OperationCanceledException)
{
return false;
}
catch (Exception e)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "failed to write message to " + client.ToString() + ": " + e.Message);
_Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
return false;
}
finally
{
if (client != null) client.WriteLock.Release();
}
}
private async Task SendAndWaitInternalAsync(ClientMetadata client, WatsonMessage msg, int timeoutMs, long contentLength, Stream stream, CancellationToken token)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (msg == null) throw new ArgumentNullException(nameof(msg));
if (contentLength > 0)
{
if (stream == null || !stream.CanRead)
{
throw new ArgumentException("Cannot read from supplied stream.");
}
}
await client.WriteLock.WaitAsync();
SyncResponse ret = null;
AutoResetEvent responded = new AutoResetEvent(false);
// Create a new handler specially for this Conversation.
EventHandler handler = (sender, e) =>
{
if (e.Message.ConversationGuid == msg.ConversationGuid)
{
ret = new SyncResponse(e.Message.ConversationGuid, e.Message.ExpirationUtc.Value, e.Message.Metadata, e.Data);
responded.Set();
}
};
// Subscribe
_SyncResponseReceived += handler;
try
{
await SendHeadersAsync(client, msg, token);
await SendDataStreamAsync(client, contentLength, stream, token);
_Settings.Logger?.Invoke(Severity.Debug, _Header + client.ToString() + " synchronous request sent: " + msg.ConversationGuid);
_Statistics.IncrementSentMessages();
_Statistics.AddSentBytes(contentLength);
}
catch (Exception e)
{
_Settings.Logger?.Invoke(Severity.Error, _Header + client.ToString() + " failed to write message: " + e.Message);
_Events.HandleExceptionEncountered(this, new ExceptionEventArgs(e));
_SyncResponseReceived -= handler;
throw;
}
finally
{
if (client != null) client.WriteLock.Release();
}
// Wait for responded.Set() to be called
responded.WaitOne(new TimeSpan(0, 0, 0, 0, timeoutMs));
// Unsubscribe
_SyncResponseReceived -= handler;
if (ret != null)
{
return ret;
}
else
{
_Settings.Logger?.Invoke(Severity.Error, _Header + "synchronous response not received within the timeout window");
throw new TimeoutException("A response to a synchronous request was not received within the timeout window.");
}
}
private async Task SendHeadersAsync(ClientMetadata client, WatsonMessage msg, CancellationToken token)
{
byte[] headerBytes = _MessageBuilder.GetHeaderBytes(msg);
await client.DataStream.WriteAsync(headerBytes, 0, headerBytes.Length, token).ConfigureAwait(false);
await client.DataStream.FlushAsync(token).ConfigureAwait(false);
}
private async Task SendDataStreamAsync(ClientMetadata client, long contentLength, Stream stream, CancellationToken token)
{
if (contentLength <= 0) return;
long bytesRemaining = contentLength;
int bytesRead = 0;
while (bytesRemaining > 0)
{
if (bytesRemaining >= _Settings.StreamBufferSize)
{
client.SendBuffer = new byte[_Settings.StreamBufferSize];
}
else
{
client.SendBuffer = new byte[bytesRemaining];
}
bytesRead = await stream.ReadAsync(client.SendBuffer, 0, client.SendBuffer.Length, token).ConfigureAwait(false);
if (bytesRead > 0)
{
await client.DataStream.WriteAsync(client.SendBuffer, 0, bytesRead, token).ConfigureAwait(false);
bytesRemaining -= bytesRead;
}
}
await client.DataStream.FlushAsync(token).ConfigureAwait(false);
}
#endregion
#region Tasks
private async Task MonitorForIdleClients(CancellationToken token)
{
try
{
Dictionary lastSeen = null;
while (true)
{
token.ThrowIfCancellationRequested();
await Task.Delay(5000, _Token).ConfigureAwait(false);
if (_Settings.IdleClientTimeoutSeconds > 0)
{
lastSeen = _ClientManager.AllClientsLastSeen();
if (lastSeen != null && lastSeen.Count > 0)
{
DateTime idleTimestamp = DateTime.UtcNow.AddSeconds(-1 * _Settings.IdleClientTimeoutSeconds);
foreach (KeyValuePair curr in lastSeen)
{
if (curr.Value < idleTimestamp)
{
_ClientManager.AddClientTimedout(curr.Key);
_Settings.Logger?.Invoke(Severity.Debug, _Header + "disconnecting client " + curr.Key + " due to idle timeout");
await DisconnectClientAsync(curr.Key, MessageStatus.Timeout, true);
}
}
}
}
}
}
catch (TaskCanceledException)
{
}
catch (OperationCanceledException)
{
}
}
#endregion
#endregion
}
}