Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix abnormal networking rule sychronization #242

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 62 additions & 44 deletions poc-cb-net/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/go-ping/ping"
"github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// CBNet represents a network for the multi-cloud.
Expand Down Expand Up @@ -110,13 +111,19 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl

// Start the cb-network
go CBNet.Startup()
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)

// Watch the networking rule to update dynamically
go watchNetworkingRule(etcdClient)
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)

// Watch the other agents' secrets (RSA public keys)
if CBNet.IsEncryptionEnabled() {
go watchSecret(etcdClient)
// Wait until the goroutine is started
time.Sleep(200 * time.Millisecond)
}

// Sleep until the all routines are ready
Expand All @@ -126,10 +133,9 @@ func handleCommand(command string, commandOption string, etcdClient *clientv3.Cl
if CBNet.IsEncryptionEnabled() {
compareAndSwapSecret(etcdClient)
}
//time.Sleep(2 * time.Second)

// Try Compare-And-Swap (CAS) a host-network-information by cladnetID and hostId
compareAndSwapHostNetworkInformation(etcdClient)
initializeAgent(etcdClient)

case "check-connectivity":
checkConnectivity(commandOption, etcdClient)
Expand Down Expand Up @@ -177,66 +183,78 @@ func watchNetworkingRule(etcdClient *clientv3.Client) {
CBLogger.Debug("End.........")
}

func compareAndSwapHostNetworkInformation(etcdClient *clientv3.Client) {
func initializeAgent(etcdClient *clientv3.Client) {
CBLogger.Debug("Start.........")

cladnetID := CBNet.ID
hostID := CBNet.HostID

// Create a sessions to acquire a lock
session, _ := concurrency.NewSession(etcdClient)
defer session.Close()

keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + cladnetID)

lock := concurrency.NewMutex(session, keyPrefix)
ctx := context.TODO()

// Acquire lock (or wait to have it)
CBLogger.Debug("Acquire a lock")
if err := lock.Lock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Acquired lock for '%s'", keyPrefix)

// Get the networking rule
keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)
CBLogger.Debugf("Get - %v", keyNetworkingRule)
resp, etcdErr := etcdClient.Get(context.Background(), keyNetworkingRule, clientv3.WithPrefix())
CBLogger.Tracef("[GetResponse] Networking rule: %v", resp)
if etcdErr != nil {
CBLogger.Error(etcdErr)
}

// Set the other hosts' networking rule
for _, kv := range resp.Kvs {
// Key
key := string(kv.Key)
CBLogger.Tracef("Key: %v", key)

// Value
peerBytes := kv.Value
var peer model.Peer
if err := json.Unmarshal(peerBytes, &peer); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("A host's configuration: %v", peer)

if peer.HostID != hostID {
// Update a host's configuration in the networking rule
CBLogger.Debug("Update a host's configuration")
CBNet.UpdatePeer(peerBytes)
}
}

// Get this host's network information
CBLogger.Debug("Get the host network information")
CBNet.UpdateHostNetworkInformation()
temp := CBNet.GetHostNetworkInformation()
currentHostNetworkInformationBytes, _ := json.Marshal(temp)
currentHostNetworkInformation := string(currentHostNetworkInformationBytes)
CBLogger.Trace(currentHostNetworkInformation)

CBLogger.Debug("Compare-And-Swap (CAS) the host network information")
keyNetworkingRule := fmt.Sprint(etcdkey.NetworkingRule + "/" + cladnetID)
keyHostNetworkInformation := fmt.Sprint(etcdkey.HostNetworkInformation + "/" + cladnetID + "/" + hostID)
// NOTICE: "!=" doesn't work..... It might be a temporal issue.
txnResp, err := etcdClient.Txn(context.Background()).
If(clientv3.Compare(clientv3.Value(keyHostNetworkInformation), "=", currentHostNetworkInformation)).
Then(clientv3.OpGet(keyNetworkingRule, clientv3.WithPrefix())).
Else(clientv3.OpPut(keyHostNetworkInformation, currentHostNetworkInformation)).
Commit()

if err != nil {
if _, err := etcdClient.Put(context.TODO(), keyHostNetworkInformation, currentHostNetworkInformation); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Transaction Response: %v", txnResp)

// The CAS would be succeeded if the prev host network information and current host network information are same.
// Then the networking rule will be returned. (The above "watch" will not be performed.)
// If not, the host tries to put the current host network information.
if txnResp.Succeeded {
// Set the networking rule to the host
for _, kv := range txnResp.Responses[0].GetResponseRange().Kvs {
key := string(kv.Key)
CBLogger.Tracef("Key: %v", key)

peerBytes := kv.Value
CBLogger.Tracef("A host's configuration: %v", peerBytes)
CBLogger.Debug("Update a host's configuration")

// Update a host's configuration in the networking rule
isThisPeerInitialized := CBNet.UpdatePeer(peerBytes)

if isThisPeerInitialized {
var peer model.Peer
if err := json.Unmarshal(peerBytes, &peer); err != nil {
CBLogger.Error(err)
}
peer.State = model.Running

CBLogger.Debugf("Put \"%v\"", key)
doc, _ := json.Marshal(peer)

if _, err := etcdClient.Put(context.TODO(), key, string(doc)); err != nil {
CBLogger.Error(err)
}
}
}
// Release lock
CBLogger.Debug("Release a lock")
if err := lock.Unlock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Released lock for '%s'", keyPrefix)

CBLogger.Debug("End.........")
}
Expand Down
41 changes: 23 additions & 18 deletions poc-cb-net/cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// CBLogger represents a logger to show execution processes according to the logging level.
Expand Down Expand Up @@ -126,9 +127,9 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
// Watch "/registry/cloud-adaptive-network/host-network-information"
CBLogger.Debugf("Start to watch \"%v\"", etcdkey.HostNetworkInformation)

// // Create a session to acruie a lock
// session, _ := concurrency.NewSession(etcdClient)
// defer session.Close()
// Create a session to acquire a lock
session, _ := concurrency.NewSession(etcdClient)
defer session.Close()

watchChan2 := etcdClient.Watch(context.Background(), etcdkey.HostNetworkInformation, clientv3.WithPrefix())
for watchResponse := range watchChan2 {
Expand Down Expand Up @@ -165,18 +166,21 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
CBLogger.Error(err)
}

// Create a key of host in the specific CLADNet's networking rule
keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID)
// Prepare lock
keyPrefix := fmt.Sprint(etcdkey.DistributedLock + "/" + parsedCLADNetID)

// // Needed?? (not sure yet)
// lock := concurrency.NewMutex(session, keyNetworkingRuleOfHost)
// ctx := context.TODO()
lock := concurrency.NewMutex(session, keyPrefix)
ctx := context.TODO()

// Acquire a lock to protect a networking rule
CBLogger.Debug("Acquire a lock")
if err := lock.Lock(ctx); err != nil {
CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyPrefix, err)
}
CBLogger.Tracef("Acquired lock for '%s'", keyPrefix)

