using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Common.Internal.Logging;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using DotNetty.Transport.Libuv;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net;
using System.Net.Http.Headers;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;

namespace TcpEventBus
{
    /// <summary>
    /// TCP server built on a DotNetty <see cref="ServerBootstrap"/>. Frames are delimited by a
    /// 3-byte length prefix, and every accepted connection shares one <see cref="NettyHandler"/>.
    /// </summary>
    /// <remarks>
    /// Because the handler is shared and remembers a single channel context, <see cref="Send"/>
    /// and <see cref="IsConnect"/> refer to the most recently activated connection only.
    /// </remarks>
    public class NettyServerManger : ISocket
    {
        // Transport selector: true = libuv dispatcher/worker loops, false = managed sockets.
        // Kept as a static readonly field (not a literal) so both paths stay compiled and reachable.
        private static readonly bool UseLibuv = true;

        private readonly NettyHandler handler = new NettyHandler();

        [AllowNull]
        private IEventLoopGroup bossGroup;

        [AllowNull]
        private IEventLoopGroup workerGroup;

        [AllowNull]
        private IChannel boundChannel;

        [AllowNull]
        private ServerBootstrap bootstrap;

        /// <summary>
        /// Creates the server and configures its bootstrap. Nothing is bound until
        /// <see cref="Start"/> is called.
        /// </summary>
        /// <param name="ip">Stored in <see cref="IP"/>.</param>
        /// <param name="port">Port passed to the bootstrap when binding.</param>
        /// <param name="timeout">Stored in <see cref="ReceiveTimeout"/>; not otherwise used by this class.</param>
        /// <param name="maxFrameLength">
        /// Maximum payload size in bytes. The frame decoder is created with this value plus the
        /// 3-byte length prefix.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is not positive.</exception>
        public NettyServerManger(string ip, int port, int timeout = 1000, int maxFrameLength = 204800)
        {
            if (maxFrameLength <= 0)
            {
                // Use the (paramName, actualValue, message) overload; the single-string overload
                // would treat the message as the parameter name.
                throw new ArgumentOutOfRangeException(
                    nameof(maxFrameLength),
                    maxFrameLength,
                    $"{nameof(maxFrameLength)} must be greater than 0");
            }

            MaxFrameLength = maxFrameLength;
            IP = ip;
            Port = port;
            ReceiveTimeout = timeout;
            ConfigureBootstrap();
        }

        /// <summary>Address supplied at construction.</summary>
        /// <remarks>NOTE(review): <see cref="Start"/> binds by port only and never reads this value — confirm that is intended.</remarks>
        public string IP { get; }

        /// <summary>Maximum payload size in bytes accepted by the frame decoder.</summary>
        public int MaxFrameLength { get; }

        /// <summary>Port the server binds to.</summary>
        public int Port { get; }

        /// <summary>Always <c>true</c> for the server side.</summary>
        public bool IsService => true;

        /// <summary>Whether the shared handler currently tracks an active connection.</summary>
        public bool IsConnect => handler.IsConnect;

        /// <summary>Timeout value supplied at construction; stored only.</summary>
        public int ReceiveTimeout { get; }

        /// <summary>Raised when a connection becomes active.</summary>
        [AllowNull]
        public Action OnConnected { get; set; }

        /// <summary>
        /// Raised for every decoded frame. The buffer the segment was obtained from is released as
        /// soon as the callback returns, so copy the bytes if they must outlive the callback.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the generic argument is inferred from <c>IByteBuffer.GetIoBuffer()</c>;
        /// confirm it matches the declaration in <c>ISocket</c>.
        /// </remarks>
        [AllowNull]
        public Action<ArraySegment<byte>> OnData { get; set; }

        /// <summary>Raised when a connection's channel is unregistered.</summary>
        [AllowNull]
        public Action OnDisconnected { get; set; }

        /// <summary>
        /// Closes the listening channel (if one was bound) and shuts both event-loop groups down.
        /// Runs in the background; failures are written to the error stream.
        /// </summary>
        public void Disconnect()
        {
            Console.WriteLine(nameof(Disconnect));
            _ = DisconnectCoreAsync();
        }

