Skip to content

Commit

Permalink
Merge pull request #192 from keel-hq/feature/stateful_set
Browse files Browse the repository at this point in the history
Feature/stateful set
  • Loading branch information
rusenask committed Apr 20, 2018
2 parents f7f4ed1 + cf8f332 commit 689c921
Show file tree
Hide file tree
Showing 123 changed files with 11,347 additions and 3,139 deletions.
25 changes: 23 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions bot/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/keel-hq/keel/bot/formatter"
"github.com/keel-hq/keel/provider/kubernetes"

"k8s.io/api/extensions/v1beta1"
apps_v1 "k8s.io/api/apps/v1"

log "github.com/sirupsen/logrus"
)
Expand All @@ -19,8 +19,8 @@ type Filter struct {
}

// deployments - gets all deployments
func deployments(k8sImplementer kubernetes.Implementer) ([]v1beta1.Deployment, error) {
deploymentLists := []*v1beta1.DeploymentList{}
func deployments(k8sImplementer kubernetes.Implementer) ([]apps_v1.Deployment, error) {
deploymentLists := []*apps_v1.DeploymentList{}

n, err := k8sImplementer.Namespaces()
if err != nil {
Expand All @@ -39,7 +39,7 @@ func deployments(k8sImplementer kubernetes.Implementer) ([]v1beta1.Deployment, e
deploymentLists = append(deploymentLists, l)
}

impacted := []v1beta1.Deployment{}
impacted := []apps_v1.Deployment{}

for _, deploymentList := range deploymentLists {
for _, deployment := range deploymentList.Items {
Expand Down Expand Up @@ -71,7 +71,7 @@ func DeploymentsResponse(filter Filter, k8sImplementer kubernetes.Implementer) s
return buf.String()
}

func convertToInternal(deployments []v1beta1.Deployment) []formatter.Deployment {
func convertToInternal(deployments []apps_v1.Deployment) []formatter.Deployment {
formatted := []formatter.Deployment{}
for _, d := range deployments {

Expand All @@ -86,7 +86,7 @@ func convertToInternal(deployments []v1beta1.Deployment) []formatter.Deployment
return formatted
}

func getImages(deployment *v1beta1.Deployment) []string {
func getImages(deployment *apps_v1.Deployment) []string {
var images []string
for _, c := range deployment.Spec.Template.Spec.Containers {
images = append(images, c.Image)
Expand Down
73 changes: 41 additions & 32 deletions cmd/keel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

"github.com/keel-hq/keel/constants"
"github.com/keel-hq/keel/extension/notification"
"github.com/keel-hq/keel/internal/k8s"
"github.com/keel-hq/keel/internal/workgroup"
"github.com/keel-hq/keel/provider"
"github.com/keel-hq/keel/provider/helm"
"github.com/keel-hq/keel/provider/kubernetes"
Expand Down Expand Up @@ -133,6 +135,18 @@ func main() {
}).Fatal("main: failed to create kubernetes implementer")
}

var g workgroup.Group

t := &k8s.Translator{
FieldLogger: log.WithField("context", "translator"),
}

buf := k8s.NewBuffer(&g, t, log.StandardLogger(), 128)
wl := log.WithField("context", "watch")
k8s.WatchDeployments(&g, implementer.Client(), wl, buf)
k8s.WatchStatefulSets(&g, implementer.Client(), wl, buf)
k8s.WatchDaemonSets(&g, implementer.Client(), wl, buf)

keelsNamespace := constants.DefaultNamespace
if os.Getenv(EnvNamespace) != "" {
keelsNamespace = os.Getenv(EnvNamespace)
Expand All @@ -147,57 +161,52 @@ func main() {
}

serializer := codecs.DefaultSerializer()
// mem := memory.NewMemoryCache(24*time.Hour, 24*time.Hour, 1*time.Minute)
approvalsManager := approvals.New(kkv, serializer)

go approvalsManager.StartExpiryService(ctx)

// setting up providers
providers := setupProviders(implementer, sender, approvalsManager)

providers := setupProviders(implementer, sender, approvalsManager, &t.GenericResourceCache)
secretsGetter := secrets.NewGetter(implementer)

teardownTriggers := setupTriggers(ctx, providers, secretsGetter, approvalsManager)

bot.Run(implementer, approvalsManager)

signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for _ = range signalChan {
log.Info("received an interrupt, closing connection...")

go func() {
select {
case <-time.After(10 * time.Second):
log.Info("connection shutdown took too long, exiting... ")
close(cleanupDone)
return
case <-cleanupDone:
return
}
}()

// teardownProviders()
providers.Stop()
teardownTriggers()
bot.Stop()

cleanupDone <- true
}
}()

<-cleanupDone

g.Add(func(stop <-chan struct{}) {
go func() {
for range signalChan {
log.Info("received an interrupt, shutting down...")
go func() {
select {
case <-time.After(10 * time.Second):
log.Info("connection shutdown took too long, exiting... ")
close(cleanupDone)
return
case <-cleanupDone:
return
}
}()
providers.Stop()
teardownTriggers()
bot.Stop()

cleanupDone <- true
}
}()
<-cleanupDone
})
g.Run()
}

// setupProviders - setting up available providers. New providers should be initialised here and added to
// provider map
func setupProviders(k8sImplementer kubernetes.Implementer, sender notification.Sender, approvalsManager approvals.Manager) (providers provider.Providers) {
func setupProviders(k8sImplementer kubernetes.Implementer, sender notification.Sender, approvalsManager approvals.Manager, grc *k8s.GenericResourceCache) (providers provider.Providers) {
var enabledProviders []provider.Provider

k8sProvider, err := kubernetes.NewProvider(k8sImplementer, sender, approvalsManager)
k8sProvider, err := kubernetes.NewProvider(k8sImplementer, sender, approvalsManager, grc)
if err != nil {
log.WithFields(log.Fields{
"error": err,
Expand Down
112 changes: 112 additions & 0 deletions internal/k8s/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package k8s

import (
"sort"
"sync"
)

type genericResourceCache struct {
sync.Mutex
values []*GenericResource
}

// GenericResourceCache - storage for generic resources with a rendezvous point for goroutines
// waiting for or announcing the occurence of a cache events.
type GenericResourceCache struct {
genericResourceCache
Cond
}

// Values returns a copy of the contents of the cache.
func (cc *genericResourceCache) Values() []*GenericResource {
cc.Lock()
r := append([]*GenericResource{}, cc.values...)
cc.Unlock()
return r
}

// Add adds an entry to the cache. If a GenericResource with the same
// name exists, it is replaced.
func (cc *genericResourceCache) Add(grs ...*GenericResource) {
if len(grs) == 0 {
return
}
cc.Lock()
sort.Sort(genericResource(cc.values))
for _, gr := range grs {
cc.add(gr)
}
cc.Unlock()
}

// add adds c to the cache. If c is already present, the cached value of c is overwritten.
// invariant: cc.values should be sorted on entry.
func (cc *genericResourceCache) add(c *GenericResource) {
i := sort.Search(len(cc.values), func(i int) bool { return cc.values[i].Identifier >= c.Identifier })
if i < len(cc.values) && cc.values[i].Identifier == c.Identifier {
// c is already present, replace
cc.values[i] = c
} else {
// c is not present, append
cc.values = append(cc.values, c)
// restort to convert append into insert
sort.Sort(genericResource(cc.values))
}
}

// Remove removes the named entry from the cache. If the entry
// is not present in the cache, the operation is a no-op.
func (cc *genericResourceCache) Remove(identifiers ...string) {
if len(identifiers) == 0 {
return
}
cc.Lock()
sort.Sort(genericResource(cc.values))
for _, n := range identifiers {
cc.remove(n)
}
cc.Unlock()
}

// remove removes the named entry from the cache.
// invariant: cc.values should be sorted on entry.
func (cc *genericResourceCache) remove(identifier string) {
i := sort.Search(len(cc.values), func(i int) bool { return cc.values[i].Identifier >= identifier })
if i < len(cc.values) && cc.values[i].Identifier == identifier {
// c is present, remove
cc.values = append(cc.values[:i], cc.values[i+1:]...)
}
}

// Cond implements a condition variable, a rendezvous point for goroutines
// waiting for or announcing the occurence of an event.
type Cond struct {
mu sync.Mutex
waiters []chan int
last int
}

// Register registers ch to receive a value when Notify is called.
func (c *Cond) Register(ch chan int, last int) {
c.mu.Lock()
defer c.mu.Unlock()

if last < c.last {
// notify this channel immediately
ch <- c.last
return
}
c.waiters = append(c.waiters, ch)
}

// Notify notifies all registered waiters that an event has occured.
func (c *Cond) Notify() {
c.mu.Lock()
defer c.mu.Unlock()
c.last++

for _, ch := range c.waiters {
ch <- c.last
}
c.waiters = c.waiters[:0]
}
Loading

0 comments on commit 689c921

Please sign in to comment.