Skip to content

Commit

Permalink
Fix abnormal networking rule synchronization
Browse files Browse the repository at this point in the history
- Synchronize the other hosts' networking rule with a distributed lock
- Put host network information by each host
- Allocate or update the network rule for a host with distributed lock
- Add new etcd key for distributed-lock
  • Loading branch information
yunkon-kim committed Mar 14, 2022
1 parent 39934d5 commit de6f085
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 64 deletions.
144 changes: 100 additions & 44 deletions poc-cb-net/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-ping/ping"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// CBNet represents a network for the multi-cloud.
Expand Down Expand Up @@ -110,13 +111,19 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl

// Start the cb-network
go CBNet.Startup()
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)

// Watch the networking rule to update dynamically
go watchNetworkingRule(etcdClient)
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)

// Watch the other agents' secrets (RSA public keys)
if CBNet.IsEncryptionEnabled() {
go watchSecret(etcdClient)
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)
}

// Sleep until all the routines are ready
Expand All @@ -126,10 +133,9 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl
if CBNet.IsEncryptionEnabled() {
compareAndSwapSecret(etcdClient)
}
//time.Sleep(2 * time.Second)

// Try Compare-And-Swap (CAS) a host-network-information by cladnetID and hostId
compareAndSwapHostNetworkInformation(etcdClient)
initializeAgent(etcdClient)

case "check-connectivity":
checkConnectivity(commandOption, etcdClient)
Expand Down Expand Up @@ -177,66 +183,116 @@ func watchNetworkingRule(etcdClient *clientv3.Client) {
CBLogger.Debug("End.........")
}

func compareAndSwapHostNetworkInformation(etcdClient *clientv3.Client) {
func initializeAgent(etcdClient *clientv3.Client) {
CBLogger.Debug("Start.........")

cladnetID := CBNet.ID
hostID := CBNet.HostID

// Create a session to acquire a lock
session, _ := concurrency.NewSession(etcdClient)
defer session.Close()

keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + cladnetID)

lock := concurrency.NewMutex(session, keyPrefix)
ctx := context.TODO()

// Acquire lock (or wait to have it)
CBLogger.Debug("Acquire a lock")
if err := lock.Lock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Acquired lock for '%s'", keyPrefix)

// Get the networking rule
keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)
CBLogger.Debugf("Get - %v", keyNetworkingRule)
resp, etcdErr := etcdClient.Get(context.Background(), keyNetworkingRule, clientv3.WithPrefix())
CBLogger.Tracef("[GetResponse] Networking rule: %v", resp)
if etcdErr != nil {
CBLogger.Error(etcdErr)
}

// Set the other hosts' networking rule
for _, kv := range resp.Kvs {
// Key
key := string(kv.Key)
CBLogger.Tracef("Key: %v", key)

// Value
peerBytes := kv.Value
var peer model.Peer
if err := json.Unmarshal(peerBytes, &peer); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("A host's configuration: %v", peer)

if peer.HostID != hostID {
// Update a host's configuration in the networking rule
CBLogger.Debug("Update a host's configuration")
CBNet.UpdatePeer(peerBytes)
}
}

// Get this host's network information
CBLogger.Debug("Get the host network information")
CBNet.UpdateHostNetworkInformation()
temp := CBNet.GetHostNetworkInformation()
currentHostNetworkInformationBytes, _ := json.Marshal(temp)
currentHostNetworkInformation := string(currentHostNetworkInformationBytes)
CBLogger.Trace(currentHostNetworkInformation)

CBLogger.Debug("Compare-And-Swap (CAS) the host network information")
keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)
keyHostNetworkInformation := fmt.Sprint(etcdkey.HostNetworkInformation + "/" + cladnetID + "/" + hostID)
// NOTICE: "!=" doesn't work..... It might be a temporal issue.
txnResp, err := etcdClient.Txn(context.Background()).
If(clientv3.Compare(clientv3.Value(keyHostNetworkInformation), "=", currentHostNetworkInformation)).
Then(clientv3.OpGet(keyNetworkingRule, clientv3.WithPrefix())).
Else(clientv3.OpPut(keyHostNetworkInformation, currentHostNetworkInformation)).
Commit()

