DIOCP开源项目-详解编码器和和如安在传输中参加紧缩和解压功能

    添加时间:2013-5-30 点击量:

    >>>>>>DIOCP评论辩论群:320641073


    >>>>>>SVN源码和DEMO:https://code.google.com/p/diocp/


    收集带宽有限,对数据进行紧缩再进行传送可以有效的哄骗带宽和进步传输的效力。


    DIOCP中对数据进行紧缩传送,须要批改编码器和,先说说这两个器材的的用法和功能。


          


              举个例子:我们要把一台电脑快递回老家给正在上学的小弟应用,那么老家就是办事端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。


    在这个过程中,发送一个对象(电脑)用到客户端的编码器,接管对象(电脑)用到办事端的。


             


              在之前编写的DIOCP例子中都应用了JSonStream对象进行传输,这个TJSonStream类,首要有两个项目组的数据,第一项目组包含JSon字符串数据,第二项目组包含Stream流数据。在三层数据保存例子中我们把客户端的恳求放在JSon中。办事端接管数据后经由过程办事端的还原成JSonStream对象。然掉队行逻辑的处理惩罚,办事端回写对象时,同过办事端的编码器对回写的JSonStream进行编码发送,客户端经由过程客户端的接管并还原成JSonStream对象。在办事端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,若是对Stream对象进行紧缩,在做紧缩中调试法度时发明一个70K的数据包进行一下紧缩,数据包可以变成7K了,对文本紧缩结果还是很不错的。


            当然我们容许本身定义和谈和编写编码和,我们可以定义本身的TStrStream类或者TXMLStream等等,然后编写响应的编码器和就行了。


            下面解析下代码


    客户端代码:



    var
    
    lvJSonStream, lvRecvObject:TJsonStream;
    lvStream:TStream;
    lvData:AnsiString;
    l, j, x:Integer;
    begin
    lvJSonStream :
    = TJsonStream.Create;
    try
    lvJSonStream.JSon :
    = SO();
    lvJSonStream.JSon.I[
    cmdIndex] := 1001; //打开一个SQL脚本,获取数据
    lvJSonStream.Json.S[
    sql] := mmoSQL.Lines.Text;

    FClientSocket.sendObject(lvJSonStream);
    finally
    lvJSonStream.Free;
    end;


    起首是建树一个TJSonStream对象,然后设定信息,因为是产生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,



    procedure TD10ClientSocket.sendObject(pvObject:TObject);
    
    begin
    if FCoder = nil then raise Exception.Create(没有注册对象编码和(registerCoder)!);

    if not Active then Exit;

    FCoder.Encode(Self, pvObject);
    end;


    可以看出这里调用注册的编码器,调用Encode函数



    客户端编码器的Encode函数如下



    procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
    
    TObject);
    var
    lvJSonStream:TJsonStream;
    lvJSonLength:Integer;
    lvStreamLength:Integer;
    sData, lvTemp:String;
    lvStream:TStream;
    lvTempBuf:PAnsiChar;

    lvBytes, lvTempBytes:TBytes;

    l:Integer;
    lvBufBytes:
    array[0..1023] of byte;
    begin
    if pvObject = nil then exit;
    lvJSonStream :
    = TJsonStream(pvObject);

    //是否紧缩流
    if (lvJSonStream.Stream <> nilthen
    begin
    if lvJSonStream.Json.O[config.stream.zip] <> nil then
    begin
    if lvJSonStream.Json.B[config.stream.zip] then
    begin
    //紧缩流
    TZipTools.compressStreamEx(lvJSonStream.Stream);
    end;
    end else if lvJSonStream.Stream.Size > 0 then
    begin
    //紧缩流
    TZipTools.compressStreamEx(lvJSonStream.Stream);
    lvJSonStream.Json.B[
    config.stream.zip] := true;
    end;
    end;

    sData :
    = lvJSonStream.JSon.AsJSon(True);

    lvBytes :
    = TNetworkTools.ansiString2Utf8Bytes(sData);

    lvJSonLength :
    = Length(lvBytes);
    lvStream :
    = lvJSonStream.Stream;

    lvJSonLength :
    = TNetworkTools.htonl(lvJSonLength);

    if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;


    if lvStream <> nil then
    begin
    lvStreamLength :
    = lvStream.Size;
    end else
    begin
    lvStreamLength :
    = 0;
    end;

    lvStreamLength :
    = TNetworkTools.htonl(lvStreamLength);
    if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;

    //json bytes
    if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;

    if lvStream.Size > 0 then
    begin
    lvStream.Position :
    = 0;
    repeat
    l :
    = lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
    if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
    until (l = 0);
    end;
    end;


    该项目组完成的功能有


    1.断定Stream数据是否须要紧缩。


    2.发送Json数据长度和Stream数据长度


    3.发送Json数据


    4.发送Stream数据



    申明:


    lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
    lvStreamLength := TNetworkTools.ntohl(lvStreamLength);


    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);


    这三行代码须要申明下,是为了兼容java,netty做办事端便利解码,当然我们也可以不进行转换。直接发送也是可以的。只要共同办事端就行了。和谈是本身定义的。



    接下来是办事端IOCP队列中会收到接管数据的旌旗灯号。



    function TIOCPObject.processIOQueued: Integer;
    
    var
    BytesTransferred:Cardinal;
    lvResultStatus:BOOL;
    lvRet:Integer;
    lvIOData:POVERLAPPEDEx;

    lvDataObject:TObject;

    lvClientContext:TIOCPClientContext;
    begin
    Result :
    = IOCP_RESULT_OK;

    //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接管到数据为止
    lvResultStatus :
    = GetQueuedCompletionStatus(FIOCoreHandle,

    .......
    if lvIOData.IO_TYPE = IO_TYPE_Accept then //连接恳求
    begin
    TIODataMemPool.instance.giveBackIOData(lvIOData);
    PostWSARecv(lvClientContext);
    end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
    begin
    //参加到套接字对应的缓存中,处理惩罚逻辑
    lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
    lvIOData.Overlapped.InternalHigh);

    TIODataMemPool.instance.giveBackIOData(lvIOData);

    //持续送达接管恳求
    PostWSARecv(lvClientContext);
    end;
    .........
    end;



    //参加到套接字对应的缓存中,处理惩罚逻辑
    lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
      lvIOData.Overlapped.InternalHigh);


    //这里会调用测验测验进行解码



    procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
    
    var
    lvObject:TObject;
    begin
    FCS.Enter;
    try
    //参加到套接字对应的缓存
    FBuffers.AddBuffer(buf, len);

    //调用注册的<进行解码>
    lvObject :
    = TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
    if lvObject <> nil then
    try
    try
    //解码成功,调用营业逻辑的处理惩罚办法
    dataReceived(lvObject);
    except
    on E:Exception
    do
    begin
    TIOCPFileLogger.logErrMessage(
    截获处理惩罚逻辑异常! + e.Message);
    end;
    end;
    //清理掉这一次分派的内存<若是没有可用的内存块>清理
    if FBuffers.validCount = 0 then
    begin
    FBuffers.clearBuffer;
    end;
    finally
    lvObject.Free;
    end;
    finally
    FCS.Leave;
    end;
    end;



    我们在之前的Demo中应用的是TIOCPJSonStreamDecoder




    function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
    
    var
    lvJSonLength, lvStreamLength:Integer;
    lvData:String;
    lvBuffer:
    array of Char;
    lvBufData:PAnsiChar;
    lvStream:TMemoryStream;
    lvJsonStream:TJsonStream;
    lvBytes:TBytes;
    lvValidCount:Integer;
    begin
    Result :
    = nil;

    //若是缓存中的数据长度不敷包头长度,解码失败<json字符串长度,流长度>
    lvValidCount :
    = inBuf.validCount;
    if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
    begin
    Exit;
    end;

    //记录读取地位
    inBuf.markReaderIndex;
    inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
    inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));

    lvJSonLength :
    = TNetworkTools.ntohl(lvJSonLength);
    lvStreamLength :
    = TNetworkTools.ntohl(lvStreamLength);

    //若是缓存中的数据不敷json的长度和流长度<申明数据还没有收取完毕>解码失败
    lvValidCount :
    = inBuf.validCount;
    if lvValidCount < (lvJSonLength + lvStreamLength) then
    begin
    //返回buf的读取地位
    inBuf.restoreReaderIndex;
    exit;
    end else if (lvJSonLength + lvStreamLength) = 0 then
    begin
    //两个都为0<两个0>客户端可以用来作为主动重连应用
    TIOCPFileLogger.logDebugMessage(
    接管到一次[00]数据!);
    Exit;
    end;



    //解码成功
    lvJsonStream :
    = TJsonStream.Create;
    Result :
    = lvJsonStream;

    //读取json字符串
    if lvJSonLength > 0 then
    begin
    SetLength(lvBytes, lvJSonLength);
    ZeroMemory(@lvBytes[
    0], lvJSonLength);
    inBuf.readBuffer(@lvBytes[
    0], lvJSonLength);

    lvData :
    = TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

    lvJsonStream.Json :
    = SO(lvData);
    end else
    begin
    TFileLogger.instance.logMessage(
    接管到一次JSon为空的一次数据恳求!IOCP_ALERT_);
    end;


    //读取流数据
    if lvStreamLength > 0 then
    begin
    GetMem(lvBufData, lvStreamLength);
    try
    inBuf.readBuffer(lvBufData, lvStreamLength);
    lvJsonStream.Stream.Size :
    = 0;
    lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);

    //解压流
    if lvJsonStream.Json.B[config.stream.zip] then
    begin
    //解压
    TZipTools.unCompressStreamEX(lvJsonStream.Stream);
    end;
    finally
    FreeMem(lvBufData, lvStreamLength);
    end;
    end;
    end;


    //办事端中有三行代码来共同客户端的编码流



    lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
    lvStreamLength := TNetworkTools.ntohl(lvStreamLength);   
    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);


    /////


    办事端首要完成的功能有


    0.断定接管到的数据是否可以进行解码,若是不成以退出,解码不成功。


    1.接管json长度,流数据长度


    2.接管json数据,接管流数据存入JsonStream.json中,


    3.按照json中config.stream.zip进行断定流数据是否须要解压.放入JsonStream.stream中
    4.解码成功返回JsonStream对象。



    解码完成后可以看到


    lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);


    if lvObject <> nil then


    try //解码成功,调用营业逻辑的处理惩罚办法


        dataReceived(lvObject);


    ………


    解码成功调用dataReceived,进行逻辑的处理惩罚。



    总结:


       办事端的配套客户端的编码器,办事端的编码器配套客户端的。

    无论对感情还是对生活,“只要甜不要苦”都是任性而孩子气的,因为我们也不完美,我们也会伤害人。正因为我们都不完美,也因为生活从不是事事如意,所以对这些“瑕疵”的收纳才让我们对生活、对他人的爱变得日益真实而具体。—— 汪冰《世界再亏欠你,也要敢于拥抱幸福》
    分享到: