Skip to content

Commit

Permalink
NGINX Plus: dynamic upstream reloads support (#1469)
Browse files Browse the repository at this point in the history
NGINX Plus: dynamic upstream reloads support

Problem: One of the benefits of using NGINX Plus is the ability to update upstream servers dynamically through its API. Currently, the only way we update upstream servers is by reloading nginx, which can be disruptive.

Solution: When running NGINX Plus, we now use the N+ API to update upstream servers. This reduces the number of reloads we have to perform, specifically when only endpoints change (for example, when a workload is scaled) and nothing else does.
  • Loading branch information
sjberman authored Jan 24, 2024
1 parent 73f7918 commit d65385d
Show file tree
Hide file tree
Showing 28 changed files with 1,076 additions and 262 deletions.
118 changes: 109 additions & 9 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
ngxclient "github.com/nginxinc/nginx-plus-go-client/client"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -98,21 +99,33 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
h.handleEvent(ctx, logger, event)
}

changed, graph := h.cfg.processor.Process()
if !changed {
changeType, graph := h.cfg.processor.Process()

var err error
switch changeType {
case state.NoChange:
logger.Info("Handling events didn't result into NGINX configuration changes")
if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil {
h.cfg.healthChecker.setAsReady()
}
return
case state.EndpointsOnlyChange:
h.cfg.version++
err = h.updateUpstreamServers(
ctx,
logger,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
)
case state.ClusterStateChange:
h.cfg.version++
err = h.updateNginxConf(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
)
}

var nginxReloadRes nginxReloadResult
h.cfg.version++
if err := h.updateNginx(
ctx,
dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version),
); err != nil {
if err != nil {
logger.Error(err, "Failed to update NGINX configuration")
nginxReloadRes.error = err
if !h.cfg.healthChecker.ready {
Expand Down Expand Up @@ -174,9 +187,9 @@ func (h *eventHandlerImpl) handleEvent(ctx context.Context, logger logr.Logger,
}
}

func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Configuration) error {
// updateNginxConf updates nginx conf files and reloads nginx
func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error {
files := h.cfg.generator.Generate(conf)

if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
}
Expand All @@ -188,6 +201,93 @@ func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Confi
return nil
}

// updateUpstreamServers handles the case where only endpoints have changed. It always regenerates and
// replaces the nginx conf files first, and then brings the running nginx in line with them:
//   - when running NGINX Plus, upstreams that the N+ API already knows about and whose servers differ
//     from the desired ones are updated through the API, and no reload is performed;
//   - when not running NGINX Plus, or when any API call fails, nginx is reloaded instead.
//
// API failures are only logged; the returned error is non-nil only if replacing the files or
// reloading nginx fails.
func (h *eventHandlerImpl) updateUpstreamServers(
	ctx context.Context,
	logger logr.Logger,
	conf dataplane.Configuration,
) error {
	usingPlus := h.cfg.nginxRuntimeMgr.IsPlus()

	if err := h.cfg.nginxFileMgr.ReplaceFiles(h.cfg.generator.Generate(conf)); err != nil {
		return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
	}

	// reloadNginx is the fallback used whenever the API path is unavailable or fails.
	reloadNginx := func() error {
		reloadErr := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version)
		if reloadErr == nil {
			return nil
		}

		return fmt.Errorf("failed to reload NGINX: %w", reloadErr)
	}

	if !usingPlus {
		return reloadNginx()
	}

	current, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
	if err != nil {
		logger.Error(err, "failed to get upstreams from API, reloading configuration instead")
		return reloadNginx()
	}

	type serverUpdate struct {
		upstreamName string
		servers      []ngxclient.UpstreamServer
	}
	var pending []serverUpdate

	// Collect only the upstreams that the API reports and whose server list actually differs.
	for _, desired := range conf.Upstreams {
		servers := ngxConfig.ConvertEndpoints(desired.Endpoints)

		existing, found := current[desired.Name]
		if !found || serversEqual(servers, existing.Peers) {
			continue
		}

		pending = append(pending, serverUpdate{upstreamName: desired.Name, servers: servers})
	}

	// Every pending update is attempted even after a failure; a single failure triggers a reload.
	apiFailed := false
	for _, update := range pending {
		updateErr := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(update.upstreamName, update.servers)
		if updateErr == nil {
			continue
		}

		logger.Error(
			updateErr, "couldn't update upstream via the API, reloading configuration instead",
			"upstreamName", update.upstreamName,
		)
		apiFailed = true
	}

	if apiFailed {
		return reloadNginx()
	}

	return nil
}

