NettyServer.cs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. using DotNetty.Buffers;
  2. using DotNetty.Codecs;
  3. using DotNetty.Common.Internal.Logging;
  4. using DotNetty.Handlers.Logging;
  5. using DotNetty.Handlers.Tls;
  6. using DotNetty.Transport.Bootstrapping;
  7. using DotNetty.Transport.Channels;
  8. using DotNetty.Transport.Channels.Sockets;
  9. using DotNetty.Transport.Libuv;
  10. using Microsoft.Extensions.Logging;
  11. using Microsoft.Extensions.Logging.Console;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Diagnostics.CodeAnalysis;
  15. using System.Linq;
  16. using System.Net;
  17. using System.Net.Http.Headers;
  18. using System.Security.Cryptography.X509Certificates;
  19. using System.Text;
  20. using System.Threading.Tasks;
  21. namespace TcpEventBus
  22. {
  /// <summary>
  /// TCP server endpoint built on DotNetty's libuv transport.
  /// Frames are length-prefixed with a 3-byte header (see <c>LengthFieldPrepender(3)</c>)
  /// and decoded payloads are forwarded to <see cref="OnData"/>.
  /// </summary>
  /// <remarks>
  /// A single <see cref="NettyHandler"/> instance is installed into every accepted channel,
  /// so <see cref="IsConnect"/> and <see cref="Send"/> reflect whatever channel that handler
  /// currently tracks rather than a specific client.
  /// </remarks>
  public class NettyServerManger : ISocket
  {
      // Quiet period / hard timeout handed to ShutdownGracefullyAsync for both loop groups.
      private static readonly TimeSpan ShutdownQuietPeriod = TimeSpan.FromMilliseconds(100);
      private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(1);

      IEventLoopGroup bossGroup;
      IEventLoopGroup workerGroup;
      IChannel boundChannel;
      ServerBootstrap bootstrap;
      private NettyHandler handler;

      /// <summary>IP supplied by the caller. NOTE(review): it is stored but not used when binding; <see cref="Start"/> binds by port only.</summary>
      public string IP { get; }

      /// <summary>TCP port the server listens on.</summary>
      public int Port { get; }

      /// <summary>Always <c>true</c>: this endpoint is the listening side.</summary>
      public bool IsService => true;

      /// <summary><c>true</c> while the shared handler reports a registered channel.</summary>
      public bool IsConnect => handler?.IsConnect ?? false;

      /// <summary>Timeout value supplied by the caller; it is only stored by this class.</summary>
      public int ReceiveTimeout { get; }

      /// <summary>Raised when a client channel is registered.</summary>
      public Action OnConnected { get; set; }

      /// <summary>Raised with the payload of every decoded frame.</summary>
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Raised when a client channel is unregistered.</summary>
      public Action OnDisconnected { get; set; }

      /// <summary>
      /// Creates the event loops and configures the bootstrap. The socket is not bound
      /// until <see cref="Start"/> is called.
      /// </summary>
      /// <param name="ip">Value exposed through <see cref="IP"/>.</param>
      /// <param name="port">Port to listen on.</param>
      /// <param name="timeout">Value exposed through <see cref="ReceiveTimeout"/>.</param>
      public NettyServerManger(string ip, int port, int timeout = 1000)
      {
          IP = ip;
          Port = port;
          ReceiveTimeout = timeout;
          ConfigureBootstrap();
      }

      /// <summary>
      /// Builds the libuv event loop groups, the shared handler and the server bootstrap.
      /// Purely synchronous set-up; no I/O happens here.
      /// </summary>
      private void ConfigureBootstrap()
      {
  #if DEBUG
          InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  #endif
          var dispatcher = new DispatcherEventLoopGroup();
          bossGroup = dispatcher;
          workerGroup = new WorkerEventLoopGroup(dispatcher);

          // The lambdas re-read the public callbacks on every event, so callers may
          // assign OnData/OnConnected/OnDisconnected after construction.
          handler = new NettyHandler
          {
              OnData = data => OnData?.Invoke(data),
              OnConnected = () => OnConnected?.Invoke(),
              OnDisconnected = () => OnDisconnected?.Invoke(),
          };

          bootstrap = new ServerBootstrap();
          bootstrap
              .Group(bossGroup, workerGroup)
              .Channel<TcpServerChannel>()
              .Option(ChannelOption.SoBacklog, 10)
              .ChildOption(ChannelOption.SoLinger, 0)
              .Handler(new LoggingHandler("SRV-LSTN"))
              .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
              {
                  IChannelPipeline pipeline = channel.Pipeline;
                  pipeline.AddLast(new LoggingHandler("SRV-CONN"));
                  pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                  pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(512000 + 3, 0, 3, 0, 3));
                  pipeline.AddLast("echo", handler);
              }));
      }

      /// <summary>
      /// Closes the listening channel (if one was bound) and shuts both event loop groups down.
      /// Fire-and-forget: failures are written to standard error instead of escaping
      /// from the <c>async void</c> method as unhandled exceptions.
      /// </summary>
      public async void Disconnect()
      {
          Console.WriteLine(nameof(Disconnect));

          // Start() may never have been called, or the bind may have failed.
          IChannel channel = boundChannel;
          boundChannel = null;
          if (channel != null)
          {
              try
              {
                  await channel.CloseAsync();
              }
              catch (Exception ex)
              {
                  Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Disconnect)}: closing the listening channel failed: {ex}");
              }
          }

          // Always attempt the loop shutdown, even when closing the channel failed.
          try
          {
              await Task.WhenAll(
                  bossGroup.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout),
                  workerGroup.ShutdownGracefullyAsync(ShutdownQuietPeriod, ShutdownTimeout));
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Disconnect)}: event loop shutdown failed: {ex}");
          }
      }

      /// <summary>
      /// Binds the listening socket on <see cref="Port"/>. Returns immediately; the bind
      /// completes asynchronously. A bind failure (for example, the port is already in use)
      /// is written to standard error and leaves the server unbound.
      /// </summary>
      public async void Start()
      {
          try
          {
              boundChannel = await bootstrap.BindAsync(Port);
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyServerManger)}.{nameof(Start)}: bind on port {Port} failed: {ex}");
          }
      }

      /// <summary>
      /// Sends <paramref name="data"/> as one frame through the shared handler.
      /// The call is a no-op when nothing is connected or the payload is null/empty.
      /// </summary>
      /// <param name="data">Payload bytes; the length prefix is added by the pipeline.</param>
      public void Send(byte[] data)
      {
          if (handler == null || !IsConnect)
          {
              return;
          }

          if (data == null || data.Length == 0)
          {
              return;
          }

          handler.SendData(data);
      }
  }
  /// <summary>
  /// Sharable channel handler that bridges DotNetty pipeline events to plain callbacks
  /// and offers a send path through the most recently registered channel.
  /// </summary>
  /// <remarks>
  /// Only one channel context is tracked at a time. When the same instance is installed
  /// into several channels, <see cref="IsConnect"/> and <see cref="SendData"/> refer to the
  /// channel that registered last.
  /// </remarks>
  public class NettyHandler : ChannelHandlerAdapter
  {
      /// <summary>Invoked after a channel has been registered and the handler state is updated.</summary>
      [AllowNull]
      public Action OnConnected { get; set; }

      /// <summary>
      /// Invoked with the readable bytes of each inbound buffer. The buffer is released as
      /// soon as the callback returns, so the segment must be consumed (or copied) synchronously.
      /// </summary>
      [AllowNull]
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Invoked after a channel has been unregistered.</summary>
      [AllowNull]
      public Action OnDisconnected { get; set; }

      /// <summary><c>true</c> while a registered channel context is being tracked.</summary>
      public bool IsConnect { get; private set; }

      IChannelHandlerContext _Context;

      public NettyHandler()
      {
          IsConnect = false;
      }

      /// <summary>Allows the same instance to be added to more than one pipeline.</summary>
      public override bool IsSharable => true;

      /// <summary>
      /// Hands inbound byte buffers to <see cref="OnData"/> and releases them afterwards.
      /// Anything that is not a byte buffer is passed on to the next handler.
      /// </summary>
      public override void ChannelRead(IChannelHandlerContext context, object message)
      {
          if (message is IByteBuffer buffer)
          {
              try
              {
                  OnData?.Invoke(buffer.GetIoBuffer());
              }
              finally
              {
                  // Release even when the callback throws, otherwise the buffer leaks.
                  buffer.Release();
              }
          }
          else
          {
              // Not ours to consume: let the rest of the pipeline deal with it.
              context.FireChannelRead(message);
          }
      }

      /// <summary>Flushes pending writes once a read batch has completed.</summary>
      public override void ChannelReadComplete(IChannelHandlerContext context)
      {
          context.Flush();
      }

      /// <summary>Starts tracking the registered channel, then raises <see cref="OnConnected"/>.</summary>
      public override void ChannelRegistered(IChannelHandlerContext context)
      {
          base.ChannelRegistered(context);
          Console.WriteLine(nameof(ChannelRegistered));

          // Update state before notifying so that a callback which sends immediately
          // already sees IsConnect == true and a usable context.
          _Context = context;
          IsConnect = true;
          OnConnected?.Invoke();
      }

      /// <summary>Stops tracking the channel (if it is the tracked one), then raises <see cref="OnDisconnected"/>.</summary>
      public override void ChannelUnregistered(IChannelHandlerContext context)
      {
          base.ChannelUnregistered(context);
          Console.WriteLine(nameof(ChannelUnregistered));

          // Only clear the state that belongs to this channel; a different channel may
          // have registered in the meantime. Clearing happens before the callback so a
          // reconnect triggered from OnDisconnected is not wiped out afterwards.
          if (ReferenceEquals(_Context, context))
          {
              IsConnect = false;
              _Context = null;
          }

          OnDisconnected?.Invoke();
      }

      /// <summary>
      /// Copies <paramref name="data"/> into a freshly allocated buffer and writes it to the
      /// tracked channel. Does nothing when no channel is tracked or the payload is null/empty.
      /// </summary>
      /// <param name="data">Bytes to send.</param>
      public void SendData(byte[] data)
      {
          // Snapshot the field: it can be cleared by the event loop between the check and the use.
          IChannelHandlerContext target = _Context;
          if (target == null || !IsConnect)
          {
              return;
          }

          if (data == null || data.Length == 0)
          {
              return;
          }

          IByteBuffer buffer = target.Allocator.Buffer(data.Length);
          buffer.WriteBytes(data);
          target.WriteAndFlushAsync(buffer);
      }
  }
  /// <summary>
  /// TCP client endpoint built on DotNetty's managed socket transport.
  /// Uses the same 3-byte length-prefixed framing as the server side and forwards
  /// decoded payloads to <see cref="OnData"/>.
  /// </summary>
  public class NettyClientManger : ISocket
  {
      // Upper bound Start() waits for the connect attempt before returning to the caller.
      private static readonly TimeSpan ConnectGracePeriod = TimeSpan.FromMilliseconds(100);

      IChannel clientChannel;
      Bootstrap bootstrap;
      MultithreadEventLoopGroup group;
      private NettyHandler handler;

      /// <summary>Remote address to connect to; must be a literal IP address because it is parsed with <c>IPAddress.Parse</c>.</summary>
      public string IP { get; }

      /// <summary>Remote TCP port.</summary>
      public int Port { get; }

      // NOTE(review): returns true although this is the connecting side; looks copied
      // from the server class. Left unchanged because callers may rely on it - confirm.
      public bool IsService => true;

      /// <summary><c>true</c> while the handler reports a registered channel.</summary>
      public bool IsConnect => handler?.IsConnect ?? false;

      /// <summary>Timeout value supplied by the caller; it is only stored by this class.</summary>
      public int ReceiveTimeout { get; }

      /// <summary>Raised when the client channel is registered.</summary>
      public Action OnConnected { get; set; }

      /// <summary>Raised with the payload of every decoded frame.</summary>
      public Action<ArraySegment<byte>> OnData { get; set; }

      /// <summary>Raised when the client channel is unregistered.</summary>
      public Action OnDisconnected { get; set; }

      /// <summary>
      /// Creates the event loop group and configures the bootstrap. No connection is
      /// attempted until <see cref="Start"/> is called.
      /// </summary>
      /// <param name="ip">Remote IP address literal.</param>
      /// <param name="port">Remote port.</param>
      /// <param name="timeout">Value exposed through <see cref="ReceiveTimeout"/>.</param>
      public NettyClientManger(string ip, int port, int timeout = 1000)
      {
          IP = ip;
          Port = port;
          ReceiveTimeout = timeout;
          ConfigureBootstrap();
      }

      /// <summary>Builds the event loop group, the handler and the client bootstrap.</summary>
      private void ConfigureBootstrap()
      {
  #if DEBUG
          InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  #endif
          group = new MultithreadEventLoopGroup();

          // The lambdas re-read the public callbacks on every event, so callers may
          // assign them after construction.
          handler = new NettyHandler
          {
              OnConnected = () => OnConnected?.Invoke(),
              OnData = data => OnData?.Invoke(data),
              OnDisconnected = () => OnDisconnected?.Invoke(),
          };

          bootstrap = new Bootstrap();
          bootstrap
              .Group(group)
              .Channel<TcpSocketChannel>()
              .Option(ChannelOption.TcpNodelay, true)
              .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
              {
                  IChannelPipeline pipeline = channel.Pipeline;
                  pipeline.AddLast(new LoggingHandler());
                  pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
                  pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(512000 + 3, 0, 3, 0, 3));
                  pipeline.AddLast("echo", handler);
              }));
      }

      /// <summary>
      /// Closes the current channel, if any. Fire-and-forget: a failure while closing is
      /// written to standard error instead of escaping from the <c>async void</c> method.
      /// </summary>
      /// <remarks>
      /// The event loop group is intentionally left running so that <see cref="Start"/> can
      /// be called again. NOTE(review): nothing in this class ever shuts the group down.
      /// </remarks>
      public async void Disconnect()
      {
          IChannel channel = clientChannel;
          if (channel == null)
          {
              return;
          }

          try
          {
              await channel.CloseAsync();
              Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}");
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Disconnect)}: closing the channel failed: {ex}");
          }
      }

      /// <summary>
      /// Starts a connection attempt to <see cref="IP"/>:<see cref="Port"/> and waits up to
      /// 100 ms for it to finish before returning. Never throws; a failed attempt is
      /// reported on standard error and leaves the client disconnected.
      /// </summary>
      public void Start()
      {
          // ConnectAsync handles its own exceptions, so the task never faults and
          // Wait(...) only returns early (connected/failed) or times out.
          Task attempt = Task.Run(() => ConnectAsync());
          attempt.Wait(ConnectGracePeriod);
      }

      /// <summary>Performs one connection attempt and stores the resulting channel on success.</summary>
      private async Task ConnectAsync()
      {
          try
          {
              var endpoint = new IPEndPoint(IPAddress.Parse(IP), Port);
              clientChannel = await bootstrap.ConnectAsync(endpoint).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              Console.Error.WriteLine($"{nameof(NettyClientManger)}.{nameof(Start)}: connect to {IP}:{Port} failed: {ex}");
          }
      }

      /// <summary>
      /// Sends <paramref name="data"/> as one frame over the current channel.
      /// The call is a no-op when not connected or when the payload is null/empty.
      /// </summary>
      /// <param name="data">Payload bytes; the length prefix is added by the pipeline.</param>
      public void Send(byte[] data)
      {
          // Snapshot the field so a concurrent reconnect cannot swap it mid-call.
          IChannel channel = clientChannel;
          if (channel == null || !IsConnect)
          {
              return;
          }

          if (data == null || data.Length == 0)
          {
              return;
          }

          IByteBuffer buffer = channel.Allocator.Buffer(data.Length);
          buffer.WriteBytes(data);
          channel.WriteAndFlushAsync(buffer);
      }
  }
  239. }