diff --git a/lib/backend/k8s/syncer.go b/lib/backend/k8s/syncer.go index 68dfe5e00..729bfa348 100644 --- a/lib/backend/k8s/syncer.go +++ b/lib/backend/k8s/syncer.go @@ -320,7 +320,6 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { // Other watcher vars. var nsChan, poChan, npChan, snpChan, gcChan, poolChan, noChan <-chan watch.Event var event watch.Event - var kvp *model.KVPair var opts metav1.ListOptions log.Info("Starting Kubernetes API read loop") @@ -489,7 +488,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { continue } // Event is OK - parse it. - if kvp = syn.parsePodEvent(event); kvp != nil { + if kvp := syn.parsePodEvent(event); kvp != nil { // Only send the update if we care about it. We filter // out a number of events that aren't useful for us. latestVersions.podVersion = kvp.Revision.(string) @@ -505,7 +504,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { continue } // Event is OK - parse it and send it over the channel. - kvp = syn.parseNetworkPolicyEvent(event) + kvp := syn.parseNetworkPolicyEvent(event) latestVersions.networkPolicyVersion = kvp.Revision.(string) syn.sendUpdates([]model.KVPair{*kvp}, KEY_NP) case event = <-snpChan: @@ -520,7 +519,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { continue } // Event is OK - parse it and send it over the channel. - kvp = syn.parseSystemNetworkPolicyEvent(event) + kvp := syn.parseSystemNetworkPolicyEvent(event) latestVersions.systemNetworkPolicyVersion = kvp.Revision.(string) syn.sendUpdates([]model.KVPair{*kvp}, KEY_SNP) case event = <-gcChan: @@ -535,7 +534,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { continue } // Event is OK - parse it and send it over the channel. - kvp = syn.parseGlobalConfigEvent(event) + kvp := syn.parseGlobalConfigEvent(event) latestVersions.globalConfigVersion = kvp.Revision.(string) syn.sendUpdates([]model.KVPair{*kvp}, KEY_GC) case event = <-poolChan: @@ -550,23 +549,30 @@ func (syn *kubeSyncer) readFromKubernetesAPI() { continue } // Event is OK - parse it and send it over the channel. - kvp = syn.parseIPPoolEvent(event) + kvp := syn.parseIPPoolEvent(event) latestVersions.poolVersion = kvp.Revision.(string) syn.sendUpdates([]model.KVPair{*kvp}, KEY_IP) case event = <-noChan: log.Debugf("Incoming Node watch event. Type=%s", event.Type) if syn.eventNeedsResync(event) { syn.needsResync[KEY_NO] = true + syn.needsResync[KEY_HC] = true continue } else if syn.eventRestartsWatch(event, KEY_NO) { syn.needsResync[KEY_NO] = true + syn.needsResync[KEY_HC] = true syn.closeWatcher(KEY_NO) continue } // Event is OK - parse it and send it over the channel. - kvp = syn.parseNodeEvent(event) - latestVersions.nodeVersion = kvp.Revision.(string) - syn.sendUpdates([]model.KVPair{*kvp}, KEY_NO) + kvpHostIP, kvpIPIPAddr := syn.parseNodeEvent(event) + log.WithFields(log.Fields{ + "kvpHostIP": kvpHostIP, + "kvpIPIPAddr": kvpIPIPAddr, + }).Debug("Got node KVs.") + latestVersions.nodeVersion = kvpHostIP.Revision.(string) + syn.sendUpdates([]model.KVPair{*kvpHostIP}, KEY_NO) + syn.sendUpdates([]model.KVPair{*kvpIPIPAddr}, KEY_HC) } } } @@ -893,7 +899,7 @@ func (syn *kubeSyncer) parseNamespaceEvent(e watch.Event) []model.KVPair { return []model.KVPair{*rules, *tags, *labels} } -func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair { +func (syn *kubeSyncer) parseNodeEvent(e watch.Event) (*model.KVPair, *model.KVPair) { node, ok := e.Object.(*k8sapi.Node) if !ok { log.Panicf("Invalid node event. Type: %s, Object: %+v", e.Type, e.Object) @@ -910,11 +916,28 @@ func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair { Revision: kvp.Revision, } + kvpIPIPAddr, err := getTunIp(node) + if err != nil || kvpIPIPAddr == nil { + // If we failed to parse, err will be non-nil. If it's missing, kvpIPIPAddr will be nil. + // Either way, generate a delete. + log.WithError(err).WithField("node", node.Name).Info( + "Node has no (or invalid) pod CIDR. (Normal for a new node.)") + kvpIPIPAddr = &model.KVPair{ + Key: model.HostConfigKey{ + Hostname: node.Name, + Name: "IpInIpTunnelAddr", + }, + Value: nil, + } + } + kvpIPIPAddr.Revision = kvp.Revision + if e.Type == watch.Deleted { - kvp.Value = nil + kvpHostIp.Value = nil + kvpIPIPAddr.Value = nil } - return kvpHostIp + return kvpHostIp, kvpIPIPAddr } // parsePodEvent returns a KVPair for the given event. If the event isn't