Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport container restart support in loki source #6897

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Main (unreleased)

- Fix an issue where nested `import.git` config blocks could conflict if they had the same labels. (@wildum)

- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum)

### Other changes

- Change the Docker base image for Linux containers to `ubuntu:noble`. (@amontalban)
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/source/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) {
}

return &options{
client: client,
handler: loki.NewEntryHandler(c.handler.Chan(), func() {}),
positions: c.posFile,
client: client,
handler: loki.NewEntryHandler(c.handler.Chan(), func() {}),
positions: c.posFile,
targetRestartInterval: 5 * time.Second,
}, nil
}

Expand Down
105 changes: 105 additions & 0 deletions internal/component/loki/source/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,32 @@ package docker

import (
	"context"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/go-kit/log"
	"github.com/grafana/agent/internal/component"
	"github.com/grafana/agent/internal/component/common/loki/client/fake"
	"github.com/grafana/agent/internal/component/common/loki/positions"
	dt "github.com/grafana/agent/internal/component/loki/source/docker/internal/dockertarget"
	"github.com/grafana/agent/internal/flow/componenttest"
	"github.com/grafana/agent/internal/util"
	"github.com/grafana/river"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// targetRestartInterval is the restart-check interval handed to the tailer
// built by setupTailer. TestRestart sizes its sleeps relative to this value.
const targetRestartInterval = 20 * time.Millisecond

func Test(t *testing.T) {
// Use host that works on all platforms (including Windows).
var cfg = `
Expand Down Expand Up @@ -73,3 +88,93 @@ func TestDuplicateTargets(t *testing.T) {

require.Len(t, cmp.manager.tasks, 1)
}

// TestRestart checks that a tailer delivers log lines while its container is
// reported as running, goes quiet once the container is reported as stopped,
// and delivers log lines again after the container is reported as running
// once more.
func TestRestart(t *testing.T) {
	// The flag is written by this goroutine and read by the tailer goroutine
	// (through clientMock.ContainerInspect), so it must be accessed atomically
	// to keep the test free of data races.
	var running atomic.Bool
	running.Store(true)

	mockClient := clientMock{
		logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor",
		running: running.Load,
	}
	expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor"

	// Tie the tailer's lifetime to the test so its goroutine is told to stop
	// when the test returns instead of running forever.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tlr, entryHandler := setupTailer(t, mockClient)
	go tlr.Run(ctx)

	// The container is already running, expect log lines.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		logLines := entryHandler.Received()
		if assert.NotEmpty(c, logLines) {
			assert.Equal(c, expectedLogLine, logLines[0].Line)
		}
	}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.")

	// Stops the container.
	running.Store(false)
	// Sleep for longer than targetRestartInterval so that any restart attempt
	// already in flight has finished before the received entries are cleared.
	time.Sleep(targetRestartInterval + 10*time.Millisecond)
	entryHandler.Clear()
	// Wait for at least one more restart check; nothing may arrive meanwhile.
	time.Sleep(targetRestartInterval + 10*time.Millisecond)
	assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running.

	// Restart the container and expect log lines.
	running.Store(true)
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		logLines := entryHandler.Received()
		if assert.NotEmpty(c, logLines) {
			assert.Equal(c, expectedLogLine, logLines[0].Line)
		}
	}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.")
}

// setupTailer wires a tailer to the given mocked Docker client and returns it
// together with the fake entry handler that collects whatever the tailer's
// target emits.
//
// The target is created with the name "flog", the label set {job="docker"} and
// no relabel rules. Positions are stored in a file inside a per-test temporary
// directory. The tailer uses the package-level targetRestartInterval.
func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	entryHandler = fake.NewClient(func() {})

	posCfg := positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: t.TempDir() + "/positions.yml",
	}
	ps, err := positions.New(logger, posCfg)
	require.NoError(t, err)

	metrics := dt.NewMetrics(prometheus.NewRegistry())
	labels := model.LabelSet{"job": "docker"}
	target, err := dt.NewTarget(metrics, logger, entryHandler, ps, "flog", labels, []*relabel.Config{}, client)
	require.NoError(t, err)

	opts := &options{
		client:                client,
		targetRestartInterval: targetRestartInterval,
	}
	task := &tailerTask{options: opts, target: target}
	return newTailer(logger, task), entryHandler
}

// clientMock is a test double for the Docker API client. It overrides only
// ContainerInspect and ContainerLogs.
type clientMock struct {
	// APIClient is embedded so that clientMock satisfies the full client
	// interface. The tests leave it nil, so calling any method that is not
	// overridden on clientMock would panic.
	client.APIClient

	// logLine is the raw content served by ContainerLogs.
	logLine string

	// running is consulted on every ContainerInspect call to decide whether
	// the container is reported as running.
	running func() bool
}

// ContainerInspect returns a container description whose ID echoes the
// requested identifier c and whose Running state is whatever mock.running
// reports at call time. Tty is always set in the returned Config, and the
// error is always nil.
func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) {
	state := &types.ContainerState{Running: mock.running()}
	base := &types.ContainerJSONBase{ID: c, State: state}

	inspected := types.ContainerJSON{
		ContainerJSONBase: base,
		Config:            &container.Config{Tty: true},
	}
	return inspected, nil
}

// ContainerLogs returns a fresh reader over mock.logLine on every call. The
// container name and the log options are ignored, and the error is always nil.
func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
	reader := strings.NewReader(mock.logLine)
	return io.NopCloser(reader), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() {
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
go t.processLoop(ctx)
} else {
level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName)
}
}

Expand Down
41 changes: 23 additions & 18 deletions internal/component/loki/source/docker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package docker
import (
"context"
"sync"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/go-kit/log"
"github.com/grafana/agent/internal/component/common/loki"
Expand Down Expand Up @@ -52,6 +52,9 @@ type options struct {

// positions interface so tailers can save/restore offsets in log files.
positions positions.Positions

// targetRestartInterval is how often a tailer checks whether its target should be started again after it has stopped running.
targetRestartInterval time.Duration
}

// tailerTask is the payload used to create tailers. It implements runner.Task.
Expand Down Expand Up @@ -95,23 +98,25 @@ func newTailer(l log.Logger, task *tailerTask) *tailer {
}

// Run drives the tailer's target until Run returns.
//
// NOTE(review): this listing is a diff view without +/- markers, so two
// bodies appear back to back and the braces do not balance as shown.
//   - The first body (ContainerWait followed by a single select) starts the
//     target once and stops it when either the wait channel or its error
//     channel fires.
//   - The second body (ticker loop) calls ContainerInspect every
//     t.opts.targetRestartInterval, calls StartIfNotRunning when the container
//     reports Running, and stops both the target and the ticker once ctx is
//     done.
// Presumably the first body is the removed implementation and the second its
// replacement; confirm against the repository before editing.
func (t *tailer) Run(ctx context.Context) {
ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)

t.target.StartIfNotRunning()

select {
case err := <-chErr:
// Error setting up the Wait request from the client; either failed to
// read from /containers/{containerID}/wait, or couldn't parse the
// response. Stop the target and exit the task after logging; if it was
// a transient error, the target will be retried on the next discovery
// refresh.
level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
t.target.Stop()
return
case <-ch:
t.target.Stop()
return
// Ticker-based body starts here: the container is inspected once per tick.
ticker := time.NewTicker(t.opts.targetRestartInterval)
tickerC := ticker.C

for {
select {
case <-tickerC:
res, err := t.opts.client.ContainerInspect(ctx, t.target.Name())
if err != nil {
level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err)
// Inspect failed; stay in the loop and try again on the next tick.
continue
}
// Called on every tick while the container reports Running.
if res.State.Running {
t.target.StartIfNotRunning()
}
case <-ctx.Done():
// Context finished: stop the target and release the ticker before returning.
t.target.Stop()
ticker.Stop()
return
}
}
}

Expand Down
Loading