import tweepy
import json
import couchdb
import sys
import os
import os.path
import datetime
import re
from urllib3.exceptions import ProtocolError
from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt
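
# Usage (a sketch inferred from the argument handling below; the IP is an
# example host, not a guaranteed one):
#   python tweet_gatherer_app.py <stream|search> <region_index> <couchdb_ip>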
collector_type = sys.argv[1]
region = sys.argv[2]
couchIP = sys.argv[3]  # e.g. "172.26.130.79"

storeCouch = True
couchserver = couchdb.Server("http://admin:admin@" + couchIP + ":5984/")
db = couchserver['live_demo2']

# Change the credentials in sets.json to your current Twitter account.
# Append to the regional dump file if it already exists, otherwise create it.
if os.path.isfile('twt_stream{}.json'.format(region)):
    outfile = open('twt_stream{}.json'.format(region), 'a+', encoding='utf-8')
else:
    outfile = open('twt_stream{}.json'.format(region), 'w+', encoding='utf-8')


class StreamListener(tweepy.StreamListener):
    """Listener that tags incoming statuses with a region and stores them."""

    def __init__(self, config, db):
        super(StreamListener, self).__init__()
        self.file = []
        self.db = db
        self.config = config

    def on_status(self, status):
        is_retweet = hasattr(status, 'retweeted_status')
        # Avoid truncated text if possible
        if hasattr(status, 'extended_tweet'):
            text = status.extended_tweet['full_text']
        else:
            text = status.text
        # str.replace returns a new string, so reassign the result
        for char in [',', '\n']:
            text = text.replace(char, " ")
        # if any(s in text.lower() for s in sports):
        if not is_retweet:
            doc = status._json
            doc['region'] = self.config['region']
            doc['text'] = text
            serial = {doc['id_str']: doc}
            # if 'location' in doc['user']:
            #     print(doc['text'], doc['user']['location'])
            # else:
            print(doc['text'])
            if storeCouch:
                save2couch(doc, self.db)
            else:
                outfile.write(str(serial) + '\n')

    def on_error(self, status_code):
        print("Encountered streaming error (", status_code, ")")
        sys.exit()
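

# A minimal sketch of one line of sets.json, assuming only the keys read
# below (all values are illustrative placeholders, not real settings):
# {"region": "melbourne",
#  "box": [144.5, -38.5, 145.5, -37.5],
#  "point": "-37.81,144.96,30km",
#  "secret": {"API_key": "...", "API_secret": "...",
#             "access_token": "...", "access_secret": "..."}}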
if __name__ == "__main__":
sets = []
with open('sets.json', encoding = 'utf-8') as setdata:
for line in setdata:
sets.append(json.loads(line))
config = sets[int(region)]
API_key = config['secret']['API_key']
API_secret = config['secret']['API_secret']
access_token = config['secret']['access_token']
access_secret = config['secret']['access_secret']
# Authenticate to Twitter
auth = tweepy.OAuthHandler(API_key, API_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
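
    # Two collection modes: 'stream' keeps a filtered live stream open over
    # the region's bounding box, while 'search' sweeps the last seven days
    # of results and reconstructs reply chains in both directions.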
    if collector_type == 'stream':
        # Init the stream and reconnect on transient connection errors
        streamListener = StreamListener(config, db)
        stream = tweepy.Stream(auth=api.auth, listener=streamListener, tweet_mode='extended')
        tags = ['AFL', 'tennis', 'footie', 'swimming', 'AustralianOpen', 'soccer']
        box = config['box']
        while True:
            try:
                stream.filter(locations=box)
            except (ProtocolError, AttributeError):
                continue
    elif collector_type == 'search':
        cityCoord = config['point']
        tmp = []
        ids_ = []
        final = {}
        replies = []
        up2source = []
        # Capture the most data: sweep each of the last seven days
        for i in range(7):
            range_date = datetime.date.today() - datetime.timedelta(days=i)
            for tweet in api.search(q="AFL OR tennis OR footie OR footy OR swimming OR AustralianOpen OR soccer OR cricket",
                                    geocode=cityCoord, until=range_date, count=100, tweet_mode="extended"):
                if tweet.id_str not in ids_:
                    ids_.append(tweet.id_str)
                    tmp.append(tweet)
        for v in tmp:
            final[v.id_str] = v
            # Get parent if any (in_reply_to_status_id_str is None when the
            # tweet is not a reply)
            if hasattr(v, 'in_reply_to_status_id_str') and v.in_reply_to_status_id_str:
                if v.in_reply_to_status_id_str not in ids_:
                    # since_id is exclusive and max_id inclusive, so bracketing
                    # the parent's ID fetches exactly that tweet
                    parent_id = int(v.in_reply_to_status_id_str)
                    for tweet in api.search(q='', geocode=cityCoord, count=100,
                                            since_id=parent_id - 1, max_id=parent_id):
                        ids_.append(tweet.id_str)
                        replies.append(tweet)
                        final[tweet.id_str] = tweet
                        if hasattr(tweet, 'in_reply_to_status_id_str') and tweet.in_reply_to_status_id_str:
                            if tweet.in_reply_to_status_id_str not in ids_:
                                up2source.append(tweet)
            # Get responses
            for tweet in api.search(q='to:' + v.user.screen_name, geocode=cityCoord, count=100, since_id=v.id):
                if tweet.in_reply_to_status_id_str == v.id_str:
                    if tweet.id_str not in ids_:
                        replies.append(tweet)
                        ids_.append(tweet.id_str)
                        final[tweet.id_str] = tweet
        # Climb up the conversation toward the source tweet
        while up2source:
            current = up2source.pop(0)
            if hasattr(current, 'in_reply_to_status_id_str') and current.in_reply_to_status_id_str:
                if current.in_reply_to_status_id_str not in ids_:
                    parent_id = int(current.in_reply_to_status_id_str)
                    for tweet in api.search(q='', geocode=cityCoord, count=100,
                                            since_id=parent_id - 1, max_id=parent_id):
                        ids_.append(tweet.id_str)
                        replies.append(tweet)
                        final[tweet.id_str] = tweet
                        if hasattr(tweet, 'in_reply_to_status_id_str') and tweet.in_reply_to_status_id_str:
                            if tweet.in_reply_to_status_id_str not in ids_:
                                up2source.append(tweet)
        # Loop to get all responses of responses
        while replies:
            current = replies.pop(0)
            for tweet in api.search(q='to:' + current.user.screen_name, geocode=cityCoord, count=100, since_id=current.id):
                if tweet.in_reply_to_status_id_str == current.id_str:
                    if tweet.id_str not in ids_:
                        replies.append(tweet)
                        ids_.append(tweet.id_str)
                        final[tweet.id_str] = tweet
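
        # Together the two queues amount to a breadth-first walk of the reply
        # graph: up2source follows in_reply_to links back toward the original
        # tweet, while replies fans outward to responses of responses.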

        # Save results to CouchDB, or to a versioned JSON dump on disk
        serializable = {k: v._json for k, v in final.items()}
        for k, v in serializable.items():
            v['region'] = config['region']
            v['tags'] = classifyTwt(v)
            if storeCouch:
                save2couch(v, db)
        if not storeCouch:
            # Check which twt_search dumps already exist and bump the version
            current_version = -1
            versions = []
            for filename in os.listdir('./'):
                if filename.startswith('twt_search'):
                    versions.append(int(re.search(r'\d+', filename).group(0)))
            if versions:
                current_version = max(versions)
            with open("twt_search{}.json".format(current_version + 1), "w") as outfile:
                json.dump(serializable, outfile)