-
Notifications
You must be signed in to change notification settings - Fork 0
/
drink_the_hose.py
213 lines (186 loc) · 7.73 KB
/
drink_the_hose.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/python
"""
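Script to display information from the Twitter stream.

Credentials are read from a local ``secrets`` module that must define
consumer_key, consumer_secret, access_token and access_token_secret.

To print the text of tweets from the sampled public stream ("garden hose"):
$ python drink_the_hose.py
To print only tweets matching a track term, e.g. a smiley face:
$ python drink_the_hose.py ":-)"
To stop after 10 tweets:
$ python drink_the_hose.py -l 10
To save tweets as hourly gzipped JSON-lines files in a directory
(a running count is shown instead of the tweets):
$ python drink_the_hose.py -p some/directory
To get help:
$ python drink_the_hose.py --help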
"""
from optparse import OptionParser
from getpass import getpass
from collections import deque
import time
import logging
import gzip
import sys
import os
import tweepy
from tweepy.models import Status
from tweepy.utils import import_simplejson
from tweepy.api import API
from tweepy import OAuthHandler
# JSON module obtained through tweepy's helper (presumably simplejson when it
# is installed, otherwise the stdlib json -- confirm against installed tweepy).
# Used for json.loads / json.dumps throughout this module.
json = import_simplejson()
# API object created without any authentication arguments; it is only passed
# to Status.parse when consumers turn raw tweet JSON into Status objects.
api = API()
# Credentials come from a project-local ``secrets`` module (not shown here).
# NOTE(review): on Python 3.6+ this name clashes with the stdlib ``secrets``
# module; the local file must come first on sys.path for this import to work.
from secrets import consumer_key, consumer_secret, access_token, access_token_secret
BACKOFF = 0.5 # Initial wait (seconds, passed to time.sleep) before attempting to reconnect
MAX_BACKOFF = 300 # Maximum wait (seconds) between connection attempts
# Characters treated as line breaks: LF, VT, FF, CR, NEL, LINE SEPARATOR and
# PARAGRAPH SEPARATOR. Lineprinter loops over these to replace them with
# spaces, intending to keep each printed tweet on a single line.
UNICODE_LINES = (u'\u000a', u'\u000b', u'\u000c', u'\u000d', u'\u0085', u'\u2028', u'\u2029')
# Configure root logging at import time; DEBUG messages are hidden by default.
logging.basicConfig(level=logging.INFO)
class EchoListener(tweepy.StreamListener):
def __init__(self, *args, **kwargs):
try:
self.queue = deque(maxlen = kwargs['maxlen'])
del kwargs['maxlen']
except KeyError:
self.queue = deque(maxlen = 1000)
super(EchoListener, self).__init__(*args, **kwargs)
def on_data(self, data):
"""Called when raw data is received from connection.
Override this method if you wish to manually handle
the stream data. Return False to stop stream and close connection.
"""
if 'in_reply_to_status_id' in data:
self.queue.append(data)
logging.debug("Append data (%s)"%(len(self.queue)))
return True
elif 'delete' in data:
logging.debug('Delete received')
delete = json.loads(data)['delete']['status']
if self.on_delete(delete['id'], delete['user_id']) is False:
return False
elif 'limit' in data:
logging.info('Limit received')
if self.on_limit(json.loads(data)['limit']['track']) is False:
return False
def connect(self, stringlist=[], async=True):
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
self.stream = tweepy.Stream(auth, self)
try:
if stringlist:
self.stream.filter(track = stringlist, async=async)
else:
self.stream.sample(async=async)
logging.debug('Connected to twitter')
except:
self.stream.disconnect()
logging.debug('Something went wrong - disconnected from twitter')
def running(self):
return self.stream.running
def disconnect(self):
return self.stream.disconnect()
class AbstractConsumer(object):
    """Interface for objects that receive tweets from the consumer loop.

    Subclasses must override ``process``; this base version only raises
    NotImplementedError.
    """

    def process(self, tweet):
        """Handle a single tweet string; must be overridden by subclasses."""
        raise NotImplementedError
class Lineprinter(AbstractConsumer):
    """Print each tweet as one line: author statistics followed by the text."""

    def process(self, tweet):
        """Parse the raw tweet JSON and print a one-line summary.

        Output format::

            @screen_name (lang, statuses_count, friends_count, followers_count): text

        Every character listed in UNICODE_LINES is replaced by a space so that
        multi-line tweets stay on a single output line.
        """
        status = Status.parse(api, json.loads(tweet))
        user = status.user
        text = status.text
        # Replace cumulatively: each pass must start from the previous result,
        # otherwise only the last separator in UNICODE_LINES is removed.
        for lf in UNICODE_LINES:
            text = text.replace(lf, ' ')
        # Single parenthesised argument: prints identically on Python 2 and 3.
        print("@%s (%s, %s, %s, %s): %s" % (user.screen_name, user.lang,
                                            user.statuses_count,
                                            user.friends_count,
                                            user.followers_count, text))
class Rawprinter(AbstractConsumer):
    """Echo every tweet exactly as it was handed over, one per line."""

    def process(self, tweet):
        """Print ``tweet`` unchanged (in this script: the raw JSON string)."""
        print(tweet)
class Counter(AbstractConsumer):
    """Show a running count of processed tweets, redrawn on one terminal line."""

    def __init__(self):
        self.count = 0  # number of tweets processed so far

    def process(self, tweet):
        """Increment the counter and redraw it; the tweet content is ignored."""
        self.count += 1
        # '\r' moves the cursor back to the start of the line so the number is
        # overwritten in place.  There is no newline, so line-buffered stdout
        # would hold the output back: flush explicitly.
        sys.stdout.write('%s\r' % self.count)
        sys.stdout.flush()
class Timedfilewriter(AbstractConsumer):
    """Append selected tweet fields as JSON lines to time-rotated gzip files.

    Files are named ``<path>/<base>-<time.strftime(fmt)>.txt.gz``; with the
    default ``fmt`` ('%Y%m%d%H') a new file is started every hour.  Files are
    opened in append mode ('ab'), so a restart within the same period adds to
    the existing file.  Call ``close()`` when done so the gzip stream is
    finished properly.
    """

    def __init__(self, path='.', base='drink-the-hose', fmt='%Y%m%d%H'):
        self.base = base
        self.path = path
        self.fmt = fmt
        self.time = time.strftime(self.fmt)
        self.fid = self._open()
        # Not used by this class; kept so code reading the attribute still works.
        self.warn_count = 0

    def _open(self):
        """Open, in append mode, the gzip file for the current ``self.time``."""
        name = '%s-%s.txt.gz' % (self.base, self.time)
        return gzip.open(os.path.join(self.path, name), 'ab')

    def process(self, tweet):
        """Parse the raw tweet JSON and append a subset of its fields as one line."""
        status = Status.parse(api, json.loads(tweet))
        user = status.user
        out = {"screen_name": user.screen_name,
               "id": status.id,
               "lang": user.lang,
               "statuses_count": user.statuses_count,
               "friend_count": user.friends_count,
               "followers_count": user.followers_count,
               "profile_image_url": user.profile_image_url,
               # Keep the text as a text string: json.dumps escapes it exactly as
               # it would the UTF-8 bytes on Python 2, and bytes are not JSON
               # serialisable on Python 3.
               "text": status.text,
               "entities": status.entities,
               "created_at": status.created_at.strftime("%Y-%m-%d %H:%M:%S"),
               "geo": status.geo,
               "location": user.location,
               "timezone": user.time_zone}
        now = time.strftime(self.fmt)
        if now != self.time:
            # The period changed: finish the current file and start the next one.
            self.fid.close()
            self.time = now
            self.fid = self._open()
        # The gzip file is opened in binary mode, so write bytes.  json.dumps
        # output is ASCII (ensure_ascii default), so this encode is lossless.
        self.fid.write((json.dumps(out) + '\n').encode('utf-8'))

    def close(self):
        """Close the current file so the gzip trailer is written."""
        self.fid.close()
def _dispatch(tweet, consumers, warn_count):
    """Hand one tweet to every consumer and return the updated failure count.

    A consumer that raises is logged and skipped while fewer than 100 failures
    have been recorded; after that the next failure is re-raised.
    """
    for consumer in consumers:
        try:
            consumer.process(str(tweet))
        except Exception:
            if warn_count >= 100:
                raise
            logging.warning("Something went wrong with the consumer %s on the tweet %s",
                            consumer, tweet, exc_info=True)
            warn_count += 1
    return warn_count


def drink(limit=0, stringlist=None, maxlen=1000, consumers=None):
    """Stream tweets and feed each one to every consumer.

    Args:
        limit: stop after this many tweets; 0 means run until interrupted.
        stringlist: track terms for the filter stream; empty or None uses the
            sample stream.
        maxlen: capacity of the listener's tweet buffer.
        consumers: objects with a ``process(tweet)`` method; defaults to a
            single new Lineprinter.

    When the stream has stopped and its buffer is drained, a new listener is
    connected after a delay that starts at BACKOFF and doubles up to
    MAX_BACKOFF.  The delay is reset as soon as tweets arrive again.  The
    current listener is always disconnected on exit.
    """
    if stringlist is None:
        stringlist = []
    if consumers is None:
        consumers = [Lineprinter()]
    listener = EchoListener(maxlen=maxlen)
    listener.connect(stringlist=stringlist)
    count = 0
    backoff = BACKOFF
    backoff_warning = False
    warn_count = 0
    try:
        while not limit or count < limit:
            logging.debug('Try and get a tweet from the queue ...')
            try:
                tweet = listener.queue.popleft()
            except IndexError:
                # running is a method: it must be called (the bound method
                # object itself is always truthy).
                if listener.running():
                    logging.debug('... queue empty, wait a while')
                    time.sleep(1)
                    continue
                # Stream stopped and nothing left to drain: reconnect with
                # exponential backoff.
                logging.debug("Wait %i s before reconnecting", backoff)
                time.sleep(backoff)
                backoff = min(MAX_BACKOFF, backoff * 2)
                if backoff == MAX_BACKOFF and not backoff_warning:
                    logging.warning('Having trouble connecting to twitter')
                    backoff_warning = True
                listener.disconnect()
                listener = EchoListener(maxlen=maxlen)
                listener.connect(stringlist=stringlist)
                continue
            # Data is flowing, so the connection is healthy: reset the backoff.
            backoff = BACKOFF
            backoff_warning = False
            count += 1
            logging.debug('... pulled tweet %s from the queue (%s)',
                          count, len(listener.queue))
            warn_count = _dispatch(tweet, consumers, warn_count)
        logging.debug('Limit reached')
    finally:
        listener.disconnect()
if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-l", "--limit", dest="limit", help= "Number of status updates to harvest", default=0, type='int')
    parser.add_option("-p", "--path", dest="path", help= "Directory", default='')
    (options, args) = parser.parse_args()
    if options.path:
        try:
            os.makedirs(options.path)
        except OSError:
            # An existing directory is fine; anything else (missing
            # permissions, a regular file in the way) is a real error.
            if not os.path.isdir(options.path):
                raise
        consumers = [Timedfilewriter(path=options.path), Counter()]
    else:
        consumers = [Lineprinter()]
    try:
        drink(limit=options.limit, stringlist=args, consumers=consumers)
    finally:
        # Release consumers that hold resources (e.g. open gzip files) even
        # when drink() is interrupted; consumers without close() are skipped.
        for consumer in consumers:
            close = getattr(consumer, 'close', None)
            if close is not None:
                close()