using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Text; using System.Threading.Tasks; using NetMQ; namespace TcpEventBus { public class Server : ISocket { NetMQPoller poller = new NetMQPoller(); public string IP { get; } public int Port { get; } public int Timeout { get; } public Server(string ip, int port,int timeout = 5000) { IP = ip; Port = port; IsConnect = false; Timeout = timeout; _server = new NetMQ.Sockets.SubscriberSocket($"@tcp://{IP}:{Port}"); _server.ReceiveReady += _server_ReceiveReady; _server.Subscribe(""); _client = new NetMQ.Sockets.PublisherSocket($"@tcp://{IP}:{Port - 1}"); _client.Options.SendBuffer = 500 * 1024; _client.Options.SendHighWatermark = 10; poller.Add(_client); poller.Add(_server); } private void _server_ReceiveReady(object? sender, NetMQSocketEventArgs e) { OnData?.Invoke(e.Socket.ReceiveFrameBytes()); } [AllowNull] private NetMQ.Sockets.SubscriberSocket _server; [AllowNull] private NetMQ.Sockets.PublisherSocket _client; public int ReceiveTimeout => 3000; [AllowNull] public Action OnConnected { get; set; } [AllowNull] public Action> OnData { get; set; } [AllowNull] public Action OnDisconnected { get; set; } public bool IsConnect { get; private set; } public bool IsService => true; private object locker = new object(); public void Disconnect() { if (System.Console.IsOutputRedirected) { System.Diagnostics.Debug.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}"); } else { Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}"); } poller.Stop(); IsConnect = false; } public void Send(byte[] data) { lock (locker) { if (!IsConnect) return; try { _client.SendFrame(data); } catch { } } } public void Start() { if (System.Console.IsOutputRedirected) { System.Diagnostics.Debug.WriteLine($"{DateTime.Now}:{nameof(Start)}"); } else { Console.WriteLine($"{DateTime.Now}:{nameof(Start)}"); } Task.Run(() => poller.Run()); IsConnect = true; } } public class Client : ISocket { NetMQ.NetMQPoller _poller = new NetMQPoller(); public string IP { get; } public int Port { get; } private string ServerAddress => $"tcp://{IP}:{Port - 1}"; private string ClientAddress => $"tcp://{IP}:{Port}"; public int Timeout { get; } public Client(string ip, int port,int timeout = 5000) { IP = ip; Port = port; IsConnect = false; Timeout = timeout; _server = new NetMQ.Sockets.SubscriberSocket(); _server.ReceiveReady += _server_ReceiveReady; _server.Subscribe(""); _client = new NetMQ.Sockets.PublisherSocket(); _client.Options.SendBuffer = 500 * 1024; _client.Options.SendHighWatermark = 10; _poller = new NetMQPoller(); _poller.Add(_server); _poller.Add(_client); } private void _server_ReceiveReady(object? sender, NetMQSocketEventArgs e) { OnData?.Invoke(e.Socket.ReceiveFrameBytes()); } [AllowNull] private NetMQ.Sockets.SubscriberSocket _server; [AllowNull] private NetMQ.Sockets.PublisherSocket _client; public int ReceiveTimeout => 3000; [AllowNull] public Action OnConnected { get; set; } [AllowNull] public Action> OnData { get; set; } [AllowNull] public Action OnDisconnected { get; set; } public bool IsConnect { get; private set; } public bool IsService => false; private object locker = new object(); public void Disconnect() { if (System.Console.IsOutputRedirected) { System.Diagnostics.Debug.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}"); } else { Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}"); } _server.Disconnect(ServerAddress); _client.Disconnect(ClientAddress); _poller.Stop(); IsConnect = false; } public void Send(byte[] data) { lock (locker) { if (!IsConnect) return; try { _client.SendFrame(data); } catch { } } } public void Start() { _client.Connect(ClientAddress); _server.Connect(ServerAddress); Thread.Sleep(100); if (System.Console.IsOutputRedirected) { System.Diagnostics.Debug.WriteLine($"{DateTime.Now}:{nameof(Start)}"); } else { Console.WriteLine($"{DateTime.Now}:{nameof(Start)}"); } Task.Run(() => _poller.Run()); IsConnect = true; } } }