Skip to content

Commit

Permalink
Merge pull request #434 from sapcc/reduced-account
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperSandro2000 authored Sep 25, 2024
2 parents e75a891 + 2dc8214 commit d1c34ea
Show file tree
Hide file tree
Showing 30 changed files with 198 additions and 144 deletions.
8 changes: 4 additions & 4 deletions internal/api/keppel/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ func uploadManifest(t *testing.T, s test.Setup, account *models.Account, repo *m
Digest: manifest.Digest.String(),
Content: manifest.Contents,
}))
mustDo(t, s.SD.WriteManifest(s.Ctx, *account, repo.Name, manifest.Digest, manifest.Contents))
mustDo(t, s.SD.WriteManifest(s.Ctx, account.Reduced(), repo.Name, manifest.Digest, manifest.Contents))
return dbManifest
}

Expand Down Expand Up @@ -1872,11 +1872,11 @@ func TestDeleteAccount(t *testing.T) {
mustInsert(t, s.DB, &blob)
blobs = append(blobs, blob)

err := s.SD.AppendToBlob(s.Ctx, *accounts[0], storageID, 1, &blob.SizeBytes, bytes.NewReader(testBlob.Contents))
err := s.SD.AppendToBlob(s.Ctx, accounts[0].Reduced(), storageID, 1, &blob.SizeBytes, bytes.NewReader(testBlob.Contents))
if err != nil {
t.Fatal(err.Error())
}
err = s.SD.FinalizeBlob(s.Ctx, *accounts[0], storageID, 1)
err = s.SD.FinalizeBlob(s.Ctx, accounts[0].Reduced(), storageID, 1)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -1900,7 +1900,7 @@ func TestDeleteAccount(t *testing.T) {
NextCheckAt: time.Unix(0, 0),
VulnerabilityStatus: models.PendingVulnerabilityStatus,
})
err := s.SD.WriteManifest(s.Ctx, *accounts[0], repos[0].Name, image.Manifest.Digest, image.Manifest.Contents)
err := s.SD.WriteManifest(s.Ctx, accounts[0].Reduced(), repos[0].Name, image.Manifest.Digest, image.Manifest.Contents)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
6 changes: 3 additions & 3 deletions internal/api/keppel/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (a *API) handleDeleteManifest(w http.ResponseWriter, r *http.Request) {
return
}

