Skip to content

Commit

Permalink
Merge branch 'main' into notation
Browse files Browse the repository at this point in the history
  • Loading branch information
wy65701436 authored Jul 10, 2023
2 parents e80253f + cbb211e commit 11e0322
Show file tree
Hide file tree
Showing 19 changed files with 820 additions and 156 deletions.
2 changes: 2 additions & 0 deletions src/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,6 @@ const (
UIMaxLengthLimitedOfNumber = 10
// ExecutionStatusRefreshIntervalSeconds is the interval seconds for refreshing execution status
ExecutionStatusRefreshIntervalSeconds = "execution_status_refresh_interval_seconds"
// QuotaUpdateProvider is the provider for updating quota, currently support Redis and DB
QuotaUpdateProvider = "quota_update_provider"
)
14 changes: 12 additions & 2 deletions src/controller/blob/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ func (c *controller) Sync(ctx context.Context, references []distribution.Descrip

func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string, size int64) error {
key := blobSizeKey(sessionID)
err := libredis.Instance().Set(ctx, key, size, c.blobSizeExpiration).Err()
rc, err := libredis.GetRegistryClient()
if err != nil {
return err
}

Check warning on line 329 in src/controller/blob/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/blob/controller.go#L328-L329

Added lines #L328 - L329 were not covered by tests

err = rc.Set(ctx, key, size, c.blobSizeExpiration).Err()
if err != nil {
log.Errorf("failed to set accepted blob size for session %s in redis, error: %v", sessionID, err)
return err
Expand All @@ -334,7 +339,12 @@ func (c *controller) SetAcceptedBlobSize(ctx context.Context, sessionID string,

func (c *controller) GetAcceptedBlobSize(ctx context.Context, sessionID string) (int64, error) {
key := blobSizeKey(sessionID)
size, err := libredis.Instance().Get(ctx, key).Int64()
rc, err := libredis.GetRegistryClient()
if err != nil {
return 0, err
}

Check warning on line 345 in src/controller/blob/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/blob/controller.go#L344-L345

Added lines #L344 - L345 were not covered by tests

size, err := rc.Get(ctx, key).Int64()
if err != nil {
if err == redis.Nil {
return 0, nil
Expand Down
215 changes: 194 additions & 21 deletions src/controller/quota/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,48 @@ import (
"fmt"
"time"

"github.com/go-redis/redis/v8"
"golang.org/x/sync/singleflight"

// quota driver
_ "github.com/goharbor/harbor/src/controller/quota/driver"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/gtask"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
libredis "github.com/goharbor/harbor/src/lib/redis"
"github.com/goharbor/harbor/src/lib/retry"
"github.com/goharbor/harbor/src/pkg/quota"
"github.com/goharbor/harbor/src/pkg/quota/driver"
"github.com/goharbor/harbor/src/pkg/quota/types"

// init the db config
_ "github.com/goharbor/harbor/src/pkg/config/db"
)

func init() {
// register the async task for flushing quota to db when enable update quota by redis
if provider := config.GetQuotaUpdateProvider(); provider == updateQuotaProviderRedis.String() {
gtask.DefaultPool().AddTask(flushQuota, 30*time.Second)
}

Check warning on line 48 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

type updateQuotaProviderType string

func (t updateQuotaProviderType) String() string {
return string(t)
}

var (
defaultRetryTimeout = time.Minute * 5
// quotaExpireTimeout is the expire time for quota when update quota by redis
quotaExpireTimeout = time.Minute * 5

updateQuotaProviderRedis updateQuotaProviderType = "Redis"
updateQuotaProviderDB updateQuotaProviderType = "DB"
)

var (
Expand Down Expand Up @@ -87,6 +115,31 @@ type controller struct {
reservedExpiration time.Duration

quotaMgr quota.Manager
g singleflight.Group
}

// flushQuota flushes the quota info from redis to db asynchronously.
func flushQuota(ctx context.Context) {
iter, err := cache.Default().Scan(ctx, "quota:*")
if err != nil {
log.Errorf("failed to scan out the quota records from redis")
}

Check warning on line 126 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L122-L126

Added lines #L122 - L126 were not covered by tests

for iter.Next(ctx) {
key := iter.Val()
q := &quota.Quota{}
err = cache.Default().Fetch(ctx, key, q)
if err != nil {
log.Errorf("failed to fetch quota: %s, error: %v", key, err)
continue

Check warning on line 134 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L128-L134

Added lines #L128 - L134 were not covered by tests
}

if err = Ctl.Update(ctx, q); err != nil {
log.Errorf("failed to refresh quota: %s, error: %v", key, err)
} else {
log.Debugf("successfully refreshed quota: %s", key)
}

Check warning on line 141 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L137-L141

Added lines #L137 - L141 were not covered by tests
}
}

func (c *controller) Count(ctx context.Context, query *q.Query) (int64, error) {
Expand Down Expand Up @@ -163,13 +216,83 @@ func (c *controller) List(ctx context.Context, query *q.Query, options ...Option
return quotas, nil
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), retryOpts ...retry.Option) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
// updateUsageByDB updates the quota usage by the database which updates the quota usage immediately.
func (c *controller) updateUsageByDB(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return retry.Abort(err)
}

Check warning on line 224 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L223-L224

Added lines #L223 - L224 were not covered by tests

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
}

Check warning on line 229 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L228-L229

Added lines #L228 - L229 were not covered by tests

used, err := q.GetUsed()
if err != nil {
return retry.Abort(err)
}

Check warning on line 234 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L233-L234

Added lines #L233 - L234 were not covered by tests

newUsed, err := op(hardLimits, used)
if err != nil {
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the logic for post upload blob which use size 0
// for checking quota, this will increase the pressure of optimistic lock, so here return earlier
// if the quota usage has not changed to reduce the probability of optimistic lock.
if types.Equals(used, newUsed) {
return nil
}

q.SetUsed(newUsed)

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
return retry.Abort(err)
}

Check warning on line 253 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L252-L253

Added lines #L252 - L253 were not covered by tests

return err
}

// updateUsageByRedis updates the quota usage by the redis and flush the quota usage to db asynchronously.
func (c *controller) updateUsageByRedis(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error)) error {
// earlier abort if context is error such as context canceled
if ctx.Err() != nil {
return retry.Abort(ctx.Err())
}

Check warning on line 263 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L259-L263

Added lines #L259 - L263 were not covered by tests

client, err := libredis.GetCoreClient()
if err != nil {
return retry.Abort(err)
}

Check warning on line 268 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L265-L268

Added lines #L265 - L268 were not covered by tests
// normally use cache.Save will append prefix "cache:", in order to keep consistent
// here adopts raw redis client should also pad the prefix manually.
key := fmt.Sprintf("%s:quota:%s:%s", "cache", reference, referenceID)
return client.Watch(ctx, func(tx *redis.Tx) error {
data, err := tx.Get(ctx, key).Result()
if err != nil && err != redis.Nil {

Check warning on line 274 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L271-L274

Added lines #L271 - L274 were not covered by tests
return retry.Abort(err)
}

q := &quota.Quota{}
// calc the quota usage in real time if no key found
if err == redis.Nil {
// use singleflight to prevent cache penetration and cause pressure on the database.
realQuota, err, _ := c.g.Do(key, func() (interface{}, error) {
return c.calcQuota(ctx, reference, referenceID)
})
if err != nil {
return retry.Abort(err)
}

Check warning on line 287 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L278-L287

Added lines #L278 - L287 were not covered by tests

q = realQuota.(*quota.Quota)
} else {
if err = cache.DefaultCodec().Decode([]byte(data), q); err != nil {
return retry.Abort(err)
}

Check warning on line 293 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L289-L293

Added lines #L289 - L293 were not covered by tests
}

hardLimits, err := q.GetHard()
if err != nil {
return retry.Abort(err)
Expand All @@ -185,21 +308,42 @@ func (c *controller) updateUsageWithRetry(ctx context.Context, reference, refere
return retry.Abort(err)
}

// The PR https://github.com/goharbor/harbor/pull/17392 optimized the logic for post upload blob which use size 0
// for checking quota, this will increase the pressure of optimistic lock, so here return earlier
// if the quota usage has not changed to reduce the probability of optimistic lock.
if types.Equals(used, newUsed) {
return nil
q.SetUsed(newUsed)

val, err := cache.DefaultCodec().Encode(q)
if err != nil {
return retry.Abort(err)

Check warning on line 315 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L311-L315

Added lines #L311 - L315 were not covered by tests
}

q.SetUsed(newUsed)
_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
_, err = p.Set(ctx, key, val, quotaExpireTimeout).Result()
return err
})

Check warning on line 321 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L318-L321

Added lines #L318 - L321 were not covered by tests

err = c.quotaMgr.Update(ctx, q)
if err != nil && !errors.Is(err, orm.ErrOptimisticLock) {
if err != nil && err != redis.TxFailedErr {

Check warning on line 323 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L323

Added line #L323 was not covered by tests
return retry.Abort(err)
}

return err
}, key)
}

func (c *controller) updateUsageWithRetry(ctx context.Context, reference, referenceID string, op func(hardLimits, used types.ResourceList) (types.ResourceList, error), provider updateQuotaProviderType, retryOpts ...retry.Option) error {
var f func() error
switch provider {
case updateQuotaProviderDB:
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}
case updateQuotaProviderRedis:
f = func() error {
return c.updateUsageByRedis(ctx, reference, referenceID, op)
}

Check warning on line 341 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L334-L341

Added lines #L334 - L341 were not covered by tests
default:
// by default is update quota by db
f = func() error {
return c.updateUsageByDB(ctx, reference, referenceID, op)
}
}

options := []retry.Option{
Expand Down Expand Up @@ -235,23 +379,25 @@ func (c *controller) Refresh(ctx context.Context, reference, referenceID string,
return newUsed, err
}

return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), opts.RetryOptions...)
// update quota usage by db for refresh operation
return c.updateUsageWithRetry(ctx, reference, referenceID, refreshResources(calculateUsage, opts.IgnoreLimitation), updateQuotaProviderType(config.GetQuotaUpdateProvider()), opts.RetryOptions...)
}

func (c *controller) Request(ctx context.Context, reference, referenceID string, resources types.ResourceList, f func() error) error {
if len(resources) == 0 {
return f()
}

if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources)); err != nil {
provider := updateQuotaProviderType(config.GetQuotaUpdateProvider())
if err := c.updateUsageWithRetry(ctx, reference, referenceID, reserveResources(resources), provider); err != nil {
log.G(ctx).Errorf("reserve resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, err)
return err
}

err := f()

if err != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources)); er != nil {
if er := c.updateUsageWithRetry(ctx, reference, referenceID, rollbackResources(resources), provider); er != nil {
// ignore this error, the quota usage will be correct when users do operations which will call refresh quota
log.G(ctx).Warningf("rollback resources %s for %s %s failed, error: %v", resources.String(), reference, referenceID, er)
}
Expand All @@ -260,22 +406,49 @@ func (c *controller) Request(ctx context.Context, reference, referenceID string,
return err
}

// calcQuota calculates the quota and usage in real time.
func (c *controller) calcQuota(ctx context.Context, reference, referenceID string) (*quota.Quota, error) {
// get quota and usage from db
q, err := c.quotaMgr.GetByRef(ctx, reference, referenceID)
if err != nil {
return nil, err
}

Check warning on line 415 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L410-L415

Added lines #L410 - L415 were not covered by tests
// the usage in the db maybe outdated, calc it in real time
driver, err := Driver(ctx, reference)
if err != nil {
return nil, err
}

Check warning on line 420 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L417-L420

Added lines #L417 - L420 were not covered by tests

newUsed, err := driver.CalculateUsage(ctx, referenceID)
if err != nil {
log.G(ctx).Errorf("failed to calculate quota usage for %s %s, error: %v", reference, referenceID, err)
return nil, err
}

Check warning on line 426 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L422-L426

Added lines #L422 - L426 were not covered by tests

q.SetUsed(newUsed)
return q, nil

Check warning on line 429 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L428-L429

Added lines #L428 - L429 were not covered by tests
}

func (c *controller) Update(ctx context.Context, u *quota.Quota) error {
f := func() error {
q, err := c.quotaMgr.GetByRef(ctx, u.Reference, u.ReferenceID)
if err != nil {
return err
}

if q.Hard != u.Hard {
if hard, err := u.GetHard(); err == nil {
q.SetHard(hard)
if oldHard, err := q.GetHard(); err == nil {
if newHard, err := u.GetHard(); err == nil {
if !types.Equals(oldHard, newHard) {
q.SetHard(newHard)
}

Check warning on line 443 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L439-L443

Added lines #L439 - L443 were not covered by tests
}
}

if q.Used != u.Used {
if used, err := u.GetUsed(); err == nil {
q.SetUsed(used)
if oldUsed, err := q.GetUsed(); err == nil {
if newUsed, err := u.GetUsed(); err == nil {
if !types.Equals(oldUsed, newUsed) {
q.SetUsed(newUsed)
}

Check warning on line 451 in src/controller/quota/controller.go

View check run for this annotation

Codecov / codecov/patch

src/controller/quota/controller.go#L447-L451

Added lines #L447 - L451 were not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
golang.org/x/crypto v0.5.0
golang.org/x/net v0.9.0
golang.org/x/oauth2 v0.5.0
golang.org/x/sync v0.3.0
golang.org/x/text v0.9.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
gopkg.in/h2non/gock.v1 v1.0.16
Expand Down Expand Up @@ -162,7 +163,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/sync v0.3.0
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
google.golang.org/api v0.110.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions src/lib/cache/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ func (suite *CacheTestSuite) TestScan() {
}
}
{
// no match should return all keys
// return all keys with test-scan-*
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
Expand All @@ -143,12 +143,12 @@ func (suite *CacheTestSuite) TestScan() {
}

{
// with match should return matched keys
// return matched keys with test-scan-1*
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "*test-scan-1*")
iter, err := suite.cache.Scan(suite.ctx, "test-scan-1*")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/config/metadata/metadatalist.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,6 @@ var (
{Name: common.ExecutionStatusRefreshIntervalSeconds, Scope: SystemScope, Group: BasicGroup, EnvKey: "EXECUTION_STATUS_REFRESH_INTERVAL_SECONDS", DefaultValue: "30", ItemType: &Int64Type{}, Editable: false, Description: `The interval seconds to refresh the execution status`},

{Name: common.BannerMessage, Scope: UserScope, Group: BasicGroup, EnvKey: "BANNER_MESSAGE", DefaultValue: "", ItemType: &StringType{}, Editable: true, Description: `The customized banner message for the UI`},
{Name: common.QuotaUpdateProvider, Scope: SystemScope, Group: BasicGroup, EnvKey: "QUOTA_UPDATE_PROVIDER", DefaultValue: "db", ItemType: &StringType{}, Editable: false, Description: `The provider for updating quota, 'db' or 'redis' is supported`},
}
)
Loading

0 comments on commit 11e0322

Please sign in to comment.