From d7b6b570c6cc4facebaa608426c38511fda27d04 Mon Sep 17 00:00:00 2001
From: Felipe Ramos <framosmorale@student.unimelb.edu.au>
Date: Thu, 27 May 2021 17:05:01 +1000
Subject: [PATCH] solving version conflict

---
 Data collection/AURIN.xml              |  16 +-
 Data collection/AURIN_API.py           |  27 ++
 Data collection/big_to_couch.py        | 156 +++++-----
 Data collection/filterbigupdate.py     |  30 +-
 Data collection/preprocess.py          |  89 +++---
 Data collection/searchFile_to_couch.py |  44 +--
 Data collection/sets.json              |  10 +-
 Data collection/streamFile_to_couch.py |  56 ++--
 Data collection/tweet_gatherer_app.py  | 408 ++++++++++++-------------
 9 files changed, 432 insertions(+), 404 deletions(-)
 create mode 100644 Data collection/AURIN_API.py

diff --git a/Data collection/AURIN.xml b/Data collection/AURIN.xml
index 0831439..aafe4ea 100644
--- a/Data collection/AURIN.xml
+++ b/Data collection/AURIN.xml
@@ -1,9 +1,9 @@
-<OGRWFSDataSource>
-
-    <URL>http://openapi.aurin.org.au/wfs?version=1.0.0&TYPENAME=</URL>
-
-    <HttpAuth>BASIC</HttpAuth>
-
-    <UserPwd>student:dj78dfGF</UserPwd>
-
+<OGRWFSDataSource>
+
+    <URL>http://openapi.aurin.org.au/wfs?version=1.0.0&TYPENAME=aurin:datasource-AU_Govt_ABS_Census-UoM_AURIN_DB_2_gccsa_p11b_english_profic_by_arrival_yr_by_sex_census_2016</URL>
+
+    <HttpAuth>BASIC</HttpAuth>
+
+    <UserPwd>student:dj78dfGF</UserPwd>
+
 </OGRWFSDataSource>
\ No newline at end of file
diff --git a/Data collection/AURIN_API.py b/Data collection/AURIN_API.py
new file mode 100644
index 0000000..98be0f9
--- /dev/null
+++ b/Data collection/AURIN_API.py
@@ -0,0 +1,27 @@
+import urllib.request
+import xmltodict
+from lxml import etree
+
+username = 'student'
+password = 'dj78dfGF'
+
+def openapi_request(url):
+
+    # create an authenticated HTTP handler and submit the URL
+    password_manager = urllib.request.HTTPPasswordMgrWithDefaultRealm()
+    password_manager.add_password(None, url, username, password)
+    auth_manager = urllib.request.HTTPBasicAuthHandler(password_manager)
+    opener = urllib.request.build_opener(auth_manager)
+    urllib.request.install_opener(opener)
+
+    req = urllib.request.Request(url)
+    handler = urllib.request.urlopen(req)
+
+    return handler.read()
+
+dataset = 'aurin:datasource-AU_Govt_ABS_Census-UoM_AURIN_DB_2_gccsa_p11b_english_profic_by_arrival_yr_by_sex_census_2016'
+url = 'http://openapi.aurin.org.au/wfs?request=DescribeFeatureType&service=WFS&version=1.1.0&typeName=' + dataset
+xml = openapi_request(url)
+root = etree.fromstring(xml)
+# flatten the WFS schema XML into a plain dict for inspection
+out = xmltodict.parse(xml)
\ No newline at end of file
diff --git a/Data collection/big_to_couch.py b/Data collection/big_to_couch.py
index 6c6774f..db40a84 100644
--- a/Data collection/big_to_couch.py
+++ b/Data collection/big_to_couch.py
@@ -1,78 +1,78 @@
-import json
-import couchdb
-from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt
-
-
-coords = []
-areas = {}
-
-with open('sets.json') as f:
-    for line in f:
-        c = json.loads(line)
-        areas[c['region']] = c['box']
-
-
-couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
-db = couchserver['live_demo']
-
-areas = {}
-with open('sets.json') as f:
-    for line in f:
-        c = json.loads(line)
-        areas[c['region']] = c['box']
-
-def insideArea(x,y,poly):
-    ###
-    # poly dimension [xmin, xmax, ymin, ymax]
-    ###
-    xmin, ymin, xmax, ymax = poly
-    inside = False
-    if x > xmin and x <= xmax:
-        if y > ymin and y <= ymax:
-            inside = True
-    return inside
-
-
-def checkArea(point):
-    check = 'None'
-    for k, v in areas.items():
-        if insideArea(point[0],point[1],v):
-            check = k
-    return check
-
-filtered = {}
-with open('big.json', 'r', encoding = 'utf-8') as file:
-    first = file.readline()
-    for line in file:
-        twt = None
-        if line[-2] == ',':
-            twt = json.loads(line[:-2])
-        elif line[-3]==']':
-            #big.append(json.loads(line[:-3]))
-            pass
-        else:
-            twt = json.loads(line[:-1])
-
-        if twt is not None:
-            coor = twt['value']['geometry']['coordinates']
-            tweet = twt['doc']
-            tweet['tags'] = classifyTwt(tweet)
-            del tweet['_rev']
-
-            if len(tweet['tags']) > 0:
-
-                block = checkArea(coor)
-                if block == 'None':
-                    if 'location' in tweet['user']:
-                        block = tweet['user']['location']
-                        block = block.split(',')[0]
-                tweet['region'] = block
-                #save2Couch(tweet, db)
-                filtered[tweet['id_str']]=tweet
-
-
-with open("big_filtered.json", "w") as outfile:
-    json.dump(filtered, outfile)
-
-
-
+import json
+import couchdb
+from preprocess import save2Couch, updateCDB, replaceCDB, classifyTwt
+
+
+coords = []
+areas = {}
+
+with open('sets.json') as f:
+    for line in f:
+        c = json.loads(line)
+        areas[c['region']] = c['box']
+
+
+couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
+db = couchserver['live_demo']
+
+areas = {}
+with open('sets.json') as f:
+    for line in f:
+        c = json.loads(line)
+        areas[c['region']] = c['box']
+
+def insideArea(x,y,poly):
+    ###
+    # poly dimension [xmin, xmax, ymin, ymax]
+    ###
+    xmin, ymin, xmax, ymax = poly
+    inside = False
+    if x > xmin and x <= xmax:
+        if y > ymin and y <= ymax:
+            inside = True
+    return inside
+
+
+def checkArea(point):
+    check = 'None'
+    for k, v in areas.items():
+        if insideArea(point[0],point[1],v):
+            check = k
+    return check
+
+filtered = {}
+with open('big.json', 'r', encoding = 'utf-8') as file:
+    first = file.readline()
+    for line in file:
+        twt = None
+        if line[-2] == ',':
+            twt = json.loads(line[:-2])
+        elif line[-3]==']':
+            #big.append(json.loads(line[:-3]))
+            pass
+        else:
+            twt = json.loads(line[:-1])
+
+        if twt is not None:
+            coor = twt['value']['geometry']['coordinates']
+            tweet = twt['doc']
+            tweet['tags'] = classifyTwt(tweet)
+            del tweet['_rev']
+
+            if len(tweet['tags']) > 0:
+
+                block = checkArea(coor)
+                if block == 'None':
+                    if 'location' in tweet['user']:
+                        block = tweet['user']['location']
+                        block = block.split(',')[0]
+                tweet['region'] = block
+                #save2Couch(tweet, db)
+                filtered[tweet['id_str']]=tweet
+
+
+with open("big_filtered.json", "w") as outfile:
+    json.dump(filtered, outfile)
+
+
+
diff --git a/Data collection/filterbigupdate.py b/Data collection/filterbigupdate.py
index 57c40a4..f669dbc 100644
--- a/Data collection/filterbigupdate.py
+++ b/Data collection/filterbigupdate.py
@@ -1,16 +1,16 @@
-import json
-import couchdb
-from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt
-
-couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
-db = couchserver.create("live_demo2")
-
-with open('./'+'big_filtered.json', 'r', encoding = 'utf-8') as file:
-    biga = json.load(file)
-
-
-for k,v in biga.items():
-    loc = v['location']
-    v['region'] = loc.capitalize()
-    v['tags'] = list(set(v['tags']))
+import json
+import couchdb
+from preprocess import save2Couch, updateCDB, replaceCDB, classifyTwt
+
+couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
+db = couchserver.create("live_demo2")
+
+with open('./'+'big_filtered.json', 'r', encoding = 'utf-8') as file:
+    biga = json.load(file)
+
+
+for k,v in biga.items():
+    loc = v['location']
+    v['region'] = loc.capitalize()
+    v['tags'] = list(set(v['tags']))
     save2Couch(v, db)
\ No newline at end of file
diff --git a/Data collection/preprocess.py b/Data collection/preprocess.py
index ce0d908..8625383 100644
--- a/Data collection/preprocess.py
+++ b/Data collection/preprocess.py
@@ -1,45 +1,46 @@
-
-def save2Couch(twt, db):
-    doc = twt
-    doc['_id'] = doc['id_str']
-    try:
-        db.save(doc)
-    except:
-        replaceCDB(doc, db)
-
-
-def updateCDB(twt, db):
-    doc = db.get(twt['id_str'])
-    up = twt['tags']
-    doc['tags'] = up
-    db[twt['_id']] = doc
-    print('update:')
-
-
-def replaceCDB(twt,db):
-    del db[twt['id_str']]
-    db.save(twt)
-
-
-def classifyTwt(twt):
-    tags = []
-    sports = ['AFL', 'tennis', 'footie','swimming','AustralianOpen', 'footy' ,'soccer', 'cricket', '#AFL', 'netball', 'basketball', 'NRL', '#NRL', 'rugby']
-
-    if 'text' in twt:
-        text = twt['text']
-    else:
-        text = twt['full_text']
-    for s in sports:
-        s = s.lower()
-        if s in text.lower():
-            if s in ['afl','footie','#afl','footy', '#afl']:
-                tag = 'Footie'
-            elif s in ['tennis', 'australianopen']:
-                tag = 'Tennis'
-            elif s in ['nrl', '#nrl', 'rugby']:
-                tag = 'Rugby'
-            else:
-                tag = s
-            tags.append(tag)
-    tags = list(set(tags))
+
+def save2Couch(twt, db, w_replace = False):
+    doc = twt
+    doc['_id'] = doc['id_str']
+    try:
+        db.save(doc)
+    except Exception:
+        if w_replace:
+            replaceCDB(doc, db)
+
+
+def updateCDB(twt, db):
+    doc = db.get(twt['id_str'])
+    up = twt['tags']
+    doc['tags'] = up
+    db[twt['_id']] = doc
+    print('update:')
+
+
+def replaceCDB(twt,db):
+    del db[twt['id_str']]
+    db.save(twt)
+
+
+def classifyTwt(twt):
+    tags = []
+    sports = ['AFL', 'tennis', 'footie','swimming','AustralianOpen', 'footy' ,'soccer', 'cricket', '#AFL', 'netball', 'basketball', 'NRL', '#NRL', 'rugby']
+
+    if 'text' in twt:
+        text = twt['text']
+    else:
+        text = twt['full_text']
+    for s in sports:
+        s = s.lower()
+        if s in text.lower():
+            if s in ['afl', 'footie', '#afl', 'footy']:
+                tag = 'Footie'
+            elif s in ['tennis', 'australianopen']:
+                tag = 'Tennis'
+            elif s in ['nrl', '#nrl', 'rugby']:
+                tag = 'Rugby'
+            else:
+                tag = s
+            tags.append(tag)
+    tags = list(set(tags))
     return tags
\ No newline at end of file
diff --git a/Data collection/searchFile_to_couch.py b/Data collection/searchFile_to_couch.py
index 67e942a..7e5c142 100644
--- a/Data collection/searchFile_to_couch.py
+++ b/Data collection/searchFile_to_couch.py
@@ -1,23 +1,23 @@
-
-import json
-import couchdb
-import os
-from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt
-
-data = {}
-couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
-db = couchserver['live_demo2']
-
-
-for filename in os.listdir('./UPLOAD'):
-    if filename.startswith('twt_search'):
-        with open('.//UPLOAD//'+filename, 'r', encoding = 'utf-8') as file:
-            for line in file:
-                #line = ast.literal_eval(line)
-                line = json.loads(line)
-                for k,v in line.items():
-                    data[k] = v
-
-for k,v in data.items():
-    v['tags'] = classifyTwt(v)
+
+import json
+import couchdb
+import os
+from preprocess import save2Couch, updateCDB, replaceCDB, classifyTwt
+
+data = {}
+couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
+db = couchserver['live_demo2']
+
+
+for filename in os.listdir('./UPLOAD'):
+    if filename.startswith('twt_search'):
+        with open('.//UPLOAD//'+filename, 'r', encoding = 'utf-8') as file:
+            for line in file:
+                #line = ast.literal_eval(line)
+                line = json.loads(line)
+                for k,v in line.items():
+                    data[k] = v
+
+for k,v in data.items():
+    v['tags'] = classifyTwt(v)
     save2Couch(v,db)
\ No newline at end of file
diff --git a/Data collection/sets.json b/Data collection/sets.json
index 3e70934..d96a8ef 100644
--- a/Data collection/sets.json
+++ b/Data collection/sets.json
@@ -1,5 +1,5 @@
-{"secret":{"API_key": "gfrSs7VJvSDNuDutLDLx2kdnP" ,"API_secret": "Czhc2ryBJmeaaV5KMcRVd35TmXii2N1y1PhH8sfZ4L1S10Nw4S","access_token": "1389730005953900549-Fpc4niDhzmCMEDwRDvJcFKDHaThchd","access_secret" : "sxly7UABoMN3p5l5PhGpVPqvALg2Vv7MCCNZ0fCoH8pnv"}, "point": "-37.7980882,144.9334346,60km","box": [144.5,-38.2,145.49,-37.41], "region": "Melbourne"}
-{"secret":{"API_key": "YXWW9iJpOis462BdSQyBReojA" ,"API_secret": "yu0RjV8vVY3OZQa31nvZ6IqCVGt2MQvOVQrGE7b1oz7i93KuS4","access_token": "867498082732322816-vQr0G2qZDT4OF8Builvc2JNOkPGfl3h","access_secret" : "kf8eKJmkY5mZnUDXWvkwfvaKv3j6NV15lxV1n3jg46OUH"}, "point": "-33.9472383,151.0895501,60km","box": [150.928514,-33.948236, 151.311115,-33.738525], "region": "Sydney"}
-{"secret":{"API_key": "ZYdJvM875rgmk3rJEhyFTVLql" ,"API_secret": "dPEb81AWELqR3qOSStbmUKSxZ8Esx706tjKbhEmc6vEuWotry0","access_token": "123223015-2fcEOpBg59x2QWSsEG3QpDIuwaqLXGl4eP98KM5e","access_secret" : "VHmvqa3Zu3uez5R4o2hIbMPYFLDYenBYMXcvW3vzqxUMR"}, "point": "-27.4943891,153.0341016,60km","box": [152.857697,-27.770669,153.335172,-27.272398], "region": "Brisbane"}
-{"secret":{"API_key": "XkaRbn5rHBMNuFCPwC48DnYFJ" ,"API_secret": "GVz5xCoAE8hqa8X5YI3JtcM2XTd6RjENn8SicCw73RsClezgfo","access_token": "2907073011-D40d8dCJhledSRwOtwyKwBmcHsIF3RvvT9bycj9","access_secret" : "B6jSIfHuwRGu5pwgVAdYwVkFT7I05RhmLy0STt48x1i1H"}, "point": "-31.9469709,115.9218462,60km","box": [115.589457,-32.337839,116.100332, -31.631726], "region": "Perth"}
-{"secret":{"API_key": "SZS77byadyNlV99Vc0JQvDNS3" ,"API_secret": "obAZTi5ovBhVVqDhwVTRQH0qFzlPe1YKhiSb134ylIUjNt55AO","access_token": "1386612665682853892-Tp6J5KfT4Wr8gsIPYSr2G5W15axUlj","access_secret" : "hN59hNhidGKLJpsJ8nX8Dr2EEB4m9ny0w70K2Fd0dpY8Q"}, "point": "-34.926068,138.736903,60km","box": [138.01,-35.5,139.4,-34.4], "region": "Adelaide"}
+{"secret":{"API_key": "gfrSs7VJvSDNuDutLDLx2kdnP" ,"API_secret": "Czhc2ryBJmeaaV5KMcRVd35TmXii2N1y1PhH8sfZ4L1S10Nw4S","access_token": "1389730005953900549-Fpc4niDhzmCMEDwRDvJcFKDHaThchd","access_secret" : "sxly7UABoMN3p5l5PhGpVPqvALg2Vv7MCCNZ0fCoH8pnv"}, "point": "-37.7980882,144.9334346,60km","box": [144.5,-38.2,145.49,-37.41], "region": "Melbourne"}
+{"secret":{"API_key": "YXWW9iJpOis462BdSQyBReojA" ,"API_secret": "yu0RjV8vVY3OZQa31nvZ6IqCVGt2MQvOVQrGE7b1oz7i93KuS4","access_token": "867498082732322816-vQr0G2qZDT4OF8Builvc2JNOkPGfl3h","access_secret" : "kf8eKJmkY5mZnUDXWvkwfvaKv3j6NV15lxV1n3jg46OUH"}, "point": "-33.9472383,151.0895501,60km","box": [150.928514,-33.948236, 151.311115,-33.738525], "region": "Sydney"}
+{"secret":{"API_key": "ZYdJvM875rgmk3rJEhyFTVLql" ,"API_secret": "dPEb81AWELqR3qOSStbmUKSxZ8Esx706tjKbhEmc6vEuWotry0","access_token": "123223015-2fcEOpBg59x2QWSsEG3QpDIuwaqLXGl4eP98KM5e","access_secret" : "VHmvqa3Zu3uez5R4o2hIbMPYFLDYenBYMXcvW3vzqxUMR"}, "point": "-27.4943891,153.0341016,60km","box": [152.857697,-27.770669,153.335172,-27.272398], "region": "Brisbane"}
+{"secret":{"API_key": "XkaRbn5rHBMNuFCPwC48DnYFJ" ,"API_secret": "GVz5xCoAE8hqa8X5YI3JtcM2XTd6RjENn8SicCw73RsClezgfo","access_token": "2907073011-D40d8dCJhledSRwOtwyKwBmcHsIF3RvvT9bycj9","access_secret" : "B6jSIfHuwRGu5pwgVAdYwVkFT7I05RhmLy0STt48x1i1H"}, "point": "-31.9469709,115.9218462,60km","box": [115.589457,-32.337839,116.100332, -31.631726], "region": "Perth"}
+{"secret":{"API_key": "SZS77byadyNlV99Vc0JQvDNS3" ,"API_secret": "obAZTi5ovBhVVqDhwVTRQH0qFzlPe1YKhiSb134ylIUjNt55AO","access_token": "1386612665682853892-Tp6J5KfT4Wr8gsIPYSr2G5W15axUlj","access_secret" : "hN59hNhidGKLJpsJ8nX8Dr2EEB4m9ny0w70K2Fd0dpY8Q"}, "point": "-34.926068,138.736903,60km","box": [138.01,-35.5,139.4,-34.4], "region": "Adelaide"}
"1386612665682853892-Tp6J5KfT4Wr8gsIPYSr2G5W15axUlj","access_secret" : "hN59hNhidGKLJpsJ8nX8Dr2EEB4m9ny0w70K2Fd0dpY8Q"}, "point": "-34.926068,138.736903,60km","box": [138.01,-35.5,139.4,-34.4], "region": "Adelaide"} diff --git a/Data collection/streamFile_to_couch.py b/Data collection/streamFile_to_couch.py index cb489df..ed6e19a 100644 --- a/Data collection/streamFile_to_couch.py +++ b/Data collection/streamFile_to_couch.py @@ -1,29 +1,29 @@ -import json -import codecs -import ast -import couchdb -import os -from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt - - -data = [] -log = [] -couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/") -db = couchserver['live_demo2'] - - -for filename in os.listdir('./UPLOAD/'): - if filename.startswith('twt_stream'): - with open('./UPLOAD/'+filename, 'r', encoding = 'utf-8') as file: - for line in file: - try: - line = ast.literal_eval(line) - data.append(line) - except: - log.append(filename) - -for i in data: - twt = next(iter(i.values())) - twt['tags'] = classifyTwt(twt) - if len(twt['tags'])> 0: +import json +import codecs +import ast +import couchdb +import os +from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt + + +data = [] +log = [] +couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/") +db = couchserver['live_demo2'] + + +for filename in os.listdir('./UPLOAD/'): + if filename.startswith('twt_stream'): + with open('./UPLOAD/'+filename, 'r', encoding = 'utf-8') as file: + for line in file: + try: + line = ast.literal_eval(line) + data.append(line) + except: + log.append(filename) + +for i in data: + twt = next(iter(i.values())) + twt['tags'] = classifyTwt(twt) + if len(twt['tags'])> 0: save2Couch(twt, db) \ No newline at end of file diff --git a/Data collection/tweet_gatherer_app.py b/Data collection/tweet_gatherer_app.py index 72f97c9..b84f6df 100644 --- a/Data collection/tweet_gatherer_app.py +++ b/Data collection/tweet_gatherer_app.py @@ -1,204 +1,204 @@ -import tweepy -import json -from tweepy.streaming import StreamListener -import couchdb -import sys -import os -import os.path -import datetime -import re -from urllib3.exceptions import ProtocolError -from preprocess import save2couch, updateCBD, replaceCDB, classifyTwt - -collector_type = sys.argv[1] -region = sys.argv[2] - -#change code to your current Twitter account - - -if os.path.isfile('twt_stream{}.json'.format(region)): - outfile = open('twt_stream{}.json'.format(region), 'a+', encoding ='utf-8') -else: - outfile = open('twt_stream{}.json'.format(region), 'w+', encoding ='utf-8') - - -class StreamListener(tweepy.StreamListener): - - def __init__(self, config, db): - super(StreamListener, self).__init__() - self.file = [] - self.db = db - self.config = config - - def on_status(self, status): - is_retweet = False - if hasattr(status, 'retweeted_status'): - is_retweet = True - - #Avoid truncated text if possible - if hasattr(status, 'extended_tweet'): - text = status.extended_tweet['full_text'] - else: - text = status.text - - for char in [',','\n']: - text.replace(char, " ") - - #sports = ['afl', 'tennis', 'footie','swimming','AustralianOpen', 'soccer', 'cricket', '#afl'] - - #if any(s in text.lower() for s in sports): - - if not is_retweet: - doc = status._json - doc['region'] = self.config['region'] - doc['text'] = text - #save2Couch(doc, self.db) - - #self.file.append(json.load(doc)) - serial = {doc['id_str']:doc} - #if 'location' in doc['user']: - # print(doc['text'], doc['user']['location']) - #else: - 
-            print(doc['text'])
-
-            outfile.write(str(serial) + '\n')
-
-    def on_error(self, status_code):
-        print("Encountered streaming error (", status_code, ")")
-        sys.exit()
-
-
-
-if __name__ == "__main__":
-
-    sets = []
-    with open('sets.json', encoding = 'utf-8') as setdata:
-        for line in setdata:
-            sets.append(json.loads(line))
-
-    config = sets[int(region)]
-
-    #couchserver = couchdb.Server("http://admin:admin@172.26.130.79:5984/")
-    #db = couchserver['twitter_demo']
-
-    API_key = config['secret']['API_key']
-    API_secret = config['secret']['API_secret']
-    access_token = config['secret']['access_token']
-    access_secret = config['secret']['access_secret']
-
-    # Authenticate to Twitter
-    auth = tweepy.OAuthHandler(API_key, API_secret)
-    auth.set_access_token(access_token, access_secret)
-    api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
-    #outfile = open('twt_stream.json', 'r+', encoding ='utf-8')
-
-
-
-    if collector_type == 'stream':
-        #init stream
-        streamListener = StreamListener(config, db)
-        stream = tweepy.Stream(auth=api.auth, listener = streamListener, tweet_mode ='extended')
-
-        tags=['AFL', 'tennis', 'footie', 'swimming', 'AustralianOpen', 'soccer']
-        box = config['box']
-
-        while True:
-            try:
-                stream.filter(locations= box)
-            except (ProtocolError, AttributeError):
-                continue
-
-
-
-    elif collector_type == 'search':
-
-        cityCoord = config['point']
-
-        tmp = []
-        ids_ = []
-        final = {}
-        replies = []
-        up2source = []
-
-        # capure the most data
-        for i in range(7):
-            range_date = datetime.date.today() - datetime.timedelta(days=i)
-            for tweet in api.search(q="AFL OR tennis OR footie OR footy OR swimming OR AustralianOpen OR soccer OR cricket", geocode=cityCoord, until = range_date, count=100, tweet_mode="extended"):
-                #tweet = tweet._json
-                if tweet.id_str not in ids_:
-                    ids_.append(tweet.id_str)
-                    tmp.append(tweet)
-
-
-        for v in tmp:
-            final[v.id_str] = v
-
-            # Get parent if any
-            if hasattr(v, 'in_reply_to_status_id_str'):
-                if v.in_reply_to_status_id_str not in ids_:
-                    for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = v.in_reply_to_status_id_str, max_id = v.in_reply_to_status_id_str):
-                        ids_.append(tweet.id_str)
-                        replies.append(tweet)
-                        final[tweet.id_str] = tweet
-                        if hasattr(tweet, 'in_reply_to_status_id_str'):
-                            if tweet.in_reply_to_status_id_str not in ids_:
-                                up2source.append(tweet)
-
-            # Get responses
-            for tweet in api.search(q='to:'+ v.user.screen_name, geocode=cityCoord, count=100, since_id = v.id):
-                if tweet.in_reply_to_status_id_str == v.id_str:
-                    if tweet.id_str not in ids_:
-                        replies.append(tweet)
-                        ids_.append(tweet.id_str)
-                        final[tweet.id_str] = tweet
-
-        #climb up in the conversation
-        while up2source:
-            current = up2source.pop(0)
-            if hasattr(current, 'in_reply_to_status_id_str'):
-                if current.in_reply_to_status_id_str not in ids_:
-                    for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = current.in_reply_to_status_id_str, max_id = current.in_reply_to_status_id_str):
-                        ids_.append(tweet.id_str)
-                        replies.append(tweet)
-                        final[tweet.id_str] = tweet
-                        if hasattr(tweet, 'in_reply_to_status_id_str'):
-                            if tweet.in_reply_to_status_id_str not in ids_:
-                                up2source.append(tweet)
-
-
-        # loops to get all responses of responses
-        while replies:
-            current = replies.pop(0)
-            for tweet in api.search(q='to:'+ current.user.screen_name, geocode=cityCoord, count=100, since_id = current.id):
-                if tweet.in_reply_to_status_id_str == current.id_str:
-                    if tweet.id_str not in ids_:
-                        replies.append(tweet)
-                        ids_.append(tweet.id_str)
-                        final[tweet.id_str] = tweet
-
-
-
-
-        #save results to disk
-        serializable = {k: v._json for k,v in final.items()}
-
-
-        for k, v in serializable.items():
-            v['region'] = config['region']
-            #v['tags'] = classifyTwt(v)
-            # Save to couch dv
-            #save2Couch(v)
-
-        current_version = '-1'
-        versions = []
-        for filename in os.listdir('./'):
-            if filename.startswith('twt_search'):
-                versions.append(int(re.search(r'\d+', filename).group(0)))
-
-        current_version = max(versions)
-
-        with open("twt_search{}.json".format(str(current_version + 1)), "w") as outfile:
-            json.dump(serializable, outfile)
-
-
-
+import tweepy
+import json
+from tweepy.streaming import StreamListener
+import couchdb
+import sys
+import os
+import os.path
+import datetime
+import re
+from urllib3.exceptions import ProtocolError
+from preprocess import save2Couch, updateCDB, replaceCDB, classifyTwt
+
+collector_type = sys.argv[1]
+region = sys.argv[2]
+couchIP = sys.argv[3]
+#"172.26.130.79"
+storeCouch = True
+
+couchserver = couchdb.Server("http://admin:admin@"+couchIP+":5984/")
+db = couchserver['live_demo2']
+
+#change code to your current Twitter account
+
+
+if os.path.isfile('twt_stream{}.json'.format(region)):
+    outfile = open('twt_stream{}.json'.format(region), 'a+', encoding ='utf-8')
+else:
+    outfile = open('twt_stream{}.json'.format(region), 'w+', encoding ='utf-8')
+
+
+class StreamListener(tweepy.StreamListener):
+
+    def __init__(self, config, db):
+        super(StreamListener, self).__init__()
+        self.file = []
+        self.db = db
+        self.config = config
+
+    def on_status(self, status):
+        is_retweet = False
+        if hasattr(status, 'retweeted_status'):
+            is_retweet = True
+
+        #Avoid truncated text if possible
+        if hasattr(status, 'extended_tweet'):
+            text = status.extended_tweet['full_text']
+        else:
+            text = status.text
+
+        for char in [',','\n']:
+            text = text.replace(char, " ")
+
+        #if any(s in text.lower() for s in sports):
+
+        if not is_retweet:
+            doc = status._json
+            doc['region'] = self.config['region']
+            doc['text'] = text
+            # saved below, depending on storeCouch
+
+            #self.file.append(json.load(doc))
+            serial = {doc['id_str']:doc}
+            #if 'location' in doc['user']:
+            #    print(doc['text'], doc['user']['location'])
+            #else:
+            print(doc['text'])
+            if storeCouch:
+                save2Couch(doc, self.db)
+            else:
+                outfile.write(str(serial) + '\n')
+
+    def on_error(self, status_code):
+        print("Encountered streaming error (", status_code, ")")
+        sys.exit()
+
+
+
+if __name__ == "__main__":
+
+    sets = []
+    with open('sets.json', encoding = 'utf-8') as setdata:
+        for line in setdata:
+            sets.append(json.loads(line))
+
+    config = sets[int(region)]
+
+    API_key = config['secret']['API_key']
+    API_secret = config['secret']['API_secret']
+    access_token = config['secret']['access_token']
+    access_secret = config['secret']['access_secret']
+
+    # Authenticate to Twitter
+    auth = tweepy.OAuthHandler(API_key, API_secret)
+    auth.set_access_token(access_token, access_secret)
+    api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
+
+
+    if collector_type == 'stream':
+        #init stream
+        streamListener = StreamListener(config, db)
+        stream = tweepy.Stream(auth=api.auth, listener = streamListener, tweet_mode ='extended')
+
+        tags=['AFL', 'tennis', 'footie', 'swimming', 'AustralianOpen', 'soccer']
+        box = config['box']
+
+        while True:
+            try:
+                stream.filter(locations= box)
+            except (ProtocolError, AttributeError):
+                continue
+
+
+    elif collector_type == 'search':
+
+        cityCoord = config['point']
+
+        tmp = []
+        ids_ = []
+        final = {}
+        replies = []
+        up2source = []
+
+        # capture the most data
+        for i in range(7):
+            range_date = datetime.date.today() - datetime.timedelta(days=i)
+            for tweet in api.search(q="AFL OR tennis OR footie OR footy OR swimming OR AustralianOpen OR soccer OR cricket", geocode=cityCoord, until = range_date, count=100, tweet_mode="extended"):
+                #tweet = tweet._json
+                if tweet.id_str not in ids_:
+                    ids_.append(tweet.id_str)
+                    tmp.append(tweet)
+
+
+        for v in tmp:
+            final[v.id_str] = v
+
+            # Get parent if any
+            if hasattr(v, 'in_reply_to_status_id_str'):
+                if v.in_reply_to_status_id_str not in ids_:
+                    for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = v.in_reply_to_status_id_str, max_id = v.in_reply_to_status_id_str):
+                        ids_.append(tweet.id_str)
+                        replies.append(tweet)
+                        final[tweet.id_str] = tweet
+                        if hasattr(tweet, 'in_reply_to_status_id_str'):
+                            if tweet.in_reply_to_status_id_str not in ids_:
+                                up2source.append(tweet)
+
+            # Get responses
+            for tweet in api.search(q='to:'+ v.user.screen_name, geocode=cityCoord, count=100, since_id = v.id):
+                if tweet.in_reply_to_status_id_str == v.id_str:
+                    if tweet.id_str not in ids_:
+                        replies.append(tweet)
+                        ids_.append(tweet.id_str)
+                        final[tweet.id_str] = tweet
+
+        #climb up in the conversation
+        while up2source:
+            current = up2source.pop(0)
+            if hasattr(current, 'in_reply_to_status_id_str'):
+                if current.in_reply_to_status_id_str not in ids_:
+                    for tweet in api.search(q='', geocode=cityCoord, count=100, since_id = current.in_reply_to_status_id_str, max_id = current.in_reply_to_status_id_str):
+                        ids_.append(tweet.id_str)
+                        replies.append(tweet)
+                        final[tweet.id_str] = tweet
+                        if hasattr(tweet, 'in_reply_to_status_id_str'):
+                            if tweet.in_reply_to_status_id_str not in ids_:
+                                up2source.append(tweet)
+
+
+        # loops to get all responses of responses
+        while replies:
+            current = replies.pop(0)
+            for tweet in api.search(q='to:'+ current.user.screen_name, geocode=cityCoord, count=100, since_id = current.id):
+                if tweet.in_reply_to_status_id_str == current.id_str:
+                    if tweet.id_str not in ids_:
+                        replies.append(tweet)
+                        ids_.append(tweet.id_str)
+                        final[tweet.id_str] = tweet
+
+        #save results to disk
+        serializable = {k: v._json for k,v in final.items()}
+
+
+        for k, v in serializable.items():
+            v['region'] = config['region']
+            v['tags'] = classifyTwt(v)
+
+            if storeCouch:
+                save2Couch(v, db)
+
+        if not storeCouch:
+            #Save to disk as json. Check if a save file exists and increase the version number
+            current_version = -1
+            versions = []
+            for filename in os.listdir('./'):
+                if filename.startswith('twt_search'):
+                    versions.append(int(re.search(r'\d+', filename).group(0)))
+
+            if versions:
+                current_version = max(versions)
+
+            with open("twt_search{}.json".format(str(current_version + 1)), "w") as outfile:
+                json.dump(serializable, outfile)
--
GitLab
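
Note for reviewers: a minimal sketch of how the patched helpers are meant to be exercised by hand. It assumes a reachable CouchDB instance and an existing `live_demo2` database (host, admin credentials, and database name are taken from the scripts in this patch; the 127.0.0.1 host below is a hypothetical stand-in for the VM at 172.26.130.79), with `preprocess.py` on the import path:

    # Sketch only: exercises the patched save2Couch/classifyTwt helpers.
    import couchdb
    from preprocess import save2Couch, classifyTwt

    couchserver = couchdb.Server("http://admin:admin@127.0.0.1:5984/")
    db = couchserver['live_demo2']  # assumes the database already exists

    tweet = {'id_str': '1', 'text': 'AFL grand final tonight', 'user': {'location': 'Melbourne'}}
    tweet['tags'] = classifyTwt(tweet)   # -> ['Footie']
    save2Couch(tweet, db)                # first save creates the document

    # Saving the same id_str again from a fresh dict (no _rev) raises a
    # conflict inside save2Couch; with w_replace=True the document is
    # deleted and re-saved via replaceCDB.
    updated = {'id_str': '1', 'text': 'AFL grand final tonight!', 'user': {'location': 'Melbourne'}}
    updated['tags'] = classifyTwt(updated)
    save2Couch(updated, db, w_replace=True)

Per the new `sys.argv` handling, the collector itself is launched as `python tweet_gatherer_app.py stream 0 <couch-ip>` (mode, region index into `sets.json`, CouchDB host).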