-
Notifications
You must be signed in to change notification settings - Fork 2
/
AJB_TrigonometryBot_v12.py
392 lines (341 loc) · 14.3 KB
/
AJB_TrigonometryBot_v12.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# @abrightmoore - see @TrigonometryBot
# This version refactored for extensibility
from math import sqrt, tan, sin, cos, pi, ceil, floor, acos, atan, asin, degrees, radians, log, atan2
import os
import time
from random import randint, random, Random
import io
from io import BytesIO
import sys
import urllib2 as urllib
from PIL import Image, ImageDraw
from TwitterAPI import TwitterAPI # by @boxnumber03 https://dev.twitter.com/resources/twitter-libraries. Wrapper to communicate via Twitter
from markovbot import MarkovBot # by @esdalmaijer. Use for conversation
# from Trigpic import *
import ImageFactory
from ImageTools import *
# Maximum tweet length; generated text longer than this is regenerated before posting.
TWITTERLIMIT = 140 # Characters
# Prefix of every data file the bot reads or writes (<GAMENAME>_<suffix>).
GAMENAME = "AJB_TrigonometryBot"
# NOTE(review): not referenced in the visible code, and differs from the "v12" in the file name - confirm.
GAMEVER = 9.0
# Name given to the in-memory PNG that is uploaded to Twitter.
FILENAMEIMAGE = '@abrightmoore_@TrigonometryBot_output.png'
# The bot's own screen name, without the leading "@".
MYNAME = "TrigonometryBot"
# NOTE(review): not referenced in the visible code.
PREVIMG = None
# Entries are unpacked as (formulaR, formulaG, formulaB, author) by handleMentions.
MEMORY = [] # A list of interaction tuples
# NOTE(review): mainLoop and handleMentions each assign a local of the same name; this global is not read in the visible code.
jobsComplete = 0
# Seconds to sleep before retrying after a failed Twitter request.
CONFAIL_BACKOFFTIME = 15
# Number of attempts postToTwitter_File makes before giving up.
CONFAIL_RETRIES = 3
def mainLoop():
    """ Run the bot forever: sometimes post an original image, answer mentions, then sleep.

    Each cycle opens a fresh Twitter connection. With probability moodCreative
    the bot creates and posts an image; doing so halves moodCreative, and every
    sleep raises it again by a small quantum. The sleep length grows with the
    number of images posted since start-up and is capped at five minutes.

    This function never returns.
    """
    # Init
    tweetbot = initMarkovBot() # Required for supporting conversations
    tweetPrefixDefault = "I made this. "

    # The MAXID file is append-only, so the newest handled mention id is the
    # last non-blank line. A blank trailing line or an empty file must not
    # crash start-up; 0 makes handleMentions poll without a since_id.
    max_ids = [entry for entry in loadStringsFromFile(GAMENAME,"MAXID.txt") if entry.strip()]
    if max_ids:
        max_id = int(max_ids[-1])
    else:
        max_id = 0

    width = 400
    height = width
    sleeps = 0 # a count of 'days'
    restTimeDefault = 15 # Seconds.
    RESTTIMEQUANTUM = 30
    restTimeMax = 300 # Five minutes
    CREATIVEQUANTUM = 0.03
    moodCreative = 0.60 # How likely I will create _something_ when tested
    jobsComplete = 0 # Images posted since start-up (never reset)

    # Wake periodically and decide what to do
    while True:
        api = makeNewConnection()
        print(time.ctime()+" Trigmonologue: I remember "+str(len(MEMORY))+" things. I am feeling "+str(moodCreative*100)+" percent creative after "+str(sleeps)+" sleeps.")

        # Either pro-actively work on something creative or otherwise check what people have sent through
        if random() <= moodCreative:
            moodCreative = moodCreative/2 # Diminishing interest in creating something
            img = beCreative(width,height,"@"+MYNAME)
            # To Do: Adjust image based on 'mood' or how 'tired' I am
            tweet_text = getTweetText(tweetbot,tweetPrefixDefault,SEEDWORD)
            while len(tweet_text) > TWITTERLIMIT: # Twitter limit
                tweet_text = getTweetText(tweetbot,tweetPrefixDefault,SEEDWORD)
            print(tweet_text)
            postToTwitter_Image(api,img,tweet_text,"")
            jobsComplete = jobsComplete+1

        # Poll Twitter for mentions
        max_id = handleMentions(api,max_id,tweetbot)

        # Go to sleep - duration grows with the cumulative job count, up to the cap
        restTime = min(restTimeDefault+RESTTIMEQUANTUM*jobsComplete, restTimeMax)
        print("Sleeping for "+str(restTime)+" seconds")
        time.sleep(restTime)

        # Post sleep processing
        sleeps = sleeps+1
        moodCreative = moodCreative+CREATIVEQUANTUM # Increasing interest in creating something over time
        del api
