
Commit

temporarily disable tasklet patch and hydrating objects/authors in pages.serve_feed

for #1149 (comment)
snarfed committed Dec 19, 2024
1 parent 77bb1c8 commit 01ba8b5
Showing 3 changed files with 68 additions and 64 deletions.
74 changes: 37 additions & 37 deletions flask_app.py
@@ -62,40 +62,40 @@

###########################################

# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
#
# fixes "RuntimeError: Key has already been set in this batch" errors due to
# tasklets in pages.serve_feed
from logging import error as log_error
from sys import modules

from google.cloud.datastore_v1.types.entity import Key
from google.cloud.ndb._cache import (
_GlobalCacheSetBatch,
global_compare_and_swap,
global_set_if_not_exists,
global_watch,
)
from google.cloud.ndb.tasklets import Future, Return, tasklet

GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME


@tasklet
def custom_global_lock_for_read(key: str, value: str):
if value is not None:
yield global_watch(key, value)
lock_acquired = yield global_compare_and_swap(
key, LOCKED_FOR_READ, expires=LOCK_TIME
)
else:
lock_acquired = yield global_set_if_not_exists(
key, LOCKED_FOR_READ, expires=LOCK_TIME
)

if lock_acquired:
raise Return(LOCKED_FOR_READ)

modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
# # https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945
# #
# # fixes "RuntimeError: Key has already been set in this batch" errors due to
# # tasklets in pages.serve_feed
# from logging import error as log_error
# from sys import modules

# from google.cloud.datastore_v1.types.entity import Key
# from google.cloud.ndb._cache import (
# _GlobalCacheSetBatch,
# global_compare_and_swap,
# global_set_if_not_exists,
# global_watch,
# )
# from google.cloud.ndb.tasklets import Future, Return, tasklet

# GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
# LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
# LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME


# @tasklet
# def custom_global_lock_for_read(key: str, value: str):
# if value is not None:
# yield global_watch(key, value)
# lock_acquired = yield global_compare_and_swap(
# key, LOCKED_FOR_READ, expires=LOCK_TIME
# )
# else:
# lock_acquired = yield global_set_if_not_exists(
# key, LOCKED_FOR_READ, expires=LOCK_TIME
# )

# if lock_acquired:
# raise Return(LOCKED_FOR_READ)

# modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
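
Note on the block disabled above: it swaps in a custom global_lock_for_read by assigning to the google.cloud.ndb._cache module via sys.modules at import time. A minimal sketch, not part of this commit, of how that assignment could be gated behind an environment variable so the patch can be toggled per deploy instead of commented in and out; the variable name NDB_TASKLET_PATCH is hypothetical:

# sketch only, assuming the patch above is restored: apply the monkeypatch
# conditionally. NDB_TASKLET_PATCH is a hypothetical environment variable.
import os
from sys import modules

if os.environ.get('NDB_TASKLET_PATCH') == 'on':
    # custom_global_lock_for_read is the tasklet defined in the (currently
    # commented-out) block above
    modules['google.cloud.ndb._cache'].global_lock_for_read = custom_global_lock_for_read
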
55 changes: 28 additions & 27 deletions pages.py
@@ -260,33 +260,34 @@ def serve_feed(*, objects, format, user, title, as_snippets=False, quiet=False):
else:
activities = [obj.as1 for obj in objects]

# hydrate authors, actors, objects from stored Objects
fields = 'author', 'actor', 'object'
gets = []
for a in activities:
for field in fields:
val = as1.get_object(a, field)
if val and val.keys() <= set(['id']):
def hydrate(a, f):
def maybe_set(future):
if future.result() and future.result().as1:
a[f] = future.result().as1
return maybe_set

# TODO: extract a Protocol class method out of User.profile_id,
# then use that here instead. the catch is that we'd need to
# determine Protocol for every id, which is expensive.
#
# same TODO is in models.fetch_objects
id = val['id']
if id.startswith('did:'):
id = f'at://{id}/app.bsky.actor.profile/self'

future = Object.get_by_id_async(id)
future.add_done_callback(hydrate(a, field))
gets.append(future)

tasklets.wait_all(gets)
# TODO: bring back?
# # hydrate authors, actors, objects from stored Objects
# fields = 'author', 'actor', 'object'
# gets = []
# for a in activities:
# for field in fields:
# val = as1.get_object(a, field)
# if val and val.keys() <= set(['id']):
# def hydrate(a, f):
# def maybe_set(future):
# if future.result() and future.result().as1:
# a[f] = future.result().as1
# return maybe_set

# # TODO: extract a Protocol class method out of User.profile_id,
# # then use that here instead. the catch is that we'd need to
# # determine Protocol for every id, which is expensive.
# #
# # same TODO is in models.fetch_objects
# id = val['id']
# if id.startswith('did:'):
# id = f'at://{id}/app.bsky.actor.profile/self'

# future = Object.get_by_id_async(id)
# future.add_done_callback(hydrate(a, field))
# gets.append(future)

# tasklets.wait_all(gets)

actor = (user.obj.as1 if user.obj and user.obj.as1
else {'displayName': user.readable_id, 'url': user.web_url()})
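
For reference, the hydration that the commented-out block performed can also be written without ndb tasklets. A rough synchronous sketch, not part of this commit, assuming Object.get_by_id accepts the same ids as the get_by_id_async calls used above:

# sketch only: synchronous version of the disabled hydration loop, trading the
# tasklet/futures machinery for one blocking lookup per bare reference.
fields = 'author', 'actor', 'object'
for a in activities:
    for field in fields:
        val = as1.get_object(a, field)
        if val and val.keys() <= set(['id']):
            id = val['id']
            if id.startswith('did:'):
                id = f'at://{id}/app.bsky.actor.profile/self'
            stored = Object.get_by_id(id)  # blocking datastore get
            if stored and stored.as1:
                a[field] = stored.as1
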
3 changes: 3 additions & 0 deletions tests/test_pages.py
@@ -488,6 +488,7 @@ def test_feed_html_empty(self):
self.assert_equals(200, got.status_code)
self.assert_equals([], microformats2.html_to_activities(got.text))

@skip
def test_feed_html(self):
self.add_objects()

@@ -524,6 +525,7 @@ def test_feed_atom_empty_g_user_without_obj(self):
self.user.put()
self.test_feed_atom_empty()

@skip
def test_feed_atom(self):
self.add_objects()
got = self.client.get('/web/user.com/feed?format=atom')
@@ -547,6 +549,7 @@ def test_feed_rss_empty(self):
self.assert_equals(rss.CONTENT_TYPE, got.headers['Content-Type'])
self.assert_equals([], rss.to_activities(got.text))

@skip
def test_feed_rss(self):
self.add_objects()
got = self.client.get('/web/user.com/feed?format=rss')
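
The bare @skip decorators added above presumably refer to unittest.skip; if so, a reason string could be passed to document why the feed tests are off, e.g.:

# sketch only, assuming skip here is unittest.skip: same decorator, with a
# reason that shows up in test runner output.
@skip('temporarily disabled while serve_feed hydration is commented out')
def test_feed_rss(self):
    ...
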
