Live streaming of tweets for hashtags
Streaming of tweets for hashtags and keywords¶
This notebook explains how to search the live Twitter stream for certain hashtags and keywords. The gathered tweets are stored as text files and further processed into a pandas DataFrame.
The script is designed for long-term streaming, with the gathered tweets stored in succeeding files.
Note: This notebook is not directly runnable since personalized OAuth keys are required to access the Twitter API. See below for how to obtain them. Once you have OAuth keys, save them in a json file and point to this file in the script described in "Harvesting tweets".
Prerequisites¶
To read from the live Twitter stream, we will use Tweepy. Currently (as of 22.11.2017) there is a bug in the Tweepy version available on PyPI which breaks the code for certain Twitter responses. A bug-fixed version of Tweepy is available in the source code repository here: https://github.com/tweepy/tweepy
To install from source, clone the repository and install locally as explained in the readme file:
git clone https://github.com/tweepy/tweepy.git
cd tweepy
python setup.py install
To start using the Twitter API, we also need OAuth keys to grant us access rights.
To generate these, log in to your Twitter account and create a new application at https://apps.twitter.com with read and write access rights. You can find further information about Twitter's OAuth in the Twitter developer guide.
Once you have Tweepy installed and the OAuth keys, you are ready to go.
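As an optional sanity check, you can verify that Tweepy is importable and which version got installed:
import tweepy
print(tweepy.__version__)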
Harvesting tweets¶
import json
import os
import tweepy
First, we need a function providing the OAuth keys obtained above. The example function below assumes that the OAuth keys are stored in a json file with the following format:
{
"consumer_key": "str_sequence",
"consumer_secret": "str_sequence",
"access_token": "str_sequence",
"access_secret": "str_sequence"
}
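If you prefer to create this file from within Python, a minimal sketch could look as follows (the values are placeholders which you have to replace with your own keys; the file path matches the one used in the streaming example below):
import json

oauth_keys = {
    "consumer_key": "YOUR_CONSUMER_KEY",        # placeholder
    "consumer_secret": "YOUR_CONSUMER_SECRET",  # placeholder
    "access_token": "YOUR_ACCESS_TOKEN",        # placeholder
    "access_secret": "YOUR_ACCESS_SECRET",      # placeholder
}

with open('/tmp/OAuthTwitter.json', 'w') as json_file:
    json.dump(oauth_keys, json_file)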
def get_oauth_from_file(oauth_file):
""" Provides oauth keys stored in json file 'oauth_file'
"""
with open(oauth_file) as json_file:
keys = json.load(json_file)
auth = tweepy.OAuthHandler(keys["consumer_key"],
keys["consumer_secret"])
auth.set_access_token(keys["access_token"],
keys["access_secret"])
return auth
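Before starting a long-running stream, it can be worth checking that the keys actually work. A small sketch, assuming the keys are stored in '/tmp/OAuthTwitter.json' as above:
auth = get_oauth_from_file('/tmp/OAuthTwitter.json')
api = tweepy.API(auth)
# Returns your own user profile if the credentials are valid
print(api.verify_credentials().screen_name)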
Next, we design a custom Twitter listener which saves the gathered tweets in succeeding files (for further information on how to design your own StreamListener, check the tweepy docs on this topic).
class MyListener(tweepy.streaming.StreamListener):
def __init__(self, storage_folder, base_file_name, nr_tweet_per_file=1000):
""" Twitter listener storing tweets as json entries in a file
Note
-----
The tweets are stored as a list of json entries in a file.
Thus, every line in the resulting file is a valid json entry.
Parameters
----------
storage_folder: str
Folder for storing the tweets. Will be created if not existing.
Note that tweets will be appended to already existing files matching the base_file_name!
base_file_name: str
Basename of the file for storing tweets.
E.g. 'tweets' would result in
'tweets_0000000000.jsons'
'tweets_0000000001.jsons'
...
nr_tweet_per_file: int, optional
How many tweets to store in one json file (default: 1000)
"""
try:
os.mkdir(storage_folder)
except FileExistsError:
pass
self.store_base = os.path.join(storage_folder, base_file_name)
self.nr_tweet_per_file = nr_tweet_per_file
self.counter_tweets = 0
self.counter_files = 0
def on_data(self, data):
try:
# Roll over to a new file once the current one holds nr_tweet_per_file tweets
if self.counter_tweets >= self.nr_tweet_per_file:
    self.counter_files += 1
    self.counter_tweets = 0
self.counter_tweets += 1
current_file = (
self.store_base +
'_' + str(self.counter_files).zfill(10) +
'.jsons')
with open(current_file, 'a') as f:
f.write(data)
return True
except BaseException as e:
print("Error on_data: {}".format(str(e)))
return True
def on_error(self, status):
print(status)
return True
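To see the listener in action without connecting to Twitter, you can feed it a hypothetical dummy tweet (this test is not part of the original workflow; the real stream delivers newline-terminated json strings in the same way):
listener = MyListener(storage_folder='/tmp/twitter_stream_test',
                      base_file_name='tweets')
dummy_tweet = json.dumps({'id': 1, 'text': 'hello world'}) + '\n'
listener.on_data(dummy_tweet)
# The dummy tweet ends up as one line in /tmp/twitter_stream_test/tweets_0000000000.jsons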
Next, we define the hashtags, keywords and usernames which we would like to stream (here, as an example, the tracking list used for the COP22 and COP23 streaming):
twitter_tracking_list = [
'@COP22', 'cop22', '#COP', '#cop22',
'@COP23', 'cop23', '#COP', '#cop23',
'#climatechange', '#climateaction',
'#ParisAgreement', '#globalwarming', '#beforetheflood',
'#actonclimate', '#climate',
]
Now everything is set up and we can start streaming:
try:
print('Started streaming tweets')
while True:
twitter_stream = tweepy.Stream(
auth=get_oauth_from_file(oauth_file='/tmp/OAuthTwitter.json'),
listener=MyListener(storage_folder='/tmp/twitter_stream',
base_file_name='tweets'))
twitter_stream.filter(track=twitter_tracking_list)
except KeyboardInterrupt:
print('Stopped streaming tweets')
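For very long streaming sessions it can also make sense to pause briefly before reconnecting, so that a flaky connection does not lead to rapid reconnection attempts. One possible variation of the loop above (not part of the original notebook) is:
import time

try:
    print('Started streaming tweets')
    while True:
        try:
            twitter_stream = tweepy.Stream(
                auth=get_oauth_from_file(oauth_file='/tmp/OAuthTwitter.json'),
                listener=MyListener(storage_folder='/tmp/twitter_stream',
                                    base_file_name='tweets'))
            twitter_stream.filter(track=twitter_tracking_list)
        except Exception as error:
            print('Stream error: {}'.format(error))
            time.sleep(60)  # wait a minute before reconnecting
except KeyboardInterrupt:
    print('Stopped streaming tweets')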
To stop the streaming, interrupt the kernel with:
- Jupyter notebook: Press the stop button or choose "Interrupt" from the "Kernel" menu
- Script: Press Control-C
The tweets are stored as json entries in the files in the specified folder. These can easily be read in Python with:
with open('/tmp/twitter_stream/tweets_0000000000.jsons', 'r') as twitter_jsons:
tweets = [json.loads(line) for line in twitter_jsons]
tweets[:2]
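Each line is a full tweet object as delivered by the streaming API. To get an overview of the available fields, you can for example look at the keys of the first gathered tweet (assuming at least one tweet was collected):
sorted(tweets[0].keys())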
Parsing tweets into a pandas DataFrame and storing as CSV¶
import pandas as pd
First, we define a function which extracts the interesting information from a specific tweet:
def parse_tweet(tweet):
""" Parse an individual tweet into a dict
"""
def get_list_from_dict_list(list_of_dict, ik):
""" Extract values from a list of dictionaries
Parameters
----------
list_of_dict : list of dictionaries with the same keys
ik : key to extract from each dict
Returns
-------
list of extracted items or None
"""
try:
list_result = [dd.get(ik) for dd in list_of_dict]
except TypeError:
list_result = None
return list_result
def extractor_closure(tweet_dict):
"""Closure for extracting nested entries from a dict
"""
def get_item(*args):
dd_nested = tweet_dict
for item in args:
dd_nested = dd_nested.get(item, None)
if dd_nested is None:
break
return dd_nested
return get_item
extractor = extractor_closure(tweet)
dd_extract = {
'id' : extractor('id'),
'created_at' : extractor('created_at'),
'lang' : extractor('lang'),
'text' : extractor('text').replace('\n', ' ').replace('\r', ' ') if extractor('text') else None,
'user_name' : extractor('user', 'name'),
'screen_name' : extractor('user', 'screen_name'),
'followers_count' : extractor('user', 'followers_count'),
'country' : extractor('place', 'country_code'),
'place' : extractor('place', 'full_name'),
'user_mentions_names' : get_list_from_dict_list(
extractor('entities', 'user_mentions'), 'screen_name'),
'hashtags' : get_list_from_dict_list(
extractor('entities', 'hashtags'), 'text'),
}
return dd_extract
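As a quick check, the parser can be applied to one of the tweets loaded above (assuming the tweets list from the previous section is still available):
parse_tweet(tweets[0])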
Next, we define a function which parses one jsons file into a pandas DataFrame:
def jsons_to_df(jsons_file):
"""Read the jsons file and return the content as pandas DataFrame"""
with open(jsons_file, 'r') as twitter_jsons:
list_tweets = [parse_tweet(json.loads(line))
for line in twitter_jsons]
return pd.DataFrame(list_tweets)
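Applied to the first file gathered above (the same file name as used earlier in this notebook), this gives for example:
df_single = jsons_to_df('/tmp/twitter_stream/tweets_0000000000.jsons')
df_single.head()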
To run the parser over all gathered files, we need a list of these files:
tweets_folder = '/tmp/twitter_stream/'
list_jsons_files = [os.path.join(tweets_folder, tweets_file)
for tweets_file in
os.listdir(tweets_folder) if
os.path.splitext(tweets_file)[1] == '.jsons']
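Depending on how long the stream has been running, this list can contain many files. Sorting it is optional, but gives a reproducible processing order:
list_jsons_files = sorted(list_jsons_files)
len(list_jsons_files)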
We can then process all gathered tweets into a pandas DataFrame, removing duplicate tweets:
df_tweets = (pd.concat([jsons_to_df(ff) for ff in list_jsons_files]).
drop_duplicates(subset='id').
sort_values('id').
reset_index(drop=True))
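Before filtering, a quick look at the size and the language distribution of the collected tweets can be helpful (these are just example checks):
print(df_tweets.shape)
df_tweets['lang'].value_counts().head()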
To store the English tweets, we can then use:
df_eng = df_tweets[df_tweets['lang']=='en']
df_eng.to_csv('/tmp/eng_tweets.csv', sep='|')
df_eng.head()
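To read the stored CSV back into a DataFrame later, use the same separator, for example:
df_eng_reloaded = pd.read_csv('/tmp/eng_tweets.csv', sep='|', index_col=0)
df_eng_reloaded.head()
Note that list-valued columns such as 'hashtags' are written as their string representation and come back as plain strings when reloaded.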
How to process the resulting DataFrame will be the topic of the next notebook example.