Skip to content

Commit

Permalink
Merge pull request #467 from sapcc/use-audittools-auditor
Browse files Browse the repository at this point in the history
use new go-bits/audittools API
  • Loading branch information
majewsky authored Dec 5, 2024
2 parents 7b42cc4 + 0bb55e4 commit e9bc32b
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 161 deletions.
2 changes: 1 addition & 1 deletion cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func run(cmd *cobra.Command, args []string) {

cfg := keppel.ParseConfiguration()
ctx := httpext.ContextWithSIGINT(cmd.Context(), 10*time.Second)
auditor := keppel.InitAuditTrail(ctx)
auditor := must.Return(keppel.InitAuditTrail(ctx))

db := must.Return(keppel.InitDB(cfg.DatabaseURL))
must.Succeed(setupDBIfRequested(db))
Expand Down
2 changes: 1 addition & 1 deletion cmd/janitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func run(cmd *cobra.Command, args []string) {

cfg := keppel.ParseConfiguration()
ctx := httpext.ContextWithSIGINT(cmd.Context(), 10*time.Second)
auditor := keppel.InitAuditTrail(ctx)
auditor := must.Return(keppel.InitAuditTrail(ctx))

ad := must.Return(keppel.NewAuthDriver(ctx, osext.MustGetenv("KEPPEL_DRIVER_AUTH"), nil))
amd := must.Return(keppel.NewAccountManagementDriver(osext.MustGetenv("KEPPEL_DRIVER_ACCOUNT_MANAGEMENT")))
Expand Down
3 changes: 1 addition & 2 deletions docs/operator-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,11 @@ The following configuration options are understood by both the API server and th
| Variable | Default | Explanation |
| -------- | ------- | ----------- |
| `KEPPEL_API_PUBLIC_FQDN` | *(required)* | Full domain name where users reach keppel-api. |
| `KEPPEL_AUDIT_RABBITMQ_QUEUE_NAME` | *(required for enabling audit trail)* | Name for the queue that will hold the audit events. The events are published to the default exchange. |
| `KEPPEL_AUDIT_RABBITMQ_QUEUE_NAME` | *(required for enabling audit trail)* | Name for the queue that will hold the audit events. The events are published to the default exchange. If not given, audit events will only be written to the debug log. |
| `KEPPEL_AUDIT_RABBITMQ_USERNAME` | `guest` | RabbitMQ Username. |
| `KEPPEL_AUDIT_RABBITMQ_PASSWORD` | `guest` | Password for the specified user. |
| `KEPPEL_AUDIT_RABBITMQ_HOSTNAME` | `localhost` | Hostname of the RabbitMQ server. |
| `KEPPEL_AUDIT_RABBITMQ_PORT` | `5672` | Port number to which the underlying connection is made. |
| `KEPPEL_AUDIT_SILENT` | *(optional)* | Whether to disable audit event logging to standard output. |
| `KEPPEL_DB_NAME` | `keppel` | The name of the database. |
| `KEPPEL_DB_USERNAME` | `postgres` | Username of the user that Keppel should use to connect to the database. |
| `KEPPEL_DB_PASSWORD` | *(optional)* | Password for the specified user. |
Expand Down
5 changes: 3 additions & 2 deletions internal/api/keppel/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/sapcc/go-bits/audittools"
"github.com/sapcc/go-bits/respondwith"

"github.com/sapcc/keppel/internal/auth"
Expand All @@ -47,14 +48,14 @@ type API struct {
sd keppel.StorageDriver
icd keppel.InboundCacheDriver
db *keppel.DB
auditor keppel.Auditor
auditor audittools.Auditor
rle *keppel.RateLimitEngine // may be nil
// non-pure functions that can be replaced by deterministic doubles for unit tests
timeNow func() time.Time
}

// NewAPI constructs a new API instance.
func NewAPI(cfg keppel.Configuration, ad keppel.AuthDriver, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor keppel.Auditor, rle *keppel.RateLimitEngine) *API {
func NewAPI(cfg keppel.Configuration, ad keppel.AuthDriver, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor audittools.Auditor, rle *keppel.RateLimitEngine) *API {
return &API{cfg, ad, fd, sd, icd, db, auditor, rle, time.Now}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/api/registry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/sapcc/go-bits/audittools"
"github.com/sapcc/go-bits/errext"
"github.com/sapcc/go-bits/httpapi"
"github.com/sapcc/go-bits/respondwith"
Expand All @@ -45,15 +46,15 @@ type API struct {
sd keppel.StorageDriver
icd keppel.InboundCacheDriver
db *keppel.DB
auditor keppel.Auditor
auditor audittools.Auditor
rle *keppel.RateLimitEngine // may be nil
// non-pure functions that can be replaced by deterministic doubles for unit tests
timeNow func() time.Time
generateStorageID func() string
}

// NewAPI constructs a new API instance.
func NewAPI(cfg keppel.Configuration, ad keppel.AuthDriver, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor keppel.Auditor, rle *keppel.RateLimitEngine) *API {
func NewAPI(cfg keppel.Configuration, ad keppel.AuthDriver, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, auditor audittools.Auditor, rle *keppel.RateLimitEngine) *API {
return &API{cfg, ad, fd, sd, icd, db, auditor, rle, time.Now, keppel.GenerateStorageID}
}

Expand Down
109 changes: 12 additions & 97 deletions internal/keppel/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,121 +20,36 @@ package keppel

import (
"context"
"encoding/json"
"net"
"net/http"
"net/url"
"os"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/sapcc/go-api-declarations/bininfo"
"github.com/sapcc/go-api-declarations/cadf"
"github.com/sapcc/go-bits/audittools"
"github.com/sapcc/go-bits/logg"
"github.com/sapcc/go-bits/osext"
)

// Auditor is a component that forwards audit events to the appropriate logs.
// It is used by some of the API modules.
type Auditor interface {
// Record forwards the given audit event to the audit log.
// EventParameters.Observer will be filled by the auditor.
Record(params audittools.EventParameters)
}

// AuditContext collects arguments that business logic methods need only for
// generating audit events.
type AuditContext struct {
UserIdentity UserIdentity
Request *http.Request
}

////////////////////////////////////////////////////////////////////////////////
// auditorImpl

var (
auditEventPublishSuccessCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "keppel_successful_auditevent_publish",
Help: "Counter for successful audit event publish to RabbitMQ server.",
})
auditEventPublishFailedCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "keppel_failed_auditevent_publish",
Help: "Counter for failed audit event publish to RabbitMQ server.",
})
)

// auditorImpl is the productive implementation of the Auditor interface.
// (We only expose the interface publicly because we want to be able to
// substitute a double in unit tests.)
type auditorImpl struct {
OnStdout bool
EventSink chan<- cadf.Event // nil if not wanted
ObserverUUID string
}

// InitAuditTrail initializes a Auditor from the configuration variables
// found in the environment.
func InitAuditTrail(ctx context.Context) Auditor {
func InitAuditTrail(ctx context.Context) (audittools.Auditor, error) {
logg.Debug("initializing audit trail...")

prometheus.MustRegister(auditEventPublishSuccessCounter)
prometheus.MustRegister(auditEventPublishFailedCounter)

var eventSink chan cadf.Event
if rabbitQueueName := os.Getenv("KEPPEL_AUDIT_RABBITMQ_QUEUE_NAME"); rabbitQueueName != "" {
portStr := osext.GetenvOrDefault("KEPPEL_AUDIT_RABBITMQ_PORT", "5672")
port, err := strconv.Atoi(portStr)
if err != nil {
logg.Fatal("invalid value for KEPPEL_AUDIT_RABBITMQ_PORT: %s", err.Error())
}
rabbitURI := url.URL{
Scheme: "amqp",
Host: net.JoinHostPort(
osext.GetenvOrDefault("KEPPEL_AUDIT_RABBITMQ_HOSTNAME", "localhost"),
strconv.Itoa(port),
),
User: url.UserPassword(
osext.GetenvOrDefault("KEPPEL_AUDIT_RABBITMQ_USERNAME", "guest"),
osext.GetenvOrDefault("KEPPEL_AUDIT_RABBITMQ_PASSWORD", "guest"),
),
Path: "/",
}

eventSink = make(chan cadf.Event, 20)
auditEventPublishSuccessCounter.Add(0)
auditEventPublishFailedCounter.Add(0)

go audittools.AuditTrail{
EventSink: eventSink,
OnSuccessfulPublish: func() { auditEventPublishSuccessCounter.Inc() },
OnFailedPublish: func() { auditEventPublishFailedCounter.Inc() },
}.Commit(ctx, rabbitURI, rabbitQueueName)
}

return auditorImpl{
OnStdout: !osext.GetenvBool("KEPPEL_AUDIT_SILENT"),
EventSink: eventSink,
ObserverUUID: audittools.GenerateUUID(),
}
}

// Record implements the Auditor interface.
func (a auditorImpl) Record(params audittools.EventParameters) {
params.Observer.TypeURI = "service/docker-registry"
params.Observer.Name = bininfo.Component()
params.Observer.ID = a.ObserverUUID

event := audittools.NewEvent(params)

if a.OnStdout {
msg, _ := json.Marshal(event)
logg.Other("AUDIT", string(msg))
}

if a.EventSink != nil {
a.EventSink <- event
if os.Getenv("KEPPEL_AUDIT_RABBITMQ_QUEUE_NAME") == "" {
return audittools.NewMockAuditor(), nil
} else {
return audittools.NewAuditor(ctx, audittools.AuditorOpts{
EnvPrefix: "KEPPEL_AUDIT_RABBITMQ",
Observer: audittools.Observer{
TypeURI: "service/docker-registry",
Name: bininfo.Component(),
ID: audittools.GenerateUUID(),
},
})
}
}
5 changes: 3 additions & 2 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/go-gorp/gorp/v3"
"github.com/sapcc/go-bits/audittools"
"github.com/sapcc/go-bits/logg"

"github.com/sapcc/keppel/internal/client"
Expand All @@ -43,7 +44,7 @@ type Processor struct {
fd keppel.FederationDriver
sd keppel.StorageDriver
icd keppel.InboundCacheDriver
auditor keppel.Auditor
auditor audittools.Auditor
repoClients map[string]*client.RepoClient // key = account name

// non-pure functions that can be replaced by deterministic doubles for unit tests
Expand All @@ -52,7 +53,7 @@ type Processor struct {
}

// New creates a new Processor.
func New(cfg keppel.Configuration, db *keppel.DB, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, auditor keppel.Auditor, fd keppel.FederationDriver, timenow func() time.Time) *Processor {
func New(cfg keppel.Configuration, db *keppel.DB, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, auditor audittools.Auditor, fd keppel.FederationDriver, timenow func() time.Time) *Processor {
return &Processor{cfg, db, fd, sd, icd, auditor, make(map[string]*client.RepoClient), timenow, keppel.GenerateStorageID}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/tasks/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Janitor struct {
icd keppel.InboundCacheDriver
db *keppel.DB
amd keppel.AccountManagementDriver
auditor keppel.Auditor
auditor audittools.Auditor

// non-pure functions that can be replaced by deterministic doubles for unit tests
timeNow func() time.Time
Expand All @@ -63,7 +63,7 @@ type Janitor struct {
}

// NewJanitor creates a new Janitor.
func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, amd keppel.AccountManagementDriver, auditor keppel.Auditor) *Janitor {
func NewJanitor(cfg keppel.Configuration, fd keppel.FederationDriver, sd keppel.StorageDriver, icd keppel.InboundCacheDriver, db *keppel.DB, amd keppel.AccountManagementDriver, auditor audittools.Auditor) *Janitor {
j := &Janitor{cfg, fd, sd, icd, db, amd, auditor, time.Now, keppel.GenerateStorageID, addJitter}
return j
}
Expand Down
50 changes: 0 additions & 50 deletions internal/test/mock_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ package test

import (
"encoding/json"
"testing"

"github.com/sapcc/go-api-declarations/cadf"
"github.com/sapcc/go-bits/assert"
"github.com/sapcc/go-bits/audittools"
)

var (
Expand All @@ -45,50 +42,3 @@ func ToJSON(x any) string {
}
return string(result)
}

// Auditor is a test recorder that satisfies the keppel.Auditor interface.
type Auditor struct {
events []cadf.Event
}

// Record implements the keppel.Auditor interface.
func (a *Auditor) Record(params audittools.EventParameters) {
a.events = append(a.events, a.normalize(audittools.NewEvent(params)))
}

// ExpectEvents checks that the recorded events are equivalent to the supplied expectation.
func (a *Auditor) ExpectEvents(t *testing.T, expectedEvents ...cadf.Event) {
t.Helper()
if len(expectedEvents) == 0 {
expectedEvents = nil
} else {
for idx, event := range expectedEvents {
expectedEvents[idx] = a.normalize(event)
}
}
assert.DeepEqual(t, "CADF events", a.events, expectedEvents)

// reset state for next test
a.events = nil
}

// IgnoreEventsUntilNow clears the list of recorded events, so that the next
// ExpectEvents() will only cover events generated after this point.
func (a *Auditor) IgnoreEventsUntilNow() {
a.events = nil
}

func (a *Auditor) normalize(event cadf.Event) cadf.Event {
// overwrite some attributes where we don't care about variance
event.TypeURI = "http://schemas.dmtf.org/cloud/audit/1.0/event"
event.ID = "00000000-0000-0000-0000-000000000000"
event.EventTime = "2006-01-02T15:04:05.999999+00:00"
event.EventType = "activity"
if event.Initiator.TypeURI != "service/docker-registry/janitor-task" {
// for janitor tasks, we *are* interested in the initiator because special
// attributes like relevant GC policies get encoded there
event.Initiator = cadf.Resource{}
}
event.Observer = cadf.Resource{}
return event
}
5 changes: 3 additions & 2 deletions internal/test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/sapcc/go-bits/audittools"
"github.com/sapcc/go-bits/easypg"
"github.com/sapcc/go-bits/httpapi"
"github.com/sapcc/go-bits/logg"
Expand Down Expand Up @@ -151,7 +152,7 @@ type Setup struct {
DB *keppel.DB
Clock *mock.Clock
SIDGenerator *StorageIDGenerator
Auditor *Auditor
Auditor *audittools.MockAuditor
AD *AuthDriver
AMD *basic.AccountManagementDriver
FD *FederationDriver
Expand Down Expand Up @@ -304,7 +305,7 @@ func NewSetup(t *testing.T, opts ...SetupOption) Setup {
s.Clock = mock.NewClock()
s.SIDGenerator = &StorageIDGenerator{}
s.AMD = &basic.AccountManagementDriver{}
s.Auditor = &Auditor{}
s.Auditor = audittools.NewMockAuditor()

// if we are secondary and we know the primary, share the clock with it
if params.SetupOfPrimary != nil {
Expand Down

0 comments on commit e9bc32b

Please sign in to comment.