from tweepy import API
import twitter_harvest.util.utilities as utils
import re
import shapefile
import logging
import threading
import time
import json
import twitter_harvest.util.config as config

# Keywords shared by every harvesting thread.
COVID_KEYWORDS = ['covid19', 'coronavirus', 'covid-19']

# Line-delimited JSON dump produced by an earlier search run; helpers read it.
DEFAULT_TWEET_FILE = 'searchCovid-2020-05-01.txt'


def main():
    """Entry point: start a geo-search thread and a user-timeline thread and
    supervise them forever, restarting any worker that dies.

    The bounding-box streaming thread is prepared but left disabled, exactly
    as in the original code.
    """
    print(countLocations())

    # Renamed from `format`: don't shadow the builtin.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(filename='harvester.log', filemode='w', format=log_format,
                        level=logging.INFO, datefmt="%H:%M:%S")

    # Keep (target, args) specs so a dead worker can be *re-created*:
    # threading.Thread.start() may only be called once per Thread object, so
    # the original restart loop's `thread.start()` raised RuntimeError.
    api = utils.TwitterAPI(cred_rank=0)
    api2 = utils.TwitterAPI(cred_rank=1)
    specs = [
        # Thread 0: search tweets by geo-location.
        (api.search_tweets, ('searchCovid', config.VIC_GeoLocation, COVID_KEYWORDS)),
        # Thread 1: search tweets on the timelines of previously seen users.
        (searchTimelinesThread, ('searchCovid', api2, COVID_KEYWORDS)),
    ]

    # Thread for streaming tweets by bounding box — intentionally not started.
    streamVIC = utils.TwitterStream(cred_rank=1)
    _stream_thread = threading.Thread(
        target=streamVIC.stream,
        args=('tweetsVIC.txt', config.VIC_BBox, COVID_KEYWORDS))

    threads = []
    for index, (target, args) in enumerate(specs):
        logging.info("Main : create and start thread %d.", index)
        worker = threading.Thread(target=target, args=args)
        threads.append(worker)
        worker.start()

    # Supervisor: whenever a worker dies, wait out the 15-minute rate-limit
    # window, then start a fresh Thread built from the same spec.
    while True:
        for index, worker in enumerate(threads):
            if not worker.is_alive():
                logging.info("Main : Restarting thread !!")
                time.sleep(15 * 60)
                target, args = specs[index]
                threads[index] = threading.Thread(target=target, args=args)
                threads[index].start()
        time.sleep(60)  # avoid a busy-wait between health checks


def getTwitterUsers(filename=DEFAULT_TWEET_FILE):
    """Return the distinct ``user.screen_name`` values found in the tweet dump.

    Malformed lines are skipped (the original's bare ``except: break`` aborted
    the whole scan at the first bad line); a missing file yields ``[]``.
    ``filename`` defaults to the original hard-coded dump for compatibility.
    """
    users_set = set()
    try:
        with open(filename, 'r', encoding='utf-8') as dump:
            for line in dump:
                try:
                    users_set.add(json.loads(line)['user']['screen_name'])
                except (ValueError, KeyError, TypeError):
                    continue  # skip unparseable / incomplete records
    except OSError:
        logging.info("getTwitterUsers: could not read %s", filename)
    return list(users_set)


def searchTimelinesThread(nametag, api, filter_list=None):
    """Fetch timeline tweets for every known user via *api*, keeping only
    tweets matching *filter_list* (``None`` means no keywords — avoids the
    original mutable default argument)."""
    filter_list = [] if filter_list is None else filter_list
    for user in getTwitterUsers():
        api.retrieve_timeline_tweets(nametag, search_user=user,
                                     filter_list=filter_list)


def countLocations(filename=DEFAULT_TWEET_FILE):
    """Summarise how many records in the tweet dump carry location data
    (``coordinates``, ``geo`` or ``place``) and how many users they cover.

    Returns a human-readable summary string; unparseable lines are skipped
    and a missing file simply yields zero counts.
    """
    countall = 0
    countloc = 0
    users_set = set()
    try:
        with open(filename, 'r', encoding='utf-8') as dump:
            for line in dump:
                try:
                    jline = json.loads(line)
                except ValueError:
                    continue  # not a JSON record — skip instead of aborting
                countall += 1
                if (jline.get('coordinates') is not None
                        or jline.get('geo') is not None
                        or jline.get('place') is not None):
                    countloc += 1
                    users_set.add(jline['user']['screen_name'])
    except OSError:
        logging.info("countLocations: could not read %s", filename)
    return "Location available in {} records out of {} Total, for {} users. ".format(
        countloc, countall, len(users_set))


