Skip to content
Snippets Groups Projects
Commit 6877c065 authored by Ahmed Mahmoud's avatar Ahmed Mahmoud
Browse files

Harvester beta version 1

parent 3d0db32e
Branches
No related tags found
1 merge request!5Harvester beta version 1
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
from tweepy import API
import twitter_harvest.util.utilities as utils
import re
import shapefile
import logging
import threading
import time
import json
import twitter_harvest.util.config as config
def main():
    """Launch and supervise the harvesting threads.

    Bug fixes: the original restart loop called start() on Thread objects
    that had already been started once, which raises RuntimeError ("threads
    can only be started once"); a dead thread is now replaced by a freshly
    created Thread built from its (target, args) spec.  The monitor loop
    also now sleeps every iteration instead of busy-spinning while all
    threads are alive, and `format` no longer shadows the builtin.
    """
    print(countLocations())
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(filename='harvester.log', filemode='w', format=log_format,
                        level=logging.INFO, datefmt="%H:%M:%S")

    keywords = ['covid19', 'coronavirus', 'covid-19']

    # Thread for searching for Tweets by GeoLocation
    api = utils.TwitterAPI(cred_rank=0)
    # Thread for searching for Tweets by Users
    api2 = utils.TwitterAPI(cred_rank=1)

    # Keep (target, args) specs so a finished thread can be recreated.
    specs = [
        (api.search_tweets, ('searchCovid', config.VIC_GeoLocation, keywords)),
        (searchTimelinesThread, ('searchCovid', api2, keywords)),
    ]
    # Thread for Streaming Tweets by GeoLocation — kept disabled as in the
    # original:
    # streamVIC = utils.TwitterStream(cred_rank=1)
    # specs.append((streamVIC.stream, ('tweetsVIC.txt', config.VIC_BBox, keywords)))

    threads = []
    for index, (target, args) in enumerate(specs):
        logging.info("Main : create and start thread %d.", index)
        worker = threading.Thread(target=target, args=args)
        threads.append(worker)
        worker.start()

    # Supervise forever: after a 15-minute cool-down, replace and restart
    # any thread that has terminated.
    while True:
        time.sleep(15 * 60)
        for index, worker in enumerate(threads):
            if not worker.is_alive():
                logging.info("Main : Restarting thread !!")
                target, args = specs[index]
                threads[index] = threading.Thread(target=target, args=args)
                threads[index].start()
def getTwitterUsers():
    """Return the distinct screen names found in the seed search file.

    Reads 'searchCovid-2020-05-01.txt' (one tweet JSON object per line) and
    collects each tweet's user screen name.  Stops at the first line that is
    not valid JSON or lacks the expected keys (the original behaviour, which
    also terminated the read at EOF).

    Fixes: the file is now closed via a context manager, and the bare
    `except:` is narrowed to the errors line parsing can actually raise.
    """
    users_set = set()
    with open('searchCovid-2020-05-01.txt', 'r') as source:
        for line in source:
            try:
                users_set.add(json.loads(line)['user']['screen_name'])
            except (ValueError, KeyError, TypeError):
                break  # malformed/blank line: stop, as the original did
    return list(users_set)
def searchTimelinesThread(nametag, api, filter_list=None):
    """Harvest the timeline of every user found in the seed search file.

    For each screen name returned by getTwitterUsers(), fetches that user's
    timeline via api.retrieve_timeline_tweets, filtered by filter_list.

    Fix: the mutable default argument `filter_list=[]` (shared across calls)
    is replaced by the None sentinel; behaviour for callers is unchanged.
    """
    if filter_list is None:
        filter_list = []
    # time.sleep(5 * 60)
    for user in getTwitterUsers():
        api.retrieve_timeline_tweets(nametag, search_user=user, filter_list=filter_list)
def countLocations():
    """Summarise how many records in the seed search file carry location data.

    Scans 'searchCovid-2020-05-01.txt' line by line; a record "has location"
    when any of its 'coordinates', 'geo' or 'place' fields is non-null.
    Users are only counted when their record has location data (as in the
    original).  Returns a human-readable summary string.

    Fixes: the file is now closed via a context manager and the bare
    `except:` is narrowed to the errors parsing can actually raise.
    """
    countall = 0
    countloc = 0
    users_set = set()
    with open('searchCovid-2020-05-01.txt', 'r') as source:
        for line in source:
            try:
                jline = json.loads(line)
                countall += 1
                if jline['coordinates'] is not None or jline['geo'] is not None or jline['place'] is not None:
                    countloc += 1
                    users_set.add(jline['user']['screen_name'])
            except (ValueError, KeyError, TypeError):
                break  # malformed/blank line: stop, as the original did
    return "Location available in {} records out of {} Total, for {} users. ".format(countloc, countall, len(users_set))
