NettyServer.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. using DotNetty.Buffers;
  2. using DotNetty.Codecs;
  3. using DotNetty.Common.Internal.Logging;
  4. using DotNetty.Handlers.Logging;
  5. using DotNetty.Handlers.Tls;
  6. using DotNetty.Transport.Bootstrapping;
  7. using DotNetty.Transport.Channels;
  8. using DotNetty.Transport.Channels.Sockets;
  9. using DotNetty.Transport.Libuv;
  10. using Microsoft.Extensions.Logging;
  11. using Microsoft.Extensions.Logging.Console;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Diagnostics.CodeAnalysis;
  15. using System.Linq;
  16. using System.Net;
  17. using System.Net.Http.Headers;
  18. using System.Security.Cryptography.X509Certificates;
  19. using System.Text;
  20. using System.Threading.Tasks;
  21. namespace TcpEventBus
  22. {
  /// <summary>
  /// TCP server endpoint built on DotNetty's libuv transport.
  /// Every message is framed with a 3-byte length prefix (prepended on write,
  /// stripped on read), and all accepted connections share one
  /// <see cref="NettyHandler"/> whose callbacks are forwarded to
  /// <see cref="OnConnected"/>, <see cref="OnData"/> and <see cref="OnDisconnected"/>.
  /// </summary>
  public class NettyServerManger: ISocket
  {
      [AllowNull]
      private IEventLoopGroup bossGroup;
      [AllowNull]
      private IEventLoopGroup workerGroup;
      [AllowNull]
      private IChannel boundChannel;
      [AllowNull]
      private ServerBootstrap bootstrap;

      // One sharable handler instance is installed into every accepted channel's pipeline.
      private readonly NettyHandler handler = new NettyHandler();

      /// <summary>Address passed to the constructor. <see cref="Start"/> binds by port only, so this value is not used for binding.</summary>
      public string IP { get; }

      /// <summary>Largest payload (excluding the 3-byte length prefix) the frame decoder accepts.</summary>
      public int MaxFrameLength { get; }

      /// <summary>Port the server binds to in <see cref="Start"/>.</summary>
      public int Port { get; }

      /// <summary>Always <c>true</c> for this type.</summary>
      public bool IsService => true;

      /// <summary>Mirrors <see cref="NettyHandler.IsConnect"/> of the shared handler.</summary>
      public bool IsConnect => handler.IsConnect;

      /// <summary>Timeout value passed to the constructor; it is stored but not consulted by this class.</summary>
      public int ReceiveTimeout { get; }

      /// <summary>Raised when the handler reports a registered channel.</summary>
      [AllowNull]
      public Action OnConnected { get; set; }

      /// <summary>Raised for every decoded frame; see <see cref="NettyHandler.ChannelRead"/> for the lifetime of the segment.</summary>
      [AllowNull]
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Raised when the handler reports an unregistered channel.</summary>
      [AllowNull]
      public Action OnDisconnected { get; set; }

      /// <summary>
      /// Stores the settings and builds the server bootstrap. Nothing is bound until <see cref="Start"/> is called.
      /// </summary>
      /// <param name="ip">Address kept in <see cref="IP"/>.</param>
      /// <param name="port">Port to listen on.</param>
      /// <param name="timeout">Value kept in <see cref="ReceiveTimeout"/>.</param>
      /// <param name="maxFrameLength">Maximum payload size in bytes; must be positive.</param>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is zero or negative.</exception>
      public NettyServerManger(string ip,int port,int timeout=1000,int maxFrameLength=204800)
      {
          if (maxFrameLength <= 0)
          {
              // The (paramName, actualValue, message) overload is required: the single-string
              // constructor interprets its argument as the parameter name, not the message.
              throw new ArgumentOutOfRangeException(
                  nameof(maxFrameLength),
                  maxFrameLength,
                  $"{nameof(maxFrameLength)} must be greater than 0");
          }

          MaxFrameLength = maxFrameLength;
          IP = ip;
          Port = port;
          ReceiveTimeout = timeout;
          RunServer();
      }

      /// <summary>
      /// Creates the libuv event loop groups and configures the bootstrap and child pipeline.
      /// </summary>
      private void RunServer()
      {
  #if DEBUG
          InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  #endif
          // libuv transport: the dispatcher loop accepts connections and hands them to the worker group.
          var dispatcher = new DispatcherEventLoopGroup();
          bossGroup = dispatcher;
          workerGroup = new WorkerEventLoopGroup(dispatcher);

          // Forward through lambdas so that subscribers assigned after construction are still honoured.
          handler.OnData = data => OnData?.Invoke(data);
          handler.OnConnected = () => OnConnected?.Invoke();
          handler.OnDisconnected = () => OnDisconnected?.Invoke();

          bootstrap = new ServerBootstrap();
          bootstrap.Group(bossGroup, workerGroup);
          bootstrap.Channel<TcpServerChannel>();
          bootstrap
              .Option(ChannelOption.SoBacklog, 10)
              .ChildOption(ChannelOption.SoLinger, 0)
              .Handler(new LoggingHandler("SRV-LSTN"))
              .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
              {
                  IChannelPipeline pipeline = channel.Pipeline;
                  pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                  pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                  // +3 so the limit applies to the payload rather than payload plus length prefix.
                  pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
                  pipeline.AddLast("echo", handler);
              }));
      }

      /// <summary>
      /// Closes the listening channel (if one was bound) and shuts both event loop groups down.
      /// Failures are written to standard error because this fire-and-forget method has no caller to report to.
      /// </summary>
      public async void Disconnect()
      {
          Console.WriteLine(nameof(Disconnect));
          try
          {
              try
              {
                  // Start may not have completed (or may have failed), in which case there is nothing to close.
                  IChannel channel = boundChannel;
                  boundChannel = null;
                  if (channel != null)
                  {
                      await channel.CloseAsync();
                  }
              }
              finally
              {
                  // Always release the loops, even when closing the channel failed.
                  await Task.WhenAll(
                      bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                      workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
              }
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Disconnect)} failed: {ex}");
          }
      }

      /// <summary>
      /// Binds the listening socket on <see cref="Port"/>. The bind completes asynchronously;
      /// a failure (for example, port already in use) is written to standard error instead of
      /// escaping from this <c>async void</c> method.
      /// </summary>
      public async void Start()
      {
          try
          {
              boundChannel = await bootstrap.BindAsync(Port);
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Start)} failed to bind port {Port}: {ex}");
          }
      }

      /// <summary>
      /// Sends <paramref name="data"/> as one frame through the shared handler.
      /// The call is a no-op when nothing is connected or the payload is null or empty.
      /// </summary>
      /// <param name="data">Payload bytes, without length prefix.</param>
      public void Send(byte[] data)
      {
          if (!IsConnect || data == null || data.Length == 0)
          {
              return;
          }

          handler.SendData(data);
      }
  }
  /// <summary>
  /// Sharable channel handler that turns pipeline events into simple callbacks and
  /// remembers the most recently registered channel context so data can be sent to it.
  /// </summary>
  /// <remarks>
  /// Only one context is tracked at a time: when the same instance is installed in several
  /// pipelines, <see cref="SendData"/> targets whichever channel registered last.
  /// NOTE(review): connection state is driven by ChannelRegistered/ChannelUnregistered rather than
  /// ChannelActive/ChannelInactive; callers may rely on that timing, so it is left unchanged - confirm.
  /// </remarks>
  public class NettyHandler: ChannelHandlerAdapter
  {
      /// <summary>Invoked after a channel has been registered and recorded as the current context.</summary>
      [AllowNull]
      public Action OnConnected { get; set; }

      /// <summary>Invoked with the readable bytes of each inbound buffer.</summary>
      [AllowNull]
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Invoked when a channel has been unregistered.</summary>
      [AllowNull]
      public Action OnDisconnected { get; set; }

      /// <summary><c>true</c> while a registered channel context is being tracked.</summary>
      public bool IsConnect { get; private set; }

      [AllowNull]
      private IChannelHandlerContext _Context;

      public NettyHandler()
      {
          IsConnect = false;
      }

      /// <summary>Allows one instance to be added to more than one pipeline.</summary>
      public override bool IsSharable => true;

      /// <summary>
      /// Passes the buffer's readable bytes to <see cref="OnData"/> and then releases the buffer.
      /// The segment refers to the buffer's own storage, so subscribers must consume or copy it
      /// before returning. Messages of any other type are forwarded down the pipeline.
      /// </summary>
      public override void ChannelRead(IChannelHandlerContext context, object message)
      {
          if (message is IByteBuffer buffer)
          {
              try
              {
                  OnData?.Invoke(buffer.GetIoBuffer());
              }
              finally
              {
                  // Release even when the subscriber throws, otherwise the buffer leaks.
                  buffer.Release();
              }
          }
          else
          {
              // Not ours to consume: hand it on instead of silently dropping it.
              context.FireChannelRead(message);
          }
      }

      /// <summary>Flushes pending writes once the current read batch is finished.</summary>
      public override void ChannelReadComplete(IChannelHandlerContext context)
      {
          context.Flush();
      }

      /// <summary>
      /// Records <paramref name="context"/> as the current connection and raises <see cref="OnConnected"/>.
      /// </summary>
      public override void ChannelRegistered(IChannelHandlerContext context)
      {
          base.ChannelRegistered(context);
          Console.WriteLine(nameof(ChannelRegistered));

          // Publish the state before notifying, so a subscriber that sends from
          // inside OnConnected is not rejected by the IsConnect/_Context checks.
          _Context = context;
          IsConnect = true;
          OnConnected?.Invoke();
      }

      /// <summary>
      /// Clears the tracked connection if it is the one being unregistered, raises
      /// <see cref="OnDisconnected"/> and closes the unregistered channel.
      /// </summary>
      public override void ChannelUnregistered(IChannelHandlerContext context)
      {
          base.ChannelUnregistered(context);
          Console.WriteLine(nameof(ChannelUnregistered));

          // Only forget the tracked context when it is the one going away; with a shared
          // handler a different, still-live connection may be the tracked one.
          if (ReferenceEquals(_Context, context))
          {
              _Context = null;
              IsConnect = false;
          }

          OnDisconnected?.Invoke();

          // Close the channel this event belongs to (the argument, not the shared field).
          // The explicit pipeline removal was dropped: it ran after an await, by which
          // time the shared field could already refer to another connection's context.
          ObserveFault(context.CloseAsync(), nameof(ChannelUnregistered));
      }

      /// <summary>
      /// Copies <paramref name="data"/> into a freshly allocated buffer and writes it to the tracked channel.
      /// Does nothing when no channel is tracked or the payload is null or empty.
      /// </summary>
      /// <param name="data">Bytes to send.</param>
      public void SendData(byte[] data)
      {
          // Snapshot the field once: it can be cleared by an event-loop thread between the
          // check and the write, which would otherwise throw or leak the allocated buffer.
          IChannelHandlerContext target = _Context;
          if (target == null || data == null || data.Length == 0 || !IsConnect)
          {
              return;
          }

          var buffer = target.Allocator.Buffer(data.Length);
          buffer.WriteBytes(data);
          ObserveFault(target.WriteAndFlushAsync(buffer), nameof(SendData));
      }

      /// <summary>Logs the failure of a fire-and-forget task so the exception does not go unobserved.</summary>
      private static void ObserveFault(Task task, string operation)
      {
          task.ContinueWith(
              faulted => Console.Error.WriteLine(
                  $"{nameof(NettyHandler)}.{operation} failed: {faulted.Exception?.GetBaseException()}"),
              TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);
      }
  }
  /// <summary>
  /// TCP client endpoint built on DotNetty's managed socket transport. Messages use the same
  /// 3-byte length-prefixed framing as the server, and pipeline events are surfaced through
  /// <see cref="OnConnected"/>, <see cref="OnData"/> and <see cref="OnDisconnected"/>.
  /// </summary>
  /// <remarks>
  /// NOTE(review): the event loop group created here is never shut down by <see cref="Disconnect"/>;
  /// that keeps a later <see cref="Start"/> possible but leaves its threads alive - confirm intent.
  /// </remarks>
  public class NettyClientManger : ISocket
  {
      // Upper bound on how long Start blocks waiting for the connection attempt to finish.
      private const int ConnectGraceMilliseconds = 100;

      [AllowNull]
      private Bootstrap bootstrap;
      [AllowNull]
      private MultithreadEventLoopGroup group;
      [AllowNull]
      private IChannel clientChannel;

      private readonly NettyHandler handler = new NettyHandler();

      /// <summary>Remote address, parsed with <see cref="IPAddress.Parse(string)"/> when connecting.</summary>
      public string IP { get; }

      /// <summary>Remote port.</summary>
      public int Port { get; }

      /// <summary>
      /// Always <c>true</c>. NOTE(review): this is the same value the server type reports;
      /// it looks like a copy-paste slip for a client - verify against ISocket consumers before changing.
      /// </summary>
      public bool IsService => true;

      /// <summary>Mirrors <see cref="NettyHandler.IsConnect"/> of this client's handler.</summary>
      public bool IsConnect => handler.IsConnect;

      /// <summary>Timeout value passed to the constructor; it is stored but not consulted by this class.</summary>
      public int ReceiveTimeout { get; }

      /// <summary>Raised when the handler reports a registered channel.</summary>
      [AllowNull]
      public Action OnConnected { get; set; }

      /// <summary>Largest payload (excluding the 3-byte length prefix) the frame decoder accepts.</summary>
      public int MaxFrameLength { get; }

      /// <summary>Raised for every decoded frame.</summary>
      [AllowNull]
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Raised when the handler reports an unregistered channel.</summary>
      [AllowNull]
      public Action OnDisconnected { get; set; }

      /// <summary>
      /// Stores the settings and builds the client bootstrap. No connection is attempted until <see cref="Start"/>.
      /// </summary>
      /// <param name="ip">Remote IP address in textual form.</param>
      /// <param name="port">Remote port.</param>
      /// <param name="timeout">Value kept in <see cref="ReceiveTimeout"/>.</param>
      /// <param name="maxFrameLength">Maximum payload size in bytes; must be positive.</param>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxFrameLength"/> is zero or negative.</exception>
      public NettyClientManger(string ip, int port,int timeout = 1000,int maxFrameLength=204800)
      {
          if (maxFrameLength <= 0)
          {
              // The single-string constructor would treat the message as the parameter name.
              throw new ArgumentOutOfRangeException(
                  nameof(maxFrameLength),
                  maxFrameLength,
                  $"{nameof(maxFrameLength)} must be greater than 0");
          }

          IP = ip;
          Port = port;
          ReceiveTimeout = timeout;
          MaxFrameLength = maxFrameLength;
          RunClientAsync();
      }

      /// <summary>Creates the event loop group and configures the bootstrap and pipeline (synchronous despite the name).</summary>
      private void RunClientAsync()
      {
  #if DEBUG
          InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  #endif
          group = new MultithreadEventLoopGroup();

          // Forward through lambdas so that subscribers assigned after construction are still honoured.
          handler.OnConnected = () => OnConnected?.Invoke();
          handler.OnData = data => OnData?.Invoke(data);
          handler.OnDisconnected = () => OnDisconnected?.Invoke();

          bootstrap = new Bootstrap();
          bootstrap
              .Group(group)
              .Channel<TcpSocketChannel>()
              .Option(ChannelOption.TcpNodelay, true)
              .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
              {
                  IChannelPipeline pipeline = channel.Pipeline;
                  pipeline.AddLast(new LoggingHandler());
                  pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                  // +3 so the limit applies to the payload rather than payload plus length prefix.
                  pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
                  pipeline.AddLast("echo", handler);
              }));
      }

      /// <summary>
      /// Closes the client channel if one has been established. A failing close is written to
      /// standard error because this fire-and-forget method has no caller to report to.
      /// </summary>
      public async void Disconnect()
      {
          IChannel channel = clientChannel;
          if (channel == null)
          {
              return;
          }

          try
          {
              await channel.CloseAsync();
              Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}");
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Disconnect)} failed: {ex}");
          }
      }

      /// <summary>
      /// Starts connecting to <see cref="IP"/>:<see cref="Port"/> and waits up to
      /// <c>ConnectGraceMilliseconds</c> for the attempt to finish, returning as soon as it does.
      /// Connection errors never propagate to the caller; they are written to standard error.
      /// </summary>
      public void Start()
      {
          Task attempt = ConnectAsync();

          // Preserves the short grace period callers had before (previously an unconditional
          // 100 ms sleep) without blocking longer than the connect actually takes.
          // ConnectAsync handles its own exceptions, so Wait cannot throw for a failed connect.
          attempt.Wait(TimeSpan.FromMilliseconds(ConnectGraceMilliseconds));
      }

      /// <summary>Performs the connection attempt and stores the resulting channel.</summary>
      private async Task ConnectAsync()
      {
          try
          {
              var remote = new IPEndPoint(IPAddress.Parse(IP), Port);
              // No context capture, so the bounded wait in Start cannot deadlock the continuation.
              clientChannel = await bootstrap.ConnectAsync(remote).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              // Covers both an unparsable address and a refused or failed connection.
              Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Start)} failed to connect to {IP}:{Port}: {ex}");
          }
      }

      /// <summary>
      /// Sends <paramref name="data"/> as one frame over the client channel.
      /// The call is a no-op when not connected or when the payload is null or empty.
      /// </summary>
      /// <param name="data">Payload bytes, without length prefix.</param>
      public void Send(byte[] data)
      {
          // Read the field once so the check and the write use the same channel instance.
          IChannel channel = clientChannel;
          if (!IsConnect || data == null || data.Length == 0 || channel == null)
          {
              return;
          }

          var buffer = channel.Allocator.Buffer(data.Length);
          buffer.WriteBytes(data);
          channel.WriteAndFlushAsync(buffer);
      }
  }
  271. }