def beCreative(width,height,postToName):
    """ Build one image by generating random layers and merging them together.

    Layers come from ImageFactory.makeRandomImage; after each one, generation
    stops when random() exceeds 0.95. Exactly two layers are merged with
    "Circle" or "Spike"; more than two are folded together with "Blend".
    The result may be passed through circlePic, is handed to
    ImageFactory.cacheImage and is then returned.

    postToName is accepted for interface compatibility and is not used.
    """
    print("I feel creative!")

    layers = [] # A collection of images
    while True:
        layers.append(ImageFactory.makeRandomImage(width,height))
        if random() > 0.95:
            break

    canvas = layers[0]
    layerCount = len(layers)
    if layerCount == 2:
        style = "Circle" if random() < 0.2 else "Spike"
        canvas = mergeImages(layers[0],layers[1], style )
    elif layerCount > 2:
        for layer in layers[1:]:
            canvas = mergeImages(layer,canvas, "Blend" )

    if random() > 0.7:
        circlePic(canvas)

    ImageFactory.cacheImage(canvas)
    return canvas
# STATIC STRINGS AND OTHER THINGS
def loadStringsFromFile(filePrefix,fileSuffix):
    """ Return the lines of the file <filePrefix>_<fileSuffix> as a list of strings.

    The content is split on "\\n" exactly, so a blank line or a trailing
    newline in the file produces an empty string in the result.

    The file is opened read-only and is always closed, even if reading fails.
    Raises IOError if the file cannot be opened.
    """
    fileName = filePrefix+"_"+fileSuffix
    with open(fileName, 'r') as fileOfStatements:
        return fileOfStatements.read().split("\n")
# CONVERSATION MANAGEMENT
def makeNewConnection():
    """ Open a Twitter connection with the four credentials stored in the keys file
    """
    credentials = loadTwitterKeysFromFile(GAMENAME)
    return TwitterAPI(credentials[0],credentials[1],credentials[2],credentials[3])
def postToTwitter_Image(api,img,tweet_text,idReply):
    """ Encode an image as PNG in memory and hand it to postToTwitter_File for posting.

    img must offer save(fileobj, format); idReply is the status id being
    answered, or "" for a stand-alone tweet.
    """
    pngBuffer = BytesIO()
    img.save(pngBuffer, 'png')
    pngBuffer.name = FILENAMEIMAGE
    pngBuffer.seek(0) # Rewind so the posting handler reads from the start
    postToTwitter_File(api,pngBuffer,tweet_text,idReply)
def postToTwitter_File(api,newFile,tweet_text,idReply):
    """ Upload the contents of a file-like object as media and tweet it with tweet_text.

    api        -- connection object offering request(resource, params, files)
    newFile    -- readable file-like object holding the image bytes
    tweet_text -- status text to post alongside the media
    idReply    -- id of the status being replied to, or "" for a stand-alone tweet

    Up to CONFAIL_RETRIES attempts are made. An exception, or a media upload
    that is rejected, consumes one attempt and is followed by a pause of
    CONFAIL_BACKOFFTIME seconds. After an exception the connection is rebuilt.
    Once the media is uploaded the status update is sent exactly once; its
    outcome is printed and not retried. Nothing is returned.
    """
    data = newFile.read()
    retries = CONFAIL_RETRIES
    media_id = -1 # -1 means "not uploaded yet"
    while retries > 0:
        try:
            if media_id == -1:
                r = api.request('media/upload', None, {'media': data})
                print('UPLOAD MEDIA SUCCESS' if r.status_code == 200 else 'UPLOAD MEDIA FAILURE '+str(r.status_code))
                if r.status_code == 200:
                    media_id = r.json()['media_id']
            if media_id != -1:
                params = {'status':tweet_text, 'media_ids':media_id}
                # Compare by value: identity checks against a literal are unreliable
                if idReply != "":
                    params['in_reply_to_status_id'] = idReply
                r = api.request('statuses/update', params)
                print('UPDATE STATUS SUCCESS' if r.status_code == 200 else 'UPDATE STATUS FAILURE '+str(r.status_code))
                retries = 0
            else:
                # The upload was rejected without raising. Treat it as a failed
                # attempt so the loop cannot spin forever against the API.
                retries = retries-1
                if retries > 0:
                    time.sleep(CONFAIL_BACKOFFTIME)
        except Exception as e:
            # back off and retry connection
            print("Error, backing off...")
            print(e)
            time.sleep(CONFAIL_BACKOFFTIME)
            api = makeNewConnection()
            retries = retries-1
