Skip to content

Commit

Permalink
Merge branch 'master' into label-selector
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport authored Jun 10, 2017
2 parents 6367c8a + ca60d99 commit 1306532
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 19 deletions.
47 changes: 35 additions & 12 deletions lib/backend/k8s/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
// Other watcher vars.
var nsChan, poChan, npChan, snpChan, gcChan, poolChan, noChan <-chan watch.Event
var event watch.Event
var kvp *model.KVPair
var opts metav1.ListOptions

log.Info("Starting Kubernetes API read loop")
Expand Down Expand Up @@ -489,7 +488,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it.
if kvp = syn.parsePodEvent(event); kvp != nil {
if kvp := syn.parsePodEvent(event); kvp != nil {
// Only send the update if we care about it. We filter
// out a number of events that aren't useful for us.
latestVersions.podVersion = kvp.Revision.(string)
Expand All @@ -505,7 +504,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseNetworkPolicyEvent(event)
kvp := syn.parseNetworkPolicyEvent(event)
latestVersions.networkPolicyVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_NP)
case event = <-snpChan:
Expand All @@ -520,7 +519,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseSystemNetworkPolicyEvent(event)
kvp := syn.parseSystemNetworkPolicyEvent(event)
latestVersions.systemNetworkPolicyVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_SNP)
case event = <-gcChan:
Expand All @@ -535,7 +534,7 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseGlobalConfigEvent(event)
kvp := syn.parseGlobalConfigEvent(event)
latestVersions.globalConfigVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_GC)
case event = <-poolChan:
Expand All @@ -550,23 +549,30 @@ func (syn *kubeSyncer) readFromKubernetesAPI() {
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseIPPoolEvent(event)
kvp := syn.parseIPPoolEvent(event)
latestVersions.poolVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_IP)
case event = <-noChan:
log.Debugf("Incoming Node watch event. Type=%s", event.Type)
if syn.eventNeedsResync(event) {
syn.needsResync[KEY_NO] = true
syn.needsResync[KEY_HC] = true
continue
} else if syn.eventRestartsWatch(event, KEY_NO) {
syn.needsResync[KEY_NO] = true
syn.needsResync[KEY_HC] = true
syn.closeWatcher(KEY_NO)
continue
}
// Event is OK - parse it and send it over the channel.
kvp = syn.parseNodeEvent(event)
latestVersions.nodeVersion = kvp.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvp}, KEY_NO)
kvpHostIP, kvpIPIPAddr := syn.parseNodeEvent(event)
log.WithFields(log.Fields{
"kvpHostIP": kvpHostIP,
"kvpIPIPAddr": kvpIPIPAddr,
}).Debug("Got node KVs.")
latestVersions.nodeVersion = kvpHostIP.Revision.(string)
syn.sendUpdates([]model.KVPair{*kvpHostIP}, KEY_NO)
syn.sendUpdates([]model.KVPair{*kvpIPIPAddr}, KEY_HC)
}
}
}
Expand Down Expand Up @@ -893,7 +899,7 @@ func (syn *kubeSyncer) parseNamespaceEvent(e watch.Event) []model.KVPair {
return []model.KVPair{*rules, *tags, *labels}
}

