Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Add http-client/server functionality #3

Merged
merged 19 commits into from
Dec 6, 2020
Merged
Prev Previous commit
Next Next commit
Setup http-server-producer
  • Loading branch information
alitari committed Dec 2, 2020
commit 8e21de44dafada1da5df1e1d937c4d157f423613
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Go-Template transforms an input data structure to a cloudEvent and sends them to
| producer name | Input | Description |
| ------------- | ------| ------------|
| periodic-producer | void | Sends events frequently based on a configurable time period. See [details](docs/periodic-producer.md)
| http-server-producer | HTTP-Request | Sends events based on an incoming http request |
| http-server-producer | HTTP-Request | Sends events based on an incoming http request. See [details](docs/http-server-producer.md) |


## mappers
Expand Down
73 changes: 35 additions & 38 deletions cmd/http-server-producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,73 @@ package main
import (
"fmt"
"log"
"strings"
"time"

"net/http"

"github.com/alitari/ce-go-template/pkg/cehandler"
"github.com/alitari/ce-go-template/pkg/cehttpserver"
"github.com/alitari/ce-go-template/pkg/cerequesttransformer"
cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
)

// var ceProducer *cehttpclienttransformer.cehttpclienttransformer
var ceClient cloudevents.Client = nil
var ceProducerHandler *cehandler.CeProducerHandler
var config Configuration
// var ceClient cloudevents.Client = nil
// var ceProducerHandler *cehandler.CeProducerHandler
// var config Configuration

// Configuration bla
type Configuration struct {
Verbose bool `default:"true"`
CeTemplate string `split_words:"true" default:"{ \"data\":{\"name\":\"Alex\"}, \"datacontenttype\":\"application/json\",\"id\":\" {{ uuidv4 }}\",\"source\":\"ce-gotemplate\",\"specversion\":\"1.0\",\"type\":\"test\" }"`
Sink string `envconfig:"K_SINK"`
Timeout string `default:"1000ms"`
HTTPPort int `split_words:"true" default:"8080"`
HTTPPath string `split_words:"true" default:"/"`
HTTPMethod string `split_words:"true" default:"GET"`
HTTPAccept string `split_words:"true" default:"application/json"`
Verbose bool `default:"true"`
CeTemplate string `split_words:"true" default:"{\"name\":\"Alex\"}"`
CeSource string `split_words:"true" default:"https://github.com/alitari/ce-go-template"`
CeType string `split_words:"true" default:"com.github.alitari.ce-go-template.periodic-producer"`
Sink string `envconfig:"K_SINK"`
Timeout time.Duration `default:"1000ms"`
HTTPPort int `split_words:"true" default:"8080"`
HTTPPath string `split_words:"true" default:"/"`
HTTPMethod string `split_words:"true" default:"GET"`
HTTPAccept string `split_words:"true" default:"application/json"`
}

func (c Configuration) info() string {
return fmt.Sprintf("Configuration:\n====================================\nVerbose: %v\nTimeout: %v\nSink: '%v'\nCeTemplate: '%v'\nServing HTTP %s %s on port %v \n", c.Verbose, c.Timeout, c.Sink, c.CeTemplate, c.HTTPMethod, c.HTTPPath, c.HTTPPort)
return fmt.Sprintf(`
Configuration:
====================================
Verbose: %v
Timeout: %v
Sink: '%v
CeTemplate: '%v'
CloudEvent source: %s
CloudEvent type: %s
Serving HTTP %s on path '%s' listening on port %v accepting '%s'`, c.Verbose, c.Timeout, c.Sink, c.CeTemplate, c.CeSource, c.CeType, c.HTTPMethod, c.HTTPPath, c.HTTPPort, c.HTTPAccept)
}

func main() {
config = Configuration{}
config := Configuration{}
if err := envconfig.Process("", &config); err != nil {
log.Fatal(err)
}
log.Print(config.info())

var err error

timeout, err := time.ParseDuration(config.Timeout)
httpProtocol, err := cloudevents.NewHTTP(cehttp.WithShutdownTimeout(timeout))
httpProtocol, err := cloudevents.NewHTTP(cehttp.WithShutdownTimeout(config.Timeout))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
ceClient, err = cloudevents.NewClient(httpProtocol)
ceClient, err := cloudevents.NewClient(httpProtocol)
if err != nil {
log.Fatal(err.Error())
}

// ceProducer = cehttpclienttransformer.Newcehttpclienttransformer("", config.CeTemplate, 0, true, false, config.Verbose)

// ceProducerHandler = cetransformer.NewProducerHandler(ceProducer, config.Sink, timeout, config.Verbose)

}

