APM 体式格局,即 Asynchronous Programming Model
TAP 体式格局,即 Task-based Asynchronous Pattern
SAEA 体式格局,即 SocketAsyncEventArgs
RIO 体式格局,即 Registered I/O
在 .NET/C# 中关于 Socket 的支撑均是基于 Windows I/O Completion Ports 完成端口手艺的封装,经由历程差别的 Non-Blocking 封装组织来满足差别的编程需求。以上体式格局均已在 Cowboy.Sockets 中有完全完成,而且 APM 和 TAP 体式格局已经在现实项目中运用。Cowboy.Sockets 还在不停的进化和完美中,若有任何问题请实时斧正。
虽然有这么多种完成体式格局,但笼统的看,它们是一样一样的,用两个 Loop 即可形貌:Accept Loop 和 Read Loop,以下图所示。(这里说起的 "Loop" 指的是一种轮回体式格局,而非特指 while/for 等关键字。)
在任何 TCP Server 的完成中,肯定存在一个 Accept Socket Loop,用于吸收 Client 端的 Connect 要求以竖立 TCP Connection。
在任何 TCP Server 的完成中,肯定存在一个 Read Socket Loop,用于吸收 Client 端 Write 过来的数据。
假如 Accept 轮回壅塞,则会致使没法疾速的竖立衔接,效劳端 Pending Backlog 满,进而致使 Client 端收到 Connect Timeout 的非常。假如 Read 轮回壅塞,则显然会致使没法实时收到 Client 端发过来的数据,进而致使 Client 端 Send Buffer 满,没法再发送数据。
从完成细节的角度看,能够致使效劳壅塞的位置能够在:
Accept 到新的 Socket,构建新的 Connection 须要分派种种资本,分派资本慢;
Accept 到新的 Socket,没有实时触发下一次 Accept;
Read 到新的 Buffer,剖断 Payload 音讯长度,剖断历程长;
Read 到新的 Buffer,发明 Payload 还没有收全,继承 Read,则 "能够" 会致使一次 Buffer Copy;
Payload 吸收终了,举行 De-Serialization 转成可辨认的 Protocol Message,反序列化慢;
由 Business Module 来处置惩罚响应的 Protocol Message,处置惩罚历程慢;
1-2 涉及到 Accept 历程和 Connection 的竖立历程,3-4 涉及到 ReceiveBuffer 的处置惩罚历程,5-6 涉及到运用逻辑侧的完成。
Java 中有名的 Netty 收集库从 4.0 版本最先关于 Buffer 部份做了全新的尝试,采纳了名叫 ByteBuf 的设想,完成 Buffer Zero Copy 以削减高并发条件下 Buffer 拷贝带来的机能损失和 GC 压力。DotNetty,Orleans ,Helios 等项目正在尝试在 C# 中举行相似的 ByteBuf 的完成。
APM 体式格局:TcpSocketServer
TcpSocketServer 的完成是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采纳基于 APM 的 BeginXXX 和 EndXXX 接口完成。
TcpSocketServer 中的 Accept Loop 指的就是,
BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...
每个竖立胜利的 Connection 由 TcpSocketSession 来处置惩罚,所以 TcpSocketSession 中会包括 Read Loop,
BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...
TcpSocketServer 经由历程暴露 Event 来完成 Connection 的竖立与断开和数据吸收的关照。
event EventHandler<TcpClientConnectedEventArgs> ClientConnected; event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected; event EventHandler<TcpClientDataReceivedEventArgs> ClientDataReceived;
运用也是简朴直接,直接定阅事宜关照。
private static void StartServer() { _server = new TcpSocketServer(22222); _server.ClientConnected += server_ClientConnected; _server.ClientDisconnected += server_ClientDisconnected; _server.ClientDataReceived += server_ClientDataReceived; _server.Listen(); } static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); } static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); } static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e) { var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength); Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session)); Console.WriteLine(string.Format("{0}", text)); _server.Broadcast(Encoding.UTF8.GetBytes(text)); }
TAP 体式格局:AsyncTcpSocketServer
AsyncTcpSocketServer 的完成是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采纳基于 TAP 的 async/await 的 XXXAsync 接口完成。
但是,现实上 XXXAsync 并没有建立什么奇异的结果,其内部完成只是将 APM 的要领转换成了 TAP 的挪用体式格局。
//************* Task-based async public methods ************************* [HostProtection(ExternalThreading = true)] public Task<Socket> AcceptSocketAsync() { return Task<Socket>.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); } [HostProtection(ExternalThreading = true)] public Task<TcpClient> AcceptTcpClientAsync() { return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
AsyncTcpSocketServer 中的 Accept Loop 指的就是,
while (IsListening) { var tcpClient = await _listener.AcceptTcpClientAsync(); }
每个竖立胜利的 Connection 由 AsyncTcpSocketSession 来处置惩罚,所以 AsyncTcpSocketSession 中会包括 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
为了将 async/await 异步究竟,AsyncTcpSocketServer 所暴露的接口也一样是 Awaitable 的。
public interface IAsyncTcpSocketServerMessageDispatcher { Task OnSessionStarted(AsyncTcpSocketSession session); Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count); Task OnSessionClosed(AsyncTcpSocketSession session); }
运用时仅需将一个完成了该接口的对象注入到 AsyncTcpSocketServer 的组织函数中即可。
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher { public async Task OnSessionStarted(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
固然,关于接口的完成也不是强迫了,也能够在组织函数中直接注入要领的完成。
public AsyncTcpSocketServer( IPEndPoint listenedEndPoint, Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null, Func<AsyncTcpSocketSession, Task> onSessionStarted = null, Func<AsyncTcpSocketSession, Task> onSessionClosed = null, AsyncTcpSocketServerConfiguration configuration = null) {}
SAEA 体式格局:TcpSocketSaeaServer
SAEA 是 SocketAsyncEventArgs 的简写。SocketAsyncEventArgs 是 .NET Framework 3.5 最先支撑的一种支撑高机能 Socket 通讯的完成。SocketAsyncEventArgs 比拟于 APM 体式格局的重要长处能够形貌以下:
The main feature of these enhancements is the avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
也就是说,长处就是无需为每次挪用都生成 IAsyncResult 等对象,向原生 Socket 更接近一些。
运用 SocketAsyncEventArgs 的引荐步骤以下:
Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
Reuse the context for another operation, put it back in the pool, or discard it.
重点在于池化(Pooling),池化的目标就是为了重用和削减运行时分派和垃圾接纳的压力。
TcpSocketSaeaServer 等于对 SocketAsyncEventArgs 的运用和封装,并完成了 Pooling 手艺。TcpSocketSaeaServer 中的重点是 SaeaAwaitable 类,SaeaAwaitable 中内置了一个 SocketAsyncEventArgs,并经由历程 GetAwaiter 返回 SaeaAwaiter 来支撑 async/await 操纵。同时,经由历程 SaeaExtensions 扩大要领对来扩大 SocketAsyncEventArgs 的 Awaitable 完成。
public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
SaeaPool 则是一个 QueuedObjectPool<SaeaAwaitable> 的衍生完成,用于池化 SaeaAwaitable 实例。同时,为了削减 TcpSocketSaeaSession 的构建历程,也完成了 SessionPool 即 QueuedObjectPool<TcpSocketSaeaSession>。
TcpSocketSaeaServer 中的 Accept Loop 指的就是,
while (IsListening) { var saea = _acceptSaeaPool.Take(); var socketError = await _listener.AcceptAsync(saea); if (socketError == SocketError.Success) { var acceptedSocket = saea.Saea.AcceptSocket; } _acceptSaeaPool.Return(saea); }
每个竖立胜利的 Connection 由 TcpSocketSaeaSession 来处置惩罚,所以 TcpSocketSaeaSession 中会包括 Read Loop,
var saea = _saeaPool.Take(); saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); while (State == TcpSocketConnectionState.Connected) { saea.Saea.SetBuffer(0, _receiveBuffer.Length); var socketError = await _socket.ReceiveAsync(saea); if (socketError != SocketError.Success) break; var receiveCount = saea.Saea.BytesTransferred; if (receiveCount == 0) break; }
一样,TcpSocketSaeaServer 对外所暴露的接口也一样是 Awaitable 的。
public interface ITcpSocketSaeaServerMessageDispatcher { Task OnSessionStarted(TcpSocketSaeaSession session); Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count); Task OnSessionClosed(TcpSocketSaeaSession session); }
运用起来也是简朴直接:
public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
RIO 体式格局:TcpSocketRioServer
从 Windows 8.1 / Windows Server 2012 R2 最先,微软推出了 Registered I/O Networking Extensions 来支撑高机能 Socket 效劳的完成,简称 RIO。
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
RIOCloseCompletionQueue
RIOCreateCompletionQueue
RIOCreateRequestQueue
RIODequeueCompletion
RIODeregisterBuffer
RIONotify
RIOReceive
RIOReceiveEx
RIORegisterBuffer
RIOResizeCompletionQueue
RIOResizeRequestQueue
RIOSend
RIOSendEx
到目前为止,.NET Framework 还没有推出对 RIO 的支撑,所以若想在 C# 中完成 RIO 则只能经由历程 P/Invoke 体式格局,RioSharp 是开源项目中的一个比较完全的完成。
Cowboy.Sockets 直接引用了 RioSharp 的源代码,安排在 Cowboy.Sockets.Experimental 名空间下,以供试验和测试运用。
一样,经由历程 TcpSocketRioServer 来完成 Accept Loop,
_listener.OnAccepted = (acceptedSocket) =>{ Task.Run(async () => { await Process(acceptedSocket); }) .Forget(); };
经由历程 TcpSocketRioSession 来处置惩罚 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); if (receiveCount == 0) break; }
测试代码自始自终的相似:
public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketRioSession session) { //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); Console.WriteLine(string.Format("TCP session has connected {0}.", session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.Write(string.Format("Client : --> ")); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketRioSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
参考资料
Asynchronous Programming Model (APM)
Task-based Asynchronous Pattern (TAP)
Event-based Asynchronous Pattern (EAP)
SocketAsyncEventArgs
Registered I/O
Netty: Reference counted objects
Socket Performance Enhancements in Version 3.5
What's New for Windows Sockets for Windows 8.1 and Windows Server 2012 R2
RIO_EXTENSION_FUNCTION_TABLE structure
Windows 8 Registered I/O Networking Extensions
本篇文章《C#高机能TCP效劳的多种完成体式格局》由 Dennis Gao 宣布自博客园个人博客,未经作者本人赞同制止以任何的情势转载,任何自动的或工资的爬虫转载行动均为耍流氓。
以上就是C# 高机能 TCP 效劳的多种完成的细致内容,更多请关注ki4网别的相干文章!