"""
Twitter / HTTP / REST API Processing Technical Infrastructure

(c) 2009 : Dhananjay Nene
Released under Creative Commons Attribution License 3.0 as published at :
http://creativecommons.org/licenses/by/3.0/legalcode

The module combines three small building blocks:

* ``batcher``        - cuts a stream of items into dynamically sized batches
* ``batch_threader`` - processes every batch with a small pool of threads
* ``retry``          - re-runs an HTTP call depending on the HTTP status code

The code is written so that it runs unchanged on Python 2.6+ and Python 3.
"""
import datetime
import time
import random
import threading
import sys
import base64

try:
    import Queue
except ImportError:  # Python 3 renamed the module
    import queue as Queue

try:
    import urllib2
except ImportError:  # Python 3 moved the same API to urllib.request
    import urllib.request as urllib2

try:
    import simplejson
except ImportError:  # the standard library json module is API compatible
    import json as simplejson


class BooleanWrapper(object):
    """
    A mutable boolean flag.

    It is wrapped in an object because a nested function cannot rebind a
    local variable of its enclosing function (there is no ``nonlocal`` in
    Python 2), whereas calling a method on a shared object works.
    """

    def __init__(self):
        self.status = True

    def false(self):
        """Switch the flag off."""
        self.status = False


def batcher(sizer, gen):
    """
    Break the items produced by ``gen`` into batches.

    ``sizer`` is an iterator that is asked for the size of each batch just
    before that batch starts, so batches may have different sizes. The
    result is a generator of generators, each inner generator being one
    batch.

    Batching stops when ``gen`` is exhausted, when ``sizer`` is exhausted,
    or when ``sizer`` produces a size of zero or less. The batch in which
    the stop condition is detected may be short or empty.

    NOTE: the stop condition is only discovered while a batch is being
    consumed, so the caller must iterate each batch to its end before
    asking for the next one.
    """
    loop = BooleanWrapper()

    def lot(sizer, gen):
        # A plain ``return`` is used to finish the generator; raising
        # StopIteration inside a generator is an error under PEP 479.
        try:
            size = next(sizer)
        except StopIteration:
            # No more sizes available: without this the outer loop would
            # keep producing empty batches forever.
            loop.false()
            return
        if size <= 0:
            loop.false()
            return
        counter = 0
        while counter < size:
            try:
                x = next(gen)
            except StopIteration:
                loop.false()
                return
            yield x
            counter += 1

    while loop.status:
        yield lot(sizer, gen)


def threadfunc(threadid, queue, process_func, func_args, global_dict):
    """
    Worker body: drain ``queue`` and process each item.

    For every item taken from the queue, ``process_func`` is called with
    ``func_args`` followed by the item, and the result is stored in
    ``global_dict`` under the item as key. An exception raised while
    processing an item is reported and the item is left out of
    ``global_dict``.

    ``threadid`` is only used to label the progress messages.
    """
    print("Thread %s started " % threadid)
    while True:
        try:
            val = queue.get_nowait()
        except Queue.Empty:
            # Another worker may have taken the last item; nothing was
            # fetched, so task_done() must not be called for it.
            break
        try:
            args = list(func_args)
            args.append(val)
            result = process_func(*args)
            print("Thread %s processed %s with result %s" % (threadid, val, result))
            global_dict[val] = result
            time.sleep(1)
        except Exception as e:
            print("Unexpected error in thread %s for value %s : %s " % (threadid, val, e))
        finally:
            # Exactly one task_done() per successful get, so that
            # queue.join() in the coordinator returns at the right time.
            queue.task_done()
    print("Thread %s exited " % threadid)


