Skip to content

Commit

Permalink
Merge pull request hudl#55 from seh/schedule-updates-by-vip-address
Browse files Browse the repository at this point in the history
Schedule updates to the set of instances registered with a VIP address
  • Loading branch information
damtur committed Jan 16, 2017
2 parents 999c250 + 72404f4 commit a68cdb4
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 39 deletions.
15 changes: 10 additions & 5 deletions example_appupdate_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package fargo
package fargo_test

// MIT Licensed (see README.md) - Copyright (c) 2013 Hudl <@Hudl>

import (
"fmt"
"time"

"github.com/hudl/fargo"
)

func ExampleEurekaConnection_ScheduleAppUpdates(e *EurekaConnection) {
func ExampleEurekaConnection_ScheduleAppUpdates() {
e := makeConnection()
done := make(chan struct{})
time.AfterFunc(2*time.Minute, func() {
close(done)
Expand All @@ -24,7 +27,8 @@ func ExampleEurekaConnection_ScheduleAppUpdates(e *EurekaConnection) {
fmt.Printf("Done monitoring application %q.\n", name)
}

func ExampleAppSource_Latest(e *EurekaConnection) {
func ExampleAppSource_Latest() {
e := makeConnection()
name := "my_app"
source := e.NewAppSource(name, false)
defer source.Stop()
Expand All @@ -38,11 +42,12 @@ func ExampleAppSource_Latest(e *EurekaConnection) {
}
}

func ExampleAppSource_CopyLatestTo(e *EurekaConnection) {
func ExampleAppSource_CopyLatestTo() {
e := makeConnection()
name := "my_app"
source := e.NewAppSource(name, true)
defer source.Stop()
var app Application
var app fargo.Application
if !source.CopyLatestTo(&app) {
fmt.Printf("No application named %q is available.\n", name)
}
Expand Down
14 changes: 14 additions & 0 deletions example_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package fargo_test

// MIT Licensed (see README.md) - Copyright (c) 2013 Hudl <@Hudl>

import "github.com/hudl/fargo"

func makeConnection() fargo.EurekaConnection {
var c fargo.Config
c.Eureka.ServiceUrls = []string{"http://172.17.0.2:8080/eureka/v2"}
c.Eureka.ConnectTimeoutSeconds = 10
c.Eureka.PollIntervalSeconds = 30
c.Eureka.Retries = 3
return fargo.NewConnFromConfig(c)
}
78 changes: 78 additions & 0 deletions example_vipupdate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fargo_test

// MIT Licensed (see README.md) - Copyright (c) 2013 Hudl <@Hudl>

import (
"context"
"fmt"
"time"

"github.com/hudl/fargo"
)

func ExampleEurekaConnection_ScheduleVIPAddressUpdates_manual() {
e := makeConnection()
done := make(chan struct{})
time.AfterFunc(2*time.Minute, func() {
close(done)
})
vipAddress := "my_vip"
// We only care about those instances that are available to receive requests.
updates, err := e.ScheduleVIPAddressUpdates(vipAddress, true, done, fargo.ThatAreUp, fargo.Shuffled)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Monitoring VIP address %q.\n", vipAddress)
for update := range updates {
if update.Err != nil {
fmt.Printf("Most recent request for VIP address %q's instances failed: %v\n", vipAddress, update.Err)
continue
}
fmt.Printf("VIP address %q has %d instances available.\n", vipAddress, len(update.Instances))
}
fmt.Printf("Done monitoring VIP address %q.\n", vipAddress)
}

func ExampleEurekaConnection_ScheduleVIPAddressUpdates_context() {
e := makeConnection()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
vipAddress := "my_vip"
// Look for instances that are in trouble.
updates, err := e.ScheduleVIPAddressUpdates(vipAddress, true, ctx.Done(), fargo.WithStatus(fargo.DOWN), fargo.WithStatus(fargo.OUTOFSERVICE))
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Monitoring VIP address %q.\n", vipAddress)
for update := range updates {
if update.Err != nil {
fmt.Printf("Most recent request for VIP address %q's instances failed: %v\n", vipAddress, update.Err)
continue
}
fmt.Printf("VIP address %q has %d instances in trouble.\n", vipAddress, len(update.Instances))
}
fmt.Printf("Done monitoring VIP address %q.\n", vipAddress)
}

func ExampleEurekaConnection_ScheduleSecureVIPAddressUpdates_context() {
e := makeConnection()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
svipAddress := "my_vip"
updates, err := e.ScheduleSecureVIPAddressUpdates(svipAddress, true, ctx.Done(), fargo.ThatAreUp)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Monitoring secure VIP address %q.\n", svipAddress)
for update := range updates {
if update.Err != nil {
fmt.Printf("Most recent request for secure VIP address %q's instances failed: %v\n", svipAddress, update.Err)
continue
}
fmt.Printf("Secure VIP address %q has %d instances.\n", svipAddress, len(update.Instances))
}
fmt.Printf("Done monitoring secure VIP address %q.\n", svipAddress)
}
138 changes: 108 additions & 30 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http"
"strconv"
"strings"
"time"
)

func (e *EurekaConnection) generateURL(slugs ...string) string {
Expand Down Expand Up @@ -168,28 +169,20 @@ func WithStatus(status StatusType) InstanceQueryOption {
}
}

func makeUncheckedStatusMatchingOption(status StatusType) InstanceQueryOption {
return func(o *instanceQueryOptions) error {
retainIfStatusIs(status, o)
return nil
}
}

// ThatAreUp restricts the set of instances returned to only those with status UP.
//
// Combining the options produced by this function with those produced by WithStatus applies their
// logical disjunction.
func ThatAreUp() InstanceQueryOption {
return makeUncheckedStatusMatchingOption(UP)
// Combining this function with the options produced by WithStatus applies their logical
// disjunction.
func ThatAreUp(o *instanceQueryOptions) error {
retainIfStatusIs(UP, o)
return nil
}

// Shuffled requests randomizing the order of the sequence of instances returned, using the default
// shared rand.Source.
func Shuffled() InstanceQueryOption {
return func(o *instanceQueryOptions) error {
o.intn = rand.Intn
return nil
}
func Shuffled(o *instanceQueryOptions) error {
o.intn = rand.Intn
return nil
}

// ShuffledWith requests randomizing the order of the sequence of instances returned, using the
Expand Down Expand Up @@ -374,36 +367,121 @@ func (e *EurekaConnection) getInstancesByVIPAddress(addr string, opts instanceQu
return instances, nil
}

func mergeInstanceQueryOptions(defaults instanceQueryOptions, opts []InstanceQueryOption) (instanceQueryOptions, error) {
for _, o := range opts {
if o != nil {
if err := o(&defaults); err != nil {
return instanceQueryOptions{}, err
}
}
}
return defaults, nil
}

func collectInstanceQueryOptions(opts []InstanceQueryOption) (instanceQueryOptions, error) {
return mergeInstanceQueryOptions(instanceQueryOptions{}, opts)
}

// GetInstancesByVIPAddress returns the set of instances registered with the given VIP address,
// potentially filtered per the constraints supplied as options.
//
// NB: The VIP address is case-sensitive, and must match the address used at registration time.
func (e *EurekaConnection) GetInstancesByVIPAddress(addr string, opts ...InstanceQueryOption) ([]*Instance, error) {
var mergedOptions instanceQueryOptions
for _, o := range opts {
if o != nil {
if err := o(&mergedOptions); err != nil {
return nil, err
}
}
options, err := collectInstanceQueryOptions(opts)
if err != nil {
return nil, err
}
return e.getInstancesByVIPAddress(addr, mergedOptions)
return e.getInstancesByVIPAddress(addr, options)
}

// GetInstancesBySecureVIPAddress returns the set of instances registered with the given secure
// VIP address, potentially filtered per the constraints supplied as options.
//
// NB: The secure VIP address is case-sensitive, and must match the address used at registration time.
func (e *EurekaConnection) GetInstancesBySecureVIPAddress(addr string, opts ...InstanceQueryOption) ([]*Instance, error) {
mergedOptions := instanceQueryOptions{secure: true}
for _, o := range opts {
if o != nil {
if err := o(&mergedOptions); err != nil {
return nil, err
options, err := mergeInstanceQueryOptions(instanceQueryOptions{secure: true}, opts)
if err != nil {
return nil, err
}
return e.getInstancesByVIPAddress(addr, options)
}

// InstanceSetUpdate is the outcome of an attempt to get a fresh snapshot of a Eureka VIP address's
// set of instances, together with an error that may have occurred in that attempt. If the Err field
// is nil, the Instances field will be populated—though possibly with an empty set.
type InstanceSetUpdate struct {
Instances []*Instance
Err error
}

func sendVIPAddressUpdatesEvery(d time.Duration, produce func() InstanceSetUpdate, c chan<- InstanceSetUpdate, done <-chan struct{}) {
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-done:
close(c)
return
case <-t.C:
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case c <- produce():
default:
}
}
}
return e.getInstancesByVIPAddress(addr, mergedOptions)
}

func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate {
produce := func() InstanceSetUpdate {
instances, err := e.getInstancesByVIPAddress(addr, opts)
return InstanceSetUpdate{instances, err}
}
c := make(chan InstanceSetUpdate, 1)
if await {
c <- produce()
}
go sendVIPAddressUpdatesEvery(time.Duration(e.PollInterval)*time.Second, produce, c, done)
return c
}

// ScheduleVIPAddressUpdates starts polling for updates to the set of instances registered with the
// given Eureka VIP address, potentially filtered per the constraints supplied as options, using the
// connection's configured polling interval as its period. It sends the outcome of each update
// attempt to the returned channel, and continues until the supplied done channel is either closed
// or has a value available. Once done sending updates to the returned channel, it closes it.
//
// If await is true, it sends at least one instance set update outcome to the returned channel
// before returning.
//
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) ScheduleVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) {
options, err := collectInstanceQueryOptions(opts)
if err != nil {
return nil, err
}
return e.scheduleVIPAddressUpdates(addr, await, done, options), nil
}

// ScheduleSecureVIPAddressUpdates starts polling for updates to the set of instances registered
// with the given secure Eureka VIP address, potentially filtered per the constraints supplied as
// options, using the connection's configured polling interval as its period. It sends the outcome
// of each update attempt to the returned channel, and continues until the supplied done channel is
// either closed or has a value available. Once done sending updates to the returned channel, it
// closes it.
//
// If await is true, it sends at least one instance set update outcome to the returned channel
// before returning.
//
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) ScheduleSecureVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) {
options, err := mergeInstanceQueryOptions(instanceQueryOptions{secure: true}, opts)
if err != nil {
return nil, err
}
return e.scheduleVIPAddressUpdates(addr, await, done, options), nil
}

// RegisterInstance will register the given Instance with eureka if it is not already registered,
Expand Down
4 changes: 2 additions & 2 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestInstanceQueryOptions(t *testing.T) {
Convey("A shuffling directive", t, func() {
Convey("using the global Rand instance", func() {
var opts instanceQueryOptions
err := Shuffled()(&opts)
err := Shuffled(&opts)
So(err, ShouldBeNil)
So(opts.intn, ShouldNotBeNil)
So(opts.intn(1), ShouldEqual, 0)
Expand All @@ -107,7 +107,7 @@ func TestInstanceQueryOptions(t *testing.T) {
func TestFilterInstancesInApps(t *testing.T) {
Convey("A predicate should preserve only those instances", t, func() {
Convey("with status UP", func() {
areUp := instancePredicateFrom(t, ThatAreUp())
areUp := instancePredicateFrom(t, ThatAreUp)
Convey("from an empty set of applications", func() {
So(filterInstancesInApps(nil, areUp), ShouldBeEmpty)
})
Expand Down
4 changes: 2 additions & 2 deletions tests/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestGetSingleInstanceByVIPAddress(t *testing.T) {
So(instances, ShouldHaveLength, 1)
So(instances[0].VipAddress, ShouldEqual, vipAddress)
Convey("requesting the instances by that VIP address with status UP should provide that one", func() {
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp())
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp)
So(err, ShouldBeNil)
So(instances, ShouldHaveLength, 1)
So(instances[0].VipAddress, ShouldEqual, vipAddress)
Expand All @@ -165,7 +165,7 @@ func TestGetSingleInstanceByVIPAddress(t *testing.T) {
So(instances, ShouldHaveLength, 1)
Convey("And selecting instances with status UP should provide none", func() {
// Ensure that we tolerate a nil option safely.
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp(), nil)
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp, nil)
So(err, ShouldBeNil)
So(instances, ShouldBeEmpty)
})
Expand Down

0 comments on commit a68cdb4

Please sign in to comment.