123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- using DotNetty.Buffers;
- using DotNetty.Codecs;
- using DotNetty.Common.Internal.Logging;
- using DotNetty.Handlers.Logging;
- using DotNetty.Handlers.Tls;
- using DotNetty.Transport.Bootstrapping;
- using DotNetty.Transport.Channels;
- using DotNetty.Transport.Channels.Sockets;
- using DotNetty.Transport.Libuv;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Console;
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.Linq;
- using System.Net;
- using System.Net.Http.Headers;
- using System.Security.Cryptography.X509Certificates;
- using System.Text;
- using System.Threading.Tasks;
- namespace TcpEventBus
- {
/// <summary>
/// TCP server endpoint built on a DotNetty <see cref="ServerBootstrap"/>.
/// Frames are length-prefixed with a 3-byte header on both the encode and decode side.
/// A single shared <see cref="NettyHandler"/> is installed on every accepted channel and
/// its events are relayed through <see cref="OnConnected"/>, <see cref="OnData"/> and
/// <see cref="OnDisconnected"/>.
/// </summary>
public class NettyServerManger : ISocket
{
    /// <summary>Size in bytes of the length header written/stripped by the framing handlers.</summary>
    private const int LengthFieldSize = 3;

    // Transport selection. The libuv transport is the one the original code always used;
    // the managed-socket alternative is kept so it can be switched on in one place.
    // Declared static readonly (not const) so neither branch is flagged as unreachable.
    private static readonly bool UseLibuv = true;

    [AllowNull]
    private IEventLoopGroup bossGroup;
    [AllowNull]
    private IEventLoopGroup workerGroup;
    [AllowNull]
    private IChannel boundChannel;
    [AllowNull]
    private ServerBootstrap bootstrap;

    // One handler instance is shared by all child channels (NettyHandler.IsSharable is true).
    private readonly NettyHandler handler = new NettyHandler();

    /// <summary>IP supplied at construction. Stored only: <see cref="Start"/> passes just <see cref="Port"/> to the bind call.</summary>
    public string IP { get; }

    /// <summary>Maximum payload size in bytes accepted by the frame decoder (header excluded).</summary>
    public int MaxFrameLength { get; }

    /// <summary>Port passed to the bootstrap bind call in <see cref="Start"/>.</summary>
    public int Port { get; }

    /// <summary>Always <c>true</c> for the server side.</summary>
    public bool IsService => true;

    /// <summary>Mirrors <see cref="NettyHandler.IsConnect"/> of the shared handler.</summary>
    public bool IsConnect => handler.IsConnect;

    /// <summary>Timeout value supplied at construction. Stored only; not applied to the channel in this class.</summary>
    public int ReceiveTimeout { get; }

    /// <summary>Raised when the shared handler reports a registered channel.</summary>
    [AllowNull]
    public Action OnConnected { get; set; }

    /// <summary>Raised with the payload of every decoded frame.</summary>
    [AllowNull]
    public Action<ArraySegment<byte>> OnData { get; set; }

    /// <summary>Raised when the shared handler reports an unregistered channel.</summary>
    [AllowNull]
    public Action OnDisconnected { get; set; }

    /// <summary>
    /// Creates the server and configures its bootstrap. Nothing is bound until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="ip">IP address string exposed through <see cref="IP"/>.</param>
    /// <param name="port">Port to bind.</param>
    /// <param name="timeout">Value exposed through <see cref="ReceiveTimeout"/>.</param>
    /// <param name="maxFrameLength">Maximum frame payload size in bytes; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is zero or negative.</exception>
    public NettyServerManger(string ip, int port, int timeout = 1000, int maxFrameLength = 204800)
    {
        if (maxFrameLength <= 0)
        {
            // The (paramName, actualValue, message) overload keeps the parameter name and the
            // message separate; the single-string overload would treat the message as the name.
            throw new ArgumentOutOfRangeException(
                nameof(maxFrameLength),
                maxFrameLength,
                $"{nameof(maxFrameLength)} must be greater than 0");
        }

        MaxFrameLength = maxFrameLength;
        IP = ip;
        Port = port;
        ReceiveTimeout = timeout;
        RunServer();
    }

    /// <summary>Creates the event loop groups and configures the bootstrap and child pipeline.</summary>
    private void RunServer()
    {
#if DEBUG
        InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
#endif
        if (UseLibuv)
        {
            var dispatcher = new DispatcherEventLoopGroup();
            bossGroup = dispatcher;
            workerGroup = new WorkerEventLoopGroup(dispatcher);
        }
        else
        {
            bossGroup = new MultithreadEventLoopGroup(1);
            workerGroup = new MultithreadEventLoopGroup();
        }

        bootstrap = new ServerBootstrap();
        bootstrap.Group(bossGroup, workerGroup);
        if (UseLibuv)
        {
            bootstrap.Channel<TcpServerChannel>();
        }
        else
        {
            bootstrap.Channel<TcpServerSocketChannel>();
        }

        // Relay through lambdas so subscribers assigned after construction are still picked up.
        handler.OnData = data => OnData?.Invoke(data);
        handler.OnConnected = () => OnConnected?.Invoke();
        handler.OnDisconnected = () => OnDisconnected?.Invoke();

        bootstrap
            .Option(ChannelOption.SoBacklog, 10)
            .ChildOption(ChannelOption.SoLinger, 0)
            .Handler(new LoggingHandler("SRV-LSTN"))
            .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
            {
                IChannelPipeline pipeline = channel.Pipeline;
                pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                pipeline.AddLast("framing-enc", new LengthFieldPrepender(LengthFieldSize));
                pipeline.AddLast(
                    "framing-dec",
                    new LengthFieldBasedFrameDecoder(MaxFrameLength + LengthFieldSize, 0, LengthFieldSize, 0, LengthFieldSize));
                pipeline.AddLast("echo", handler);
            }));
    }

    /// <summary>
    /// Closes the listening channel (if one was bound) and shuts down both event loop groups.
    /// Runs asynchronously; failures are written to the console instead of escaping the
    /// <c>async void</c> method.
    /// </summary>
    public async void Disconnect()
    {
        Console.WriteLine(nameof(Disconnect));
        try
        {
            // Start() may never have been called, or its bind may not have completed yet.
            IChannel channel = boundChannel;
            boundChannel = null;
            if (channel != null)
            {
                await channel.CloseAsync();
            }

            await Task.WhenAll(
                bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{nameof(Disconnect)} failed: {ex}");
        }
    }

    /// <summary>
    /// Binds the listening channel to <see cref="Port"/>. Runs asynchronously; a bind failure
    /// is written to the console instead of escaping the <c>async void</c> method.
    /// </summary>
    public async void Start()
    {
        try
        {
            boundChannel = await bootstrap.BindAsync(Port);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{nameof(Start)} failed to bind port {Port}: {ex}");
        }
    }

    /// <summary>
    /// Sends <paramref name="data"/> through the shared handler. Does nothing when no
    /// connection is reported or the payload is null or empty.
    /// </summary>
    /// <param name="data">Payload bytes; the length header is added by the pipeline.</param>
    public void Send(byte[] data)
    {
        if (!IsConnect || data == null || data.Length == 0)
        {
            return;
        }

        handler.SendData(data);
    }
}
/// <summary>
/// Sharable channel handler that relays channel lifecycle events and inbound frames to
/// delegates, and writes outbound payloads through the most recently registered channel.
/// Only one channel context is tracked at a time: a later registration replaces the earlier one.
/// </summary>
public class NettyHandler : ChannelHandlerAdapter
{
    /// <summary>Invoked after a channel has been registered and recorded as the current one.</summary>
    [AllowNull]
    public Action OnConnected { get; set; }

