NettyServer.cs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. using DotNetty.Buffers;
  2. using DotNetty.Codecs;
  3. using DotNetty.Common.Internal.Logging;
  4. using DotNetty.Handlers.Logging;
  5. using DotNetty.Handlers.Tls;
  6. using DotNetty.Transport.Bootstrapping;
  7. using DotNetty.Transport.Channels;
  8. using DotNetty.Transport.Channels.Sockets;
  9. using DotNetty.Transport.Libuv;
  10. using Microsoft.Extensions.Logging;
  11. using Microsoft.Extensions.Logging.Console;
  12. using System;
  13. using System.Collections.Generic;
  14. using System.Diagnostics.CodeAnalysis;
  15. using System.Linq;
  16. using System.Net;
  17. using System.Net.Http.Headers;
  18. using System.Security.Cryptography.X509Certificates;
  19. using System.Text;
  20. using System.Threading.Tasks;
  21. namespace TcpEventBus
  22. {
  23. public class NettyServerManger: ISocket
  24. {
  25. [AllowNull]
  26. private IEventLoopGroup bossGroup;
  27. [AllowNull]
  28. private IEventLoopGroup workerGroup;
  29. [AllowNull]
  30. private IChannel boundChannel;
  31. [AllowNull]
  32. private ServerBootstrap bootstrap;
  33. public string IP { get; }
  34. public int MaxFrameLength { get; }
  35. public int Port { get; }
  36. public bool IsService => true;
  37. public bool IsConnect => handler == null ? false:handler.IsConnect;
  38. public int ReceiveTimeout { get; }
  39. [AllowNull]
  40. public Action OnConnected { get; set; }
  41. [AllowNull]
  42. public Action<ArraySegment<byte>> OnData { get; set; }
  43. [AllowNull]
  44. public Action OnDisconnected { get; set; }
  45. private NettyHandler handler = new NettyHandler();
  46. public NettyServerManger(string ip,int port,int timeout=1000,int maxFrameLength=512000)
  47. {
  48. if (maxFrameLength <= 0) throw new ArgumentOutOfRangeException($"{nameof(maxFrameLength)} must be greater than 0");
  49. MaxFrameLength = maxFrameLength;
  50. this.IP = ip;
  51. this.Port = port;
  52. ReceiveTimeout = timeout;
  53. RunServer();
  54. }
  55. private void RunServer()
  56. {
  57. #if DEBUG
  58. InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  59. #endif
  60. if (true)
  61. {
  62. var dispatcher = new DispatcherEventLoopGroup();
  63. bossGroup = dispatcher;
  64. workerGroup = new WorkerEventLoopGroup(dispatcher);
  65. }
  66. else
  67. {
  68. bossGroup = new MultithreadEventLoopGroup(1);
  69. workerGroup = new MultithreadEventLoopGroup();
  70. }
  71. try
  72. {
  73. bootstrap = new ServerBootstrap();
  74. bootstrap.Group(bossGroup, workerGroup);
  75. if (true)
  76. {
  77. bootstrap.Channel<TcpServerChannel>();
  78. }
  79. else
  80. {
  81. bootstrap.Channel<TcpServerSocketChannel>();
  82. }
  83. handler = new NettyHandler();
  84. handler.OnData = (data)=>OnData?.Invoke(data);
  85. handler.OnConnected = ()=>OnConnected?.Invoke();
  86. handler.OnDisconnected = ()=>OnDisconnected?.Invoke();
  87. bootstrap
  88. .Option(ChannelOption.SoBacklog, 10)
  89. .ChildOption(ChannelOption.SoLinger, 0)
  90. .Handler(new LoggingHandler("SRV-LSTN"))
  91. .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
  92. {
  93. IChannelPipeline pipeline = channel.Pipeline;
  94. pipeline.AddLast(new LoggingHandler("SRV-CONN"));
  95. pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
  96. pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
  97. pipeline.AddLast("echo", handler);
  98. }));
  99. }
  100. finally
  101. {
  102. }
  103. }
  104. public async void Disconnect()
  105. {
  106. Console.WriteLine(nameof(Disconnect));
  107. await boundChannel.CloseAsync();
  108. await Task.WhenAll(
  109. bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
  110. workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
  111. }
  112. public async void Start()
  113. {
  114. boundChannel = await bootstrap.BindAsync(Port);
  115. }
  116. public void Send(byte[] data)
  117. {
  118. if (!IsConnect || data ==null|| data.Length ==0 || handler ==null) return;
  119. handler.SendData(data);
  120. }
  121. }
  122. public class NettyHandler: ChannelHandlerAdapter
  123. {
  124. [AllowNull]
  125. public Action OnConnected { get; set; }
  126. [AllowNull]
  127. public Action<ArraySegment<byte>> OnData { get; set; }
  128. [AllowNull]
  129. public Action OnDisconnected { get; set; }
  130. public bool IsConnect { get; private set; }
  131. [AllowNull]
  132. private IChannelHandlerContext _Context;
  133. public NettyHandler()
  134. {
  135. IsConnect = false;
  136. }
  137. public override bool IsSharable => true;
  138. public override void ChannelRead(IChannelHandlerContext context, object message)
  139. {
  140. if(message is IByteBuffer buffer)
  141. {
  142. OnData?.Invoke(buffer.GetIoBuffer());
  143. buffer.Release();
  144. }
  145. }
  146. public override void ChannelReadComplete(IChannelHandlerContext context)
  147. {
  148. context.Flush();
  149. }
  150. public override void ChannelRegistered(IChannelHandlerContext context)
  151. {
  152. base.ChannelRegistered(context);
  153. Console.WriteLine(nameof(ChannelRegistered));
  154. OnConnected?.Invoke();
  155. IsConnect = true;
  156. _Context = context;
  157. }
  158. public override void ChannelUnregistered(IChannelHandlerContext context)
  159. {
  160. base.ChannelUnregistered(context);
  161. Console.WriteLine(nameof(ChannelUnregistered));
  162. OnDisconnected?.Invoke();
  163. IsConnect = false;
  164. _Context = null;
  165. }
  166. public void SendData(byte[] data)
  167. {
  168. if (_Context == null || data == null || data.Length == 0 || !IsConnect) return;
  169. var buffer = _Context.Allocator.Buffer(data.Length);
  170. buffer.WriteBytes(data);
  171. _Context?.WriteAndFlushAsync(buffer);
  172. }
  173. }
  174. public class NettyClientManger : ISocket
  175. {
  176. [AllowNull]
  177. private Bootstrap bootstrap;
  178. [AllowNull]
  179. private MultithreadEventLoopGroup group;
  180. [AllowNull]
  181. private IChannel clientChannel;
  182. public string IP { get; }
  183. public int Port { get; }
  184. public bool IsService => true;
  185. public bool IsConnect => handler == null ? false : handler.IsConnect;
  186. public int ReceiveTimeout { get; }
  187. [AllowNull]
  188. public Action OnConnected { get; set; }
  189. public int MaxFrameLength { get; }
  190. [AllowNull]
  191. public Action<ArraySegment<byte>> OnData { get; set; }
  192. [AllowNull]
  193. public Action OnDisconnected { get; set; }
  194. private NettyHandler handler = new NettyHandler();
  195. public NettyClientManger(string ip, int port,int timeout = 1000,int maxFrameLength=512000)
  196. {
  197. if(maxFrameLength<=0) throw new ArgumentOutOfRangeException($"{nameof(maxFrameLength)} must be greater than 0");
  198. this.IP = ip;
  199. this.Port = port;
  200. ReceiveTimeout = timeout;
  201. MaxFrameLength = maxFrameLength;
  202. RunClientAsync();
  203. }
  204. private void RunClientAsync()
  205. {
  206. #if DEBUG
  207. InternalLoggerFactory.DefaultFactory = LoggerFactory.Create(builder => builder.AddConsole());
  208. #endif
  209. group = new MultithreadEventLoopGroup();
  210. try
  211. {
  212. handler = new NettyHandler();
  213. handler.OnConnected =()=>OnConnected?.Invoke();
  214. handler.OnData = (data) => OnData?.Invoke(data);
  215. handler.OnDisconnected = ()=>OnDisconnected?.Invoke();
  216. bootstrap = new Bootstrap();
  217. bootstrap
  218. .Group(group)
  219. .Channel<TcpSocketChannel>()
  220. .Option(ChannelOption.TcpNodelay, true)
  221. .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
  222. {
  223. IChannelPipeline pipeline = channel.Pipeline;
  224. pipeline.AddLast(new LoggingHandler());
  225. pipeline.AddLast("framing-enc", new LengthFieldPrepender(3));
  226. pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(MaxFrameLength + 3, 0, 3, 0, 3));
  227. pipeline.AddLast("echo", handler);
  228. }));
  229. }
  230. finally
  231. {
  232. }
  233. }
  234. public async void Disconnect()
  235. {
  236. if (clientChannel == null) return;
  237. await clientChannel.CloseAsync();
  238. Console.WriteLine($"{DateTime.Now}:{nameof(Disconnect)}");
  239. }
  240. public void Start()
  241. {
  242. try
  243. {
  244. Task.Run(()=> clientChannel = bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(IP), Port)).Result);
  245. Thread.Sleep(100);
  246. }
  247. catch
  248. {
  249. }
  250. }
  251. public void Send(byte[] data)
  252. {
  253. if (!IsConnect || data == null || data.Length == 0 || clientChannel ==null) return;
  254. var buffer = clientChannel.Allocator.Buffer(data.Length);
  255. buffer.WriteBytes(data);
  256. clientChannel.WriteAndFlushAsync(buffer);
  257. }
  258. }
  259. }