Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor using packages #1155

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 126 additions & 22 deletions cmd/gor/gor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,116 @@ import (
"expvar"
"flag"
"fmt"
"github.com/buger/goreplay"
"log"
"net/http"
"net/http/httputil"
httppptof "net/http/pprof"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strings"
"syscall"
"time"

"github.com/buger/goreplay/pkg/binary"
"github.com/buger/goreplay/pkg/dummy"
"github.com/buger/goreplay/pkg/emitter"
"github.com/buger/goreplay/pkg/file"
gor_http "github.com/buger/goreplay/pkg/http"
"github.com/buger/goreplay/pkg/kafka"
"github.com/buger/goreplay/pkg/null"
"github.com/buger/goreplay/pkg/plugin"
"github.com/buger/goreplay/pkg/raw"
"github.com/buger/goreplay/pkg/settings"
"github.com/buger/goreplay/pkg/tcp"
"github.com/buger/goreplay/pkg/ws"

"github.com/rs/zerolog/log"
)

// Settings used for quick access to CLI flags
var Settings = settings.Settings

// NewPlugins specify and initialize all available plugins
func NewPlugins() *plugin.InOutPlugins {
plugins := new(plugin.InOutPlugins)

for _, options := range Settings.InputDummy {
plugins.RegisterPlugin(dummy.NewDummyInput, options)
}

for range Settings.OutputDummy {
plugins.RegisterPlugin(dummy.NewDummyOutput)
}

if Settings.OutputStdout {
plugins.RegisterPlugin(dummy.NewDummyOutput)
}

if Settings.OutputNull {
plugins.RegisterPlugin(null.NewNullOutput)
}

for _, options := range Settings.InputRAW {
plugins.RegisterPlugin(raw.NewRAWInput, options, Settings.InputRAWConfig)
}

for _, options := range Settings.InputTCP {
plugins.RegisterPlugin(tcp.NewTCPInput, options, &Settings.InputTCPConfig)
}

for _, options := range Settings.OutputTCP {
plugins.RegisterPlugin(tcp.NewTCPOutput, options, &Settings.OutputTCPConfig)
}

for _, options := range Settings.OutputWebSocket {
plugins.RegisterPlugin(ws.NewWebSocketOutput, options, &Settings.OutputWebSocketConfig)
}

for _, options := range Settings.InputFile {
plugins.RegisterPlugin(file.NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
}

for _, path := range Settings.OutputFile {
if strings.HasPrefix(path, "s3://") {
plugins.RegisterPlugin(file.NewS3Output, path, &Settings.OutputFileConfig)
} else {
plugins.RegisterPlugin(file.NewFileOutput, path, &Settings.OutputFileConfig)
}
}

for _, options := range Settings.InputHTTP {
plugins.RegisterPlugin(gor_http.NewHTTPInput, options)
}

// If we explicitly set Host header http output should not rewrite it
// Fix: https://github.com/buger/gor/issues/174
for _, header := range Settings.ModifierConfig.Headers {
if header.Name == "Host" {
Settings.OutputHTTPConfig.OriginalHost = true
break
}
}

for _, options := range Settings.OutputHTTP {
plugins.RegisterPlugin(gor_http.NewHTTPOutput, options, &Settings.OutputHTTPConfig)
}

for _, options := range Settings.OutputBinary {
plugins.RegisterPlugin(binary.NewBinaryOutput, options, &Settings.OutputBinaryConfig)
}

if Settings.OutputKafkaConfig.Host != "" && Settings.OutputKafkaConfig.Topic != "" {
plugins.RegisterPlugin(kafka.NewKafkaOutput, "", &Settings.OutputKafkaConfig, &Settings.KafkaTLSConfig)
}

if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
plugins.RegisterPlugin(kafka.NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
}

return plugins
}

var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to this file")
Expand Down Expand Up @@ -57,11 +154,14 @@ func loggingMiddleware(addr string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/loop" {
_, err := http.Get("http://" + addr)
log.Println(err)

if err != nil {
log.Error().Err(err).Msg("Error while calling loop endpoint")
}
}

rb, _ := httputil.DumpRequest(r, false)
log.Println(string(rb))
log.Info().Msg(string(rb))
next.ServeHTTP(w, r)
})
}
Expand All @@ -72,26 +172,28 @@ func main() {
}

args := os.Args[1:]
var plugins *goreplay.InOutPlugins
var plugins *plugin.InOutPlugins
if len(args) > 0 && args[0] == "file-server" {
if len(args) != 2 {
log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
log.Fatal().Msg("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
}
dir, _ := os.Getwd()

goreplay.Debug(0, "Started example file server for current directory on address ", args[1])
log.Info().Msgf("Started example file server for current directory on address %s", args[1])

log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
if err := http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))); err != nil {
log.Fatal().Err(err).Msg("Failed to start file server")
}
} else {
flag.Parse()
goreplay.CheckSettings()
plugins = goreplay.NewPlugins()
settings.CheckSettings()
plugins = NewPlugins()
}

log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), goreplay.VERSION)
log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), settings.VERSION)

if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
log.Fatal("Required at least 1 input and 1 output")
log.Fatal().Msg("Required at least 1 input and 1 output")
}

if *memprofile != "" {
Expand All @@ -102,20 +204,22 @@ func main() {
profileCPU(*cpuprofile)
}

if goreplay.Settings.Pprof != "" {
if settings.Settings.Pprof != "" {
go func() {
log.Println(http.ListenAndServe(goreplay.Settings.Pprof, nil))
if err := http.ListenAndServe(settings.Settings.Pprof, nil); err != nil {
log.Fatal().Err(err).Msg("Failed to start pprof server")
}
}()
}

closeCh := make(chan int)
emitter := goreplay.NewEmitter()
go emitter.Start(plugins, goreplay.Settings.Middleware)
if goreplay.Settings.ExitAfter > 0 {
log.Printf("Running gor for a duration of %s\n", goreplay.Settings.ExitAfter)
emitter := emitter.New(&settings.Settings.EmitterConfig)
go emitter.Start(plugins)
if settings.Settings.ExitAfter > 0 {
log.Printf("Running gor for a duration of %s\n", settings.Settings.ExitAfter)

time.AfterFunc(goreplay.Settings.ExitAfter, func() {
log.Printf("gor run timeout %s\n", goreplay.Settings.ExitAfter)
time.AfterFunc(settings.Settings.ExitAfter, func() {
log.Printf("gor run timeout %s\n", settings.Settings.ExitAfter)
close(closeCh)
})
}
Expand All @@ -136,7 +240,7 @@ func profileCPU(cpuprofile string) {
if cpuprofile != "" {
f, err := os.Create(cpuprofile)
if err != nil {
log.Fatal(err)
log.Fatal().Err(err).Msg("Failed to create cpu profile file")
}
pprof.StartCPUProfile(f)

Expand All @@ -151,7 +255,7 @@ func profileMEM(memprofile string) {
if memprofile != "" {
f, err := os.Create(memprofile)
if err != nil {
log.Fatal(err)
log.Fatal().Err(err).Msg("Failed to create memory profile file")
}
time.AfterFunc(30*time.Second, func() {
pprof.WriteHeapProfile(f)
Expand Down
159 changes: 0 additions & 159 deletions emitter.go

This file was deleted.

Loading