123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using NetMQ;
- namespace TcpEventBus
- {
/// <summary>
/// Binding ("service") end of the event bus.
/// Receives frames on a subscriber socket bound to <c>tcp://IP:Port</c> and
/// sends frames through a publisher socket bound to <c>tcp://IP:(Port - 1)</c>.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="OnConnected"/> and <see cref="OnDisconnected"/> are
/// exposed but never invoked by this class - confirm whether callers expect them
/// to fire from <see cref="Start"/> / <see cref="Disconnect"/>.
/// </remarks>
public class Server : ISocket
{
    private readonly NetMQPoller poller = new NetMQPoller();

    // Guards IsConnect transitions and access to the publisher socket in Send.
    private readonly object locker = new object();

    // Inbound side: every frame received here is forwarded to OnData.
    private readonly NetMQ.Sockets.SubscriberSocket _server;

    // Outbound side: Send publishes frames through this socket.
    private readonly NetMQ.Sockets.PublisherSocket _client;

    /// <summary>Address both sockets are bound on.</summary>
    public string IP { get; }

    /// <summary>Port of the subscriber socket; the publisher uses <c>Port - 1</c>.</summary>
    public int Port { get; }

    /// <summary>Timeout value supplied at construction; stored but not used by this class.</summary>
    public int Timeout { get; }

    /// <summary>Fixed receive timeout reported to callers (always 3000).</summary>
    public int ReceiveTimeout => 3000;

    [AllowNull]
    public Action OnConnected { get; set; }

    /// <summary>Invoked with the bytes of each frame received on the subscriber socket.</summary>
    [AllowNull]
    public Action<ArraySegment<byte>> OnData { get; set; }

    [AllowNull]
    public Action OnDisconnected { get; set; }

    /// <summary>True between a successful <see cref="Start"/> and the next <see cref="Disconnect"/>.</summary>
    public bool IsConnect { get; private set; }

    /// <summary>Always true: this is the binding side.</summary>
    public bool IsService => true;

    /// <summary>
    /// Creates and binds both sockets and registers them with the poller.
    /// The poller is not started until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="ip">Address to bind on.</param>
    /// <param name="port">Port for the subscriber socket; <c>port - 1</c> is used for the publisher.</param>
    /// <param name="timeout">Stored in <see cref="Timeout"/>.</param>
    public Server(string ip, int port, int timeout = 5000)
    {
        IP = ip;
        Port = port;
        IsConnect = false;
        Timeout = timeout;

        // '@' prefix asks NetMQ to bind (rather than connect) the address.
        _server = new NetMQ.Sockets.SubscriberSocket($"@tcp://{IP}:{Port}");
        try
        {
            _server.ReceiveReady += _server_ReceiveReady;
            _server.Subscribe("");

            _client = new NetMQ.Sockets.PublisherSocket($"@tcp://{IP}:{Port - 1}");
            _client.Options.SendBuffer = 500 * 1024;
            _client.Options.SendHighWatermark = 10;
        }
        catch
        {
            // Do not leave the first socket bound if the second one cannot be set up.
            _server.Dispose();
            throw;
        }

        poller.Add(_client);
        poller.Add(_server);
    }

    // Poller callback: read one frame and hand it to the OnData subscriber, if any.
    private void _server_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        OnData?.Invoke(e.Socket.ReceiveFrameBytes());
    }

    // Writes "<timestamp>:<message>" to the console, or to the debug output when
    // the console output is redirected.
    private static void Log(string message)
    {
        string line = $"{DateTime.Now}:{message}";
        if (System.Console.IsOutputRedirected)
        {
            System.Diagnostics.Debug.WriteLine(line);
        }
        else
        {
            Console.WriteLine(line);
        }
    }

    /// <summary>
    /// Stops the poller and marks the server as not connected.
    /// Calling it before <see cref="Start"/> or more than once is a no-op (apart from the log line).
    /// </summary>
    public void Disconnect()
    {
        Log(nameof(Disconnect));

        lock (locker)
        {
            if (!IsConnect)
            {
                return;
            }
            IsConnect = false;
        }

        // Stop outside the lock: an OnData handler running on the poller thread may
        // be waiting for the lock inside Send while Stop waits for the poller.
        if (poller.IsRunning)
        {
            poller.Stop();
        }
    }

    /// <summary>
    /// Publishes <paramref name="data"/> as a single frame. Does nothing when not started.
    /// Send failures are logged and not propagated (best-effort delivery).
    /// </summary>
    /// <param name="data">Payload to publish.</param>
    public void Send(byte[] data)
    {
        lock (locker)
        {
            if (!IsConnect)
            {
                return;
            }

            try
            {
                _client.SendFrame(data);
            }
            catch (Exception ex)
            {
                // Still best-effort, but no longer silent.
                Log($"{nameof(Send)} failed: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Starts the poller on a background task and marks the server as connected.
    /// Calling it while already started is a no-op (apart from the log line).
    /// </summary>
    public void Start()
    {
        Log(nameof(Start));

        lock (locker)
        {
            if (IsConnect)
            {
                return;
            }

            // Never queue a second Run on a poller that is already running.
            if (!poller.IsRunning)
            {
                Task.Run(() => poller.Run());
            }
            IsConnect = true;
        }
    }
}
/// <summary>
/// Connecting end of the event bus.
/// Publishes frames to <c>tcp://IP:Port</c> and receives frames from
/// <c>tcp://IP:(Port - 1)</c>, forwarding each received frame to <see cref="OnData"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="OnConnected"/> and <see cref="OnDisconnected"/> are
/// exposed but never invoked by this class - confirm whether callers expect them
/// to fire from <see cref="Start"/> / <see cref="Disconnect"/>.
/// </remarks>
public class Client : ISocket
{
    // Created once in the constructor (the original also had a field initializer,
    // which allocated a second poller that was never used or disposed).
    private readonly NetMQ.NetMQPoller _poller;

    // Guards IsConnect transitions and access to the publisher socket in Send.
    private readonly object locker = new object();

    // Inbound side: every frame received here is forwarded to OnData.
    private readonly NetMQ.Sockets.SubscriberSocket _server;

    // Outbound side: Send publishes frames through this socket.
    private readonly NetMQ.Sockets.PublisherSocket _client;

    /// <summary>Remote address both sockets connect to.</summary>
    public string IP { get; }

    /// <summary>Port the publisher connects to; the subscriber uses <c>Port - 1</c>.</summary>
    public int Port { get; }

    /// <summary>Timeout value supplied at construction; stored but not used by this class.</summary>
    public int Timeout { get; }

    /// <summary>Fixed receive timeout reported to callers (always 3000).</summary>
    public int ReceiveTimeout => 3000;

    [AllowNull]
    public Action OnConnected { get; set; }

    /// <summary>Invoked with the bytes of each frame received on the subscriber socket.</summary>
    [AllowNull]
    public Action<ArraySegment<byte>> OnData { get; set; }

    [AllowNull]
    public Action OnDisconnected { get; set; }

    /// <summary>True between a successful <see cref="Start"/> and the next <see cref="Disconnect"/>.</summary>
    public bool IsConnect { get; private set; }

    /// <summary>Always false: this is the connecting side.</summary>
    public bool IsService => false;

    // Endpoint the subscriber socket connects to.
    private string ServerAddress => $"tcp://{IP}:{Port - 1}";

    // Endpoint the publisher socket connects to.
    private string ClientAddress => $"tcp://{IP}:{Port}";

    /// <summary>
    /// Creates both sockets and registers them with the poller. Nothing is
    /// connected until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="ip">Remote address to connect to.</param>
    /// <param name="port">Port for the publisher socket; <c>port - 1</c> is used for the subscriber.</param>
    /// <param name="timeout">Stored in <see cref="Timeout"/>.</param>
    public Client(string ip, int port, int timeout = 5000)
    {
        IP = ip;
        Port = port;
        IsConnect = false;
        Timeout = timeout;

        _server = new NetMQ.Sockets.SubscriberSocket();
        _server.ReceiveReady += _server_ReceiveReady;
        _server.Subscribe("");

        _client = new NetMQ.Sockets.PublisherSocket();
        _client.Options.SendBuffer = 500 * 1024;
        _client.Options.SendHighWatermark = 10;

        _poller = new NetMQPoller();
        _poller.Add(_server);
        _poller.Add(_client);
    }

    // Poller callback: read one frame and hand it to the OnData subscriber, if any.
    private void _server_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        OnData?.Invoke(e.Socket.ReceiveFrameBytes());
    }

    // Writes "<timestamp>:<message>" to the console, or to the debug output when
    // the console output is redirected.
    private static void Log(string message)
    {
        string line = $"{DateTime.Now}:{message}";
        if (System.Console.IsOutputRedirected)
        {
            System.Diagnostics.Debug.WriteLine(line);
        }
        else
        {
            Console.WriteLine(line);
        }
    }

    /// <summary>
    /// Stops the poller, disconnects both sockets and marks the client as not connected.
    /// Calling it before <see cref="Start"/> or more than once is a no-op (apart from the log line).
    /// </summary>
    public void Disconnect()
    {
        Log(nameof(Disconnect));

        lock (locker)
        {
            // Disconnecting an endpoint that was never connected is rejected by
            // NetMQ, so only proceed when Start actually connected the sockets.
            if (!IsConnect)
            {
                return;
            }
            IsConnect = false;
        }

        // Stop the poller before touching the sockets so that no other thread is
        // servicing them. Done outside the lock because an OnData handler on the
        // poller thread may be waiting for the lock inside Send.
        if (_poller.IsRunning)
        {
            _poller.Stop();
        }

        _server.Disconnect(ServerAddress);
        _client.Disconnect(ClientAddress);
    }

    /// <summary>
    /// Publishes <paramref name="data"/> as a single frame. Does nothing when not started.
    /// Send failures are logged and not propagated (best-effort delivery).
    /// </summary>
    /// <param name="data">Payload to publish.</param>
    public void Send(byte[] data)
    {
        lock (locker)
        {
            if (!IsConnect)
            {
                return;
            }

            try
            {
                _client.SendFrame(data);
            }
            catch (Exception ex)
            {
                // Still best-effort, but no longer silent.
                Log($"{nameof(Send)} failed: {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Connects both sockets, waits 100 ms, starts the poller on a background
    /// task and marks the client as connected.
    /// Calling it while already started is a no-op.
    /// </summary>
    public void Start()
    {
        lock (locker)
        {
            if (IsConnect)
            {
                return;
            }

            _client.Connect(ClientAddress);
            _server.Connect(ServerAddress);

            // Kept from the original; presumably gives the connections time to be
            // established before the first publish - TODO confirm. Fully qualified
            // because the file has no visible using for System.Threading.
            System.Threading.Thread.Sleep(100);

            Log(nameof(Start));

            // Never queue a second Run on a poller that is already running.
            if (!_poller.IsRunning)
            {
                Task.Run(() => _poller.Run());
            }
            IsConnect = true;
        }
    }
}
- }
|