monitor.py

#
'''
Created by Martin Vasko
3BIT, Brno, Faculty of Information Technology.

Brief information:
This is implementation of monitoring BiTtorrent with Kademlia DHT.
Whole monitor class which will be presented next is going to be supported by
torrentDHT implementation, which was implemented by Martin Vasko.
'''
import signal
import time
import datetime
import hashlib
import select
import re
import socket
from threading import Thread, Semaphore
from bencoder import bencode
#

from handshake import TorrentHandshake

from arg_parse import parse_input_args
from torrent_dht import TorrentDHT, TorrentArguments,\
                       random_infohash, decode_krpc, get_neighbor, \
                       get_myip
from process_output import ProcessOutput
#
def kill_sender_reciever(thread1, thread2=None):
    '''
    kill sender reciever and TorrentDHT socket when there is
    continuous bootstrap.
    '''
    identification = thread1.ident
    try:
        signal.pthread_kill(identification, 2)
    except ProcessLookupError:
        pass
#

take some time to kill reciever thread

    if thread2 is None:
        return

    identification = thread2.ident
    try:
        signal.pthread_kill(identification, 2)
    except ProcessLookupError:
        pass
#

Parse it from class methods to monitor class where we want to exchange this information. Start monitoring and initialize all necessary things at first

class Monitor:
#
#

Construct a new 'Foo' object.

:param name: The name of foo :param age: The ageof foo :return: returns nothing

    def __init__(self, arguments, torrent):
#
        self.timeout = 1
        self.torrent = torrent
        self.infohash = random_infohash()
        self.test = False
        self.duration = 600
#

file which should be parsed

        self.file = arguments.file
#

magnet-link given

        self.magnet = arguments.magnet
        self.country = arguments.country
#

print output in db format required for tarzan server

        self.db_format = arguments.db_format
        self.queue_type = arguments.queue_type
        self.max_peers = 600
        if arguments.max_peers is not None:
            self.max_peers = arguments.max_peers
        if arguments.hash is not None:
#

infohash of some file on internet, if not specified randomly generate infohash

            self.torrent.change_info(arguments.hash)
        if arguments.counter is not None:
#

How long should wait after queue is empty

            self.timeout = arguments.counter
        if arguments.test is not None:
#

Test of connection !

            self.test = arguments.test
        if arguments.duration is not None:
#

Duration of crawl

            self.duration = arguments.duration
#

local variables for class

        self.sock = self.torrent.query_socket
        self.n_nodes = 0             # Number of nodes in a specified n-bit zone
        self.tnspeed = 0
        self.no_recieve = 0      # timer that should point how many packets were timed out

        self.torrent_name = ""
        self.info_pool = {}      # infohashes already found
        self.peers_pool = {}     # peers already found
        self.addr_pool = {}      # Addr recieved from
        self.peer_announce = {}  # pool of NODES announced peers
        self.respondent = 0     # Number of respondents
        self.output = ProcessOutput(self, arguments.print_as_country,
                                    arguments.country)
        self.lock = Semaphore()
#
    def __str__(self):
        return "File: {},\nMagnet-link: {},\nDuration of crawl: {},\
                \nCounter: {}".format(self.file, self.magnet,
                                      self.duration, self.timeout)
#
    def vprint(self, msg):
        '''
        Print only when -v parameter is present
        '''
        if self.torrent.verbosity:
            print(msg)
#
    def init_socket(self, port):
        '''
        Initialize empty socket to send announce peer messages
        '''
        sock = socket.socket(socket.AF_INET,
                             socket.SOCK_DGRAM,
                             socket.IPPROTO_UDP)
        sock.bind((get_myip(), port))
        return sock
#

START OF CRAWLING #

    def query_for_connectivity(self):
        '''
        Query all found peers for connectivity.
        When respond, then connection is still there and peer is valid, else
        peer is deleted from dictionary.
        '''
#

port = self.sock.getsockname() self.sock.close()

        try:
            self.torrent.query_socket.close()
        except KeyboardInterrupt:
#

TODO

            pass
        present_time = datetime.datetime.now()
        peers_outdated = []
#

take all of incomming peers and check them

        for value in self.peers_pool.values():
            try:
                past_time = datetime.datetime.strptime(value[0], "%d.%m.%Y %H:%M:%S:%f")
            except KeyboardInterrupt:
                continue
            delta_time = present_time - past_time
            total_seconds = delta_time.total_seconds()
            if int(total_seconds) > 800:
#

outdated peer

                peers_outdated.append(value[1])
#

send announce peer to 'start' session query queried node

        for value in peers_outdated:
            del self.peers_pool[value[1] + ":" + str(value[2])]
#
    def insert_to_queue(self, nodes):
        '''
        Inserts nodes to queue by given queue type.
        '''
#

Do not remove already asked

        last_node = None
        for node in nodes["Nodes"]:
            if node == last_node:
                continue
            self.torrent.nodes.put((node))
            last_node = node