def batch_threader(size, gen, process_func, process_func_args):
    """
    Process batches of items concurrently.

    ``gen`` is a two level generator: it yields batches, and every batch
    yields individual items. For each batch, ``size`` threads are started
    and ``process_func(*process_func_args, item)`` is run for every item.
    After the whole batch is processed, a dictionary mapping each item to
    its result is yielded (items whose processing failed are absent).
    """
    batch_number = 0
    for batch in gen:
        batch_number += 1
        print("Running Batch : %s with pool size %s" % (batch_number, size))

        # The batch is materialised first because the queue size has to be
        # known up front, and because a batch has to be consumed fully.
        items = list(batch)
        queue = Queue.Queue(len(items))
        for item in items:
            queue.put(item)

        result_dict = {}
        for i in range(size):
            t = threading.Thread(
                target=threadfunc,
                args=(i, queue, process_func, process_func_args, result_dict))
            t.daemon = True
            t.start()

        # Wait until every queued item has been marked as done
        queue.join()
        yield result_dict


def _code_sequence(codes):
    """Return ``codes`` as a sequence; a bare int or None is accepted too."""
    if codes is None:
        return ()
    if isinstance(codes, int):
        # ``(404)`` is an int and not a tuple - a very easy mistake to make
        return (codes,)
    return codes


def check_retry(data, e, break_codes, continue_codes, default_policy):
    """
    Decide whether a call should be retried, based on the HTTP code.

    Returns False when ``data`` is available. Otherwise, when the exception
    ``e`` carries a ``code`` attribute, returns False if it is one of
    ``break_codes`` and True if it is one of ``continue_codes``. In every
    other case (including exceptions without a ``code``) the
    ``default_policy`` applies.

    Note : True implies retry should be done, False implies no retries
    """
    if data:
        return False
    if e is not None:
        # Not every exception has a code (e.g. a plain URLError)
        code = getattr(e, 'code', None)
        if code in _code_sequence(break_codes):
            return False
        if code in _code_sequence(continue_codes):
            return True
    return default_policy


def retry(max_tries, check_error_func, break_codes, continue_codes,
          retry_policy, func, args):
    """
    Run a function, which is assumed to deal with HTTP, in a retry loop.

    The arguments are as follows :
    max_tries        : maximum number of tries before giving up
    check_error_func : called as check_error_func(data, exception); a true
                       result ends the loop immediately without any further
                       retry. Plugin for custom logic.
    break_codes      : http error codes on which no retry should be attempted
    continue_codes   : http error codes on which retry should be attempted
    retry_policy     : default policy in case the http code matches neither
                       of the two sequences
    func             : function to be performed / executed
    args             : arguments to the function

    Returns the value of the last call of ``func``. If the last call raised
    an exception, that exception is raised again.
    """
    data = None
    error = None
    try_num = 1
    while try_num <= max_tries:
        data = None
        error = None
        try:
            data = func(*args)
        except Exception as exc:
            # Kept in a separate name: Python 3 unbinds ``exc`` when the
            # except block ends. It is raised again after the loop.
            error = exc
        if check_error_func(data, error):
            break
        if not check_retry(data, error, break_codes, continue_codes, retry_policy):
            break
        try_num += 1
    if error is not None:
        raise error
    return data


class Data(object):
    """
    A generic, completely dynamic data object created from a dictionary.

    All top level keys of the dictionary become attributes of the object,
    allowing for an easier and more intuitive access. Nested dictionaries
    are stored as they are.
    """

    def __repr__(self):
        return "Generic Data Object:%s" % self.__dict__.__repr__()

    def __str__(self):
        return "Generic Data Object:%s" % self.__dict__.__str__()

    @classmethod
    def load_from_json(cls, json):
        """Create an instance whose attributes are the items of ``json``."""
        data = cls()
        data.__dict__.update(json)
        return data  # to allow chaining of calls


