-
Notifications
You must be signed in to change notification settings - Fork 6
/
bot.py
159 lines (124 loc) · 4.59 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Built-in imports
import logging
import os
import random
import re
import time
# Third-party dependencies
# import dataset
import giphypop
import requests
import tweepy
from ttp import ttp
# Custom imports
# Prefer the real (untracked) config module; fall back to the example config
# only when it is missing. Catching ImportError alone means a broken config.py
# (e.g. a syntax error) is reported instead of being silently replaced by the
# example settings.
try:
    import config
except ImportError:
    import config_example as config
# Global variable init

# Disabled database hookup; it relies on the `dataset` import that is also
# commented out at the top of the file.
# db = dataset.connect(config.db)
# table = db['tweets']

# Length budget for a reply: the full tweet length minus the space taken by
# the attached image URL, minus one more character.
# NOTE(review): 140 / 23 are presumably Twitter's tweet limit and t.co media
# URL length at the time of writing, and the extra 1 a separating space --
# confirm against the current API limits.
TWEET_LENGTH = 140
IMAGE_URL_LENGTH = 23
MAX_TWEET_TEXT_LENGTH = TWEET_LENGTH - IMAGE_URL_LENGTH - 1
# Suffix appended to a reply that had to be truncated
DOTS = '...'
BACKOFF = 0.5 # Initial wait time before attempting to reconnect
MAX_BACKOFF = 300 # Maximum wait time between connection attempts
# Giphy results at or above this size are skipped by get_gif_filename()
MAX_IMAGE_SIZE = 3072 * 1024 # bytes
# Screen name of the bot account; used to find the mention in incoming tweets
# and to avoid replying to / mentioning itself
USERNAME = 'slashgif'
# BLACKLIST
# Do not respond to queries by these accounts
BLACKLIST = [
    'pixelsorter',
    'Lowpolybot'
]
# Log everything at INFO and above to logger.log in the working directory
logging.basicConfig(filename='logger.log',
                    level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger. Note that the functions in this file log through the
# root `logging` module functions rather than through this object.
logger = logging.getLogger(__name__)
# Giphy client
giphy = giphypop.Giphy(api_key=config.giphy['key'])
# Twitter client
auth = tweepy.OAuthHandler(config.twitter['key'], config.twitter['secret'])
auth.set_access_token(config.twitter['access_token'],
                      config.twitter['access_token_secret'])
api = tweepy.API(auth)
# Tweet parser
parser = ttp.Parser()
# backoff time
# Current wait (seconds) used by StreamListener.on_error; reset to BACKOFF
# whenever a status is received.
backoff = BACKOFF
def get_gif_filename(term):
    """Download a GIF for ``term`` from Giphy and return the local file path.

    Searches Giphy (requesting up to 20 results), keeps only the results
    whose ``filesize`` is below ``MAX_IMAGE_SIZE`` and downloads the first
    one into the ``images/`` directory.

    :param term: search phrase extracted from the incoming tweet.
    :returns: path of the saved image (``images/<term>.<type>``), or ``None``
        when no suitable image was found.
    """
    images = [i for i in giphy.search(term, limit=20)
              if i.filesize < MAX_IMAGE_SIZE]
    # A plain emptiness check is enough (`images is []` can never be true).
    if not images:
        return None
    image = images[0]
    if not image:
        return None

    # The term comes straight from a tweet: strip path separators so it can
    # not point outside the images/ directory.
    safe_term = term.replace(' ', '_').replace('/', '_').replace('\\', '_')
    filename = 'images/%s.%s' % (safe_term, image.type)
    logging.info('get_gif_filename: %s--%s' % (term, filename))

    # Download first, then write inside a context manager, so a failed
    # request leaves neither an empty file nor an unclosed handle behind.
    content = requests.get(image.media_url).content
    with open(filename, 'wb') as f:
        f.write(content)
    return filename
def parse_tweet(tweet_from, tweet_text):
    """Extract the users to reply to and the search query from a tweet.

    The query is the text following the bot's ``@USERNAME`` mention with all
    other mentions, hashtags and URLs (as reported by the tweet parser)
    removed.

    :param tweet_from: screen name of the tweet's author.
    :param tweet_text: full text of the tweet.
    :returns: ``(tagged_users, query)`` where ``tagged_users`` is the list of
        users mentioned in the tweet followed by the author, and ``query`` is
        the stripped search text. ``query`` is an empty string when the tweet
        does not mention the bot.
    """
    mention = '@%s' % USERNAME
    # Search case-insensitively (e.g. "@SlashGif") and do not raise when the
    # mention is missing, which str.index() would.
    match = re.search(re.escape(mention), tweet_text, re.IGNORECASE)
    if match is None:
        query = ''
    else:
        # Skip the mention itself plus the character that follows it.
        query = tweet_text[match.end() + 1:]

    result = parser.parse(tweet_text)
    tagged_users = result.users + [tweet_from]
    tagged_hashtags = result.tags
    tagged_urls = result.urls

    # Remove every entity so only the free-text search term remains.
    for user in tagged_users:
        query = query.replace('@%s' % user, '')
    for tag in tagged_hashtags:
        query = query.replace('#%s' % tag, '')
    for url in tagged_urls:
        query = query.replace('%s' % url, '')

    logging.info('parse_tweet: %s--%s' % (tagged_users, query))
    return tagged_users, query.strip()
def generate_reply_tweet(users, search_term):
    """Build the reply text: the search term followed by the @mentions.

    Mentions of the bot itself are left out. When the text would exceed
    ``MAX_TWEET_TEXT_LENGTH`` the *search term* is shortened (and suffixed
    with ``DOTS``) so that the mentions survive; the mentions sit at the end
    of the reply, so cutting the tail would drop or mangle them. Only when
    the mentions alone leave no room for the term does it fall back to
    cutting the tail of the whole reply.

    :param users: screen names to mention.
    :param search_term: the term the GIF was searched with.
    :returns: reply text no longer than ``MAX_TWEET_TEXT_LENGTH``.
    """
    mentions = ' '.join(['@%s' % user for user in users if user != USERNAME])
    reply = '%s %s' % (search_term, mentions)
    if len(reply) > MAX_TWEET_TEXT_LENGTH:
        # Characters left for the term once the mentions and the separating
        # space are accounted for.
        room = MAX_TWEET_TEXT_LENGTH - len(mentions) - 1
        if room > len(DOTS):
            short_term = search_term[:room - len(DOTS)] + DOTS
            reply = '%s %s' % (short_term, mentions)
        else:
            reply = reply[:MAX_TWEET_TEXT_LENGTH - len(DOTS) - 1] + DOTS
    logging.info('generate_reply_tweet: %s' % reply)
    return reply
class StreamListener(tweepy.StreamListener):
    """Stream listener that answers incoming tweets with a GIF reply.

    Uses the module-level ``backoff`` value to throttle itself after
    rate-limit errors; the value is reset on every received status and never
    grows beyond ``MAX_BACKOFF``.
    """

    def on_status(self, status):
        """Handle one incoming status and reply with a matching GIF.

        Tweets from the bot itself, from blacklisted accounts, and retweets
        are ignored. Nothing is sent when the tweet has no search term or no
        image is found; both cases are logged.

        :param status: status object delivered by the stream.
        """
        global backoff
        # A status arrived, so reset the wait time to its initial value.
        backoff = BACKOFF

        # Collect logging and debugging data
        tweet_id = status.id
        tweet_text = status.text
        tweet_from = status.user.screen_name

        is_own_tweet = tweet_from == USERNAME
        is_retweet = hasattr(status, 'retweeted_status')
        if is_own_tweet or tweet_from in BLACKLIST or is_retweet:
            return

        logging.info('on_status: %s--%s' % (tweet_id, tweet_text))

        # Parse tweet for search term
        tagged_users, search_term = parse_tweet(tweet_from, tweet_text)
        if not search_term:
            logging.info('on_status_failed: No search terms')
            return

        # Search and save the image
        filename = get_gif_filename(search_term)
        if not filename:
            logging.info('on_status_failed: No images for %s' % search_term)
            return

        # Generate and send the reply tweet
        reply_tweet = generate_reply_tweet(tagged_users, search_term)
        reply_status = api.update_with_media(filename=filename,
                                             status=reply_tweet,
                                             in_reply_to_status_id=tweet_id)
        logging.info('on_status_sent: %s %s' % (
            reply_status.id_str, reply_status.text))

    def on_error(self, status_code):
        """Log a stream error and back off when the status code is 420.

        :param status_code: HTTP status code reported by the stream.
        :returns: ``True`` after backing off on a 420; ``None`` otherwise.
        """
        global backoff
        logging.info('on_error: %d' % status_code)

        # NOTE(review): the copy of this file under review had lost its
        # indentation; the log/sleep/return below are assumed to belong to
        # the 420 branch -- confirm against the repository.
        if status_code == 420:
            # Double the wait, but never beyond the configured maximum.
            backoff = min(backoff * 2, MAX_BACKOFF)
            logging.info('on_error: backoff %s seconds' % backoff)
            time.sleep(backoff)
            return True
# Make sure the download directory used by get_gif_filename() exists.
if not os.path.exists('images/'):
    os.makedirs('images/')

# Start listening on the authenticated user's stream.
stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
try:
    stream.userstream(_with='user', replies='all')
except Exception as e:
    # logging.INFO is the integer level constant, not a function: calling it
    # raised TypeError and hid the real error. Log the failure with its
    # traceback, then re-raise the original exception unchanged.
    logging.exception('stream_exception: %s' % e)
    raise