Coverage for dht_crawler/monitor.py : 70%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#!/usr/bin/env python3 Created by Martin Vasko 3BIT, Brno, Faculty of Information Technology.
Brief information: This is an implementation of BitTorrent monitoring with Kademlia DHT. The whole monitor class, which is presented next, is supported by the torrentDHT implementation, which was implemented by Martin Vasko. ''' # from handshake import TorrentHandshake random_infohash, decode_krpc, get_neighbor, \ get_myip
''' Kill sender, receiver and TorrentDHT socket when there is continuous bootstrap. ''' except ProcessLookupError: pass
# take some time to kill reciever thread return
except ProcessLookupError: pass
''' Initialize empty socket to send announce peer messages ''' sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.bind((get_myip(), port)) return sock
""" Parse it from class methods to monitor class where we want to exchange this information. Start monitoring and initialize all necessary things at first """
""" Construct a new 'Foo' object.
:param name: The name of foo :param age: The age of foo :return: returns nothing """
# file which should be parsed # magnet-link given
# print output in db format required for tarzan server self.max_peers = arguments.max_peers # infohash of some file on internet, # if not specified randomly generate infohash self.torrent.change_info(arguments.hash) # How long should wait after queue is empty self.timeout = arguments.counter # Test of connection ! # Duration of crawl self.duration = arguments.duration # local variables for class
arguments.country)
return "File: {},\nMagnet-link: {},\nDuration of crawl: {},\ \nCounter: {}".format(self.file, self.magnet, self.duration, self.timeout)
''' Print only when -v parameter is present ''' print(msg)
##################### # START OF CRAWLING # ##################### ''' Query all found peers for connectivity. When respond, then connection is still there and peer is valid, else peer is deleted from dictionary. ''' except KeyboardInterrupt: pass
# take all of incomming peers and check them except KeyboardInterrupt: continue # outdated peer peers_outdated.append(value[1])
# send announce peer to 'start' session # query queried node del self.peers_pool[value[1] + ":" + str(value[2])]
''' Inserts nodes to queue by given queue type. ''' # Do not remove already asked continue
''' Process packet and update all necessities like info_pool, peers_pool. When decoding failed or the socket is not ready for receiving, return back to listener thread. ''' except OSError: return last_time else: return last_time
# update dictionary by given pool value
# Resolution without cleaning queue
''' Start listener thread. Receive query packet and decode its body. There is a shared queue between listener and sender thread. '''
# socket is closed no value returned except (OSError, ValueError): continue
# if self.torrent.nodes.qsize() > self.max_peers * 0.3: # for _ in range(1, 20): # last_time = self.process_and_update(ready, last_time) # else:
''' start sender thread. There is test parameter to test connection for unit testing. Otherwise continuous connection is performed till we dont get all nodes from k-zone or duration is exhausted. ''' # if test is given perform single message send node = self.torrent.nodes.get(True) hexdig_self = int(self.infohash, 16) hexdig_target = int(node[0], 16) self.torrent.query_get_peers(node, self.infohash, self.sock) return 1
# we are close, we should send more packets but it is slow # on recieving thread because of decoding and composing # dictionaries # Speed is less than 2000 bps else: except OSError: return 9
''' start thread timer for duration, when exhausted kill threads and exit program. ''' # sleep for shorter time # Clear all resources
''' Create all threads, duration to count how long program is executed. When Ctrl+C is pressed kill all threads
Parameters ---------- torrent : infohash 20 bytes long infohash which should be used as part of monitoring. test : bool This parameter is for testing connection.
'''
args=()) args=(send_thread, listen_thread)) break self.lock.acquire() self.output.get_geolocations() self.lock.release() self.output.get_geolocations() self.output.print_chosen_output()
''' Print info for current state of crawling. ''' %.2f%%\t\t[Queue]:%i\t\t" % (len(self.info_pool), len(self.peers_pool), self.respondent*100.0 / max(1, len(self.info_pool)), self.torrent.nodes.qsize()))
''' After climbing to another territory, do not access it, return adjusted list of nodes. ''' iplist = self.output.translate_node(self.info_pool) for ip_addr in iplist: num = 0 for node_ip in nodes: if node_ip[1] == ip_addr[0]: nodes.remove(nodes[num]) num = num + 1 return nodes
''' get name of torrent from torrent file
Parameters ---------- value : dict This should contain encoded name of torrent.
Returns ------- self.torrent_name Parsed torrent name from value dictionary. ''' for name, name_val in value.items(): name = name.decode('utf-8') if name == "name": self.torrent_name = name_val.decode("utf-8")
''' parse torrent file to get infohash and announce list of nodes for better bootstrap. ''' for file in self.file: file_r = open(file, "rb") content = file_r.read() info_hash = None nodes = [] self.vprint("Torrent file content") if not decode_krpc(content): raise TypeError("WrongFileType") if not isinstance(decode_krpc(content), dict): raise TypeError("WrongFileType") for key, value in decode_krpc(content).items(): key = key.decode('utf-8') if key == "creation date": self.vprint("Creation of file: ") self.vprint(datetime.datetime .fromtimestamp(value) .strftime("%Y-%m-%d %H:%M:%S")) if key == "info": self.get_torrent_name(value) info_hash = hashlib.sha1(bencode(value)).hexdigest() # set torrent target self.infohash = get_neighbor(info_hash, self.infohash) self.torrent.infohash_list[1].append(info_hash)
if key == "nodes": pass if key == "announce-list": nodes = value self.torrent.change_bootstrap(info_hash, nodes, self.queue_type) file_r.close()
''' parse magnet link '''
# match last `:` and its content # set torrent target
######################################## # This should be used in main function # ########################################
''' Creates monitor class object. TorrentDHT creates a UDP socket which is bound to `bind_port`. Monitor needs this `dht_socket` and command line arguments to be created successfully. Then change of hash and parsing can change resolution of crawl. When they are not specified then global bootstrap nodes are used instead.
Parameters ---------- verbosity : bool Indicate verbose output
Returns ------- object Monitor object with initialized DHT socket and parsed arguments. ''' # This is variant with verbose output to track some lib imported staff torrent_arguments = TorrentArguments() dht_socket = TorrentDHT(torrent_arguments, bind_port=args.bind_port, verbosity=verbosity) else:
# Monitor class needs dht_socket, which is imported from TorrentDHT.py # This variant is only to test connection to BOOTSTRAP_NODES result = monitor.start_sender(test=True) exit(result) |