    /// <summary>
    /// Invoked with the readable bytes of each inbound buffer. The buffer is released as soon
    /// as the callback returns, so subscribers should copy anything they need to keep.
    /// </summary>
    [AllowNull]
    public Action<ArraySegment<byte>> OnData { get; set; }

    /// <summary>Invoked when a channel has been unregistered.</summary>
    [AllowNull]
    public Action OnDisconnected { get; set; }

    /// <summary><c>true</c> between registration and unregistration of the tracked channel.</summary>
    public bool IsConnect { get; private set; }

    // Context of the most recently registered channel; written by the event loop, read by SendData.
    [AllowNull]
    private IChannelHandlerContext _Context;

    /// <summary>Creates a handler in the disconnected state.</summary>
    public NettyHandler()
    {
        IsConnect = false;
    }

    /// <summary>The same instance may be added to several pipelines.</summary>
    public override bool IsSharable => true;

    /// <summary>
    /// Hands inbound byte buffers to <see cref="OnData"/> and releases them; any other message
    /// type is forwarded down the pipeline so it is not silently dropped.
    /// </summary>
    public override void ChannelRead(IChannelHandlerContext context, object message)
    {
        if (message is IByteBuffer buffer)
        {
            try
            {
                OnData?.Invoke(buffer.GetIoBuffer());
            }
            finally
            {
                // Release even when the subscriber throws, otherwise the buffer leaks.
                buffer.Release();
            }
        }
        else
        {
            context.FireChannelRead(message);
        }
    }