#
    def process_and_update(self, ready, last_time):
        '''
        process packet and update all necesseties like info_pool, peers_pool.
        When decoding failed or not ready socket for recieving return back to
        listener thread.
        '''
        if ready[0]:
            try:
                msg, addr = self.sock.recvfrom(2048)
            except OSError:
                return last_time
        else:
            self.no_recieve = self.no_recieve + 0.1
            return last_time
        msg = decode_krpc(msg)
        if msg is None:
            return last_time
        self.addr_pool[addr] = {"timestamp": time.time()}
        self.respondent += 1

        pool = {}
        nodes = self.torrent.decode_message(msg, pool, self.peer_announce, addr)
#

update dictionary by given pool value

        try:
            if nodes["Nodes"]:
                self.info_pool.update(pool["Nodes"])
            if nodes["Peers"]:
                self.peers_pool.update(pool["Peers"])
        except KeyError:
            pass
#

Resolution without cleaning queue

        if self.torrent.nodes.qsize() <= self.max_peers * 0.9:
            try:
                if nodes["Nodes"]:
                    self.insert_to_queue(nodes)
            except KeyError:
                pass

        if not self.db_format:
            curr = time.time()
            if curr - last_time > 5:
                self.info()
                last_time = time.time()
            return last_time
#
    def start_listener(self, thread1):
        '''
        start listener thread. Recieve query packet and decode its body.
        There is shared queue between listener and sender thread.
        '''
        last_time = time.time()
        while True:
            if self.timeout is not None:
                time.sleep(self.timeout)
#

socket is closed no value returned

            try:
                ready = select.select([self.sock], [], [], 0.1)
            except (OSError, ValueError):
                continue

            if self.torrent.nodes.qsize() > self.max_peers * 0.3:
                for _ in range(1, 20):
                    last_time = self.process_and_update(ready, last_time)
            else:
                last_time = self.process_and_update(ready, last_time)
#
    def start_sender(self, test=False):
        '''
        start sender thread. There is test parameter to test connection for
        unit testing. Otherwise continuous connection is performed till
        we dont get all nodes from k-zone or duration is exhausted.
        '''
        if test:
#

if test is given perform single message send

            node = self.torrent.nodes.get(True)
            hexdig_self = int(self.infohash, 16)
            hexdig_target = int(node[0], 16)
            self.torrent.query_get_peers(node, self.infohash, self.sock)
            return 1

        while True:
            if self.timeout is not None:
                time.sleep(self.timeout)
            node = self.torrent.nodes.get(True)

            hexdig_self = int(self.infohash, 16)
            hexdig_target = int(node[0], 16)
            if((hexdig_self ^ hexdig_target) >> 148) == 0:
                try:
                    self.torrent.query_get_peers(node, self.infohash, self.sock)
                except OSError:
                    return 9
#

TODO for i in range(10, 20): tid = get_neighbor(self.infohash, node[0], i) self.torrent.query_get_peers(node, self.infohash, self.sock) node = self.torrent.nodes.get(True) Speed is less than 2000 bps

            elif self.n_nodes < 2000:
                try:
                    self.torrent.query_get_peers(node, self.infohash, self.sock)
                except OSError:
                    return 9
#
    def start_timer(self, thread1, thread2):
        '''
        start thread timer for duration, when exhausted kill threads
        and exit program.
        '''
        self.vprint("Start of duration")
#

sleep for shorter time

        for _ in range(self.duration):
            time.sleep(1)
        self.vprint("End of duration")
#

Clear all resources

        kill_sender_reciever(thread1, thread2)
#
    def crawl_begin(self, torrent=None, test=False):
        '''
        Create all threads, duration to count how long program is executed.
        When Ctrl+C is pressed kill all threads

        Parameters
        ----------
        torrent : infohash
            20 bytes long infohash which should be used as part of monitoring.
        test : bool
            This paramter is for testing connection.

        '''
        if torrent:
            self.torrent.target = torrent

        send_thread = Thread(target=self.start_sender, args=())
        send_thread.daemon = True
        send_thread.start()
        listen_thread = Thread(target=self.start_listener,
                               args=(send_thread,))
        listen_thread.daemon = True
        listen_thread.start()
        duration_thread = Thread(target=self.start_timer,
                                 args=(send_thread, listen_thread))
        duration_thread.daemon = True
        duration_thread.start()
        while True:
            if test:
                time.sleep(5)
                break
            try:
                if self.country:
                    self.lock.acquire()
                    self.output.get_geolocations()
                    self.lock.release()
                time.sleep(1)
            except KeyboardInterrupt:
                self.vprint("\nClearing threads, wait a second")
                break
        if test:
            kill_sender_reciever(send_thread, listen_thread)
        else:
            self.query_for_connectivity()
            if self.output.print_country and not self.db_format:
                self.output.get_geolocations()
                self.output.print_geolocations()
            if (self.db_format and self.output.print_country) or self.db_format:
                self.output.get_geolocations()
                self.output.print_geolocations()
            if not self.db_format:
                self.info()
