Skip to content

Commit

Permalink
batch requests executed by callGAPI so they're retried
Browse files Browse the repository at this point in the history
  • Loading branch information
jay0lee committed Jan 14, 2016
1 parent 15a60b7 commit 1960b20
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions gyb.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,16 @@ def buildGAPIServiceObject(api, soft_errors=False):
sys.exit(4)

def callGAPI(service, function, soft_errors=False, throw_reasons=[], **kwargs):
method = getattr(service, function)
retries = 3
retries = 10
parameters = kwargs.copy()
parameters.update(extra_args)
if function:
method = getattr(service, function)(**parameters)
else:
method = service
for n in range(1, retries+1):
try:
return method(**parameters).execute()
return method.execute()
except googleapiclient.errors.HttpError as e:
error = simplejson.loads(e.content.decode('utf-8'))
try:
Expand All @@ -479,8 +482,8 @@ def callGAPI(service, function, soft_errors=False, throw_reasons=[], **kwargs):
message = error['error']['errors'][0]['message']
if reason in throw_reasons:
raise
if n != retries and reason in ['rateLimitExceeded',
'userRateLimitExceeded', 'backendError']:
if n != retries and (http_status >= 500 or
reason in ['rateLimitExceeded', 'userRateLimitExceeded', 'backendError']):
wait_on_fail = (2 ** n) if (2 ** n) < 60 else 60
randomness = float(random.randint(1,1000)) / 1000
wait_on_fail = wait_on_fail + randomness
Expand Down Expand Up @@ -973,13 +976,13 @@ def main(argv):
callback=backup_message)
backed_up_messages += 1
if len(gbatch._order) == options.batch_size:
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("backed up %s of %s messages" %
(backed_up_messages, backup_count))
if len(gbatch._order) > 0:
gbatch.execute()
callGAPI(gbatch, None)
sqlconn.commit()
rewrite_line("backed up %s of %s messages" %
(backed_up_messages, backup_count))
Expand All @@ -1001,13 +1004,13 @@ def main(argv):
callback=refresh_message)
refreshed_messages += 1
if len(gbatch._order) == options.batch_size:
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("refreshed %s of %s messages" %
(refreshed_messages, refresh_count))
if len(gbatch._order) > 0:
gbatch.execute()
callGAPI(gbatch, None)
sqlconn.commit()
rewrite_line("refreshed %s of %s messages" %
(refreshed_messages, refresh_count))
Expand Down Expand Up @@ -1113,7 +1116,7 @@ def main(argv):
# this message would put us over max, execute current batch first
rewrite_line("restoring %s messages (%s/%s)" % (len(gbatch._order),
current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("restored %s messages (%s/%s)" % (len(gbatch._order),
Expand All @@ -1127,7 +1130,7 @@ def main(argv):
if len(gbatch._order) == options.batch_size:
rewrite_line("restoring %s messages (%s/%s)" % (len(gbatch._order),
current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("restored %s messages (%s/%s)" % (len(gbatch._order),
Expand All @@ -1137,7 +1140,7 @@ def main(argv):
if len(gbatch._order) > 0:
rewrite_line("restoring %s messages (%s/%s)" % (len(gbatch._order),
current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
sqlconn.commit()
rewrite_line("restored %s messages (%s/%s)" % (len(gbatch._order),
current, restore_count))
Expand Down Expand Up @@ -1244,7 +1247,7 @@ def main(argv):
# this message would put us over max, execute current batch first
rewrite_line("restoring %s messages (%s/%s)" %
(len(gbatch._order), current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("restored %s messages (%s/%s)" %
Expand All @@ -1259,7 +1262,7 @@ def main(argv):
if len(gbatch._order) == options.batch_size:
rewrite_line("restoring %s messages (%s/%s)" %
(len(gbatch._order), current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("restored %s messages (%s/%s)" %
Expand All @@ -1269,7 +1272,7 @@ def main(argv):
if len(gbatch._order) > 0:
rewrite_line("restoring %s messages (%s/%s)" %
(len(gbatch._order), current, restore_count))
gbatch.execute()
callGAPI(gbatch, None)
sqlconn.commit()
rewrite_line("restoring %s messages (%s/%s)" %
(len(gbatch._order), current, restore_count))
Expand Down Expand Up @@ -1364,12 +1367,12 @@ def main(argv):
id=a_message['id']), callback=purged_message)
purged_messages += 1
if len(gbatch._order) == options.batch_size:
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
rewrite_line("purged %s of %s messages" %
(purged_messages, purge_count))
if len(gbatch._order) > 0:
gbatch.execute()
callGAPI(gbatch, None)
rewrite_line("purged %s of %s messages" % (purged_messages, purge_count))
print("\n")

Expand Down Expand Up @@ -1488,14 +1491,14 @@ def main(argv):
callback=estimate_message)
estimated_messages += 1
if len(gbatch._order) == options.batch_size:
gbatch.execute()
callGAPI(gbatch, None)
gbatch = googleapiclient.http.BatchHttpRequest()
sqlconn.commit()
rewrite_line("Estimated size %s %s/%s messages" %
(bytes_to_larger(message_size_estimate), estimated_messages,
estimate_count))
if len(gbatch._order) > 0:
gbatch.execute()
callGAPI(gbatch, None)
sqlconn.commit()
rewrite_line("Estimated size %s %s/%s messages" %
(bytes_to_larger(message_size_estimate), estimated_messages,
Expand Down

0 comments on commit 1960b20

Please sign in to comment.