def mapSA4():
    """Placeholder for SA4 shapefile mapping — kept disabled, as in the original."""
    # sf = shapefile.Reader('SA4_2016_AUST.shx')
    # print(sf.shapeTypeName, sf.bbox)
    # fields = sf.fields
    # shapes = sf.shapes()
    # vicshapes = shapes[30:47]
    # print(shapes[30].bbox)
    return


if __name__ == "__main__":
    main()
# Geographic constants for the harvester.
#
# *_GeoLocation values are Twitter search "lat,long,radius" geocode strings;
# *_BBox values are streaming-API bounding boxes in the order
# [west_lon, south_lat, east_lon, north_lat].

VIC_GeoLocation = "-37.840935,144.946457,50km"
MelbBoundingBox = [143.124990, -38.553368, 146.915274, -36.276213]

# Australian states / territories.
VIC_BBox = [142.729921, -39.017529, 148.003359, -36.233694]
NSW_BBox = [141.068634, -34.690775, 153.593105, -29.030032]
QLD_BBox = [138.159371, -28.800968, 156.001168, -11.474203]
SA_BBox = [129.062692, -38.060715, 140.971871, -26.031704]
NT_BBox = [128.711129, -25.952703, 137.983590, -10.913785]
WA_BBox = [111.836129, -34.772836, 129.018746, -13.832312]

# Whole-country and (near-)global extents.
AU_BBox = [110.338446, -43.005195, 154.782046, -11.867894]
WORLD_BBox = [-158.417411, -50.190013, 163.347554, 73.687492]
from tweepy import API
from tweepy import OAuthHandler
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import RateLimitError

from textblob import TextBlob
import json
import logging
import re
import time
from datetime import date

from twitter_harvest.auth import TwitterCredentials


# Wrapper around tweepy's REST API, rotating through the configured
# credential sets so several harvester threads can use distinct apps.
class TwitterAPI():

    def __init__(self, cred_rank=0, search_user=None):
        """Authenticate with credential set ``cred_rank % len(cred)``.

        ``search_user`` is an optional default screen name used by
        ``retrieve_timeline_tweets`` when no explicit user is given.
        """
        cred_count = len(TwitterCredentials.cred)
        cred = TwitterCredentials.cred[cred_rank % cred_count]
        self.auth = OAuthHandler(cred["CONSUMER_KEY"], cred["CONSUMER_SECRET"])
        self.auth.set_access_token(cred["ACCESS_TOKEN"], cred["ACCESS_TOKEN_SECRET"])
        # wait_on_rate_limit makes tweepy sleep through rate-limit windows.
        self.twitter_api = API(self.auth, wait_on_rate_limit=True,
                               wait_on_rate_limit_notify=True)
        self.search_user = search_user

    def search_tweets(self, nametag, geocode, filter_list):
        """Search recent tweets matching any keyword in *filter_list* within
        *geocode*, appending each raw tweet JSON (one per line) to
        ``<nametag>-<today>.txt``. Returns True when the cursor is exhausted.
        """
        query = " OR ".join(filter_list)

        logging.info("Searching %s: ", nametag)
        count = 0
        for page in limit_handled(
                Cursor(self.twitter_api.search, q=query, geocode=geocode,
                       count=100, since_id=None).pages()):
            # Re-open per page so the dump survives a mid-search crash; the
            # context manager closes the handle (the original leaked it).
            try:
                with open(nametag + "-" + str(date.today()) + ".txt", 'a',
                          encoding="utf-8") as target:
                    for tweet in page:
                        count += 1
                        target.write(json.dumps(tweet._json) + '\r\n')
            except OSError:
                logging.exception("search_tweets: could not persist page for %s",
                                  nametag)

        logging.info("Searched %s tweets: ", count)
        return True

    def retrieve_timeline_tweets(self, nametag, search_user=None,
                                 filter_list=None, since_id=None):
        """Scan a user's timeline, appending tweets whose text matches
        *filter_list* to ``<nametag>-usertweets-<today>.txt``.

        Falls back to ``self.search_user`` when *search_user* is None — the
        original computed this fallback but then ignored it and passed the
        raw argument to the API. Avoids the mutable-default-list pitfall.
        """
        filter_list = [] if filter_list is None else filter_list
        user = self.search_user if search_user is None else search_user

        try:
            with open(nametag + "-usertweets-" + str(date.today()) + ".txt",
                      'a', encoding="utf-8") as target:
                for page in limit_handled(
                        Cursor(self.twitter_api.user_timeline,
                               screen_name=user, count=200,
                               since_id=since_id).pages()):
                    for tweet in page:
                        try:
                            if isMatching(tweet._json['text'], filter_list):
                                target.write(json.dumps(tweet._json) + '\r\n')
                        except (KeyError, OSError):
                            # Skip records without text / transient write errors.
                            logging.exception("retrieve_timeline_tweets: bad record")
        except Exception:
            logging.info("Interruption in retrieve_timeline_tweets ")
            return True

        return True

    def retrieve_friends(self, num_of_friends):
        """Return up to *num_of_friends* friend objects.

        Fixes the original ``friend.append(friend)`` (appended onto the tweet
        object, returning an empty list).
        NOTE(review): ``self.search_user.friends`` only works if search_user
        is a user object, not a screen-name string — confirm the caller.
        """
        friends = []
        for friend in limit_handled(Cursor(self.search_user.friends,
                                           id=self.search_user).items(num_of_friends)):
            friends.append(friend)
        return friends

    def limit_status(self):
        """Expose the current rate-limit status for diagnostics."""
        return self.twitter_api.rate_limit_status()


