# -- coding: latin-1
""" HarvestManUrlThread.py - Url thread downloader module.
    Has two classes, one for downloading of urls and another
    for managing the url threads.

    This software is part of the HarvestMan program.

    Author: Anand B Pillai (anandpillai at letterboxes dot org).
    Copyright (C) 2004-2005 Anand B Pillai.

    Modification History

    Jun 4-9 2004 Anand  1.4 development.

                        *Adopted a queue model for the sub
                        threads in this module similar to the
                        tracker threads. Instead of spawning new
                        threads for every download job, we reuse
                        the threads in the thread pool. The threads
                        are pre-spawned at the beginning of download.

                        Derived harvestManUrlThreadPool from Queue
                        and added method 'spawn_threads' to it.

    Jun 14 2004        1.3.9 release.

    Sep 22 2004        1.4 development

                        Performance Fixes

                        1. Added check for end flag in download() method
                        of threads for better clean up in case of
                        program termination or killing (using Ctrl-C).


import os, sys
import math
import time
import threading

from Queue import Queue, Full, Empty
from common import *

00047 class HarvestManUrlThreadInterrupt(Exception):
    """ Interrupt raised to kill a harvestManUrlThread class's object """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return str(self.value)

00056 class harvestManUrlThread(threading.Thread):
    """ Class to download a url in a separate thread """

00059     def __init__(self, name, timeout, threadpool):
        """ Constructor, the constructor takes a url, a filename
        , a timeout value, and the thread pool object pooling this
        thread """

        # url Object (This is an instance of urlPathParser class)
        self.__urlobject = None
        # thread queue object pooling this thread
        self.__pool = threadpool
        # max lifetime for the thread
        self.__timeout = timeout
        # start time of thread
        self.__starttime = 0
        # sleep time
        self.__sleepTime = 1.0
        # error dictionary
        self.__error = {}
        # download status flag
        self.__downloadstatus = 0
        # busy flag
        self.__busyflag = False
        # end flag
        self.__endflag = False
        # initialize threading
        threading.Thread.__init__(self, None, None, name)

00085     def get_error(self):
        """ Get error value of this thread """

        return self.__error

00090     def get_status(self):
        """ Get the download status of this thread """

        return self.__downloadstatus

00095     def is_busy(self):
        """ Get busy status for this thread """

        return self.__busyflag

00100     def join(self):
        """ The thread's join method to be called
        by other threads """

        threading.Thread.join(self, self.__timeout)

00106     def terminate(self):
        """ Kill this thread """

        msg = 'Download thread, ' + self.getName() + ' killed!'
        raise HarvestManUrlThreadInterrupt, msg

00113     def stop(self):
        """ Stop this thread """

        self.__endflag = True

00118     def download(self, url_obj):
        """ Download this url """

        url = url_obj.get_full_url()
        server = url_obj.get_domain()

        if url_obj.is_image():
            info('Downloading image ...', url)

        extrainfo('Downloading url ...', url)
        server = url_obj.get_domain()

        conn_factory = GetObject('connectorfactory')
        # This call will block if we exceed the number of connections
        # moreinfo("Creating connector for url ", urlobj.get_full_url())
        conn = conn_factory.create_connector( server )

        res = conn.save_url(url_obj)

        # Remove the connector from the factory

        # Set this as download status
        self.__downloadstatus = res

        # get error flag from connector

        del conn
        # Notify thread pool

        if res != 0:
            extrainfo('Finished download of ', url)

00154     def run(self):
        """ Run this thread """

        while not self.__endflag:
            if os.name=='nt' or sys.platform == 'win32':

            url_obj = self.__pool.get_next_urltask()
            if self.__pool.check_duplicates(url_obj):

            if not url_obj:
            # set busy flag to 1
            self.__busyflag = True

            # Save reference
            self.__urlobject = url_obj

            filename, url = url_obj.get_full_filename(), url_obj.get_full_url()
            if not filename and not url:

            # Perf fix: Check end flag
            # in case the program was terminated
            # between start of loop and now!
            if not self.__endflag: self.download(url_obj)
            # reset busyflag
            self.__busyflag = False

    def get_url(self):

        if self.__urlobject:
            return self.__urlobject.get_full_url()

        return ""

    def get_filename(self):

        if self.__urlobject:
            return self.__urlobject.get_full_filename()

        return ""

00202     def get_urlobject(self):
        """ Return this thread's url object """

        return self.__urlobject

00207     def get_elapsed_time(self):
        """ Get the time taken for this thread """


        if os.name=='nt' or sys.platform=='win32':

        return fetchtime