def getTweetText(tweetbot,prefix,seedword):
    """ Produce tweet text: a canned or generated sentence, a newline, then prefix.

    A random entry of STATEMENT is always drawn first; when random() exceeds
    0.2 it is replaced by tweetbot.generate_text(25, seedword).
    """
    body = STATEMENT[randint(0,len(STATEMENT)-1)] # Choose a pre-canned statement
    useGenerator = random() > 0.2
    if useGenerator:
        body = tweetbot.generate_text(25, seedword)
    return body+"\n"+prefix
def initMarkovBot():
    """ Create a MarkovBot and feed it book.txt from the directory holding this script
    """
    scriptDir = os.path.dirname(os.path.abspath(__file__))
    bot = MarkovBot()
    bot.read(os.path.join(scriptDir,'book.txt'))
    return bot
def memoryAppend(obj):
    """ Remember obj in the in-process MEMORY list and append its text form to the memory file
    """
    MEMORY.append(obj)
    asText = str(obj)
    addStringToFile(GAMENAME,"memory.txt",asText)
def readMemory():
    """ Placeholder: only prints a marker; no memory is loaded yet
    """
    print('stub')
# IMAGE MANIPULATION
# Moved to TrigPic
# TWITTER ACCESS
def _composeReplyText(tweetbot,replyToName,seedword):
    """ Generate reply text for replyToName, regenerating until it fits in one tweet
    """
    tweet_text = getTweetText(tweetbot,"I made this for you, "+replyToName+", because you asked nicely.\n",seedword) # Conversation management goes here
    while len(tweet_text) > TWITTERLIMIT: # Twitter limit
        # Regenerated text carries only the handle, not the full sentence
        tweet_text = getTweetText(tweetbot,replyToName,seedword)
    return tweet_text


def _loadImageFromUrl(url):
    """ Download url and open it as an image; log and return None on any failure
    """
    try:
        fd = urllib.urlopen(url)
        try:
            return Image.open(io.BytesIO(fd.read()))
        finally:
            fd.close()
    except Exception:
        print("Unable to load image "+url)
        return None


def _fetchUserImages(medias):
    """ Return the images attached to a tweet, trying media_url then media_url_https
    """
    userImgs = []
    for m in medias:
        url = m["media_url"]
        print("Trying to recover user image "+url)
        im = _loadImageFromUrl(url)
        if im is None:
            im = _loadImageFromUrl(m["media_url_https"])
        if im is not None:
            userImgs.append(im)
    return userImgs


def _replyFromMemory(api,index,intro,replyToName,statusId):
    """ Re-render the formulas stored at MEMORY[index] and post them as a reply
    """
    (formulaR,formulaG,formulaB,author) = MEMORY[index]
    newFile = createImgFileWithRules(800,800,formulaR,formulaG,formulaB,replyToName)
    tweet_text = replyToName+"\nI remember "+str(len(MEMORY))+" things.\n\n"+intro+author+"\nR: "+formulaR+"\nG: "+formulaG+"\nB: "+formulaB
    print(tweet_text)
    postToTwitter_File(api,newFile,tweet_text,statusId)


