"""
HELPERS
This is where all the actual code is hidden away, in order to keep the
app and event handlers lean and tidy.
"""

import cStringIO
import datetime
import hashlib
import os
import sys
import time
import urllib2

# The boto3 library is always available on Lambda.
import boto3
from boto3.dynamodb.conditions import Key

# Since Pillow is bundled by serverless-wsgi, we need to load it from the
# .requirements directory.
root = os.path.abspath(os.path.join(os.path.dirname(__file__)))
sys.path.insert(0, os.path.join(root, '.requirements'))
from PIL import Image, ImageOps  # noqa

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
s3 = boto3.resource('s3')
bucket = os.environ['BUCKET_NAME']


def get_wats():
    """
    Queries the DynamoDB GSI to get wats created today, newest first,
    limited to 30 items.
    """
return table.query(
IndexName='created_at-index',
KeyConditionExpression=Key('created_at_date').eq(_today()),
Limit=30,
ScanIndexForward=False
)['Items']
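
# Illustrative sketch only: each item carries what create_wat() and the watify
# pipeline write, roughly
#   {'url': ..., 'status': 'queued'/'completed' (or an error status set via
#    register_status()), 'created_at_date': '2017-06-23',
#    'created_at_time': <unix timestamp>, 'watified_url': <set once completed>}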


def create_wat(url):
"""
Inserts the entry into DynamoDB. We need both the date and timestamp
for the GSI to work.
"""
table.put_item(Item={
'url': url,
'status': 'queued',
'created_at_date': _today(),
'created_at_time': int(time.time())
})


def get_queued_urls(event):
"""
Filters the DynamoDB stream event to get only queued wats.
"""
for record in event['Records']:
status = _deep_get(record, 'dynamodb', 'NewImage', 'status', 'S')
if status == 'queued':
yield _deep_get(record, 'dynamodb', 'Keys', 'url', 'S')
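
# For reference, the minimal stream record shape this relies on (a sketch, not
# the full DynamoDB Streams payload):
#   {'dynamodb': {'Keys': {'url': {'S': '<url>'}},
#                 'NewImage': {'status': {'S': 'queued'}, ...}}}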


def get_s3_keys(event):
    """
    Extracts the object keys from an S3 event. In practice, we only get a
    single record in the list.
    """
for record in event['Records']:
yield record['s3']['object']['key']
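
# Minimal record shape assumed here (a sketch, not the full S3 event):
#   {'s3': {'object': {'key': 'original/<sha1>.jpg'}}}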


def download_to_s3(url):
"""
Wraps the download and upload functions into one.
"""
body = _download_url(url)
key = _generate_filename(url)
return _upload_original_to_s3(key, body, url)


def register_status(url, status):
    """
    Sets the status attribute of an existing wat. Used to indicate either
    download completion or errors.
    """
    # 'status' is a DynamoDB reserved word, so it has to be aliased as '#status'.
    table.update_item(
        Key={'url': url},
        UpdateExpression='SET #status = :status',
        ExpressionAttributeValues={
            ':status': status
        },
        ExpressionAttributeNames={'#status': 'status'},
    )


def watify(key):
"""
Download the original from S3, watify it and put a new image
on S3 in a different folder.
"""
body, original_url = _get_s3_object(key)
watified = _overlay_image(body, 'watboy.png')
watified_url = _upload_watified_to_s3(key, watified)
return _update_completed_wat(original_url, watified_url)
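
# How these pieces are assumed to fit together (the event handlers themselves
# live outside this module, per the docstring at the top): create_wat() writes
# a 'queued' item, the DynamoDB stream handler feeds get_queued_urls() into
# download_to_s3(), and the resulting S3 put triggers watify() via get_s3_keys().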


def _update_completed_wat(url, watified_url):
    """
    Sets the wat status to completed and saves the URL of the watified version.
    """
    table.update_item(
        Key={'url': url},
        UpdateExpression='SET watified_url = :watified_url, #status = :status',
        ExpressionAttributeValues={
            ':watified_url': watified_url,
            ':status': 'completed'
        },
        ExpressionAttributeNames={'#status': 'status'},
    )
return watified_url


def _today():
"""
Today's date in ISO format, e.g. 2017-06-23
"""
return datetime.date.today().isoformat()


def _get_s3_object(key):
    """
    Reads the object given by key from S3 and returns the binary contents
    along with the `original_url` from its metadata.
    """
s3_object = s3.Object(bucket, key).get()
body = s3_object['Body'].read()
original_url = s3_object['Metadata']['original_url']
return (body, original_url)


def _upload_original_to_s3(key, body, original_url):
    """
    Uploads the original binary contents to S3, sets its `original_url`
    metadata and returns its public URL.
    """
s3.Object(bucket, 'original/{}'.format(key)).put(
ACL='public-read',
ContentType='image/jpeg',
Body=body,
Metadata={'original_url': original_url})
return 'https://s3.amazonaws.com/{}/original/{}'.format(bucket, key)


def _upload_watified_to_s3(key, body):
    """
    Uploads the watified binary contents to S3 and returns its public URL.
    """
    # Strip the 'original/' prefix so the file lands under 'watified/' instead.
    key = os.path.basename(key)
s3.Object(bucket, 'watified/{}'.format(key)).put(
ACL='public-read',
ContentType='image/jpeg',
Body=body)
return 'https://s3.amazonaws.com/{}/watified/{}'.format(bucket, key)


def _download_url(url):
    """
    Downloads a file from the URL, with a five-second timeout.
    """
return urllib2.urlopen(url, None, 5).read()


def _generate_filename(url):
"""
Hash the original URL to get a safe filename.
"""
sha1 = hashlib.sha1()
sha1.update(url)
return '{}.jpg'.format(sha1.hexdigest())
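
# e.g. _generate_filename('http://example.com/cat.jpg') returns the 40-character
# SHA-1 hex digest of that URL plus a '.jpg' suffix (the URL here is illustrative).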


def _overlay_image(original_bytes, overlay_file):
    """
    This is where the secret sauce is made: take one image and put it on
    top of the other. It's easy!
    """
    original = _image_from_bytes(original_bytes)
    overlay = _scale_image_to_match(Image.open(overlay_file), original)
    result = original.copy().convert('RGBA')
    result.paste(overlay, (0, 0), mask=overlay)
    # JPEG has no alpha channel, so flatten back to RGB before encoding.
    return _image_to_bytes(result.convert('RGB'))


def _scale_image_to_match(image, reference):
    """
    Scales image to match the size of the reference image, upscaling if
    necessary.
    """
    if image.size[0] > reference.size[0] or image.size[1] > reference.size[1]:
        # thumbnail() only ever shrinks, in place, preserving the aspect ratio.
        image.thumbnail(reference.size)
    else:
        # fit() resizes (up or down) and crops to exactly the reference size.
        image = ImageOps.fit(image, reference.size)
    return image


def _image_from_bytes(image_bytes):
"""
Converts a string to a PIL Image object.
"""
return Image.open(cStringIO.StringIO(image_bytes))


def _image_to_bytes(image):
"""
Converts a PIL Image object to a string.
"""
output = cStringIO.StringIO()
image.save(output, 'JPEG')
jpeg_data = output.getvalue()
output.close()
return jpeg_data


def _deep_get(dictionary, *keys):
    """
    Get a deeply nested value from a dictionary, returning None if any key
    in the chain is not found.
    """
    # `reduce` is a builtin on the Python 2 runtime this module targets.
    return reduce(lambda d, key: d.get(key) if d else None, keys, dictionary)
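
# e.g. _deep_get({'a': {'b': 1}}, 'a', 'b') -> 1
#      _deep_get({'a': {}}, 'a', 'b') -> None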