[Movie Search Tool (搜片神器)] How the DHT Server Is Implemented in Code

    Added: 2013-7-26

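    This continues from the first article: "Using C# to build a back-end management program for BT torrents found by DHT magnet search, plus database design (open source) [Movie Search Tool]"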



    Open-source repository: https://github.com/h31h31/H31DHTMgr


    Program: H31DHT


     


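    Yesterday's article drew less interest than the first one, so today I will keep things short and simply share with the friends who have been supporting the project. People in the community advise teaching others how to fish rather than handing out fish, so this part of the code will not be released for now. I hope more of you will join in.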


    I also hope that someone with the skills can port the C++ code to C# and add it to our Movie Search Tool.


    Yesterday I explained how DHT works, and I trust most of you now have a good idea of what is going on. If you do not, keep following the upcoming articles.


    The code I borrowed from is the C/C++ version: the DHT code inside transmission. See http://www.transmissionbt.com/


    That code targets Linux, however, so you have to port it to Windows yourself.


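    If you would rather implement DHT in C#, have a look at mono-monotorrent. It contains a lot of framework code, though, and is harder to follow than transmission, where three files do the whole job.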



    In transmission, three files are enough to implement DHT: dht.c, dht.h and dht-example.c. The interface is simple and easy to reuse.



    The main functional steps for joining the DHT network are described below.

    The code in dht.c and dht.h falls into three parts:

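    1. Inserting a node into the routing table.
    1) If the node is already in the routing table, update it and return.
    2) If the bucket is not full, insert the node and return.
    3) If a failed node is found in the bucket, replace it and return.
    4) If a dubious node is found, save the new node in the cache and, if the dubious node has not been pinged yet, send it a ping_node; return.
    5) At this point the bucket is full of good nodes. If our own ID does not fall into this bucket, return.
    6) Split the bucket in two and go back to step 1).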
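
The six insertion steps above can be sketched in a few dozen lines of C++. This is only an illustration under simplifying assumptions, not the code in dht.c: IDs are 32 bits instead of 160, a bucket holds 8 nodes, and the names RoutingTable, InsertResult and pings_sent are made up for the example (pings_sent just counts where a real ping_node would be sent).

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified model (assumption): 32-bit IDs and 8 nodes per bucket.
enum class NodeState { Good, Dubious, Bad };
enum class InsertResult { Updated, Inserted, Replaced, CachedAndPinged, Dropped };

struct Node {
    uint32_t id;
    NodeState state;
    bool pinged;              // true while a ping to this node is outstanding
};

struct Bucket {
    uint64_t lo, hi;          // the bucket covers IDs in [lo, hi)
    std::vector<Node> nodes;
    bool has_cached;          // a replacement candidate is waiting
    uint32_t cached_id;
    Bucket(uint64_t l, uint64_t h) : lo(l), hi(h), has_cached(false), cached_id(0) {}
};

class RoutingTable {
public:
    static constexpr size_t kBucketSize = 8;
    int pings_sent;           // stands in for real ping_node calls

    explicit RoutingTable(uint32_t my_id) : pings_sent(0), my_id_(my_id) {
        buckets_.push_back(Bucket(0, uint64_t(1) << 32));
    }

    size_t bucket_count() const { return buckets_.size(); }
    Bucket& bucket_for(uint32_t id) { return buckets_[index_for(id)]; }

    InsertResult insert(uint32_t id) {
        for (;;) {
            Bucket& b = bucket_for(id);
            for (Node& n : b.nodes)                   // 1) known node: refresh it
                if (n.id == id) {
                    n.state = NodeState::Good;
                    n.pinged = false;
                    return InsertResult::Updated;
                }
            if (b.nodes.size() < kBucketSize) {       // 2) room left: insert
                b.nodes.push_back(Node{id, NodeState::Good, false});
                return InsertResult::Inserted;
            }
            for (Node& n : b.nodes)                   // 3) replace a failed node
                if (n.state == NodeState::Bad) {
                    n = Node{id, NodeState::Good, false};
                    return InsertResult::Replaced;
                }
            for (Node& n : b.nodes)                   // 4) dubious node: cache and ping once
                if (n.state == NodeState::Dubious) {
                    b.has_cached = true;
                    b.cached_id = id;
                    if (!n.pinged) { n.pinged = true; ++pings_sent; }
                    return InsertResult::CachedAndPinged;
                }
            if (my_id_ < b.lo || my_id_ >= b.hi)      // 5) not our own bucket: give up
                return InsertResult::Dropped;
            split(index_for(id));                     // 6) split, then retry from 1)
        }
    }

private:
    size_t index_for(uint32_t id) const {
        for (size_t i = 0; i < buckets_.size(); ++i)
            if (id >= buckets_[i].lo && id < buckets_[i].hi) return i;
        return 0;  // not reached: the buckets always cover the whole ID space
    }

    void split(size_t i) {
        Bucket old = buckets_[i];
        uint64_t mid = old.lo + (old.hi - old.lo) / 2;
        Bucket low(old.lo, mid), high(mid, old.hi);
        for (const Node& n : old.nodes)
            (n.id < mid ? low : high).nodes.push_back(n);
        buckets_[i] = low;
        buckets_.insert(buckets_.begin() + i + 1, high);
    }

    uint32_t my_id_;
    std::vector<Bucket> buckets_;
};
```

Only the bucket that contains our own ID is ever split, so the table stays detailed near our own ID and coarse everywhere else.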

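    2. KAD remote procedure calls (KRPC).
    These come in three kinds:
    1) ping/pong.
    Every packet uses the tid pn\0\0.
    2) find_node.
    Every packet uses the tid fn\0\0.
    3) get_peers/announce_peer.
    The tid stays the same for the whole of one recursive query on a given hash.
    Only kind 3) implements the recursive query described in the BitTorrent DHT specification. Kinds 1) and 2) are used only to maintain the routing table and keep no state.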
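
As a minimal sketch of the tid scheme just described, the helpers below build and match a 4-byte transaction ID made of a two-letter prefix plus a 16-bit sequence number. They are modelled on the make_tid and tid_match calls that appear later in this article; the exact signatures here are an assumption for illustration.

```cpp
#include <cstdint>
#include <cstring>

// Build a 4-byte transaction id: two ASCII prefix bytes ("pn", "fn", "gp", "ap")
// followed by a 16-bit sequence number in host byte order.
inline void make_tid(unsigned char tid[4], const char* prefix, uint16_t seqno) {
    tid[0] = static_cast<unsigned char>(prefix[0]);
    tid[1] = static_cast<unsigned char>(prefix[1]);
    std::memcpy(tid + 2, &seqno, 2);
}

// Return true if tid starts with the given prefix; optionally extract the
// sequence number so a reply can be matched to the search that sent the query.
inline bool tid_match(const unsigned char tid[4], const char* prefix, uint16_t* seqno_out) {
    if (tid[0] != static_cast<unsigned char>(prefix[0]) ||
        tid[1] != static_cast<unsigned char>(prefix[1]))
        return false;
    if (seqno_out)
        std::memcpy(seqno_out, tid + 2, 2);
    return true;
}
```

With a sequence number of 0 the ping tid comes out as pn\0\0, which is why the stateless ping and find_node packets can all share one tid.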

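    3. Timer handling:
    To check that the nodes in the routing table are still valid (according to the specification the routing table should contain only valid nodes), whenever the code performs a KRPC operation on a node that is in the routing table it records the start time of that operation in pinged_time, and later uses this start time to decide whether the operation has timed out.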

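    expire_stuff_time
    When this timer expires, the following is done:
    1. Check the routing table for failed nodes (judged by pinged_time) and delete them.
    2. Check whether stored announce_peer entries are more than 30 minutes old (not discussed further here).
    3. Check for recursive queries that have timed out.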

    rotate_secrets_time
    Timer.
    Replaces the token secret roughly every 15 minutes (see the DHT specification).

    confirm_nodes_time
    Timer.
    Looks for buckets that have been inactive for a long time and refreshes them with a find_node KRPC operation.

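    search_time
    Timer.
    It can happen that none of the get_peers requests that were sent gets an answer. In that case the search_time timer is responsible for resending all of the requests. (Note: at most 3 get_peers KRPC requests are outstanding at any time.)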

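    ping/pong operations used to maintain the routing table:
    When an insertion finds the bucket full but containing a dubious node, a ping_node is triggered. A node that never answers goes from dubious to failed and is eventually replaced.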
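
To make the timer logic concrete, here is a small sketch of how a main loop could decide how long it may sleep, given the deadlines named above. It mirrors the calculation at the end of dht_periodic shown later in this article; the DhtTimers struct and the function names are assumptions made up for this example.

```cpp
#include <algorithm>
#include <ctime>

// Deadlines named after the timer variables described above (hypothetical struct).
struct DhtTimers {
    time_t rotate_secrets_time;
    time_t expire_stuff_time;
    time_t confirm_nodes_time;
    time_t search_time;        // 0 means no search is pending
};

// True once a deadline has been reached.
inline bool is_due(time_t deadline, time_t now) { return now >= deadline; }

// How long the main loop may sleep before the next maintenance pass:
// until confirm_nodes_time, or sooner if a search needs to be retried.
inline time_t next_sleep(const DhtTimers& t, time_t now) {
    time_t sleep = t.confirm_nodes_time > now ? t.confirm_nodes_time - now : 0;
    if (t.search_time > 0) {
        if (t.search_time <= now)
            sleep = 0;
        else
            sleep = std::min<time_t>(sleep, t.search_time - now);
    }
    return sleep;
}
```

The returned value is what would be handed to select() as its timeout, so the loop wakes up either when a packet arrives or when the nearest timer fires.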

    Next, how we actually join the DHT network



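    1. In DHT your own machine has to act as a server, otherwise other nodes cannot find you, so you need to bind a UDP port. The reference code also supports IPv6; personally I think that part can be left out. The Windows code is as follows: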




    2.
      // Initialize the socket
      m_soListen = (int)socket(PF_INET, SOCK_DGRAM, IPPROTO_IP);
      if (m_soListen == INVALID_SOCKET) {
          m_iErrorNo = WSAGetLastError();
          _dout(_T("CH31CarMonitorDlg Start Error(%d).\n"), m_iErrorNo);
          return -1;
      }
      // Initialize the server address
      SOCKADDR_IN addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(port);
      addr.sin_addr.s_addr = htonl(INADDR_ANY);
      // Bind the port
      if (bind(m_soListen, (SOCKADDR*)&addr, sizeof(addr)) == SOCKET_ERROR) {
          m_iErrorNo = WSAGetLastError();
          _dout(_T("CH31CarMonitorDlg Start Error(%d).\n"), m_iErrorNo);
          return -2;
      }


      Binding the UDP port


    3. DHT needs its own 20-byte ID. One way is to take some random value and run it through SHA1, which yields exactly 20 bytes. The Windows code is as follows:




    4.
      unsigned char p[20];
      CSHA1 sha1;
      sha1.Reset();
      // Hash the seed string m_myID down to 20 bytes
      sha1.Update((const unsigned char*)m_myID.GetBuffer(), m_myID.GetLength());
      sha1.Final();
      sha1.GetHash(p);   // p now holds the 20-byte node ID


      Generating the ID with SHA1
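
If no SHA1 class is at hand, a node ID can also be produced by drawing 20 random bytes directly. Note that this swaps the SHA1 step above for plain random bytes, which is a different technique from the author's; the names NodeId, random_node_id and to_hex are hypothetical helpers for this sketch.

```cpp
#include <array>
#include <cstdint>
#include <random>
#include <string>

using NodeId = std::array<uint8_t, 20>;   // a DHT node ID is 160 bits = 20 bytes

// Draw 20 random bytes from the platform's random device.
// (Assumption: any uniformly random 20-byte value is acceptable as an ID.)
inline NodeId random_node_id() {
    std::random_device rd;
    NodeId id{};
    for (auto& b : id)
        b = static_cast<uint8_t>(rd() & 0xFF);
    return id;
}

// Hex-encode an ID for logging; raw IDs are binary and must not be printed with %s.
inline std::string to_hex(const NodeId& id) {
    static const char digits[] = "0123456789abcdef";
    std::string out;
    for (uint8_t b : id) {
        out.push_back(digits[b >> 4]);
        out.push_back(digits[b & 0x0F]);
    }
    return out;
}
```

Whichever way the ID is generated, it should be stored and reused across restarts so that other nodes' routing tables stay valid.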


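    5. Initialize the address information of other people's servers (the bootstrap nodes), so that we can query them for the information we need. Reference code: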




    6.
      rc = getaddrinfo("router.utorrent.com", "6881", &hints1, &info);
      //rc = getaddrinfo("router.bittorrent.com", "6881", &hints1, &info);
      //rc = getaddrinfo("dht.transmissionbt.com", "6881", &hints1, &info);
      if (rc != 0) {
          fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
          exit(1);
      }
      // Copy every resolved address into the bootstrap list
      // (bootstrap_nodes must be large enough to hold them all)
      infop = info;
      while (infop && m_bDataThread)
      {
          memcpy(&bootstrap_nodes[num_bootstrap_nodes], infop->ai_addr, infop->ai_addrlen);
          infop = infop->ai_next;
          num_bootstrap_nodes++;
      }
      freeaddrinfo(info);


      Bootstrap server information


    7. Now we can initialize our DHT class. The original code is written in C; you can wrap it in a C++ class yourself.




    8.
      rc = m_dht.dht_init(s, s6, m_myid, NULL);
      if (rc < 0) {
          perror("dht_init");
          exit(1);
      }


      Initializing the DHT class


    9. Send a PING to each server. A server that is up answers with a PONG, which tells us it is working normally.




    10.
      for (int i = 0; i < num_bootstrap_nodes && m_bDataThread; i++)
      {
          m_dht.dht_ping_node((struct sockaddr*)&bootstrap_nodes[i], sizeof(bootstrap_nodes[i]));
          Sleep(m_dht.random() % 1000);   // random pause of up to 1 s between pings
      }


      Pinging the servers
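
For readers wondering what dht_ping_node actually puts on the wire: a KRPC ping is a small bencoded dictionary, as laid out in the BitTorrent DHT specification (BEP 5). The sketch below builds one by hand; the helper names bstr and build_ping are made up for this example and are not functions from dht.c.

```cpp
#include <string>

// Bencode a byte string: "<length>:<bytes>".
inline std::string bstr(const std::string& s) {
    return std::to_string(s.size()) + ":" + s;
}

// Build a KRPC ping query.
//   my_id: our 20-byte node ID (raw bytes)
//   tid:   transaction ID, e.g. std::string("pn\0\0", 4)
inline std::string build_ping(const std::string& my_id, const std::string& tid) {
    return "d1:ad2:id" + bstr(my_id) + "e"   // "a": arguments, containing our id
           "1:q4:ping"                        // "q": the method name
           "1:t" + bstr(tid) +                // "t": transaction id, echoed in the reply
           "1:y1:qe";                         // "y": "q" marks this as a query
}
```

The node that receives this answers with a reply dictionary carrying the same transaction ID, which is the PONG mentioned above.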


    11. Now the search functions can be used to look up the BT torrent for the hash we want. Reference code:


      if(searching) {
          if(s >= 0)
              dht_search(hash, 0, AF_INET, callback, NULL);
          if(s6 >= 0)
              dht_search(hash, 0, AF_INET6, callback, NULL);
          searching = 0;
      }


      dht_search


    12. You could follow what dht-example.c does next with its search function, but that is not how we do it: we send find_node and get_peers requests directly to the servers.


      unsigned char tid[16];
      // find_node query with tid "fn" + sequence number 0
      m_dht.make_tid(tid, "fn", 0);
      m_dht.send_find_node(&ipRecvPingList[ipListPOS].addr, sizeof(sockaddr), tid, 4, ipRecvPingList[ipListPOS].ID, 0, 0);
      Sleep(100);
      memset(tid, 0, sizeof(tid));
      // get_peers query with tid "gp" + sequence number 0
      m_dht.make_tid(tid, "gp", 0);
      m_dht.send_get_peers(&ipRecvPingList[ipListPOS].addr, sizeof(sockaddr), tid, 4, hashList[0], 0, 0);


      Sending find_node and get_peers
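
The two requests sent above are again plain bencoded dictionaries. As a hedged illustration of their layout according to the BitTorrent DHT specification (BEP 5), the sketch below builds both by hand; bstr, build_find_node and build_get_peers are hypothetical helper names, not part of dht.c.

```cpp
#include <string>

// Bencode a byte string: "<length>:<bytes>".
inline std::string bstr(const std::string& s) {
    return std::to_string(s.size()) + ":" + s;
}

// find_node: ask a node for the nodes it knows closest to `target` (20 raw bytes).
inline std::string build_find_node(const std::string& my_id,
                                   const std::string& target,
                                   const std::string& tid) {
    return "d1:ad2:id" + bstr(my_id) + "6:target" + bstr(target) + "e"
           "1:q9:find_node1:t" + bstr(tid) + "1:y1:qe";
}

// get_peers: ask a node for peers of the torrent with this 20-byte info_hash.
inline std::string build_get_peers(const std::string& my_id,
                                   const std::string& info_hash,
                                   const std::string& tid) {
    return "d1:ad2:id" + bstr(my_id) + "9:info_hash" + bstr(info_hash) + "e"
           "1:q9:get_peers1:t" + bstr(tid) + "1:y1:qe";
}
```

A get_peers reply contains either a "values" list of peers or a "nodes" string of closer nodes, which is what the receive loop in the following steps has to handle.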


    13. All that is left is to wait for the replies and parse them. The DHT class code already does all of that for us.


      FD_ZERO(&readfds);
      if(m_soListen >= 0)
          FD_SET(m_soListen, &readfds);
      if(s6 >= 0)
          FD_SET(s6, &readfds);
      // Wait until a packet arrives or the timeout tv expires
      rc = select(m_soListen > s6 ? m_soListen + 1 : s6 + 1, &readfds, NULL, NULL, &tv);
      if(rc < 0 && m_bDataThread)
      {
          if(errno != EINTR) {
              perror("select");
              Sleep(1000);
          }
      }

      if(!m_bDataThread)
          break;

      if(rc > 0 && m_bDataThread)
      {
          // from1 receives the sender's address
          len = sizeof(from1);
          memset(buf, 0, sizeof(buf));
          if(m_soListen >= 0 && FD_ISSET(m_soListen, &readfds))
              rc = recvfrom(m_soListen, buf, sizeof(buf) - 1, 0, (struct sockaddr*)&from1, &len);
          else if(s6 >= 0 && FD_ISSET(s6, &readfds))
              rc = recvfrom(s6, buf, sizeof(buf) - 1, 0, (struct sockaddr*)&from1, &len);
          else
              abort();
      }

      if(rc > 0 && m_bDataThread)
      {
          // dht_periodic requires a NUL-terminated buffer
          buf[rc] = '\0';
          rc = m_dht.dht_periodic(buf, rc, (struct sockaddr*)&from1, len, &tosleep, DHT_callback, this);
      }
      else
      {
          // Nothing received: let the DHT run its timers only
          rc = m_dht.dht_periodic(NULL, 0, NULL, 0, &tosleep, DHT_callback, this);
      }


      Waiting for replies from the DHT network


    14. The DHT code already handles both parsing the incoming messages and answering other nodes' requests. Reading dht.c will show you how it works.


      int CDHT::dht_periodic(const void *buf, size_t buflen, const struct sockaddr *Addr, int len, time_t *tosleep, dht_callback *callback, void *closure)
      {
          gettimeofday(&nowTime, NULL);

          if(buflen > 0)
          {
              int message;
              unsigned char tid[16], id[20], info_hash[20], target[20];
              unsigned char nodes[256], nodes6[1024], token[128];
              int tid_len = 16, token_len = 128;
              int nodes_len = 256, nodes6_len = 1024;
              unsigned short port;
              unsigned char values[2048], values6[2048];
              int values_len = 2048, values6_len = 2048;
              int want;
              unsigned short ttid;

              /* Only used for logging; valid for IPv4 senders. */
              struct sockaddr_in *tempip = (struct sockaddr_in *)Addr;

              if(is_martian(Addr))
                  goto dontread;

              if(node_blacklisted(Addr, len)) {
                  _dout("Received packet from blacklisted node.\n");
                  goto dontread;
              }

              if(((char*)buf)[buflen] != '\0') {
                  _dout("Unterminated message.\n");
                  errno = EINVAL;
                  return -1;
              }

              message = parse_message((unsigned char *)buf, buflen, tid, &tid_len, id, info_hash, target, &port, token, &token_len, nodes, &nodes_len, nodes6, &nodes6_len, values, &values_len, values6, &values6_len, &want);

              /* Debugging hook: a place to set a breakpoint when a token arrives. */
              if(token_len > 0)
              {
                  int a = 0;
              }
              if(message < 0 || message == ERROR || id_cmp(id, zeroes) == 0)
              {
                  _dout("Unparseable message: ");
                  debug_printable((const unsigned char *)buf, buflen);
                  _dout("\n");
                  goto dontread;
              }

              if(id_cmp(id, myid) == 0) {
                  _dout("Received message from self.\n");
                  goto dontread;
              }

              if(message > REPLY) {
                  /* Rate limit requests. */
                  if(!token_bucket()) {
                      _dout("Dropping request due to rate limiting.\n");
                      goto dontread;
                  }
              }

              switch(message)
              {
              case REPLY:
                  if(tid_len != 4)
                  {
                      _dout("Broken node truncates transaction ids: ");
                      debug_printable((const unsigned char *)buf, buflen);
                      _dout("\n");
                      /* This is really annoying, as it means that we will
                         time-out all our searches that go through this node.
                         Kill it. */
                      blacklist_node(id, Addr, len);
                      goto dontread;
                  }
                  if(tid_match(tid, "pn", NULL))
                  {
                      /* id is 20 raw bytes, not a C string, so it is not printed with %s. */
                      _dout("Pong!From IP:%s:[%d]\n", inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                      new_node(id, Addr, len, 2);
                      (*callback)(closure, DHT_EVENT_PONG_VALUES, id, (void*)Addr, len);
                      //send_find_node(,len,tid,4,id,0,0);
                  }
                  else if(tid_match(tid, "fn", NULL) || tid_match(tid, "gp", NULL))
                  {
                      int gp = 0;
                      struct search *sr = NULL;
                      if(tid_match(tid, "gp", &ttid))
                      {
                          gp = 1;
                          sr = find_search(ttid, Addr->sa_family);
                      }
                      _dout("Nodes found (%d+%d)%s!From IP:%s:[%d]\n", nodes_len/26, nodes6_len/38, gp ? " for get_peers" : "", inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                      if(nodes_len % 26 != 0 || nodes6_len % 38 != 0)
                      {
                          _dout("Unexpected length for node info!\n");
                          blacklist_node(id, Addr, len);
                      }
                      //else if(gp && sr == NULL)
                      //{
                      //    _dout("Unknown search!\n");
                      //    new_node(id, Addr, len, 1);
                      //}
                      else
                      {
                          int i;
                          new_node(id, Addr, len, 2);
                          /* IPv4 compact node info: 20-byte id + 4-byte address + 2-byte port. */
                          for(i = 0; i < nodes_len / 26; i++)
                          {
                              unsigned char *ni = nodes + i * 26;
                              struct sockaddr_in sin;
                              if(id_cmp(ni, myid) == 0)
                                  continue;
                              memset(&sin, 0, sizeof(sin));
                              sin.sin_family = AF_INET;
                              memcpy(&sin.sin_addr, ni + 20, 4);
                              memcpy(&sin.sin_port, ni + 24, 2);
                              new_node(ni, (struct sockaddr*)&sin, sizeof(sin), 0);
                              (*callback)(closure, DHT_EVENT_FINDNODE_VALUES, ni, (void*)&sin, sizeof(sin));
                              if(sr && sr->af == AF_INET)
                              {
                                  _search_node(ni, (struct sockaddr*)&sin, sizeof(sin), sr, 0, NULL, 0);
                              }
                              //send_get_peers((struct sockaddr*)&sin,sizeof(sockaddr),tid,4,ni,0,0);
                          }
                          /* IPv6 compact node info: 20-byte id + 16-byte address + 2-byte port. */
                          for(i = 0; i < nodes6_len / 38; i++)
                          {
                              unsigned char *ni = nodes6 + i * 38;
                              struct sockaddr_in6 sinip6;
                              if(id_cmp(ni, myid) == 0)
                                  continue;
                              memset(&sinip6, 0, sizeof(sinip6));
                              sinip6.sin6_family = AF_INET6;
                              memcpy(&sinip6.sin6_addr, ni + 20, 16);
                              memcpy(&sinip6.sin6_port, ni + 36, 2);
                              new_node(ni, (struct sockaddr*)&sinip6, sizeof(sinip6), 0);
                              if(sr && sr->af == AF_INET6)
                              {
                                  _search_node(ni, (struct sockaddr*)&sinip6, sizeof(sinip6), sr, 0, NULL, 0);
                              }
                          }
                          if(sr)
                              /* Since we received a reply, the number of requests in flight has decreased. Let's push another request. */
                              search_send_get_peers(sr, NULL);
                      }
                      //if(sr)
                      {
                          // _search_node(id, Addr, len, sr,1, token, token_len);
                          if(values_len > 0 || values6_len > 0)
                          {
                              _dout("Got values (%d+%d)!\n", values_len / 6, values6_len / 18);
                              if(callback) {
                                  /* sr may be NULL because the check above is commented out,
                                     so guard the dereference. */
                                  if(values_len > 0)
                                      (*callback)(closure, DHT_EVENT_VALUES, sr ? sr->id : NULL, (void*)values, values_len);

                                  if(values6_len > 0)
                                      (*callback)(closure, DHT_EVENT_VALUES6, sr ? sr->id : NULL, (void*)values6, values6_len);
                              }
                          }
                      }
                  }
                  else if(tid_match(tid, "ap", &ttid))
                  {
                      struct search *sr;
                      _dout("Got reply to announce_peer.\n");
                      sr = find_search(ttid, Addr->sa_family);
                      if(!sr) {
                          _dout("Unknown search!\n");
                          new_node(id, Addr, len, 1);
                      }
                      else
                      {
                          int i;
                          new_node(id, Addr, len, 2);
                          for(i = 0; i < sr->numnodes; i++)
                          {
                              if(id_cmp(sr->nodes[i].id, id) == 0)
                              {
                                  sr->nodes[i].request_time = 0;
                                  sr->nodes[i].reply_time = nowTime.tv_sec;
                                  sr->nodes[i].acked = 1;
                                  sr->nodes[i].pinged = 0;
                                  break;
                              }
                          }
                          /* See comment for gp above. */
                          search_send_get_peers(sr, NULL);
                      }
                  }
                  else
                  {
                      _dout("Unexpected reply: ");
                      debug_printable((const unsigned char *)buf, buflen);
                      _dout("\n");
                  }
                  break;
              case PING:
                  _dout("Ping (%d)!From IP:%s:%d\n", tid_len, inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                  new_node(id, Addr, len, 1);
                  _dout("Sending pong.\n");
                  send_pong(Addr, len, tid, tid_len);
                  break;
              case FIND_NODE:
                  _dout("Find node!From IP:%s:%d\n", inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                  new_node(id, Addr, len, 1);
                  _dout("Sending closest nodes (%d).\n", want);
                  send_closest_nodes(Addr, len, tid, tid_len, target, want, 0, NULL, NULL, 0);
                  break;
              case GET_PEERS:
                  _dout("Get_peers!From IP:%s:%d\n", inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                  new_node(id, Addr, len, 1);
                  if(id_cmp(info_hash, zeroes) == 0)
                  {
                      _dout("Eek! Got get_peers with no info_hash.\n");
                      send_error(Addr, len, tid, tid_len, 203, "Get_peers with no info_hash");
                      break;
                  }
                  else
                  {
                      struct storage *st = find_storage(info_hash);
                      unsigned char token[TOKEN_SIZE];
                      make_token(Addr, 0, token);
                      if(st && st->numpeers > 0)
                      {
                          _dout("Sending found%s peers.\n", Addr->sa_family == AF_INET6 ? " IPv6" : "");
                          send_closest_nodes(Addr, len, tid, tid_len, info_hash, want, Addr->sa_family, st, token, TOKEN_SIZE);
                      }
                      else
                      {
                          _dout("Sending nodes for get_peers.\n");
                          send_closest_nodes(Addr, len, tid, tid_len, info_hash, want, 0, NULL, token, TOKEN_SIZE);
                      }
                      /* Report the info_hash that another node is asking about. */
                      if(callback)
                      {
                          (*callback)(closure, DHT_EVENT_GET_PEER_VALUES, info_hash, (void *)Addr, len);
                      }
                  }

                  break;
              case ANNOUNCE_PEER:
                  _dout("Announce peer!From IP:%s:%d\n", inet_ntoa(tempip->sin_addr), ntohs(tempip->sin_port));
                  new_node(id, Addr, len, 1);

                  if(id_cmp(info_hash, zeroes) == 0)
                  {
                      _dout("Announce_peer with no info_hash.\n");
                      send_error(Addr, len, tid, tid_len, 203, "Announce_peer with no info_hash");
                      break;
                  }
                  if(!token_match(token, token_len, Addr)) {
                      _dout("Incorrect token for announce_peer.\n");
                      send_error(Addr, len, tid, tid_len, 203, "Announce_peer with wrong token");
                      break;
                  }
                  if(port == 0) {
                      _dout("Announce_peer with forbidden port %d.\n", port);
                      send_error(Addr, len, tid, tid_len, 203, "Announce_peer with forbidden port number");
                      break;
                  }
                  if(callback)
                  {
                      (*callback)(closure, DHT_EVENT_ANNOUNCE_PEER_VALUES, info_hash, (void *)Addr, len);
                  }
                  storage_store(info_hash, Addr, port);
                  /* Note that if storage_store failed, we lie to the requestor.
                     This is to prevent them backtracking, and hence polluting the DHT. */
                  _dout("Sending peer announced.\n");
                  send_peer_announced(Addr, len, tid, tid_len);
              }
          }

      dontread:
          /* Timer handling: runs on every call, whether or not a packet was read. */
          if(nowTime.tv_sec >= rotate_secrets_time)
              rotate_secrets();

          if(nowTime.tv_sec >= expire_stuff_time) {
              expire_buckets(buckets);
              expire_buckets(buckets6);
              expire_storage();
              expire_searches();
          }

          if(search_time > 0 && nowTime.tv_sec >= search_time) {
              struct search *sr;
              sr = searches;
              while(sr) {
                  if(!sr->done && sr->step_time + 5 <= nowTime.tv_sec)
                  {
                      search_step(sr, callback, closure);
                  }
                  sr = sr->next;
              }

              search_time = 0;

              sr = searches;
              while(sr) {
                  if(!sr->done) {
                      time_t tm = sr->step_time + 15 + random() % 10;
                      if(search_time == 0 || search_time > tm)
                          search_time = tm;
                  }
                  sr = sr->next;
              }
          }

          if(nowTime.tv_sec >= confirm_nodes_time) {
              int soon = 0;

              soon |= bucket_maintenance(AF_INET);
              soon |= bucket_maintenance(AF_INET6);

              if(!soon)
              {
                  if(mybucket_grow_time >= nowTime.tv_sec - 150)
                      soon |= neighbourhood_maintenance(AF_INET);
                  if(mybucket6_grow_time >= nowTime.tv_sec - 150)
                      soon |= neighbourhood_maintenance(AF_INET6);
              }

              /* In order to maintain all buckets' age within 600 seconds, worst
                 case is roughly 27 seconds, assuming the table is 22 bits deep.
                 We want to keep a margin for neighborhood maintenance, so keep
                 this within 25 seconds. */
              if(soon)
                  confirm_nodes_time = nowTime.tv_sec + 5 + random() % 20;
              else
                  confirm_nodes_time = nowTime.tv_sec + 60 + random() % 120;
          }

          /* Tell the caller how long it may sleep before calling us again. */
          if(confirm_nodes_time > nowTime.tv_sec)
              *tosleep = confirm_nodes_time - nowTime.tv_sec;
          else
              *tosleep = 0;

          if(search_time > 0) {
              if(search_time <= nowTime.tv_sec)
                  *tosleep = 0;
              else if(*tosleep > search_time - nowTime.tv_sec)
                  *tosleep = search_time - nowTime.tv_sec;
          }

          return 1;
      }


      dht_periodic
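
The nodes_len / 26 arithmetic in dht_periodic comes from the compact node format: each IPv4 entry is a 20-byte node ID, a 4-byte address and a 2-byte port in network byte order. As a standalone illustration, the sketch below decodes such a buffer into readable entries; CompactNode and parse_nodes are names invented for this example and do not exist in dht.c.

```cpp
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct CompactNode {
    std::array<uint8_t, 20> id;   // raw 160-bit node ID
    std::string ip;               // dotted-quad IPv4 address
    uint16_t port;                // port in host byte order
};

// Decode a "nodes" string from a find_node/get_peers reply.
// Returns an empty list if the length is not a multiple of 26,
// the same sanity check dht_periodic applies before trusting the data.
inline std::vector<CompactNode> parse_nodes(const uint8_t* data, size_t len) {
    std::vector<CompactNode> out;
    if (len % 26 != 0)
        return out;
    for (size_t off = 0; off < len; off += 26) {
        const uint8_t* p = data + off;
        CompactNode n;
        std::copy(p, p + 20, n.id.begin());
        n.ip = std::to_string(p[20]) + "." + std::to_string(p[21]) + "." +
               std::to_string(p[22]) + "." + std::to_string(p[23]);
        // The port is big-endian on the wire.
        n.port = static_cast<uint16_t>((p[24] << 8) | p[25]);
        out.push_back(n);
    }
    return out;
}
```

The "values" entries of a get_peers reply use the same idea without the ID: 6 bytes per IPv4 peer, which is where the values_len / 6 in the log message comes from.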


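    15. As for how nodes are placed into buckets, stepping through the code in a debugger once will make the principle clear. The bucket-splitting principle was also described above.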

    16. After that, simply repeat the steps above in a loop.


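    With the flow above you should now understand how DHT works. How to get more replies back is a more technical question that I will leave for the next article. Let's improve our open-source program together.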


    If anything is unclear, feel free to discuss it.


    I am also looking for a server to test the program on: a Windows server with a fixed IP and 10 GB of space. Contact: h31h31@163.com. Thanks.



    Your recommendations are what motivate the next article...