err = a.processor().DeleteManifest(r.Context(), *account, *repo, parsedDigest, keppel.AuditContext{
err = a.processor().DeleteManifest(r.Context(), account.Reduced(), *repo, parsedDigest, keppel.AuditContext{
UserIdentity: authz.UserIdentity,
Request: r,
})
Expand Down Expand Up @@ -257,7 +257,7 @@ func (a *API) handleDeleteTag(w http.ResponseWriter, r *http.Request) {
}
tagName := mux.Vars(r)["tag_name"]

err := a.processor().DeleteTag(*account, *repo, tagName, keppel.AuditContext{
err := a.processor().DeleteTag(account.Reduced(), *repo, tagName, keppel.AuditContext{
UserIdentity: authz.UserIdentity,
Request: r,
})
Expand All @@ -283,7 +283,7 @@ func (a *API) handleGetTrivyReport(w http.ResponseWriter, r *http.Request) {
return
}

err := api.CheckRateLimit(r, a.rle, *account, authz, keppel.TrivyReportRetrieveAction, 1)
err := api.CheckRateLimit(r, a.rle, account.Reduced(), authz, keppel.TrivyReportRetrieveAction, 1)
if err != nil {
if rerr, ok := errext.As[*keppel.RegistryV2Error](err); ok && rerr != nil {
rerr.WriteAsRegistryV2ResponseTo(w, r)
Expand Down
2 changes: 1 addition & 1 deletion internal/api/keppel/manifests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestManifestsAPI(t *testing.T) {

err := s.SD.WriteManifest(
s.Ctx,
models.Account{Name: repo.AccountName},
models.ReducedAccount{Name: repo.AccountName},
repo.Name, dummyDigest, []byte(strings.Repeat("x", sizeBytes)),
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/api/registry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (info anycastRequestInfo) AsPrometheusLabels() prometheus.Labels {
// If the account does not exist locally, but the request is for the anycast API
// and the account exists elsewhere, the `anycastHandler` is invoked if given
// instead of giving a 404 response.
func (a *API) checkAccountAccess(w http.ResponseWriter, r *http.Request, strategy repoAccessStrategy, anycastHandler func(http.ResponseWriter, *http.Request, anycastRequestInfo)) (*models.Account, *models.Repository, *auth.Authorization) {
func (a *API) checkAccountAccess(w http.ResponseWriter, r *http.Request, strategy repoAccessStrategy, anycastHandler func(http.ResponseWriter, *http.Request, anycastRequestInfo)) (*models.ReducedAccount, *models.Repository, *auth.Authorization) {
// must be set even for 401 responses!
w.Header().Set("Docker-Distribution-Api-Version", "registry/2.0")

Expand All @@ -211,7 +211,7 @@ func (a *API) checkAccountAccess(w http.ResponseWriter, r *http.Request, strateg
return nil, nil, nil
}

// check authorization before FindAccount(); otherwise we might leak
// check authorization before FindReducedAccount(); otherwise we might leak
// information about account existence to unauthorized users
switch r.Method {
case http.MethodDelete:
Expand All @@ -234,7 +234,7 @@ func (a *API) checkAccountAccess(w http.ResponseWriter, r *http.Request, strateg

// we need to know the account to select the registry instance for this request
repoScope := scope.ParseRepositoryScope(authz.Audience)
account, err := keppel.FindAccount(a.db, repoScope.AccountName)
account, err := keppel.FindReducedAccount(a.db, repoScope.AccountName)
if respondWithError(w, r, err) {
return nil, nil, nil
}
Expand Down
14 changes: 7 additions & 7 deletions internal/api/registry/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (a *API) handleStartBlobUpload(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}

func (a *API) performCrossRepositoryBlobMount(w http.ResponseWriter, r *http.Request, account models.Account, targetRepo models.Repository, authz *auth.Authorization, sourceRepoFullName, blobDigestStr string) {
func (a *API) performCrossRepositoryBlobMount(w http.ResponseWriter, r *http.Request, account models.ReducedAccount, targetRepo models.Repository, authz *auth.Authorization, sourceRepoFullName, blobDigestStr string) {
// validate source repository
sourceRepoName, ok := strings.CutPrefix(sourceRepoFullName, string(account.Name)+"/")
if !ok {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (a *API) performCrossRepositoryBlobMount(w http.ResponseWriter, r *http.Req
w.WriteHeader(http.StatusCreated)
}

func (a *API) performMonolithicUpload(w http.ResponseWriter, r *http.Request, account models.Account, repo models.Repository, authz *auth.Authorization, blobDigestStr string) (ok bool) {
func (a *API) performMonolithicUpload(w http.ResponseWriter, r *http.Request, account models.ReducedAccount, repo models.Repository, authz *auth.Authorization, blobDigestStr string) (ok bool) {
blobDigest, err := digest.Parse(blobDigestStr)
if err != nil {
keppel.ErrDigestInvalid.With(err.Error()).WriteAsRegistryV2ResponseTo(w, r)
Expand Down Expand Up @@ -519,7 +519,7 @@ func (a *API) findUpload(w http.ResponseWriter, r *http.Request, repo models.Rep
return upload
}

func (a *API) resumeUpload(ctx context.Context, account models.Account, upload *models.Upload, stateStr string) (dw *digestWriter, returnErr *keppel.RegistryV2Error) {
func (a *API) resumeUpload(ctx context.Context, account models.ReducedAccount, upload *models.Upload, stateStr string) (dw *digestWriter, returnErr *keppel.RegistryV2Error) {
// when encountering an error, cancel the upload entirely
defer func() {
if returnErr != nil {
Expand Down Expand Up @@ -615,7 +615,7 @@ func (a *API) parseContentRange(upload *models.Upload, hdr http.Header) (uint64,
return length, nil
}

func (a *API) streamIntoUpload(ctx context.Context, account models.Account, upload *models.Upload, dw *digestWriter, chunk io.Reader, chunkSizeBytes *uint64) (digestState string, returnErr error) {
func (a *API) streamIntoUpload(ctx context.Context, account models.ReducedAccount, upload *models.Upload, dw *digestWriter, chunk io.Reader, chunkSizeBytes *uint64) (digestState string, returnErr error) {
// if anything happens during this operation, we likely have produced an
// inconsistent state between DB, storage backend and our internal book
// keeping (esp. the digestState in dw.Hash), so we will have to abort the
Expand Down Expand Up @@ -670,7 +670,7 @@ func (a *API) streamIntoUpload(ctx context.Context, account models.Account, uplo
return base64.URLEncoding.EncodeToString(digestStateBytes), nil
}

func (a *API) createBlobFromUpload(ctx context.Context, account models.Account, repo models.Repository, upload models.Upload, blobDigestStr string) (blob *models.Blob, returnErr error) {
func (a *API) createBlobFromUpload(ctx context.Context, account models.ReducedAccount, repo models.Repository, upload models.Upload, blobDigestStr string) (blob *models.Blob, returnErr error) {
// validate the digest provided by the user
if blobDigestStr == "" {
return nil, keppel.ErrDigestInvalid.With("missing digest")
Expand Down Expand Up @@ -716,7 +716,7 @@ var insertBlobIfMissingQuery = sqlext.SimplifyWhitespace(`
// Insert a Blob object in the database. This is similar to building a
// keppel.Blob and doing tx.Insert(blob), but handles a collision where another
// blob with the same account name and digest already exists in the database.
func (a *API) createOrUpdateBlobObject(ctx context.Context, tx *gorp.Transaction, sizeBytes uint64, storageID string, blobDigest digest.Digest, blobPushedAt time.Time, account models.Account) (*models.Blob, error) {
func (a *API) createOrUpdateBlobObject(ctx context.Context, tx *gorp.Transaction, sizeBytes uint64, storageID string, blobDigest digest.Digest, blobPushedAt time.Time, account models.ReducedAccount) (*models.Blob, error) {
// try to insert the blob atomically (I would like to SELECT the result
// directly via `RETURNING *`, but that gives sql.ErrNoRows when nothing was
// inserted because of ON CONFLICT, so in the general case, we need another
Expand Down Expand Up @@ -762,7 +762,7 @@ func (w *digestWriter) Write(buf []byte) (n int, err error) {
return n, err
}

func countAbortedBlobUpload(account models.Account) {
func countAbortedBlobUpload(account models.ReducedAccount) {
l := prometheus.Labels{"account": string(account.Name), "auth_tenant_id": account.AuthTenantID, "method": "registry-api"}
api.UploadsAbortedCounter.With(l).Inc()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/sapcc/keppel/internal/models"
)

func CheckRateLimit(r *http.Request, rle *keppel.RateLimitEngine, account models.Account, authz *auth.Authorization, action keppel.RateLimitedAction, amount uint64) error {
func CheckRateLimit(r *http.Request, rle *keppel.RateLimitEngine, account models.ReducedAccount, authz *auth.Authorization, action keppel.RateLimitedAction, amount uint64) error {
// rate-limiting is optional
if rle == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/drivers/basic/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d RateLimitDriver) Init(ad keppel.AuthDriver, cfg keppel.Configuration) er
}

// GetRateLimit implements the keppel.RateLimitDriver interface.
func (d RateLimitDriver) GetRateLimit(account models.Account, action keppel.RateLimitedAction) *redis_rate.Limit {
func (d RateLimitDriver) GetRateLimit(account models.ReducedAccount, action keppel.RateLimitedAction) *redis_rate.Limit {
quota, ok := d.Limits[action]
if ok {
return &quota
Expand Down
38 changes: 19 additions & 19 deletions internal/drivers/filesystem/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ func (d *StorageDriver) Init(ad keppel.AuthDriver, cfg keppel.Configuration) (er
return err
}

func (d *StorageDriver) getBlobBasePath(account models.Account) string {
func (d *StorageDriver) getBlobBasePath(account models.ReducedAccount) string {
return fmt.Sprintf("%s/%s/%s/blobs", d.rootPath, account.AuthTenantID, account.Name)
}

func (d *StorageDriver) getBlobPath(account models.Account, storageID string) string {
func (d *StorageDriver) getBlobPath(account models.ReducedAccount, storageID string) string {
return fmt.Sprintf("%s/%s/%s/blobs/%s", d.rootPath, account.AuthTenantID, account.Name, storageID)
}

func (d *StorageDriver) getManifestBasePath(account models.Account) string {
func (d *StorageDriver) getManifestBasePath(account models.ReducedAccount) string {
return fmt.Sprintf("%s/%s/%s/manifests", d.rootPath, account.AuthTenantID, account.Name)
}

func (d *StorageDriver) getManifestPath(account models.Account, repoName string, manifestDigest digest.Digest) string {
func (d *StorageDriver) getManifestPath(account models.ReducedAccount, repoName string, manifestDigest digest.Digest) string {
return fmt.Sprintf("%s/%s/%s/manifests/%s/%s", d.rootPath, account.AuthTenantID, account.Name, repoName, manifestDigest)
}

// AppendToBlob implements the keppel.StorageDriver interface.
func (d *StorageDriver) AppendToBlob(ctx context.Context, account models.Account, storageID string, chunkNumber uint32, chunkLength *uint64, chunk io.Reader) error {
func (d *StorageDriver) AppendToBlob(ctx context.Context, account models.ReducedAccount, storageID string, chunkNumber uint32, chunkLength *uint64, chunk io.Reader) error {
path := d.getBlobPath(account, storageID)
tmpPath := path + ".tmp"
flags := os.O_APPEND | os.O_WRONLY
Expand All @@ -90,21 +90,21 @@ func (d *StorageDriver) AppendToBlob(ctx context.Context, account models.Account
}

// FinalizeBlob implements the keppel.StorageDriver interface.
func (d *StorageDriver) FinalizeBlob(ctx context.Context, account models.Account, storageID string, chunkCount uint32) error {
func (d *StorageDriver) FinalizeBlob(ctx context.Context, account models.ReducedAccount, storageID string, chunkCount uint32) error {
path := d.getBlobPath(account, storageID)
tmpPath := path + ".tmp"
return os.Rename(tmpPath, path)
}

// AbortBlobUpload implements the keppel.StorageDriver interface.
func (d *StorageDriver) AbortBlobUpload(ctx context.Context, account models.Account, storageID string, chunkCount uint32) error {
func (d *StorageDriver) AbortBlobUpload(ctx context.Context, account models.ReducedAccount, storageID string, chunkCount uint32) error {
path := d.getBlobPath(account, storageID)
tmpPath := path + ".tmp"
return os.Remove(tmpPath)
}

// ReadBlob implements the keppel.StorageDriver interface.
func (d *StorageDriver) ReadBlob(ctx context.Context, account models.Account, storageID string) (io.ReadCloser, uint64, error) {
func (d *StorageDriver) ReadBlob(ctx context.Context, account models.ReducedAccount, storageID string) (io.ReadCloser, uint64, error) {
path := d.getBlobPath(account, storageID)
f, err := os.Open(path)
if err != nil {
Expand All @@ -119,24 +119,24 @@ func (d *StorageDriver) ReadBlob(ctx context.Context, account models.Account, st
}

// URLForBlob implements the keppel.StorageDriver interface.
func (d *StorageDriver) URLForBlob(ctx context.Context, account models.Account, storageID string) (string, error) {
func (d *StorageDriver) URLForBlob(ctx context.Context, account models.ReducedAccount, storageID string) (string, error) {
return "", keppel.ErrCannotGenerateURL
}

// DeleteBlob implements the keppel.StorageDriver interface.
func (d *StorageDriver) DeleteBlob(ctx context.Context, account models.Account, storageID string) error {
func (d *StorageDriver) DeleteBlob(ctx context.Context, account models.ReducedAccount, storageID string) error {
path := d.getBlobPath(account, storageID)
return os.Remove(path)
}

// ReadManifest implements the keppel.StorageDriver interface.
func (d *StorageDriver) ReadManifest(ctx context.Context, account models.Account, repoName string, manifestDigest digest.Digest) ([]byte, error) {
func (d *StorageDriver) ReadManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest) ([]byte, error) {
path := d.getManifestPath(account, repoName, manifestDigest)
return os.ReadFile(path)
}

// WriteManifest implements the keppel.StorageDriver interface.
func (d *StorageDriver) WriteManifest(ctx context.Context, account models.Account, repoName string, manifestDigest digest.Digest, contents []byte) error {
func (d *StorageDriver) WriteManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, contents []byte) error {
path := d.getManifestPath(account, repoName, manifestDigest)
tmpPath := path + ".tmp"
err := os.MkdirAll(filepath.Dir(tmpPath), 0777)
Expand All @@ -151,13 +151,13 @@ func (d *StorageDriver) WriteManifest(ctx context.Context, account models.Accoun
}

// DeleteManifest implements the keppel.StorageDriver interface.
func (d *StorageDriver) DeleteManifest(ctx context.Context, account models.Account, repoName string, manifestDigest digest.Digest) error {
func (d *StorageDriver) DeleteManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest) error {
path := d.getManifestPath(account, repoName, manifestDigest)
return os.Remove(path)
}

// ListStorageContents implements the keppel.StorageDriver interface.
func (d *StorageDriver) ListStorageContents(ctx context.Context, account models.Account) ([]keppel.StoredBlobInfo, []keppel.StoredManifestInfo, error) {
func (d *StorageDriver) ListStorageContents(ctx context.Context, account models.ReducedAccount) ([]keppel.StoredBlobInfo, []keppel.StoredManifestInfo, error) {
blobs, err := d.getBlobs(account)
if err != nil {
return nil, nil, err
Expand All @@ -169,7 +169,7 @@ func (d *StorageDriver) ListStorageContents(ctx context.Context, account models.
return blobs, manifests, nil
}

func (d *StorageDriver) getBlobs(account models.Account) ([]keppel.StoredBlobInfo, error) {
func (d *StorageDriver) getBlobs(account models.ReducedAccount) ([]keppel.StoredBlobInfo, error) {
var blobs []keppel.StoredBlobInfo
directory, err := os.Open(d.getBlobBasePath(account))
if err != nil {
Expand All @@ -194,7 +194,7 @@ func (d *StorageDriver) getBlobs(account models.Account) ([]keppel.StoredBlobInf
return blobs, nil
}

func (d *StorageDriver) getManifests(account models.Account) ([]keppel.StoredManifestInfo, error) {
func (d *StorageDriver) getManifests(account models.ReducedAccount) ([]keppel.StoredManifestInfo, error) {
var manifests []keppel.StoredManifestInfo
directory, err := os.Open(d.getManifestBasePath(account))
if err != nil {
Expand All @@ -218,7 +218,7 @@ func (d *StorageDriver) getManifests(account models.Account) ([]keppel.StoredMan
return manifests, nil
}

func (d *StorageDriver) getRepoManifests(account models.Account, repo string) ([]keppel.StoredManifestInfo, error) {
func (d *StorageDriver) getRepoManifests(account models.ReducedAccount, repo string) ([]keppel.StoredManifestInfo, error) {
var manifests []keppel.StoredManifestInfo
directory, err := os.Open(filepath.Join(d.getManifestBasePath(account), repo))
if err != nil {
Expand Down Expand Up @@ -251,12 +251,12 @@ func (d *StorageDriver) getRepoManifests(account models.Account, repo string) ([
}

// CanSetupAccount implements the keppel.StorageDriver interface.
func (d *StorageDriver) CanSetupAccount(ctx context.Context, account models.Account) error {
func (d *StorageDriver) CanSetupAccount(ctx context.Context, account models.ReducedAccount) error {
return nil // this driver does not perform any preflight checks here
}

// CleanupAccount implements the keppel.StorageDriver interface.
func (d *StorageDriver) CleanupAccount(ctx context.Context, account models.Account) error {
func (d *StorageDriver) CleanupAccount(ctx context.Context, account models.ReducedAccount) error {
// double-check that cleanup order is right; when the account gets deleted,
// all blobs and manifests must have been deleted from it before
storedBlobs, storedManifests, err := d.ListStorageContents(ctx, account)
Expand Down
Loading

0 comments on commit d1c34ea

Please sign in to comment.