Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
[Backport 5.2] graphqlbackend/telemetry: add query for recently expor…
Browse files Browse the repository at this point in the history
…ted events (#59041)

* graphqlbackend/telemetry: add query for recently exported events (#57029)

Adds a `telemetry { exportedEvents { ... } }` query that allows a site admin to view recently exported events, before they are removed from the queue by the queue cleaner after `TELEMETRY_GATEWAY_EXPORTER_EXPORTED_EVENTS_RETENTION`, which defaults to 24 hours. This is very different from `event_logs` because it provides a `protojson` rendering of the "true" event payload - the data shown is exactly the data that was exported (except the export happens over proto), whereas the `event_logs` equivalent is translated from the raw data and may be missing some things. The new query and resolver supports pagination.

This is useful in local development to see events without interrogating the telemetry-gateway's local dev logging mode or connecting it to a real telemetry-gateway and querying BigQuery.

This can also be useful if a customer wants to see what is getting exported - right now, there's no easy way to do so without asking someone at Sourcegraph to check BigQuery, or for the customer to parse the raw proto payloads in the database themselves. I have a feeling this ask will eventually arise as we roll out v2 telemetry adoption more broadly.

Closes https://github.com/sourcegraph/sourcegraph/issues/57027

## Test plan

Unit and integration tests, and some manual testing with `sg start` and running some searches:

![image](https://github.com/sourcegraph/sourcegraph/assets/23356519/ab39d9ad-829f-475a-b093-411edbcdf579)

---------

Co-authored-by: Joe Chen <[email protected]>
(cherry picked from commit 5bfda07)

* fixup

---------

Co-authored-by: Robert Lin <[email protected]>
Co-authored-by: Warren Gifford <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent ac23781 commit e464b48
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 357 deletions.
25 changes: 24 additions & 1 deletion cmd/frontend/graphqlbackend/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package graphqlbackend
import (
"context"

"github.com/graph-gophers/graphql-go"

"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend/graphqlutil"
"github.com/sourcegraph/sourcegraph/internal/gqlutil"
)

Expand All @@ -13,8 +16,28 @@ type TelemetryRootResolver struct{ Resolver TelemetryResolver }
func (t *TelemetryRootResolver) Telemetry() TelemetryResolver { return t.Resolver }

type TelemetryResolver interface {
// Queries
ExportedEvents(context.Context, *ExportedEventsArgs) (ExportedEventsConnectionResolver, error)

// Mutations
RecordEvents(ctx context.Context, args *RecordEventsArgs) (*EmptyResponse, error)
RecordEvents(context.Context, *RecordEventsArgs) (*EmptyResponse, error)
}

type ExportedEventsArgs struct {
First int32
After *string
}

type ExportedEventResolver interface {
ID() graphql.ID
ExportedAt() gqlutil.DateTime
Payload() (JSONValue, error)
}

type ExportedEventsConnectionResolver interface {
Nodes() []ExportedEventResolver
TotalCount() (int32, error)
PageInfo() *graphqlutil.PageInfo
}

type RecordEventArgs struct{ Event TelemetryEventInput }
Expand Down
68 changes: 67 additions & 1 deletion cmd/frontend/graphqlbackend/telemetry.graphql
Original file line number Diff line number Diff line change
@@ -1,9 +1,75 @@
extend type Query {
"""
Telemetry queries for "Event Logging Everywhere", aka a version 2 of
existing event-logging/event-recording APIs.
"""
telemetry: TelemetryQuery!
}

"""
Queries for exported telemetry events.
"""
type TelemetryQuery {
"""
List events that were recently exported, up to the retention period configured
on the instance.
"""
exportedEvents(
"""
Returns the first n recently exported events.
"""
first: Int = 50
"""
Opaque pagination cursor.
"""
after: String
): ExportedEventsConnection!
}

"""
ExportedEvent does not implement Node for lookup because they are ephemereal.
"""
type ExportedEvent {
"""
The unique id of the event queued for export.
"""
id: ID!
"""
The time this event was exported at.
"""
exportedAt: DateTime!
"""
The raw event payload that was exported, rendered as JSON.
"""
payload: JSONValue!
}

"""
A list of recently exported telemetry event payloads.
"""
type ExportedEventsConnection {
"""
A list of exported events, with the most recent events first.
"""
nodes: [ExportedEvent!]!

"""
The total number of events in the connection.
"""
totalCount: Int!

"""
Pagination information.
"""
pageInfo: PageInfo!
}

extend type Mutation {
"""
Telemetry mutations for "Event Logging Everywhere", aka a version 2 of
existing event-logging/event-recording APIs.
"""
telemetry: TelemetryMutation
telemetry: TelemetryMutation!
}

"""
Expand Down
4 changes: 4 additions & 0 deletions cmd/frontend/graphqlbackend/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (

type mockTelemetryResolver struct {
events []TelemetryEventInput

// Embed interface directly in mock so that it satisfies TelemetryResolver.
// Unexpected usage of interface methods that aren't mocked will panic.
TelemetryResolver
}

func (m *mockTelemetryResolver) RecordEvents(_ context.Context, args *RecordEventsArgs) (*EmptyResponse, error) {
Expand Down
26 changes: 25 additions & 1 deletion cmd/frontend/internal/telemetry/resolvers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,58 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "resolvers",
srcs = [
"exportedevents_resolvers.go",
"resolvers.go",
"telemetrygateway.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/frontend/internal/telemetry/resolvers",
visibility = ["//cmd/frontend:__subpackages__"],
deps = [
"//cmd/frontend/graphqlbackend",
"//cmd/frontend/graphqlbackend/graphqlutil",
"//internal/auth",
"//internal/database",
"//internal/gqlutil",
"//internal/telemetry/teestore",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/version",
"//lib/errors",
"//lib/pointers",
"@com_github_graph_gophers_graphql_go//:graphql-go",
"@com_github_graph_gophers_graphql_go//relay",
"@com_github_sourcegraph_log//:log",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//types/known/structpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)

go_test(
name = "resolvers_test",
srcs = ["telemetrygateway_test.go"],
srcs = [
"resolvers_test.go",
"telemetrygateway_test.go",
],
embed = [":resolvers"],
tags = [
# Test requires localhost database
"requires-network",
],
deps = [
"//cmd/frontend/graphqlbackend",
"//internal/actor",
"//internal/database",
"//internal/database/dbmocks",
"//internal/database/dbtest",
"//internal/database/fakedb",
"//internal/gqlutil",
"//internal/telemetrygateway/v1:telemetrygateway",
"//internal/types",
"@com_github_hexops_autogold_v2//:autogold",
"@com_github_sourcegraph_log//logtest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package resolvers

import (
"context"
"encoding/json"
"time"

"github.com/graph-gophers/graphql-go"
"github.com/graph-gophers/graphql-go/relay"
"google.golang.org/protobuf/encoding/protojson"

"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend/graphqlutil"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/gqlutil"
"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/sourcegraph/lib/pointers"
)

func decodeExportedEventsCursor(cursor string) (*time.Time, error) {
cursor, err := graphqlutil.DecodeCursor(&cursor)
if err != nil {
return nil, errors.Wrap(err, "invalid cursor")
}
t, err := time.Parse(time.RFC3339, cursor)
if err != nil {
return nil, errors.Wrap(err, "invalid cursor data")
}
return &t, nil
}

func encodeExportedEventsCursor(t time.Time) *graphqlutil.PageInfo {
ts, err := t.MarshalText()
if err != nil {
return graphqlutil.HasNextPage(false)
}
return graphqlutil.EncodeCursor(pointers.Ptr(string(ts)))
}

type ExportedEventResolver struct {
event database.ExportedTelemetryEvent
}

var _ graphqlbackend.ExportedEventResolver = &ExportedEventResolver{}

func (r *ExportedEventResolver) ID() graphql.ID {
return relay.MarshalID("ExportedEvent", r.event.ID)
}

func (r *ExportedEventResolver) ExportedAt() gqlutil.DateTime {
return gqlutil.DateTime{Time: r.event.ExportedAt}
}

func (r *ExportedEventResolver) Payload() (graphqlbackend.JSONValue, error) {
payload, err := protojson.Marshal(r.event.Payload)
if err != nil {
return graphqlbackend.JSONValue{Value: struct{}{}},
errors.Wrapf(err, "failed to marshal payload of event ID %q", r.event.ID)
}
return graphqlbackend.JSONValue{Value: json.RawMessage(payload)}, nil
}

type ExportedEventsConnectionResolver struct {
ctx context.Context
diagnostics database.TelemetryEventsExportQueueDiagnosticsStore

limit int
exported []database.ExportedTelemetryEvent
}

var _ graphqlbackend.ExportedEventsConnectionResolver = &ExportedEventsConnectionResolver{}

func (r *ExportedEventsConnectionResolver) Nodes() []graphqlbackend.ExportedEventResolver {
nodes := make([]graphqlbackend.ExportedEventResolver, len(r.exported))
for i, event := range r.exported {
nodes[i] = &ExportedEventResolver{event: event}
}
return nodes
}

func (r *ExportedEventsConnectionResolver) TotalCount() (int32, error) {
count, err := r.diagnostics.CountRecentlyExported(r.ctx)
if err != nil {
return 0, errors.Wrap(err, "CountRecentlyExported")
}
return int32(count), nil
}

func (r *ExportedEventsConnectionResolver) PageInfo() *graphqlutil.PageInfo {
if len(r.exported) == 0 || len(r.exported) < r.limit {
return graphqlutil.HasNextPage(false)
}
lastEvent := r.exported[len(r.exported)-1]
return encodeExportedEventsCursor(lastEvent.Timestamp)
}
42 changes: 40 additions & 2 deletions cmd/frontend/internal/telemetry/resolvers/resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/sourcegraph/log"

"github.com/sourcegraph/sourcegraph/cmd/frontend/graphqlbackend"
"github.com/sourcegraph/sourcegraph/internal/auth"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/telemetry/teestore"
telemetrygatewayv1 "github.com/sourcegraph/sourcegraph/internal/telemetrygateway/v1"
Expand All @@ -17,15 +18,52 @@ import (
// Resolver is the GraphQL resolver of all things related to telemetry V2.
type Resolver struct {
logger log.Logger
db database.DB
teestore *teestore.Store
}

var _ graphqlbackend.TelemetryResolver = &Resolver{}

// New returns a new Resolver whose store uses the given database
func New(logger log.Logger, db database.DB) graphqlbackend.TelemetryResolver {
return &Resolver{logger: logger, teestore: teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs())}
return &Resolver{
logger: logger,
db: db,
teestore: teestore.NewStore(db.TelemetryEventsExportQueue(), db.EventLogs()),
}
}

var _ graphqlbackend.TelemetryResolver = &Resolver{}
func (r *Resolver) ExportedEvents(ctx context.Context, args *graphqlbackend.ExportedEventsArgs) (graphqlbackend.ExportedEventsConnectionResolver, error) {
// 🚨 SECURITY: Caller must be a site admin.
if err := auth.CheckCurrentUserIsSiteAdmin(ctx, r.db); err != nil {
return nil, err
}

first := int(args.First)
if first <= 0 {
first = 50
}
var before *time.Time
if args.After != nil {
var err error
before, err = decodeExportedEventsCursor(*args.After)
if err != nil {
return nil, errors.Wrap(err, "invalid cursor data")
}
}

exported, err := r.db.TelemetryEventsExportQueue().ListRecentlyExported(ctx, first, before)
if err != nil {
return nil, errors.Wrap(err, "ListRecentlyExported")
}

return &ExportedEventsConnectionResolver{
ctx: ctx,
diagnostics: r.db.TelemetryEventsExportQueue(),
limit: first,
exported: exported,
}, nil
}

func (r *Resolver) RecordEvents(ctx context.Context, args *graphqlbackend.RecordEventsArgs) (*graphqlbackend.EmptyResponse, error) {
if args == nil || len(args.Events) == 0 {
Expand Down
Loading

0 comments on commit e464b48

Please sign in to comment.