Skip to content
This repository has been archived by the owner on Sep 16, 2019. It is now read-only.

Commit

Permalink
Merge pull request #52 from zalando-incubator/feat/mock-aws
Browse files Browse the repository at this point in the history
Feat/mock aws
  • Loading branch information
linki committed Jan 9, 2017
2 parents c250d6f + a3e63a5 commit b562018
Show file tree
Hide file tree
Showing 10 changed files with 443 additions and 358 deletions.
151 changes: 124 additions & 27 deletions consumers/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumers

import (
"errors"
"fmt"

"gopkg.in/alecthomas/kingpin.v2"

Expand All @@ -16,43 +17,45 @@ import (
awsclient "github.com/zalando-incubator/mate/pkg/aws"
)

// Implementations provide access to AWS Route53 API's
// required calls.
type AWSClient interface {
ListRecordSets(zoneID string) ([]*route53.ResourceRecordSet, error)
ChangeRecordSets(upsert, del, create []*route53.ResourceRecordSet, zoneID string) error
EndpointsToAlias(endpoints []*pkg.Endpoint) ([]*route53.ResourceRecordSet, error)
RecordInfo(records []*route53.ResourceRecordSet) map[string]*pkg.RecordInfo
GetGroupID() string
GetAssignedTXTRecordObject(record *route53.ResourceRecordSet) *route53.ResourceRecordSet
GetCanonicalZoneIDs(lbDNS []string) (map[string]string, error)
GetHostedZones() (map[string]string, error)
}

type awsClient struct {
client AWSClient
type awsConsumer struct {
groupID string
client AWSClient
}

const (
evaluateTargetHealth = true
defaultTxtTTL = int64(300)
)

func init() {
kingpin.Flag("aws-record-group-id", "Identifier to filter the mate records ").StringVar(&params.awsGroupID)
}

// NewAWSRoute53 reates a Consumer instance to sync and process DNS
// NewAWSConsumer reates a Consumer instance to sync and process DNS
// entries in AWS Route53.
func NewAWSRoute53() (Consumer, error) {
func NewAWSConsumer() (Consumer, error) {
if params.awsGroupID == "" {
return nil, errors.New("please provide --aws-record-group-id")
}
return withClient(awsclient.New(awsclient.Options{
GroupID: params.awsGroupID,
})), nil
return withClient(awsclient.New(awsclient.Options{}), params.awsGroupID), nil
}

func withClient(c AWSClient) *awsClient {
return &awsClient{c}
func withClient(c AWSClient, groupID string) *awsConsumer {
return &awsConsumer{
groupID: groupID,
client: c,
}
}

func (a *awsClient) Sync(endpoints []*pkg.Endpoint) error {
newAliasRecords, err := a.client.EndpointsToAlias(endpoints)
func (a *awsConsumer) Sync(endpoints []*pkg.Endpoint) error {
newAliasRecords, err := a.endpointsToAlias(endpoints)
if err != nil {
return err
}
Expand Down Expand Up @@ -93,13 +96,13 @@ func (a *awsClient) Sync(endpoints []*pkg.Endpoint) error {
return nil
}

func (a *awsClient) syncPerHostedZone(newAliasRecords []*route53.ResourceRecordSet, zoneID string) error {
func (a *awsConsumer) syncPerHostedZone(newAliasRecords []*route53.ResourceRecordSet, zoneID string) error {
existingRecords, err := a.client.ListRecordSets(zoneID)
if err != nil {
return err
}

recordInfoMap := a.client.RecordInfo(existingRecords)
recordInfoMap := a.recordInfo(existingRecords)

var upsert, del []*route53.ResourceRecordSet

Expand All @@ -108,27 +111,27 @@ func (a *awsClient) syncPerHostedZone(newAliasRecords []*route53.ResourceRecordS
existingRecordInfo, exist := recordInfoMap[aws.StringValue(newAliasRecord.Name)]

if !exist { //record does not exist, create it
newTXTRecord := a.client.GetAssignedTXTRecordObject(newAliasRecord)
newTXTRecord := a.getAssignedTXTRecordObject(newAliasRecord)
upsert = append(upsert, newAliasRecord, newTXTRecord)
continue
}

if existingRecordInfo.GroupID != a.client.GetGroupID() { // there exist a record with a different or empty group ID
if existingRecordInfo.GroupID != a.getGroupID() { // there exist a record with a different or empty group ID
log.Warnf("Skipping record %s: with a group ID: %s", aws.StringValue(newAliasRecord.Name), existingRecordInfo.GroupID)
continue
}

// make sure record only updated when target changes, not to spam AWS route53 API with dummy updates
if pkg.SanitizeDNSName(existingRecordInfo.Target) != aws.StringValue(newAliasRecord.AliasTarget.DNSName) {
newTXTRecord := a.client.GetAssignedTXTRecordObject(newAliasRecord)
newTXTRecord := a.getAssignedTXTRecordObject(newAliasRecord)
upsert = append(upsert, newAliasRecord, newTXTRecord)
}
}

//find records to be removed
for _, existingRecord := range existingRecords {
recordInfo := recordInfoMap[aws.StringValue(existingRecord.Name)]
if recordInfo.GroupID == a.client.GetGroupID() {
if recordInfo.GroupID == a.getGroupID() {
remove := true
for _, newAliasRecord := range newAliasRecords {
if aws.StringValue(newAliasRecord.Name) == aws.StringValue(existingRecord.Name) {
Expand All @@ -151,7 +154,7 @@ func (a *awsClient) syncPerHostedZone(newAliasRecords []*route53.ResourceRecordS
return nil
}

func (a *awsClient) Consume(endpoints <-chan *pkg.Endpoint, errors chan<- error, done <-chan struct{}, wg *sync.WaitGroup) {
func (a *awsConsumer) Consume(endpoints <-chan *pkg.Endpoint, errors chan<- error, done <-chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

Expand All @@ -178,18 +181,18 @@ func (a *awsClient) Consume(endpoints <-chan *pkg.Endpoint, errors chan<- error,
}
}

func (a *awsClient) Process(endpoint *pkg.Endpoint) error {
func (a *awsConsumer) Process(endpoint *pkg.Endpoint) error {
hostedZonesMap, err := a.client.GetHostedZones()
if err != nil {
return err
}

aliasRecords, err := a.client.EndpointsToAlias([]*pkg.Endpoint{endpoint})
aliasRecords, err := a.endpointsToAlias([]*pkg.Endpoint{endpoint})
if err != nil {
return err
}

create := []*route53.ResourceRecordSet{aliasRecords[0], a.client.GetAssignedTXTRecordObject(aliasRecords[0])}
create := []*route53.ResourceRecordSet{aliasRecords[0], a.getAssignedTXTRecordObject(aliasRecords[0])}

zoneID := getZoneIDForEndpoint(hostedZonesMap, aliasRecords[0])
if zoneID == "" {
Expand Down Expand Up @@ -218,3 +221,97 @@ func getZoneIDForEndpoint(hostedZonesMap map[string]string, record *route53.Reso
}
return match
}

//getGroupID returns the idenitifier for AWS records as stored in TXT records
func (a *awsConsumer) getGroupID() string {
return fmt.Sprintf("\"mate:%s\"", a.groupID)
}

//getAssignedTXTRecordObject returns the TXT record which accompanies the Alias record
func (a *awsConsumer) getAssignedTXTRecordObject(aliasRecord *route53.ResourceRecordSet) *route53.ResourceRecordSet {
return &route53.ResourceRecordSet{
Type: aws.String("TXT"),
Name: aliasRecord.Name,
TTL: aws.Int64(defaultTxtTTL),
ResourceRecords: []*route53.ResourceRecord{{
Value: aws.String(a.getGroupID()),
}},
}
}

//recordInfo returns the map of record assigned dns to its target and groupID (can be empty)
func (a *awsConsumer) recordInfo(records []*route53.ResourceRecordSet) map[string]*pkg.RecordInfo {
groupIDMap := map[string]string{} //maps dns to group ID

for _, record := range records {
if (aws.StringValue(record.Type)) == "TXT" {
groupIDMap[aws.StringValue(record.Name)] = aws.StringValue(record.ResourceRecords[0].Value)
} else {
if _, exist := groupIDMap[aws.StringValue(record.Name)]; !exist {
groupIDMap[aws.StringValue(record.Name)] = ""
}
}
}

infoMap := map[string]*pkg.RecordInfo{} //maps record DNS to its GroupID (if exists) and Target (LB)
for _, record := range records {
groupID := groupIDMap[aws.StringValue(record.Name)]
if _, exist := infoMap[aws.StringValue(record.Name)]; !exist {
infoMap[aws.StringValue(record.Name)] = &pkg.RecordInfo{
GroupID: groupID,
}
}
if aws.StringValue(record.Type) != "TXT" {
infoMap[aws.StringValue(record.Name)].Target = a.getRecordTarget(record)
}
}

return infoMap
}

//getRecordTarget returns the ELB dns for the given record
func (a *awsConsumer) getRecordTarget(r *route53.ResourceRecordSet) string {
if aws.StringValue(r.Type) == "TXT" {
return ""
}
if r.AliasTarget != nil {
return aws.StringValue(r.AliasTarget.DNSName)
}
return aws.StringValue(r.ResourceRecords[0].Value)
}

//endpointsToAlias converts pkg Endpoint to route53 Alias Records
func (a *awsConsumer) endpointsToAlias(endpoints []*pkg.Endpoint) ([]*route53.ResourceRecordSet, error) {
lbDNS := make([]string, len(endpoints))
for i := range endpoints {
lbDNS[i] = endpoints[i].Hostname
}
zoneIDs, err := a.client.GetCanonicalZoneIDs(lbDNS)
if err != nil {
return nil, err
}
var rset []*route53.ResourceRecordSet

for _, ep := range endpoints {
if loadBalancerZoneID, exist := zoneIDs[ep.Hostname]; exist {
rset = append(rset, a.endpointToAlias(ep, aws.String(loadBalancerZoneID)))
} else {
log.Errorf("Canonical Zone ID for endpoint: %s is not found", ep.Hostname)
}
}
return rset, nil
}

//endpointToAlias convert endpoint to an AWS A Alias record
func (a *awsConsumer) endpointToAlias(ep *pkg.Endpoint, canonicalZoneID *string) *route53.ResourceRecordSet {
rs := &route53.ResourceRecordSet{
Type: aws.String("A"),
Name: aws.String(pkg.SanitizeDNSName(ep.DNSName)),
AliasTarget: &route53.AliasTarget{
DNSName: aws.String(pkg.SanitizeDNSName(ep.Hostname)),
EvaluateTargetHealth: aws.Bool(evaluateTargetHealth),
HostedZoneId: canonicalZoneID,
},
}
return rs
}
Loading

0 comments on commit b562018

Please sign in to comment.