Skip to content

Commit

Permalink
[mq] working branch - merge 8f16f72 on top of main at 1cc7604
Browse files Browse the repository at this point in the history
{"baseBranch":"main","baseCommit":"1cc7604f4b6b35a230b3dc1226d35b86034a832a","createdAt":"2024-12-29T11:30:25.407598Z","headSha":"8f16f727850763044508fc289cec202e1449530f","id":"80613902-14d6-4ca1-9b69-a94856ff7f20","priority":"200","pullRequestNumber":"32292","queuedAt":"2024-12-29T11:30:25.406825Z","status":"STATUS_QUEUED"}
  • Loading branch information
dd-mergequeue[bot] authored Dec 29, 2024
2 parents 323dad3 + 8f16f72 commit 837da03
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 96 deletions.
4 changes: 2 additions & 2 deletions pkg/network/protocols/events/batch_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newOffsetManager(numCPUS int) *offsetManager {
}

// Get returns the data offset that hasn't been consumed yet for a given batch
func (o *offsetManager) Get(cpu int, batch *batch, syncing bool) (begin, end int) {
func (o *offsetManager) Get(cpu int, batch *Batch, syncing bool) (begin, end int) {
o.mux.Lock()
defer o.mux.Unlock()
state := o.stateByCPU[cpu]
Expand Down Expand Up @@ -85,6 +85,6 @@ func (o *offsetManager) NextBatchID(cpu int) int {
return o.stateByCPU[cpu].nextBatchID
}

func batchComplete(b *batch) bool {
func batchComplete(b *Batch) bool {
return b.Cap > 0 && b.Len == b.Cap
}
26 changes: 13 additions & 13 deletions pkg/network/protocols/events/batch_offsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,36 @@ func TestOffsets(t *testing.T) {
assert.Equal(t, 0, offsets.NextBatchID(1))

// reading full batch: cpu=0 batchID=0
begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, false)
begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, false)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
// nextBatchID is advanced to 1 for cpu=0
assert.Equal(t, 1, offsets.NextBatchID(0))

// reading partial batch: cpu=1 batchID=0 sync=true
begin, end = offsets.Get(1, &batch{Idx: 0, Len: 8, Cap: 10}, true)
begin, end = offsets.Get(1, &Batch{Idx: 0, Len: 8, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 8, end)
// nextBatchID remains 0 for cpu=1 since this batch hasn't been filled up yet
assert.Equal(t, 0, offsets.NextBatchID(1))

// reading full batch: cpu=1 batchID=0
begin, end = offsets.Get(1, &batch{Idx: 0, Len: 10, Cap: 10}, false)
begin, end = offsets.Get(1, &Batch{Idx: 0, Len: 10, Cap: 10}, false)
// notice we only read now the remaining offsets
assert.Equal(t, 8, begin)
assert.Equal(t, 10, end)
// nextBatchID is advanced to 1 for cpu=1
assert.Equal(t, 1, offsets.NextBatchID(1))

// reading partial batch: cpu=0 batchID=1 sync=true
begin, end = offsets.Get(0, &batch{Idx: 1, Len: 4, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 4, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 4, end)
// nextBatchID remains 1 for cpu=0
assert.Equal(t, 1, offsets.NextBatchID(0))

// reading partial batch: cpu=0 batchID=1 sync=true
begin, end = offsets.Get(0, &batch{Idx: 1, Len: 5, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 5, Cap: 10}, true)
assert.Equal(t, 4, begin)
assert.Equal(t, 5, end)
// nextBatchID remains 1 for cpu=0
Expand All @@ -63,20 +63,20 @@ func TestDelayedBatchReads(t *testing.T) {

// this emulates the scenario where we preemptively read (sync=true) two
// complete batches in a row before they are read from perf buffer
begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)

begin, end = offsets.Get(0, &batch{Idx: 1, Len: 10, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)

// now the "delayed" batches from perf buffer are read in sequence
begin, end = offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 0, end)

begin, end = offsets.Get(0, &batch{Idx: 1, Len: 10, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 0, end)
}
Expand All @@ -85,11 +85,11 @@ func TestUnchangedBatchRead(t *testing.T) {
const numCPUs = 1
offsets := newOffsetManager(numCPUs)

begin, end := offsets.Get(0, &batch{Idx: 0, Len: 5, Cap: 10}, true)
begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 5, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 5, end)

begin, end = offsets.Get(0, &batch{Idx: 0, Len: 5, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 0, Len: 5, Cap: 10}, true)
assert.Equal(t, 5, begin)
assert.Equal(t, 5, end)
}
Expand All @@ -99,13 +99,13 @@ func TestReadGap(t *testing.T) {
offsets := newOffsetManager(numCPUs)

// this emulates the scenario where a batch is lost in the perf buffer
begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
assert.Equal(t, 1, offsets.NextBatchID(0))

// batch idx=1 was lost
begin, end = offsets.Get(0, &batch{Idx: 2, Len: 10, Cap: 10}, true)
begin, end = offsets.Get(0, &Batch{Idx: 2, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
assert.Equal(t, 3, offsets.NextBatchID(0))
Expand Down
12 changes: 6 additions & 6 deletions pkg/network/protocols/events/batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ import (
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)

var batchPool = ddsync.NewDefaultTypedPool[batch]()
var batchPool = ddsync.NewDefaultTypedPool[Batch]()

type batchReader struct {
sync.Mutex
numCPUs int
batchMap *maps.GenericMap[batchKey, batch]
batchMap *maps.GenericMap[batchKey, Batch]
offsets *offsetManager
workerPool *workerPool
stopped bool
}

func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batchKey, batch], numCPUs int) (*batchReader, error) {
func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batchKey, Batch], numCPUs int) (*batchReader, error) {
// initialize eBPF maps
batch := new(batch)
batch := new(Batch)
for i := 0; i < numCPUs; i++ {
// Ring buffer events don't have CPU information, so we associate each
// batch entry with a CPU during startup. This information is used by
Expand Down Expand Up @@ -57,7 +57,7 @@ func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batc

// ReadAll batches from eBPF (concurrently) and execute the given
// callback function for each batch
func (r *batchReader) ReadAll(f func(cpu int, b *batch)) {
func (r *batchReader) ReadAll(f func(cpu int, b *Batch)) {
// This lock is used only for the purposes of synchronizing termination
// and it's only held while *enqueing* the jobs.
r.Lock()
Expand All @@ -77,7 +77,7 @@ func (r *batchReader) ReadAll(f func(cpu int, b *batch)) {

b := batchPool.Get()
defer func() {
*b = batch{}
*b = Batch{}
batchPool.Put(b)
}()

Expand Down
14 changes: 7 additions & 7 deletions pkg/network/protocols/events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
const (
batchMapSuffix = "_batches"
eventsMapSuffix = "_batch_events"
sizeOfBatch = int(unsafe.Sizeof(batch{}))
sizeOfBatch = int(unsafe.Sizeof(Batch{}))
)

var errInvalidPerfEvent = errors.New("invalid perf event")
Expand Down Expand Up @@ -59,7 +59,7 @@ type Consumer[V any] struct {
// 2) be thread-safe, as the callback may be executed concurrently from multiple go-routines;
func NewConsumer[V any](proto string, ebpf *manager.Manager, callback func([]V)) (*Consumer[V], error) {
batchMapName := proto + batchMapSuffix
batchMap, err := maps.GetMap[batchKey, batch](ebpf, batchMapName)
batchMap, err := maps.GetMap[batchKey, Batch](ebpf, batchMapName)
if err != nil {
return nil, fmt.Errorf("unable to find map %s: %s", batchMapName, err)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (c *Consumer[V]) Start() {
return
}

c.batchReader.ReadAll(func(_ int, b *batch) {
c.batchReader.ReadAll(func(_ int, b *Batch) {
c.process(b, true)
})
if log.ShouldLog(log.DebugLvl) {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (c *Consumer[V]) Stop() {
close(c.syncRequest)
}

func (c *Consumer[V]) process(b *batch, syncing bool) {
func (c *Consumer[V]) process(b *Batch, syncing bool) {
cpu := int(b.Cpu)

// Determine the subset of data we're interested in as we might have read
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *Consumer[V]) process(b *batch, syncing bool) {
c.callback(events)
}

func batchFromEventData(data []byte) (*batch, error) {
func batchFromEventData(data []byte) (*Batch, error) {
if len(data) < sizeOfBatch {
// For some reason the eBPF program sent us a perf event with a size
// different from what we're expecting.
Expand All @@ -260,10 +260,10 @@ func batchFromEventData(data []byte) (*batch, error) {
return nil, errInvalidPerfEvent
}

return (*batch)(unsafe.Pointer(&data[0])), nil
return (*Batch)(unsafe.Pointer(&data[0])), nil
}

func pointerToElement[V any](b *batch, elementIdx int) *V {
func pointerToElement[V any](b *Batch, elementIdx int) *V {
offset := elementIdx * int(b.Event_size)
return (*V)(unsafe.Pointer(uintptr(unsafe.Pointer(&b.Data[0])) + uintptr(offset)))
}
69 changes: 3 additions & 66 deletions pkg/network/protocols/events/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@ import (

manager "github.com/DataDog/ebpf-manager"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/features"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/ebpf/bytecode"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
)
Expand All @@ -38,7 +33,7 @@ func TestConsumer(t *testing.T) {

const numEvents = 100
c := config.New()
program, err := newEBPFProgram(c)
program, err := NewEBPFProgram(c)
require.NoError(t, err)

var mux sync.Mutex
Expand Down Expand Up @@ -86,7 +81,7 @@ func TestInvalidBatchCountMetric(t *testing.T) {
}

c := config.New()
program, err := newEBPFProgram(c)
program, err := NewEBPFProgram(c)
require.NoError(t, err)
t.Cleanup(func() { program.Stop(manager.CleanAll) })

Expand All @@ -95,7 +90,7 @@ func TestInvalidBatchCountMetric(t *testing.T) {

// We are creating a raw sample with a data length of 4, which is smaller than sizeOfBatch
// and would be considered an invalid batch.
recordSample(c, consumer, []byte("test"))
RecordSample(c, consumer, []byte("test"))

consumer.Start()
t.Cleanup(func() { consumer.Stop() })
Expand All @@ -113,22 +108,6 @@ type eventGenerator struct {
testFile *os.File
}

// recordSample records a sample using the consumer handler.
func recordSample(c *config.Config, consumer *Consumer[uint64], sampleData []byte) {
// Ring buffers require kernel version 5.8.0 or higher, therefore, the handler is chosen based on the kernel version.
if c.EnableUSMRingBuffers && features.HaveMapType(ebpf.RingBuf) == nil {
handler := consumer.handler.(*ddebpf.RingBufferHandler)
handler.RecordHandler(&ringbuf.Record{
RawSample: sampleData,
}, nil, nil)
} else {
handler := consumer.handler.(*ddebpf.PerfHandler)
handler.RecordHandler(&perf.Record{
RawSample: sampleData,
}, nil, nil)
}
}

func newEventGenerator(program *manager.Manager, t *testing.T) *eventGenerator {
m, _, _ := program.GetMap("test")
require.NotNilf(t, m, "couldn't find test map")
Expand Down Expand Up @@ -169,45 +148,3 @@ func (e *eventGenerator) Generate(eventID uint64) error {
func (e *eventGenerator) Stop() {
e.testFile.Close()
}

func newEBPFProgram(c *config.Config) (*manager.Manager, error) {
bc, err := bytecode.GetReader(c.BPFDir, "usm_events_test-debug.o")
if err != nil {
return nil, err
}
defer bc.Close()

m := &manager.Manager{
Probes: []*manager.Probe{
{
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: "tracepoint__syscalls__sys_enter_write",
},
},
},
}
options := manager.Options{
RemoveRlimit: true,
ActivatedProbes: []manager.ProbesSelector{
&manager.ProbeSelector{
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: "tracepoint__syscalls__sys_enter_write",
},
},
},
ConstantEditors: []manager.ConstantEditor{
{
Name: "test_monitoring_enabled",
Value: uint64(1),
},
},
}

Configure(config.New(), "test", m, &options)
err = m.InitWithOptions(bc, options)
if err != nil {
return nil, err
}

return m, nil
}
Loading

0 comments on commit 837da03

Please sign in to comment.