def mapSA4():
    """Placeholder for SA4 shapefile mapping; currently a no-op.

    The intended implementation (reading 'SA4_2016_AUST.shx' and slicing out
    the Victorian shapes) is retained below for future work.
    """
    # sf = shapefile.Reader('SA4_2016_AUST.shx')
    # print(sf.shapeTypeName, sf.bbox)
    # fields = sf.fields
    # shapes = sf.shapes()
    # vicshapes = shapes[30:47]
    # print(shapes[30].bbox)
    return
# Entry point: run the harvester only when executed as a script.
if __name__ == "__main__":
    main()
from tweepy import API
import twitter_harvest.utilities.utilities as utils
import re
def get_tweets(query, consumer_secret=300):
    """Fetch tweets matching *query*, write cleaned text to two files, and
    return the parsed tweets.

    Each tweet's text is stored as {'text': ...}; texts without URLs are
    stripped to letters only and appended to tweets.txt and review.txt.

    Fixes: parsed tweets are now appended to the returned list (the function
    previously always returned []); output files are closed via `with`.
    NOTE(review): API.search is invoked on the tweepy class rather than an
    authenticated API instance — confirm an api object should be used here.
    `consumer_secret` is unused; kept for interface compatibility.
    """
    tweets = []
    with open("tweets.txt", 'w', encoding="utf-8") as target, \
            open("review.txt", 'w', encoding="utf-8") as t1:
        # call twitter api to fetch tweets (other params: geocode, lang)
        fetched_tweets = API.search(query, count=100)
        for tweet in fetched_tweets:
            # saving text of tweet
            parsed_tweet = {'text': tweet.text}
            tweets.append(parsed_tweet)
            if "http" not in tweet.text:
                line = re.sub("[^A-Za-z]", " ", tweet.text)
                target.write(line + "\n")
                t1.write(line + "\n")
    return tweets
def main():
    """Stream COVID-related tweets into tweets.txt."""
    # Create instance of Twitter API
    # api = utils.TwitterAPI('HSunRealestate')
    # tweets = api.retrieve_timeline_tweets(num_of_tweets=100)
    # print(tweets)
    twitter_stream = utils.TwitterStream()
    twitter_stream.stream('tweets.txt', ['covid19', 'coronavirus', 'covid-19'])
# Entry point: run the streamer only when executed as a script.
if __name__ == "__main__":
    main()
import tweepy
# Credentials were stripped before commit, leaving bare `consumer_key =`
# assignments that are syntax errors (the module cannot even be imported).
# Placeholders restore a parseable file; fill in real values before running.
consumer_key = ""
consumer_secret = ""
# Override tweepy.StreamListener to add logic to on_status.
class MyStreamListener(tweepy.StreamListener):
    """Listener that prints tweet text and disconnects on rate limiting."""

    def on_status(self, status):
        # Echo each incoming tweet's text.
        print(status.text)

    def on_error(self, status_code):
        # HTTP 420 means rate limited: returning False in on_error
        # disconnects the stream; returning non-False reconnects it,
        # with backoff.
        return False if status_code == 420 else None
def main():
    """Connect to the streaming API and print tweets matching 'python'."""
    # OAuthHandler instance OAuth 2 Authentication
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    client = tweepy.API(auth)
    listener = MyStreamListener()
    live_stream = tweepy.Stream(auth=client.auth, listener=listener)
    live_stream.filter(track=['python'])
# Entry point: run the demo stream only when executed as a script.
if __name__ == "__main__":
    main()
