Skip to content
Snippets Groups Projects
Commit 6196ce44 authored by Terry Liao's avatar Terry Liao
Browse files

把两个harvester用一个py文件调起,增加了init_db.py去创建指定的db

parent 7b71bfa3
Branches
No related tags found
No related merge requests found
File added
File added
File added
import couchdb
import sys
def run(server_path):
    """Create every CouchDB database the harvesters expect.

    Safe to re-run: a database that already exists is left untouched
    instead of aborting the whole initialisation.

    Args:
        server_path: Full CouchDB server URL, including credentials,
            e.g. ``http://user:password@host:5984/``.
    """
    db_names = (
        'tweet_2014_raw',
        'raw_tweets',
        'tweet_2014_results',
        'tweet_results',
        'user_id',
    )
    couch = couchdb.Server(server_path)
    for db_name in db_names:
        try:
            couch.create(db_name)
        except couchdb.http.PreconditionFailed:
            # CouchDB answers 412 when the database already exists; that is
            # fine for an init script, so keep going with the remaining ones.
            print("db already exists, skipped: " + db_name)
    print("create all db successful")
if __name__ == "__main__":
    # Expected invocation: init_db.py <ip> <username> <password>
    argv = sys.argv
    if len(argv) != 4:
        # Any other argument count falls back to the local default server.
        path = 'http://admin:password@127.0.0.1:5984/'
    else:
        _, ip, username, password = argv
        path = f'http://{username}:{password}@{ip}:5984/'
    run(path)
...@@ -67,11 +67,16 @@ class TweetSearchHavester(): ...@@ -67,11 +67,16 @@ class TweetSearchHavester():
if __name__ == '__main__': def run(server_path):
couch = couchdb.Server('http://admin:password@127.0.0.1:5984/') couch = couchdb.Server(server_path)
db = couch['user_id'] db = couch['user_id']
# couch.create('test_db') # couch.create('test_db')
city = ["melbourne","sydney","perth","adelaide","brisbane"] dict = {}
with open('./tweet_havester_config.json','r') as f:
dict = json.load(f)
cities = []
for city in dict:
cities.append(city)
switch = 0 switch = 0
count = 0 count = 0
ids = list() ids = list()
...@@ -88,7 +93,7 @@ if __name__ == '__main__': ...@@ -88,7 +93,7 @@ if __name__ == '__main__':
if(count > 20): if(count > 20):
switch = (switch+1)%5 switch = (switch+1)%5
count = 0 count = 0
a.run(ids,city[switch]) a.run(ids,cities[switch])
for id in ids: for id in ids:
data = db[id] data = db[id]
data['isSearched'] = True data['isSearched'] = True
......
...@@ -12,6 +12,7 @@ import general_process as gp ...@@ -12,6 +12,7 @@ import general_process as gp
class listener(StreamListener): class listener(StreamListener):
def __init__(self,path): def __init__(self,path):
StreamListener.__init__(self)
self.couch = couchdb.Server(path) self.couch = couchdb.Server(path)
self.model = joblib.load("./train_model.m") self.model = joblib.load("./train_model.m")
def convertValue(self,origin): def convertValue(self,origin):
...@@ -34,7 +35,7 @@ class listener(StreamListener): ...@@ -34,7 +35,7 @@ class listener(StreamListener):
content = json.loads(data) content = json.loads(data)
dic = self.convertValue(content) dic = self.convertValue(content)
id_doc = {"_id":str(dic["user_id"]),"user_name":content['user']['name'],"isSearched":False} id_doc = {"_id":str(dic["user_id"]),"user_name":content['user']['name'],"isSearched":False}
# print(id_doc) print(id_doc)
p_dic = gp.data_process(dic,self.model) p_dic = gp.data_process(dic,self.model)
if p_dic != None: if p_dic != None:
process_db.save(p_dic) process_db.save(p_dic)
...@@ -52,16 +53,13 @@ class listener(StreamListener): ...@@ -52,16 +53,13 @@ class listener(StreamListener):
class TweetStreamHavester(): class TweetStreamHavester():
def __init__(self,server_path): def __init__(self,server_path):
self.server_path = server_path self.server_path = server_path
def process(self,city): def process(self,city,dict):
#args是关键字参数,需要加上名字,写成args=(self,) #args是关键字参数,需要加上名字,写成args=(self,)
print("start streaming"+city) print("start streaming city: "+city)
th = threading.Thread(target=TweetStreamHavester.run, args=(self,city)) th = threading.Thread(target=TweetStreamHavester.run, args=(self,city,dict))
th.start() th.start()
th.join() th.join()
def run(self, city): def run(self, city, dict):
dict = {}
with open('./tweet_havester_config.json','r') as f:
dict = json.load(f)
api_token = dict[city]["API"]["stream"] api_token = dict[city]["API"]["stream"]
stream_area = dict[city]["bound"] stream_area = dict[city]["bound"]
consumer_key = api_token["consumer_key"] consumer_key = api_token["consumer_key"]
...@@ -73,23 +71,20 @@ class TweetStreamHavester(): ...@@ -73,23 +71,20 @@ class TweetStreamHavester():
twitterStream = Stream(auth,listener(self.server_path)) twitterStream = Stream(auth,listener(self.server_path))
twitterStream.filter(locations=stream_area,is_async = True) twitterStream.filter(locations=stream_area,is_async = True)
f.close()
if __name__ == "__main__": def run(server_path):
couch = couchdb.Server('http://admin:password@127.0.0.1:5984/') couch = couchdb.Server(server_path)
# couch.create('raw_tweets') # server_path = 'http://127.0.0.1:5984/'
# couch.create('new_stream_tweet')
server_path = 'http://127.0.0.1:5984/'
a = TweetStreamHavester(server_path) a = TweetStreamHavester(server_path)
with open('./tweet_havester_config.json','r') as f:
dict = json.load(f)
for city in dict:
try: try:
a.process("melbourne") a.process(city,dict)
a.process("sydney")
a.process("adelaide")
a.process("brisbane")
a.process("perth")
except Exception as e: except Exception as e:
print(e) print(e)
pass pass
f.close()
......
import tweepy_search as tSearch
import tweepy_stream as tStream
import time
if __name__ == "__main__":
    # Both harvesters are pointed at the same CouchDB server.
    server_path = 'http://admin:password@127.0.0.1:5984/'

    # Start the streaming harvester first.
    tStream.run(server_path)

    # Give streaming a head start before the search harvester begins.
    warmup_seconds = 200
    time.sleep(warmup_seconds)

    tSearch.run(server_path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment