Commit 779168b0 authored by Matthew O'Halloran

Merge branch 'harvester_beta' into 'master'

harvester_beta2

See merge request !9
parents e61e2786 53de466a
@@ -2,5 +2,6 @@
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="$PROJECT_DIR$/twitter_harvest/src/webpy" vcs="Git" />
</component>
</project>
\ No newline at end of file
# The access credentials for Twitter API - Account 1
ACCESS_TOKEN = "1247090411245432832-uI1WTHtzfAjcqEw5tl6Vms9TlC2P4C"
ACCESS_TOKEN_SECRET = "2kj3Dp0EpmajzXyBOBEHwD5KXuEgMmisMGsVQJCNqzKD6"
CONSUMER_KEY = "0jbzqFhSCP4DOwnnxiz4woIFz"
CONSUMER_SECRET = "Yi1ToBPMrGn13satRSYB5opG75nzg3rgIg5fS8XOkWDQ2QGHP7"
# The access credentials for Twitter API
cred = [{
'ACCESS_TOKEN': "1247090411245432832-uI1WTHtzfAjcqEw5tl6Vms9TlC2P4C",
'ACCESS_TOKEN_SECRET': "2kj3Dp0EpmajzXyBOBEHwD5KXuEgMmisMGsVQJCNqzKD6",
'CONSUMER_KEY': "0jbzqFhSCP4DOwnnxiz4woIFz",
'CONSUMER_SECRET': "Yi1ToBPMrGn13satRSYB5opG75nzg3rgIg5fS8XOkWDQ2QGHP7"
},
{
'ACCESS_TOKEN': "1255301814666383361-3fAON2pjoZaSj1Jhywq4n1rE8JSX5a",
'ACCESS_TOKEN_SECRET': "VYbTsC8ESkyxiHTi68wBezp4cPS3YyegmHObk3o7mXdBi",
'CONSUMER_KEY': "UWccr6Sb6TQDMDvQkkQPKF9Lg",
'CONSUMER_SECRET': "KfkOYNVvBoWMtBcb9INvrBROJC3QH3xo2Qw0gigDwCsHNEcXti"
},
{
'ACCESS_TOKEN': "472782005-K0iI81Al2d4zbBgCScTNBdRBAmAuM1rssUPE4z2I",
'ACCESS_TOKEN_SECRET': "iZimYCcKlKnoY80aaAAEZwfwAtJhf99kYGMIbtIg1KIKL",
'CONSUMER_KEY': "jCDsQ2YbbtULszBLff9MFphlL",
'CONSUMER_SECRET': "q6f6AUigGSYFvPbysGAOFRBaOeiGiEBOC9mOSdXPWzEUTvVPNa"
},
{
'ACCESS_TOKEN': "952806966371348481-IQ0xPxGSjrreFQVDqEEFQyrg0BMTUvC",
'ACCESS_TOKEN_SECRET': "JgzHMJrPF311Z4FxYy0dXIaU6bAtYpveiUoKaJsc9S80I",
'CONSUMER_KEY': "YGFJ4TYr1A3EfJKaCB0BnKnM8",
'CONSUMER_SECRET': "lN9GfdrOS5SnAlq6b7zx57DKyJTUWoN6p4fBpf7yxscLqI3VAp"
},
{
'ACCESS_TOKEN': "275499572-AugV2oYayyyWGkaBgOWQMYSmeKKUAgDOqvrXIKul",
'ACCESS_TOKEN_SECRET': "Vx0DBmEIFnUhfqQiZHnABHffuXArLeLjjuk1gXMBTQKjg",
'CONSUMER_KEY': "HjkDgma8SF8Rpbz8XTzeUKSf2",
'CONSUMER_SECRET': "zJRoTW7up79hYXs4Nbp61VzfPz15ueecWwUYsZ8vyYvzsqQABX"
},
{
'ACCESS_TOKEN': "1256833404592549889-w5HXCdkwHeKjvtmGdaNmgYEp3g3fge",
'ACCESS_TOKEN_SECRET': "pOTVrS8Zfe6BEEejqf7GMjuUE318eyHsctzvvVXRkX1bb",
'CONSUMER_KEY': "3bYyNS1oOKPMigUDLByIZttRF",
'CONSUMER_SECRET': "u0iWEmoHiH4VWuEMMO4tVwXUMy6r16BXDfJdGd0z6CHBA8NjPO"
}]
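# These credential sets are rotated by index. A minimal sketch of how a
# rank can be turned into a tweepy auth handler (tweepy 3.x OAuthHandler;
# the helper name `build_auth` is ours, not part of this commit):
from tweepy import OAuthHandler

def build_auth(cred_rank=0):
    c = cred[cred_rank % len(cred)]  # wrap around the available accounts
    auth = OAuthHandler(c['CONSUMER_KEY'], c['CONSUMER_SECRET'])
    auth.set_access_token(c['ACCESS_TOKEN'], c['ACCESS_TOKEN_SECRET'])
    return auth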
from tweepy import API
import twitter_harvest.util.utilities as utils
import re
import shapefile
import logging
import threading
import time
import json
import couchdb
import twitter_harvest.util.config as config
from twitter_harvest.auth import TwitterCredentials
def main():
print(countLocations())
# logger format (avoid shadowing the built-in `format`)
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(filename='harvester.log', filemode='w', format=log_format,
level=logging.INFO, datefmt="%H:%M:%S")
threads = list()
# Thread for searching for Tweets by GeoLocation
logging.info("Main : create and start thread %d.", 0)
api = utils.TwitterAPI(cred_rank=0)
threadSearchTweets = threading.Thread(target=api.search_tweets,
args=('searchCovid', config.VIC_GeoLocation, ['covid19', 'coronavirus', 'covid-19'],))
threads.append(threadSearchTweets)
threadSearchTweets.start()
# Thread for searching for Tweets by Users
logging.info("Main : create and start thread %d.", 1)
api2 = utils.TwitterAPI(cred_rank=1)
threadSearchUserTweets = threading.Thread(target=searchTimelinesThread,
args=('searchCovid', api2, ['covid19', 'coronavirus', 'covid-19'],))
threads.append(threadSearchUserTweets)
threadSearchUserTweets.start()
# Thread for Streaming Tweets by GeoLocation
logging.info("Main : create and start thread %d.", 2)
streamVIC = utils.TwitterStream(cred_rank=1)
threadStreamTweets = threading.Thread(target=streamVIC.stream,
args=('tweetsVIC.txt', config.VIC_BBox, ['covid19', 'coronavirus', 'covid-19'],))
#threads.append(threadStreamTweets)
#threadStreamTweets.start()
for index, thread in enumerate(threads):
logging.info("Main : before joining thread %d.", index)
thread.join()
logging.info("Main : thread %d done", index)
# connection to CouchDB
db_conn = config.DB_Config
couchserver = couchdb.Server(db_conn)
couchserver2 = couchdb.Server(db_conn)
while True:
for thread in threads:
if not thread.is_alive():
logging.info("Main : Restarting thread !!")
time.sleep(15 * 60)
thread.start()
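# Caveat: a threading.Thread can only be start()ed once; calling start()
# on a finished thread raises RuntimeError, so the loop above cannot
# revive a dead thread as written. A minimal restart sketch, assuming the
# (target, args) pair of each thread is kept in a parallel list `specs`:
def restart_dead(threads, specs):
    for i, t in enumerate(threads):
        if not t.is_alive():
            logging.info("Main : Restarting thread %d.", i)
            threads[i] = threading.Thread(target=specs[i][0], args=specs[i][1])
            threads[i].start()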
"""
for index, thread in enumerate(threads):
logging.info("Main : before joining thread %d.", index)
thread.join()
logging.info("Main : thread %d done", index)
"""
db_twitter_name = 'twitter'
if db_twitter_name in couchserver:
db_twitter = couchserver[db_twitter_name]
else:
db_twitter = couchserver.create(db_twitter_name)
db_users_name = 'users'
if db_users_name in couchserver2:
db_users = couchserver2[db_users_name]
else:
db_users = couchserver2.create(db_users_name)
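# The two get-or-create blocks above follow the same pattern; a small
# helper (a sketch, the name is ours) would collapse them:
def get_or_create(server, name):
    return server[name] if name in server else server.create(name)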
def getTwitterUsers():
users_set = set()
filetweetsVIC = open('searchCovid-2020-05-01.txt', 'r')
# Prepare a list of threads to cover all available credentials.
threads = list()
num_threads = len(TwitterCredentials.cred)
for i in range(num_threads):
if i == 0:
# Search States
logging.info("Main : create and start Search thread %d.", i)
api_0 = utils.TwitterAPI(cred_rank=i)
threadSearch = threading.Thread(target=searchThread,
args=(api_0, db_twitter, db_users,
config.Keywords,))
threads.append(threadSearch)
threadSearch.start()
else:
# Search user timelines
logging.info("Main : create and start Search Timeline thread %d.", i)
api_i = utils.TwitterAPI(cred_rank=i)
threadSearchTimeline = threading.Thread(target=searchTimelinesThread,
args=(api_i, db_twitter, db_users,
config.Keywords,))
threads.append(threadSearchTimeline)
threadSearchTimeline.start()
def searchThread(api, dbtwitter, dbusers, filter_list):
while True:
try:
line = filetweetsVIC.readline()
jline = json.loads(line)
users_set.add(jline['user']['screen_name'])
except:
break
return list(users_set)
# Continuously iterating through the states and search for the target tweets
for state in config.States:
logging.info("Starting search for {} :".format(state['state_name']))
api.search_tweets(state['state_name'], dbtwitter, dbusers, state['geocode'], filter_list)
def searchTimelinesThread(nametag, api, filter_list=[]):
# time.sleep(5 * 60)
users = getTwitterUsers()
for user in users:
tweets = api.retrieve_timeline_tweets(nametag,search_user=user, filter_list=filter_list)
def countLocations():
filetweetsVIC = open('searchCovid-2020-05-01.txt', 'r')
countall = 0
countloc = 0
users_set = set()
def searchTimelinesThread(api, db_twitter, db_users, filter_list=[]):
while True:
# Continuously iterating through the users and search their timelines for the target tweets
for userid in db_users:
if db_users[userid]['searched']:
continue
else:
updated_user = db_users[userid]
updated_user['searched'] = True
updated_user['_id'] = userid
try:
line = filetweetsVIC.readline()
jline = json.loads(line)
countall += 1
if jline['coordinates'] is not None or jline['geo'] is not None or jline['place'] is not None:
countloc += 1
users_set.add(jline['user']['screen_name'])
except:
break
return "Location available in {} records out of {} Total, for {} users. ".format(countloc, countall, len(users_set))
def mapSA4():
# sf = shapefile.Reader('SA4_2016_AUST.shx')
# print(sf.shapeTypeName, sf.bbox)
# fields = sf.fields
# shapes = sf.shapes()
# vicshapes = shapes[30:47]
# print(shapes[30].bbox)
return
db_users.save(updated_user)
except couchdb.http.ResourceConflict:
continue
api.retrieve_timeline_tweets(db_users[userid]['state'], db_twitter, search_user=userid,
filter_list=filter_list)
if __name__ == "__main__":
......
VIC_GeoLocation = "-37.840935,144.946457,50km"
MelbBoundingBox = [143.124990,-38.553368,146.915274,-36.276213]
# https://www.mapdevelopers.com/draw-circle-tool.php
VIC_GeoLocation = "-37.804541,144.811588,221km"
NSW_GeoLocation = "-33.152083,150.870422,495km"
QLD_GeoLocation = "-20.176761,149.449390,987km"
SA_GeoLocation = "-33.910191,134.486057,585km"
WA_GeoLocation = "-26.036284,118.163063,1062km"
NA1_GeoLocation = "-14.337986,133.759122,512km"
NA2_GeoLocation = "-21.805241,133.517536,460km"
TAS_GeoLocation = "-41.611075,146.880160,235km"
ACT_GeoLocation = "-35.516222,149.056436,45km"
VIC_BBox = [142.729921 ,-39.017529 ,148.003359 ,-36.233694]
NSW_BBox = [141.068634 ,-34.690775 ,153.593105 ,-29.030032]
@@ -10,3 +18,20 @@ NT_BBox = [128.711129,-25.952703,137.983590,-10.913785]
WA_BBox = [111.836129 ,-34.772836 ,129.018746 ,-13.832312]
AU_BBox = [110.338446 ,-43.005195 ,154.782046 ,-11.867894]
WORLD_BBox = [-158.417411 ,-50.190013, 163.347554 ,73.687492]
States = [
{'state_name': 'VIC','geocode': VIC_GeoLocation},
{'state_name': 'NSW', 'geocode': NSW_GeoLocation},
{'state_name': 'QLD', 'geocode': QLD_GeoLocation},
{'state_name': 'NA', 'geocode': NA1_GeoLocation},
{'state_name': 'WA', 'geocode': WA_GeoLocation},
{'state_name': 'SA', 'geocode': SA_GeoLocation},
{'state_name': 'TAS', 'geocode': TAS_GeoLocation}
]
Keywords = ['covid19', 'coronavirus', 'covid-19']
from_date = '2019/9/9'
DB_Config = 'http://admin:admin@172.26.133.58:5984'
#DB_Config = 'http://admin:admin@45.113.234.209:5984'
#DB_Config = 'http://admin:admin@127.0.0.1:5984'
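# The geocode strings above use Twitter's "<lat>,<long>,<radius>" search
# format (see the circle-tool link). A minimal sketch of how States and
# Keywords drive the search endpoint, assuming an authenticated tweepy.API
# instance `api` (tweepy 3.x; `search_states` is our name):
def search_states(api):
    query = " OR ".join(Keywords)
    for state in States:
        for tweet in api.search(q=query, geocode=state['geocode'], count=100):
            yield state['state_name'], tweet.id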
@@ -4,20 +4,19 @@ from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import RateLimitError
from dateutil.parser import parse as dateparser
from dateutil import tz
from textblob import TextBlob
import argparse
from twitter_harvest.auth import TwitterCredentials
import twitter_harvest.util.config as config
import couchdb
import json
import logging
import time
import re
from datetime import date
from twitter_harvest.auth import TwitterCredentials
# Class for managing Twitter's API operations
class TwitterAPI():
def __init__(self, cred_rank=0, search_user=None):
cred_count = len(TwitterCredentials.cred)
@@ -28,57 +27,105 @@ class TwitterAPI():
self.twitter_api = API(self.auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
self.search_user = search_user
def search_tweets(self, nametag, geocode, filter_list):
def search_tweets(self, state, db_twitter, db_users, geocode, filter_list):
filter_list = " OR ".join(filter_list)
logging.info("Searching %s: ", nametag)
logging.info("Searching %s: ", state)
count = 0
# try:
for page in limit_handled(
Cursor(self.twitter_api.search, q=filter_list, geocode=geocode, count=100, since_id=None).pages()):
try:
target = open(nametag + "-" + str(date.today()) + ".txt", 'a', encoding="utf-8")
for tweet in page:
"""
tweet = tweet._json
# convert the data into a simplified format for easy manipulation in CouchDB
tweet_datetime = dateparser(tweet['created_at']).replace(tzinfo=tz.tzlocal())
if 'retweeted_status' in tweet:
retweet = True
else:
retweet = False
# Prepare a compact version of the tweets
mini_tweet = {
'created_at':tweet._json['created_at'],
'id': tweet._json['id'],
'text': tweet._json['text'],
'user': tweet._json['user'],
'geo': tweet._json['geo'],
'coordinates': tweet._json['coordinates'],
'place': tweet._json['place'],
'lang': tweet._json['lang'],
'sentiment': TextBlob(tweet._json['text']).sentiment
'_id': tweet['id_str'],
'id': tweet['id_str'],
'created_at': str(tweet_datetime),
'text': tweet['text'],
'user': tweet['user'],
'state': state,
'geo': tweet['geo'],
'coordinates': tweet['coordinates'],
'place': tweet['place'],
'lang': tweet['lang'],
'sentiment': TextBlob(tweet['text']).sentiment, #conduct sentiment analysis
'retweet': retweet
}
juser = mini_tweet['user']
mini_user = {
'_id': juser['id_str'],
'id': juser['id_str'],
'screen_name': juser['screen_name'],
'location': juser['location'],
'description': juser['description'],
'followers_count': juser['followers_count'],
'friends_count': juser['friends_count'],
'state': state,
'searched': False
}
"""
count += 1
target.write(json.dumps(tweet._json) + '\r\n')
try:
db_twitter.save(mini_tweet)
db_users.save(mini_user)
except couchdb.http.ResourceConflict:
pass
except Exception as e:
pass
# target.close()
logging.info("Searched %s tweets: ", count)
# except Exception as e:
# logging.info("Interruption in search_tweets !!")
# return True
logging.info("Searched tweets: {} , in {}".format(count, state))
return True
def retrieve_timeline_tweets(self, nametag, search_user=None, filter_list=[], since_id=None):
def retrieve_timeline_tweets(self, state, db_twitter, search_user=None, filter_list=[], since_id=None):
if search_user is None:
user = self.search_user
else:
user = search_user
target = open(nametag + "-usertweets-" + str(date.today()) + ".txt", 'a', encoding="utf-8")
try:
for page in limit_handled(Cursor(self.twitter_api.user_timeline,
screen_name=search_user, count=200, since_id=since_id).pages()):
user_id=search_user, count=200, since_id=since_id).pages()):
for tweet in page:
try:
# Limit the search timeline to specific date
if dateparser(tweet._json['created_at']).replace(tzinfo=tz.tzlocal()) \
< dateparser(config.from_date).replace(tzinfo=tz.tzlocal()):
return True
# check whether this tweet meet our search criteria or not
if isMatching(tweet._json['text'], filter_list):
target.write(json.dumps(tweet._json) + '\r\n')
tweet = tweet._json
# convert the data into a simplified format for easy manipulation in CouchDB
tweet_datetime = dateparser(tweet['created_at']).replace(tzinfo=tz.tzlocal())
if 'retweeted_status' in tweet:
retweet = True
else:
retweet = False
# Prepare a compact version of the tweets
mini_tweet = {
'_id': tweet['id_str'],
'id': tweet['id_str'],
'created_at': str(tweet_datetime), # tweet['created_at']
'text': tweet['text'],
'user': tweet['user'],
'state': state,
'geo': tweet['geo'],
'coordinates': tweet['coordinates'],
'place': tweet['place'],
'lang': tweet['lang'],
'sentiment': TextBlob(tweet['text']).sentiment, #conduct sentiment analysis
'retweet': retweet
}
try:
db_twitter.save(mini_tweet)
except couchdb.http.ResourceConflict:
pass
except Exception:
pass
except Exception as e:
@@ -121,14 +168,10 @@ class TwitterListener(StreamListener):
def on_data(self, data):
try:
# print(data)
with open(self.filename, 'a', encoding="utf-8") as target:
try:
tweet = json.loads(data)
# if "australia" in str(tweet['user']['location']).lower():
target.write(data)
# else:
# pass
except:
logging.info("Error %s: thread", self.filename + " : " + data)
pass
@@ -146,7 +189,7 @@ class TwitterListener(StreamListener):
time.sleep(5 * 60)
return
# regulating the tweets flow to meet the API limits
def limit_handled(cursor):
while True:
try:
@@ -154,9 +197,10 @@ def limit_handled(cursor):
except RateLimitError:
logging.info("RateLimitException !!")
time.sleep(15 * 60)
except StopIteration as e:
return False
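# The hunk above is truncated; a self-contained version of the wrapper it
# shows (the yield line is an assumption inferred from the visible except
# clauses; RateLimitError exists in tweepy 3.x and was removed in 4.x):
def limit_handled_sketch(cursor):
    while True:
        try:
            yield cursor.next()
        except RateLimitError:
            logging.info("RateLimitException !!")
            time.sleep(15 * 60)  # sit out the 15-minute rate-limit window
        except StopIteration:
            return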
# Check if at least one of the keywords in matchList exists in the text
# Check if the text is matching the search query
def isMatching(text, matchList):
try:
p = re.compile(r'(?:{})'.format('|'.join(map(re.escape, matchList))))
......
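# The diff cuts off inside isMatching; a minimal sketch of the keyword
# test described by the comments above (case-insensitive matching is an
# assumption, since the rest of the body is not shown):
def is_matching_sketch(text, match_list):
    pattern = re.compile(r'(?:{})'.format('|'.join(map(re.escape, match_list))),
                         re.IGNORECASE)
    return pattern.search(text) is not None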