func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair {
func (syn *kubeSyncer) parseNodeEvent(e watch.Event) (*model.KVPair, *model.KVPair) {
node, ok := e.Object.(*k8sapi.Node)
if !ok {
log.Panicf("Invalid node event. Type: %s, Object: %+v", e.Type, e.Object)
Expand All @@ -910,11 +916,28 @@ func (syn *kubeSyncer) parseNodeEvent(e watch.Event) *model.KVPair {
Revision: kvp.Revision,
}

kvpIPIPAddr, err := getTunIp(node)
if err != nil || kvpIPIPAddr == nil {
// If we failed to parse, err will be non-nil. If it's missing, kvpIPIPAddr will be nil.
// Either way, generate a delete.
log.WithError(err).WithField("node", node.Name).Info(
"Node has no (or invalid) pod CIDR. (Normal for a new node.)")
kvpIPIPAddr = &model.KVPair{
Key: model.HostConfigKey{
Hostname: node.Name,
Name: "IpInIpTunnelAddr",
},
Value: nil,
}
}
kvpIPIPAddr.Revision = kvp.Revision

if e.Type == watch.Deleted {
kvp.Value = nil
kvpHostIp.Value = nil
kvpIPIPAddr.Value = nil
}

return kvpHostIp
return kvpHostIp, kvpIPIPAddr
}

// parsePodEvent returns a KVPair for the given event. If the event isn't
Expand Down
14 changes: 7 additions & 7 deletions lib/backend/model/keys.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2016 Tigera, Inc. All rights reserved.
// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -167,24 +167,24 @@ func KeyFromDefaultPath(path string) Key {
log.Debugf("Path is a workload endpoint: %v", path)
return WorkloadEndpointKey{
Hostname: m[1],
OrchestratorID: m[2],
WorkloadID: m[3],
EndpointID: m[4],
OrchestratorID: unescapeName(m[2]),
WorkloadID: unescapeName(m[3]),
EndpointID: unescapeName(m[4]),
}
} else if m := matchHostEndpoint.FindStringSubmatch(path); m != nil {
log.Debugf("Path is a host endpoint: %v", path)
return HostEndpointKey{
Hostname: m[1],
EndpointID: m[2],
EndpointID: unescapeName(m[2]),
}
} else if m := matchPolicy.FindStringSubmatch(path); m != nil {
log.Debugf("Path is a policy: %v", path)
return PolicyKey{
Name: m[2],
Name: unescapeName(m[2]),
}
} else if m := matchProfile.FindStringSubmatch(path); m != nil {
log.Debugf("Path is a profile: %v (%v)", path, m[2])
pk := ProfileKey{m[1]}
pk := ProfileKey{unescapeName(m[1])}
switch m[2] {
case "tags":
log.Debugf("Profile tags")
Expand Down
105 changes: 105 additions & 0 deletions lib/backend/model/keys_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model_test

import (
. "github.com/projectcalico/libcalico-go/lib/backend/model"

. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/projectcalico/libcalico-go/lib/net"
)

var _ = DescribeTable(
"key parsing",
func(strKey string, expected Key) {
key := KeyFromDefaultPath(strKey)
Expect(key).To(Equal(expected))
serialized, err := KeyToDefaultPath(expected)
Expect(err).ToNot(HaveOccurred())
Expect(serialized).To(Equal(strKey))
},
Entry(
"profile rules with a /",
"/calico/v1/policy/profile/foo%2fbar/rules",
ProfileRulesKey{ProfileKey: ProfileKey{Name: "foo/bar"}},
),
Entry(
"profile tags with a /",
"/calico/v1/policy/profile/foo%2fbar/tags",
ProfileTagsKey{ProfileKey: ProfileKey{Name: "foo/bar"}},
),
Entry(
"profile labels with a /",
"/calico/v1/policy/profile/foo%2fbar/labels",
ProfileLabelsKey{ProfileKey: ProfileKey{Name: "foo/bar"}},
),
Entry(
"policy with a /",
"/calico/v1/policy/tier/default/policy/biff%2fbop",
PolicyKey{Name: "biff/bop"},
),
Entry(
"workload with a /",
"/calico/v1/host/foobar/workload/open%2fstack/work%2fload/endpoint/end%2fpoint",
WorkloadEndpointKey{
Hostname: "foobar",
OrchestratorID: "open/stack",
WorkloadID: "work/load",
EndpointID: "end/point",
},
),
Entry(
"host endpoint with a /",
"/calico/v1/host/foobar/endpoint/end%2fpoint",
HostEndpointKey{
Hostname: "foobar",
EndpointID: "end/point",
},
),
Entry(
"host IP",
"/calico/v1/host/foobar/bird_ip",
HostIPKey{Hostname: "foobar"},
),
Entry(
"IP pool",
"/calico/v1/ipam/v4/pool/10.0.0.0-8",
IPPoolKey{CIDR: mustParseCIDR("10.0.0.0/8")},
),
Entry(
"global config",
"/calico/v1/config/foo",
GlobalConfigKey{Name: "foo"},
),
Entry(
"host config",
"/calico/v1/host/hostname/config/foo",
HostConfigKey{Hostname: "hostname", Name: "foo"},
),
Entry(
"ready flag",
"/calico/v1/Ready",
ReadyFlagKey{},
),
)

func mustParseCIDR(s string) net.IPNet {
_, ipNet, err := net.ParseCIDR(s)
if err != nil {
panic(err)
}
return *ipNet
}

0 comments on commit 1306532

Please sign in to comment.