// serversEqual reports whether the desired upstream servers (newServers) and the peers currently
// reported by the NGINX Plus API (oldServers) contain the same server addresses, ignoring order.
//
// The lists are compared as multisets keyed on the Server field: each address must occur the same
// number of times on both sides. A plain set-membership check is not enough, because for lists of
// equal length it would treat new [a, b] and old [a, a] as equal.
func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool {
	if len(newServers) != len(oldServers) {
		return false
	}

	// remaining[addr] counts the new servers with that address not yet matched by an old peer.
	remaining := make(map[string]int, len(newServers))
	for _, s := range newServers {
		remaining[s.Server]++
	}

	for _, p := range oldServers {
		if remaining[p.Server] == 0 {
			return false
		}
		remaining[p.Server]--
	}

	// The lengths match and every old peer consumed exactly one new server, so nothing is left over.
	return true
}

// updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status
// based on the outcome
func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(
Expand Down
172 changes: 167 additions & 5 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"

ngxclient "github.com/nginxinc/nginx-plus-go-client/client"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file/filefakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/runtime/runtimefakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state"
staticConds "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/conditions"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/graph"
Expand Down Expand Up @@ -64,6 +67,7 @@ var _ = Describe("eventHandler", func() {

BeforeEach(func() {
fakeProcessor = &statefakes.FakeChangeProcessor{}
fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{})
fakeGenerator = &configfakes.FakeGenerator{}
fakeNginxFileMgr = &filefakes.FakeManager{}
fakeNginxRuntimeMgr = &runtimefakes.FakeManager{}
Expand Down Expand Up @@ -112,7 +116,7 @@ var _ = Describe("eventHandler", func() {
}

BeforeEach(func() {
fakeProcessor.ProcessReturns(true /* changed */, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange /* changed */, &graph.Graph{})

fakeGenerator.GenerateReturns(fakeCfgFiles)
})
Expand Down Expand Up @@ -280,11 +284,129 @@ var _ = Describe("eventHandler", func() {
})
})

// Exercises HandleEventBatch when the change processor reports that only endpoints changed.
When("receiving an EndpointsOnlyChange update", func() {
	// The batch contains a single upsert of an EndpointSlice.
	e := &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx-gateway",
			Namespace: "nginx-gateway",
		},
	}}
	batch := []interface{}{e}

	BeforeEach(func() {
		// Stub the processor to report EndpointsOnlyChange, and stub the runtime manager to
		// return one existing upstream ("one") with a single peer.
		fakeProcessor.ProcessReturns(state.EndpointsOnlyChange, &graph.Graph{})
		upstreams := ngxclient.Upstreams{
			"one": ngxclient.Upstream{
				Peers: []ngxclient.Peer{
					{Server: "server1"},
				},
			},
		}
		fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
	})

	When("running NGINX Plus", func() {
		It("should call the NGINX Plus API", func() {
			fakeNginxRuntimeMgr.IsPlusReturns(true)

			handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
			// Conf files are generated and replaced once, and the upstreams are fetched once from the API.
			Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
			Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
			Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1))
		})
	})

	When("not running NGINX Plus", func() {
		// NOTE(review): IsPlus is not stubbed in this branch; presumably the fake returns false
		// by default - confirm against the fake's zero-value behavior.
		It("should not call the NGINX Plus API", func() {
			handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
			// Conf files are still generated and replaced, but the API is never queried and
			// nginx is reloaded exactly once.
			Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
			Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
			Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(0))
			Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(1))
		})
	})
})