    /// <summary>Flushes pending writes once a read batch is complete.</summary>
    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
        context.Flush();
    }

    /// <summary>
    /// Records the channel as the current one and then raises <see cref="OnConnected"/>.
    /// State is updated before the callback so a subscriber that sends from inside
    /// <see cref="OnConnected"/> is not ignored.
    /// </summary>
    public override void ChannelRegistered(IChannelHandlerContext context)
    {
        base.ChannelRegistered(context);
        Console.WriteLine(nameof(ChannelRegistered));

        _Context = context;
        IsConnect = true;
        OnConnected?.Invoke();
    }

    /// <summary>
    /// Clears the tracked state when the unregistered channel is the current one, raises
    /// <see cref="OnDisconnected"/>, then closes the unregistered channel and detaches this
    /// handler from its pipeline in the background.
    /// </summary>
    public override void ChannelUnregistered(IChannelHandlerContext context)
    {
        base.ChannelUnregistered(context);
        Console.WriteLine(nameof(ChannelUnregistered));

        // The handler is shared, so _Context may already belong to a newer channel.
        // Only drop the tracked state when it refers to the channel going away.
        if (_Context == null || ReferenceEquals(_Context, context))
        {
            _Context = null;
            IsConnect = false;
        }

        OnDisconnected?.Invoke();

        // Fire-and-forget: CloseAndDetachAsync observes its own failures.
        _ = CloseAndDetachAsync(context);
    }

    /// <summary>
    /// Writes <paramref name="data"/> to the tracked channel and flushes it. Does nothing when
    /// no channel is tracked or the payload is null or empty.
    /// </summary>
    /// <param name="data">Payload bytes to send.</param>
    public void SendData(byte[] data)
    {
        // Read the field once: the event loop can null it between a check and a later use.
        IChannelHandlerContext current = _Context;
        if (current == null || data == null || data.Length == 0 || !IsConnect)
        {
            return;
        }

        IByteBuffer buffer = current.Allocator.Buffer(data.Length);
        buffer.WriteBytes(data);
        current.WriteAndFlushAsync(buffer);
    }

    /// <summary>Closes the given channel context and removes this handler from its pipeline if still attached.</summary>
    private async Task CloseAndDetachAsync(IChannelHandlerContext context)
    {
        try
        {
            await context.CloseAsync();

            // The pipeline may already have dropped the handler once the channel is closed.
            if (!context.Removed)
            {
                context.Channel.Pipeline.Remove(this);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{nameof(ChannelUnregistered)} cleanup failed: {ex.Message}");
        }
    }
}
/// <summary>
/// TCP client endpoint built on a DotNetty <see cref="Bootstrap"/>.
/// Frames are length-prefixed with a 3-byte header on both the encode and decode side, and
/// channel events are relayed through <see cref="OnConnected"/>, <see cref="OnData"/> and
/// <see cref="OnDisconnected"/>.
/// </summary>
public class NettyClientManger : ISocket
{
    /// <summary>Size in bytes of the length header written/stripped by the framing handlers.</summary>
    private const int LengthFieldSize = 3;

    /// <summary>Upper bound, in milliseconds, that <see cref="Start"/> waits for the connect attempt.</summary>
    private const int ConnectWaitMilliseconds = 100;

    [AllowNull]
    private Bootstrap bootstrap;

    // NOTE(review): this group is never shut down in this class, so its event loop threads
    // outlive Disconnect(). Left as-is because the bootstrap may be reused by a later Start().
    [AllowNull]
    private MultithreadEventLoopGroup group;

    [AllowNull]
    private IChannel clientChannel;

    private readonly NettyHandler handler = new NettyHandler();

    /// <summary>Remote IP address string used by <see cref="Start"/>.</summary>
    public string IP { get; }

    /// <summary>Remote port used by <see cref="Start"/>.</summary>
    public int Port { get; }

    // NOTE(review): a client reporting IsService == true looks copied from the server class;
    // kept unchanged because the ISocket contract is not visible here — confirm.
    /// <summary>Always <c>true</c> in the current implementation.</summary>
    public bool IsService => true;

    /// <summary>Mirrors <see cref="NettyHandler.IsConnect"/> of this client's handler.</summary>
    public bool IsConnect => handler.IsConnect;

    /// <summary>Timeout value supplied at construction. Stored only; not applied to the channel in this class.</summary>
    public int ReceiveTimeout { get; }

    /// <summary>Raised when the handler reports a registered channel.</summary>
    [AllowNull]
    public Action OnConnected { get; set; }

