[Backport 5.2] Embeddings: fail job immediately if rate limit exceeded (#58939)

Embeddings: fail job immediately if rate limit exceeded (#58869)

Usually, during an embeddings job we allow up to 10% of embedding requests to fail,
simply skipping over the failed chunks. If a customer has hit their rate limit,
this means we might keep sending a huge number of embedding requests that we
know will immediately fail. With this change, we fail the job immediately if
the rate limit is exceeded.

It also increases the interval between job scheduling attempts from 5 to 15
minutes. This won't make a big difference to user experience, since by default
embeddings jobs aren't allowed to be scheduled within 24 hours of the last run,
but it helps prevent jobs from continuously being scheduled and then failing.

This change is unlikely to have a user-facing impact, but just helps cut down
on noise in logs and excessive requests to Cody Gateway.
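
As a rough illustration of the scheduling behavior described above, the sketch below polls on a fixed 15-minute interval and skips any repo embedded within the last 24 hours. This is a simplified illustration only, not the Sourcegraph scheduler; lastRunAt and enqueue are hypothetical stand-ins for the real policy resolution and job queue.

package scheduler

import (
	"context"
	"log"
	"time"
)

const (
	pollInterval       = 15 * time.Minute // raised from 5 minutes by this change
	minTimeBetweenRuns = 24 * time.Hour   // default minimum gap between embeddings jobs
)

// runScheduler wakes on a fixed interval and enqueues an embeddings job for
// every repo whose last run is older than minTimeBetweenRuns.
func runScheduler(ctx context.Context, repos []string, lastRunAt func(repo string) time.Time, enqueue func(repo string)) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, repo := range repos {
				if time.Since(lastRunAt(repo)) < minTimeBetweenRuns {
					continue // embedded recently; skip to avoid schedule-then-fail churn
				}
				log.Printf("scheduling embeddings job for %s", repo)
				enqueue(repo)
			}
		}
	}
}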
jtibshirani committed Dec 13, 2023
1 parent fb0a625 commit 1e2545b
Showing 3 changed files with 40 additions and 19 deletions.
@@ -83,6 +83,6 @@ func newRepoEmbeddingScheduler(
		enqueueActive,
		goroutine.WithName("repoEmbeddingSchedulerJob"),
		goroutine.WithDescription("resolves embedding policies and schedules jobs to embed repos"),
-		goroutine.WithInterval(5*time.Minute),
+		goroutine.WithInterval(15*time.Minute),
	)
}
35 changes: 19 additions & 16 deletions internal/embeddings/embed/embed.go
@@ -259,22 +259,25 @@ func embedFiles(
		}

		batchEmbeddings, err := embeddingsClient.GetDocumentEmbeddings(ctx, batchChunks)
-		if err != nil && !excludeChunksOnError {
-			return nil, errors.Wrap(err, "error while getting embeddings")
-		} else if err != nil {
-			// To avoid failing large jobs on a flaky API, just mark all files
-			// as failed and continue. This means we may have some missing
-			// files, but they will be logged as such below and some embeddings
-			// are better than no embeddings.
-			logger.Warn("error while getting embeddings", log.Error(err))
-			failed := make([]int, len(batchChunks))
-			for i := 0; i < len(batchChunks); i++ {
-				failed[i] = i
-			}
-			batchEmbeddings = &client.EmbeddingsResults{
-				Embeddings: make([]float32, len(batchChunks)*dimensions),
-				Failed:     failed,
-				Dimensions: dimensions,
+		if err != nil {
+			if !excludeChunksOnError || errors.Is(err, &client.RateLimitExceededError{}) {
+				// Fail immediately if we hit a rate limit, so we don't continually retry and fail on every chunk.
+				return nil, errors.Wrap(err, "error while getting embeddings")
+			} else {
+				// To avoid failing large jobs on a flaky API, just mark all files
+				// as failed and continue. This means we may have some missing
+				// files, but they will be logged as such below and some embeddings
+				// are better than no embeddings.
+				logger.Warn("error while getting embeddings", log.Error(err))
+				failed := make([]int, len(batchChunks))
+				for i := 0; i < len(batchChunks); i++ {
+					failed[i] = i
+				}
+				batchEmbeddings = &client.EmbeddingsResults{
+					Embeddings: make([]float32, len(batchChunks)*dimensions),
+					Failed:     failed,
+					Dimensions: dimensions,
+				}
			}
		}

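The new check matches the error by type rather than by value: with standard errors.Is semantics, errors.Is(err, &client.RateLimitExceededError{}) can only succeed if the error type itself implements an Is method, since a freshly constructed pointer never compares equal to the one in err's chain. Below is a minimal sketch of what such a type could look like, kept consistent with the NewRateLimitExceededError(time.Time) constructor used in the test further down; it is an assumption about the client package, not its actual definition.

package client

import (
	"fmt"
	"time"
)

// RateLimitExceededError is a sketched typed error carrying the time at which
// the caller may retry. The real definition in the embeddings client package
// may differ.
type RateLimitExceededError struct {
	RetryAfter time.Time
}

// NewRateLimitExceededError wraps a retry-after timestamp in the typed error.
func NewRateLimitExceededError(retryAfter time.Time) error {
	return &RateLimitExceededError{RetryAfter: retryAfter}
}

func (e *RateLimitExceededError) Error() string {
	return fmt.Sprintf("rate limit exceeded, retry after %s", e.RetryAfter.Format(time.RFC3339))
}

// Is makes errors.Is(err, &RateLimitExceededError{}) succeed for any value of
// this type in err's chain, regardless of the RetryAfter it carries.
func (e *RateLimitExceededError) Is(target error) bool {
	_, ok := target.(*RateLimitExceededError)
	return ok
}

Without an Is method (or equivalent support in the errors package this repository uses), the check above would silently never match and rate-limited jobs would keep falling into the skip-failed-chunks branch.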
22 changes: 20 additions & 2 deletions internal/embeddings/embed/embed_test.go
@@ -4,6 +4,7 @@ import (
	"context"
	"strings"
	"testing"
+	"time"

	"github.com/sourcegraph/log"
	"github.com/stretchr/testify/require"
@@ -475,6 +476,7 @@ func TestEmbedRepo_ExcludeChunkOnError(t *testing.T) {
		partialFailureClient := &flakyEmbeddingsClient{
			EmbeddingsClient:  embeddingsClient,
			remainingFailures: 1,
+			err:               errors.New("FAIL"),
		}
		_, _, stats, err := EmbedRepo(ctx, partialFailureClient, inserter, contextService, rl, repoIDName, mockRepoPathRanks, opts, logger, noopReport)
		require.NoError(t, err)
@@ -489,6 +491,21 @@ func TestEmbedRepo_ExcludeChunkOnError(t *testing.T) {
		partialFailureClient := &flakyEmbeddingsClient{
			EmbeddingsClient:  embeddingsClient,
			remainingFailures: 100,
+			err:               errors.New("FAIL"),
		}
+		_, _, _, err := EmbedRepo(ctx, partialFailureClient, inserter, contextService, rl, repoIDName, mockRepoPathRanks, opts, logger, noopReport)
+		require.Error(t, err)
+	})
+
+	t.Run("immediately fail if rate limit hit", func(t *testing.T) {
+		rl := newReadLister("a.go", "b.md", "c.java", "big.java")
+		opts := opts
+		opts.TolerableFailureRatio = 0.1
+
+		partialFailureClient := &flakyEmbeddingsClient{
+			EmbeddingsClient:  embeddingsClient,
+			remainingFailures: 1,
+			err:               client.NewRateLimitExceededError(time.Now().Add(time.Minute)),
+		}
		_, _, _, err := EmbedRepo(ctx, partialFailureClient, inserter, contextService, rl, repoIDName, mockRepoPathRanks, opts, logger, noopReport)
		require.Error(t, err)
@@ -703,20 +720,21 @@ func (c *mockEmbeddingsClient) GetDocumentEmbeddings(_ context.Context, texts []
type flakyEmbeddingsClient struct {
	client.EmbeddingsClient
	remainingFailures int
+	err               error
}

func (c *flakyEmbeddingsClient) GetQueryEmbedding(ctx context.Context, query string) (*client.EmbeddingsResults, error) {
	if c.remainingFailures > 0 {
		c.remainingFailures -= 1
-		return nil, errors.New("FAIL")
+		return nil, c.err
	}
	return c.EmbeddingsClient.GetQueryEmbedding(ctx, query)
}

func (c *flakyEmbeddingsClient) GetDocumentEmbeddings(ctx context.Context, documents []string) (*client.EmbeddingsResults, error) {
	if c.remainingFailures > 0 {
		c.remainingFailures -= 1
-		return nil, errors.New("FAIL")
+		return nil, c.err
	}
	return c.EmbeddingsClient.GetDocumentEmbeddings(ctx, documents)
}
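
Tying the two files together: the new test case and the fail-fast branch in embed.go both depend on the same errors.Is property. A hypothetical test for it, written as it might appear in the client package's own tests under the sketched error type above (not part of this commit):

package client

import (
	"errors"
	"testing"
	"time"
)

// Verifies the type-based matching that both embed.go and the new
// embed_test.go case rely on.
func TestRateLimitExceededErrorIs(t *testing.T) {
	err := NewRateLimitExceededError(time.Now().Add(time.Minute))
	if !errors.Is(err, &RateLimitExceededError{}) {
		t.Fatal("expected errors.Is to match RateLimitExceededError by type")
	}
}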
