Changeset e19fdd68c9407b567c1608c8b3a5a9396ab27552

Show
Ignore:
Timestamp:
04/06/08 03:38:44 (9 months ago)
Author:
Jason Michalski <armooo@armooo.net>
git-committer:
Jason Michalski <armooo@armooo.net> 1207471124 -0500
git-parent:

[81d4220ba0feab8bdd6f39101f7bea2f451b9a98]

git-author:
Jason Michalski <armooo@armooo.net> 1207471124 -0500
Message:

Updated to use a pool or worker threads

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • plugins/webvideo/webvideo.py

    r81d4220 re19fdd6  
    77import threading 
    88import xml.etree.ElementTree as ElementTree 
     9import Queue 
    910 
    1011CLASS_NAME = 'WebVideo' 
     
    1617 
    1718    def init(self): 
    18         self.sem = threading.Semaphore(1) 
     19        self.work_queue = Queue.Queue() 
     20        self.download_thread_num = 1 
     21        self.in_progress = {} 
     22        self.in_progress_lock = threading.Lock() 
    1923 
    2024        self.startXMPP() 
    2125        self.xmpp_cdsupdate() 
     26        self.startWorkerThreads() 
    2227 
    2328    def startXMPP(self): 
     
    4146            cl.sendPresence(jid) 
    4247 
    43  
    4448        t = threading.Thread(target=self.processXMPP, args=(cl,)) 
    4549        t.setDaemon(True) 
    4650        t.start() 
     51 
     52    def startWorkerThreads(self): 
     53        for i in range(self.download_thread_num): 
     54            t = threading.Thread(target=self.processDlRequest) 
     55            t.setDaemon(True) 
     56            t.start() 
    4757 
    4858    def processXMPP(self, client): 
     
    6070        method(xmpp_action) 
    6171 
    62  
    6372    def xmpp_cdsupdate(self, xml=None): 
    6473        m = mind.getMind() 
    65         for request in m.getDownloadRequests(): 
    66             t = threading.Thread(target=self.processDlRequest, args=(request,)) 
    67             t.setDaemon(True) 
    68             t.start() 
    6974 
    70     def processDlRequest(self, data): 
     75        self.in_progress_lock.acquire() 
     76        try: 
     77            for request in m.getDownloadRequests(): 
     78                if not request['bodyOfferId'] in self.in_progress: 
     79                    self.in_progress[request['bodyOfferId']] = True 
     80                    self.work_queue.put(request) 
     81        finally: 
     82            self.in_progress_lock.release() 
     83 
     84    def processDlRequest(self): 
    7185        import shutil 
    7286        import os.path 
     
    7488        import urllib 
    7589 
    76         for share_name, settings in config.getShares(): 
    77             if settings['type'] == 'webvideo': 
    78                 break 
     90        while True: 
     91            data = self.work_queue.get() 
    7992 
    80         self.sem.acquire() 
     93            for share_name, settings in config.getShares(): 
     94                if settings['type'] == 'webvideo': 
     95                    break 
    8196 
    82         path = settings['path'] 
    83         file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1])) 
    8497 
    85         print 'downloading %s to %s' % (data['url'], file_name) 
     98            path = settings['path'] 
     99            file_name = os.path.join(path, '%s-%s' % (data['bodyOfferId'] ,data['url'].split('/')[-1])) 
    86100 
    87         outfile = open(file_name, 'wb'
     101            print 'downloading %s to %s' % (data['url'], file_name
    88102 
    89         infile = urllib2.urlopen(data['url']) 
    90         shutil.copyfileobj(infile, outfile) 
     103            outfile = open(file_name, 'wb') 
    91104 
    92         print 'done downloading %s to %s' % (data['url'], file_name) 
     105            infile = urllib2.urlopen(data['url']) 
     106            shutil.copyfileobj(infile, outfile) 
    93107 
    94         tsn = data['bodyId'] 
    95         file_info = VideoDetails() 
    96         file_info.update(self.metadata_full(file_name, tsn)) 
     108            print 'done downloading %s to %s' % (data['url'], file_name) 
    97109 
    98         import socket 
    99         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
    100         s.connect(('tivo.com',123)) 
    101         ip = s.getsockname()[0] 
    102         port = config.getPort() 
     110            tsn = data['bodyId'] 
     111            file_info = VideoDetails() 
     112            file_info.update(self.metadata_full(file_name, tsn)) 
    103113 
    104         data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1])) 
    105         data['duration'] = file_info['duration'] / 1000 
    106         data['size'] = file_info['size'] 
     114            import socket 
     115            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
     116            s.connect(('tivo.com',123)) 
     117            ip = s.getsockname()[0] 
     118            port = config.getPort() 
    107119 
    108         print data 
     120            data['url'] = 'http://%s:%s' % (ip, port) + urllib.quote('/%s/%s' % (share_name, os.path.split(file_name)[-1])) 
     121            data['duration'] = file_info['duration'] / 1000 
     122            data['size'] = file_info['size'] 
    109123 
    110         m = mind.getMind() 
    111         m.completeDownloadRequest(data) 
     124            print data 
    112125 
    113         self.sem.release() 
     126            m = mind.getMind() 
     127            m.completeDownloadRequest(data) 
    114128 
     129            self.in_progress_lock.acquire() 
     130            try: 
     131                del self.in_progress[data['bodyOfferId']] 
     132            finally: 
     133                self.in_progress_lock.release() 
     134