-
DIOCP开源项目-详解编码器和和如安在传输中参加紧缩和解压功能
添加时间:2013-5-30 点击量:>>>>>>DIOCP评论辩论群:320641073
>>>>>>SVN源码和DEMO:https://code.google.com/p/diocp/
收集带宽有限,对数据进行紧缩再进行传送可以有效的哄骗带宽和进步传输的效力。
DIOCP中对数据进行紧缩传送,须要批改编码器和,先说说这两个器材的的用法和功能。
举个例子:我们要把一台电脑快递回老家给正在上学的小弟应用,那么老家就是办事端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。
在这个过程中,发送一个对象(电脑)用到客户端的编码器,接管对象(电脑)用到办事端的。
在之前编写的DIOCP例子中都应用了JSonStream对象进行传输,这个TJSonStream类,首要有两个项目组的数据,第一项目组包含JSon字符串数据,第二项目组包含Stream流数据。在三层数据保存例子中我们把客户端的恳求放在JSon中。办事端接管数据后经由过程办事端的还原成JSonStream对象。然掉队行逻辑的处理惩罚,办事端回写对象时,同过办事端的编码器对回写的JSonStream进行编码发送,客户端经由过程客户端的接管并还原成JSonStream对象。在办事端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,若是对Stream对象进行紧缩,在做紧缩中调试法度时发明一个70K的数据包进行一下紧缩,数据包可以变成7K了,对文本紧缩结果还是很不错的。
当然我们容许本身定义和谈和编写编码和,我们可以定义本身的TStrStream类或者TXMLStream等等,然后编写响应的编码器和就行了。
下面解析下代码
客户端代码:
var
lvJSonStream, lvRecvObject:TJsonStream;
lvStream:TStream;
lvData:AnsiString;
l, j, x:Integer;
begin
lvJSonStream := TJsonStream.Create;
try
lvJSonStream.JSon := SO();
lvJSonStream.JSon.I[cmdIndex] := 1001; //打开一个SQL脚本,获取数据
lvJSonStream.Json.S[sql] := mmoSQL.Lines.Text;
FClientSocket.sendObject(lvJSonStream);
finally
lvJSonStream.Free;
end;起首是建树一个TJSonStream对象,然后设定信息,因为是产生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,
procedure TD10ClientSocket.sendObject(pvObject:TObject);
begin
if FCoder = nil then raise Exception.Create(没有注册对象编码和(registerCoder)!);
if not Active then Exit;
FCoder.Encode(Self, pvObject);
end;可以看出这里调用注册的编码器,调用Encode函数
客户端编码器的Encode函数如下
procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
TObject);
var
lvJSonStream:TJsonStream;
lvJSonLength:Integer;
lvStreamLength:Integer;
sData, lvTemp:String;
lvStream:TStream;
lvTempBuf:PAnsiChar;
lvBytes, lvTempBytes:TBytes;
l:Integer;
lvBufBytes:array[0..1023] of byte;
begin
if pvObject = nil then exit;
lvJSonStream := TJsonStream(pvObject);
//是否紧缩流
if (lvJSonStream.Stream <> nil) then
begin
if lvJSonStream.Json.O[config.stream.zip] <> nil then
begin
if lvJSonStream.Json.B[config.stream.zip] then
begin
//紧缩流
TZipTools.compressStreamEx(lvJSonStream.Stream);
end;
end else if lvJSonStream.Stream.Size > 0 then
begin
//紧缩流
TZipTools.compressStreamEx(lvJSonStream.Stream);
lvJSonStream.Json.B[config.stream.zip] := true;
end;
end;
sData := lvJSonStream.JSon.AsJSon(True);
lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);
lvJSonLength := Length(lvBytes);
lvStream := lvJSonStream.Stream;
lvJSonLength := TNetworkTools.htonl(lvJSonLength);
if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;
if lvStream <> nil then
begin
lvStreamLength := lvStream.Size;
end else
begin
lvStreamLength := 0;
end;
lvStreamLength := TNetworkTools.htonl(lvStreamLength);
if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;
//json bytes
if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;
if lvStream.Size > 0 then
begin
lvStream.Position := 0;
repeat
l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
until (l = 0);
end;
end;该项目组完成的功能有
1.断定Stream数据是否须要紧缩。
2.发送Json数据长度和Stream数据长度
3.发送Json数据
4.发送Stream数据
申明:
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
这三行代码须要申明下,是为了兼容java,netty做办事端便利解码,当然我们也可以不进行转换。直接发送也是可以的。只要共同办事端就行了。和谈是本身定义的。
接下来是办事端IOCP队列中会收到接管数据的旌旗灯号。
function TIOCPObject.processIOQueued: Integer;
var
BytesTransferred:Cardinal;
lvResultStatus:BOOL;
lvRet:Integer;
lvIOData:POVERLAPPEDEx;
lvDataObject:TObject;
lvClientContext:TIOCPClientContext;
begin
Result := IOCP_RESULT_OK;
//工作者线程会停止到GetQueuedCompletionStatus函数处,直到接管到数据为止
lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
.......
if lvIOData.IO_TYPE = IO_TYPE_Accept then //连接恳求
begin
TIODataMemPool.instance.giveBackIOData(lvIOData);
PostWSARecv(lvClientContext);
end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
begin
//参加到套接字对应的缓存中,处理惩罚逻辑
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
lvIOData.Overlapped.InternalHigh);
TIODataMemPool.instance.giveBackIOData(lvIOData);
//持续送达接管恳求
PostWSARecv(lvClientContext);
end;
.........
end;
//参加到套接字对应的缓存中,处理惩罚逻辑
lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
lvIOData.Overlapped.InternalHigh);//这里会调用测验测验进行解码
procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
var
lvObject:TObject;
begin
FCS.Enter;
try
//参加到套接字对应的缓存
FBuffers.AddBuffer(buf, len);
//调用注册的<进行解码>
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
if lvObject <> nil then
try
try
//解码成功,调用营业逻辑的处理惩罚办法
dataReceived(lvObject);
except
on E:Exception do
begin
TIOCPFileLogger.logErrMessage(截获处理惩罚逻辑异常! + e.Message);
end;
end;
//清理掉这一次分派的内存<若是没有可用的内存块>清理
if FBuffers.validCount = 0 then
begin
FBuffers.clearBuffer;
end;
finally
lvObject.Free;
end;
finally
FCS.Leave;
end;
end;
我们在之前的Demo中应用的是TIOCPJSonStreamDecoder
function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
var
lvJSonLength, lvStreamLength:Integer;
lvData:String;
lvBuffer:array of Char;
lvBufData:PAnsiChar;
lvStream:TMemoryStream;
lvJsonStream:TJsonStream;
lvBytes:TBytes;
lvValidCount:Integer;
begin
Result := nil;
//若是缓存中的数据长度不敷包头长度,解码失败<json字符串长度,流长度>
lvValidCount := inBuf.validCount;
if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
begin
Exit;
end;
//记录读取地位
inBuf.markReaderIndex;
inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
//若是缓存中的数据不敷json的长度和流长度<申明数据还没有收取完毕>解码失败
lvValidCount := inBuf.validCount;
if lvValidCount < (lvJSonLength + lvStreamLength) then
begin
//返回buf的读取地位
inBuf.restoreReaderIndex;
exit;
end else if (lvJSonLength + lvStreamLength) = 0 then
begin
//两个都为0<两个0>客户端可以用来作为主动重连应用
TIOCPFileLogger.logDebugMessage(接管到一次[00]数据!);
Exit;
end;
//解码成功
lvJsonStream := TJsonStream.Create;
Result := lvJsonStream;
//读取json字符串
if lvJSonLength > 0 then
begin
SetLength(lvBytes, lvJSonLength);
ZeroMemory(@lvBytes[0], lvJSonLength);
inBuf.readBuffer(@lvBytes[0], lvJSonLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
lvJsonStream.Json := SO(lvData);
end else
begin
TFileLogger.instance.logMessage(接管到一次JSon为空的一次数据恳求!, IOCP_ALERT_);
end;
//读取流数据
if lvStreamLength > 0 then
begin
GetMem(lvBufData, lvStreamLength);
try
inBuf.readBuffer(lvBufData, lvStreamLength);
lvJsonStream.Stream.Size := 0;
lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);
//解压流
if lvJsonStream.Json.B[config.stream.zip] then
begin
//解压
TZipTools.unCompressStreamEX(lvJsonStream.Stream);
end;
finally
FreeMem(lvBufData, lvStreamLength);
end;
end;
end;//办事端中有三行代码来共同客户端的编码流
lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);/////
办事端首要完成的功能有
0.断定接管到的数据是否可以进行解码,若是不成以退出,解码不成功。
1.接管json长度,流数据长度
2.接管json数据,接管流数据存入JsonStream.json中,
3.按照json中config.stream.zip进行断定流数据是否须要解压.放入JsonStream.stream中
4.解码成功返回JsonStream对象。
解码完成后可以看到
lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
if lvObject <> nil then
try //解码成功,调用营业逻辑的处理惩罚办法
dataReceived(lvObject);
………
解码成功调用dataReceived,进行逻辑的处理惩罚。
总结:
办事端的配套客户端的编码器,办事端的编码器配套客户端的。
无论对感情还是对生活,“只要甜不要苦”都是任性而孩子气的,因为我们也不完美,我们也会伤害人。正因为我们都不完美,也因为生活从不是事事如意,所以对这些“瑕疵”的收纳才让我们对生活、对他人的爱变得日益真实而具体。—— 汪冰《世界再亏欠你,也要敢于拥抱幸福》