00220     def long_running(self):
        """ Find out if this thread is running for a long time
        (more than given timeout) """

        # if any thread is running for more than <timeout>
        # time, return TRUE
        return (self.get_elapsed_time() > self.__timeout)

00228     def set_timeout(self, value):
        """ Set the timeout value for this thread """

        self.__timeout = value

00233 class harvestManUrlThreadPool(Queue):
    """ Thread group/pool class to manage download threads """

00236     def __init__(self):
        """ Initialize this class """

        # list of spawned threads
        self.__threads = []
        # list of url tasks
        self.__tasks = []

        cfg = GetObject('config')
        # Maximum number of threads spawned
        self.__numthreads = cfg.threadpoolsize
        self.__timeout = cfg.timeout
        # Last thread report time
        self._ltrt = 0.0
        # Local buffer
        self.buffer = []
        Queue.__init__(self, self.__numthreads + 5)

00255     def spawn_threads(self):
        """ Start the download threads """

        for x in range(self.__numthreads):
            name = 'Fetcher-'+ str(x+1)
            fetcher = harvestManUrlThread(name, self.__timeout, self)
            # Append this thread to the list of threads

00266     def download_urls(self, listofurlobjects):
        """ Method to download a list of urls.
        Each member is an instance of a urlPathParser class """

        for urlinfo in listofurlobjects:

    def __get_num_blocked_threads(self):

        blocked = 0
        for t in self.__threads:
            if not t.is_busy(): blocked += 1

        return blocked

00281     def is_blocked(self):
        """ The queue is considered blocked if all threads
        are waiting for data, and no data is coming """

        blocked = self.__get_num_blocked_threads()

        if blocked == len(self.__threads):
            return True
            return False

00292     def push(self, urlObj):
        """ Push the url object and start downloading the url """

        # unpack the tuple
            filename, url = urlObj.get_full_filename(), urlObj.get_full_url()

        # Wait till we have a thread slot free, and push the
        # current url's info when we get one
            self.put( urlObj )
        except Full:
    def get_next_urltask(self):

            if len(self.buffer):
                # Get last item from buffer
                return buffer.pop()
                return self.get()
        except Empty:
            return None

00319     def notify(self, thread):
        """ Method called by threads to notify that they
        have finished """

        # Mark the time stamp (last thread report time)
        self._ltrt = time.time()

        urlObj = thread.get_urlobject()
        # if the thread failed, update failure stats on the data manager
        dmgr = GetObject('datamanager')

        err = thread.get_error()

        if err.get('number',0)==0:
            # thread succeeded, increment file count stats on the data manager
            dmgr.update_file_stats( urlObj, thread.get_status())
            dmgr.update_failed_files( urlObj )

00339     def has_busy_threads(self):
        """ Return whether I have any busy threads """

        for thread in self.__threads:
            if thread.is_busy():
                val += 1

        return val

00349     def locate_thread(self, url):
        """ Find a thread which downloaded a certain url """

        for x in self.__threads:
            if not x.is_busy():
                if x.get_url() == url:
                    return x

        return None

00359     def locate_busy_threads(self, url):
        """ Find all threads which are downloading a certain url """

        for x in self.__threads:
            if x.is_busy():
                if x.get_url() == url:

        return threads

00370     def check_duplicates(self, urlobj):
        """ Avoid downloading same url file twice.
        It can happen that same url is linked from
        different web pages. We query any thread which
        has downloaded this url, and copy the file to
        the file location of the new download request """

        filename = urlobj.get_full_filename()
        url = urlobj.get_full_url()

        # First check if any thread is in the process
        # of downloading this url.
        if self.locate_thread(url):
            debug('Another thread is downloading %s' % url)
            return True
        # Get data manager object
        dmgr = GetObject('datamanager')

        if dmgr.is_file_downloaded(filename):
            return True

        return False

00394     def end_hanging_threads(self):
        """ If any download thread is running for too long,
        kill it, and remove it from the thread pool """

        for thread in self.__threads:
            if thread.long_running(): pool.append(thread)

        for thread in pool:
            extrainfo('Killing hanging thread ', thread)
            # remove this thread from the thread list
            # kill it
            except HarvestManUrlThreadInterrupt:

            del thread

00414     def end_all_threads(self):
        """ Kill all running threads """

        for t in self.__threads:
                del t
            except HarvestManUrlThreadInterrupt, e:

00424     def remove_finished_threads(self):
        """ Clean up all threads that have completed """

        for thread in self.__threads:
            if not thread.is_busy():
                del thread

00432     def last_thread_report_time(self):
        """ Return the last thread reported time """

        return self._ltrt