        /// <summary>
        /// Binds the listener to <see cref="Port"/> in the background. A bind failure is written
        /// to the error stream instead of escaping as an unobservable exception.
        /// </summary>
        public void Start()
        {
            _ = BindCoreAsync();
        }

        /// <summary>Sends <paramref name="data"/> as one frame to the connection tracked by the handler.</summary>
        /// <param name="data">Payload; <c>null</c> or empty input is ignored, as is a call while not connected.</param>
        public void Send(byte[] data)
        {
            if (!IsConnect || data == null || data.Length == 0)
            {
                return;
            }

            handler.SendData(data);
        }

        // Creates the event-loop groups and wires the pipeline; does not bind.
        private void ConfigureBootstrap()
        {
#if DEBUG
            InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
#endif
            if (UseLibuv)
            {
                var dispatcher = new DispatcherEventLoopGroup();
                bossGroup = dispatcher;
                workerGroup = new WorkerEventLoopGroup(dispatcher);
            }
            else
            {
                bossGroup = new MultithreadEventLoopGroup(1);
                workerGroup = new MultithreadEventLoopGroup();
            }

            bootstrap = new ServerBootstrap();
            bootstrap.Group(bossGroup, workerGroup);
            if (UseLibuv)
            {
                bootstrap.Channel<TcpServerChannel>();
            }
            else
            {
                bootstrap.Channel<TcpServerSocketChannel>();
            }

            // Forward through lambdas so subscribers assigned after construction are still seen.
            handler.OnData = data => OnData?.Invoke(data);
            handler.OnConnected = () => OnConnected?.Invoke();
            handler.OnDisconnected = () => OnDisconnected?.Invoke();

            bootstrap
                .Option(ChannelOption.SoBacklog, 10)
                .ChildOption(ChannelOption.SoLinger, 0)
                .Handler(new LoggingHandler("SRV-LSTN"))
                .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                    pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                    pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
                    pipeline.AddLast("echo", handler);
                }));
        }

        // Binds the listener; never faults so it is safe to fire and forget.
        private async Task BindCoreAsync()
        {
            try
            {
                boundChannel = await bootstrap.BindAsync(Port).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Start)} failed to bind port {Port}: {ex}");
            }
        }

        // Closes the listener and then always shuts the event loops down; never faults.
        private async Task DisconnectCoreAsync()
        {
            try
            {
                // boundChannel is still null when Start was never called or has not completed.
                IChannel listener = boundChannel;
                if (listener != null)
                {
                    await listener.CloseAsync().ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Disconnect)} failed to close the listener: {ex}");
            }

            try
            {
                await Task.WhenAll(
                    bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                    workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)))
                    .ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Disconnect)} failed to shut the event loops down: {ex}");
            }
        }
    }

    /// <summary>
    /// Sharable inbound handler that reports connection state changes and received frames through
    /// callbacks, and writes outgoing data to the channel it last saw become active.
    /// </summary>
    public class NettyHandler : ChannelHandlerAdapter
    {
        // Written by channel callbacks and read by SendData/IsConnect callers, hence volatile.
        [AllowNull]
        private volatile IChannelHandlerContext _context;

        private volatile bool isConnected;

        /// <summary>Creates a handler in the disconnected state.</summary>
        public NettyHandler()
        {
            isConnected = false;
        }

        /// <summary>Invoked after a channel became active and the handler state was updated.</summary>
        [AllowNull]
        public Action OnConnected { get; set; }

        /// <summary>
        /// Invoked with the content of every received buffer. The buffer is released right after
        /// the callback returns, so copy the bytes if they must outlive the callback.
        /// </summary>
        [AllowNull]
        public Action<ArraySegment<byte>> OnData { get; set; }

        /// <summary>Invoked whenever a channel using this handler is unregistered.</summary>
        [AllowNull]
        public Action OnDisconnected { get; set; }

        /// <summary>Whether the handler currently tracks an active channel.</summary>
        public bool IsConnect => isConnected;

        /// <inheritdoc/>
        public override bool IsSharable => true;

        /// <summary>
        /// Hands a received buffer to <see cref="OnData"/> and releases it; any other message type
        /// is forwarded to the next handler instead of being dropped.
        /// </summary>
        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is IByteBuffer buffer)
            {
                try
                {
                    OnData?.Invoke(buffer.GetIoBuffer());
                }
                finally
                {
                    // Release even when the subscriber throws, otherwise the buffer leaks.
                    buffer.Release();
                }
            }
            else
            {
                context.FireChannelRead(message);
            }
        }

        /// <summary>Flushes pending writes once the current read batch is complete.</summary>
        public override void ChannelReadComplete(IChannelHandlerContext context)
        {
            context.Flush();
        }

        /// <summary>
        /// Records the now-active channel and raises <see cref="OnConnected"/>. State is published
        /// before the callback so a subscriber may send from inside it.
        /// </summary>
        public override void ChannelActive(IChannelHandlerContext context)
        {
            base.ChannelActive(context);
            Console.WriteLine(nameof(ChannelActive));

            _context = context;
            isConnected = true;
            OnConnected?.Invoke();
        }

        /// <summary>
        /// Forgets the channel if it is the one currently tracked and raises
        /// <see cref="OnDisconnected"/>.
        /// </summary>
        public override void ChannelUnregistered(IChannelHandlerContext context)
        {
            base.ChannelUnregistered(context);
            Console.WriteLine(nameof(ChannelUnregistered));

            // The handler is shared: only reset state when the channel going away is the tracked
            // one, so an older connection ending does not disturb a newer one.
            if (ReferenceEquals(_context, context))
            {
                isConnected = false;
                _context = null;
            }

            OnDisconnected?.Invoke();

            // Keep the original "make sure it is closed" intent for a channel that was
            // unregistered while still open.
            if (context.Channel.Open)
            {
                _ = CloseQuietlyAsync(context);
            }
        }

        /// <summary>Writes <paramref name="data"/> to the tracked channel and flushes it.</summary>
        /// <param name="data">Payload; <c>null</c> or empty input is ignored, as is a call while not connected.</param>
        public void SendData(byte[] data)
        {
            // Single snapshot: the field can be cleared concurrently by ChannelUnregistered.
            IChannelHandlerContext target = _context;
            if (target == null || data == null || data.Length == 0 || !isConnected)
            {
                return;
            }

            IByteBuffer buffer = target.Allocator.Buffer(data.Length);
            buffer.WriteBytes(data);
            _ = target.WriteAndFlushAsync(buffer);
        }

        // Closes the channel and logs a failure; never faults so it is safe to fire and forget.
        private static async Task CloseQuietlyAsync(IChannelHandlerContext context)
        {
            try
            {
                await context.CloseAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyHandler)} failed to close an unregistered channel: {ex}");
            }
        }
    }

    /// <summary>
    /// TCP client built on a DotNetty <see cref="Bootstrap"/>, using the same 3-byte
    /// length-prefixed framing as <see cref="NettyServerManger"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <see cref="Disconnect"/> closes the channel but the event-loop group is never
    /// shut down; it is left running here so <see cref="Start"/> can be called again.
    /// </remarks>
    public class NettyClientManger : ISocket
    {
        // How long Start() waits for the connect attempt before returning to the caller.
        private static readonly TimeSpan ConnectGracePeriod = TimeSpan.FromMilliseconds(100);

        private readonly NettyHandler handler = new NettyHandler();

        [AllowNull]
        private Bootstrap bootstrap;

        [AllowNull]
        private MultithreadEventLoopGroup group;

        [AllowNull]
        private IChannel clientChannel;

        /// <summary>
        /// Creates the client and configures its bootstrap. No connection is attempted until
        /// <see cref="Start"/> is called.
        /// </summary>
        /// <param name="ip">Remote address, parsed with <see cref="IPAddress.Parse(string)"/> when connecting.</param>
        /// <param name="port">Remote port.</param>
        /// <param name="timeout">Stored in <see cref="ReceiveTimeout"/>; not otherwise used by this class.</param>
        /// <param name="maxFrameLength">
        /// Maximum payload size in bytes. The frame decoder is created with this value plus the
        /// 3-byte length prefix.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is not positive.</exception>
        public NettyClientManger(string ip, int port, int timeout = 1000, int maxFrameLength = 204800)
        {
            if (maxFrameLength <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(maxFrameLength),
                    maxFrameLength,
                    $"{nameof(maxFrameLength)} must be greater than 0");
            }

            IP = ip;
            Port = port;
            ReceiveTimeout = timeout;
            MaxFrameLength = maxFrameLength;
            ConfigureBootstrap();
        }

        /// <summary>Remote address supplied at construction.</summary>
        public string IP { get; }

        /// <summary>Remote port supplied at construction.</summary>
        public int Port { get; }

        /// <summary>Returns <c>true</c>.</summary>
        /// <remarks>NOTE(review): this mirrors the server's value; confirm a client is meant to report <c>true</c>.</remarks>
        public bool IsService => true;

        /// <summary>Whether the handler currently tracks an active connection.</summary>
        public bool IsConnect => handler.IsConnect;

        /// <summary>Timeout value supplied at construction; stored only.</summary>
        public int ReceiveTimeout { get; }

        /// <summary>Raised when the connection becomes active.</summary>
        [AllowNull]
        public Action OnConnected { get; set; }

        /// <summary>Maximum payload size in bytes accepted by the frame decoder.</summary>
        public int MaxFrameLength { get; }

        /// <summary>
        /// Raised for every decoded frame. The buffer the segment was obtained from is released as
        /// soon as the callback returns, so copy the bytes if they must outlive the callback.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the generic argument is inferred from <c>IByteBuffer.GetIoBuffer()</c>;
        /// confirm it matches the declaration in <c>ISocket</c>.
        /// </remarks>
        [AllowNull]
        public Action<ArraySegment<byte>> OnData { get; set; }

        /// <summary>Raised when the connection's channel is unregistered.</summary>
        [AllowNull]
        public Action OnDisconnected { get; set; }

        /// <summary>
        /// Closes the current channel, if any, in the background. Failures are written to the
        /// error stream.
        /// </summary>
        public void Disconnect()
        {
            _ = DisconnectCoreAsync();
        }

        /// <summary>
        /// Starts connecting to <see cref="IP"/>:<see cref="Port"/> and waits up to 100 ms for the
        /// attempt to finish, so a quickly established connection is usable when this returns.
        /// A failed attempt is written to the error stream; it is not thrown.
        /// </summary>
        public void Start()
        {
            // The helper never faults and does not resume on the caller's context, so this
            // bounded wait neither throws nor depends on the calling thread being free.
            Task connecting = ConnectCoreAsync();
            connecting.Wait(ConnectGracePeriod);
        }

        /// <summary>Sends <paramref name="data"/> as one frame over the current channel.</summary>
        /// <param name="data">Payload; <c>null</c> or empty input is ignored, as is a call while not connected.</param>
        public void Send(byte[] data)
        {
            IChannel channel = clientChannel;
            if (!IsConnect || data == null || data.Length == 0 || channel == null)
            {
                return;
            }

            IByteBuffer buffer = channel.Allocator.Buffer(data.Length);
            buffer.WriteBytes(data);
            _ = channel.WriteAndFlushAsync(buffer);
        }

        // Creates the event-loop group and wires the pipeline; does not connect.
        private void ConfigureBootstrap()
        {
#if DEBUG
            InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
#endif
            group = new MultithreadEventLoopGroup();

            // Forward through lambdas so subscribers assigned after construction are still seen.
            handler.OnConnected = () => OnConnected?.Invoke();
            handler.OnData = data => OnData?.Invoke(data);
            handler.OnDisconnected = () => OnDisconnected?.Invoke();

            bootstrap = new Bootstrap();
            bootstrap
                .Group(group)
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast(new LoggingHandler());
                    pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                    pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
                    pipeline.AddLast("echo", handler);
                }));
        }

        // Connects and stores the channel; never faults (address parsing errors included).
        private async Task ConnectCoreAsync()
        {
            try
            {
                var remote = new IPEndPoint(IPAddress.Parse(IP), Port);
                clientChannel = await bootstrap.ConnectAsync(remote).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Start)} failed to connect to {IP}:{Port}: {ex}");
            }
        }

        // Closes the current channel; never faults so it is safe to fire and forget.
        private async Task DisconnectCoreAsync()
        {
            IChannel channel = clientChannel;
            if (channel == null)
            {
                return;
            }

            try
            {
                await channel.CloseAsync().ConfigureAwait(false);
                Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}");
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Disconnect)} failed to close the channel: {ex}");
            }
        }
    }
}