Chunk group assignment #697

Open
wants to merge 25 commits into
base: master
25 commits
d898646
Write initial mapping functions.
cody-littley Aug 9, 2024
a43dcd3
Incremental progress.
cody-littley Aug 12, 2024
d479563
Added priority queue.
cody-littley Aug 12, 2024
af4fb27
Incremental changes.
cody-littley Aug 12, 2024
4d4c0bf
Implemented a bunch of stuff.
cody-littley Aug 12, 2024
ac3c52c
Add node removal.
cody-littley Aug 12, 2024
5eafa2c
Map functionality completed, need to test
cody-littley Aug 12, 2024
ddfad42
Move things around.
cody-littley Aug 12, 2024
75a6fd0
Unit test now working.
cody-littley Aug 12, 2024
8a32f5c
Cleanup.
cody-littley Aug 12, 2024
799bf6e
Fix types.
cody-littley Aug 12, 2024
b196e55
Better random generator.
cody-littley Aug 13, 2024
bd699ab
Use more explicit data types.
cody-littley Aug 13, 2024
56534c4
Incremental progress, things are still broken
cody-littley Aug 30, 2024
71c6a17
Mostly finished refactor, tests not yet working.
cody-littley Sep 3, 2024
bd23914
Tests now passing.
cody-littley Sep 3, 2024
f42c295
Cleanup.
cody-littley Sep 3, 2024
2414f88
Make calculations API more user friendly.
cody-littley Sep 3, 2024
98d83f1
Fix bugs, add new unit test.
cody-littley Sep 3, 2024
a52844a
Moar tests.
cody-littley Sep 3, 2024
8697492
More tests.
cody-littley Sep 3, 2024
e23c352
More tests, fix uncovered bugs.
cody-littley Sep 3, 2024
b9b71f8
lint
cody-littley Sep 3, 2024
07fb65e
Use different strategy to combine values into random number.
cody-littley Sep 4, 2024
db71f24
Merge branch 'master' into chunk-group-assignment
cody-littley Sep 20, 2024
15 changes: 10 additions & 5 deletions common/testutils/test_utils.go
@@ -84,6 +84,16 @@ func ExecuteWithTimeout(f func(), duration time.Duration, debugInfo ...any) {
	}
}

// RandomTime returns a random time.
func RandomTime() time.Time {
	return time.Unix(int64(rand.Int31()), int64(rand.Intn(int(time.Second))))
}

// RandomTimeInRange returns a random time in the half-open range [start, end).
// It panics if end is not after start, since rand.Int63n requires a positive argument.
func RandomTimeInRange(start time.Time, end time.Time) time.Time {
	return start.Add(time.Duration(rand.Int63n(int64(end.Sub(start)))))
}

// RandomBytes generates a random byte slice of a given length.
func RandomBytes(length int) []byte {
	bytes := make([]byte, length)
@@ -93,8 +103,3 @@ func RandomBytes(length int) []byte {
	}
	return bytes
}

// RandomTime generates a random time.
func RandomTime() time.Time {
	return time.Unix(int64(rand.Int31()), 0)
}
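A minimal usage sketch for `RandomTimeInRange`, assuming the helper behaves exactly as shown in the diff above. The `inRange` and `allInRange` helpers are hypothetical and exist only for this illustration; they check that every sample falls in the half-open interval `[start, end)`.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// RandomTimeInRange mirrors the helper added in this diff.
// rand.Int63n panics when its argument is <= 0, so end must be strictly after start.
func RandomTimeInRange(start time.Time, end time.Time) time.Time {
	return start.Add(time.Duration(rand.Int63n(int64(end.Sub(start)))))
}

// inRange reports whether t lies in the half-open interval [start, end).
// Hypothetical helper, not part of the PR.
func inRange(t, start, end time.Time) bool {
	return !t.Before(start) && t.Before(end)
}

// allInRange draws n samples from a window of the given length and checks each one.
// Hypothetical helper, not part of the PR.
func allInRange(n int, windowNanos int64) bool {
	start := time.Unix(1700000000, 0) // arbitrary illustrative start time
	end := start.Add(time.Duration(windowNanos))
	for i := 0; i < n; i++ {
		if !inRange(RandomTimeInRange(start, end), start, end) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allInRange(1000, int64(time.Hour)))
}
```

Callers that cannot guarantee `end.After(start)` would need to guard the call themselves.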
112 changes: 112 additions & 0 deletions lightnode/chunkgroup/README.md
@@ -0,0 +1,112 @@
# Chunk Groups

A "chunk group" is a collection of light nodes that are interested in sampling all chunks with a particular chunk index.
This document describes the algorithm for mapping each light node onto its chunk group(s).

# Desired Properties

The following properties are desired for the chunk group assignment algorithm:

- **Randomness**: A light node operator should not be able to choose their chunk group(s) prior to registration.
A light node should have an equal chance of being in any particular chunk group at any particular point in time.
- **Churn**: The chunk group of a light node should change over time.
- **Stability**: A light node should not change chunk groups too frequently, and at any point in time
only a few light nodes should be changing chunk groups.
- **Determinism**: The chunk group(s) of a light node should be deterministic based on the node's
seed and the current time. Any two parties should agree which chunk group(s) a light node is in
at a particular timestamp.
- **Multiplicity**: This algorithm should support a light node being in 1 or more chunk groups simultaneously.
(Note: a maximum of 64 chunk groups per light node is supported by this algorithm. The likely configuration for the
number of chunk groups is 2.)

# Terms

```
Consider the timeline below, with time moving from left to right.

                    The "+" marks represent
                    the time when a particular                        The 7th shuffle
unix epoch          light node shuffles a group.                             |
    |                            |                                           ↓
    ↓      1          2          ↓          4          5          6          7          8          9
    |------+---|------+---|------+---|------+---|------+---|------+---|------+---|------+---|------+---|
    \          /          \      /                                \         /\         /\         /
     \        /            \    /                                  \       /  \       /  \       /
      \      /              \  /                                    \     /    \     /    \     /
       \    /                \/                                      \   /      \   /      \   /
        \  /                      The "shuffle offset".               \ /        \ /        \ /
         \/                       For each group a light            epoch 6    epoch 7    epoch 8
A "shuffle period". Each node     node is in, it has a
changes chunk groups once per     random offset assigned
shuffle period per group it       at registration time.
is in. Each shuffle period is
marked with a "|".
```

- **Unix Epoch**: time = 0, aka January 1, 1970. All times are measured in nanoseconds since the Unix epoch,
and all light nodes are considered to have an epoch number of 0 at precisely this time. Even though this protocol
obviously did not exist at this time, it's convenient to use it as a reference point in order to keep the math simple.
- **Shuffle Period**: The time that should pass in between a particular light node changing one of its chunk groups
(e.g. 1 week). Each time one shuffle period passes, all light nodes within a chunk group will have been assigned to a
random chunk group (note that it's possible for a node to be randomly assigned to the same chunk group multiple
times in a row).
- **Shuffle Offset**: In order to avoid too many nodes changing chunk groups at the same time, each node switches chunk
groups at an offset relative to the beginning of each shuffle period. This offset is randomly assigned to each node at
registration time, but remains constant for the lifetime of the node. If a light node is in multiple chunk groups,
then it will have a different shuffle offset for each group it is in.
- **Assignment Index**: a light node may participate in multiple chunk groups simultaneously. The first group it is in
is said to have an "assignment index" of 0, the second has an assignment index of 1, and so on.
The number of chunk groups that a light node participates in is a configuration parameter of the protocol.
A light node shuffles each of its chunk groups independently. That is, the light node shuffles its first chunk group
assignment at a random time that is independent of the time that it shuffles its second chunk group assignment
(and so on).
- **Shuffle Epoch**: A shuffle epoch describes the number of times a particular light node has changed chunk groups
for a group assignment. At genesis, all light nodes' group assignments are in epoch 0. The epoch for a particular
light node's group assignment is incremented by 1 for each time it is randomly shuffled into a new chunk group.
The length of each epoch is equal to the shuffle period.


# Algorithm

The algorithm for determining which chunk group a particular light node belongs to is described below. A reference implementation
of this algorithm can be found in [calculations.go](./calculations.go).

## Determining a node's seed

A light node's seed is an 8 byte value that is randomly assigned to the node at registration time. The seed should
be generated using on-chain randomness that is difficult for an attacker to predict. The light node's seed
is public information and stored on-chain.

## `randomInt(uint64, uint64, ..., uint64)`

Define a function `randomInt(uint64, uint64, ..., uint64)` that takes a variable number of 8 byte unsigned integers
and returns a pseudo-random 8 byte unsigned integer.

- For each unsigned integer from left to right, append the integer's bytes into a byte array called
`seedBytes` in big endian order.
- Use the `seedBytes` as the input to `keccak256` to generate a 32 byte array called `hashBytes`.
- Use the first 8 bytes of `hashBytes` to create an 8 byte unsigned integer using big endian order called `result`.
- Return `result`.
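
The steps above can be sketched in Go as follows. This is a minimal illustration, not the reference implementation: `keccak256` is not part of the Go standard library, so the sketch takes the hash as a parameter (`hashFunc`, a hypothetical type) and uses SHA-256 purely as a stand-in so that it runs without external packages. A real implementation would pass a keccak256 function instead.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// hashFunc maps arbitrary bytes to a 32 byte digest.
// The README specifies keccak256; any function with this shape can be plugged in.
type hashFunc func([]byte) [32]byte

// standInHash is SHA-256, used ONLY so this sketch is runnable with the standard library.
// It will not produce the same values as keccak256.
func standInHash(b []byte) [32]byte {
	return sha256.Sum256(b)
}

// randomInt follows the steps in the README: serialize each input in big endian order,
// hash the result, and interpret the first 8 bytes of the digest as a big endian uint64.
func randomInt(hash hashFunc, values ...uint64) uint64 {
	seedBytes := make([]byte, 8*len(values))
	for i, v := range values {
		binary.BigEndian.PutUint64(seedBytes[8*i:], v)
	}
	hashBytes := hash(seedBytes)
	return binary.BigEndian.Uint64(hashBytes[:8])
}

func main() {
	// The same inputs always yield the same output, which gives the "Determinism" property.
	fmt.Println(randomInt(standInHash, 42, 0) == randomInt(standInHash, 42, 0))
}
```

Because the inputs are concatenated in order, `randomInt(a, b)` and `randomInt(b, a)` hash different byte strings and are expected to differ.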

## Determining a node's shuffle offset for a particular assignment index

A node's shuffle offset for a particular assignment index is a duration of at least 0 and strictly less than the
shuffle period, at nanosecond granularity. The shuffle offset is determined as follows:

```
shuffleOffset := randomInt(nodeSeed, assignmentIndex) % shufflePeriod_nanoseconds
```

## Shuffle Epoch Calculation

At the unix epoch, the shuffle epoch for each group assignment is defined as `0`. The epoch increases by `1` at time
`shuffleOffset(nodeSeed, assignmentIndex)`, and then by `1` again every `shufflePeriod` nanoseconds.
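
As a worked example of this rule, here is a small sketch of the epoch arithmetic. It is one reading of the paragraph above, not a copy of `calculations.go`; the function names and the tiny period and offset values are assumptions chosen for readability. Under this reading, epoch 0 lasts only `shuffleOffset` nanoseconds, and every later epoch lasts one full shuffle period.

```go
package main

import "fmt"

// shuffleEpoch returns the epoch at time nowNanos (nanoseconds since the Unix epoch).
// Epoch 0 covers [0, offset); epoch k >= 1 covers [offset+(k-1)*period, offset+k*period).
func shuffleEpoch(nowNanos, offsetNanos, periodNanos uint64) uint64 {
	if nowNanos < offsetNanos {
		return 0
	}
	return (nowNanos-offsetNanos)/periodNanos + 1
}

// epochStart returns the first nanosecond that belongs to the given epoch.
func epochStart(epoch, offsetNanos, periodNanos uint64) uint64 {
	if epoch == 0 {
		return 0
	}
	return offsetNanos + (epoch-1)*periodNanos
}

func main() {
	const period, offset = 100, 30 // tiny illustrative values, not protocol parameters
	for _, now := range []uint64{0, 29, 30, 129, 130} {
		fmt.Println(now, shuffleEpoch(now, offset, period))
	}
	// Prints:
	// 0 0
	// 29 0
	// 30 1
	// 129 1
	// 130 2
}
```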

## Chunk Group Calculation

To determine a node's chunk group, first compute the current epoch for the node, then plug it into the following
function:

```
chunkGroup := randomInt(nodeSeed, assignmentIndex, shuffleEpoch) % numberOfChunks
```
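
Putting the three formulas together, a hedged end-to-end sketch might look like the following. The `randomIntFunc` type and `toyRandom` are assumptions made for this illustration: `toyRandom` is an FNV-1a style mixer used only so the sketch runs with the standard library, and it is neither keccak256 nor suitable for production. Times and durations are plain nanosecond counts.

```go
package main

import "fmt"

// randomIntFunc stands in for the README's randomInt (hypothetical type for this sketch).
type randomIntFunc func(values ...uint64) uint64

// chunkGroup computes the shuffle offset, then the shuffle epoch, then the chunk group,
// following the three formulas in the README.
func chunkGroup(rnd randomIntFunc, nodeSeed, assignmentIndex, nowNanos, periodNanos, numberOfChunks uint64) uint64 {
	shuffleOffset := rnd(nodeSeed, assignmentIndex) % periodNanos

	var shuffleEpoch uint64 // stays 0 until the first shuffle offset is reached
	if nowNanos >= shuffleOffset {
		shuffleEpoch = (nowNanos-shuffleOffset)/periodNanos + 1
	}

	return rnd(nodeSeed, assignmentIndex, shuffleEpoch) % numberOfChunks
}

// toyRandom is NOT keccak256 and NOT secure. It mixes the big endian bytes of each
// input with FNV-1a constants so that the sketch is deterministic and dependency free.
func toyRandom(values ...uint64) uint64 {
	h := uint64(14695981039346656037) // FNV-1a 64 bit offset basis
	for _, v := range values {
		for i := 0; i < 8; i++ {
			h ^= (v >> (56 - 8*uint(i))) & 0xff
			h *= 1099511628211 // FNV-1a 64 bit prime
		}
	}
	return h
}

func main() {
	const seed, period, groups = 12345, 1000, 8 // illustrative values only
	for index := uint64(0); index < 2; index++ {
		for _, now := range []uint64{0, 500, 1500, 2500} {
			fmt.Println(index, now, chunkGroup(toyRandom, seed, index, now, period, groups))
		}
	}
}
```

Since the result depends only on the seed, the assignment index, and the epoch, any two parties evaluating it at the same timestamp agree on the group, and the group can change only at an epoch boundary.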
68 changes: 68 additions & 0 deletions lightnode/chunkgroup/assignment.go
@@ -0,0 +1,68 @@
package chunkgroup

import (
	"github.com/Layr-Labs/eigenda/lightnode"
	"time"
)

// assignmentKey uniquely identifies a light node chunkGroupAssignment.
type assignmentKey struct {
	lightNodeID     uint64
	assignmentIndex uint64
}

// chunkGroupAssignment is a struct that holds a registration and the chunk group it is currently assigned to.
type chunkGroupAssignment struct {

	// registration contains publicly known information about a light node that is registered on-chain.
	registration *lightnode.Registration

	// assignmentIndex describes which of a light node's multiple groups this struct represents.
	// The first of a light node's groups has an assignment index of 0, the second has
	// an index of 1, and so on.
	assignmentIndex uint64

	// key is the key that uniquely identifies this chunkGroupAssignment. Note that this information
	// can be derived from the registration and assignmentIndex, but we cache this object here for convenience.
	key assignmentKey

	// shuffleOffset is the offset at which this chunkGroupAssignment should be shuffled into a
	// new chunk group, relative to the beginning of each shuffle period.
	//
	// This value is deterministic and does not change, so we cache it here.
	shuffleOffset time.Duration

	// chunkGroup is the chunk group currently associated with this assignment index.
	chunkGroup uint64

	// startOfEpoch is the start of the current shuffle epoch,
	// i.e. the time when this assignment index was last shuffled into the current chunk group.
	startOfEpoch time.Time

	// endOfEpoch is the end of the current shuffle epoch,
	// i.e. the next time when this assignment index will be shuffled into a new chunk group.
	endOfEpoch time.Time
}

// newChunkGroupAssignment creates a new chunkGroupAssignment.
func newChunkGroupAssignment(
	registration *lightnode.Registration,
	assignmentIndex uint64,
	shuffleOffset time.Duration,
	chunkGroup uint64,
	startOfEpoch time.Time,
	endOfEpoch time.Time) *chunkGroupAssignment {

	return &chunkGroupAssignment{
		registration:    registration,
		assignmentIndex: assignmentIndex,
		key: assignmentKey{
			lightNodeID:     registration.ID(),
			assignmentIndex: assignmentIndex,
		},
		shuffleOffset: shuffleOffset,
		chunkGroup:    chunkGroup,
		startOfEpoch:  startOfEpoch,
		endOfEpoch:    endOfEpoch,
	}
}
165 changes: 165 additions & 0 deletions lightnode/chunkgroup/assignment_queue.go
@@ -0,0 +1,165 @@
package chunkgroup

import (
	"container/heap"
	"fmt"
)

// assignmentHeap implements the heap.Interface for chunkGroupAssignment objects, used to create a priority queue.
type assignmentHeap struct {
	data []*chunkGroupAssignment
}

// Len returns the number of elements in the priority queue.
func (h *assignmentHeap) Len() int {
	return len(h.data)
}

// Less returns whether the element with index i should sort before the element with index j.
// This assignmentHeap sorts based on the endOfEpoch of the light nodes.
func (h *assignmentHeap) Less(i int, j int) bool {
	ii := h.data[i]
	jj := h.data[j]
	return ii.endOfEpoch.Before(jj.endOfEpoch)
}

// Swap swaps the elements with indexes i and j.
func (h *assignmentHeap) Swap(i int, j int) {
	h.data[i], h.data[j] = h.data[j], h.data[i]
}

// Push adds an element to the end of the priority queue.
func (h *assignmentHeap) Push(x any) {
	h.data = append(h.data, x.(*chunkGroupAssignment))
}

// Pop removes and returns the last element in the priority queue.
func (h *assignmentHeap) Pop() any {
	n := len(h.data)
	x := h.data[n-1]
	h.data = h.data[:n-1]
	return x
}

// assignmentQueue is a priority queue that sorts assignments based on their endOfEpoch.
type assignmentQueue struct {
	// The heap that stores the assignments. Assignments are sorted by their endOfEpoch.
	heap *assignmentHeap

	// A set of assignments in the queue. This is used to do efficient removals.
	// A true value indicates that the assignment is in the queue. A false value indicates
	// that the assignment was removed from the queue but has not yet been fully deleted.
	assignmentSet map[assignmentKey]bool

	// The number of elements in the queue. Tracked separately since the heap and assignmentSet
	// may contain removed assignments that have not yet been fully garbage collected.
	size uint64
}

// newAssignmentQueue creates a new priority queue.
func newAssignmentQueue() *assignmentQueue {
	return &assignmentQueue{
		heap: &assignmentHeap{
			data: make([]*chunkGroupAssignment, 0),
		},
		assignmentSet: make(map[assignmentKey]bool),
	}
}

// Size returns the number of elements in the priority queue.
func (queue *assignmentQueue) Size() uint64 {
	return queue.size
}

// Push adds a chunkGroupAssignment to the priority queue.
// This is a no-op if the chunkGroupAssignment is already in the queue. If an assignment with the same key
// was removed but has not yet been garbage collected, the entry already in the heap is revived and the
// provided pointer is not inserted.
func (queue *assignmentQueue) Push(assignment *chunkGroupAssignment) {
	notRemoved, present := queue.assignmentSet[assignment.key]
	if present && notRemoved {
		return
	}

	queue.size++

	if !present {
		heap.Push(queue.heap, assignment)
	}

	queue.assignmentSet[assignment.key] = true
}

// Pop removes and returns the chunkGroupAssignment with the earliest endOfEpoch. Returns
// nil if the queue is empty.
func (queue *assignmentQueue) Pop() *chunkGroupAssignment {
	queue.collectGarbage()
	if queue.size == 0 {
		return nil
	}
	assignment := heap.Pop(queue.heap).(*chunkGroupAssignment)
	delete(queue.assignmentSet, assignment.key)
	queue.size--
	return assignment
}

// Peek returns the chunkGroupAssignment with the earliest endOfEpoch without removing it from the queue. Returns
// nil if the queue is empty.
func (queue *assignmentQueue) Peek() *chunkGroupAssignment {
	queue.collectGarbage()
	if queue.size == 0 {
		return nil
	}
	return queue.heap.data[0]
}

// Remove removes the assignment with the given key from the queue.
// This is a no-op if the assignment is not in the queue.
func (queue *assignmentQueue) Remove(key assignmentKey) {
	// Deletion is lazy. The assignment is fully removed when it reaches the top of the heap.

	notRemoved, present := queue.assignmentSet[key]
	if !present || !notRemoved {
		// Element is either not in the queue or has already been marked for removal.
		return
	}

	queue.size--

	queue.assignmentSet[key] = false
	queue.collectGarbage()
}

// collectGarbage discards assignments at the top of the heap that have been marked as removed.
// This is done by popping elements from the heap until the first element is not marked for deletion.
// Removed assignments that are deeper in the heap stay there until they reach the top.
func (queue *assignmentQueue) collectGarbage() {
	if len(queue.heap.data) == 0 {
		return
	}

	// sanity check to prevent infinite loops
	maxIterations := len(queue.heap.data) + 1

	for {
		maxIterations--
		if maxIterations < 0 {
			panic("garbage collection did not terminate")
		}

		if len(queue.heap.data) == 0 {
			return
		}

		next := queue.heap.data[0]

		notRemoved, present := queue.assignmentSet[next.key]
		if !present {
			panic(fmt.Sprintf("node %d is not in the assignmentSet", next.registration.ID()))
		}

		if notRemoved {
			// Once we find the first element that is not marked for deletion, we can stop.
			return
		}

		heap.Pop(queue.heap)
		delete(queue.assignmentSet, next.key)
	}
}
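
The lazy-deletion pattern used by `assignmentQueue` can be illustrated with a stripped-down sketch. All names here (`item`, `itemHeap`, `lazyQueue`) are hypothetical stand-ins, and the sketch simplifies the PR's behavior in one labeled place: pushing a key that is already tracked is always ignored, and removed entries are discarded inside `Pop` rather than by a separate garbage collection step.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for chunkGroupAssignment: a key plus a deadline.
type item struct {
	key      uint64
	deadline int64
}

// itemHeap implements heap.Interface, ordered by earliest deadline first.
type itemHeap []*item

func (h itemHeap) Len() int           { return len(h) }
func (h itemHeap) Less(i, j int) bool { return h[i].deadline < h[j].deadline }
func (h itemHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x any)        { *h = append(*h, x.(*item)) }
func (h *itemHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// lazyQueue marks removed keys as dead and discards them only when they reach the top.
type lazyQueue struct {
	heap itemHeap
	live map[uint64]bool // true: in the queue; false: removed but still in the heap
	size int             // number of live items
}

func newLazyQueue() *lazyQueue { return &lazyQueue{live: make(map[uint64]bool)} }

// Push adds an item. Simplification for this sketch: keys that are already tracked,
// whether live or marked as removed, are ignored.
func (q *lazyQueue) Push(it *item) {
	if _, present := q.live[it.key]; present {
		return
	}
	heap.Push(&q.heap, it)
	q.live[it.key] = true
	q.size++
}

// Remove marks a key as removed in O(1) without touching the heap.
func (q *lazyQueue) Remove(key uint64) {
	if q.live[key] {
		q.live[key] = false
		q.size--
	}
}

// Pop returns the live item with the earliest deadline, or nil if there is none.
// Items marked as removed are dropped as they surface.
func (q *lazyQueue) Pop() *item {
	for q.heap.Len() > 0 {
		it := heap.Pop(&q.heap).(*item)
		alive := q.live[it.key]
		delete(q.live, it.key)
		if alive {
			q.size--
			return it
		}
	}
	return nil
}

func main() {
	q := newLazyQueue()
	q.Push(&item{key: 1, deadline: 30})
	q.Push(&item{key: 2, deadline: 10})
	q.Push(&item{key: 3, deadline: 20})
	q.Remove(2) // key 2 stays in the heap until it reaches the top
	for it := q.Pop(); it != nil; it = q.Pop() {
		fmt.Println(it.key, it.deadline)
	}
}
```

The trade-off is that removal avoids an O(n) search through the heap, at the cost of keeping removed entries in memory until they surface.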