Merge pull request #937 from ywc689/fix-hc-agent-racing
Fix backend update racing problem with dpvs-agent and healthcheck.
ywc689 authored Apr 10, 2024
2 parents ef96bbf + fad525d commit 3ec1d59
Showing 34 changed files with 933 additions and 376 deletions.
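The theme of the change set: dpvs-agent REST handlers and the healthcheck daemon could update the same service's real-server list concurrently, so one writer could clobber the other's state (for example, re-enabling a backend that healthcheck had just inhibited). The fix gives every cached service its own lockable snapshot entry and serializes edits through it. Below is a toy Go program (all names hypothetical, not from this repo) showing the kind of unsynchronized write-write race being closed; `go run -race` flags it:

```go
package main

import (
	"fmt"
	"sync"
)

// backend stands in for one real server's cached state in dpvs-agent.
type backend struct {
	weight    uint32
	inhibited bool
}

func main() {
	rs := &backend{weight: 100}
	var wg sync.WaitGroup
	wg.Add(2)

	// healthcheck daemon: a probe failed, inhibit the backend.
	go func() { defer wg.Done(); rs.inhibited = true }()

	// dpvs-agent: an operator re-posts the full RS list, resetting state.
	go func() { defer wg.Done(); *rs = backend{weight: 80} }()

	wg.Wait()
	// The final state depends on goroutine scheduling: the backend may end
	// up serving traffic even though healthcheck marked it unhealthy.
	fmt.Printf("%+v\n", *rs)
}
```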
40 changes: 5 additions & 35 deletions src/VERSION
@@ -1,44 +1,14 @@
 #!/bin/sh
 # program: dpvs
-# Dec 19, 2023 #
+# Mar 12, 2024 #
 ##
-# Features
-# - New tool: **dpvs-agent**, a management daemon tool for dpvs based on OpenAPI.
-# - New tool: **healthcheck**, a service health check daemon tool cooperating with dpvs-agent.
-# - Dpvs: Develop **passive health check** methods for tcp and bidirectional udp backends.
-# - Dpvs: Add support for **Proxy Protocol** with both v1 and v2 versions.
-# - Dpvs: Add support for extended statistics of ethernet devices.
-# - Dpvs: Add configuration file and dpip support for the allmulticast setting switch.
-# - Build: Transfer all build configurations to a top-level file `config.mk`.
-# - Containerization: Draft a Dockerfile and a tutorial document to build and run dpvs in a container.
-#
 # Bugfixes
-# - Dpvs: Protect toa from source address spoofing attacks and increase the success ratio of source address delivery via toa.
-# - Dpvs: Adjust tcp window scale in the outbound direction for synproxy to improve throughput in bulk upload cases.
-# - Dpvs: Fix timer inaccuracy problem when timing over 524s.
-# - Dpvs: Fix the crash problem caused by ether address list buffer overflow.
-# - Dpvs: Fix the crash problem caused by dividing by zero when bonding slaves attempt to send packets out.
-# - Dpvs: Fix the crash problem caused by inconsistent data structures of `dp_vs_dest_compat` between dpvs and keepalived.
-# - Dpvs: Correct ipo option length for judgement of branching to standalone uoa.
-# - Dpvs: Inhibit setting multicast ether address from slave lcores.
-# - Dpvs: Fix service flag conflicts of synproxy and expire-quiescent.
-# - Dpvs: Fix the chaotic use of flag, flags and fwdmode in dest and service structures.
-# - Dpvs: Fix the problem that the service flush function was not usable.
-# - Dpvs: Fix invalid port problem when getting verbose information of netif devices.
-# - Dpvs: Use atomic operation to generate packet id for ipv4 header.
-# - Dpvs: Remove fragile implementations of strategy routing for snat.
-# - Dpvs: Remove the stale config item "ipc_msg/unix_domain".
-# - Keepalived: Do not delete and re-add vs/rs to eliminate service disturbances at reload.
-# - Keepalived: Fix a crash problem caused by missing definition of allowlist/denylist config items.
-# - Ipvsadm: Add `conn-timeout` configuration option for service.
-# - Ipvsadm: Fix the ambiguous use of the '-Y' configuration option.
-# - Ipvsadm: Fix the problem that the icmpv6 configuration option `-1` was lost.
-# - Ipvsadm: Update help text, including supported schedulers, laddr and allow/deny ip list.
-# - Dpip: Fix line break problem in the help message.
-# - Uoa: Enable ipv6 with a macro for the uoa example server.
+# - tools: Fix concurrency problem between dpvs-agent and healthcheck in editing realservers.
+# - tools/dpvs-agent: Add the snapshot cache.
+# - tools/healthcheck: Fix occasionally arising bad icmp checksum problem for udp and udpping checkers.
 #

 export VERSION=1.9
-export RELEASE=6
+export RELEASE=7

 echo $VERSION-$RELEASE
4 changes: 4 additions & 0 deletions src/ipvs/ip_vs_dest.c
@@ -74,6 +74,10 @@ static void __dp_vs_dest_update(struct dp_vs_service *svc,
     int conn_flags;

     rte_atomic16_set(&dest->weight, udest->weight);
+    if (udest->flags & DPVS_DEST_F_INHIBITED)
+        dp_vs_dest_set_inhibited(dest);
+    else
+        dp_vs_dest_clear_inhibited(dest);
     conn_flags = udest->conn_flags | DPVS_CONN_F_INACTIVE;
     dest->fwdmode = udest->fwdmode;
     rte_atomic16_set(&dest->conn_flags, conn_flags);
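With this hunk, every dest update applies the caller-supplied `DPVS_DEST_F_INHIBITED` bit instead of silently preserving the old value, so userspace owns that state end to end. A small Go sketch of how a caller might pack the bit into an update's flags word; the constant's value here is illustrative only, the real definition lives in the dpvs headers:

```go
// DPVS_DEST_F_INHIBITED mirrors the dpvs flag in spirit; the value below
// is a placeholder, not the real dpvs definition.
const DPVS_DEST_F_INHIBITED uint32 = 1 << 0

// destFlags mirrors the if/else added to __dp_vs_dest_update: the bit is
// set or cleared exactly as the caller requests, never left as-is.
func destFlags(base uint32, inhibited bool) uint32 {
	if inhibited {
		return base | DPVS_DEST_F_INHIBITED
	}
	return base &^ DPVS_DEST_F_INHIBITED
}
```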
11 changes: 6 additions & 5 deletions tools/dpvs-agent/cmd/dpvs-agent-server/local_init.go
@@ -32,15 +32,16 @@ func (agent *DpvsAgentServer) LocalLoad(cp *pool.ConnPool, parentLogger hclog.Lo
 		logger = parentLogger.Named("LoadConfigFile")
 	}

-	snapshot := settings.ShareSnapshot()
-	if err := snapshot.LoadFrom(settings.LocalConfigFile(), logger); err != nil {
+	nodeSnap := settings.ShareSnapshot()
+	if err := nodeSnap.LoadFrom(settings.LocalConfigFile(), logger); err != nil {
 		return err
 	}

-	announcePort := snapshot.NodeSpec.AnnouncePort
-	laddrs := snapshot.NodeSpec.Laddrs
+	announcePort := nodeSnap.NodeSpec.AnnouncePort
+	laddrs := nodeSnap.NodeSpec.Laddrs

-	for _, service := range snapshot.Services {
+	for _, snap := range nodeSnap.Snapshot {
+		service := snap.Service
 		// 1> ipvsadm -A vip:port -s wrr
 		vs := types.NewVirtualServerSpec()
 		vs.SetAddr(service.Addr)
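The switch from a flat `snapshot.Services` list to `nodeSnap.Snapshot` entries suggests the cache now keeps one lockable record per service. A rough sketch of that shape as implied by the handlers below; the types and fields are assumptions, not copied from the repo:

```go
import "sync"

// VirtualServer stands in for models.VirtualServerSpecExpand.
type VirtualServer struct {
	Addr string
	Port uint16
}

// ServiceSnapshot pairs one cached service with its own mutex so that
// dpvs-agent handlers and healthcheck updates serialize per service
// rather than contending on a single global lock.
type ServiceSnapshot struct {
	sync.Mutex
	Service *VirtualServer
}

// NodeSnapshot is the process-wide cache handed out by ShareSnapshot().
type NodeSnapshot struct {
	Snapshot map[string]*ServiceSnapshot // keyed by "vip:port-proto" ID
}

// SnapshotGet returns the lockable entry for a service, or nil if the
// service is not cached yet.
func (n *NodeSnapshot) SnapshotGet(id string) *ServiceSnapshot {
	return n.Snapshot[id]
}
```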
11 changes: 9 additions & 2 deletions tools/dpvs-agent/cmd/ipvs/delete_vs_vip_port.go
@@ -45,14 +45,21 @@ func (h *delVsItem) Handle(params apiVs.DeleteVsVipPortParams) middleware.Respon
 		return apiVs.NewDeleteVsVipPortFailure()
 	}

+	shareSnapshot := settings.ShareSnapshot()
+	snapshot := shareSnapshot.SnapshotGet(params.VipPort)
+	if snapshot != nil {
+		snapshot.Lock()
+		defer snapshot.Unlock()
+	}
+
 	result := vs.Del(h.connPool, h.logger)
 	switch result {
 	case types.EDPVS_OK:
-		settings.ShareSnapshot().ServiceDel(params.VipPort)
+		shareSnapshot.ServiceDel(params.VipPort)
 		h.logger.Info("Del virtual server success.", "VipPort", params.VipPort)
 		return apiVs.NewDeleteVsVipPortOK()
 	case types.EDPVS_NOTEXIST:
-		settings.ShareSnapshot().ServiceDel(params.VipPort)
+		shareSnapshot.ServiceDel(params.VipPort)
 		h.logger.Warn("Del a not exist virtual server done.", "VipPort", params.VipPort, "result", result.String())
 		return apiVs.NewDeleteVsVipPortNotFound()
 	default:
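The pattern above (fetch the per-service entry, then hold its lock across both the dpvs call and the cache edit) is what closes the delete/healthcheck race. A condensed sketch of the same critical section, reusing the hypothetical `NodeSnapshot` from the previous note:

```go
// deleteService serializes the dpvs delete and the cache removal against
// any concurrent healthcheck edit of the same vip:port.
func deleteService(cache *NodeSnapshot, vipPort string, dpvsDel func() error) error {
	if snap := cache.SnapshotGet(vipPort); snap != nil {
		snap.Lock()
		defer snap.Unlock()
	}
	if err := dpvsDel(); err != nil {
		return err // cache untouched, so it still reflects dpvs state
	}
	delete(cache.Snapshot, vipPort)
	return nil
}
```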
57 changes: 20 additions & 37 deletions tools/dpvs-agent/cmd/ipvs/get_vs.go
@@ -40,18 +40,28 @@ func NewGetVs(cp *pool.ConnPool, parentLogger hclog.Logger) *getVs {
 }

 func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder {
+	shareSnapshot := settings.ShareSnapshot()
+	if params.Healthcheck != nil && !*params.Healthcheck {
+		return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger))
+	}
+
+	// if params.Snapshot != nil && *params.Snapshot {
+	// 	shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger)
+	// }
+
 	front := types.NewVirtualServerFront()
 	vss, err := front.Get(h.connPool, h.logger)
 	if err != nil {
 		h.logger.Error("Get virtual server list failed.", "Error", err.Error())
-		// FIXME: Invalid
-		return apiVs.NewGetVsOK()
+		return apiVs.NewGetVsNoContent()
 	}

-	shareSnapshot := settings.ShareSnapshot()
+	vsModels := models.VirtualServerList{
+		Items: make([]*models.VirtualServerSpecExpand, len(vss)),
+	}

 	h.logger.Info("Get all virtual server done.", "vss", vss)
-	for _, vs := range vss {
+	for i, vs := range vss {
 		front := types.NewRealServerFront()

 		err := front.ParseVipPortProto(vs.ID())
@@ -69,45 +79,18 @@ func (h *getVs) Handle(params apiVs.GetVsParams) middleware.Responder {

 		h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss)

-		vsModel := vs.GetModel()
-		vsStats := (*types.ServerStats)(vsModel.Stats)
-		vsModel.RSs = new(models.RealServerExpandList)
-		vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
+		vsModels.Items[i] = vs.GetModel()
+		vsStats := (*types.ServerStats)(vsModels.Items[i].Stats)
+		vsModels.Items[i].RSs = new(models.RealServerExpandList)
+		vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss))

 		for j, rs := range rss {
 			rsModel := rs.GetModel()
 			rsStats := (*types.ServerStats)(rsModel.Stats)
-			vsModel.RSs.Items[j] = rsModel
+			vsModels.Items[i].RSs.Items[j] = rsModel
 			vsStats.Increase(rsStats)
 		}
-
-		if shareSnapshot.NodeSpec.Laddrs == nil {
-			laddr := types.NewLocalAddrFront()
-			if err := laddr.ParseVipPortProto(vs.ID()); err != nil {
-				// FIXME: Invalid
-				return apiVs.NewGetVsOK()
-			}
-
-			laddrs, err := laddr.Get(h.connPool, h.logger)
-			if err != nil {
-				// FIXME: Invalid
-				return apiVs.NewGetVsOK()
-			}
-
-			shareSnapshot.NodeSpec.Laddrs = new(models.LocalAddressExpandList)
-			laddrModels := shareSnapshot.NodeSpec.Laddrs
-			laddrModels.Items = make([]*models.LocalAddressSpecExpand, len(laddrs))
-			for k, lip := range laddrs {
-				laddrModels.Items[k] = lip.GetModel()
-			}
-		}
-
-		shareSnapshot.ServiceUpsert(vsModel)
 	}

-	if params.Snapshot != nil && *params.Snapshot {
-		shareSnapshot.DumpTo(settings.LocalConfigFile(), h.logger)
-	}
-
-	return apiVs.NewGetVsOK().WithPayload(shareSnapshot.GetModels(h.logger))
+	return apiVs.NewGetVsOK().WithPayload(&vsModels)
 }
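Net effect of the rewrite: a plain `GET /vs` queries dpvs and assembles the reply in a local `vsModels` value, while `healthcheck=false` requests are answered straight from the snapshot cache; the read path no longer upserts into the shared snapshot as a side effect. A condensed sketch of that split, again using the hypothetical types from the earlier note:

```go
// listServices answers from the snapshot cache when the request carries
// healthcheck=false, and queries dpvs directly otherwise. Neither branch
// mutates the shared cache anymore.
func listServices(healthcheck *bool, cache *NodeSnapshot,
	queryDPVS func() ([]*VirtualServer, error)) ([]*VirtualServer, error) {
	if healthcheck != nil && !*healthcheck {
		out := make([]*VirtualServer, 0, len(cache.Snapshot))
		for _, snap := range cache.Snapshot {
			out = append(out, snap.Service)
		}
		return out, nil
	}
	return queryDPVS()
}
```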
56 changes: 46 additions & 10 deletions tools/dpvs-agent/cmd/ipvs/get_vs_vip_port.go
@@ -15,6 +15,8 @@
 package ipvs

 import (
+	"strings"
+
 	"github.com/dpvs-agent/models"
 	"github.com/dpvs-agent/pkg/ipc/pool"
 	"github.com/dpvs-agent/pkg/ipc/types"
@@ -40,10 +42,33 @@ func NewGetVsVipPort(cp *pool.ConnPool, parentLogger hclog.Logger) *getVsVipPort
 }

 func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Responder {
+	shareSnapshot := settings.ShareSnapshot()
+	if params.Healthcheck != nil && !*params.Healthcheck {
+		vsModel := shareSnapshot.ServiceGet(params.VipPort)
+		if vsModel != nil {
+			vsModels := new(models.VirtualServerList)
+			vsModels.Items = make([]*models.VirtualServerSpecExpand, 1)
+			vsModels.Items[0] = vsModel
+			return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
+		}
+	}
+
+	vaild := true
+	var vss []*types.VirtualServerSpec
 	spec := types.NewVirtualServerSpec()
 	err := spec.ParseVipPortProto(params.VipPort)
 	if err != nil {
+		vaild = false
+		if params.Healthcheck != nil && !*params.Healthcheck {
+			// invalid VipPort string
+			// respond full cache info
+			vsModels := shareSnapshot.GetModels(h.logger)
+			if len(vsModels.Items) != 0 {
+				return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
+			}
+			// read from dpvs memory
+		}
+
 		h.logger.Warn("Convert to virtual server failed. Get All virtual server.", "VipPort", params.VipPort, "Error", err.Error())
 		front := types.NewVirtualServerFront()
 		vss, err = front.Get(h.connPool, h.logger)
@@ -56,10 +81,9 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon
 		return apiVs.NewGetVsVipPortNotFound()
 	}

-	shareSnapshot := settings.ShareSnapshot()
-
-	vsModels := new(models.VirtualServerList)
-	vsModels.Items = make([]*models.VirtualServerSpecExpand, len(vss))
+	vsModels := &models.VirtualServerList{
+		Items: make([]*models.VirtualServerSpecExpand, len(vss)),
+	}

 	for i, vs := range vss {
 		front := types.NewRealServerFront()
@@ -80,20 +104,32 @@ func (h *getVsVipPort) Handle(params apiVs.GetVsVipPortParams) middleware.Respon

 		h.logger.Info("Get real server list of virtual server success.", "ID", vs.ID(), "rss", rss)

 		vsModel := vs.GetModel()
+		shareSnapshot.ServiceUpsert(vsModel)
+		// vsModel.Version = shareSnapshot.ServiceVersion(vs.ID())
 		vsModels.Items[i] = vsModel
-		vsStats := (*types.ServerStats)(vsModels.Items[i].Stats)
-		vsModels.Items[i].RSs = new(models.RealServerExpandList)
-		vsModels.Items[i].RSs.Items = make([]*models.RealServerSpecExpand, len(rss))
+		vsStats := (*types.ServerStats)(vsModel.Stats)
+		vsModel.RSs = new(models.RealServerExpandList)
+		vsModel.RSs.Items = make([]*models.RealServerSpecExpand, len(rss))

 		for j, rs := range rss {
 			rsModel := rs.GetModel()
 			rsStats := (*types.ServerStats)(rsModel.Stats)
-			vsModels.Items[i].RSs.Items[j] = rsModel
+			vsModel.RSs.Items[j] = rsModel
 			vsStats.Increase(rsStats)
 		}
 	}

+	if vaild {
+		targetModels := &models.VirtualServerList{
+			Items: make([]*models.VirtualServerSpecExpand, 1),
+		}
+
+		for _, vsModel := range vsModels.Items {
+			typesVsModel := (*types.VirtualServerSpecExpandModel)(vsModel)
+			if strings.EqualFold(spec.ID(), typesVsModel.ID()) {
+				targetModels.Items[0] = vsModel
+				return apiVs.NewGetVsVipPortOK().WithPayload(targetModels)
+			}
+		}
+	}
+
 	return apiVs.NewGetVsVipPortOK().WithPayload(vsModels)
 }
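The closing `if vaild` block narrows the full dpvs listing back down to the one requested service whenever the vip:port string parsed cleanly. A stand-alone sketch of that selection step, with a plain string ID in place of the repo's spec types:

```go
import "strings"

// pickService returns a single-item slice for the service whose ID matches
// the request (case-insensitively), or the full listing as a fallback.
func pickService(requestedID string, all []*VirtualServer,
	idOf func(*VirtualServer) string) []*VirtualServer {
	for _, vs := range all {
		if strings.EqualFold(requestedID, idOf(vs)) {
			return []*VirtualServer{vs}
		}
	}
	return all
}
```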
55 changes: 52 additions & 3 deletions tools/dpvs-agent/cmd/ipvs/post_vs_vip_port_rs.go
@@ -15,7 +15,9 @@
 package ipvs

 import (
-	// "github.com/dpvs-agent/models"
+	"strings"
+
+	"github.com/dpvs-agent/models"
 	"github.com/dpvs-agent/pkg/ipc/pool"
 	"github.com/dpvs-agent/pkg/ipc/types"
 	"github.com/dpvs-agent/pkg/settings"
@@ -46,6 +48,10 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond
 		return apiVs.NewPostVsVipPortRsInvalidFrontend()
 	}

+	if params.Rss == nil || params.Rss.Items == nil {
+		return apiVs.NewPostVsVipPortRsInvalidFrontend()
+	}
+
 	rss := make([]*types.RealServerSpec, len(params.Rss.Items))
 	for i, rs := range params.Rss.Items {
 		var fwdmode types.DpvsFwdMode
@@ -56,15 +62,58 @@ func (h *postVsRs) Handle(params apiVs.PostVsVipPortRsParams) middleware.Respond
 		rss[i].SetWeight(uint32(rs.Weight))
 		rss[i].SetProto(front.GetProto())
 		rss[i].SetAddr(rs.IP)
-		rss[i].SetInhibited(rs.Inhibited)
 		rss[i].SetOverloaded(rs.Overloaded)
 		rss[i].SetFwdMode(fwdmode)
+		// NOTE: inhibited is set by the healthcheck module with API /vs/${ID}/rs/health only;
+		// we clear it by default
+		inhibited := false
+		if rs.Inhibited != nil {
+			inhibited = *rs.Inhibited
+		}
+		rss[i].SetInhibited(&inhibited)
 	}

+	shareSnapshot := settings.ShareSnapshot()
+	if shareSnapshot.ServiceLock(params.VipPort) {
+		defer shareSnapshot.ServiceUnlock(params.VipPort)
+	}
+
 	result := front.Update(rss, h.connPool, h.logger)
 	switch result {
 	case types.EDPVS_EXIST, types.EDPVS_OK:
-		settings.ShareSnapshot().ServiceVersionUpdate(params.VipPort, h.logger)
+		// Update Snapshot
+		vsModel := shareSnapshot.ServiceGet(params.VipPort)
+		if vsModel == nil {
+			spec := types.NewVirtualServerSpec()
+			err := spec.ParseVipPortProto(params.VipPort)
+			if err != nil {
+				h.logger.Warn("Convert to virtual server failed.", "VipPort", params.VipPort, "Error", err.Error())
+				// FIXME return
+			}
+			vss, err := spec.Get(h.connPool, h.logger)
+			if err != nil {
+				h.logger.Error("Get virtual server failed.", "svc VipPort", params.VipPort, "Error", err.Error())
+				// FIXME return
+			}
+
+			for _, vs := range vss {
+				if strings.EqualFold(vs.ID(), spec.ID()) {
+					shareSnapshot.ServiceAdd(vs)
+					break
+				}
+			}
+		} else {
+			vsModel.RSs = &models.RealServerExpandList{
+				Items: make([]*models.RealServerSpecExpand, len(rss)),
+			}
+
+			for i, rs := range rss {
+				vsModel.RSs.Items[i] = rs.GetModel()
+			}
+		}
+
+		shareSnapshot.ServiceVersionUpdate(params.VipPort, h.logger)
+
 		h.logger.Info("Set real server to virtual server success.", "VipPort", params.VipPort, "rss", rss, "result", result.String())
 		return apiVs.NewPostVsVipPortRsOK()
 	default:
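Putting the POST handler together: take the per-service lock, apply the update to dpvs, rebuild the cached entry, then bump its version so healthcheck readers observe a consistent view. A compressed sketch of that sequence; the `ServiceLock`-style locking and the version bump follow the diff, everything else (types, helper signatures) is assumed:

```go
// updateRealServers is the serialized write path sketched from the handler
// above: lock the service entry, update dpvs, refresh the cache, then bump
// the service version.
func updateRealServers(cache *NodeSnapshot, vipPort string,
	dpvsUpdate func() error, versionBump func(string)) error {
	snap := cache.SnapshotGet(vipPort)
	if snap != nil {
		snap.Lock()
		defer snap.Unlock()
	}
	if err := dpvsUpdate(); err != nil {
		return err
	}
	if snap == nil {
		// Service was not cached yet: add a fresh entry (a stand-in for
		// the handler's ServiceAdd fallback path).
		cache.Snapshot[vipPort] = &ServiceSnapshot{Service: &VirtualServer{}}
	}
	versionBump(vipPort) // readers can detect they saw a newer state
	return nil
}
```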