Skip to content

Commit

Permalink
feat(eventsources/bitbucketserver): add OneEventPerChange config opti…
Browse files Browse the repository at this point in the history
…on for webhook event handling

Add OneEventPerChange config option to control whether to process each change in a repo:refs_changed webhook event as a separate event. This allows independent processing of each tag or reference update in a single webhook event useful for triggering distinct workflows in Argo Workflows.

Signed-off-by: Ryan Currah <ryan@currah.ca>
  • Loading branch information
ryancurrah committed May 17, 2024
1 parent 4636435 commit c5db188
Show file tree
Hide file tree
Showing 9 changed files with 773 additions and 557 deletions.
17 changes: 16 additions & 1 deletion api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 20 additions & 2 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@
"type": "object"
},
"io.argoproj.eventsource.v1alpha1.BitbucketBasicAuth": {
"description": "BasicAuth holds the information required to authenticate user via basic auth mechanism",
"description": "BitbucketBasicAuth holds the information required to authenticate user via basic auth mechanism",
"properties": {
"password": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
Expand Down Expand Up @@ -1186,6 +1186,10 @@
"description": "Metadata holds the user defined metadata which will passed along the event payload.",
"type": "object"
},
"oneEventPerChange": {
"description": "OneEventPerChange controls whether to process each change in a repo:refs_changed webhook event as a separate event. This setting is useful when multiple tags are pushed simultaneously for the same commit, and each tag needs to independently trigger an action, such as a distinct workflow in Argo Workflows. When enabled, the BitbucketServerEventSource publishes an individual BitbucketServerEventData for each change, ensuring independent processing of each tag or reference update in a single webhook event.",
"type": "boolean"
},
"projectKey": {
"description": "DeprecatedProjectKey is the key of project for which integration needs to set up. Deprecated: use Repositories instead. Will be unsupported in v1.8.",
"type": "string"
Expand Down
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

254 changes: 188 additions & 66 deletions eventsources/sources/bitbucketserver/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ import (
"strings"
"time"

bitbucketv1 "github.com/gfleury/go-bitbucket-v1"
"github.com/mitchellh/mapstructure"
"golang.org/x/exp/slices"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
eventsourcecommon "github.com/argoproj/argo-events/eventsources/common"
"github.com/argoproj/argo-events/eventsources/common/webhook"
"github.com/argoproj/argo-events/eventsources/sources"
"github.com/argoproj/argo-events/pkg/apis/events"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
bitbucketv1 "github.com/gfleury/go-bitbucket-v1"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -98,56 +96,35 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ
return
}

// When SkipBranchRefsChangedOnOpenPR is enabled and webhook event type is repo:refs_changed,
// check if a Pull Request is opened for the commit, if one is opened the event will be skipped.
if bitbucketserverEventSource.SkipBranchRefsChangedOnOpenPR && slices.Contains(bitbucketserverEventSource.Events, "repo:refs_changed") {
refsChanged := refsChangedWebhookEvent{}
err := json.Unmarshal(body, &refsChanged)
if err != nil {
logger.Errorf("reading webhook body", zap.Error(err))
common.SendErrorResponse(writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return
}

if refsChanged.EventKey == "repo:refs_changed" &&
len(refsChanged.Changes) > 0 && // Note refsChanged.Changes never has more or less than one change, not sure why Atlassian made it a list.
strings.EqualFold(refsChanged.Changes[0].Ref.Type, "BRANCH") &&
!strings.EqualFold(refsChanged.Changes[0].Type, "DELETE") {
// Check if commit is associated to an open PR.
hasOpenPR, err := router.refsChangedHasOpenPullRequest(refsChanged.Repository.Project.Key, refsChanged.Repository.Slug, refsChanged.Changes[0].ToHash)
if err != nil {
logger.Errorf("checking if changed branch ref has an open pull request", zap.Error(err))
common.SendErrorResponse(writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return
}

// Do not publish this Branch repo:refs_changed event if a related Pull Request is already opened for the commit.
if hasOpenPR {
logger.Info("skipping publishing event, commit has an open pull request")
common.SendSuccessResponse(writer, "success")
return
}
}
}

event := &events.BitbucketServerEventData{
Headers: request.Header,
Body: (*json.RawMessage)(&body),
Metadata: router.bitbucketserverEventSource.Metadata,
}

eventBody, err := json.Marshal(event)
// bitbucketEvents is a slice of byte slices, each representing an event from the Bitbucket webhook.
// By invoking webhookHandler, it processes the incoming webhook payload to either batch or separate changes
// based on configuration. Each change can thus be processed and published as an individual event.
bitbucketEvents, err := webhookHandler(request, logger, body, bitbucketserverEventSource, router)
if err != nil {
logger.Errorw("failed to parse event", zap.Error(err))
common.SendErrorResponse(writer, "invalid event")
logger.Errorw("failed to handle event", zap.Error(err))
common.SendErrorResponse(writer, err.Error())
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return
}

logger.Info("dispatching event on route's data channel")
route.DataCh <- eventBody
for _, bitbucketEvent := range bitbucketEvents {
event := &events.BitbucketServerEventData{
Headers: request.Header,
Body: (*json.RawMessage)(&bitbucketEvent),
Metadata: router.bitbucketserverEventSource.Metadata,
}

eventBody, jsonErr := json.Marshal(event)
if jsonErr != nil {
logger.Errorw("failed to parse event", zap.Error(jsonErr))
common.SendErrorResponse(writer, "invalid event")
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
return
}

logger.Info("dispatching event on route's data channel")
route.DataCh <- eventBody
}

logger.Info("request successfully processed")
common.SendSuccessResponse(writer, "success")
Expand Down Expand Up @@ -587,23 +564,6 @@ func newBitbucketServerClient(ctx context.Context, bitbucketConfig *bitbucketv1.
return bitbucketv1.NewAPIClient(ctx, bitbucketConfig)
}

type refsChangedWebhookEvent struct {
EventKey string `json:"eventKey"`
Repository struct {
Slug string `json:"slug"`
Project struct {
Key string `json:"key"`
} `json:"project"`
} `json:"repository"`
Changes []struct {
Ref struct {
Type string `json:"type"`
} `json:"ref"`
ToHash string `json:"toHash"`
Type string `json:"type"`
} `json:"changes"`
}

// customBitbucketClient returns a Bitbucket HTTP client that implements methods that gfleury/go-bitbucket-v1 does not.
// Specifically getting Pull Requests associated to a commit is not supported by gfleury/go-bitbucket-v1.
type customBitbucketClient struct {
Expand Down Expand Up @@ -709,3 +669,165 @@ func (c *customBitbucketClient) GetCommitPullRequests(project, repository, commi

return pullRequests, nil
}

// webhookHandler processes Bitbucket webhook events specifically looking for "repo:refs_changed" events.
// It first checks if the incoming webhook is configured to be handled (i.e., it's a "repo:refs_changed" event).
// If not, it logs the discrepancy and returns the raw event.
//
// For valid "repo:refs_changed" events, it:
// - Unmarshals the JSON body into a refsChangedWebhookEvent struct.
// - Optionally filters out changes associated with open pull requests if SkipBranchRefsChangedOnOpenPR is enabled.
// - Depending on the OneEventPerChange setting, it either batches all changes into one event or creates individual events for each change.
//
// It returns a slice of byte slices containing the processed or original webhook events, and any error encountered during processing.
func webhookHandler(request *http.Request, logger *zap.SugaredLogger, body []byte, bitbucketserverEventSource *v1alpha1.BitbucketServerEventSource, router *Router) ([][]byte, error) {
if request.Header.Get("X-Event-Key") != "repo:refs_changed" {
// Don't continue if this is not a repo:refs_changed webhook event.
logger.Debug("event is not a repo:refs_changed webhook")
return [][]byte{body}, nil
}

refsChanged := refsChangedWebhookEvent{}
err := json.Unmarshal(body, &refsChanged)
if err != nil {
return nil, fmt.Errorf("decoding repo:refs_changed webhook: %w", err)
}

// When SkipBranchRefsChangedOnOpenPR is enabled, skip publishing the repo:refs_changed event if a Pull Request is already open for the commit.
// This prevents duplicate notifications when a branch is updated and a PR is already open for the latest commit.
if bitbucketserverEventSource.SkipBranchRefsChangedOnOpenPR {
err = filterChangesForOpenPRs(router, &refsChanged)
if err != nil {
return nil, fmt.Errorf("filtering changes for open prs: %w", err)
}
}

if len(refsChanged.Changes) == 0 {
// No changes are present in the refsChanged event. Skip processing event
return [][]byte{}, nil
}

var bitbucketEvents [][]byte

// Handle event batching based on OneEventPerChange configuration.
// If enabled, split the refsChanged event into individual events, one per change.
// Otherwise, send the entire refsChanged event as a single event.
if bitbucketserverEventSource.OneEventPerChange {
for _, change := range refsChanged.Changes {
rc := refsChanged

rc.Changes = []refsChangedWebHookEventChange{change}

rcBytes, jsonErr := json.Marshal(&rc)
if jsonErr != nil {
return nil, fmt.Errorf("encoding repo:refs_changed webhook: %w", jsonErr)
}

bitbucketEvents = append(bitbucketEvents, rcBytes)
}
} else {
bitbucketEvents = append(bitbucketEvents, body)
}

return bitbucketEvents, nil
}

// filterChangesForOpenPRs removes changes from the refsChanged webhook event that are linked to open pull requests.
// It examines each change to determine if it pertains to a "BRANCH" that has not been deleted.
// For each relevant change, it checks whether there is an open pull request using the commit's hash.
// Changes with open PRs are excluded from the final list. The modified list of changes is then reassigned
// back to refsChanged.Changes.
func filterChangesForOpenPRs(router *Router, refsChanged *refsChangedWebhookEvent) error {
var keptChanges []refsChangedWebHookEventChange

for _, change := range refsChanged.Changes {
c := change

if c.Ref.Type != "BRANCH" || c.Type == "DELETE" {
keptChanges = append(keptChanges, c)
continue
}

// Check if the current commit is associated with an open PR.
hasOpenPR, hasOpenPrErr := router.refsChangedHasOpenPullRequest(
refsChanged.Repository.Project.Key,
refsChanged.Repository.Slug,
c.ToHash, // Check for the current change's commit hash
)
if hasOpenPrErr != nil {
return fmt.Errorf("checking if changed branch ref has an open pull request: %w", hasOpenPrErr)
}

if !hasOpenPR {
keptChanges = append(keptChanges, c)
}
}

refsChanged.Changes = keptChanges

return nil
}

type refsChangedWebhookEvent struct {
EventKey string `json:"eventKey"`
Date string `json:"date"`
Actor struct {
Name string `json:"name"`
EmailAddress string `json:"emailAddress"`
ID int `json:"id"`
DisplayName string `json:"displayName"`
Active bool `json:"active"`
Slug string `json:"slug"`
Type string `json:"type"`
Links struct {
Self []struct {
Href string `json:"href"`
} `json:"self"`
} `json:"links"`
} `json:"actor"`
Repository struct {
Slug string `json:"slug"`
ID int `json:"id"`
Name string `json:"name"`
HierarchyID string `json:"hierarchyId"`
ScmID string `json:"scmId"`
State string `json:"state"`
StatusMessage string `json:"statusMessage"`
Forkable bool `json:"forkable"`
Project struct {
Key string `json:"key"`
ID int `json:"id"`
Name string `json:"name"`
Public bool `json:"public"`
Type string `json:"type"`
Links struct {
Self []struct {
Href string `json:"href"`
} `json:"self"`
} `json:"links"`
} `json:"project"`
Public bool `json:"public"`
Links struct {
Clone []struct {
Href string `json:"href"`
Name string `json:"name"`
} `json:"clone"`
Self []struct {
Href string `json:"href"`
} `json:"self"`
} `json:"links"`
} `json:"repository"`
Changes []refsChangedWebHookEventChange `json:"changes"`
}

type refsChangedWebHookEventChange struct {
Ref struct {
ID string `json:"id"`
DisplayID string `json:"displayId"`
Type string `json:"type"`
} `json:"ref"`
RefID string `json:"refId"`
FromHash string `json:"fromHash"`
ToHash string `json:"toHash"`
Type string `json:"type"`
}
Loading

0 comments on commit c5db188

Please sign in to comment.