if err != nil {
if _, err := etcdClient.Put(context.TODO(), keyHostNetworkInformation, currentHostNetworkInformation); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Transaction Response: %v", txnResp)

// The CAS would be succeeded if the prev host network information and current host network information are same.
// Then the networking rule will be returned. (The above "watch" will not be performed.)
// If not, the host tries to put the current host network information.
if txnResp.Succeeded {
// Set the networking rule to the host
for _, kv := range txnResp.Responses[0].GetResponseRange().Kvs {
key := string(kv.Key)
CBLogger.Tracef("Key: %v", key)

peerBytes := kv.Value
CBLogger.Tracef("A host's configuration: %v", peerBytes)
CBLogger.Debug("Update a host's configuration")

// Update a host's configuration in the networking rule
isThisPeerInitialized := CBNet.UpdatePeer(peerBytes)

if isThisPeerInitialized {
var peer model.Peer
if err := json.Unmarshal(peerBytes, &peer); err != nil {
CBLogger.Error(err)
}
peer.State = model.Running

CBLogger.Debugf("Put \"%v\"", key)
doc, _ := json.Marshal(peer)

if _, err := etcdClient.Put(context.TODO(), key, string(doc)); err != nil {
CBLogger.Error(err)
}
}
}
// CBLogger.Debug("Compare-And-Swap (CAS) the host network information")
// // keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)
// keyHostNetworkInformation := fmt.Sprint(etcdkey.HostNetworkInformation + "/" + cladnetID + "/" + hostID)
// CBLogger.Debugf("CAS - %s", keyHostNetworkInformation)
// // NOTICE: "!=" doesn't work..... It might be a temporal issue.
// txnResp, err := etcdClient.Txn(context.Background()).
// If(clientv3.Compare(clientv3.Value(keyHostNetworkInformation), "=", currentHostNetworkInformation)).
// Then().
// Else(clientv3.OpPut(keyHostNetworkInformation, currentHostNetworkInformation)).
// Commit()

// if err != nil {
// CBLogger.Error(err)
// }
// CBLogger.Tracef("Transaction Response: %v", txnResp)

// if !txnResp.Succeeded {
// // Update a host's configuration in the networking rule
// isThisPeerInitialized := CBNet.UpdatePeer(thisPeerBytes)

// if isThisPeerInitialized {
// var peer model.Peer
// if err := json.Unmarshal(thisPeerBytes, &peer); err != nil {
// CBLogger.Error(err)
// }
// peer.State = model.Running

// keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)

// CBLogger.Debugf("Put \"%v\"", thisKey)
// doc, _ := json.Marshal(peer)

// if _, err := etcdClient.Put(context.TODO(), thisKey, string(doc)); err != nil {
// CBLogger.Error(err)
// }
// }
// }

// Release lock
CBLogger.Debug("Release a lock")
if err := lock.Unlock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Released lock for '%s'", keyPrefix)

CBLogger.Debug("End.........")
}
Expand Down
41 changes: 23 additions & 18 deletions poc-cb-net/cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// CBLogger represents a logger to show execution processes according to the logging level.
Expand Down Expand Up @@ -126,9 +127,9 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
// Watch "/registry/cloud-adaptive-network/host-network-information"
CBLogger.Debugf("Start to watch \"%v\"", etcdkey.HostNetworkInformation)

// // Create a session to acruie a lock
// session, _ := concurrency.NewSession(etcdClient)
// defer session.Close()
// Create a session to acquire a lock
session, _ := concurrency.NewSession(etcdClient)
defer session.Close()

watchChan2 := etcdClient.Watch(context.Background(), etcdkey.HostNetworkInformation, clientv3.WithPrefix())
for watchResponse := range watchChan2 {
Expand Down Expand Up @@ -165,18 +166,21 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
CBLogger.Error(err)
}

// Create a key of host in the specific CLADNet's networking rule
keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID)
// Prepare lock
keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + parsedCLADNetID)

// // Needed?? (not sure yet)
// lock := concurrency.NewMutex(session, keyNetworkingRuleOfHost)
// ctx := context.TODO()
lock := concurrency.NewMutex(session, keyPrefix)
ctx := context.TODO()

// Acquire a lock to protect a networking rule
CBLogger.Debug("Acquire a lock")
if err := lock.Lock(ctx); err != nil {
CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyPrefix, err)
}
CBLogger.Tracef("Acquired lock for '%s'", keyPrefix)

// // Acquire a lock to protect a networking rule
// if err := lock.Lock(ctx); err != nil {
// CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyNetworkingRuleOfHost, err)
// }
// CBLogger.Debugf("Acquired lock for '%v'", keyNetworkingRuleOfHost)
// Create a key of host in the specific CLADNet's networking rule
keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID)

// Get a host's networking rule
CBLogger.Tracef("Key: %v", keyNetworkingRuleOfPeer)
Expand Down Expand Up @@ -225,11 +229,12 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
CBLogger.Error(err)
}

// // Release a lock to protect a networking rule
// if err := lock.Unlock(ctx); err != nil {
// CBLogger.Errorf("Cannot release lock for '%v', error: %v", keyNetworkingRuleOfHost, err)
// }
// CBLogger.Debugf("Released lock for '%v'", keyNetworkingRuleOfHost)
// Release a lock to protect a networking rule
CBLogger.Debug("Release a lock")
if err := lock.Unlock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Released lock for '%s'", keyPrefix)
}

case mvccpb.DELETE: // The watched key has been deleted.
Expand Down
4 changes: 4 additions & 0 deletions poc-cb-net/internal/cb-network/cb-network.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ func (cbnetwork *CBNetwork) UpdatePeer(value []byte) (isThisPeerInitialized bool
}
cbnetwork.isInterfaceConfigured = true
cbnetwork.notificationChannel <- cbnetwork.isInterfaceConfigured

// Wait until tunneling() is started
time.Sleep(1 * time.Second)

return true
}
}
Expand Down
3 changes: 3 additions & 0 deletions poc-cb-net/internal/etcd-key/etcd-key.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ const (

// Secret is a constant variable of "/registry/cloud-adaptive-network/secret" key
Secret = CloudAdaptiveNetwork + "/secret"

// DistributedLock is a constant variable of "/registry/cloud-adaptive-network/distributed-lock" key
DistributedLock = CloudAdaptiveNetwork + "/distributed-lock"
)
2 changes: 1 addition & 1 deletion poc-cb-net/scripts/1.deploy-cb-network-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ cblog:
loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop().
## trace | debug | info | warn | error
loglevel: debug # If loopcheck is true, You can set this online.
loglevel: trace # If loopcheck is true, You can set this online.
## true | false
logfile: false
Expand Down
2 changes: 1 addition & 1 deletion poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ cblog:
loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop().
## trace | debug | info | warn | error
loglevel: debug # If loopcheck is true, You can set this online.
loglevel: trace # If loopcheck is true, You can set this online.
## true | false
logfile: false
Expand Down

0 comments on commit de6f085

Please sign in to comment.