func handleRequest(w http.ResponseWriter, r *http.Request) {
result := ceProducerHandler.SendCe(nil)
re := result.Error()
if !strings.HasPrefix(re, "20") {
log.Printf("Failed to send event! error: %v", result.Error())
} else {
if cloudevents.IsUndelivered(result) {
log.Printf("Event was not delivered: %v", result)
} else {
if config.Verbose {
log.Printf("Event successfully delivered: %v", result)
}
}
ceProducer, err := cerequesttransformer.NewRequestTransformer(config.CeTemplate, config.CeType, config.CeSource, config.Verbose)
if err != nil {
log.Fatalf("failed to create request transformer: %s", err.Error())
}
ceProducerHandler := cehandler.NewProducerHandler(ceProducer, ceClient, config.Sink, config.Timeout, true)
cehttpserver.NewCeHTTPServer(config.HTTPPort, config.HTTPPath, config.HTTPMethod, config.Verbose, ceProducerHandler)

select {}

}
6 changes: 3 additions & 3 deletions cmd/periodic-producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Configuration struct {
CeSource string `split_words:"true" default:"https://github.com/alitari/ce-go-template"`
CeType string `split_words:"true" default:"com.github.alitari.ce-go-template.periodic-producer"`
Sink string `envconfig:"K_SINK"`
Period time.Duration `default:"1000ms"`
Timeout time.Duration `default:"1000ms"`
Period time.Duration `default:"1000ms"`
}

func (c Configuration) info() string {
Expand All @@ -33,8 +33,8 @@ Period: %v
Timeout: %v
Sink: '%v'
CeTemplate: '%v'
cloudEvent source: %s
cloudEvent type: %s`, c.Verbose, c.Period, c.Timeout, c.Sink, c.CeTemplate, c.CeSource, c.CeType)
CloudEvent source: %s
CloudEvent type: %s`, c.Verbose, c.Period, c.Timeout, c.Sink, c.CeTemplate, c.CeSource, c.CeType)
}

func main() {
Expand Down
66 changes: 66 additions & 0 deletions docs/http-server-producer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# http-server-producer

## configuration

| Name | Default | Description |
| ---- | ------- | ----------- |
| `VERBOSE` | `true` | if `true` you get an extensive log output |
| `CE_TEMPLATE` | `{"name": "Alex"}` | example valid json |
| `CE_SOURCE` | `https://github.com/alitari/ce-go-template` | [Cloudevent Source](https://github.com/cloudevents/spec/blob/v1.0/spec.md#source-1) |
| `CE_TYPE` | `com.github.alitari.ce-go-template.periodic-producer` | [Cloudevent Type](https://github.com/cloudevents/spec/blob/v1.0/spec.md#type) |
| `K_SINK` | | An adressable K8s resource. see [Sinkbinding](https://knative.dev/docs/eventing/samples/sinkbinding/) |
| `TIMEOUT` | `1000ms` | send timeout |
| `HTTP_PORT` | `8080` | server port |
| `HTTP_PATH` | `/` | server path |
| `HTTP_METHOD` | `GET` | server method |
| `HTTP_ACCEPT` | `application/json` | Http Accept header |

## examples

### default

```bash
K_SINK=https://httpbin.org/post go run cmd/http-server-producer/main.go
# in a new shell
curl -v localhost:8080/
```

### path param to cloudevent payload

```bash
CE_TEMPLATE='{{ $name := trimPrefix "/person/" .url.path }}'\
'{'\
'"person": {{ $name | quote }}'\
'}' \
HTTP_PATH="/person/" K_SINK=https://httpbin.org/post go run cmd/http-server-producer/main.go
# in a new shell
curl -v localhost:8080/person/alex
```

### query param to cloudevent payload

```bash
CE_TEMPLATE='{{ $query := .url.query }}'\
'{'\
'"person": {{ index (index $query "person") 0 | quote }}'\
'}' \
K_SINK=https://httpbin.org/post go run cmd/http-server-producer/main.go
# in a new shell
curl -v localhost:8080/?person=alex
```

### request body to cloudevent payload

```bash
CE_TEMPLATE='{{ $name := .body.name }}'\
'{'\
'"person": {{ $name | quote }}'\
'}' \
HTTP_METHOD=POST \
K_SINK=https://httpbin.org/post go run cmd/http-server-producer/main.go
# in a new shell
curl -v -X POST localhost:8080/ -d '{ "name" : "Alex" }'
```