From 53de466ac72763fa53abb7df12c9fa412006e59f Mon Sep 17 00:00:00 2001
From: AhmedUniMelb <ahmed.mahmoud@student.unimelb.edu.au>
Date: Mon, 18 May 2020 01:51:14 +1000
Subject: [PATCH] harvester_beta2

---
 .idea/vcs.xml                              |   1 +
 twitter_harvest/auth/TwitterCredentials.py |  43 +++++-
 twitter_harvest/harvester.py               | 158 ++++++++-------------
 twitter_harvest/util/config.py             |  47 ++++--
 twitter_harvest/util/utilities.py          | 126 ++++++++++------
 5 files changed, 223 insertions(+), 152 deletions(-)

diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 94a25f7..8e13905 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -2,5 +2,6 @@
 <project version="4">
   <component name="VcsDirectoryMappings">
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
+    <mapping directory="$PROJECT_DIR$/twitter_harvest/src/webpy" vcs="Git" />
   </component>
 </project>
\ No newline at end of file

diff --git a/twitter_harvest/auth/TwitterCredentials.py b/twitter_harvest/auth/TwitterCredentials.py
index 22d866f..b65b89d 100644
--- a/twitter_harvest/auth/TwitterCredentials.py
+++ b/twitter_harvest/auth/TwitterCredentials.py
@@ -1,5 +1,38 @@
-# The access credentials for Twitter API - Account 1
-ACCESS_TOKEN = "1247090411245432832-uI1WTHtzfAjcqEw5tl6Vms9TlC2P4C"
-ACCESS_TOKEN_SECRET = "2kj3Dp0EpmajzXyBOBEHwD5KXuEgMmisMGsVQJCNqzKD6"
-CONSUMER_KEY = "0jbzqFhSCP4DOwnnxiz4woIFz"
-CONSUMER_SECRET = "Yi1ToBPMrGn13satRSYB5opG75nzg3rgIg5fS8XOkWDQ2QGHP7"
+# The access credentials for Twitter API
+cred = [{
+        'ACCESS_TOKEN': "1247090411245432832-uI1WTHtzfAjcqEw5tl6Vms9TlC2P4C",
+        'ACCESS_TOKEN_SECRET': "2kj3Dp0EpmajzXyBOBEHwD5KXuEgMmisMGsVQJCNqzKD6",
+        'CONSUMER_KEY': "0jbzqFhSCP4DOwnnxiz4woIFz",
+        'CONSUMER_SECRET': "Yi1ToBPMrGn13satRSYB5opG75nzg3rgIg5fS8XOkWDQ2QGHP7"
+    },
+    {
+        'ACCESS_TOKEN': "1255301814666383361-3fAON2pjoZaSj1Jhywq4n1rE8JSX5a",
+        'ACCESS_TOKEN_SECRET': "VYbTsC8ESkyxiHTi68wBezp4cPS3YyegmHObk3o7mXdBi",
+        'CONSUMER_KEY': "UWccr6Sb6TQDMDvQkkQPKF9Lg",
+        'CONSUMER_SECRET': "KfkOYNVvBoWMtBcb9INvrBROJC3QH3xo2Qw0gigDwCsHNEcXti"
+    },
+    {
+        'ACCESS_TOKEN': "472782005-K0iI81Al2d4zbBgCScTNBdRBAmAuM1rssUPE4z2I",
+        'ACCESS_TOKEN_SECRET': "iZimYCcKlKnoY80aaAAEZwfwAtJhf99kYGMIbtIg1KIKL",
+        'CONSUMER_KEY': "jCDsQ2YbbtULszBLff9MFphlL",
+        'CONSUMER_SECRET': "q6f6AUigGSYFvPbysGAOFRBaOeiGiEBOC9mOSdXPWzEUTvVPNa"
+    },
+    {
+        'ACCESS_TOKEN': "952806966371348481-IQ0xPxGSjrreFQVDqEEFQyrg0BMTUvC",
+        'ACCESS_TOKEN_SECRET': "JgzHMJrPF311Z4FxYy0dXIaU6bAtYpveiUoKaJsc9S80I",
+        'CONSUMER_KEY': "YGFJ4TYr1A3EfJKaCB0BnKnM8",
+        'CONSUMER_SECRET': "lN9GfdrOS5SnAlq6b7zx57DKyJTUWoN6p4fBpf7yxscLqI3VAp"
+    },
+    {
+        'ACCESS_TOKEN': "275499572-AugV2oYayyyWGkaBgOWQMYSmeKKUAgDOqvrXIKul",
+        'ACCESS_TOKEN_SECRET': "Vx0DBmEIFnUhfqQiZHnABHffuXArLeLjjuk1gXMBTQKjg",
+        'CONSUMER_KEY': "HjkDgma8SF8Rpbz8XTzeUKSf2",
+        'CONSUMER_SECRET': "zJRoTW7up79hYXs4Nbp61VzfPz15ueecWwUYsZ8vyYvzsqQABX"
+    },
+    {
+        'ACCESS_TOKEN': "1256833404592549889-w5HXCdkwHeKjvtmGdaNmgYEp3g3fge",
+        'ACCESS_TOKEN_SECRET': "pOTVrS8Zfe6BEEejqf7GMjuUE318eyHsctzvvVXRkX1bb",
+        'CONSUMER_KEY': "3bYyNS1oOKPMigUDLByIZttRF",
+        'CONSUMER_SECRET': "u0iWEmoHiH4VWuEMMO4tVwXUMy6r16BXDfJdGd0z6CHBA8NjPO"
+    }]
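With the credentials now stored as a list of dicts, each worker thread can authenticate under a different account and multiply the rate-limit budget. A minimal sketch of building one client per entry, assuming tweepy 3.x (`make_api` is a hypothetical helper name; the patch does the equivalent inside `utils.TwitterAPI.__init__`):

```python
import tweepy
from twitter_harvest.auth import TwitterCredentials

def make_api(cred_rank=0):
    # Wrap around the list so any thread index maps onto a valid account.
    c = TwitterCredentials.cred[cred_rank % len(TwitterCredentials.cred)]
    auth = tweepy.OAuthHandler(c['CONSUMER_KEY'], c['CONSUMER_SECRET'])
    auth.set_access_token(c['ACCESS_TOKEN'], c['ACCESS_TOKEN_SECRET'])
    # wait_on_rate_limit makes tweepy sleep through 429 responses instead of raising
    return tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
```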
diff --git a/twitter_harvest/harvester.py b/twitter_harvest/harvester.py
index 69b6fba..9a278bf 100644
--- a/twitter_harvest/harvester.py
+++ b/twitter_harvest/harvester.py
@@ -1,115 +1,83 @@
-from tweepy import API
-import twitter_harvest.util.utilities as utils
-import re
-import shapefile
 import logging
 import threading
-import time
-import json
+import couchdb
+import twitter_harvest.util.utilities as utils
 import twitter_harvest.util.config as config
+from twitter_harvest.auth import TwitterCredentials


 def main():
-
-    print(countLocations())
-
+    # logger format
     format = "%(asctime)s: %(message)s"
     logging.basicConfig(filename='harvester.log', filemode='w', format=format,
                         level=logging.INFO, datefmt="%H:%M:%S")

-    threads = list()
+    # connection to CouchDB
+    db_conn = config.DB_Config
+    couchserver = couchdb.Server(db_conn)
+    couchserver2 = couchdb.Server(db_conn)

-    # Thread for searching for Tweets by GeoLocation
-    logging.info("Main : create and start thread %d.", 0)
-    api = utils.TwitterAPI(cred_rank=0)
-    threadSearchTweets = threading.Thread(target=api.search_tweets,
-                                          args=('searchCovid', config.VIC_GeoLocation,
-                                                ['covid19', 'coronavirus', 'covid-19'],))
-    threads.append(threadSearchTweets)
-    threadSearchTweets.start()
+    db_twitter_name = 'twitter'
+    if db_twitter_name in couchserver:
+        db_twitter = couchserver[db_twitter_name]
+    else:
+        db_twitter = couchserver.create(db_twitter_name)

-    # Thread for searching for Tweets by Users
-    logging.info("Main : create and start thread %d.", 1)
-    api2 = utils.TwitterAPI(cred_rank=1)
-    threadSearchUserTweets = threading.Thread(target=searchTimelinesThread,
-                                              args=('searchCovid', api2,
-                                                    ['covid19', 'coronavirus', 'covid-19'],))
-    threads.append(threadSearchUserTweets)
-    threadSearchUserTweets.start()
-
-    # Thread for Streaming Tweets by GeoLocation
-    logging.info("Main : create and start thread %d.", 2)
-    streamVIC = utils.TwitterStream(cred_rank=1)
-    threadStreamTweets = threading.Thread(target=streamVIC.stream,
-                                          args=('tweetsVIC.txt', config.VIC_BBox,
-                                                ['covid19', 'coronavirus', 'covid-19'],))
-    #threads.append(threadStreamTweets)
-    #threadStreamTweets.start()
-
-    for index, thread in enumerate(threads):
-        logging.info("Main : before joining thread %d.", index)
-        thread.join()
-        logging.info("Main : thread %d done", index)
-
-    while True:
-        for thread in threads:
-            if not thread.is_alive():
-                logging.info("Main : Restarting thread !!")
-                time.sleep(15 * 60)
-                thread.start()
+    db_users_name = 'users'
+    if db_users_name in couchserver2:
+        db_users = couchserver2[db_users_name]
+    else:
+        db_users = couchserver2.create(db_users_name)

-    """
-    for index, thread in enumerate(threads):
-        logging.info("Main : before joining thread %d.", index)
-        thread.join()
-        logging.info("Main : thread %d done", index)
-    """
-
-
-def getTwitterUsers():
-    users_set = set()
-    filetweetsVIC = open('searchCovid-2020-05-01.txt', 'r')
+    # Prepare a list of threads that covers all available credentials.
+    threads = list()
+    num_threads = len(TwitterCredentials.cred)
+
+    for i in range(num_threads):
+        if i == 0:
+            # Search the states
+            logging.info("Main : create and start Search thread %d.", i)
+            api_0 = utils.TwitterAPI(cred_rank=i)
+            threadSearch = threading.Thread(target=searchThread,
+                                            args=(api_0, db_twitter, db_users,
+                                                  config.Keywords,))
+            threads.append(threadSearch)
+            threadSearch.start()
+        else:
+            # Search user timelines
+            logging.info("Main : create and start Search Timeline thread %d.", i)
+            api_i = utils.TwitterAPI(cred_rank=i)
+            threadSearchTimeline = threading.Thread(target=searchTimelinesThread,
+                                                    args=(api_i, db_twitter, db_users,
+                                                          config.Keywords,))
+            threads.append(threadSearchTimeline)
+            threadSearchTimeline.start()
+
+
+def searchThread(api, dbtwitter, dbusers, filter_list):
     while True:
-        try:
-            line = filetweetsVIC.readline()
-            jline = json.loads(line)
-            users_set.add(jline['user']['screen_name'])
-        except:
-            break
-
-    return list(users_set)
+        # Continuously iterate through the states, searching each one for the target tweets
+        for state in config.States:
+            logging.info("Starting search for {} :".format(state['state_name']))
+            api.search_tweets(state['state_name'], dbtwitter, dbusers,
+                              state['geocode'], filter_list)


-def searchTimelinesThread(nametag, api, filter_list=[]):
-    # time.sleep(5 * 60)
-    users = getTwitterUsers()
-    for user in users:
-        tweets = api.retrieve_timeline_tweets(nametag, search_user=user, filter_list=filter_list)
-
-
-def countLocations():
-    filetweetsVIC = open('searchCovid-2020-05-01.txt', 'r')
-    countall = 0
-    countloc = 0
-    users_set = set()
+def searchTimelinesThread(api, db_twitter, db_users, filter_list=[]):
     while True:
-        try:
-            line = filetweetsVIC.readline()
-            jline = json.loads(line)
-            countall += 1
-            if jline['coordinates'] is not None or jline['geo'] is not None or jline['place'] is not None:
-                countloc += 1
-                users_set.add(jline['user']['screen_name'])
-        except:
-            break
-
-    return "Location available in {} records out of {} Total, for {} users.".format(countloc, countall, len(users_set))
-
-def mapSA4():
-    # sf = shapefile.Reader('SA4_2016_AUST.shx')
-    # print(sf.shapeTypeName, sf.bbox)
-    # fields = sf.fields
-    # shapes = sf.shapes()
-    # vicshapes = shapes[30:47]
-    # print(shapes[30].bbox)
-    return
+        # Continuously iterate through the users and search their timelines for the target tweets
+        for userid in db_users:
+            if db_users[userid]['searched']:
+                continue
+            else:
+                # Mark the user as searched before harvesting, so concurrent
+                # threads skip it; a ResourceConflict means another thread won.
+                updated_user = db_users[userid]
+                updated_user['searched'] = True
+                updated_user['_id'] = userid
+                try:
+                    db_users.save(updated_user)
+                except couchdb.http.ResourceConflict:
+                    continue
+                api.retrieve_timeline_tweets(db_users[userid]['state'], db_twitter,
                                             search_user=userid,
+                                             filter_list=filter_list)


 if __name__ == "__main__":
     main()
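The `searched` flag in `searchTimelinesThread` doubles as a cheap distributed work queue: each thread tries to flip the flag before harvesting, and CouchDB's MVCC rejects every writer but the first. A standalone sketch of that claim step (`claim_user` is our hypothetical helper name; `db.save` and `ResourceConflict` are the couchdb-python calls the harvester already uses):

```python
import couchdb

def claim_user(db_users, userid):
    """Try to claim one user document; return it, or None if already taken."""
    doc = db_users[userid]          # the fetched copy carries the current _rev
    if doc.get('searched'):
        return None                 # already harvested (or claimed) earlier
    doc['searched'] = True
    try:
        # save() sends our _rev; if another thread saved in between, CouchDB
        # answers 409 Conflict and couchdb-python raises ResourceConflict.
        db_users.save(doc)
        return doc
    except couchdb.http.ResourceConflict:
        return None
```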
diff --git a/twitter_harvest/util/config.py b/twitter_harvest/util/config.py
index a7e84ba..86e9ea7 100644
--- a/twitter_harvest/util/config.py
+++ b/twitter_harvest/util/config.py
@@ -1,12 +1,37 @@
-VIC_GeoLocation = "-37.840935,144.946457,50km"
-MelbBoundingBox = [143.124990, -38.553368, 146.915274, -36.276213]
-
-VIC_BBox = [142.729921, -39.017529, 148.003359, -36.233694]
-NSW_BBox = [141.068634, -34.690775, 153.593105, -29.030032]
-QLD_BBox = [138.159371, -28.800968, 156.001168, -11.474203]
-SA_BBox = [129.062692, -38.060715, 140.971871, -26.031704]
-NT_BBox = [128.711129, -25.952703, 137.983590, -10.913785]
-WA_BBox = [111.836129, -34.772836, 129.018746, -13.832312]
-AU_BBox = [110.338446, -43.005195, 154.782046, -11.867894]
-WORLD_BBox = [-158.417411, -50.190013, 163.347554, 73.687492]
+# https://www.mapdevelopers.com/draw-circle-tool.php
+VIC_GeoLocation = "-37.804541,144.811588,221km"
+NSW_GeoLocation = "-33.152083,150.870422,495km"
+QLD_GeoLocation = "-20.176761,149.449390,987km"
+SA_GeoLocation = "-33.910191,134.486057,585km"
+WA_GeoLocation = "-26.036284,118.163063,1062km"
+NA1_GeoLocation = "-14.337986,133.759122,512km"
+NA2_GeoLocation = "-21.805241,133.517536,460km"
+TAS_GeoLocation = "-41.611075,146.880160,235km"
+ACT_GeoLocation = "-35.516222,149.056436,45km"
+
+VIC_BBox = [142.729921, -39.017529, 148.003359, -36.233694]
+NSW_BBox = [141.068634, -34.690775, 153.593105, -29.030032]
+QLD_BBox = [138.159371, -28.800968, 156.001168, -11.474203]
+SA_BBox = [129.062692, -38.060715, 140.971871, -26.031704]
+NT_BBox = [128.711129, -25.952703, 137.983590, -10.913785]
+WA_BBox = [111.836129, -34.772836, 129.018746, -13.832312]
+AU_BBox = [110.338446, -43.005195, 154.782046, -11.867894]
+WORLD_BBox = [-158.417411, -50.190013, 163.347554, 73.687492]
+
+States = [
+    {'state_name': 'VIC', 'geocode': VIC_GeoLocation},
+    {'state_name': 'NSW', 'geocode': NSW_GeoLocation},
+    {'state_name': 'QLD', 'geocode': QLD_GeoLocation},
+    {'state_name': 'NA',  'geocode': NA1_GeoLocation},
+    {'state_name': 'WA',  'geocode': WA_GeoLocation},
+    {'state_name': 'SA',  'geocode': SA_GeoLocation},
+    {'state_name': 'TAS', 'geocode': TAS_GeoLocation}
+]
+
+Keywords = ['covid19', 'coronavirus', 'covid-19']
+from_date = '2019/9/9'
+
+DB_Config = 'http://admin:admin@172.26.133.58:5984'
+#DB_Config = 'http://admin:admin@45.113.234.209:5984'
+#DB_Config = 'http://admin:admin@127.0.0.1:5984'
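The `*_GeoLocation` strings follow the `lat,long,radius` geocode format of Twitter's standard search endpoint, which is what ties this config to the search thread. A sketch of how the pieces combine (`search_state` is a hypothetical helper; `Cursor` and `api.search` are the tweepy 3.x calls used elsewhere in the patch):

```python
from tweepy import Cursor

import twitter_harvest.util.config as config

def search_state(api, state, keywords=config.Keywords):
    # Twitter treats "covid19 OR coronavirus OR covid-19" as a disjunction,
    # and geocode="lat,long,radius" restricts hits to that circle.
    query = " OR ".join(keywords)
    for page in Cursor(api.search, q=query, geocode=state['geocode'], count=100).pages():
        for tweet in page:
            yield tweet._json   # the raw status dict, as the harvester consumes it
```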
diff --git a/twitter_harvest/util/utilities.py b/twitter_harvest/util/utilities.py
index 0af54e1..b164bed 100644
--- a/twitter_harvest/util/utilities.py
+++ b/twitter_harvest/util/utilities.py
@@ -4,20 +4,19 @@ from tweepy import Cursor
 from tweepy.streaming import StreamListener
 from tweepy import Stream
 from tweepy import RateLimitError
-
+from dateutil.parser import parse as dateparser
+from dateutil import tz
 from textblob import TextBlob
-import argparse
+from twitter_harvest.auth import TwitterCredentials
+import twitter_harvest.util.config as config
+import couchdb
 import json
 import logging
 import time
 import re
-from datetime import date
-
-from twitter_harvest.auth import TwitterCredentials


 # Class for managing Twitter's API operations
-# The
 class TwitterAPI():

     def __init__(self, cred_rank=0, search_user=None):
         cred_count = len(TwitterCredentials.cred)
@@ -28,57 +27,105 @@ class TwitterAPI():
         self.twitter_api = API(self.auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
         self.search_user = search_user

-    def search_tweets(self, nametag, geocode, filter_list):
+    def search_tweets(self, state, db_twitter, db_users, geocode, filter_list):
         filter_list = " OR ".join(filter_list)
-        logging.info("Searching %s: ", nametag)
+        logging.info("Searching %s: ", state)
         count = 0
-        # try:
         for page in limit_handled(
                 Cursor(self.twitter_api.search, q=filter_list, geocode=geocode, count=100,
                        since_id=None).pages()):
             try:
-                target = open(nametag + "-" + str(date.today()) + ".txt", 'a', encoding="utf-8")
                 for tweet in page:
-                    """
+                    tweet = tweet._json
+                    # convert the data into a simplified format for easy manipulation in CouchDB
+                    tweet_datetime = dateparser(tweet['created_at']).replace(tzinfo=tz.tzlocal())
+                    if 'retweeted_status' in tweet:
+                        retweet = True
+                    else:
+                        retweet = False
+                    # Prepare a compact version of the tweet
                     mini_tweet = {
-                        'created_at': tweet._json['created_at'],
-                        'id': tweet._json['id'],
-                        'text': tweet._json['text'],
-                        'user': tweet._json['user'],
-                        'geo': tweet._json['geo'],
-                        'coordinates': tweet._json['coordinates'],
-                        'place': tweet._json['place'],
-                        'lang': tweet._json['lang'],
-                        'sentiment': TextBlob(tweet._json['text']).sentiment
+                        '_id': tweet['id_str'],
+                        'id': tweet['id_str'],
+                        'created_at': str(tweet_datetime),
+                        'text': tweet['text'],
+                        'user': tweet['user'],
+                        'state': state,
+                        'geo': tweet['geo'],
+                        'coordinates': tweet['coordinates'],
+                        'place': tweet['place'],
+                        'lang': tweet['lang'],
+                        'sentiment': TextBlob(tweet['text']).sentiment,  # conduct sentiment analysis
+                        'retweet': retweet
                     }
-                    """
-                    count += 1
-                    target.write(json.dumps(tweet._json) + '\r\n')
+                    juser = mini_tweet['user']
+                    mini_user = {
+                        '_id': juser['id_str'],
+                        'id': juser['id_str'],
+                        'screen_name': juser['screen_name'],
+                        'location': juser['location'],
+                        'description': juser['description'],
+                        'followers_count': juser['followers_count'],
+                        'friends_count': juser['friends_count'],
+                        'state': state,
+                        'searched': False
+                    }
+                    count += 1
+                    try:
+                        db_twitter.save(mini_tweet)
+                        db_users.save(mini_user)
+                    except couchdb.http.ResourceConflict:
+                        pass
             except Exception as e:
                 pass
-        # target.close()
-
-        logging.info("Searched %s tweets: ", count)
-        # except Exception as e:
-        #     logging.info("Interruption in search_tweets !!")
-        #     return True
-
+        logging.info("Searched tweets: {} , in {}".format(count, state))
         return True

-    def retrieve_timeline_tweets(self, nametag, search_user=None, filter_list=[], since_id=None):
+    def retrieve_timeline_tweets(self, state, db_twitter, search_user=None, filter_list=[], since_id=None):
         if search_user is None:
             user = self.search_user
         else:
             user = search_user
-        target = open(nametag + "-usertweets-" + str(date.today()) + ".txt", 'a', encoding="utf-8")
         try:
             for page in limit_handled(Cursor(self.twitter_api.user_timeline,
-                                             screen_name=search_user, count=200, since_id=since_id).pages()):
+                                             user_id=user, count=200, since_id=since_id).pages()):
                 for tweet in page:
                     try:
+                        # Limit the searched timeline to tweets on or after config.from_date
+                        if dateparser(tweet._json['created_at']).replace(tzinfo=tz.tzlocal()) \
+                                < dateparser(config.from_date).replace(tzinfo=tz.tzlocal()):
+                            return True
+
+                        # Check whether this tweet meets our search criteria
                         if isMatching(tweet._json['text'], filter_list):
-                            target.write(json.dumps(tweet._json) + '\r\n')
+                            tweet = tweet._json
+
+                            # convert the data into a simplified format for easy manipulation in CouchDB
+                            tweet_datetime = dateparser(tweet['created_at']).replace(tzinfo=tz.tzlocal())
+
+                            if 'retweeted_status' in tweet:
+                                retweet = True
+                            else:
+                                retweet = False
+                            # Prepare a compact version of the tweet
+                            mini_tweet = {
+                                '_id': tweet['id_str'],
+                                'id': tweet['id_str'],
+                                'created_at': str(tweet_datetime),  # tweet['created_at']
+                                'text': tweet['text'],
+                                'user': tweet['user'],
+                                'state': state,
+                                'geo': tweet['geo'],
+                                'coordinates': tweet['coordinates'],
+                                'place': tweet['place'],
+                                'lang': tweet['lang'],
+                                'sentiment': TextBlob(tweet['text']).sentiment,  # conduct sentiment analysis
+                                'retweet': retweet
+                            }
+                            try:
+                                db_twitter.save(mini_tweet)
+                            except couchdb.http.ResourceConflict:
+                                pass
                     except Exception:
                         pass
         except Exception as e:
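Both methods attach `TextBlob(...).sentiment` to the stored document. `sentiment` is a `(polarity, subjectivity)` named tuple, so CouchDB ends up persisting it as a two-element JSON array. A quick illustration (the example text is ours, not from the dataset):

```python
import json
from textblob import TextBlob

sent = TextBlob("Lockdown again, but at least the covid19 curve is flattening").sentiment
print(sent.polarity)      # float in [-1.0, 1.0]: negative vs positive tone
print(sent.subjectivity)  # float in [0.0, 1.0]: factual vs opinionated
print(json.dumps(sent))   # a namedtuple serializes as "[<polarity>, <subjectivity>]"
```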
@@ -121,14 +168,10 @@ class TwitterListener(StreamListener):

     def on_data(self, data):
         try:
-            # print(data)
             with open(self.filename, 'a', encoding="utf-8") as target:
                 try:
                     tweet = json.loads(data)
-                    # if "australia" in str(tweet['user']['location']).lower():
                     target.write(data)
-                    # else:
-                    #     pass
                 except:
                     logging.info("Error %s: thread", self.filename + " : " + data)
                     pass
@@ -146,7 +189,7 @@ class TwitterListener(StreamListener):
         time.sleep(5 * 60)
         return

-
+# Regulate the tweet flow to stay within the API rate limits
 def limit_handled(cursor):
     while True:
         try:
@@ -154,9 +197,10 @@ def limit_handled(cursor):
         except RateLimitError:
             logging.info("RateLimitException !!")
             time.sleep(15 * 60)
+        except StopIteration as e:
+            return False

-
-# Check if at least one of the keywords in matchList exists in the text
+# Check whether the text matches the search query
 def isMatching(text, matchList):
     try:
         p = re.compile(r'(?:{})'.format('|'.join(map(re.escape, matchList))))
-- 
GitLab
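Two notes on the helpers above. `limit_handled` returning `False` on `StopIteration` is what lets the calling loops finish cleanly once a cursor is exhausted. And because `isMatching` compiles its pattern without `re.IGNORECASE`, a tweet containing `COVID-19` in capitals slips past the filter; a case-insensitive variant (our naming, not the patch's) might look like this:

```python
import re

def is_matching_ci(text, match_list):
    # re.escape keeps 'covid-19' literal; IGNORECASE also catches 'COVID-19' etc.
    pattern = re.compile('|'.join(map(re.escape, match_list)), re.IGNORECASE)
    return pattern.search(text) is not None

keywords = ['covid19', 'coronavirus', 'covid-19']
print(is_matching_ci("New COVID-19 cases reported in VIC", keywords))  # True
print(is_matching_ci("AFL season update", keywords))                   # False
```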