Add migration users memories to firestore #914

Merged: 4 commits, Sep 24, 2024
3 changes: 3 additions & 0 deletions backend/migration/main.py
@@ -0,0 +1,3 @@
from memories import migration_incorrect_start_finish_time

migration_incorrect_start_finish_time()
78 changes: 78 additions & 0 deletions backend/migration/memories.py
@@ -0,0 +1,78 @@
import math
import time
from datetime import datetime, timedelta
from typing import Optional

from google.cloud import firestore
from google.cloud.firestore_v1.field_path import FieldPath
from pydantic import BaseModel

from database._client import db


class MemoryTime(BaseModel):
    id: str
    created_at: datetime
    started_at: Optional[datetime]
    finished_at: Optional[datetime]


def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        users_ref = (
            db.collection('users')
            .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
        )
        users_ref = users_ref.limit(user_limit).offset(user_offset)
        users = list(users_ref.stream())
        if not users:
            print("no users")
            break
        for user in users:
            offset = 0
            limit = 400
            while True:
                print(f"running...user...{user.id}...{offset}")
                memories_ref = (
                    db.collection('users').document(user.id).collection("memories")
                    .order_by(FieldPath.document_id(), direction=firestore.Query.ASCENDING)
                )
                memories_ref = memories_ref.limit(limit).offset(offset)
                docs = list(memories_ref.stream())
                if not docs:
                    print("done")
                    break
                batch = db.batch()
                for doc in docs:
                    if not doc:
                        continue

                    memory = MemoryTime(**doc.to_dict())
                    if not memory.started_at:
                        continue

                    delta = memory.created_at.timestamp() - memory.started_at.timestamp()
                    print(delta)
                    if math.fabs(delta) < 15 * 60:  # gaps under 15 minutes are OK
                        continue
                    # shift started_at (and finished_at) by the created/started gap
                    if delta > 0:
                        td = timedelta(seconds=math.fabs(delta))
                    else:
                        td = -timedelta(seconds=math.fabs(delta))
                    if memory.finished_at:
                        memory.finished_at = memory.finished_at + td
                    memory.started_at = memory.started_at + td
                    print(f'{memory.dict()}')

                    memory_ref = (
                        db.collection('users').document(user.id).collection("memories").document(memory.id)
                    )

                    batch.update(memory_ref, memory.dict())

                batch.commit()
                offset += len(docs)
                time.sleep(.1)  # sleep 100ms between batches

        user_offset = user_offset + len(users)
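The correction logic itself is easy to check in isolation. A minimal sketch with hypothetical timestamps (no Firestore involved; the datetime values are invented for illustration):

```python
import math
from datetime import datetime, timedelta

# Hypothetical memory whose started_at is one hour after created_at,
# i.e. delta = created_at - started_at = -3600 seconds.
created_at = datetime(2024, 9, 1, 12, 0, 0)
started_at = datetime(2024, 9, 1, 13, 0, 0)
finished_at = datetime(2024, 9, 1, 13, 30, 0)

delta = created_at.timestamp() - started_at.timestamp()
assert math.fabs(delta) >= 15 * 60  # outside the 15-minute tolerance

# Same shift as the migration applies: timedelta(seconds=delta) equals
# the sign-split +/-timedelta(seconds=fabs(delta)) in the diff above.
td = timedelta(seconds=delta)
started_at += td
finished_at += td

print(started_at)   # started_at now matches created_at
print(finished_at)  # the 30-minute duration is preserved
```

The shift moves both timestamps by the same amount, so the memory's duration is unchanged; only its position on the timeline is corrected.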

Entelligence.AI commented:
The migration_incorrect_start_finish_time function seems to be doing a lot of work. It's fetching users, iterating over them, fetching their memories, and updating them if necessary. This could potentially lead to performance issues due to the large number of database operations being performed in a single function.

To improve this, you could consider breaking down the function into smaller ones, each with its own responsibility. For example, one function could fetch users, another could fetch memories for a given user, and another could update a memory if necessary. This would make the code more modular and easier to maintain.

Also, there is no error handling in the function. If any operation fails (for example, a Firestore operation), the entire function will fail. You should add appropriate error handling to ensure that the function can recover from errors and continue processing other data.
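One way to isolate failures per user, so a single bad document does not abort the whole run, could look like this (a sketch; process_user and the simulated failure are hypothetical stand-ins for the Firestore calls):

```python
import logging

def process_user(user_id):
    # Hypothetical per-user migration step; in the real script this would
    # page through the user's memories and commit batches, and may raise.
    raise RuntimeError("simulated Firestore failure")

failed = []
for user_id in ["u1", "u2", "u3"]:
    try:
        process_user(user_id)
    except Exception as e:  # in practice, catch google.api_core exceptions
        logging.error("user %s failed: %s", user_id, e)
        failed.append(user_id)  # record the failure and continue with the rest

print(failed)  # every user failed in this simulated run
```

Collecting the failed user IDs also gives the migration a natural retry list for a second pass.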

Lastly, the function uses print statements for logging. While this might be fine for local development, it's not ideal for production environments. Consider using a proper logging library which can provide more control over log levels, formats, and destinations.

Here's an example of how you could refactor the function:

import logging

def get_users(user_offset, user_limit):
    # Fetch users logic here
    pass

def get_memories(user, offset, limit):
    # Fetch memories logic here
    pass

def update_memory_if_necessary(memory):
    # Update memory logic here
    pass

def migration_incorrect_start_finish_time():
    user_offset = 0
    user_limit = 400
    while True:
        try:
            users = get_users(user_offset, user_limit)
            if not users:
                break
            for user in users:
                offset = 0
                limit = 400
                while True:
                    memories = get_memories(user, offset, limit)
                    if not memories:
                        break
                    for memory in memories:
                        update_memory_if_necessary(memory)
                    offset += len(memories)
            user_offset += len(users)
        except Exception as e:
            # Log the error and stop, so a persistent failure does not
            # retry the same offset forever
            logging.error(f"An error occurred: {e}")
            break

This is just a rough example, but it should give you an idea of how to start refactoring the function.