def _handleAdminCommand(api,msg,replyToName,statusId):
    """ Run a Remember / Memories / Rule command found in msg; return True if a reply was posted
    """
    handled = False
    if msg.find("Remember") > 0: # Pop back into memory for details of an image
        words = msg.split()
        for count, word in enumerate(words):
            if word == "Remember" and count+1 < len(words):
                value = words[count+1]
                try:
                    val = int(value)
                    # Reject negative numbers: they would silently index from the end
                    if 0 <= val < len(MEMORY):
                        _replyFromMemory(api,val,"I remember this canvas from ",replyToName,statusId)
                        handled = True
                except Exception as e:
                    # Best effort: a bad index or a failed render must not stop the poll
                    print("Unable to cast "+value+" as an index on MEMORY for "+replyToName)
                    print(e)
    elif msg.find("Memories") > 0 and len(MEMORY) > 0: # Show a randomly chosen memory
        _replyFromMemory(api,randint(0,len(MEMORY)-1),"I remember this from ",replyToName,statusId)
        handled = True

    if not handled and msg.find("Rule") > 0: # Force render a trig picture (the colourful ones)
        posR = msg.find("R:")
        posG = msg.find("G:")
        posB = msg.find("B:")
        if 0 < posR < posG < posB:
            formulaR = msg[posR+2:posG-1]
            formulaG = msg[posG+2:posB-1]
            formulaB = msg[posB+2:]
            newFile = createImgFileWithRules(800,800,formulaR,formulaG,formulaB,replyToName) # Create something
            tweet_text = replyToName+"\nHere is the image from\nR: "+formulaR+"\nG: "+formulaG+"\nB: "+formulaB
            print(tweet_text)
            postToTwitter_File(api,newFile,tweet_text,statusId)
            handled = True
    return handled


def handleMentions(api,max_id,tweetbot):
    """ Poll the mentions timeline and answer every new mention with an image.

    api      -- Twitter connection used for polling, posting and favouriting
    max_id   -- highest status id already handled; 0 polls without a since_id
    tweetbot -- text generator passed through to getTweetText

    Users listed in ADMIN may issue Remember / Memories / Rule commands. Any
    mention not consumed by a command, and not sent by the bot itself, gets a
    freshly created image. If the mention carries a picture that can be
    downloaded, that picture is merged into the reply.

    Side effects: posts tweets, saves images under userimages/ and
    resultimages/, appends to the MAXID and Acquaintances files and to the
    ACQUAINTANCE list.

    Returns the highest status id seen, which is max_id if nothing was new.
    """
    # Read the Twitter queue for messages, and process them
    height = 640
    width = 640
    NUM_TWEET_FETCH = 20

    params = {'count':NUM_TWEET_FETCH}
    if max_id != 0:
        params['since_id'] = max_id
    r = api.request('statuses/mentions_timeline', params)
    print('Message poll complete' if r.status_code == 200 else str(r.status_code))
    if r.status_code != 200:
        return max_id

    for status in r:
        statusId = status["id"]
        # Oh excitement! I've been mentioned! Parse the message, create functions and an image, and reply
        msg = status["text"].encode("ascii", "ignore")
        print("%s (%s) %s by %s" % (statusId,status["created_at"], msg, status["user"]["screen_name"]))
        replyToName = "@"+status["user"]["screen_name"]

        handled = False
        if replyToName in ADMIN: # Pre-parser for command handling from users with elevated privilege (see files)
            handled = _handleAdminCommand(api,msg,replyToName,statusId)

        # replyToName carries a leading "@", so compare against "@"+MYNAME
        if not handled and replyToName != "@"+MYNAME:
            # Respond with a new picture
            # Strip out the usernames; str methods return new strings, so rebind
            msg = msg.replace(replyToName,"")
            msg = msg.replace("@"+MYNAME,"")
            msg = msg.replace("@","") # 2017-02-05 AB - ensure formula munging doesn't spam random Twitter handles
            msg = msg.strip()
            l = len(msg)-1
            print("Message: "+msg+"\nLength: "+str(l))

            # First pass...
            img = beCreative(width,height,replyToName)

            # If the requester has posted an image, let's use it in the composition!
            print(status)
            entities = status["entities"]
            print("Found "+str(len(entities))+" entities")
            usedUserImage = False
            if len(entities) > 0 and "media" in entities:
                medias = entities["media"]
                print("Found "+str(len(medias))+" medias")
                userImgs = _fetchUserImages(medias)
                if len(userImgs) > 0:
                    usrimg = userImgs[randint(0,len(userImgs)-1)]
                    usrimg.save("userimages/UserImage_"+str(randint(1000000,9999999))+".png")
                    usrimgrgba = usrimg.convert("RGBA")
                    sx = usrimgrgba.size[0]
                    sy = usrimgrgba.size[1]
                    if sx > width or sy > height:
                        usrimgrgba = usrimgrgba.resize((width,height), Image.ANTIALIAS)
                    methods = ["Circle","Circle","Blend","Circle","Blend","Circle","Blend","Circle","Blend","Spike","Blend","Threshold"]
                    img = mergeImages(usrimgrgba,img, methods[randint(0,len(methods)-1)] )
                    img.save("resultimages/ResultImage_"+str(randint(1000000,9999999))+".png")
                    usedUserImage = True

            tweet_text = _composeReplyText(tweetbot,replyToName,msg.split())
            print(tweet_text)
            postToTwitter_Image(api,img,tweet_text,statusId)

            # NOTE(review): the original only favourited mentions whose reply did
            # not merge a user image; kept as-is - confirm whether that is intended.
            if not usedUserImage:
                fav = api.request('favorites/create', {'id':statusId})
                print('FAVORITE SUCCESS' if fav.status_code == 200 else 'FAVORITE FAILURE')

        if statusId > max_id:
            max_id = statusId
            addStringToFile(GAMENAME,'MAXID.txt',str(max_id))

        if replyToName not in ACQUAINTANCE:
            addStringToFile(GAMENAME,"Acquaintances.txt",replyToName)
            ACQUAINTANCE.append(replyToName)
    return max_id
