Skip to content
Snippets Groups Projects
Select Git revision
  • d7b6b570c6cc4facebaa608426c38511fda27d04
  • master default protected
  • ansibleBranch
  • AURIN
4 results

tweet_gatherer_app.py

Blame
  • tweet_gatherer_app.py 7.13 KiB
    import tweepy
    import json
    from tweepy.streaming import StreamListener
    import couchdb
    import sys
    import os
    import os.path
    import datetime 
    import re
    from urllib3.exceptions import ProtocolError
    from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt
    
    collector_type = sys.argv[1]
    region = sys.argv[2]
    couchIP = sys.argv[3]
    #"172.26.130.79"
    storeCouch = True
    
    couchserver = couchdb.Server("http://admin:admin@"+couchIP+":5984/")
    db = couchserver['live_demo2']
    
    #change code to your current Twitter account
    
    
    if os.path.isfile('twt_stream{}.json'.format(region)):
        outfile = open('twt_stream{}.json'.format(region), 'a+', encoding ='utf-8')
    else:
        outfile = open('twt_stream{}.json'.format(region), 'w+', encoding ='utf-8')
    
    
    class StreamListener(tweepy.StreamListener):
        
        def __init__(self, config, db):
            super(StreamListener, self).__init__()
            self.file = []
            self.db = db
            self.config = config
        
        def on_status(self, status):
            is_retweet = False
            if hasattr(status, 'retweeted_status'):
                is_retweet = True
    
            #Avoid truncated text if possible
            if hasattr(status, 'extended_tweet'):
                text = status.extended_tweet['full_text']
            else:
                text = status.text
            
            for char in [',','\n']:
                text.replace(char, " ")
    
            #if any(s in text.lower() for s in sports):
            
            if not is_retweet:
                doc = status._json
                doc['region'] = self.config['region']
                doc['text'] = text
                #s
    
                #self.file.append(json.load(doc))
                serial = {doc['id_str']:doc}
                #if 'location' in doc['user']:
                #    print(doc['text'], doc['user']['location'])
                #else:
                print(doc['text'])
                if storeCouch:
                    ave2Couch(doc, self.db)
                else:
                    outfile.write(str(serial) + '\n')  
    
        def on_error(self, status_code):
            print("Encountered streaming error (", status_code, ")")
            sys.exit()
    
            
            
    if __name__ == "__main__":
    
        sets = []
        with open('sets.json', encoding = 'utf-8') as setdata:
            for line in setdata:
                sets.append(json.loads(line))
    
        config = sets[int(region)]
    
        API_key = config['secret']['API_key']
        API_secret = config['secret']['API_secret']
        access_token = config['secret']['access_token'] 
        access_secret = config['secret']['access_secret']
        
        # Authenticate to Twitter
        auth = tweepy.OAuthHandler(API_key, API_secret)
        auth.set_access_token(access_token, access_secret)
        api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
    
    
        if collector_type == 'stream':
            #init stream
            streamListener = StreamListener(config, db)
            stream = tweepy.Stream(auth=api.auth, listener = streamListener, tweet_mode ='extended')
            
            tags=['AFL', 'tennis', 'footie', 'swimming', 'AustralianOpen', 'soccer']
            box = config['box']
    
            while True:
                try:
                    stream.filter(locations= box)
                except (ProtocolError, AttributeError):
                    continue
    
    
        elif collector_type == 'search':
    
            cityCoord = config['point']
    
            tmp = []
            ids_ = []
            final = {}
            replies = []
            up2source = []
    
            # capure the most data
            for i in range(7):
                range_date = datetime.date.today() - datetime.timedelta(days=i)
                for tweet in api.search(q="AFL OR tennis OR footie OR footy OR swimming OR AustralianOpen OR soccer OR cricket", geocode=cityCoord, until = range_date, count=100, tweet_mode="extended"):
                    #tweet = tweet._json
                    if tweet.id_str not in ids_:
                        ids_.append(tweet.id_str)
                        tmp.append(tweet)
    
    
            for v in tmp:
                final[v.id_str] = v
    
                # Get parent if any
                if hasattr(v, 'in_reply_to_status_id_str'):
                    if v.in_reply_to_status_id_str not in ids_:
                        for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = v.in_reply_to_status_id_str, max_id = v.in_reply_to_status_id_str):
                            ids_.append(tweet.id_str)
                            replies.append(tweet)
                            final[tweet.id_str] = tweet
                            if hasattr(tweet, 'in_reply_to_status_id_str'):
                                if tweet.in_reply_to_status_id_str not in ids_:
                                    up2source.append(tweet)
    
                # Get responses
                for tweet in api.search(q='to:'+ v.user.screen_name, geocode=cityCoord, count=100, since_id = v.id):
                    if tweet.in_reply_to_status_id_str == v.id_str:
                        if tweet.id_str not in ids_:
                            replies.append(tweet)
                            ids_.append(tweet.id_str)
                            final[tweet.id_str] = tweet
    
            #climb up in the conversation
            while up2source:
                current = up2source.pop(0)
                if hasattr(current, 'in_reply_to_status_id_str'):
                    if current.in_reply_to_status_id_str not in ids_:
                        for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = current.in_reply_to_status_id_str, max_id = current.in_reply_to_status_id_str):
                            ids_.append(tweet.id_str)
                            replies.append(tweet)
                            final[tweet.id_str] = tweet
                            if hasattr(tweet, 'in_reply_to_status_id_str'):
                                if tweet.in_reply_to_status_id_str not in ids_:
                                    up2source.append(tweet)
    
    
            # loops to get all responses of responses
            while replies:  
                current = replies.pop(0)
                for tweet in api.search(q='to:'+ current.user.screen_name, geocode=cityCoord, count=100, since_id = current.id):
                    if tweet.in_reply_to_status_id_str == current.id_str:
                        if tweet.id_str not in ids_:
                            replies.append(tweet)
                            ids_.append(tweet.id_str)
                            final[tweet.id_str] = tweet   
    
            #save results to disk
            serializable = {k: v._json for k,v in final.items()}
    
    
            for k, v in serializable.items():
                v['region'] = config['region']
                v['tags'] = classifyTwt(v)
                
            if storeCouch:
                save2Couch(v)
    
            else:
                #Save to disk as json. Check if saving file exist and increase # version
                current_version = '-1' 
                versions = []
                for filename in os.listdir('./'):
                    if filename.startswith('twt_search'):
                        versions.append(int(re.search(r'\d+', filename).group(0)))
    
                current_version = max(versions)
    
                with open("twt_search{}.json".format(str(current_version + 1)), "w") as outfile: 
                    json.dump(serializable, outfile)