#
    def info(self):
        '''
        Print info for current state of crawling.
        '''
        print("[NodeSet]:%i\t\t[PeerSet]:%i\t\t[Response]:\
            %.2f%%\t\t[Queue]:%i\t\t" %
              (len(self.info_pool), len(self.peers_pool),
               self.respondent*100.0 / max(1, len(self.info_pool)),
               self.torrent.nodes.qsize()))
#
    def diverge_in_location(self, nodes):
        '''
        After climbing to another teritory, do not access it,
        return adjusted list of nodes.
        '''
        iplist = self.output.translate_node(self.info_pool)
        for ip_addr in iplist:
            num = 0
            for node_ip in nodes:
                if node_ip[1] == ip_addr[0]:
                    nodes.remove(nodes[num])
                num = num + 1
        return nodes
#
    def get_torrent_name(self, value):
        '''
        get name of torrent from torrent file

        Parameters
        ----------
        value : dict
            This should contain encoded name of torrent.

        Returns
        -------
        self.torrent_name
            Parsed torrent name from value dictionary.
        '''
        for name, name_val in value.items():
            name = name.decode('utf-8')
            if name == "name":
                self.torrent_name = name_val.decode("utf-8")
#
    def parse_torrent(self):
        '''
        parse torrent file to get infohash and announce list of nodes for
        better bootstrap.
        '''
        if self.file is not None:
            for file in self.file:
                file_r = open(file, "rb")
                content = file_r.read()
                info_hash = None
                nodes = []
                self.vprint("Torrent file content")
                if not decode_krpc(content):
                    raise TypeError("WrongFileType")
                if not isinstance(decode_krpc(content), dict):
                    raise TypeError("WrongFileType")
                for key, value in decode_krpc(content).items():
                    key = key.decode('utf-8')
                    if key == "creation date":
                        self.vprint("Creation of file: ")
                        self.vprint(datetime.datetime
                                    .fromtimestamp(value)
                                    .strftime("%Y-%m-%d %H:%M:%S"))
                    if key == "info":
                        self.get_torrent_name(value)
                        info_hash = hashlib.sha1(bencode(value)).hexdigest()
#

set torrent target

                        self.infohash = get_neighbor(info_hash, self.infohash)
                        self.torrent.target_pool.append(info_hash)

                    if key == "nodes":
                        pass
                    if key == "announce-list":
                        nodes = value
                self.torrent.change_bootstrap(info_hash, nodes, self.queue_type)
                file_r.close()
#
    def parse_magnet(self):
        '''
        parse magnet link
        '''
        if self.magnet is not None:
            for magnet in self.magnet:
                file_r = open(magnet, "rb")
                content = file_r.read().decode('utf-8')
                name = re.search(r"&dn.*&(xt|tr)", content)
                name = re.search(r"^((?!tr.*).)*", name.group(0))
                self.torrent_name = name.group(0)[4:-1]

                info_hash = re.search(r"urn:.*&(xl|dn)", content)
#

match last : and its content

                info_hash = re.search(r"(?:.(?!:))+$", info_hash.group(0))
                info_hash = info_hash.group(0)[1:-3]
                self.infohash = get_neighbor(info_hash, self.infohash)
#

set torrent target

                self.torrent.target_pool.append(info_hash)
#

This should be used in main function #

#
def create_monitor(verbosity=False):
    '''
    creates monitor class object. TorrentDHT creates udp socket which is
    binded on `bind_port`. Monitor needs this `dht_socket` and command line
    arguments to be created successfully. Then change of hash and parsing
    can change resolution of crawl. When they are not specified then
    global bootstrap nodes are used instead.

    Parameters
    ----------
    verbosity : bool
        Indicate verbose output

    Returns
    -------
    object
        Monitor object with initialized DHT socket and parsed arguments.
    '''
    args = parse_input_args()
#

This is variant with verbose output to track some lib imported staff

    if args.bind_port is not None:
        torrent_arguments = TorrentArguments()
        dht_socket = TorrentDHT(torrent_arguments, bind_port=args.bind_port,
                                verbosity=verbosity)
    else:
        torrent_arguments = TorrentArguments()
        dht_socket = TorrentDHT(torrent_arguments, verbosity=verbosity)
#

Monitor class needs dht_socket, which is imported from TorrentDHT.py

    monitor = Monitor(args, dht_socket)
#

This variant is only to test connection to BOOTSTRAP_NODES

    if monitor.test:
        result = monitor.start_sender(test=True)
        exit(result)
    monitor.torrent.change_arguments(monitor.max_peers, monitor.queue_type)
    monitor.parse_torrent()
    monitor.parse_magnet()
    return monitor