Coverage for monitor.py : 71%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#!/usr/bin/env python3 Created by Martin Vasko 3BIT, Brno, Faculty of Information Technology.
Brief information: This is implementation of monitoring BiTtorrent with Kademlia DHT. Whole monitor class which will be presented next is going to be supported by torrentDHT implementation, which was implemented by Martin Vasko. ''' random_infohash, decode_krpc, get_neighbor except ImportError: from src.torrent_dht import TorrentDHT, TorrentArguments,\ random_infohash, decode_krpc, get_neighbor from src.process_output import ProcessOutput
''' This part is about parsing input arguments. Using argparse for standardized use '''
help='Specifies info_hash of torrent file which can \ be get from magnet-link.') help='Specify port to which should be connection \ binded.') action='store', help='Store country name to \ converge only in this country and do not \ bootstrap away from it.') help='Gets torrent file, which decompose and start \ monitoring from DHT nodes in this file or \ from tracker (swarm).') help='Given magnet-link or file with magnet-link \ would be parsed and its output filled to proper \ class variables and starts crawling from magnet-link \ (Some DHT node).') action='store', help='Set for how long should program \ monitor continuously.') help='Store ip addresses with coresponding \ dictionary in format country:city -> ip.addr.') help='Change shared queue between processes \ from default lifo to fifo') # settings for dht
help='Counter specifies how long to wait after \ a queue with nodes is empty.') action='store', help='Specifies maximum number of \ peers in queue. This is set by default \ on value of 200.') help='Tests connection to remote(local) server.')
''' Parse arguments from argParse class '''
''' Parse it from class methods to monitor class where we want to exchange this information. Start monitoring and initialize all necessary things at first '''
# file which should be parsed # magnet-link given
self.max_peers = arguments.max_peers # infohash of some file on internet, # if not specified randomly generate infohash self.torrent.change_info(arguments.hash) # How long should wait after queue is empty self.timeout = arguments.counter # Test of connection ! # Duration of crawl # local variables for class arguments.country)
return "File: {},\nMagnet-link: {},\nDuration of crawl: {},\ \nCounter: {}".format(self.file, self.magnet, self.duration, self.timeout)
''' Print only when -v parameter is present ''' print(msg)
##################### # START OF CRAWLING # #####################
''' send handshake message for bitTorrent connection ''' def _int_to_bytes(data, bytes_len): return data.to_bytes(bytes_len, "big") message = bytes() value = 4 ver = 1 message += _int_to_bytes(value << 4 | ver, 1) message += _int_to_bytes(0, 1) message += _int_to_bytes(randrange(0xffff), 2) message += _int_to_bytes(int(time.time()), 4) message += _int_to_bytes(0, 4) message += _int_to_bytes(0xf000, 4) message += _int_to_bytes(randrange(0xffff), 2) message += _int_to_bytes(0, 2)
bt_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # just to establish connection to get result of this, but handshake # should be good to get positive or negative acknowledgment # TODO hole punch, those messages are mostly filtered because of firewall bt_socket.sendto(message, (peer[1], peer[2])) try: ready = select.select([bt_socket], [], [], 0.1) except (OSError, ValueError): bt_socket.close() return False if ready[0]: msg = bt_socket.recvfrom(1024) else: bt_socket.close() return False if msg: bt_socket.close() return True bt_socket.close() return False
''' Query all found peers for connectivity. When respond, then connection is still there and peer is valid, else peer is deleted from dictionary. ''' is_recieved = self.send_handshake(value[1]) # outdated peer if not is_recieved: peers_outdated.append(value[1])
''' Inserts nodes to queue by given queue type. ''' # for already_asked in self.addr_pool: # node_key = (node[1], node[2]) # already asked, do not add to queue # if already_asked[0] is node_key[0]: # print(already_asked, node_key) # break
''' start listener thread. Recieve query packet and decode its body. There is shared queue between listener and sender thread. '''
# socket is closed no value returned
else: self.no_recieve = self.no_recieve + 0.1 continue continue
# update dictionary by given pool value # when 3/4 of queue is not resolved, do not resolve next # Resolution without cleaning queue # for key in nodes.keys():
''' start sender thread. There is test parameter to test connection for unit testing. Otherwise continuous connection is performed till we dont get all nodes from k-zone or duration is exhausted. ''' # if test is given perform single message send node = self.torrent.nodes.get(True) hexdig_self = int(self.infohash, 16) hexdig_target = int(node[0], 16) self.torrent.query_get_peers(node, self.infohash) self.torrent.rejoin.cancel() return
# print("Closer than expected") # print("qsize: {} max_peers: {}" # .format(self.torrent.nodes.qsize(), self.max_peers)) except OSError: return 9 # Speed is less than 2000 bps for i in range(1, 10): try: self.torrent.query_get_peers(node, self.infohash) except OSError: return 9 node = self.torrent.nodes.get(True) else: except OSError: return 9
''' start thread timer for duration, when exhausted kill threads and exit program. ''' # sleep for shorter time # Clear all resources
''' kill sender reciever and TorrentDHT socket when there is continuous bootstrap. ''' except ProcessLookupError: pass except ProcessLookupError: pass
''' Create all threads, duration to count how long program is executed. When Ctrl+C is pressed kill all threads '''
args=(send_thread, listen_thread))
self.kill_sender_reciever(send_thread, listen_thread) self.lock.acquire() self.output.get_geolocations() self.lock.release() else: # self.info() # self.output.print_geolocations()
''' Print info for current state of crawling. ''' %.2f%%\t\t[Queue]:%i\t\t" % (len(self.info_pool), len(self.peers_pool), self.n_nodes, self.tnspeed, self.respondent*100.0 / max(1, len(self.info_pool)), self.torrent.nodes.qsize()))
''' After climbing to another teritory, do not access it, return adjusted list of nodes. ''' iplist = self.output.translate_node(self.info_pool) for ip_addr in iplist: num = 0 for node_ip in nodes: if node_ip[1] == ip_addr[0]: nodes.remove(nodes[num]) num = num + 1 return nodes
''' parse torrent file to get infohash and announce list of nodes for better bootstrap. ''' .fromtimestamp(value) .strftime("%Y-%m-%d %H:%M:%S")) # set torrent target
pass
''' parse magnet link ''' for magnet in self.magnet: file_r = open(magnet, "rb") content = file_r.read() info_hash = re.search(r"urn:.*&(xl|dn)", content.decode('utf-8')) # match last `:` and its content info_hash = re.search(r"(?:.(?!:))+$", info_hash.group(0)) self.infohash = get_neighbor(info_hash, self.infohash) # set torrent target self.torrent.target_pool.append(info_hash.group(0)[1:-3])
######################################## # This should be used in main function # ########################################
''' creates monitor class object. TorrentDHT creates udp socket which is binded on `bind_port`. Monitor needs this `dht_socket` and command line arguments to be created successfully. Then change of hash and parsing can change resolution of crawl. When they are not specified then global bootstrap nodes are used instead. ''' # This is variant with verbose output to track some lib imported staff torrent_arguments = TorrentArguments() dht_socket = TorrentDHT(torrent_arguments, bind_port=args.bind_port, verbosity=verbosity) else:
# Monitor class needs dht_socket, which is imported from TorrentDHT.py # This variant is only to test connection to BOOTSTRAP_NODES result = monitor.start_sender(test=True) exit(result) |