Log compaction for hybrid state machines #440

Open
kjnilsson opened this issue May 14, 2024 · 3 comments

@kjnilsson
Contributor

kjnilsson commented May 14, 2024

Log compaction for hybrid state machines

Overview

A hybrid state machine is a state machine that keeps some of its state in memory
and some of its state, typically payload data, in the Raft log. RabbitMQ quorum
queues are an example of a hybrid state machine that relies on fifo-ish consumption
patterns in order to use standard(ish) snapshotting and log truncation.

Combined with the checkpoint feature, which allows the state machine to store
a snapshot of its (typically light) state on disk without truncating the log
at that index, we can consider additional options for reducing the data stored
by the log without a full truncation. Checkpoints allow hybrid state machines to
recover quickly after a restart as they do not need to restore their state from
the last snapshot, only from the last checkpoint.
This means that during normal operation the log between the last snapshot and
the latest checkpoint will only be read on demand, to fetch a payload value.
As we will never need every single entry in this section of the log to
deterministically recover the state, this part of the log can be compacted
towards a state where it only contains indexes with payload data that is
currently referenced by the latest checkpoint.

A new ra_machine callback that returns a map of all live indexes will be used
for compaction calculations.
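
A minimal sketch of what such a callback could look like, assuming a hypothetical
`live_indexes/1` name and a quorum-queue-like state that maps message ids to raft
indexes (the names, record and return shape are all illustrative, not a final API):

```erlang
%% In ra_machine (hypothetical optional callback, name and shape illustrative):
%% return the raft indexes in the compacting area that still hold live payloads.
-callback live_indexes(MachineState :: term()) -> [non_neg_integer()].

%% In a QQ-like machine whose state maps MsgId -> RaftIdx (record invented here):
-record(state, {messages = #{} :: #{term() => non_neg_integer()}}).

live_indexes(#state{messages = Messages}) ->
    %% every message still held in the queue keeps its log entry alive
    maps:values(Messages).
```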

The log before the last checkpoint will be referred to as the "compacting area"
and the log after the last snapshot/checkpoint will be referred to as the "active
log". The compacting area of the log is effectively a k/v store where dead
entries will eventually be deleted and their space reclaimed.

Another way to look at it is that the snapshot is the checkpoint + all live
entries in the compacting area. 🤯

Traditional snapshotting (that includes the full state of the state machine)
can be achieved by writing a checkpoint and returning no
live indexes from the new ra_machine callback.
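
In other words, a machine that keeps its whole state in the checkpoint would
simply implement the hypothetical callback as:

```erlang
%% no payloads are kept in the log, so nothing below the checkpoint is live
live_indexes(_State) ->
    [].
```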

With these generalisations in mind there will be no checkpoints as such,
at least not multiple ones as in the current implementation. Instead a snapshot
will be redefined as a checkpoint + the live entries in the compacting
area of the log.

A compacting log would not take "traditional" snapshots by emitting release_cursors;
only checkpoints would be written.

Checkpoints are taken by the Ra process based on entries, time and log size,
not initiated by the ra_machine. Once a checkpoint is written and fsynced,
prior checkpoints will no longer be needed (as there is no promotion to snapshot
anymore) and thus will also be deleted.
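
As a rough illustration of that kind of policy decision (the thresholds and field
names below are invented for the sketch, not actual Ra configuration):

```erlang
%% Sketch: decide whether the Ra process should take a checkpoint based on
%% entries applied, time elapsed and bytes written since the last checkpoint.
should_checkpoint(#{entries_since_last := Entries,
                    ms_since_last := Ms,
                    bytes_since_last := Bytes}) ->
    Entries >= 64_000 orelse
        Ms >= 60_000 orelse
        Bytes >= 64 * 1024 * 1024.
```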

Follower snapshot replication

This new definition of a snapshot as a checkpoint + live entries in the compacting
area of the log complicates the snapshot replication / installation part of the
code, as it is no longer sufficient to replicate the latest checkpoint; we must
also ensure that the follower has all live entries in the compacting area.

The case where a new follower is added, or an existing follower needs a full
log sync, can be handled as follows: the snapshot sender process that
is spawned could, after replicating the checkpoint, continue to replicate all
segments in full to complete the new snapshot state.
After that point normal log replication could take over.

However, a follower that just ended up a bit behind may already have many of the
live entries and only need some replicated. In this case we will need to negotiate
which entries in the compacting area the follower needs. This could be done
by first replicating the checkpoint, then having the follower reply with the indexes
it needs (by comparing the indexes in the checkpoint with its own local log state),
and then replicating those on a per-entry basis.
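
A sketch of the follower side of that negotiation, assuming the replicated
checkpoint carries the live index set and the follower can cheaply test what its
local log already holds (`log_has_entry/2` is a made-up helper):

```erlang
%% Sketch: after receiving the checkpoint, reply with only the live indexes
%% that are missing from the local log so the leader can send those entries.
needed_indexes(LiveIndexes, LocalLog) ->
    [Idx || Idx <- LiveIndexes, not log_has_entry(Idx, LocalLog)].
```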

To avoid concurrent segment replication and compaction we can move from using
transient processes for this work to a single companion worker process per
Ra member that does all compaction and snapshot replication work. Testing
will need to be done to see if this provides sufficient parallelism.

Impact on segment writer process

The segment writer process is responsible for flushing entries in the mem tables
to segments.
Currently it is designed to skip any entries with indexes lower than the last written
snapshot. As checkpoints can be (and, to reduce write amplification, should be)
written before any segments have been written, with the new approach the segment
writer will need to inspect the latest checkpoint and use the set of live indexes
to ensure it includes all relevant entries while avoiding those that are already dead.
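
For example, when flushing a mem table the segment writer could filter entries
roughly like this (a sketch only; `LiveIndexes` is assumed to be the live set read
from the latest checkpoint, represented here as a map keyed by index):

```erlang
%% Sketch: keep entries above the checkpoint index untouched, and below it
%% keep only those that the latest checkpoint still references as live.
entries_to_flush(Entries, CheckpointIdx, LiveIndexes) ->
    [Entry || {Idx, _Term, _Data} = Entry <- Entries,
              Idx > CheckpointIdx orelse maps:is_key(Idx, LiveIndexes)].
```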

Some coordination with the compaction process will no doubt also be needed to
avoid compaction and segment writing occurring at the same time.

Compaction

Compaction should be triggered every time a new checkpoint is written.
It is performed in phases in order of efficiency (most efficient first) until
some configured ratio of live vs dead entries has been achieved.
This ratio should ideally be byte-based, but that may require expensive scanning
to work out the total size of all live entries, or require the state machine to
somehow track the approximate size of all live entries in addition to their indexes,
which may not be practical in all cases. As a first attempt we should use a simpler
entry-based approach where we allow a certain ratio of dead vs live entries, e.g.
for every 100 live entries we would allow 30 dead entries to remain in the log.
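
With the entry-based approach the trigger condition could be as simple as the
following sketch (the 0.3 figure just mirrors the 30-per-100 example above):

```erlang
%% Sketch: keep compacting while the dead entries in the compacting area
%% exceed the allowed fraction of live entries.
needs_compaction(LiveCount, TotalCount, MaxDeadPerLive) ->
    Dead = TotalCount - LiveCount,
    Dead > LiveCount * MaxDeadPerLive.

%% needs_compaction(100, 140, 0.3) -> true   (40 dead, only 30 allowed)
%% needs_compaction(100, 125, 0.3) -> false  (25 dead, within budget)
```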

Compaction Phases:

  1. Delete whole segments. Scan the list of segments to find any segments that
    contain no live indexes and delete these (see the sketch after this list).
    This is cheap and effective but may leave many dead entries in the log. For
    checkpoints that return no live entries in the compacting area this becomes a
    simple cutoff job where all segments that only contain entries with a lower
    index than the last checkpoint will simply be deleted.
  2. Delete trailing data in a segment.
    Because all data portions of a segment file are kept at the end of the file,
    dead data at the end of the file can be truncated away. This is also relatively
    cheap as it only requires a truncate system call to reclaim the space.
  3. Compact multiple segments with few live indexes into fewer segments.
    This option has both high read and write amplification and will transiently
    consume more disk space, and thus should only be used when necessary. It also
    requires coordination with any log readers (such as the main Ra process) to
    avoid deleting a segment whose entries have all been rewritten to another
    segment but which the readers may still want to read from.
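
A minimal sketch of phase 1, assuming each segment is known by its index range and
the live set is available as a list of indexes (`delete_segment/1` and the segment
tuple shape are invented for illustration):

```erlang
%% Sketch of phase 1: delete any segment in the compacting area that contains
%% no live indexes at all.
delete_dead_segments(Segments, LiveIndexes) ->
    Live = lists:usort(LiveIndexes),
    [ok = delete_segment(Seg)
     || {From, To, Seg} <- Segments,
        not lists:any(fun(Idx) -> Idx >= From andalso Idx =< To end, Live)].
```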

There is also a 2b option where data within a segment could possibly be moved around
to provide more compaction, but moving data is always tricky as it would
invalidate any in-memory caches of the segment index that the Ra process may currently
hold if it recently read an entry from this segment.

@kjnilsson kjnilsson added this to the 3.0.0 milestone May 14, 2024
@ikavgo

ikavgo commented May 14, 2024

Thinking about use cases and motivations other than saving space.

For example message priorities can be implemented like this:

  1. Keep messages in log like QQ
  2. Keep index in state: list of lists Priority -> [MessageIndexInLog...]
  3. When a message is sent, exclude it (its index) from that new ra_machine callback.

Or Delayed messages:

  1. <>
  2. Index is DelayTime => MessageIndexInLog
  3. <>

To generalize: the index (metadata) is in the Ra state, messages are in the log (as they are now).
This addresses the main concern I have wrt storing delayed messages in the log - they are not fifo and will be sent "randomly". It would be wasteful to keep the whole log just because some message is sitting there for the next two months.
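
A rough sketch of that shape for the priority case, where only the priority index
lives in the machine state and payloads stay in the log (record and callback names
invented for illustration):

```erlang
%% Sketch: machine state keeps only Priority -> [RaftIdx], highest priority
%% consumed first; the message payloads themselves stay in the raft log.
-record(state, {by_priority = #{} :: #{non_neg_integer() => [non_neg_integer()]}}).

%% the hypothetical live-indexes callback then returns every index that the
%% priority index still references; delivered messages simply drop out of it
live_indexes(#state{by_priority = ByPrio}) ->
    lists:append(maps:values(ByPrio)).
```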

Returning all live indexes can be costly then. Maybe returning dead ones is also an option?

Thoughts?

@ikavgo

ikavgo commented May 15, 2024

Also wondering if TTL handling in QQs can be improved? Like keeping the index but removing messages proactively:

https://www.rabbitmq.com/docs/ttl#quorum-queues

@lhoguin
Contributor

lhoguin commented May 16, 2024

There is also a 2b option where data within a segment could possibly be moved around
to provide more compaction, but moving data is always tricky as it would
invalidate any in-memory caches of the segment index that the Ra process may currently
hold if it recently read an entry from this segment.

For what it's worth, moving data around in rabbit_msg_store worked very well for CQs. It moves data from the end of the file toward the beginning, into holes created by no-longer-needed data. It then takes special care not to truncate before the existing readers are done reading from the end of the file. This approach allows rabbit_msg_store to have lock-free readers while still compacting when appropriate.

Not sure how that can apply to QQs / ra but the cache invalidation was not an issue compared to the lock the code used before.