class TwitterStream:
    """Thin wrapper that authenticates and runs a filtered streaming session."""

    def __init__(self, cred_rank=0):
        # Same credential rotation as TwitterAPI.
        cred_count = len(TwitterCredentials.cred)
        cred = TwitterCredentials.cred[cred_rank % cred_count]
        self.auth = OAuthHandler(cred["CONSUMER_KEY"], cred["CONSUMER_SECRET"])
        self.auth.set_access_token(cred["ACCESS_TOKEN"], cred["ACCESS_TOKEN_SECRET"])

    def stream(self, filename, bbox, filter_list):
        """Stream tweets matching *filter_list* inside *bbox* into *filename*.
        Blocks until the stream disconnects."""
        self.filename = filename
        logging.info("Streamer %s: starting", filename)
        listener = TwitterListener(filename)
        stream = Stream(self.auth, listener)
        stream.filter(track=filter_list, locations=bbox)


class TwitterListener(StreamListener):
    """StreamListener that appends each raw tweet JSON line to *filename*."""

    def __init__(self, filename):
        # Let tweepy initialise its own state (the original skipped this).
        super().__init__()
        self.filename = filename

    def on_data(self, data):
        """Validate the payload is JSON and append it to the dump file.
        Always returns True so the stream stays connected."""
        try:
            with open(self.filename, 'a', encoding="utf-8") as target:
                try:
                    json.loads(data)  # parse only to reject non-JSON payloads
                    target.write(data)
                except ValueError:
                    logging.info("Error %s: thread", self.filename + " : " + data)
            return True
        except BaseException as e:
            print("Error on_data %s" % str(e))
            return True

    def on_error(self, status):
        logging.info("Error %s: thread", self.filename + str(status))
        # NOTE(review): `<= 420` disconnects on ANY status up to 420, not just
        # the 420 rate-limit code; the conventional guard is `== 420`.
        # Kept as-is to preserve existing behavior — confirm intent.
        if status <= 420:
            return False

    def on_timeout(self):
        # Back off for five minutes on a network timeout, then let tweepy retry.
        time.sleep(5 * 60)
        return


def limit_handled(cursor):
    """Yield items from a tweepy cursor, sleeping out rate-limit windows.

    Terminates cleanly when the cursor is exhausted: under PEP 479 a
    StopIteration escaping inside a generator becomes RuntimeError, which the
    original code would have raised at end of results.
    """
    while True:
        try:
            yield cursor.next()
        except StopIteration:
            return  # cursor exhausted — end the generator normally
        except RateLimitError:
            logging.info("RateLimitException !!")
            time.sleep(15 * 60)


def isMatching(text, matchList):
    """Return True if any keyword in *matchList* occurs in *text*.

    The text is lower-cased before matching, so keywords are expected to be
    lowercase (as they are at every call site). An empty *matchList* builds
    the empty alternation ``(?:)`` which matches everything — same as the
    original.
    """
    try:
        pattern = re.compile(r'(?:{})'.format('|'.join(map(re.escape, matchList))))
        return pattern.search(text.lower()) is not None
    except Exception as e:
        print(e.args[0])
        return False