Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PullService lock via pullID #19520

Merged
merged 6 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
asymkey_service "code.gitea.io/gitea/services/asymkey"
)

// prQueue represents a queue to handle update pull request tests
var prQueue queue.UniqueQueue
// prPatchCheckerQueue represents a queue to handle update pull request tests
var prPatchCheckerQueue queue.UniqueQueue

var (
ErrIsClosed = errors.New("pull is cosed")
Expand All @@ -43,7 +43,7 @@ var (

// AddToTaskQueue adds itself to pull request test task queue.
func AddToTaskQueue(pr *models.PullRequest) {
err := prQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
err := prPatchCheckerQueue.PushFunc(strconv.FormatInt(pr.ID, 10), func() error {
pr.Status = models.PullRequestStatusChecking
err := pr.UpdateColsIfNotMerged("status")
if err != nil {
Expand Down Expand Up @@ -144,7 +144,7 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
}

// Make sure there is no waiting test to process before leaving the checking status.
has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
has, err := prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10))
if err != nil {
log.Error("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v", pr.ID, err)
}
Expand Down Expand Up @@ -293,7 +293,7 @@ func InitializePullRequests(ctx context.Context) {
case <-ctx.Done():
return
default:
if err := prQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
if err := prPatchCheckerQueue.PushFunc(strconv.FormatInt(prID, 10), func() error {
log.Trace("Adding PR ID: %d to the pull requests patch checking queue", prID)
return nil
}); err != nil {
Expand All @@ -314,6 +314,8 @@ func handle(data ...queue.Data) []queue.Data {
}

func testPR(id int64) {
pullWorkingPool.CheckIn(fmt.Sprint(id))
6543 marked this conversation as resolved.
Show resolved Hide resolved
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
defer finished()

Expand Down Expand Up @@ -358,13 +360,13 @@ func CheckPrsForBaseBranch(baseRepo *repo_model.Repository, baseBranchName strin

// Init runs the task queue to test all the checking status pull requests
func Init() error {
prQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "")
prPatchCheckerQueue = queue.CreateUniqueQueue("pr_patch_checker", handle, "")

if prQueue == nil {
if prPatchCheckerQueue == nil {
return fmt.Errorf("Unable to create pr_patch_checker Queue")
}

go graceful.GetManager().RunWithShutdownFns(prQueue.Run)
go graceful.GetManager().RunWithShutdownFns(prPatchCheckerQueue.Run)
go graceful.GetManager().RunWithShutdownContext(InitializePullRequests)
return nil
}
10 changes: 5 additions & 5 deletions services/pull/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
queueShutdown := []func(){}
queueTerminate := []func(){}

prQueue = q.(queue.UniqueQueue)
prPatchCheckerQueue = q.(queue.UniqueQueue)

pr := unittest.AssertExistsAndLoadBean(t, &models.PullRequest{ID: 2}).(*models.PullRequest)
AddToTaskQueue(pr)
Expand All @@ -51,11 +51,11 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
return pr.Status == models.PullRequestStatusChecking
}, 1*time.Second, 100*time.Millisecond)

has, err := prQueue.Has(strconv.FormatInt(pr.ID, 10))
has, err := prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10))
assert.True(t, has)
assert.NoError(t, err)

prQueue.Run(func(shutdown func()) {
prPatchCheckerQueue.Run(func(shutdown func()) {
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
queueTerminate = append(queueTerminate, terminate)
Expand All @@ -68,7 +68,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
assert.Fail(t, "Timeout: nothing was added to pullRequestQueue")
}

has, err = prQueue.Has(strconv.FormatInt(pr.ID, 10))
has, err = prPatchCheckerQueue.Has(strconv.FormatInt(pr.ID, 10))
assert.False(t, has)
assert.NoError(t, err)

Expand All @@ -82,5 +82,5 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
callback()
}

prQueue = nil
prPatchCheckerQueue = nil
}
7 changes: 6 additions & 1 deletion services/pull/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

// Merge merges pull request to base repository.
// Caller should check PR is ready to be merged (review and status checks)
// FIXME: add repoWorkingPull make sure two merges does not happen at same time.
func Merge(ctx context.Context, pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, mergeStyle repo_model.MergeStyle, expectedHeadCommitID, message string) (err error) {
if err = pr.LoadHeadRepo(); err != nil {
log.Error("LoadHeadRepo: %v", err)
Expand All @@ -43,6 +42,9 @@ func Merge(ctx context.Context, pr *models.PullRequest, doer *user_model.User, b
return fmt.Errorf("LoadBaseRepo: %v", err)
}

pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))

prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests)
if err != nil {
log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
Expand Down Expand Up @@ -722,6 +724,9 @@ func CheckPRReadyToMerge(ctx context.Context, pr *models.PullRequest, skipProtec

// MergedManually mark pr as merged manually
func MergedManually(pr *models.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))

prUnit, err := pr.BaseRepo.GetUnit(unit.TypePullRequests)
if err != nil {
return
Expand Down
7 changes: 7 additions & 0 deletions services/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ import (
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
issue_service "code.gitea.io/gitea/services/issue"
)

// TODO: use clustered lock (unique queue? or *abuse* cache)
6543 marked this conversation as resolved.
Show resolved Hide resolved
var pullWorkingPool = sync.NewExclusivePool()

// NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *models.Issue, labelIDs []int64, uuids []string, pr *models.PullRequest, assigneeIDs []int64) error {
if err := TestPatch(pr); err != nil {
Expand Down Expand Up @@ -124,6 +128,9 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, pull *mode

// ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *models.PullRequest, doer *user_model.User, targetBranch string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))

// Current target branch is already the same
if pr.BaseBranch == targetBranch {
return nil
Expand Down
3 changes: 3 additions & 0 deletions services/pull/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func Update(ctx context.Context, pull *models.PullRequest, doer *user_model.User
style repo_model.MergeStyle
)

pullWorkingPool.CheckIn(fmt.Sprint(pull.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pull.ID))

if rebase {
pr = pull
style = repo_model.MergeStyleRebaseUpdate
Expand Down
1 change: 1 addition & 0 deletions services/repository/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

// repoWorkingPool represents a working pool to order the parallel changes to the same repository
// TODO: use clustered lock (unique queue? or *abuse* cache)
var repoWorkingPool = sync.NewExclusivePool()

// TransferOwnership transfers all corresponding setting from old user to new one.
Expand Down
3 changes: 2 additions & 1 deletion services/wiki/wiki.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (

var (
reservedWikiNames = []string{"_pages", "_new", "_edit", "raw"}
wikiWorkingPool = sync.NewExclusivePool()
// TODO: use clustered lock (unique queue? or *abuse* cache)
wikiWorkingPool = sync.NewExclusivePool()
)

func nameAllowed(name string) error {
Expand Down