    /// <summary>Maximum payload size in bytes accepted by the frame decoder (header excluded).</summary>
    public int MaxFrameLength { get; }

    /// <summary>Raised with the payload of every decoded frame.</summary>
    [AllowNull]
    public Action<ArraySegment<byte>> OnData { get; set; }

    /// <summary>Raised when the handler reports an unregistered channel.</summary>
    [AllowNull]
    public Action OnDisconnected { get; set; }

    /// <summary>
    /// Creates the client and configures its bootstrap. No connection is attempted until
    /// <see cref="Start"/> is called.
    /// </summary>
    /// <param name="ip">Remote IP address string.</param>
    /// <param name="port">Remote port.</param>
    /// <param name="timeout">Value exposed through <see cref="ReceiveTimeout"/>.</param>
    /// <param name="maxFrameLength">Maximum frame payload size in bytes; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is zero or negative.</exception>
    public NettyClientManger(string ip, int port, int timeout = 1000, int maxFrameLength = 204800)
    {
        if (maxFrameLength <= 0)
        {
            // The (paramName, actualValue, message) overload keeps the parameter name and the
            // message separate; the single-string overload would treat the message as the name.
            throw new ArgumentOutOfRangeException(
                nameof(maxFrameLength),
                maxFrameLength,
                $"{nameof(maxFrameLength)} must be greater than 0");
        }

        IP = ip;
        Port = port;
        ReceiveTimeout = timeout;
        MaxFrameLength = maxFrameLength;
        RunClientAsync();
    }

    /// <summary>Creates the event loop group and configures the bootstrap and channel pipeline.</summary>
    private void RunClientAsync()
    {
#if DEBUG
        InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
#endif
        group = new MultithreadEventLoopGroup();

        // Relay through lambdas so subscribers assigned after construction are still picked up.
        handler.OnConnected = () => OnConnected?.Invoke();
        handler.OnData = data => OnData?.Invoke(data);
        handler.OnDisconnected = () => OnDisconnected?.Invoke();

        bootstrap = new Bootstrap();
        bootstrap
            .Group(group)
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.TcpNodelay, true)
            .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
            {
                IChannelPipeline pipeline = channel.Pipeline;
                pipeline.AddLast(new LoggingHandler());
                pipeline.AddLast("framing-enc", new LengthFieldPrepender(LengthFieldSize));
                pipeline.AddLast(
                    "framing-dec",
                    new LengthFieldBasedFrameDecoder(MaxFrameLength + LengthFieldSize, 0, LengthFieldSize, 0, LengthFieldSize));
                pipeline.AddLast("echo", handler);
            }));
    }

    /// <summary>
    /// Closes the client channel if one was established. Runs asynchronously; failures are
    /// written to the console instead of escaping the <c>async void</c> method.
    /// </summary>
    public async void Disconnect()
    {
        IChannel channel = clientChannel;
        if (channel == null)
        {
            return;
        }

        try
        {
            await channel.CloseAsync();
            Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)} failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Starts a connection attempt to <see cref="IP"/>:<see cref="Port"/> on a background task
    /// and waits at most <see cref="ConnectWaitMilliseconds"/> ms for it to finish. Returns
    /// earlier when the attempt completes sooner. Never throws: failures are logged.
    /// </summary>
    public void Start()
    {
        // ConnectAsync handles its own exceptions, so the task cannot fault and Wait cannot throw.
        Task.Run(ConnectAsync).Wait(ConnectWaitMilliseconds);
    }

    /// <summary>
    /// Sends <paramref name="data"/> over the client channel. Does nothing when no connection
    /// is reported, no channel exists, or the payload is null or empty.
    /// </summary>
    /// <param name="data">Payload bytes; the length header is added by the pipeline.</param>
    public void Send(byte[] data)
    {
        // Read the field once so the null check and the write use the same channel.
        IChannel channel = clientChannel;
        if (!IsConnect || data == null || data.Length == 0 || channel == null)
        {
            return;
        }

        var buffer = channel.Allocator.Buffer(data.Length);
        buffer.WriteBytes(data);
        channel.WriteAndFlushAsync(buffer);
    }

    /// <summary>Performs the connect and stores the resulting channel; logs any failure.</summary>
    private async Task ConnectAsync()
    {
        try
        {
            // Built inside the try so an invalid IP or port is reported like any other failure.
            var endpoint = new IPEndPoint(IPAddress.Parse(IP), Port);
            clientChannel = await bootstrap.ConnectAsync(endpoint).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"{DateTime.Now}:{nameof(Start)} failed: {ex.Message}");
        }
    }
}
- }
|