# Search geocode "latitude,longitude,radius" centred on Melbourne,
# in the format expected by the Twitter search API's `geocode` parameter.
VIC_GeoLocation = "-37.840935,144.946457,50km"
# Bounding boxes for the streaming API's `locations` filter, as
# [west_lon, south_lat, east_lon, north_lat] in decimal degrees.
MelbBoundingBox = [143.124990,-38.553368,146.915274,-36.276213]
VIC_BBox = [142.729921,-39.017529,148.003359,-36.233694]
NSW_BBox = [141.068634,-34.690775,153.593105,-29.030032]
QLD_BBox = [138.159371,-28.800968,156.001168,-11.474203]
SA_BBox = [129.062692,-38.060715,140.971871,-26.031704]
NT_BBox = [128.711129,-25.952703,137.983590,-10.913785]
WA_BBox = [111.836129,-34.772836,129.018746,-13.832312]
AU_BBox = [110.338446,-43.005195,154.782046,-11.867894]
WORLD_BBox = [-158.417411,-50.190013, 163.347554,73.687492]
from tweepy import API
from tweepy import OAuthHandler
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import Stream
from tweepy import RateLimitError
from textblob import TextBlob
import argparse
import json
import logging
import time
import re
from datetime import date
from twitter_harvest.auth import TwitterCredentials
# Class for managing Twitter's API operations (search, timelines, friends).
class TwitterAPI():
    """Wrapper around tweepy's API for keyword search and timeline harvesting.

    Credentials are chosen round-robin from TwitterCredentials.cred using
    cred_rank modulo the number of credential sets, so any rank is valid.

    Bug fixes relative to the original:
    - retrieve_timeline_tweets computed a `user` fallback from
      self.search_user but then passed the raw `search_user` argument to the
      Cursor, making the fallback dead code; `user` is now actually used.
    - retrieve_friends called friend.append(friend) and therefore always
      returned an empty list; it now appends to `friends`.
    - The mutable default `filter_list=[]` is replaced with a None sentinel.
    - Output files are closed via context managers.
    """

    def __init__(self, cred_rank=0, search_user=None):
        cred_count = len(TwitterCredentials.cred)
        cred = TwitterCredentials.cred[cred_rank % cred_count]
        self.auth = OAuthHandler(cred["CONSUMER_KEY"], cred["CONSUMER_SECRET"])
        self.auth.set_access_token(cred["ACCESS_TOKEN"], cred["ACCESS_TOKEN_SECRET"])
        self.twitter_api = API(self.auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
        self.search_user = search_user

    def search_tweets(self, nametag, geocode, filter_list):
        """Search tweets matching any keyword in filter_list within geocode.

        Appends each tweet's raw JSON (one per line) to
        '<nametag>-<today>.txt'.  Always returns True.
        """
        query = " OR ".join(filter_list)
        logging.info("Searching %s: ", nametag)
        count = 0
        filename = nametag + "-" + str(date.today()) + ".txt"
        for page in limit_handled(
                Cursor(self.twitter_api.search, q=query, geocode=geocode, count=100, since_id=None).pages()):
            try:
                with open(filename, 'a', encoding="utf-8") as target:
                    for tweet in page:
                        count += 1
                        target.write(json.dumps(tweet._json) + '\r\n')
            except Exception:
                # Best-effort harvesting: a page that fails to serialize or
                # write is skipped rather than aborting the whole search.
                logging.exception("search_tweets: failed to write a page")
        logging.info("Searched %s tweets: ", count)
        return True

    def retrieve_timeline_tweets(self, nametag, search_user=None, filter_list=None, since_id=None):
        """Fetch a user's timeline, writing keyword-matching tweets to
        '<nametag>-usertweets-<today>.txt'.

        Falls back to self.search_user when search_user is None.  Always
        returns True (interruptions are logged, not raised).
        """
        if filter_list is None:
            filter_list = []
        user = search_user if search_user is not None else self.search_user
        filename = nametag + "-usertweets-" + str(date.today()) + ".txt"
        try:
            with open(filename, 'a', encoding="utf-8") as target:
                for page in limit_handled(Cursor(self.twitter_api.user_timeline,
                                                 screen_name=user, count=200, since_id=since_id).pages()):
                    for tweet in page:
                        try:
                            if isMatching(tweet._json['text'], filter_list):
                                target.write(json.dumps(tweet._json) + '\r\n')
                        except Exception:
                            pass  # skip tweets missing expected fields
        except Exception:
            logging.info("Interruption in retrieve_timeline_tweets ")
            return True
        return True

    def retrieve_friends(self, num_of_friends):
        """Return up to num_of_friends friend objects for self.search_user."""
        friends = []
        for friend in limit_handled(Cursor(self.search_user.friends, id=self.search_user).items(num_of_friends)):
            friends.append(friend)
        return friends

    def limit_status(self):
        """Return the API's current rate-limit status payload."""
        return self.twitter_api.rate_limit_status()
class TwitterStream:
    """Streams tweets to a file, filtered by keywords and a bounding box."""

    def __init__(self, cred_rank=0):
        # Select a credential set round-robin by rank.
        total = len(TwitterCredentials.cred)
        creds = TwitterCredentials.cred[cred_rank % total]
        self.auth = OAuthHandler(creds["CONSUMER_KEY"], creds["CONSUMER_SECRET"])
        self.auth.set_access_token(creds["ACCESS_TOKEN"], creds["ACCESS_TOKEN_SECRET"])

    def stream(self, filename, bbox, filter_list):
        """Open a filtered live stream that appends matching tweets to filename."""
        self.filename = filename
        logging.info("Streamer %s: starting", filename)
        live_stream = Stream(self.auth, TwitterListener(filename))
        live_stream.filter(track=filter_list, locations=bbox)
class TwitterListener(StreamListener):
    # NOTE(review): super().__init__() is not called, so StreamListener's own
    # setup (e.g. its `api` attribute) is skipped — confirm this is intended.
    def __init__(self, filename):
        # Output file that received tweet payloads are appended to.
        self.filename = filename

    def on_data(self, data):
        """Append each raw tweet JSON payload to self.filename.

        Always returns True so the stream stays connected, even when a
        payload fails to parse or the write fails.
        """
        try:
            # print(data)
            with open(self.filename, 'a', encoding="utf-8") as target:
                try:
                    tweet = json.loads(data)
                    # if "australia" in str(tweet['user']['location']).lower():
                    target.write(data)
                    # else:
                    #     pass
                except:
                    # Bad payload: log it and keep streaming.
                    logging.info("Error %s: thread", self.filename + " : " + data)
                    pass
            return True
        except BaseException as e:
            print("Error on_data %s" % str(e))
            return True

    def on_error(self, status):
        # Log the HTTP status, then disconnect (return False) for any code
        # <= 420 (420 = rate limited).  NOTE(review): `<=` also disconnects
        # on lower codes — confirm `== 420` was not intended (the other
        # listener in this project uses `== 420`).
        logging.info("Error %s: thread", self.filename + str(status))
        if status <= 420:
            return False

    def on_timeout(self):
        # Back off for 5 minutes when the connection times out.
        time.sleep(5 * 60)
        return
def limit_handled(cursor):
    """Yield items from a tweepy cursor, sleeping 15 minutes on rate limits.

    Bug fix: when the cursor is exhausted, cursor.next() raises
    StopIteration; under PEP 479 (Python 3.7+) a StopIteration escaping a
    generator body is converted to RuntimeError.  It is now caught and the
    generator returns cleanly instead of crashing the caller.
    """
    while True:
        try:
            yield cursor.next()
        except StopIteration:
            return  # cursor exhausted: end iteration normally
        except RateLimitError:
            logging.info("RateLimitException !!")
            time.sleep(15 * 60)
# Check if at least one of the keywords in matchList exists in the text
def isMatching(text, matchList):
    """Return True when any keyword in matchList occurs in text.

    Matching is case-insensitive.  Bug fixes: the original lowercased the
    text but not the keywords, so any keyword containing an uppercase letter
    could never match; and the error path printed e.args[0], which raises
    IndexError when args is empty — the exception itself is printed instead.
    Returns False on any error (e.g. non-string text).
    """
    try:
        keywords = [keyword.lower() for keyword in matchList]
        pattern = re.compile(r'(?:{})'.format('|'.join(map(re.escape, keywords))))
        return pattern.search(text.lower()) is not None
    except Exception as e:
        print(e)
        return False
from tweepy import API
from tweepy import OAuthHandler
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import Stream
from twitter_harvest.auth import TwitterCredentials
class TwitterAPI():
    """Simple authenticated wrapper for timeline and friend retrieval."""

    def __init__(self, search_user=None):
        self.auth = OAuthHandler(TwitterCredentials.CONSUMER_KEY, TwitterCredentials.CONSUMER_SECRET)
        self.auth.set_access_token(TwitterCredentials.ACCESS_TOKEN, TwitterCredentials.ACCESS_TOKEN_SECRET)
        self.twitter_api = API(self.auth)
        self.search_user = search_user

    def retrieve_timeline_tweets(self, num_of_tweets=100):
        """Return up to num_of_tweets from the configured user's timeline."""
        tweets = []
        for tweet in Cursor(self.twitter_api.user_timeline, id=self.search_user).items(num_of_tweets):
            tweets.append(tweet)
        return tweets

    def retrieve_friends(self, num_of_friends):
        """Return up to num_of_friends friend objects.

        Bug fix: the original called friend.append(friend), so the `friends`
        result list was never populated and the method always returned [].
        """
        friends = []
        for friend in Cursor(self.search_user.friends, id=self.search_user).items(num_of_friends):
            friends.append(friend)
        return friends
class TwitterStream:
    """Minimal streaming client using the app-level credentials."""

    def __init__(self):
        self.auth = OAuthHandler(TwitterCredentials.CONSUMER_KEY, TwitterCredentials.CONSUMER_SECRET)
        self.auth.set_access_token(TwitterCredentials.ACCESS_TOKEN, TwitterCredentials.ACCESS_TOKEN_SECRET)

    def stream(self, filename, filter_list):
        """Stream tweets matching filter_list, appending them to filename."""
        live_stream = Stream(self.auth, TwitterListener(filename))
        live_stream.filter(track=filter_list)
class TwitterListener(StreamListener):
    """Echoes every received payload to stdout and appends it to a file."""

    def __init__(self, filename):
        self.filename = filename

    def on_data(self, data):
        # Keep the stream alive (return True) whether or not the write works.
        try:
            print(data)
            with open(self.filename, 'a') as out:
                out.write(data)
            return True
        except BaseException as e:
            print("Error on_data %s" % str(e))
            return True

    def on_error(self, status):
        # HTTP 420 means rate limited: disconnect by returning False;
        # otherwise just report the status code.
        if status == 420:
            return False
        print(status)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment