Skip to content

Commit

Permalink
Fix AWS signing
Browse files Browse the repository at this point in the history
This commit should fix AWS signing issues and makes elastic ready to use
with AWS Elasticsearch service.

Close olivere#962
  • Loading branch information
olivere committed Nov 29, 2018
1 parent 0f13c62 commit 6159f8d
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ addons:
services:
- docker
before_install:
- if [[ "$TRAVIS_OS_NAME" == "linux" && ! $(which nc) ]] ; then sudo apt-get install -y netcat ; fi
- if [[ "$TRAVIS_OS_NAME" == "linux" && ! $(which nc) ]] ; then sudo apt-get install -y netcat ; fi
- sudo sysctl -w vm.max_map_count=262144
# - docker run -d --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch:6.5.0 elasticsearch -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_
- docker-compose pull
Expand Down
4 changes: 4 additions & 0 deletions aws/v4/CREDITS
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
This package contains code that is Copyright (c) 2016 Anthony Atkinson
and licensed under the MIT license.

See https://github.com/sha1sum/aws_signing_client.
45 changes: 36 additions & 9 deletions aws/v4/aws_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package v4

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/credentials"
Expand Down Expand Up @@ -42,19 +43,45 @@ type Transport struct {

// RoundTrip uses the underlying RoundTripper transport, but signs request first with AWS V4 Signing
func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) {
// AWS signer needs an io.ReadSeeker; however, req.Body is an io.ReadCloser.
// TODO Maybe there's a more efficient way to get an io.ReadSeeker than to read the whole thing.
var body io.ReadSeeker
if req.Body != nil {
d, err := ioutil.ReadAll(req.Body)
if h, ok := req.Header["Authorization"]; ok && len(h) > 0 && strings.HasPrefix(h[0], "AWS4") {
// Received a signed request, just pass it on.
return st.client.Do(req)
}

req.URL.Scheme = "https"
if strings.Contains(req.URL.RawPath, "%2C") {
// Escaping path
req.URL.RawPath = url.PathEscape(req.URL.RawPath)
}
now := time.Now().UTC()
req.Header.Set("Date", now.Format(time.RFC3339))
var err error
switch req.Body {
case nil:
_, err = st.signer.Sign(req, nil, "es", st.region, now)
default:
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
body = bytes.NewReader(d)
req.Body = ioutil.NopCloser(bytes.NewReader(buf))
_, err = st.signer.Sign(req, bytes.NewReader(buf), "es", st.region, time.Now().UTC())
}
if err != nil {
return nil, err
}
_, err := st.signer.Sign(req, body, "es", st.region, time.Now())
resp, err := st.client.Do(req)
if err != nil {
return nil, err
}
return st.client.Do(req)
if resp.Body != nil {
defer resp.Body.Close()
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
return nil, err
}
resp.Body = ioutil.NopCloser(bytes.NewReader(buf.Bytes()))
}
return resp, nil
}
13 changes: 11 additions & 2 deletions aws/v4/aws_v4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package v4

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -18,7 +19,7 @@ import (

func TestSigningClient(t *testing.T) {
var req *http.Request
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
req = r // capture the HTTP request
Expand Down Expand Up @@ -49,7 +50,15 @@ func TestSigningClient(t *testing.T) {
defer ts.Close()

cred := credentials.NewStaticCredentials("dev", "secret", "")
signingClient := NewV4SigningClient(cred, "us-east-1")
// Don't do this in production!
insecureHttpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
signingClient := NewV4SigningClientWithHTTPClient(cred, "us-east-1", insecureHttpClient)

// Create a simple Ping request via Elastic
client, err := elastic.NewClient(
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.0
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.1
hostname: elasticsearch
environment:
- cluster.name=elasticsearch
Expand All @@ -25,7 +25,7 @@ services:
ports:
- 9200:9200
platinum:
image: docker.elastic.co/elasticsearch/elasticsearch:6.5.0
image: docker.elastic.co/elasticsearch/elasticsearch:6.5.1
hostname: elasticsearch-platinum
environment:
- cluster.name=platinum
Expand Down
1 change: 1 addition & 0 deletions recipes/aws-connect-v4/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/aws-connect-v4
14 changes: 9 additions & 5 deletions recipes/aws-connect-v4/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,33 @@ import (

func main() {
var (
accessKey = flag.String("access-key", env.String("", "AWS_ACCESS_KEY"), "Access Key ID")
secretKey = flag.String("secret-key", env.String("", "AWS_SECRET_KEY"), "Secret access key")
url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
accessKey = flag.String("access-key", env.String("", "AWS_ACCESS_KEY", "AWS_ACCESS_KEY_ID"), "Access Key ID")
secretKey = flag.String("secret-key", env.String("", "AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"), "Secret access key")
url = flag.String("url", "", "Elasticsearch URL")
sniff = flag.Bool("sniff", false, "Enable or disable sniffing")
region = flag.String("region", "eu-west-1", "AWS Region name")
)
flag.Parse()
log.SetFlags(0)

if *url == "" {
*url = "http://127.0.0.1:9200"
log.Fatal("please specify a URL with -url")
}
if *accessKey == "" {
log.Fatal("missing -access-key or AWS_ACCESS_KEY environment variable")
}
if *secretKey == "" {
log.Fatal("missing -secret-key or AWS_SECRET_KEY environment variable")
}
if *region == "" {
log.Fatal("please specify an AWS region with -regiom")
}

signingClient := aws.NewV4SigningClient(credentials.NewStaticCredentials(
*accessKey,
*secretKey,
"",
), "eu-central-1")
), *region)

// Create an Elasticsearch client
client, err := elastic.NewClient(
Expand Down
1 change: 1 addition & 0 deletions recipes/aws-mapping-v4/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/aws-mapping-v4
182 changes: 182 additions & 0 deletions recipes/aws-mapping-v4/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

// Connect creates an index with a mapping with different data types.
//
// Example
//
//
// aws-mapping-v4 -url=https://search-xxxxx-yyyyy.eu-central-1.es.amazonaws.com -index=twitter -type=tweet -sniff=false
//
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"time"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/olivere/env"

"github.com/olivere/elastic"
aws "github.com/olivere/elastic/aws/v4"
)

const (
mapping = `
{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings":{
"_doc":{
"properties":{
"user":{
"type":"keyword"
},
"message":{
"type":"text"
},
"retweets":{
"type":"integer"
},
"created":{
"type":"date"
},
"attributes":{
"type":"object"
}
}
}
}
}
`
)

// Tweet is just an example document.
type Tweet struct {
User string `json:"user"`
Message string `json:"message"`
Retweets int `json:"retweets"`
Created time.Time `json:"created"`
Attrs map[string]interface{} `json:"attributes,omitempty"`
}

func main() {
var (
accessKey = flag.String("access-key", env.String("", "AWS_ACCESS_KEY", "AWS_ACCESS_KEY_ID"), "Access Key ID")
secretKey = flag.String("secret-key", env.String("", "AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"), "Secret access key")
url = flag.String("url", "", "Elasticsearch URL")
sniff = flag.Bool("sniff", false, "Enable or disable sniffing")
trace = flag.Bool("trace", false, "Enable or disable tracing")
index = flag.String("index", "", "Index name")
region = flag.String("region", "eu-west-1", "AWS Region name")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lshortfile)

if *url == "" {
log.Fatal("please specify a URL with -url")
}
if *index == "" {
log.Fatal("please specify an index name with -index")
}
if *region == "" {
log.Fatal("please specify an AWS region with -regiom")
}

// Create an Elasticsearch client
signingClient := aws.NewV4SigningClient(credentials.NewStaticCredentials(
*accessKey,
*secretKey,
"",
), *region)

// Create an Elasticsearch client
opts := []elastic.ClientOptionFunc{
elastic.SetURL(*url),
elastic.SetSniff(*sniff),
elastic.SetHealthcheck(*sniff),
elastic.SetHttpClient(signingClient),
}
if *trace {
opts = append(opts, elastic.SetTraceLog(log.New(os.Stdout, "", 0)))
}
client, err := elastic.NewClient(opts...)
if err != nil {
log.Fatal(err)
}

// Check if index already exists. We'll drop it then.
// Next, we create a fresh index/mapping.
ctx := context.Background()
exists, err := client.IndexExists(*index).Pretty(true).Do(ctx)
if err != nil {
log.Fatal(err)
}
if exists {
_, err := client.DeleteIndex(*index).Pretty(true).Do(ctx)
if err != nil {
log.Fatal(err)
}
}
_, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx)
if err != nil {
log.Fatal(err)
}

// Add a tweet
{
tweet := Tweet{
User: "olivere",
Message: "Welcome to Go and Elasticsearch.",
Retweets: 0,
Created: time.Now(),
Attrs: map[string]interface{}{
"views": 17,
"vip": true,
},
}
_, err := client.Index().
Index(*index).
Type("_doc").
Id("1").
BodyJson(&tweet).
Refresh("true").
Pretty(true).
Do(context.TODO())
if err != nil {
log.Fatal(err)
}
}

// Read the tweet
{
doc, err := client.Get().
Index(*index).
Type("_doc").
Id("1").
Pretty(true).
Do(context.TODO())
if err != nil {
log.Fatal(err)
}
var tweet Tweet
if err = json.Unmarshal(*doc.Source, &tweet); err != nil {
log.Fatal(err)
}
fmt.Printf("%s at %s: %s (%d retweets)\n",
tweet.User,
tweet.Created,
tweet.Message,
tweet.Retweets,
)
fmt.Printf(" %v\n", tweet.Attrs)
}
}

0 comments on commit 6159f8d

Please sign in to comment.