// Exercises updateUpstreamServers directly, checking which of generate/replace, API update,
// and reload are invoked in each scenario.
When("updating upstream servers", func() {
	// Desired configuration: a single upstream named "one" with no endpoints set.
	conf := dataplane.Configuration{
		Upstreams: []dataplane.Upstream{
			{
				Name: "one",
			},
		},
	}

	// callCounts holds the expected number of calls made to the fakes.
	type callCounts struct {
		generate int
		update int
		reload int
	}

	// assertCallCounts checks the fakes against the expected counts. ReplaceFiles is compared
	// against cc.generate as well, i.e. it is expected to be called as often as Generate.
	assertCallCounts := func(cc callCounts) {
		Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate))
		Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate))
		Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update))
		Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload))
	}

	BeforeEach(func() {
		// The API reports upstream "one" with a single peer. NOTE(review): since the desired
		// upstream has no endpoints, the server lists presumably differ, which is what makes
		// the Plus path issue one update - confirm against ConvertEndpoints.
		upstreams := ngxclient.Upstreams{
			"one": ngxclient.Upstream{
				Peers: []ngxclient.Peer{
					{Server: "server1"},
				},
			},
		}
		fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
	})

	When("running NGINX Plus", func() {
		BeforeEach(func() {
			fakeNginxRuntimeMgr.IsPlusReturns(true)
		})

		It("should update servers using the NGINX Plus API", func() {
			Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

			// The API update succeeds, so no reload is needed.
			assertCallCounts(callCounts{generate: 1, update: 1, reload: 0})
		})

		It("should reload when GET API returns an error", func() {
			fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error"))
			// The API error is not returned to the caller; the function falls back to a reload.
			Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

			assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
		})

		It("should reload when POST API returns an error", func() {
			fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error"))
			// The update is attempted once, fails, and a reload follows.
			Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

			assertCallCounts(callCounts{generate: 1, update: 1, reload: 1})
		})
	})

	When("not running NGINX Plus", func() {
		// NOTE(review): IsPlus is not stubbed in this branch; presumably the fake returns false
		// by default - confirm against the fake's zero-value behavior.
		It("should update servers by reloading", func() {
			Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

			assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
		})

		It("should return an error when reloading fails", func() {
			fakeNginxRuntimeMgr.ReloadReturns(errors.New("error"))
			// A reload failure is the one error in this path that is surfaced to the caller.
			Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed())

			assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
		})
	})
})

It("should set the health checker status properly when there are changes", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}

fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expand All @@ -304,22 +426,22 @@ var _ = Describe("eventHandler", func() {
e := &events.UpsertEvent{Resource: &gatewayv1.HTTPRoute{}}
batch := []interface{}{e}

fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error"))

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// now send an update with no changes; should still return an error
fakeProcessor.ProcessReturns(false, &graph.Graph{})
fakeProcessor.ProcessReturns(state.NoChange, &graph.Graph{})

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed())

// error goes away
fakeProcessor.ProcessReturns(true, &graph.Graph{})
fakeProcessor.ProcessReturns(state.ClusterStateChange, &graph.Graph{})
fakeNginxRuntimeMgr.ReloadReturns(nil)

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
Expand All @@ -339,6 +461,46 @@ var _ = Describe("eventHandler", func() {
})
})

// Table-driven checks for serversEqual, which compares the desired upstream servers against the
// peers reported by the NGINX Plus API using only their Server addresses. Besides the basic
// length/content cases, the table pins that ordering is irrelevant and that two empty lists are equal.
var _ = Describe("serversEqual", func() {
	DescribeTable("determines if server lists are equal",
		func(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer, equal bool) {
			Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
		},
		Entry("different length",
			[]ngxclient.UpstreamServer{
				{Server: "server1"},
			},
			[]ngxclient.Peer{
				{Server: "server1"},
				{Server: "server2"},
			},
			false,
		),
		Entry("differing elements",
			[]ngxclient.UpstreamServer{
				{Server: "server1"},
				{Server: "server2"},
			},
			[]ngxclient.Peer{
				{Server: "server1"},
				{Server: "server3"},
			},
			false,
		),
		Entry("same elements",
			[]ngxclient.UpstreamServer{
				{Server: "server1"},
				{Server: "server2"},
			},
			[]ngxclient.Peer{
				{Server: "server1"},
				{Server: "server2"},
			},
			true,
		),
		Entry("same elements in a different order",
			[]ngxclient.UpstreamServer{
				{Server: "server1"},
				{Server: "server2"},
			},
			[]ngxclient.Peer{
				{Server: "server2"},
				{Server: "server1"},
			},
			true,
		),
		Entry("both empty",
			[]ngxclient.UpstreamServer{},
			[]ngxclient.Peer{},
			true,
		),
	)
})

var _ = Describe("getGatewayAddresses", func() {
It("gets gateway addresses from a Service", func() {
fakeClient := fake.NewFakeClient()
Expand Down
Loading

0 comments on commit d65385d

Please sign in to comment.