Skip to content
Snippets Groups Projects
Select Git revision
  • 33c24050a7a8619edf9d58509dc56f88c7880832
  • master default protected
2 results

tweepy_stream.py

Blame
  • user avatar
    Terry Liao authored
    33c24050
    History
    tweepy_stream.py 3.17 KiB
    #  -*- coding: utf-8 -*-
    import json
    import os
    import tweepy
    import couchdb
    import threading
    from tweepy import OAuthHandler
    from tweepy import Stream
    from tweepy.streaming import StreamListener
    from sklearn.externals import joblib
    import general_process as gp
    
    class listener(StreamListener):
        """Tweepy stream listener that stores incoming tweets in CouchDB.

        For every tweet received, up to three documents are written to the
        CouchDB server at ``path``:

        * ``raw_tweets``    -- the trimmed tweet built by ``convertValue``;
        * ``user_id``       -- a record of the tweet's author, keyed by user id;
        * ``tweet_results`` -- the output of ``gp.data_process``, when that is
          not None.
        """

        def __init__(self, path):
            # Run the StreamListener base initialiser (previously skipped).
            StreamListener.__init__(self)
            self.couch = couchdb.Server(path)
            # Model passed to gp.data_process for every tweet; loaded once.
            self.model = joblib.load("./train_model.m")

        def convertValue(self, origin):
            """Return the subset of a decoded tweet that gets stored.

            ``origin`` is the tweet JSON object decoded into a dict. The tweet
            id string becomes the CouchDB ``_id``; ``location`` is the place
            name, or the literal string "None" when the tweet has no place.
            """
            place = origin.get("place")
            return {
                '_id': origin["id_str"],
                'create_time': origin["created_at"],
                'user_id': origin['user']['id'],
                'text': origin["text"],
                'lang': origin["lang"],
                'location': place["name"] if place is not None else "None",
            }

        def on_data(self, data):
            """Store one raw stream message; always returns True.

            Returning True keeps tweepy streaming even when a message cannot
            be decoded, processed or stored. Such failures are printed
            instead of being silently discarded.
            """
            try:
                content = json.loads(data)
            except ValueError as e:
                print("skipping undecodable stream message: %s" % e)
                return True
            # Non-tweet messages (e.g. delete or limit notices) carry no
            # "id_str"; there is nothing to store for them.
            if not isinstance(content, dict) or "id_str" not in content:
                return True
            try:
                dic = self.convertValue(content)
                id_doc = {"_id": str(dic["user_id"]),
                          "user_name": content['user']['name'],
                          "isSearched": False}
                p_dic = gp.data_process(dic, self.model)
            except Exception as e:
                # Callback boundary: report the problem and keep streaming.
                print("failed to process tweet: %r" % (e,))
                return True
            # Each save is independent. A conflict or failure in one
            # database (e.g. an author already present in user_id) must not
            # prevent the other documents from being written. Previously,
            # an undefined name here (process_db) aborted all saves whenever
            # data_process returned a result.
            if p_dic is not None:
                self._save('tweet_results', p_dic)
            self._save('user_id', id_doc)
            self._save('raw_tweets', dic)
            return True

        def _save(self, db_name, doc):
            """Save ``doc`` into database ``db_name``, reporting failures.

            A document whose ``_id`` is already stored is left untouched.
            """
            try:
                self.couch[db_name].save(doc)
            except couchdb.ResourceConflict:
                # Same _id already stored (repeat author or re-delivered
                # tweet): keep the existing document.
                pass
            except Exception as e:
                print("failed to save to %s: %r" % (db_name, e))

        def on_error(self, status):
            """Print the error status code tweepy reports for the stream.

            NOTE(review): this returns None. tweepy only disconnects when
            on_error returns False, so e.g. HTTP 420 (rate limited) keeps
            retrying -- confirm that is intended.
            """
            print(status)
    
    class TweetStreamHavester():
        """Start one tweepy location stream per city, storing into CouchDB.

        Per-city settings come from a JSON config file whose top-level keys
        are city names. Each entry holds
        ``{"API": {"stream": {<four OAuth credentials>}}, "bound": [...]}``.
        """

        def __init__(self, server_path,
                     config_path='./tweet_havester_config.json'):
            # CouchDB server URL handed to each stream listener.
            self.server_path = server_path
            # JSON file with per-city credentials and bounding boxes.
            self.config_path = config_path

        def process(self, city):
            """Start streaming tweets for ``city``.

            ``run`` executes in a worker thread that is joined before
            returning. An error raised inside ``run`` (e.g. an unknown city)
            is reported by the threading module rather than propagated here.
            ``run`` passes ``is_async=True`` to tweepy's ``filter``, so the
            join is not expected to block for the lifetime of the stream.
            """
            print("start streaming"+city)
            th = threading.Thread(target=self.run, args=(city,))
            th.start()
            th.join()

        def _load_city_config(self, city):
            """Return ``(stream credentials dict, bounding box)`` for ``city``.

            Raises KeyError if ``city`` or one of its fields is missing.
            """
            # Read and close the file before doing any network setup.
            with open(self.config_path, 'r') as f:
                config = json.load(f)
            city_config = config[city]
            return city_config["API"]["stream"], city_config["bound"]

        def run(self, city):
            """Open a location-filtered tweepy stream over ``city``'s box."""
            api_token, stream_area = self._load_city_config(city)
            auth = OAuthHandler(api_token["consumer_key"],
                                api_token["consumer_secret"])
            auth.set_access_token(api_token["access_token"],
                                  api_token["access_token_secret"])
            twitterStream = Stream(auth, listener(self.server_path))
            # is_async=True: tweepy runs the stream on its own thread.
            twitterStream.filter(locations=stream_area, is_async=True)
    
    if __name__ == "__main__":
        # CouchDB server the stream listeners write to. The listener reads the
        # 'raw_tweets', 'user_id' and 'tweet_results' databases from it, so
        # they presumably need to exist beforehand (e.g. couchdb.Server.create).
        # NOTE(review): this URL carries no credentials. If the server requires
        # authentication, the listener's writes will fail -- confirm.
        server_path = 'http://127.0.0.1:5984/'
        havester = TweetStreamHavester(server_path)
        for city in ("melbourne", "sydney", "adelaide", "brisbane", "perth"):
            # Isolate failures so one city cannot stop the remaining ones.
            try:
                havester.process(city)
            except Exception as e:
                print(e)