def addStringToFile(prefix,suffix,theString):
    """ Append a newline followed by str(theString) to the file <prefix>_<suffix>.

    The file is created if it does not exist. The newline is written BEFORE
    the text, so a file built only by this function starts with an empty line.
    The file is always closed, even if the write fails.
    """
    with open(prefix+'_'+suffix, 'a') as theFile:
        theFile.write("\n"+str(theString))
def loadTwitterKeysFromFile(filePrefix):
    """ Read the first four whitespace-separated tokens of <filePrefix>_TwitterKeys.txt.

    Returns them as a 4-tuple in file order. Callers pass them positionally to
    TwitterAPI as (consumer key, consumer secret, access token, access token
    secret). Any further tokens in the file are ignored.

    Raises IOError if the file cannot be opened and ValueError if it holds
    fewer than four tokens.
    """
    fileName = filePrefix+"_TwitterKeys.txt"
    with open(fileName, 'r') as fileOfKeys:
        keys = fileOfKeys.read().split()
    if len(keys) < 4:
        raise ValueError("Expected 4 keys in "+fileName+" but found "+str(len(keys)))
    return (keys[0],keys[1],keys[2],keys[3])
def connectToTwitter():
    """ Load the stored keys and return a TwitterAPI connection built from them
    """
    keys = loadTwitterKeysFromFile(GAMENAME)
    cons_key, cons_secret = keys[0], keys[1]
    access_token, access_token_secret = keys[2], keys[3]
    return TwitterAPI(cons_key,cons_secret,access_token,access_token_secret) # Does this need to be retried?
##################
# Word lists, each read once at import time from <GAMENAME>_<suffix>, one entry per line.
# Pre-canned tweet texts; getTweetText picks one at random.
STATEMENT = loadStringsFromFile(GAMENAME,"Statements.txt")
# Seed words passed to the text generator for the bot's unprompted posts.
SEEDWORD = loadStringsFromFile(GAMENAME,"Seedwords.txt")
# Handles (compared with a leading "@") allowed to issue admin commands in handleMentions.
ADMIN = loadStringsFromFile(GAMENAME,"Admins.txt")
# Handles that have mentioned the bot before; handleMentions appends new ones.
ACQUAINTANCE = loadStringsFromFile(GAMENAME,"Acquaintances.txt")
# Start the bot. There is no __main__ guard, so this also runs when the module is imported.
mainLoop()