// // Acquire a lock to protect a networking rule
// if err := lock.Lock(ctx); err != nil {
// CBLogger.Errorf("Could NOT acquire lock for '%v', error: %v", keyNetworkingRuleOfHost, err)
// }
// CBLogger.Debugf("Acquired lock for '%v'", keyNetworkingRuleOfHost)
// Create a key of host in the specific CLADNet's networking rule
keyNetworkingRuleOfPeer := fmt.Sprint(etcdkey.NetworkingRule + "/" + parsedCLADNetID + "/" + parsedHostID)

// Get a host's networking rule
CBLogger.Tracef("Key: %v", keyNetworkingRuleOfPeer)
Expand Down Expand Up @@ -225,11 +229,12 @@ func watchHostNetworkInformation(wg *sync.WaitGroup, etcdClient *clientv3.Client
CBLogger.Error(err)
}

// // Release a lock to protect a networking rule
// if err := lock.Unlock(ctx); err != nil {
// CBLogger.Errorf("Cannot release lock for '%v', error: %v", keyNetworkingRuleOfHost, err)
// }
// CBLogger.Debugf("Released lock for '%v'", keyNetworkingRuleOfHost)
// Release a lock to protect a networking rule
CBLogger.Debug("Release a lock")
if err := lock.Unlock(ctx); err != nil {
CBLogger.Error(err)
}
CBLogger.Tracef("Released lock for '%s'", keyPrefix)
}

case mvccpb.DELETE: // The watched key has been deleted.
Expand Down
4 changes: 4 additions & 0 deletions poc-cb-net/internal/cb-network/cb-network.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ func (cbnetwork *CBNetwork) UpdatePeer(value []byte) (isThisPeerInitialized bool
}
cbnetwork.isInterfaceConfigured = true
cbnetwork.notificationChannel <- cbnetwork.isInterfaceConfigured

// Wait until tunneling() is started
time.Sleep(1 * time.Second)

return true
}
}
Expand Down
3 changes: 3 additions & 0 deletions poc-cb-net/internal/etcd-key/etcd-key.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ const (

// Secret is a constant variable of "/registry/cloud-adaptive-network/secret" key
Secret = CloudAdaptiveNetwork + "/secret"

// DistributedLock is a constant variable of "/registry/cloud-adaptive-network/distributed-lock" key
DistributedLock = CloudAdaptiveNetwork + "/distributed-lock"
)
2 changes: 1 addition & 1 deletion poc-cb-net/scripts/1.deploy-cb-network-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ cblog:
loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop().

## trace | debug | info | warn | error
loglevel: debug # If loopcheck is true, You can set this online.
loglevel: trace # If loopcheck is true, You can set this online.

## true | false
logfile: false
Expand Down
2 changes: 1 addition & 1 deletion poc-cb-net/scripts/2.re-deploy-cb-network-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ cblog:
loopcheck: true # This temp method for development is busy wait. cf) cblogger.go:levelSetupLoop().

## trace | debug | info | warn | error
loglevel: debug # If loopcheck is true, You can set this online.
loglevel: trace # If loopcheck is true, You can set this online.

## true | false
logfile: false
Expand Down