这篇文章内里引见的要领是一种完成,然则存在一个瑕玷就是一个对象序列化后会增大许多,不利于在收集中的传输。
我们在收集中的传输是须要尽量的减小传送的数据包的大小,因而我参考了网上一些材料和一些开源的项目(http://www.ki4.cn/)这个上面的谁人开源的飞鸽传输的框架,
实在也就是把要传送的数据根据某种划定放在一个byte数组中,然后吸收到后根据响应的花样把数据剖析出来,为了减小数据还运用了GZipStream的紧缩,之前出的题目就是在解紧缩时,不过如今已处置惩罚了。
起首我们要定义一种能示意我们传送数据的包的花样
public class PacketNetWorkMsg : IComparable<PacketNetWorkMsg> { /// <summary> /// 封包版本 /// </summary> public int Version { get; set; } /// <summary> /// 要发送的数据包 /// </summary> public byte[] Data { get; set; } /// <summary> /// 数据包所含数据长度 /// </summary> public int DataLength { get; set; } /// <summary> /// 分包后末了一个盈余长度 /// </summary> public int Remainder { get; set; } /// <summary> /// 长途地点 /// </summary> public IPEndPoint RemoteIP { get; set; } /// <summary> /// 发送次数 /// </summary> public int SendTimes { get; set; } /// <summary> /// 包编号 /// </summary> public long PackageNo { get; set; } /// <summary> /// 分包索引 /// </summary> public int PackageIndex { get; set; } /// <summary> /// 分包总数 /// </summary> public int PackageCount { get; set; } /// <summary> /// 取得或设置是不是须要返回已收到标志 /// </summary> public bool IsRequireReceiveCheck { get; set; } public PacketNetWorkMsg() { Version = 1; CreationTime = DateTime.Now; } public PacketNetWorkMsg(long packageNo, int Count, int index, byte[] data, int dataLength, int remainder, IPEndPoint desip, bool IsRequireReceive) { this.PackageNo = packageNo; this.PackageCount = Count; this.PackageIndex = index; this.Data = data; this.DataLength = dataLength; this.Remainder = remainder; this.IsRequireReceiveCheck = IsRequireReceive;//默许都须要确认包 this.RemoteIP = desip; } #region IComparable<PackedNetworkMessage> 成员 public int CompareTo(PacketNetWorkMsg other) { return PackageIndex < other.PackageIndex ? -1 : 1; } #endregion /// <summary> /// 取得生成数据包的时刻 /// </summary> public DateTime CreationTime { get; private set; } }
这个类是我们在收集中须要传输的详细最小的包
然后另有一个就是我们用来示意我们传输的数据的花样类Msg类,一个Msg类能够被分为多个PacketNetWorkMsg 来传送,分包的要领是为了打破udp体式格局传输数据的限定(64k),,能够运用udp传输大数据,许多人就在问,为何不必TCP,虽然TCP是非常好的体式格局,然则我也不想说出个所以然来,我就喜好这么干。
public class Msg { /// <summary> /// 是不是已被处置惩罚.在挂钩过程当中,假如为true,则底层代码不会再对信息举行处置惩罚 /// </summary> public bool Handled { get; set; } /// <summary> /// 取得或设置当前的音讯编号 /// </summary> /// <value></value> /// <remarks></remarks> public long PackageNo { get; set; } /// <summary> /// 取得或设置当前的音讯所属的主机名 /// </summary> public string HostName { get; set; } /// <summary> /// 取得或设置当前的音讯所属的用户名 /// </summary> public string UserName { get; set; } /// <summary> /// 取得或设置当前的敕令代码 /// </summary> //敕令的称号 public Commands Command { get; set; } /// <summary> /// 取得或设置当前的音讯的范例 文本音讯,或许二进制音讯 /// </summary> public Consts Type { get; set; } /// <summary> /// 取得或设置当前的敕令音讯文本 /// </summary> public string NormalMsg { get; set; } /// <summary> /// 音讯文本字节 /// </summary> public byte[] NormalMsgBytes { get; set; } /// <summary> /// 扩大音讯文本字节 /// </summary> public byte[] ExtendMessageBytes { get; set; } /// <summary> /// 取得或设置当前敕令的扩大文本 /// </summary> public string ExtendMessage { get; set; } /// <summary> /// 长途地点 /// </summary> public IPEndPoint RemoteAddr { get; set; } /// <summary> /// 主机地点 /// </summary> public IPEndPoint HostAddr { get; set; } /// <summary> /// 取得或设置是不是须要返回已收到标志 /// </summary> public bool IsRequireReceive { get; set; } public Msg(IPEndPoint Addr) { RemoteAddr = Addr; Handled = false; Type = Consts.MESSAGE_TEXT; } public Msg(IPEndPoint hostIP,IPEndPoint remoteIP,Commands cmd) { HostAddr = hostIP; RemoteAddr = remoteIP; Command = cmd; Handled = false; Type = Consts.MESSAGE_TEXT; } public Msg(IPEndPoint addr, string hostName, string userName,Commands command, string message, string extendMessage) { RemoteAddr = addr; Handled = false; HostName = hostName; UserName = userName; Command = command; NormalMsg = message; ExtendMessage = extendMessage; Type = Consts.MESSAGE_TEXT; } /// <summary> /// 直接竖立一个新的Message对象 /// </summary> /// <param name="host">主机对象</param> /// <param name="addr">长途地点</param> /// <param name="hostName">主机名</param> /// <param name="userName">用户名</param> /// <param name="command">敕令</param> /// <param name="options">选项</param> /// <param name="message">信息</param> /// <param name="extendMessage">扩大信息</param> /// <returns></returns> public static Msg Create(Host host, IPEndPoint addr, string hostName, string userName, Commands command, string message, string extendMessage) { return new Msg(addr,hostName, userName, command,message, extendMessage); } }
现在这两个类就是我们重要的数据结构了。
然后接下来是我们重要的类,分包和组包的一个类了
/// <summary> /// 音讯封包类 /// </summary> public class MessagePacker { Timer _timer; public MessagePacker() { _timer = new Timer(_ => CheckForOutdateMessage(), null, new TimeSpan(0, 5, 0), new TimeSpan(0, 0, 5, 0)); } /* * 音讯包注重: * 1.第一名始终是2(ASCII码50) * 2.第二位到第九位是一个long范例的整数,代表音讯编号 * 3.第十位到第十三位是一个int范例的整数,代表音讯内容总长度 * 4.第十四位到第十七位是一个int范例的整数,代表分包的总数 * 5.第十八位到第二十一名是一个int范例的整数,代表当前的分包编号 * 6.第二十二位示意是不是须要返回一个确认标识(1/0) * 7.第二十三到第三十一名是保存的(Reserved) * 8.第三十二字节今后是数据包 * */ /// <summary> /// 音讯版本号 /// </summary> public static byte VersionHeader { get { return 50; } } /// <summary> /// 返回当前音讯封包的头字节数 /// </summary> public static int PackageHeaderLength { get { return 32; } } /// <summary> /// 取得音讯包的字撙节 /// </summary> /// <param name="message">要打包的音讯对象</param> /// <returns></returns> public static PacketNetWorkMsg[] BuildNetworkMessage(Msg message) { if (message.ExtendMessageBytes != null) { return BuildNetworkMessage( message.RemoteAddr, message.PackageNo, message.Command, message.UserName, message.HostName, message.Type, message.NormalMsgBytes, message.ExtendMessageBytes, message.IsRequireReceive ); } else { return BuildNetworkMessage( message.RemoteAddr, message.PackageNo, message.Command, message.UserName, message.HostName, message.Type, System.Text.Encoding.Unicode.GetBytes(message.NormalMsg), System.Text.Encoding.Unicode.GetBytes(message.ExtendMessage), message.IsRequireReceive ); } } /// <summary> /// 取得音讯包的字撙节 /// </summary> /// <param name="remoteIp">长途主机地点</param> /// <param name="packageNo">包编号</param> /// <param name="command">敕令</param> /// <param name="options">参数</param> /// <param name="userName">用户名</param> /// <param name="hostName">主机名</param> /// <param name="content">正文音讯</param> /// <param name="extendContents">扩大音讯</param> /// <returns></returns> public static PacketNetWorkMsg[] BuildNetworkMessage(IPEndPoint remoteIp, long packageNo, Commands command, string userName, string hostName,Consts type ,byte[] content, byte[] extendContents, bool RequireReceiveCheck) { //每次发送所能容下的数据量 int maxBytesPerPackage = (int)Consts.MAX_UDP_PACKAGE_LENGTH - PackageHeaderLength; //紧缩数据流 var ms = new MemoryStream(); //var dest = new MemoryStream(); //var zip = new GZipStream(dest, CompressionMode.Compress); var bw = new BinaryWriter(ms, System.Text.Encoding.Unicode); //写入头部数据 bw.Write(packageNo); //包编号 bw.Write(userName); //用户名 bw.Write(hostName); //主机名 bw.Write((long)command); //敕令 bw.Write((long)type); //数据范例 bw.Write(content == null ? 0 : content.Length);//数据长度 //写入音讯数据 if (content != null) bw.Write(content); bw.Write(extendContents == null ? 0 : extendContents.Length);//补充数据长度 if (extendContents != null) bw.Write(extendContents); ms.Flush(); ms.Seek(0, System.IO.SeekOrigin.Begin); byte[] ibuf = ms.ToArray(); var dest = new System.IO.MemoryStream(); GZipStream zipStream = new GZipStream(dest, CompressionMode.Compress, true); byte[] buff = new byte[1024]; int offset; ms.Seek(0, SeekOrigin.Begin); while ((offset = ms.Read(buff, 0, buff.Length)) > 0) { zipStream.Write(buff, 0, offset);//先把数据用二进制写入内存,然后在把它用zip紧缩,猎取紧缩事后的二进制流dest } zipStream.Close(); bw.Close(); ms.Close(); dest.Seek(0, SeekOrigin.Begin); //打包数据总量 int dataLength = (int)dest.Length; int packageCount = (int)Math.Ceiling(dataLength * 1.0 / maxBytesPerPackage); PacketNetWorkMsg[] pnma = new PacketNetWorkMsg[packageCount]; for (int i = 0; i < packageCount; i++) { int count = i == packageCount - 1 ? dataLength - maxBytesPerPackage * (packageCount - 1) : maxBytesPerPackage; byte[] buf = new byte[count + PackageHeaderLength]; buf[0] = VersionHeader;//版本号 第1位 BitConverter.GetBytes(packageNo).CopyTo(buf, 1);//音讯编号 第2到9位 long范例的整数 BitConverter.GetBytes(dataLength).CopyTo(buf, 9);//音讯内容长度 第10到13位 int范例的整数 BitConverter.GetBytes(packageCount).CopyTo(buf, 13);//分包总数 第14位到第17位 int范例的整数 BitConverter.GetBytes(i).CopyTo(buf, 17);//分包编号 第18位到第21位 int范例的整数 buf[21] = RequireReceiveCheck ? (byte)1 : (byte)0;//是不是回确认包 第22位 //第23到第31位是保存的(Reserved) dest.Read(buf, 32, buf.Length - 32);//第32字节今后是,详细的数据包 pnma[i] = new PacketNetWorkMsg() { Data = buf, PackageCount = packageCount, PackageIndex = i, PackageNo = packageNo, RemoteIP = remoteIp, SendTimes = 0, Version = 2, IsRequireReceiveCheck = buf[21] == 1 }; } return pnma; } /// <summary> /// 检测确认是不是是这个范例的音讯包 /// </summary> /// <param name="buffer"></param> /// <returns></returns> public static bool Test(byte[] buffer) { return buffer != null && buffer.Length > PackageHeaderLength && buffer[0] == VersionHeader; } /// <summary> /// 缓存吸收到的片断 /// </summary> static Dictionary<long, PacketNetWorkMsg[]> packageCache = new Dictionary<long, PacketNetWorkMsg[]>(); /// <summary> /// 剖析收集数据包并举行转换为信息对象 /// </summary> /// <param name="packs">吸收到的封包对象</param> /// <returns></returns> /// <remarks> /// 关于分包音讯,假如收到的只是片断而且还没有吸收完全,则不会举行剖析 /// </remarks> public static Msg ParseToMessage(params PacketNetWorkMsg[] packs) { if (packs.Length == 0 || (packs[0].PackageCount > 1 && packs.Length != packs[0].PackageCount)) return null; var ms = DecompressMessagePacks(packs); if (ms == null) { //事宜 return null; } //组织读取流 System.IO.BinaryReader br = new System.IO.BinaryReader(ms, System.Text.Encoding.Unicode); //最先读出数据 Msg m = new Msg(packs[0].RemoteIP); m.PackageNo = br.ReadInt64();//包编号 m.UserName = br.ReadString();//用户名 m.HostName = br.ReadString();//主机名 m.Command = (Commands)br.ReadInt64(); //敕令 m.Type = (Consts)br.ReadInt64();//数据范例 int length = br.ReadInt32(); //数据长度 m.NormalMsgBytes = new byte[length]; br.Read(m.NormalMsgBytes, 0, length);//读取内容 length = br.ReadInt32(); //附加数据长度 m.ExtendMessageBytes = new byte[length]; br.Read(m.ExtendMessageBytes, 0, length);//读取附加数据 if (m.Type == Consts.MESSAGE_TEXT) { m.NormalMsg = System.Text.Encoding.Unicode.GetString(m.NormalMsgBytes, 0, length); //正文 m.ExtendMessage = System.Text.Encoding.Unicode.GetString(m.ExtendMessageBytes, 0, length); //扩大音讯 m.ExtendMessageBytes = null; m.NormalMsgBytes = null; } return m; } /// <summary> /// 组合一切的收集数据包并实行解紧缩 /// </summary> /// <param name="packs"></param> /// <returns></returns> static MemoryStream DecompressMessagePacks(params PacketNetWorkMsg[] packs) { try { //尝试解紧缩,先排序 Array.Sort(packs); var msout = new MemoryStream(); using (var ms = new System.IO.MemoryStream()) { //兼并写入 //Array.ForEach(packs, s => ms.Write(s.Data, 32, s.Data.Length-32)); Array.ForEach(packs, s => ms.Write(s.Data, 0, s.Data.Length)); ms.Seek(0, SeekOrigin.Begin); //解紧缩 using (var gz = new GZipStream(ms, CompressionMode.Decompress)) { var buffer = new byte[0x400]; var count = 0; while ((count = gz.Read(buffer, 0, buffer.Length)) > 0) { msout.Write(buffer, 0, count); } } } msout.Seek(0, SeekOrigin.Begin); return msout; } catch (Exception) { return null; } } /// <summary> /// 尝试将收到的收集包剖析为实体 /// </summary> /// <param name="pack">收到的收集包</param> /// <returns></returns> /// <remarks>假如收到的包是分片包,且其一切子包还没有接收完全,则会返回空值</remarks> public static Msg TryToTranslateMessage(PacketNetWorkMsg pack) { if (pack == null || pack.PackageIndex > pack.PackageCount - 1) return null; else if (pack.PackageCount == 1) return ParseToMessage(pack); else { lock (packageCache) { if (packageCache.ContainsKey(pack.PackageNo)) { PacketNetWorkMsg[] array = packageCache[pack.PackageNo]; array[pack.PackageIndex] = pack; //检测是不是完全 if (Array.FindIndex(array, s => s == null) == -1) { packageCache.Remove(pack.PackageNo); return ParseToMessage(array); } else { return null; } } else { PacketNetWorkMsg[] array = new PacketNetWorkMsg[pack.PackageCount]; array[pack.PackageIndex] = pack; packageCache.Add(pack.PackageNo, array); return null; } } } } /// <summary> /// 将收集信息剖析为封包 /// </summary> /// <param name="buffer"></param> /// <returns></returns> public static PacketNetWorkMsg Parse(byte[] buffer, IPEndPoint clientAddress) { if (!Test(buffer)) return null; PacketNetWorkMsg p = new PacketNetWorkMsg() { RemoteIP = clientAddress, SendTimes = 0 }; p.PackageNo = BitConverter.ToInt64(buffer, 1);//包编号 p.DataLength = (int)BitConverter.ToInt64(buffer, 9); //内容长度 p.PackageCount = BitConverter.ToInt32(buffer, 13);//分包总数 p.PackageIndex = BitConverter.ToInt32(buffer, 17);//索引 p.IsRequireReceiveCheck = buffer[21] == 1;//是不是须要回包 p.Data = new byte[buffer.Length - PackageHeaderLength]; Array.Copy(buffer, PackageHeaderLength, p.Data, 0, p.Data.Length); return p; } void CheckForOutdateMessage() { lock (packageCache) { //TODO 这里设置最短的逾期时刻为5分钟,也就是说五分钟之前的音讯会被干掉 var minTime = DateTime.Now.AddMinutes(5.0); var targetList = new List<long>(); foreach (var pkgid in packageCache.Keys) { if (Array.TrueForAll(packageCache[pkgid], s => s == null || s.CreationTime < minTime)) { targetList.Add(pkgid); } } foreach (var pkgid in targetList) { packageCache.Remove(pkgid); } } } #region 事宜 /// <summary> /// 收集层数据包解紧缩失利 /// </summary> public static event EventHandler<PackageEventArgs> DecompressFailed; /// <summary> /// 触发解紧缩失利事宜 /// </summary> /// <param name="e">事宜包括的参数</param> protected static void OnDecompressFailed(PackageEventArgs e) { if (DecompressFailed != null) DecompressFailed(typeof(MessagePacker),e); } #endregion }
经由过程BuildNetworkMessage要领能够把一个Msg对象分为1个或多个packet,然后的话就能够经由过程之前所用的要领把分好的包挨个发送了。收到数据包后用TryToTranslateMessage要领把数据包组装成为一个Msg
接下来是一个 UDP底层通信的类
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net.Sockets; using Netframe.Model; using System.Net; using System.Threading; using Netframe.Tool; using Netframe.Event; namespace Netframe.Core { /// <summary> /// 基础通信类 UDP,能够举行基础数据发送,UdpPacketMsg的发送,数据收到时触发事宜 /// </summary> public class UDPThread { #region 私有变量 /// <summary> /// 设置信息 /// </summary> Config _config; /// <summary> /// UDP客户端 /// </summary> UdpClient client; /// <summary> /// 用于轮询是不是发送胜利的纪录 /// </summary> List<PacketNetWorkMsg> SendList; #endregion #region 属性 /// <summary> /// 是不是已初始化了 /// </summary> public bool IsInitialized { get; private set; } /// <summary> /// 是不是竖立衔接 /// </summary> public bool IsConnect { get; private set; } /// <summary> /// 搜检发送行列距离 /// </summary> public int CheckQueueTimeInterval { get; set; } /// <summary> /// 没有收到确认包时,最大从新发送的数量,凌驾此数量会抛弃并触发PackageSendFailture 事宜。 /// </summary> public int MaxResendTimes { get; set; } #endregion #region 组织函数 /// <summary> /// 组织一个新的音讯对象,并绑定到指定的端口和IP上。 /// </summary> /// <param name="ip">绑定的IP</param> /// <param name="port">绑定的端口</param> public UDPThread(int port) { IsInitialized = false; IPAddress LocalIPAddress = null; //取得本机当前的ip try { IPAddress[] address = Dns.GetHostAddresses(Dns.GetHostName()); foreach (IPAddress addr in address) { if (addr.AddressFamily.ToString().Equals("InterNetwork")) { LocalIPAddress = addr; break; } } } catch (Exception) { OnLocalIpError(new EventArgs()); //猎取本机ip非常 return; } try { client = new UdpClient(new IPEndPoint(LocalIPAddress, port)); IsConnect = false; } catch (Exception) { OnNetworkError(new EventArgs()); return; } SendList = new List<PacketNetWorkMsg>(); client.EnableBroadcast = true; CheckQueueTimeInterval = 2000; MaxResendTimes = 5; new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start(); IsInitialized = true; //最先监听 client.BeginReceive(ReceiveDataAsync, null); //ReceiveData(); } public UDPThread(Config config) { IsInitialized = false; try { client = new UdpClient(new IPEndPoint(config.BindedIP, config.Port)); } catch (Exception) { OnNetworkError(new EventArgs()); return; } SendList = new List<PacketNetWorkMsg>(); client.EnableBroadcast = true; this._config = config; CheckQueueTimeInterval = 2000; MaxResendTimes = 5; new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start(); IsInitialized = true; //最先监听 client.BeginReceive(ReceiveDataAsync, null); } /// <summary> /// 组织函数与长途主机衔接 /// </summary> /// <param name="ipaddress">绑定ip</param> /// <param name="port">端口</param> public UDPThread(string ip, int port) { IsInitialized = false; IPAddress ipaddress = IPAddress.Parse(ip);//组织长途衔接的参数 try { client = new UdpClient(); client.Connect(new IPEndPoint(ipaddress, port));//与长途服务器竖立衔接ps:只是形式上,udp自身无衔接的 IsConnect = true; } catch (Exception) { OnNetworkError(new EventArgs()); return; } SendList = new List<PacketNetWorkMsg>(); client.EnableBroadcast = true; CheckQueueTimeInterval = 2000; MaxResendTimes = 5; new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start(); IsInitialized = true; //最先监听 client.BeginReceive(ReceiveDataAsync, null); //ReceiveData(); } #endregion #region 私有要领 /// <summary> /// 吸收数据的要领 /// </summary> /// <param name="ar"></param> void ReceiveDataAsync(IAsyncResult ar) { IPEndPoint ipend = null; byte[] buffer = null; try { buffer = client.EndReceive(ar, ref ipend); } catch (Exception) { return; } finally { if (IsInitialized && client != null) client.BeginReceive(ReceiveDataAsync, null); } if (buffer == null || buffer.Length == 0) return; //触发已收到事宜 OnPackageReceived(new PackageEventArgs() { RemoteIP = ipend, Data = buffer }); } /// <summary> /// 同步数据吸收要领 /// </summary> private void ReceiveData() { while (true) { IPEndPoint retip = null; byte[] buffer = null; try { buffer = client.Receive(ref retip);//吸收数据,当Client端衔接主机的时刻,retip就变成Cilent端的IP了 } catch (Exception) { //非常处置惩罚操纵 return; } if (buffer == null || buffer.Length == 0) return; PackageEventArgs arg = new PackageEventArgs(buffer, retip); OnPackageReceived(arg);//数据包收到触发事宜 } } /// <summary> /// 异步接收数据 /// </summary> private void AsyncReceiveData() { try { client.BeginReceive(new AsyncCallback(ReceiveCallback), null); } catch (SocketException ex) { throw ex; } } /// <summary> /// 吸收数据的回调函数 /// </summary> /// <param name="param"></param> private void ReceiveCallback(IAsyncResult param) { if (param.IsCompleted) { IPEndPoint retip = null; byte[] buffer = null; try { buffer = client.EndReceive(param, ref retip);//吸收数据,当Client端衔接主机的时刻,test就变成Cilent端的IP了 } catch (Exception ex) { //非常处置惩罚操纵 } finally { AsyncReceiveData(); } if (buffer == null || buffer.Length == 0) return; OnPackageReceived(new PackageEventArgs() { RemoteIP = retip, Data = buffer }); } } #endregion #region 大众函数 /// <summary> /// 封闭客户端 /// </summary> public void Close() { if (IsInitialized) { IsInitialized = false; if (IsInitialized) client.Close(); IsConnect = false; client = null; } } /// <summary> /// 发送数据,不举行搜检 /// </summary> /// <param name="address">长途主机地点</param> /// <param name="port">长途主机端口</param> /// <param name="data">数据流</param> /// <param name="packageNo">数据包编号</param> /// <param name="packageIndex">分包索引</param> private void Send(IPAddress address, int port, byte[] data, long packageNo, int packageIndex) { Send(false, new IPEndPoint(address, port), data, packageNo, packageIndex); } /// <summary> /// 发送数据,并推断是不是对数据作回应搜检。将会在每隔 <see cref="CheckQueueTimeInterval"/> 的距离后从新发送,直到收到对方的回应。 /// 注重:收集层不会剖析回应,请挪用 <see cref="PopSendItemFromList"/> 要领来示知已收到数据包。 /// </summary> /// <param name="receiveConfirm">音讯是不是会回发确认包</param> /// <param name="address">长途主机地点</param> /// <param name="port">长途主机端口</param> /// <param name="data">数据流</param> /// <param name="packageNo">数据包编号</param> /// <param name="packageIndex">分包索引</param> private void Send(bool receiveConfirm, IPAddress address, int port, byte[] data, long packageNo, int packageIndex) { Send(receiveConfirm, new IPEndPoint(address, port), data, packageNo, packageIndex); } /// <summary> /// 发送数据,并对数据作回应搜检。当 <see cref="receiveConfirm"/> 为 true 时,将会在每隔 <see cref="CheckQueueTimeInterval"></see> 的距离后从新发送,直到收到对方的回应。 /// 注重:收集层不会剖析回应,请挪用 <see cref="PopSendItemFromList"></see> 要领来示知已收到数据包。 /// </summary> /// <param name="receiveConfirm">音讯是不是会回发确认包</param> /// <param name="address">长途主机地点</param> /// <param name="data">数据流</param> /// <param name="packageNo">数据包编号</param> /// <param name="packageIndex">分包索引</param> private void Send(bool receiveConfirm, IPEndPoint address, byte[] data, long packageNo, int packageIndex) { if (IsInitialized) { client.Send(data, data.Length, address); if (receiveConfirm) PushSendItemToList(new PacketNetWorkMsg() { Data = data, RemoteIP = address, SendTimes = 0, PackageIndex = packageIndex, PackageNo = packageNo }); } } /// <summary> /// 同步发送分包数据 /// </summary> /// <param name="message"></param> public void SendMsg(Msg message) { if (IsInitialized) { ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message); foreach (PacketNetWorkMsg packedMessage in udpPackets) { //运用同步发送 SendPacket(packedMessage); } } } /// <summary> /// 将已打包的音讯发送出去 /// </summary> /// <param name="packet"></param> public void SendPacket(PacketNetWorkMsg packet) { if (IsInitialized) { //运用同步的要领发送数据 if (!IsConnect) client.Send(packet.Data, packet.Data.Length, packet.RemoteIP); else client.Send(packet.Data, packet.Data.Length); if (packet.IsRequireReceiveCheck) PushSendItemToList(packet); } } /// <summary> /// 异步分包发送数组的要领 /// </summary> /// <param name="message"></param> public void AsyncSendMsg(Msg message) { if (IsInitialized) { ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message); foreach (PacketNetWorkMsg packedMessage in udpPackets) { //运用异步的要领发送数据 AsyncSendPacket(packedMessage); } } } /// <summary> /// 发送完成后的回调要领 /// </summary> /// <param name="param"></param> private void SendCallback(IAsyncResult param) { if (param.IsCompleted) { try { client.EndSend(param);//这句话必须得写,BeginSend()和EndSend()是成对涌现的 } catch (Exception) { PackageEventArgs e = new PackageEventArgs(); OnPackageSendFailure(e);//触发发送失利事宜 } } } /// <summary> /// 异步将将已打包的音讯发送出去,不举行发送搜检 /// </summary> /// <param name="packet"></param> public void AsyncSendPacket(PacketNetWorkMsg packet) { //运用异步的要领发送数据 if (IsInitialized) { if (!IsConnect) this.client.BeginSend(packet.Data, packet.Data.Length, packet.RemoteIP, new AsyncCallback(SendCallback), null); else this.client.BeginSend(packet.Data, packet.Data.Length, new AsyncCallback(SendCallback), null); if (packet.IsRequireReceiveCheck) PushSendItemToList(packet);//将该音讯压入列表 } } #endregion System.Threading.SendOrPostCallback cucqCallpack; System.Threading.SendOrPostCallback resendCallback; /// <summary> /// 自在线程,检测未发送的数据并发出 /// </summary> void CheckUnConfirmedQueue() { //异步挪用托付 if (cucqCallpack == null) cucqCallpack = (s) => OnPackageSendFailure(s as PackageEventArgs); if (resendCallback == null) resendCallback = (s) => OnPackageResend(s as PackageEventArgs); do { if (SendList.Count > 0) { PacketNetWorkMsg[] array = null; lock (SendList) { array = SendList.ToArray(); } //挨个从新发送并计数 Array.ForEach(array, s => { s.SendTimes++; if (s.SendTimes >= MaxResendTimes) { //发送失利啊 PackageEventArgs e = new PackageEventArgs(); if (SeiClient.NeedPostMessage) { SeiClient.SendSynchronizeMessage(cucqCallpack, e); } else { OnPackageSendFailure(e);//触发发送失利事宜 } SendList.Remove(s); } else { //从新发送 AsyncSendPacket(s); PackageEventArgs e = new PackageEventArgs() { PacketMsg = s }; if (SeiClient.NeedPostMessage) { SeiClient.SendASynchronizeMessage(resendCallback, e); } else { OnPackageResend(e);//触发从新发送事宜 } } }); } Thread.Sleep(CheckQueueTimeInterval); } while (IsInitialized); } static object lockObj = new object(); /// <summary> /// 将数据信息压入列表 /// </summary> /// <param name="item"></param> public void PushSendItemToList(PacketNetWorkMsg item) { SendList.Add(item); } /// <summary> /// 将数据包从列表中移除 /// 收集层不会剖析 /// </summary> /// <param name="packageNo">数据包编号</param> /// <param name="packageIndex">数据包分包索引</param> public void PopSendItemFromList(long packageNo, int packageIndex) { lock (lockObj) { Array.ForEach(SendList.Where(s => s.PackageNo == packageNo && s.PackageIndex == packageIndex).ToArray(), s => SendList.Remove(s)); } } #region 事宜 /// <summary> /// 收集涌现非常,没法猎取当地ip地点 /// </summary> public event EventHandler IPError; protected void OnLocalIpError(EventArgs e) { if (IPError != null) IPError(this, e); } /// <summary> /// 收集涌现非常(如端口没法绑定等,此时没法继承事情) /// </summary> public event EventHandler NetworkError; protected void OnNetworkError(EventArgs e) { if (NetworkError != null) NetworkError(this, e); } /// <summary> /// 当数据包收到时触发 /// </summary> public event EventHandler<PackageEventArgs> PackageReceived; /// <summary> /// 当数据包收到事宜触发时,被挪用 /// </summary> /// <param name="e">包括事宜的参数</param> protected virtual void OnPackageReceived(PackageEventArgs e) { if (PackageReceived != null) PackageReceived(this, e); } /// <summary> /// 数据包发送失利 /// </summary> public event EventHandler<PackageEventArgs> PackageSendFailure; /// <summary> /// 当数据发送失利时挪用 /// </summary> /// <param name="e">包括事宜的参数</param> protected virtual void OnPackageSendFailure(PackageEventArgs e) { if (PackageSendFailure != null) PackageSendFailure(this, e); } /// <summary> /// 数据包未吸收到确认,从新发送 /// </summary> public event EventHandler<PackageEventArgs> PackageResend; /// <summary> /// 触发从新发送事宜 /// </summary> /// <param name="e">包括事宜的参数</param> protected virtual void OnPackageResend(PackageEventArgs e) { if (PackageResend != null) PackageResend(this, e); } #endregion #region IDisposable 成员 /// <summary> /// 封闭客户端并开释资本 /// </summary> public void Dispose() { Close(); } #endregion } }
再然厥后一个 UDP上层的类,用来把收到的包组装兼并成为一个msg
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net; using Netframe.Event; using Netframe.Model; using Netframe.Tool; using System.Threading; namespace Netframe.Core { /// <summary> /// 对底层收到数据的剖析 /// </summary> public class MsgTranslator { #region 属性 /// <summary> /// 用来发送和吸收音讯的对象 /// </summary> public UDPThread Client { get; set; } Config _config; //用来检测反复收到的音讯包 Queue<long> ReceivedQueue; #endregion public MsgTranslator(UDPThread udpClient,Config config) { this.Client = udpClient; this._config = config; ReceivedQueue = new Queue<long>(); Client.PackageReceived += PackageReceived; } /// <summary> /// 发送信息实体 /// </summary> /// <param name="msg"></param> public void Send(Msg msg) { //音讯正在发送事宜 OnMessageSending(new MessageEventArgs(msg)); Client.AsyncSendMsg(msg); //音讯已发送事宜 OnMessageSended(new MessageEventArgs(msg)); } static object lockObj = new object(); /// <summary> /// 音讯包吸收到时的事宜 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void PackageReceived(object sender, PackageEventArgs e) { if (!e.IsHandled) { e.IsHandled = true; Msg m = ResolveToMessage(e.Data, e.RemoteIP); if (m == null) return; if (m.Command == Commands.RecvConfirm) { long pno = m.NormalMsg.TryParseToInt(0); int pindex = m.ExtendMessage.TryParseToInt(0); if (pno != 0) this.Client.PopSendItemFromList(pno, pindex); return; } //搜检近来收到的音讯行列内里是不是已包括了这个音讯包,假如是,则抛弃 if (!ReceivedQueue.Contains(m.PackageNo)) { ReceivedQueue.Enqueue(m.PackageNo); if (ReceivedQueue.Count > 100) ReceivedQueue.Dequeue(); OnMessageReceived(new MessageEventArgs(m)); } else OnMessageDroped(new MessageEventArgs(m)); } } public Msg ResolveToMessage(byte[] buffer, IPEndPoint remoteEndPoint) { if (buffer == null || buffer.Length < 0) return null; Msg m = null; if (MessagePacker.Test(buffer)) { PacketNetWorkMsg pack = MessagePacker.Parse(buffer, remoteEndPoint); if (pack == null) return null; if (DetermineConfirm(pack)) { //发送确认标志 Msg cm = Helper.CreateRecivedCheck(remoteEndPoint, pack.PackageNo, pack.PackageIndex, _config); Client.SendMsg(cm); } m = MessagePacker.TryToTranslateMessage(pack); } return m; } /// <summary> /// 检测是不是须要发送复兴包来确认收到 /// </summary> /// <param name="message"></param> /// <returns></returns> static bool DetermineConfirm(PacketNetWorkMsg packet) { return packet.IsRequireReceiveCheck; } static bool DetermineConfirm(Msg message) { return message.IsRequireReceive; } #region 事宜 /// <summary> /// 吸收到音讯包(UDP) /// </summary> public event EventHandler<MessageEventArgs> MessageReceived; SendOrPostCallback messageReceivedCallBack; /// <summary> /// 激发吸收到音讯包事宜 /// </summary> /// <param name="e"></param> protected virtual void OnMessageReceived(MessageEventArgs e) { if (MessageReceived == null) return; if (!SeiClient.NeedPostMessage) { MessageReceived(this, e); } else { if (messageReceivedCallBack == null) messageReceivedCallBack = s => MessageReceived(this, s as MessageEventArgs); SeiClient.SendSynchronizeMessage(messageReceivedCallBack, e); } } /// <summary> /// 音讯将要发送事宜 /// </summary> public event EventHandler<MessageEventArgs> MessageSending; SendOrPostCallback messageSendingCallBack; /// <summary> /// 激发音讯将要发送事宜 /// </summary> /// <param name="e"></param> protected virtual void OnMessageSending(MessageEventArgs e) { if (MessageSending == null) return; if (!SeiClient.NeedPostMessage) { MessageSending(this, e); } else { if (messageSendingCallBack == null) messageSendingCallBack = s => MessageSending(this, s as MessageEventArgs); SeiClient.SendSynchronizeMessage(messageSendingCallBack, e); } } /// <summary> /// 音讯已发送事宜 /// </summary> public event EventHandler<MessageEventArgs> MessageSended; SendOrPostCallback messageSendedCall; /// <summary> /// 激发音讯已发送事宜 /// </summary> /// <param name="e"></param> protected virtual void OnMessageSended(MessageEventArgs e) { if (MessageSended == null) return; if (!SeiClient.NeedPostMessage) { MessageSended(this, e); } else { if (messageSendedCall == null) messageSendedCall = s => MessageSended(this, s as MessageEventArgs); SeiClient.SendSynchronizeMessage(messageSendedCall, e); } } /// <summary> /// 反复收包然后丢包事宜 /// </summary> public event EventHandler<MessageEventArgs> MessageDroped; /// <summary> /// 激发抛弃Msg事宜 /// </summary> /// <param name="e"></param> protected virtual void OnMessageDroped(MessageEventArgs e) { if (MessageDroped == null) return; MessageDroped(this, e); } #endregion } }
以上就是c#基于事宜模子的UDP通信框架(适用于收集包编解码)的内容,更多相关内容请关注ki4网(www.ki4.cn)!