From 6c200760c4bddfb37f0f4f171bf6f13104af9bab Mon Sep 17 00:00:00 2001 From: "Yunkon (Alvin) Kim" Date: Mon, 14 Mar 2022 23:07:43 +0900 Subject: [PATCH] Fix abnormal networking rule sychronization - Synchonize the other hosts' networking rule with distributed lock - Put host network information by each host - Allocate or update the network rule for a host with distributed lock - Add new etcd key for distributed-lock --- poc-cb-net/cmd/agent/agent.go | 106 ++++++++++-------- poc-cb-net/cmd/controller/controller.go | 41 ++++--- poc-cb-net/internal/cb-network/cb-network.go | 4 + poc-cb-net/internal/etcd-key/etcd-key.go | 3 + .../scripts/1.deploy-cb-network-agent.sh | 2 +- .../scripts/2.re-deploy-cb-network-agent.sh | 2 +- 6 files changed, 94 insertions(+), 64 deletions(-) diff --git a/poc-cb-net/cmd/agent/agent.go b/poc-cb-net/cmd/agent/agent.go index ce2028b..010576b 100644 --- a/poc-cb-net/cmd/agent/agent.go +++ b/poc-cb-net/cmd/agent/agent.go @@ -20,6 +20,7 @@ import ( "github.com/go-ping/ping" "github.com/sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" ) // CBNet represents a network for the multi-cloud. @@ -110,13 +111,19 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl // Start the cb-network go CBNet.Startup() + // Wait until the goroutine is started + time.Sleep(200 * time.Millisecond) // Watch the networking rule to update dynamically go watchNetworkingRule(etcdClient) + // Wait until the goroutine is started + time.Sleep(200 * time.Millisecond) // Watch the other agents' secrets (RSA public keys) if CBNet.IsEncryptionEnabled() { go watchSecret(etcdClient) + // Wait until the goroutine is started + time.Sleep(200 * time.Millisecond) } // Sleep until the all routines are ready @@ -126,10 +133,9 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl if CBNet.IsEncryptionEnabled() { compareAndSwapSecret(etcdClient) } - //time.Sleep(2 * time.Second) // Try Compare-And-Swap (CAS) a host-network-information by cladnetID and hostId - compareAndSwapHostNetworkInformation(etcdClient) + initializeAgent(etcdClient) case "check-connectivity": checkConnectivity(commandOption, etcdClient) @@ -177,12 +183,59 @@ func watchNetworkingRule(etcdClient *clientv3.Client) { CBLogger.Debug("End.........") } -func compareAndSwapHostNetworkInformation(etcdClient *clientv3.Client) { +func initializeAgent(etcdClient *clientv3.Client) { CBLogger.Debug("Start.........") cladnetID := CBNet.ID hostID := CBNet.HostID + // Create a sessions to acquire a lock + session, _ := concurrency.NewSession(etcdClient) + defer session.Close() + + keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + cladnetID) + + lock := concurrency.NewMutex(session, keyPrefix) + ctx := context.TODO() + + // Acquire lock (or wait to have it) + CBLogger.Debug("Acquire a lock") + if err := lock.Lock(ctx); err != nil { + CBLogger.Error(err) + } + CBLogger.Tracef("Acquired lock for '%s'", keyPrefix) + + // Get the networking rule + keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID) + CBLogger.Debugf("Get - %v", keyNetworkingRule) + resp, etcdErr := etcdClient.Get(context.Background(), keyNetworkingRule, clientv3.WithPrefix()) + CBLogger.Tracef("[GetResponse] Networking rule: %v", resp) + if etcdErr != nil { + CBLogger.Error(etcdErr) + } + + // Set the other hosts' networking rule + for _, kv := range resp.Kvs { + // Key + key := string(kv.Key) + CBLogger.Tracef("Key: %v", key) + + // Value + peerBytes := kv.Value + var peer model.Peer + if err := json.Unmarshal(peerBytes, &peer); err != nil { + CBLogger.Error(err) + } + CBLogger.Tracef("A host's configuration: %v", peer) + + if peer.HostID != hostID { + // Update a host's configuration in the networking rule + CBLogger.Debug("Update a host's configuration") + CBNet.UpdatePeer(peerBytes) + } + } + + // Get this host's network information CBLogger.Debug("Get the host network information") CBNet.UpdateHostNetworkInformation() temp := CBNet.GetHostNetworkInformation() @@ -190,53 +243,18 @@ func compareAndSwapHostNetworkInformation(etcdClient *clientv3.Client) { currentHostNetworkInformation := string(currentHostNetworkInformationBytes) CBLogger.Trace(currentHostNetworkInformation) - CBLogger.Debug("Compare-And-Swap (CAS) the host network information") - keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID) keyHostNetworkInformation := fmt.Sprint(etcdkey.HostNetworkInformation + "/" + cladnetID + "/" + hostID) - // NOTICE: "!=" doesn't work..... It might be a temporal issue. - txnResp, err := etcdClient.Txn(context.Background()). - If(clientv3.Compare(clientv3.Value(keyHostNetworkInformation), "=", currentHostNetworkInformation)). - Then(clientv3.OpGet(keyNetworkingRule, clientv3.WithPrefix())). - Else(clientv3.OpPut(keyHostNetworkInformation, currentHostNetworkInformation)). - Commit() - if err != nil { + if _, err := etcdClient.Put(context.TODO(), keyHostNetworkInformation, currentHostNetworkInformation); err != nil { CBLogger.Error(err) } - CBLogger.Tracef("Transaction Response: %v", txnResp) - // The CAS would be succeeded if the prev host network information and current host network information are same. - // Then the networking rule will be returned. (The above "watch" will not be performed.) - // If not, the host tries to put the current host network information. - if txnResp.Succeeded { - // Set the networking rule to the host - for _, kv := range txnResp.Responses[0].GetResponseRange().Kvs { - key := string(kv.Key) - CBLogger.Tracef("Key: %v", key) - - peerBytes := kv.Value - CBLogger.Tracef("A host's configuration: %v", peerBytes) - CBLogger.Debug("Update a host's configuration") - - // Update a host's configuration in the networking rule - isThisPeerInitialized := CBNet.UpdatePeer(peerBytes) - - if isThisPeerInitialized { - var peer model.Peer - if err := json.Unmarshal(peerBytes, &peer); err != nil { - CBLogger.Error(err) - } - peer.State = model.Running - - CBLogger.Debugf("Put \"%v\"", key) - doc, _ := json.Marshal(peer) - - if _, err := etcdClient.Put(context.TODO(), key, string(doc)); err != nil { - CBLogger.Error(err) - } - } - } + // Release lock + CBLogger.Debug("Release a lock") + if err := lock.Unlock(ctx); err != nil { + CBLogger.Error(err) } + CBLogger.Tracef("Released lock for '%s'", keyPrefix) CBLogger.Debug("End.........") } diff --git a/poc-cb-net/cmd/controller/controller.go b/poc-cb-net/cmd/controller/controller.go index f85e7d2..e824e07 100644 --- a/poc-cb-net/cmd/controller/controller.go +++ b/poc-cb-net/cmd/controller/controller.go @@ -22,6 +22,7 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" ) // CBLogger represents a logger to show execution processes according to the logging level. @@ -126,9 +127,9 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client // Watch "/registry/cloud-adaptive-network/host-network-information" CBLogger.Debugf("Start to watch \"%v\"", etcdkey.HostNetworkInformation) - // // Create a session to acruie a lock - // session, _ := concurrency.NewSession(etcdClient) - // defer session.Close() + // Create a session to acquire a lock + session, _ := concurrency.NewSession(etcdClient) + defer session.Close() watchChan2 := etcdClient.Watch(context.Background(), etcdkey.HostNetworkInformation, clientv3.WithPrefix()) for watchResponse := range watchChan2 { @@ -165,18 +166,21 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client CBLogger.Error(err) } - // Create a key of host in the specific CLADNet's networking rule - keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID) + // Prepare lock + keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + parsedCLADNetID) - // // Needed?? (not sure yet) - // lock := concurrency.NewMutex(session, keyNetworkingRuleOfHost) - // ctx := context.TODO() + lock := concurrency.NewMutex(session, keyPrefix) + ctx := context.TODO() + + // Acquire a lock to protect a networking rule + CBLogger.Debug("Acquire a lock") + if err := lock.Lock(ctx); err != nil { + CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyPrefix, err) + } + CBLogger.Tracef("Acquired lock for '%s'", keyPrefix) - // // Acquire a lock to protect a networking rule - // if err := lock.Lock(ctx); err != nil { - // CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyNetworkingRuleOfHost, err) - // } - // CBLogger.Debugf("Acquired lock for '%v'", keyNetworkingRuleOfHost) + // Create a key of host in the specific CLADNet's networking rule + keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID) // Get a host's networking rule CBLogger.Tracef("Key: %v", keyNetworkingRuleOfPeer) @@ -225,11 +229,12 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client CBLogger.Error(err) } - // // Release a lock to protect a networking rule - // if err := lock.Unlock(ctx); err != nil { - // CBLogger.Errorf("Cannot release lock for '%v', error: %v", keyNetworkingRuleOfHost, err) - // } - // CBLogger.Debugf("Released lock for '%v'", keyNetworkingRuleOfHost) + // Release a lock to protect a networking rule + CBLogger.Debug("Release a lock") + if err := lock.Unlock(ctx); err != nil { + CBLogger.Error(err) + } + CBLogger.Tracef("Released lock for '%s'", keyPrefix) } case mvccpb.DELETE: // The watched key has been deleted. diff --git a/poc-cb-net/internal/cb-network/cb-network.go b/poc-cb-net/internal/cb-network/cb-network.go index ba03300..8b21554 100644 --- a/poc-cb-net/internal/cb-network/cb-network.go +++ b/poc-cb-net/internal/cb-network/cb-network.go @@ -304,6 +304,10 @@ func (cbnetwork *CBNetwork) UpdatePeer(value []byte) (isThisPeerInitialized bool } cbnetwork.isInterfaceConfigured = true cbnetwork.notificationChannel <- cbnetwork.isInterfaceConfigured + + // Wait until tunneling() is started + time.Sleep(1 * time.Second) + return true } } diff --git a/poc-cb-net/internal/etcd-key/etcd-key.go b/poc-cb-net/internal/etcd-key/etcd-key.go index cd42ebb..f081b5d 100644 --- a/poc-cb-net/internal/etcd-key/etcd-key.go +++ b/poc-cb-net/internal/etcd-key/etcd-key.go @@ -27,4 +27,7 @@ const ( // Secret is a constant variable of "/registry/cloud-adaptive-network/secret" key Secret = CloudAdaptiveNetwork + "/secret" + + // DistributedLock is a constant variable of "/registry/cloud-adaptive-network/distributed-lock" key + DistributedLock = CloudAdaptiveNetwork + "/distributed-lock" ) diff --git a/poc-cb-net/scripts/1.deploy-cb-network-agent.sh b/poc-cb-net/scripts/1.deploy-cb-network-agent.sh index bef280e..0d7c806 100644 --- a/poc-cb-net/scripts/1.deploy-cb-network-agent.sh +++ b/poc-cb-net/scripts/1.deploy-cb-network-agent.sh @@ -85,7 +85,7 @@ cblog: loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop(). ## trace | debug | info | warn | error - loglevel: debug # If loopcheck is true, You can set this online. + loglevel: trace # If loopcheck is true, You can set this online. ## true | false logfile: false diff --git a/poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh b/poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh index 9b89537..a190ab0 100644 --- a/poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh +++ b/poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh @@ -92,7 +92,7 @@ cblog: loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop(). ## trace | debug | info | warn | error - loglevel: debug # If loopcheck is true, You can set this online. + loglevel: trace # If loopcheck is true, You can set this online. ## true | false logfile: false