Add migration for users' memories to Firestore #914
Conversation
Walkthrough: This change introduces a migration function to correct start and finish times for user memories in Firestore, along with a new data model.
Files selected (2)
- backend/migration/main.py (1)
- backend/migration/memories.py (1)
Review comments generated (2)
- Review: 1
- LGTM: 1
Tips
Chat with the Entelligence.AI bot (@Entelligence.AI)
- Reply on review comments left by this bot to ask follow-up questions. A review comment is a comment on a diff or a file.
- Invite the bot into a review comment chain by tagging @Entelligence.AI in a reply.
Code suggestions
- The bot may make code suggestions, but please review them carefully before committing, since the line number ranges may be misaligned.
- You can edit the comment made by the bot and manually tweak the suggestion if it is slightly off.
Pausing incremental reviews
- Add `@Entelligence.AI: ignore` anywhere in the PR description to pause further reviews from the bot.
backend/migration/memories.py
Outdated
```python
import math
from typing import Optional
from pydantic import BaseModel
from datetime import datetime, timedelta
from database._client import db
from google.cloud import firestore
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1 import FieldFilter


class MemoryTime(BaseModel):
    id: str
    created_at: datetime
    started_at: Optional[datetime]
    finished_at: Optional[datetime]


def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )

        memories_ref = users_ref.limit(user_limit).offset(user_offset)
        users = [doc for doc in memories_ref.stream()]
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            memories_ref = (
                db.collection('users').document(user.id).collection("memories")
                .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
            )
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...{offset}")
                memories_ref = memories_ref.limit(limit).offset(offset)
                docs = [doc for doc in memories_ref.stream()]
                if not docs or len(docs) == 0:
                    print("done")
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue

                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue

                    delta = memory.created_at.timestamp() - memory.started_at.timestamp()
                    print(delta)
                    if math.fabs(delta) < 15 * 60:
                        continue
                    td = None
                    if delta > 0:
                        td = timedelta(seconds=delta)
                    else:
                        td = timedelta(seconds=-delta)
                    if memory.finished_at:
                        memory.finished_at = memory.finished_at + td
                    memory.started_at = memory.started_at + td
                    print(f'{memory.dict()}')

                    memory_ref = (
                        db.collection('users').document(user.id).collection("memories").document(memory.id)
                    )

                    batch.update(memory_ref, memory.dict())

                batch.commit()
                offset += len(docs)

        user_offset = user_offset + len(users)
```
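The heart of this migration is the time-shift rule: when `started_at` drifts more than 15 minutes from `created_at`, shift `started_at` (and `finished_at`, if set) by the signed gap so that `started_at` lines up with `created_at`. The signed-shift behavior matches the final revision of this PR; the helper name below is hypothetical, used only to illustrate the rule in isolation, without Firestore:

```python
import math
from datetime import datetime, timedelta, timezone


def shift_times(created_at, started_at, finished_at, threshold_seconds=15 * 60):
    """Return (started_at, finished_at) shifted so started_at lines up with created_at.

    If the gap between created_at and started_at is within the threshold,
    the values are returned unchanged; otherwise both timestamps are moved
    by the signed gap toward created_at. Hypothetical helper, not part of the PR.
    """
    delta = created_at.timestamp() - started_at.timestamp()
    if math.fabs(delta) < threshold_seconds:
        return started_at, finished_at  # within tolerance, leave untouched
    td = timedelta(seconds=delta)  # signed shift toward created_at
    if finished_at is not None:
        finished_at = finished_at + td
    return started_at + td, finished_at
```

For example, a memory created at 12:00 whose `started_at` is 11:00 gets both its start and finish shifted forward by one hour, while a 5-minute gap is left alone.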
The function `migration_incorrect_start_finish_time` is quite large and does multiple things. It would be beneficial to break it down into smaller, more manageable functions; this will improve the readability, maintainability, and testability of the code.
Here's a suggestion on how you could refactor this function:
```python
def get_users(user_offset, user_limit):
    while True:
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )
        memories_ref = users_ref.limit(user_limit).offset(user_offset)
        users = [doc for doc in memories_ref.stream()]
        if not users or len(users) == 0:
            print("no users")
            break
        return users


def get_memories(user, offset, limit):
    while True:
        print(f"running...user...{user.id}...{offset}")
        memories_ref = (
            db.collection('users').document(user.id).collection("memories")
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )
        memories_ref = memories_ref.limit(limit).offset(offset)
        docs = [doc for doc in memories_ref.stream()]
        if not docs or len(docs) == 0:
            print("done")
            break
        return docs


def update_memory_times(user, docs):
    batch = db.batch()
    for doc in docs:
        if not doc:
            continue
        memory = MemoryTime(**doc.to_dict())
        if not memory.started_at:
            continue
        delta = memory.created_at.timestamp() - memory.started_at.timestamp()
        print(delta)
        if math.fabs(delta) < 15 * 60:
            continue
        td = None
        if delta > 0:
            td = timedelta(seconds=delta)
        else:
            td = timedelta(seconds=-delta)
        if memory.finished_at:
            memory.finished_at = memory.finished_at + td
        memory.started_at = memory.started_at + td
        print(f'{memory.dict()}')
        memory_ref = (
            db.collection('users').document(user.id).collection("memories").document(memory.id)
        )
        batch.update(memory_ref, memory.dict())
    batch.commit()


def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        users = get_users(user_offset, user_limit)
        for user in users:
            offset = 0
            limit = 400
            while True:
                docs = get_memories(user, offset, limit)
                update_memory_times(user, docs)
                offset += len(docs)
        user_offset = user_offset + len(users)
```
This way, each function has a single responsibility and the code is much easier to understand and test.
Files selected (1)
- backend/migration/memories.py (1)
Review comments generated (1)
- Review: 1
- LGTM: 0
backend/migration/memories.py
Outdated
```python
import math
from typing import Optional
from pydantic import BaseModel
from datetime import datetime, timedelta
from database._client import db
from google.cloud import firestore
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1 import FieldFilter


class MemoryTime(BaseModel):
    id: str
    created_at: datetime
    started_at: Optional[datetime]
    finished_at: Optional[datetime]


def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )

        users_ref = users_ref.limit(user_limit).offset(user_offset)
        users = list(users_ref.stream())
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            memories_ref = (
                db.collection('users').document(user.id).collection("memories")
                .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
            )
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...{offset}")
                memories_ref = memories_ref.limit(limit).offset(offset)
                docs = list(memories_ref.stream())
                if not docs or len(docs) == 0:
                    print("done")
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue

                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue

                    delta = memory.created_at.timestamp() - memory.started_at.timestamp()
                    print(delta)
                    if math.fabs(delta) < 15 * 60:  # gaps in 15' is ok
                        continue
                    td = None
                    if delta > 0:
                        td = timedelta(seconds=delta)
                    else:
                        td = timedelta(seconds=-delta)
                    if memory.finished_at:
                        memory.finished_at = memory.finished_at + td
                    memory.started_at = memory.started_at + td
                    print(f'{memory.dict()}')

                    memory_ref = (
                        db.collection('users').document(user.id).collection("memories").document(memory.id)
                    )

                    batch.update(memory_ref, memory.dict())

                batch.commit()
                offset += len(docs)

        user_offset = user_offset + len(users)
```
The `migration_incorrect_start_finish_time` function is doing a lot of work: it is responsible for fetching users, iterating over their memories, calculating time differences, and updating the Firestore database. This violates the Single Responsibility Principle (SRP), which states that a function should do one thing only.
To improve maintainability and readability, consider breaking down this function into smaller ones, each with its own responsibility. For example, you could have separate functions for fetching users, processing memories, and updating Firestore.
Here is a rough sketch of how you might refactor this:
```python
def fetch_users(user_offset, user_limit):
    # Fetch users logic here...
    pass

def process_memories(user):
    # Process memories logic here...
    pass

def update_firestore(updated_memories):
    # Update Firestore logic here...
    pass

def migrate_memories():
    user_offset = 0
    user_limit = 400
    while True:
        users = fetch_users(user_offset, user_limit)
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            updated_memories = process_memories(user)
            update_firestore(updated_memories)
        user_offset = user_offset + len(users)
```
This way, each function has a clear purpose, making the code easier to understand and maintain. Additionally, it would be easier to write tests for these individual functions.
Also, consider replacing the `print` statements with proper logging. This will give you more control over the output and allow you to include additional information such as timestamps, log levels, etc.
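As a minimal sketch of that logging suggestion (the logger name, level, and format are illustrative choices, not part of the PR):

```python
import logging

# Configure once at script startup; level and format are illustrative.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("migration")


# Hypothetical replacement for: print(f"running...user...{user_id}...{offset}")
def log_progress(user_id, offset):
    logger.info("migrating user=%s offset=%d", user_id, offset)
```

Using `%`-style lazy formatting (rather than f-strings) defers string building until the record is actually emitted, which matters when the migration logs once per page of documents.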
Files selected (1)
- backend/migration/memories.py (1)
Review comments generated (1)
- Review: 1
- LGTM: 0
backend/migration/memories.py
Outdated
```python
import math
import time
from typing import Optional
from pydantic import BaseModel
from datetime import datetime, timedelta
from database._client import db
from google.cloud import firestore
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1 import FieldFilter


class MemoryTime(BaseModel):
    id: str
    created_at: datetime
    started_at: Optional[datetime]
    finished_at: Optional[datetime]


def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )
        users_ref = users_ref.limit(user_limit).offset(user_offset)
        users = list(users_ref.stream())
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...{offset}")
                memories_ref = (
                    db.collection('users').document(user.id).collection("memories")
                    .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
                )
                memories_ref = memories_ref.limit(limit).offset(offset)
                docs = list(memories_ref.stream())
                if not docs or len(docs) == 0:
                    print("done")
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue

                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue

                    delta = memory.created_at.timestamp() - memory.started_at.timestamp()
                    print(delta)
                    if math.fabs(delta) < 15 * 60:  # gaps in 15' is ok
                        continue
                    td = None
                    if delta > 0:
                        td = timedelta(seconds=math.fabs(delta))
                    else:
                        td = -timedelta(seconds=math.fabs(delta))
                    if memory.finished_at:
                        memory.finished_at = memory.finished_at + td
                    memory.started_at = memory.started_at + td
                    print(f'{memory.dict()}')

                    memory_ref = (
                        db.collection('users').document(user.id).collection("memories").document(memory.id)
                    )

                    batch.update(memory_ref, memory.dict())

                batch.commit()
                offset += len(docs)
                time.sleep(.1)  # sleep 100ms

        user_offset = user_offset + len(users)
```
The `migration_incorrect_start_finish_time` function is doing a lot of work: fetching users, iterating over them, fetching their memories, and updating them when necessary. The large number of database operations performed in a single function could lead to performance issues.
To improve this, you could consider breaking down the function into smaller ones, each with its own responsibility. For example, one function could fetch users, another could fetch memories for a given user, and another could update a memory if necessary. This would make the code more modular and easier to maintain.
Also, there is no error handling in the function. If any operation fails (for example, a Firestore operation), the entire function will fail. You should add appropriate error handling to ensure that the function can recover from errors and continue processing other data.
Lastly, the function uses print statements for logging. While this might be fine for local development, it's not ideal for production environments. Consider using a proper logging library which can provide more control over log levels, formats, and destinations.
Here's an example of how you could refactor the function:
```python
import logging


def get_users(user_offset, user_limit):
    # Fetch users logic here
    pass

def get_memories(user, offset, limit):
    # Fetch memories logic here
    pass

def update_memory_if_necessary(memory):
    # Update memory logic here
    pass

def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        try:
            users = get_users(user_offset, user_limit)
            if not users:
                break
            for user in users:
                offset = 0
                limit = 400
                while True:
                    memories = get_memories(user, offset, limit)
                    if not memories:
                        break
                    for memory in memories:
                        update_memory_if_necessary(memory)
                    offset += len(memories)
            user_offset += len(users)
        except Exception as e:
            # Log the error and continue with the next iteration
            logging.error(f"An error occurred: {e}")
            continue
```
This is just a rough example, but it should give you an idea of how to start refactoring the function.
Files selected (1)
- backend/migration/memories.py (1)
Review comments generated (2)
- Review: 1
- LGTM: 1
backend/migration/memories.py
```python
def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        print(f"running...user...{user_offset}")
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )
        users_ref = users_ref.limit(user_limit).offset(user_offset)
        users = list(users_ref.stream())
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...memories...{offset}")
                memories_ref = (
                    db.collection('users').document(user.id).collection("memories")
                    .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
                )
                memories_ref = memories_ref.limit(limit).offset(offset)
                docs = list(memories_ref.stream())
                if not docs or len(docs) == 0:
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue

                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue

                    delta = memory.created_at.timestamp() - memory.started_at.timestamp()
                    if math.fabs(delta) < 15 * 60:  # gaps in 15' is ok
                        continue
                    td = None
                    if delta > 0:
                        td = timedelta(seconds=math.fabs(delta))
                    else:
                        td = -timedelta(seconds=math.fabs(delta))
                    if memory.finished_at:
                        memory.finished_at = memory.finished_at + td
                    memory.started_at = memory.started_at + td

                    memory_ref = db.collection('users').document(user.id).collection('memories').document(memory.id)

                    batch.update(memory_ref, memory.dict())

                batch.commit()
                offset += len(docs)
                time.sleep(.01)  # sleep 10ms

        user_offset = user_offset + len(users)
```
The `migration_incorrect_start_finish_time` function is doing a lot of work and has multiple responsibilities: it fetches users, iterates over their memories, performs calculations on the timestamps, and updates the Firestore database. This violates the Single Responsibility Principle (SRP) of the SOLID principles, which states that a function should do one thing only.
To improve maintainability and readability, consider breaking down this function into smaller functions each with a single responsibility. For example, you could have separate functions for fetching users, fetching memories, calculating new timestamps, and updating Firestore.
Here is a rough sketch of how you might refactor this:
```python
def fetch_users(user_offset, user_limit):
    # Fetch users logic here
    pass

def fetch_memories(user_id, offset, limit):
    # Fetch memories logic here
    pass

def calculate_new_timestamps(memory):
    # Calculate new timestamps logic here
    pass

def update_firestore(user_id, memory):
    # Update Firestore logic here
    pass

def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        print(f"running...user...{user_offset}")
        users = fetch_users(user_offset, user_limit)
        if not users or len(users) == 0:
            print("no users")
            break
        for user in users:
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...memories...{offset}")
                docs = fetch_memories(user.id, offset, limit)
                if not docs or len(docs) == 0:
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue
                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue
                    memory = calculate_new_timestamps(memory)
                    update_firestore(user.id, memory)
                batch.commit()
                offset += len(docs)
                time.sleep(.01)  # sleep 10ms
        user_offset = user_offset + len(users)
```
This way, each function is easier to understand and test independently. It also makes the main migration function more readable by abstracting away the details of each step into separate functions.
Also, consider adding error handling around database operations. Currently, if there's an issue with any of the Firestore operations (like fetching users or updating memories), the entire script might fail without clear information about what went wrong. Wrapping these operations in try/except blocks and logging any exceptions could help with troubleshooting potential issues.
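A minimal sketch of that error-handling advice: wrap each commit in a retry helper that logs failures and backs off before retrying. The helper name, retry count, and backoff policy are illustrative assumptions, not part of the PR; the callable stands in for something like `batch.commit`:

```python
import logging
import time

logger = logging.getLogger("migration")


def commit_with_retry(commit_fn, retries=3, backoff_seconds=0.5):
    """Call commit_fn, logging and retrying on failure.

    commit_fn is any zero-argument callable (e.g. batch.commit).
    Hypothetical helper: raises the last exception once retries are exhausted,
    so the caller can decide whether to skip the page or abort the migration.
    """
    for attempt in range(1, retries + 1):
        try:
            return commit_fn()
        except Exception as exc:
            logger.error("commit attempt %d/%d failed: %s", attempt, retries, exc)
            if attempt == retries:
                raise  # give up after the last attempt
            time.sleep(backoff_seconds * attempt)  # linear backoff between attempts
```

In the migration loop, `batch.commit()` would become `commit_with_retry(batch.commit)`, so a transient Firestore error no longer kills the whole run.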
Issue: #783