def get_opener(username, password, realm, host):
    """
    Build a urllib2 opener which performs HTTP basic authentication.

    The default urllib2 opener is adequate for non authenticated http api
    requests. However twitter API requires basic authentication in many
    situations, hence the returned opener sends the Authorization header
    with every request and can also answer an authentication challenge
    for the given realm and host.
    """
    credentials = '%s:%s' % (username, password)
    if not isinstance(credentials, bytes):
        credentials = credentials.encode('utf-8')
    # b64encode never inserts line breaks, unlike the older encodestring
    # which wraps its output and therefore corrupts long credentials.
    basic_auth = base64.b64encode(credentials)
    if not isinstance(basic_auth, str):
        basic_auth = basic_auth.decode('ascii')

    handler = urllib2.HTTPBasicAuthHandler()
    handler.add_password(realm, host, username, password)
    opener = urllib2.build_opener(handler)
    opener.addheaders = [('Authorization', 'Basic %s' % basic_auth)]
    return opener


def http_to_json(opener, url):
    """
    Fetch ``url`` and convert the JSON response into ``Data`` objects.

    Returns a ``Data`` object for a JSON object and a list of ``Data``
    objects for a JSON array. Returns None when the response body is not
    JSON or holds an empty JSON value. A ``urllib2.URLError`` is reported
    and raised again.
    """
    try:
        response = opener.open(url)
        try:
            stream = response.read()
        finally:
            response.close()
    except urllib2.URLError as e:
        print('Received Error while fetching twitter record for url : %s = %s' % (url, e))
        raise

    if not isinstance(stream, str):
        # Python 3 delivers bytes
        stream = stream.decode('utf-8')

    data = None
    # A JSON stream begins with a { or a [ ; slicing copes with an empty body
    first = stream[:1]
    if first in ('{', '['):
        jsonobj = simplejson.loads(stream)
        if jsonobj:
            # Some twitter API return an 'error' key in case of error; it
            # simply becomes an attribute (see is_twitter_error)
            if first == '[':
                data = [Data.load_from_json(obj) for obj in jsonobj]
            else:
                data = Data.load_from_json(jsonobj)
    return data


# ----- User supplied methods -----

def gen(min, max):
    """ A generator to generate twitter ids from min up to (excluding) max """
    counter = min
    while counter < max:
        yield counter
        counter += 1


def get_user_data(opener, id):
    """ Method to return twitter user data for the given id """
    return http_to_json(opener, "http://twitter.com/users/show/%d.json" % id)


def resultfunc(opener, val):
    """
    Method to generate user data along with the retry and twitter specific
    error testing
    """
    return retry(3, is_twitter_error, (404,), (520, 101), True,
                 get_user_data, (opener, val))


def get_rate_status(opener):
    """ Method to get current rate limit status """
    rate_status_url = "http://twitter.com/account/rate_limit_status.json"
    return http_to_json(opener, rate_status_url)


def is_twitter_error(data, e):
    """
    Method to check if the contained data is actually an error based on
    twitter return value semantics (the data carries an 'error' attribute)
    """
    return hasattr(data, 'error')


def twitter_batch_sizer(opener, max_size, keep_unused):
    """
    Method to generate a batch size based on available rate limit, maximum
    batch size, and number of API calls to keep as unused. The rate limit
    is queried afresh for every size that is requested.
    """
    while True:
        rate_status = retry(3, is_twitter_error, (404,), (520, 101), True,
                            get_rate_status, (opener,))
        if rate_status:
            size = rate_status.remaining_hits - keep_unused
            if size > max_size:
                size = max_size
            elif size < 0:
                size = 0
            yield size
        else:
            yield 0


def main():
    """ Sample run : fetch the twitter users with ids 0 to 9 in batches """
    opener = get_opener('replace_with_your_twitter_user_id',
                        'replace_with_your_twitter_password',
                        'Twitter API', 'twitter.com')
    data = http_to_json(opener, "http://twitter.com/account/rate_limit_status.json")
    print(data)
    batches = batcher(twitter_batch_sizer(opener, 6, 70), gen(0, 10))
    for results in batch_threader(3, batches, resultfunc, (opener,)):
        print("====== Begin Batch =======")
        for key, val in results.items():
            print("%s was processed with result %s" % (key, val))
        print("====== End Batch =======")


if __name__ == "__main__":
    main()