Skip to content
This repository has been archived by the owner on Sep 16, 2019. It is now read-only.

Commit

Permalink
ref: manage goroutines, add done channel, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
linki committed Dec 21, 2016
1 parent 9dd3793 commit 4942ee3
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 274 deletions.
27 changes: 27 additions & 0 deletions consumers/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,33 @@ func (a *awsClient) syncPerHostedZone(newAliasRecords []*route53.ResourceRecordS
return nil
}

func (a *awsClient) Consume(endpoints chan *pkg.Endpoint, errors chan error, done chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

log.Infoln("[AWS] Listening for events...")

for {
select {
case e, ok := <-endpoints:
if !ok {
log.Info("[AWS] channel closed")
return
}

log.Infof("[AWS] Processing (%s, %s, %s)\n", e.DNSName, e.IP, e.Hostname)

err := a.Process(e)
if err != nil {
errors <- err
}
case <-done:
log.Info("[AWS] Exited consuming loop.")
return
}
}
}

func (a *awsClient) Process(endpoint *pkg.Endpoint) error {
hostedZonesMap, err := a.client.GetHostedZones()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions consumers/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumers

import (
"fmt"
"sync"

"github.com/zalando-incubator/mate/pkg"
)
Expand All @@ -19,6 +20,7 @@ var params struct {

type Consumer interface {
Sync([]*pkg.Endpoint) error
Consume(chan *pkg.Endpoint, chan error, chan struct{}, *sync.WaitGroup)
Process(*pkg.Endpoint) error
}

Expand Down
28 changes: 28 additions & 0 deletions consumers/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strings"
"sync"

"github.com/zalando-incubator/mate/pkg"

Expand Down Expand Up @@ -128,6 +129,33 @@ func (d *googleDNSConsumer) Sync(endpoints []*pkg.Endpoint) error {
return nil
}

func (d *googleDNSConsumer) Consume(endpoints chan *pkg.Endpoint, errors chan error, done chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

log.Infoln("[Google] Listening for events...")

for {
select {
case e, ok := <-endpoints:
if !ok {
log.Info("[Google] channel closed")
return
}

log.Infof("[Google] Processing (%s, %s, %s)\n", e.DNSName, e.IP, e.Hostname)

err := d.Process(e)
if err != nil {
errors <- err
}
case <-done:
log.Info("[Google] Exited consuming loop.")
return
}
}
}

func (d *googleDNSConsumer) Process(endpoint *pkg.Endpoint) error {
change := new(dns.Change)

Expand Down
30 changes: 30 additions & 0 deletions consumers/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package consumers

import (
"fmt"
"sync"

log "github.com/Sirupsen/logrus"

"github.com/zalando-incubator/mate/pkg"
)
Expand All @@ -26,6 +29,33 @@ func (d *stdoutConsumer) Sync(endpoints []*pkg.Endpoint) error {
return nil
}

func (d *stdoutConsumer) Consume(endpoints chan *pkg.Endpoint, errors chan error, done chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

log.Infoln("[Stdout] Listening for events...")

for {
select {
case e, ok := <-endpoints:
if !ok {
log.Info("[Stdout] channel closed")
return
}

log.Infof("[Stdout] Processing (%s, %s, %s)\n", e.DNSName, e.IP, e.Hostname)

err := d.Process(e)
if err != nil {
errors <- err
}
case <-done:
log.Info("[Stdout] Exited consuming loop.")
return
}
}
}

func (d *stdoutConsumer) Process(endpoint *pkg.Endpoint) error {
fmt.Println("process record:", endpoint.DNSName, value(endpoint))
return nil
Expand Down
153 changes: 44 additions & 109 deletions controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -23,11 +22,12 @@ const (
type Controller struct {
producer producers.Producer
consumer consumers.Consumer
options *Options

options *Options

mutex sync.Mutex
wg sync.WaitGroup
results chan *pkg.Endpoint
errors chan error
done chan struct{}
wg sync.WaitGroup
}

type Options struct {
Expand All @@ -47,133 +47,68 @@ func New(producer producers.Producer, consumer consumers.Consumer, options *Opti
producer: producer,
consumer: consumer,
options: options,

results: make(chan *pkg.Endpoint),
errors: make(chan error),
done: make(chan struct{}),
}
}

func (c *Controller) Run() chan error {
errors := make(chan error)

errors1 := c.Synchronize()
errors2 := c.Watch()

c.wg.Add(1)
go c.Synchronize()
go c.Watch()

go func() {
defer c.wg.Done()

for {
select {
case err := <-errors1:
errors <- err
case err := <-errors2:
errors <- err
}
}
}()

return errors
return c.errors
}

func (c *Controller) RunAndWait() {
func (c *Controller) Wait() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

errors := c.Run()
<-signalChan
log.Info("Shutdown signal received, exiting...")
close(c.done)
c.wg.Wait()
}

func (c *Controller) Synchronize() {
c.wg.Add(1)
defer c.wg.Done()

for {
log.Debugf("[Synchronize] Sleeping for %s...", c.options.syncPeriod)
select {
case err := <-errors:
log.Error(err)
case <-signalChan:
log.Info("Shutdown signal received, exiting...")
case <-time.After(c.options.syncPeriod):
case <-c.done:
log.Info("[Synchronize] Exited synchronization loop.")
return
}
}
}

func (c *Controller) Synchronize() chan error {
errors := make(chan error)

c.wg.Add(1)

go func() {
defer c.wg.Done()

for {
log.Infoln("[Synchronize] Synchronizing DNS entries...")

endpoints, err := c.producer.Endpoints()
if err != nil {
errors <- fmt.Errorf("Error getting endpoints from producer: %v", err)
}

err = c.consumer.Sync(endpoints)
if err != nil {
errors <- fmt.Errorf("Error consuming endpoints: %v", err)
}
log.Infoln("[Synchronize] Synchronizing DNS entries...")

log.Infof("[Synchronize] Sleeping for %s...", c.options.syncPeriod)
select {
case <-time.After(c.options.syncPeriod):
}
endpoints, err := c.producer.Endpoints()
if err != nil {
c.errors <- fmt.Errorf("[Synchronize] Error getting endpoints from producer: %v", err)
continue
}
}()

return errors
}

func (c *Controller) Watch() chan error {
errors := make(chan error)

channel, errors1 := c.monitorProducer()
errors2 := c.consumeEndpoints(channel)

c.wg.Add(1)

go func() {
defer c.wg.Done()

for {
select {
case err := <-errors1:
errors <- err
case err := <-errors2:
errors <- err
}
err = c.consumer.Sync(endpoints)
if err != nil {
c.errors <- fmt.Errorf("[Synchronize] Error consuming endpoints: %v", err)
continue
}
}()

return errors
}
}

func (c *Controller) monitorProducer() (chan *pkg.Endpoint, chan error) {
return c.producer.Monitor()
func (c *Controller) Watch() {
go c.monitorProducer()
go c.consumeEndpoints()
}

func (c *Controller) consumeEndpoints(channel chan *pkg.Endpoint) chan error {
errChan := make(chan error)

c.wg.Add(1)

go func() {
defer c.wg.Done()

log.Infoln("[Watch] Listening for events...")

for {
endpoint, ok := <-channel
if !ok {
errChan <- errors.New("[Watch] channel closed")
}

log.Infof("[Watch] Processing (%s, %s, %s)\n", endpoint.DNSName, endpoint.IP, endpoint.Hostname)

err := c.consumer.Process(endpoint)
if err != nil {
errChan <- err
}
}
}()
func (c *Controller) monitorProducer() {
c.producer.Monitor(c.results, c.errors, c.done, &c.wg)
}

return errChan
func (c *Controller) consumeEndpoints() {
c.consumer.Consume(c.results, c.errors, c.done, &c.wg)
}
10 changes: 9 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,13 @@ func main() {
}

ctrl := controller.New(p, c, nil)
ctrl.RunAndWait()
errors := ctrl.Run()

go func() {
for {
log.Error(<-errors)
}
}()

ctrl.Wait()
}
Loading

0 comments on commit 4942ee3

Please sign in to comment.