Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot update a record with pending changes #5824

Open
KotoriMinami opened this issue Aug 12, 2024 · 4 comments
Open

Cannot update a record with pending changes #5824

KotoriMinami opened this issue Aug 12, 2024 · 4 comments

Comments

@KotoriMinami
Copy link

Description:

Some messages within the conversation encounter the following issues.

 Diagnostic error: Cannot update a record with pending changes

Environment Information:

  • Rocket.Chat Server Version:
  • Rocket.Chat App Version:
  • Device Name:
  • OS Version:

Steps to reproduce:

  1. Enter a conversation, with a very low probability of occurrence.

Expected behavior:

Within the conversation, messages can be updated and further actions can be performed.

Actual behavior:

There's a probability that a certain batch of messages may not update correctly.

Additional context:

After examining the logs, it is suspected that the issue is caused by the following:

In the

handleMessageReceived = (ddpMessage: IDDPMessage) => {
method that handles message updates over a certain period of time.
All creations and updates are stored in
this._messagesBatch[message._id] = operation;

If there is a slightly long-running task when executing

await this.updateMessage(_queue[i]);

before reaching db.batch,


because

the next timer execution may be triggered.

At this point,

this._messagesBatch = this.messagesBatch;

the creation and updates from the previous batch will be lost,
causing them to not reach the db.batch method.

As a result, Model._preparedState will remain in an executing state in memory and will never recover.
Consequently, this batch of messages will not receive any updates.

@diegolmello
Copy link
Member

Thanks for evaluating that, @KotoriMinami.
Do you have any solution in mind?

@KotoriMinami
Copy link
Author

KotoriMinami commented Aug 13, 2024

@diegolmello Maybe this is possible?

consumer = async () => {
    // copy variables values to local and clean them
    const _lastOpen = this.lastOpen;
    const _queue = Object.keys(this.queue).map(key => this.queue[key]);
    this._messagesBatch = this.messagesBatch;
    this._threadsBatch = this.threadsBatch;
    this._threadMessagesBatch = this.threadMessagesBatch;
    this.queue = {};
    this.messagesBatch = {};
    this.threadsBatch = {};
    this.threadMessagesBatch = {};
    
    for (let i = 0; i < _queue.length; i += 1) {
	    try {
		    // eslint-disable-next-line no-await-in-loop
		    await this.updateMessage(_queue[i]);
	    } catch (e) {
		    log(e);
	    }
    }
    
    try {
	    const db = database.active;
	    await db.write(async () => {
		    await db.batch(
			    ...Object.values(this._messagesBatch),
			    ...Object.values(this._threadsBatch),
			    ...Object.values(this._threadMessagesBatch)
		    );
	    });
    
	    this.read(_lastOpen);
    } catch (e) {
	    log(e);
    }
    
    // Clean local variables
    this._messagesBatch = {};
    this._threadsBatch = {};
    this._threadMessagesBatch = {};
    this.timer = null;
    if (Object.keys(this.queue).length) {
      this.timer = setTimeout(this.consumer, WINDOW_TIME);
    }
}

handleMessageReceived = (ddpMessage: IDDPMessage) => {
    if (!this.timer) {
        this.timer = setTimeout(this.consumer, WINDOW_TIME);
    }
    this.lastOpen = new Date();
    const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])) as IMessage;
    this.queue[message._id] = message;
};

Additionally, I'm not quite sure about the role of this.messagesBatch in the logic.

@diegolmello
Copy link
Member

@KotoriMinami Thanks. That's promising!

This code is a little bit harder to read than it should, because of a performance improvement to make messages to be saved in batches of 1 second.
We can reevaluate if it's really needed though, since it was implemented a long time ago.

Without it, every message coming from websocket would be saved on WatermelonDB directly.
If there was too many messages coming from websocket at the same time, it would lead to even more Diagnostic error you reported.

Can you remove everything related to the queue and give it a try?
I'm pretty sure you could just call updateMessage on handleMessageReceived.

@KotoriMinami
Copy link
Author

@diegolmello Yes, I think using a queue to handle this logic is reasonable.
Once you use the prepare related APIs, any subsequent updates before executing db.batch are invalid.
Therefore, it's best to ensure that the next prepare runs after the batch is completed, otherwise, the latest updates might be lost.

I'm confident that using a queue will be much more effective than directly calling updateMessage within handleMessageReceived. I only had this question because I noticed that this.messagesBatch was continuously assigned as {},
and then this._messagesBatch = this.messagesBatch was executed.
It feels like it's not really being utilized in the process.

You can try simulating a time consuming operation before db.write, for example:

for (let i = 0; i < _queue.length; i += 1) {
    try {
        // eslint-disable-next-line no-await-in-loop
        await this.updateMessage(_queue[i]);
    } catch (e) {
        log(e);
    }
}
await new Promise((resolve, reject) => {
    setTimeout(resolve, 4000);
});
try {
    const db = database.active;
    await db.write(async () => {
        await db.batch(
            ...Object.values(this._messagesBatch),
            ...Object.values(this._threadsBatch),
            ...Object.values(this._threadMessagesBatch)
        );
    });

    this.read(_lastOpen);
} catch (e) {
    log(e);
}

If a new websocket push is received during the waiting period,
you can see that this._messagesBatch actually gets lost because it’s overwritten by the next timer.
This results in the messages from the previous queue not receiving any further updates.

Of course, this simulated scenario is quite extreme, and the likelihood of it happening in reality is extremely low.

After a simple test, the above code is able to handle this situation without losing the latest updates.
However, I'm not sure about its robustness.

After all, the current code logic has been running for four years.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants