From d4bc23b6b862e00458f567602eb7231882cc7fbd Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Sun, 17 Sep 2017 00:29:53 +0000 Subject: [PATCH] fix golint warnings --- LICENSE | 2 +- monstache.go | 499 +++++++++++++++++++++-------------------- monstache_test.go | 34 +-- monstachemap/plugin.go | 56 ++--- 4 files changed, 297 insertions(+), 294 deletions(-) diff --git a/LICENSE b/LICENSE index 671462a..f8c7f1f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2016 Ryan Wynn +Copyright (c) 2016-2017 Ryan Wynn Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/monstache.go b/monstache.go index aa58bc6..b158ca0 100644 --- a/monstache.go +++ b/monstache.go @@ -1,3 +1,4 @@ +// package main provides the monstache binary package main import ( @@ -37,10 +38,10 @@ import ( ) var gridByteBuffer bytes.Buffer -var infoLog *log.Logger = log.New(os.Stdout, "INFO ", log.Flags()) -var statsLog *log.Logger = log.New(os.Stdout, "STATS ", log.Flags()) -var traceLog *log.Logger = log.New(os.Stdout, "TRACE ", log.Flags()) -var errorLog *log.Logger = log.New(os.Stderr, "ERROR ", log.Flags()) +var infoLog = log.New(os.Stdout, "INFO ", log.Flags()) +var statsLog = log.New(os.Stdout, "STATS ", log.Flags()) +var traceLog = log.New(os.Stdout, "TRACE ", log.Flags()) +var errorLog = log.New(os.Stderr, "ERROR ", log.Flags()) var mapperPlugin func(*monstachemap.MapperPluginInput) (*monstachemap.MapperPluginOutput, error) var mapEnvs map[string]*executionEnv @@ -52,8 +53,8 @@ var chunksRegex = regexp.MustCompile("\\.chunks$") var systemsRegex = regexp.MustCompile("system\\..+$") var lastTimestamp bson.MongoTimestamp -const Version = "3.1.1" -const mongoUrlDefault string = "localhost" +const version = "3.1.1" +const mongoURLDefault string = "localhost" const resumeNameDefault string = "default" const elasticMaxConnsDefault int = 10 const 
elasticClientTimeoutDefault int = 60 @@ -63,7 +64,7 @@ const gtmChannelSizeDefault int = 512 type stringargs []string type executionEnv struct { - Vm *otto.Otto + VM *otto.Otto Script string Routing bool } @@ -110,7 +111,7 @@ type gtmSettings struct { } type configOptions struct { - MongoUrl string `toml:"mongo-url"` + MongoURL string `toml:"mongo-url"` MongoPemFile string `toml:"mongo-pem-file"` MongoValidatePemFile bool `toml:"mongo-validate-pem-file"` MongoOpLogDatabaseName string `toml:"mongo-oplog-database-name"` @@ -172,48 +173,48 @@ type configOptions struct { MapperPluginPath string `toml:"mapper-plugin-path"` } -func (this *stringargs) String() string { - return fmt.Sprintf("%s", *this) +func (args *stringargs) String() string { + return fmt.Sprintf("%s", *args) } -func (this *stringargs) Set(value string) error { - *this = append(*this, value) +func (args *stringargs) Set(value string) error { + *args = append(*args, value) return nil } -func (this *configOptions) ParseElasticsearchVersion(number string) (err error) { +func (config *configOptions) parseElasticsearchVersion(number string) (err error) { if number == "" { - err = errors.New("Elasticsearch version cannot be blank") + err = errors.New("elasticsearch version cannot be blank") } else { versionParts := strings.Split(number, ".") var majorVersion int majorVersion, err = strconv.Atoi(versionParts[0]) if err == nil { - this.ElasticMajorVersion = majorVersion + config.ElasticMajorVersion = majorVersion if majorVersion == 0 { - err = errors.New("Invalid elasticsearch major version 0") + err = errors.New("invalid elasticsearch major version 0") } } } return } -func (this *configOptions) NewBulkProcessor(client *elastic.Client, mongo *mgo.Session) (bulk *elastic.BulkProcessor, err error) { +func (config *configOptions) newBulkProcessor(client *elastic.Client, mongo *mgo.Session) (bulk *elastic.BulkProcessor, err error) { bulkService := client.BulkProcessor().Name("monstache") - 
bulkService.Workers(this.ElasticMaxConns) + bulkService.Workers(config.ElasticMaxConns) bulkService.Stats(true) - if this.ElasticMaxDocs != 0 { - bulkService.BulkActions(this.ElasticMaxDocs) + if config.ElasticMaxDocs != 0 { + bulkService.BulkActions(config.ElasticMaxDocs) } - if this.ElasticMaxBytes != 0 { - bulkService.BulkSize(this.ElasticMaxBytes) + if config.ElasticMaxBytes != 0 { + bulkService.BulkSize(config.ElasticMaxBytes) } - bulkService.FlushInterval(time.Duration(this.ElasticMaxSeconds) * time.Second) - bulkService.After(CreateAfterBulk(mongo, this)) + bulkService.FlushInterval(time.Duration(config.ElasticMaxSeconds) * time.Second) + bulkService.After(createAfterBulk(mongo, config)) return bulkService.Do(context.Background()) } -func (this *configOptions) NewStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) { +func (config *configOptions) newStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) { bulkService := client.BulkProcessor().Name("monstache-stats") bulkService.Workers(1) bulkService.Stats(false) @@ -221,9 +222,9 @@ func (this *configOptions) NewStatsBulkProcessor(client *elastic.Client) (bulk * return bulkService.Do(context.Background()) } -func (this *configOptions) NeedsSecureScheme() bool { - if len(this.ElasticUrls) > 0 { - for _, url := range this.ElasticUrls { +func (config *configOptions) needsSecureScheme() bool { + if len(config.ElasticUrls) > 0 { + for _, url := range config.ElasticUrls { if strings.HasPrefix(url, "https") { return true } @@ -233,34 +234,34 @@ func (this *configOptions) NeedsSecureScheme() bool { } -func (this *configOptions) NewElasticClient() (client *elastic.Client, err error) { +func (config *configOptions) newElasticClient() (client *elastic.Client, err error) { var clientOptions []elastic.ClientOptionFunc var httpClient *http.Client clientOptions = append(clientOptions, elastic.SetErrorLog(errorLog)) clientOptions = append(clientOptions, 
elastic.SetSniff(false)) - if this.NeedsSecureScheme() { + if config.needsSecureScheme() { clientOptions = append(clientOptions, elastic.SetScheme("https")) } - if len(this.ElasticUrls) > 0 { - clientOptions = append(clientOptions, elastic.SetURL(this.ElasticUrls...)) + if len(config.ElasticUrls) > 0 { + clientOptions = append(clientOptions, elastic.SetURL(config.ElasticUrls...)) } else { - this.ElasticUrls = append(this.ElasticUrls, elastic.DefaultURL) + config.ElasticUrls = append(config.ElasticUrls, elastic.DefaultURL) } - if this.Verbose { + if config.Verbose { clientOptions = append(clientOptions, elastic.SetTraceLog(traceLog)) } - if this.Gzip { + if config.Gzip { clientOptions = append(clientOptions, elastic.SetGzip(true)) } - if this.ElasticUser != "" { - clientOptions = append(clientOptions, elastic.SetBasicAuth(this.ElasticUser, this.ElasticPassword)) + if config.ElasticUser != "" { + clientOptions = append(clientOptions, elastic.SetBasicAuth(config.ElasticUser, config.ElasticPassword)) } - if this.ElasticRetry { + if config.ElasticRetry { d1, d2 := time.Duration(50)*time.Millisecond, time.Duration(20)*time.Second retrier := elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(d1, d2)) clientOptions = append(clientOptions, elastic.SetRetrier(retrier)) } - httpClient, err = this.NewHttpClient() + httpClient, err = config.NewHTTPClient() if err != nil { return client, err } @@ -268,35 +269,35 @@ func (this *configOptions) NewElasticClient() (client *elastic.Client, err error return elastic.NewClient(clientOptions...) 
} -func (this *configOptions) TestElasticsearchConn(client *elastic.Client) (err error) { +func (config *configOptions) testElasticsearchConn(client *elastic.Client) (err error) { var number string - url := this.ElasticUrls[0] + url := config.ElasticUrls[0] number, err = client.ElasticsearchVersion(url) if err == nil { - if this.Verbose { + if config.Verbose { infoLog.Printf("Successfully connected to elasticsearch version %s", number) } - err = this.ParseElasticsearchVersion(number) + err = config.parseElasticsearchVersion(number) } return } -func NormalizeIndexName(name string) (normal string) { +func normalizeIndexName(name string) (normal string) { normal = strings.ToLower(strings.TrimPrefix(name, "_")) return } -func NormalizeTypeName(name string) (normal string) { +func normalizeTypeName(name string) (normal string) { normal = strings.TrimPrefix(name, "_") return } -func NormalizeEsId(id string) (normal string) { +func normalizeEsID(id string) (normal string) { normal = strings.TrimPrefix(id, "_") return } -func DeleteIndexes(client *elastic.Client, db string, config *configOptions) (err error) { +func deleteIndexes(client *elastic.Client, db string, config *configOptions) (err error) { ctx := context.Background() for ns, m := range mapIndexTypes { parts := strings.SplitN(ns, ".", 2) @@ -306,13 +307,13 @@ func DeleteIndexes(client *elastic.Client, db string, config *configOptions) (er } } } - _, err = client.DeleteIndex(NormalizeIndexName(db) + "*").Do(ctx) + _, err = client.DeleteIndex(normalizeIndexName(db) + "*").Do(ctx) return } -func DeleteIndex(client *elastic.Client, namespace string, config *configOptions) (err error) { +func deleteIndex(client *elastic.Client, namespace string, config *configOptions) (err error) { ctx := context.Background() - esIndex := NormalizeIndexName(namespace) + esIndex := normalizeIndexName(namespace) if m := mapIndexTypes[namespace]; m != nil { esIndex = m.Index } @@ -320,15 +321,14 @@ func DeleteIndex(client 
*elastic.Client, namespace string, config *configOptions return err } -func EnsureFileMapping(client *elastic.Client, namespace string, config *configOptions) (err error) { +func ensureFileMapping(client *elastic.Client, namespace string, config *configOptions) (err error) { if config.ElasticMajorVersion < 5 { - return EnsureFileMappingMapperAttachment(client, namespace, config) - } else { - return EnsureFileMappingIngestAttachment(client, namespace, config) + return ensureFileMappingMapperAttachment(client, namespace, config) } + return ensureFileMappingIngestAttachment(client, namespace, config) } -func EnsureFileMappingIngestAttachment(client *elastic.Client, namespace string, config *configOptions) (err error) { +func ensureFileMappingIngestAttachment(client *elastic.Client, namespace string, config *configOptions) (err error) { ctx := context.Background() pipeline := map[string]interface{}{ "description": "Extract file information", @@ -344,10 +344,10 @@ func EnsureFileMappingIngestAttachment(client *elastic.Client, namespace string, return err } -func EnsureFileMappingMapperAttachment(conn *elastic.Client, namespace string, config *configOptions) (err error) { +func ensureFileMappingMapperAttachment(conn *elastic.Client, namespace string, config *configOptions) (err error) { ctx := context.Background() parts := strings.SplitN(namespace, ".", 2) - esIndex, esType := NormalizeIndexName(namespace), NormalizeTypeName(parts[1]) + esIndex, esType := normalizeIndexName(namespace), normalizeTypeName(parts[1]) if m := mapIndexTypes[namespace]; m != nil { esIndex, esType = m.Index, m.Type } @@ -383,16 +383,16 @@ func EnsureFileMappingMapperAttachment(conn *elastic.Client, namespace string, c return err } -func DefaultIndexTypeMapping(op *gtm.Op) *indexTypeMapping { +func defaultIndexTypeMapping(op *gtm.Op) *indexTypeMapping { return &indexTypeMapping{ Namespace: op.Namespace, - Index: NormalizeIndexName(op.Namespace), - Type: NormalizeTypeName(op.GetCollection()), + 
Index: normalizeIndexName(op.Namespace), + Type: normalizeTypeName(op.GetCollection()), } } -func IndexTypeMapping(op *gtm.Op) *indexTypeMapping { - mapping := DefaultIndexTypeMapping(op) +func mapIndexType(op *gtm.Op) *indexTypeMapping { + mapping := defaultIndexTypeMapping(op) if mapIndexTypes != nil { if m := mapIndexTypes[op.Namespace]; m != nil { mapping = m @@ -401,37 +401,37 @@ func IndexTypeMapping(op *gtm.Op) *indexTypeMapping { return mapping } -func OpIdToString(op *gtm.Op) string { - var opIdStr string +func opIDToString(op *gtm.Op) string { + var opIDStr string switch op.Id.(type) { case bson.ObjectId: - opIdStr = op.Id.(bson.ObjectId).Hex() + opIDStr = op.Id.(bson.ObjectId).Hex() case float64: - intId := int(op.Id.(float64)) - if op.Id.(float64) == float64(intId) { - opIdStr = fmt.Sprintf("%v", intId) + intID := int(op.Id.(float64)) + if op.Id.(float64) == float64(intID) { + opIDStr = fmt.Sprintf("%v", intID) } else { - opIdStr = fmt.Sprintf("%v", op.Id) + opIDStr = fmt.Sprintf("%v", op.Id) } case float32: - intId := int(op.Id.(float32)) - if op.Id.(float32) == float32(intId) { - opIdStr = fmt.Sprintf("%v", intId) + intID := int(op.Id.(float32)) + if op.Id.(float32) == float32(intID) { + opIDStr = fmt.Sprintf("%v", intID) } else { - opIdStr = fmt.Sprintf("%v", op.Id) + opIDStr = fmt.Sprintf("%v", op.Id) } default: - opIdStr = NormalizeEsId(fmt.Sprintf("%v", op.Id)) + opIDStr = normalizeEsID(fmt.Sprintf("%v", op.Id)) } - return opIdStr + return opIDStr } -func MapDataJavascript(op *gtm.Op) error { +func mapDataJavascript(op *gtm.Op) error { if mapEnvs == nil { return nil } if env := mapEnvs[op.Namespace]; env != nil { - val, err := env.Vm.Call("module.exports", op.Data, op.Data) + val, err := env.VM.Call("module.exports", op.Data, op.Data) if err != nil { return err } @@ -456,7 +456,7 @@ func MapDataJavascript(op *gtm.Op) error { return nil } -func MapDataGolang(op *gtm.Op) error { +func mapDataGolang(op *gtm.Op) error { input := 
&monstachemap.MapperPluginInput{ Document: op.Data, Namespace: op.Namespace, @@ -493,15 +493,14 @@ func MapDataGolang(op *gtm.Op) error { return nil } -func MapData(config *configOptions, op *gtm.Op) error { +func mapData(config *configOptions, op *gtm.Op) error { if config.MapperPluginPath != "" { - return MapDataGolang(op) - } else { - return MapDataJavascript(op) + return mapDataGolang(op) } + return mapDataJavascript(op) } -func PrepareDataForIndexing(config *configOptions, op *gtm.Op) { +func prepareDataForIndexing(config *configOptions, op *gtm.Op) { data := op.Data if config.IndexOplogTime { secs := int64(op.Timestamp >> 32) @@ -517,7 +516,7 @@ func PrepareDataForIndexing(config *configOptions, op *gtm.Op) { delete(data, "_meta_monstache") } -func ParseIndexMeta(data map[string]interface{}) (meta *indexingMeta) { +func parseIndexMeta(data map[string]interface{}) (meta *indexingMeta) { meta = &indexingMeta{} if m, ok := data["_meta_monstache"]; ok { switch m.(type) { @@ -558,7 +557,7 @@ func ParseIndexMeta(data map[string]interface{}) (meta *indexingMeta) { return meta } -func AddFileContent(session *mgo.Session, op *gtm.Op, config *configOptions) (err error) { +func addFileContent(session *mgo.Session, op *gtm.Op, config *configOptions) (err error) { op.Data["file"] = "" gridByteBuffer.Reset() db, bucket := @@ -587,33 +586,33 @@ func AddFileContent(session *mgo.Session, op *gtm.Op, config *configOptions) (er return } -func NotMonstache(op *gtm.Op) bool { +func notMonstache(op *gtm.Op) bool { return op.GetDatabase() != "monstache" } -func NotChunks(op *gtm.Op) bool { +func notChunks(op *gtm.Op) bool { return !chunksRegex.MatchString(op.GetCollection()) } -func NotSystem(op *gtm.Op) bool { +func notSystem(op *gtm.Op) bool { return !systemsRegex.MatchString(op.GetCollection()) } -func FilterWithRegex(regex string) gtm.OpFilter { +func filterWithRegex(regex string) gtm.OpFilter { var validNameSpace = regexp.MustCompile(regex) return func(op *gtm.Op) bool { 
return validNameSpace.MatchString(op.Namespace) } } -func FilterInverseWithRegex(regex string) gtm.OpFilter { +func filterInverseWithRegex(regex string) gtm.OpFilter { var invalidNameSpace = regexp.MustCompile(regex) return func(op *gtm.Op) bool { return !invalidNameSpace.MatchString(op.Namespace) } } -func EnsureClusterTTL(session *mgo.Session) error { +func ensureClusterTTL(session *mgo.Session) error { col := session.DB("monstache").C("cluster") return col.EnsureIndex(mgo.Index{ Key: []string{"expireAt"}, @@ -622,7 +621,7 @@ func EnsureClusterTTL(session *mgo.Session) error { }) } -func IsEnabledProcess(session *mgo.Session, config *configOptions) (bool, error) { +func isEnabledProcess(session *mgo.Session, config *configOptions) (bool, error) { col := session.DB("monstache").C("cluster") doc := make(map[string]interface{}) doc["_id"] = config.ResumeName @@ -640,17 +639,16 @@ func IsEnabledProcess(session *mgo.Session, config *configOptions) (bool, error) lastError := err.(*mgo.LastError) if lastError.Code == 11000 { return false, nil - } else { - return false, err } + return false, err } -func ResetClusterState(session *mgo.Session, config *configOptions) error { +func resetClusterState(session *mgo.Session, config *configOptions) error { col := session.DB("monstache").C("cluster") return col.RemoveId(config.ResumeName) } -func IsEnabledProcessId(session *mgo.Session, config *configOptions) bool { +func isEnabledProcessID(session *mgo.Session, config *configOptions) bool { col := session.DB("monstache").C("cluster") doc := make(map[string]interface{}) col.FindId(config.ResumeName).One(doc) @@ -672,7 +670,7 @@ func IsEnabledProcessId(session *mgo.Session, config *configOptions) bool { return false } -func ResumeWork(ctx *gtm.OpCtx, session *mgo.Session, config *configOptions) { +func resumeWork(ctx *gtm.OpCtx, session *mgo.Session, config *configOptions) { col := session.DB("monstache").C("monstache") doc := make(map[string]interface{}) 
col.FindId(config.ResumeName).One(doc) @@ -683,7 +681,7 @@ func ResumeWork(ctx *gtm.OpCtx, session *mgo.Session, config *configOptions) { ctx.Resume() } -func SaveTimestamp(session *mgo.Session, ts bson.MongoTimestamp, resumeName string) error { +func saveTimestamp(session *mgo.Session, ts bson.MongoTimestamp, resumeName string) error { col := session.DB("monstache").C("monstache") doc := make(map[string]interface{}) doc["ts"] = ts @@ -691,9 +689,9 @@ func SaveTimestamp(session *mgo.Session, ts bson.MongoTimestamp, resumeName stri return err } -func (config *configOptions) ParseCommandLineFlags() *configOptions { +func (config *configOptions) parseCommandLineFlags() *configOptions { flag.BoolVar(&config.Print, "print-config", false, "Print the configuration and then exit") - flag.StringVar(&config.MongoUrl, "mongo-url", "", "MongoDB connection URL") + flag.StringVar(&config.MongoURL, "mongo-url", "", "MongoDB connection URL") flag.StringVar(&config.MongoPemFile, "mongo-pem-file", "", "Path to a PEM file for secure connections to MongoDB") flag.BoolVar(&config.MongoValidatePemFile, "mongo-validate-pem-file", true, "Set to boolean false to not validate the MongoDB PEM file") flag.StringVar(&config.MongoOpLogDatabaseName, "mongo-oplog-database-name", "", "Override the database name which contains the mongodb oplog") @@ -749,15 +747,15 @@ func (config *configOptions) ParseCommandLineFlags() *configOptions { return config } -func (config *configOptions) LoadIndexTypes() { +func (config *configOptions) loadIndexTypes() { if config.Mapping != nil { mapIndexTypes = make(map[string]*indexTypeMapping) for _, m := range config.Mapping { if m.Namespace != "" && m.Index != "" && m.Type != "" { mapIndexTypes[m.Namespace] = &indexTypeMapping{ Namespace: m.Namespace, - Index: NormalizeIndexName(m.Index), - Type: NormalizeTypeName(m.Type), + Index: normalizeIndexName(m.Index), + Type: normalizeTypeName(m.Type), } } else { panic("mappings must specify namespace, index, and type 
attributes") @@ -766,23 +764,23 @@ func (config *configOptions) LoadIndexTypes() { } } -func (config *configOptions) LoadScripts() { +func (config *configOptions) loadScripts() { if config.Script != nil { mapEnvs = make(map[string]*executionEnv) for _, s := range config.Script { if s.Namespace != "" && s.Script != "" { env := &executionEnv{ - Vm: otto.New(), + VM: otto.New(), Script: s.Script, Routing: s.Routing, } - if err := env.Vm.Set("module", make(map[string]interface{})); err != nil { + if err := env.VM.Set("module", make(map[string]interface{})); err != nil { panic(err) } - if _, err := env.Vm.Run(env.Script); err != nil { + if _, err := env.VM.Run(env.Script); err != nil { panic(err) } - val, err := env.Vm.Run("module.exports") + val, err := env.VM.Run("module.exports") if err != nil { panic(err) } else if !val.IsFunction() { @@ -797,7 +795,7 @@ func (config *configOptions) LoadScripts() { } } -func (config *configOptions) LoadPlugins() *configOptions { +func (config *configOptions) loadPlugins() *configOptions { if config.MapperPluginPath != "" { p, err := plugin.Open(config.MapperPluginPath) if err != nil { @@ -817,20 +815,20 @@ func (config *configOptions) LoadPlugins() *configOptions { return config } -func (config *configOptions) LoadConfigFile() *configOptions { +func (config *configOptions) loadConfigFile() *configOptions { if config.ConfigFile != "" { - var tomlConfig configOptions = configOptions{ + var tomlConfig = configOptions{ DroppedDatabases: true, DroppedCollections: true, MongoDialSettings: mongoDialSettings{Timeout: -1}, MongoSessionSettings: mongoSessionSettings{SocketTimeout: -1, SyncTimeout: -1}, - GtmSettings: GtmDefaultSettings(), + GtmSettings: gtmDefaultSettings(), } if _, err := toml.DecodeFile(config.ConfigFile, &tomlConfig); err != nil { panic(err) } - if config.MongoUrl == "" { - config.MongoUrl = tomlConfig.MongoUrl + if config.MongoURL == "" { + config.MongoURL = tomlConfig.MongoURL } if config.MongoPemFile == "" { 
config.MongoPemFile = tomlConfig.MongoPemFile @@ -989,14 +987,14 @@ func (config *configOptions) LoadConfigFile() *configOptions { config.MongoSessionSettings = tomlConfig.MongoSessionSettings config.GtmSettings = tomlConfig.GtmSettings config.Logs = tomlConfig.Logs - tomlConfig.SetupLogging() - tomlConfig.LoadScripts() - tomlConfig.LoadIndexTypes() + tomlConfig.setupLogging() + tomlConfig.loadScripts() + tomlConfig.loadIndexTypes() } return config } -func (config *configOptions) NewLogger(path string) *lumberjack.Logger { +func (config *configOptions) newLogger(path string) *lumberjack.Logger { return &lumberjack.Logger{ Filename: path, MaxSize: 500, // megabytes @@ -1005,19 +1003,19 @@ func (config *configOptions) NewLogger(path string) *lumberjack.Logger { } } -func (config *configOptions) SetupLogging() { +func (config *configOptions) setupLogging() { logs := config.Logs if logs.Info != "" { - infoLog.SetOutput(config.NewLogger(logs.Info)) + infoLog.SetOutput(config.newLogger(logs.Info)) } if logs.Error != "" { - errorLog.SetOutput(config.NewLogger(logs.Error)) + errorLog.SetOutput(config.newLogger(logs.Error)) } if logs.Trace != "" { - traceLog.SetOutput(config.NewLogger(logs.Trace)) + traceLog.SetOutput(config.newLogger(logs.Trace)) } if logs.Stats != "" { - statsLog.SetOutput(config.NewLogger(logs.Stats)) + statsLog.SetOutput(config.newLogger(logs.Stats)) } } @@ -1037,7 +1035,7 @@ func (config *configOptions) LoadGridFsConfig() *configOptions { return config } -func (config *configOptions) Dump() { +func (config *configOptions) dump() { json, err := json.MarshalIndent(config, "", " ") if err != nil { errorLog.Printf("Unable to print configuration: %s", err) @@ -1046,9 +1044,33 @@ func (config *configOptions) Dump() { } } -func (config *configOptions) SetDefaults() *configOptions { - if config.MongoUrl == "" { - config.MongoUrl = mongoUrlDefault +/* +if ssl=true is set on the connection string, remove the option +from the connection string and enable TLS 
because the mgo +driver does not support the option in the connection string +*/ +func (config *configOptions) parseMongoURL() *configOptions { + const queryDelim string = "?" + hostQuery := strings.SplitN(config.MongoURL, queryDelim, 2) + if len(hostQuery) == 2 { + host, query := hostQuery[0], hostQuery[1] + r := regexp.MustCompile(`ssl=true&?|&ssl=true$`) + qstr := r.ReplaceAllString(query, "") + if qstr != query { + config.MongoDialSettings.Ssl = true + if qstr == "" { + config.MongoURL = host + } else { + config.MongoURL = strings.Join([]string{host, qstr}, queryDelim) + } + } + } + return config +} + +func (config *configOptions) setDefaults() *configOptions { + if config.MongoURL == "" { + config.MongoURL = mongoURLDefault } if config.ClusterName != "" { if config.ClusterName != "" && config.Worker != "" { @@ -1079,26 +1101,8 @@ func (config *configOptions) SetDefaults() *configOptions { if config.ElasticMaxDocs == 0 { config.ElasticMaxDocs = elasticMaxDocsDefault } - if config.MongoUrl != "" { - // if ssl=true is set on the connection string, remove the option - // from the connection string and enable TLS because the mgo - // driver does not support the option in the connection string - const queryDelim string = "?" 
- host_query := strings.SplitN(config.MongoUrl, queryDelim, 2) - if len(host_query) == 2 { - host, query := host_query[0], host_query[1] - r := regexp.MustCompile(`ssl=true&?|&ssl=true$`) - qstr := r.ReplaceAllString(query, "") - if qstr != query { - // ssl detected - config.MongoDialSettings.Ssl = true - if qstr == "" { - config.MongoUrl = host - } else { - config.MongoUrl = strings.Join([]string{host, qstr}, queryDelim) - } - } - } + if config.MongoURL != "" { + config.parseMongoURL() } return config } @@ -1121,39 +1125,36 @@ func (config *configOptions) DialMongo() (*mgo.Session, error) { // Turn off validation tlsConfig.InsecureSkipVerify = true } - dialInfo, err := mgo.ParseURL(config.MongoUrl) + dialInfo, err := mgo.ParseURL(config.MongoURL) if err != nil { return nil, err - } else { - dialInfo.Timeout = time.Duration(10) * time.Second - if config.MongoDialSettings.Timeout != -1 { - dialInfo.Timeout = time.Duration(config.MongoDialSettings.Timeout) * time.Second - } - dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) { - conn, err := tls.Dial("tcp", addr.String(), tlsConfig) - if err != nil { - errorLog.Printf("Unable to dial mongodb: %s", err) - } - return conn, err - } - session, err := mgo.DialWithInfo(dialInfo) - if err == nil { - session.SetSyncTimeout(1 * time.Minute) - session.SetSocketTimeout(1 * time.Minute) - } - return session, err } - } else { + dialInfo.Timeout = time.Duration(10) * time.Second if config.MongoDialSettings.Timeout != -1 { - return mgo.DialWithTimeout(config.MongoUrl, - time.Duration(config.MongoDialSettings.Timeout)*time.Second) - } else { - return mgo.Dial(config.MongoUrl) + dialInfo.Timeout = time.Duration(config.MongoDialSettings.Timeout) * time.Second + } + dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) { + conn, err := tls.Dial("tcp", addr.String(), tlsConfig) + if err != nil { + errorLog.Printf("Unable to dial mongodb: %s", err) + } + return conn, err } + session, err := 
mgo.DialWithInfo(dialInfo) + if err == nil { + session.SetSyncTimeout(1 * time.Minute) + session.SetSocketTimeout(1 * time.Minute) + } + return session, err + } + if config.MongoDialSettings.Timeout != -1 { + return mgo.DialWithTimeout(config.MongoURL, + time.Duration(config.MongoDialSettings.Timeout)*time.Second) } + return mgo.Dial(config.MongoURL) } -func (config *configOptions) NewHttpClient() (client *http.Client, err error) { +func (config *configOptions) NewHTTPClient() (client *http.Client, err error) { tlsConfig := &tls.Config{} if config.ElasticPemFile != "" { var ca []byte @@ -1180,19 +1181,19 @@ func (config *configOptions) NewHttpClient() (client *http.Client, err error) { return client, err } -func DoDrop(mongo *mgo.Session, elastic *elastic.Client, op *gtm.Op, config *configOptions) (err error) { +func doDrop(mongo *mgo.Session, elastic *elastic.Client, op *gtm.Op, config *configOptions) (err error) { if db, drop := op.IsDropDatabase(); drop { if config.DroppedDatabases { - if err = DeleteIndexes(elastic, db, config); err == nil { - if e := DropDBMeta(mongo, db); e != nil { + if err = deleteIndexes(elastic, db, config); err == nil { + if e := dropDBMeta(mongo, db); e != nil { errorLog.Printf("unable to delete meta for db: %s", e) } } } } else if col, drop := op.IsDropCollection(); drop { if config.DroppedCollections { - if err = DeleteIndex(elastic, op.GetDatabase()+"."+col, config); err == nil { - if e := DropCollectionMeta(mongo, op.GetDatabase()+"."+col); e != nil { + if err = deleteIndex(elastic, op.GetDatabase()+"."+col, config); err == nil { + if e := dropCollectionMeta(mongo, op.GetDatabase()+"."+col); e != nil { errorLog.Printf("unable to delete meta for collection: %s", e) } } @@ -1201,12 +1202,12 @@ func DoDrop(mongo *mgo.Session, elastic *elastic.Client, op *gtm.Op, config *con return } -func DoFileContent(mongo *mgo.Session, op *gtm.Op, config *configOptions) (ingestAttachment bool, err error) { +func doFileContent(mongo *mgo.Session, op 
*gtm.Op, config *configOptions) (ingestAttachment bool, err error) { if !config.IndexFiles { return } if fileNamespaces[op.Namespace] { - err = AddFileContent(mongo, op, config) + err = addFileContent(mongo, op, config) if config.ElasticMajorVersion >= 5 { if op.Data["file"] != "" { ingestAttachment = true @@ -1216,19 +1217,19 @@ func DoFileContent(mongo *mgo.Session, op *gtm.Op, config *configOptions) (inges return } -func DoResume(mongo *mgo.Session, ts bson.MongoTimestamp, config *configOptions) (err error) { +func doResume(mongo *mgo.Session, ts bson.MongoTimestamp, config *configOptions) (err error) { if config.Resume { if ts > 0 { - err = SaveTimestamp(mongo, ts, config.ResumeName) + err = saveTimestamp(mongo, ts, config.ResumeName) } } return } -func AddPatch(config *configOptions, client *elastic.Client, op *gtm.Op, - objectId string, indexType *indexTypeMapping, meta *indexingMeta) (err error) { +func addPatch(config *configOptions, client *elastic.Client, op *gtm.Op, + objectID string, indexType *indexTypeMapping, meta *indexingMeta) (err error) { var merges []interface{} - var toJson []byte + var toJSON []byte if op.IsSourceOplog() == false { return nil } @@ -1237,7 +1238,7 @@ func AddPatch(config *configOptions, client *elastic.Client, op *gtm.Op, } if op.IsUpdate() { ctx := context.Background() - service := client.Get().Index(indexType.Index).Type(indexType.Type).Id(objectId) + service := client.Get().Index(indexType.Index).Type(indexType.Type).Id(objectID) if meta.Routing != "" { service.Routing(meta.Routing) } @@ -1255,10 +1256,10 @@ func AddPatch(config *configOptions, client *elastic.Client, op *gtm.Op, } } delete(src, config.MergePatchAttr) - var fromJson, mergeDoc []byte - if fromJson, err = json.Marshal(src); err == nil { - if toJson, err = json.Marshal(op.Data); err == nil { - if mergeDoc, err = jsonpatch.CreateMergePatch(fromJson, toJson); err == nil { + var fromJSON, mergeDoc []byte + if fromJSON, err = json.Marshal(src); err == nil { + if 
toJSON, err = json.Marshal(op.Data); err == nil { + if mergeDoc, err = jsonpatch.CreateMergePatch(fromJSON, toJSON); err == nil { merge := make(map[string]interface{}) merge["ts"] = op.Timestamp >> 32 merge["p"] = string(mergeDoc) @@ -1276,11 +1277,11 @@ func AddPatch(config *configOptions, client *elastic.Client, op *gtm.Op, } } else { if _, found := op.Data[config.MergePatchAttr]; !found { - if toJson, err = json.Marshal(op.Data); err == nil { + if toJSON, err = json.Marshal(op.Data); err == nil { merge := make(map[string]interface{}) merge["v"] = 1 merge["ts"] = op.Timestamp >> 32 - merge["p"] = string(toJson) + merge["p"] = string(toJSON) merges = append(merges, merge) op.Data[config.MergePatchAttr] = merges } @@ -1289,19 +1290,19 @@ func AddPatch(config *configOptions, client *elastic.Client, op *gtm.Op, return } -func DoIndexing(config *configOptions, mongo *mgo.Session, bulk *elastic.BulkProcessor, client *elastic.Client, op *gtm.Op, ingestAttachment bool) (err error) { - meta := ParseIndexMeta(op.Data) - PrepareDataForIndexing(config, op) - objectId, indexType := OpIdToString(op), IndexTypeMapping(op) +func doIndexing(config *configOptions, mongo *mgo.Session, bulk *elastic.BulkProcessor, client *elastic.Client, op *gtm.Op, ingestAttachment bool) (err error) { + meta := parseIndexMeta(op.Data) + prepareDataForIndexing(config, op) + objectID, indexType := opIDToString(op), mapIndexType(op) if config.EnablePatches { if patchNamespaces[op.Namespace] { - if e := AddPatch(config, client, op, objectId, indexType, meta); e != nil { + if e := addPatch(config, client, op, objectID, indexType, meta); e != nil { errorLog.Printf("unable to save json-patch info: %s", e) } } } req := elastic.NewBulkIndexRequest().Index(indexType.Index).Type(indexType.Type) - req.Id(objectId) + req.Id(objectID) req.Doc(op.Data) if meta.Routing != "" { req.Routing(meta.Routing) @@ -1311,25 +1312,25 @@ func DoIndexing(config *configOptions, mongo *mgo.Session, bulk *elastic.BulkPro } 
bulk.Add(req) if meta.Empty() == false { - if e := SetIndexMeta(mongo, op.Namespace, objectId, meta); e != nil { + if e := setIndexMeta(mongo, op.Namespace, objectID, meta); e != nil { errorLog.Printf("unable to save routing info: %s", e) } } return } -func DoIndex(config *configOptions, mongo *mgo.Session, bulk *elastic.BulkProcessor, client *elastic.Client, op *gtm.Op, ingestAttachment bool) (err error) { - if err = MapData(config, op); err == nil { +func doIndex(config *configOptions, mongo *mgo.Session, bulk *elastic.BulkProcessor, client *elastic.Client, op *gtm.Op, ingestAttachment bool) (err error) { + if err = mapData(config, op); err == nil { if op.Data != nil { - err = DoIndexing(config, mongo, bulk, client, op, ingestAttachment) + err = doIndexing(config, mongo, bulk, client, op, ingestAttachment) } else if op.IsUpdate() { - DoDelete(mongo, bulk, op) + doDelete(mongo, bulk, op) } } return } -func DoIndexStats(bulkStats *elastic.BulkProcessor, stats elastic.BulkProcessorStats) (err error) { +func doIndexStats(bulkStats *elastic.BulkProcessor, stats elastic.BulkProcessorStats) (err error) { var hostname string doc := make(map[string]interface{}) t := time.Now().UTC() @@ -1347,14 +1348,14 @@ func DoIndexStats(bulkStats *elastic.BulkProcessor, stats elastic.BulkProcessorS return } -func DropDBMeta(session *mgo.Session, db string) (err error) { +func dropDBMeta(session *mgo.Session, db string) (err error) { col := session.DB("monstache").C("meta") q := bson.M{"db": db} _, err = col.RemoveAll(q) return } -func DropCollectionMeta(session *mgo.Session, namespace string) (err error) { +func dropCollectionMeta(session *mgo.Session, namespace string) (err error) { col := session.DB("monstache").C("meta") q := bson.M{"namespace": namespace} _, err = col.RemoveAll(q) @@ -1365,25 +1366,25 @@ func (meta *indexingMeta) Empty() bool { return meta.Routing == "" && meta.Index == "" && meta.Type == "" } -func SetIndexMeta(session *mgo.Session, namespace, id string, meta 
*indexingMeta) error { +func setIndexMeta(session *mgo.Session, namespace, id string, meta *indexingMeta) error { col := session.DB("monstache").C("meta") - metaId := fmt.Sprintf("%s.%s", namespace, id) + metaID := fmt.Sprintf("%s.%s", namespace, id) doc := make(map[string]interface{}) doc["routing"] = meta.Routing doc["index"] = meta.Index doc["type"] = meta.Type doc["db"] = strings.SplitN(namespace, ".", 2)[0] doc["namespace"] = namespace - _, err := col.UpsertId(metaId, bson.M{"$set": doc}) + _, err := col.UpsertId(metaID, bson.M{"$set": doc}) return err } -func GetIndexMeta(session *mgo.Session, namespace, id string) (meta *indexingMeta) { +func getIndexMeta(session *mgo.Session, namespace, id string) (meta *indexingMeta) { meta = &indexingMeta{} col := session.DB("monstache").C("meta") doc := make(map[string]interface{}) - metaId := fmt.Sprintf("%s.%s", namespace, id) - col.FindId(metaId).One(doc) + metaID := fmt.Sprintf("%s.%s", namespace, id) + col.FindId(metaID).One(doc) if doc["routing"] != nil { meta.Routing = doc["routing"].(string) } @@ -1393,15 +1394,15 @@ func GetIndexMeta(session *mgo.Session, namespace, id string) (meta *indexingMet if doc["type"] != nil { meta.Type = doc["type"].(string) } - col.RemoveId(metaId) + col.RemoveId(metaID) return } -func DoDelete(mongo *mgo.Session, bulk *elastic.BulkProcessor, op *gtm.Op) { - objectId, indexType, meta := OpIdToString(op), IndexTypeMapping(op), &indexingMeta{} +func doDelete(mongo *mgo.Session, bulk *elastic.BulkProcessor, op *gtm.Op) { + objectID, indexType, meta := opIDToString(op), mapIndexType(op), &indexingMeta{} if mapEnvs != nil { if env := mapEnvs[op.Namespace]; env != nil && env.Routing { - meta = GetIndexMeta(mongo, op.Namespace, objectId) + meta = getIndexMeta(mongo, op.Namespace, objectID) } } if meta.Index != "" { @@ -1414,12 +1415,12 @@ func DoDelete(mongo *mgo.Session, bulk *elastic.BulkProcessor, op *gtm.Op) { if meta.Routing != "" { req.Routing(meta.Routing) } - req.Id(objectId) + 
req.Id(objectID) bulk.Add(req) return } -func GtmDefaultSettings() gtmSettings { +func gtmDefaultSettings() gtmSettings { return gtmSettings{ ChannelSize: gtmChannelSizeDefault, BufferSize: 32, @@ -1427,12 +1428,12 @@ func GtmDefaultSettings() gtmSettings { } } -func CreateAfterBulk(mongo *mgo.Session, config *configOptions) elastic.BulkAfterFunc { +func createAfterBulk(mongo *mgo.Session, config *configOptions) elastic.BulkAfterFunc { return func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { if err != nil || lastTimestamp == 0 { return } - if err = DoResume(mongo, lastTimestamp, config); err != nil { + if err = doResume(mongo, lastTimestamp, config); err != nil { errorLog.Printf("Unable to save timestamp: %s", err) } } @@ -1443,19 +1444,19 @@ func main() { config := &configOptions{ MongoDialSettings: mongoDialSettings{Timeout: -1}, MongoSessionSettings: mongoSessionSettings{SocketTimeout: -1, SyncTimeout: -1}, - GtmSettings: GtmDefaultSettings(), + GtmSettings: gtmDefaultSettings(), } - config.ParseCommandLineFlags() + config.parseCommandLineFlags() if config.Version { - fmt.Println(Version) + fmt.Println(version) os.Exit(0) } - config.LoadConfigFile().SetDefaults() + config.loadConfigFile().setDefaults() if config.Print { - config.Dump() + config.dump() os.Exit(0) } - config.LoadPlugins() + config.loadPlugins() sigs := make(chan os.Signal, 1) done := make(chan bool, 1) @@ -1463,7 +1464,7 @@ func main() { mongo, err := config.DialMongo() if err != nil { - errorLog.Panicf("Unable to connect to mongodb using URL %s: %s", config.MongoUrl, err) + errorLog.Panicf("Unable to connect to mongodb using URL %s: %s", config.MongoURL, err) } mongo.SetMode(mgo.Primary, true) defer mongo.Close() @@ -1481,28 +1482,28 @@ func main() { mongo.SetSyncTimeout(timeOut) } - elasticClient, err := config.NewElasticClient() + elasticClient, err := config.newElasticClient() if err != nil { errorLog.Panicf("Unable to create elasticsearch 
client: %s", err) } if config.ElasticVersion == "" { - if err := config.TestElasticsearchConn(elasticClient); err != nil { + if err := config.testElasticsearchConn(elasticClient); err != nil { errorLog.Panicf("Unable to validate connection to elasticsearch using client %s: %s", elasticClient, err) } } else { - if err := config.ParseElasticsearchVersion(config.ElasticVersion); err != nil { + if err := config.parseElasticsearchVersion(config.ElasticVersion); err != nil { errorLog.Panicf("Elasticsearch version must conform to major.minor.fix: %s", err) } } - bulk, err := config.NewBulkProcessor(elasticClient, mongo) + bulk, err := config.newBulkProcessor(elasticClient, mongo) if err != nil { errorLog.Panicf("Unable to start bulk processor: %s", err) } defer bulk.Stop() var bulkStats *elastic.BulkProcessor if config.IndexStats { - bulkStats, err = config.NewStatsBulkProcessor(elasticClient) + bulkStats, err = config.newStatsBulkProcessor(elasticClient) if err != nil { errorLog.Panicf("Unable to start stats bulk processor: %s", err) } @@ -1514,7 +1515,7 @@ func main() { done <- true }() - var after gtm.TimestampGenerator = nil + var after gtm.TimestampGenerator if config.Resume { after = func(session *mgo.Session, options *gtm.Options) bson.MongoTimestamp { ts := gtm.LastOpTimestamp(session, options) @@ -1543,7 +1544,7 @@ func main() { errorLog.Fatalln("File indexing is ON but no file namespaces are configured") } for _, namespace := range config.FileNamespaces { - if err := EnsureFileMapping(elasticClient, namespace, config); err != nil { + if err := ensureFileMapping(elasticClient, namespace, config); err != nil { panic(err) } if config.ElasticMajorVersion >= 5 { @@ -1552,14 +1553,14 @@ func main() { } } - var filter gtm.OpFilter = nil - var directReadFilter gtm.OpFilter = nil - filterChain := []gtm.OpFilter{NotMonstache, NotSystem, NotChunks} + var filter gtm.OpFilter + var directReadFilter gtm.OpFilter + filterChain := []gtm.OpFilter{notMonstache, notSystem, 
notChunks} if config.NsRegex != "" { - filterChain = append(filterChain, FilterWithRegex(config.NsRegex)) + filterChain = append(filterChain, filterWithRegex(config.NsRegex)) } if config.NsExcludeRegex != "" { - filterChain = append(filterChain, FilterInverseWithRegex(config.NsExcludeRegex)) + filterChain = append(filterChain, filterInverseWithRegex(config.NsExcludeRegex)) } if config.Worker != "" { workerFilter, err := consistent.ConsistentHashFilter(config.Worker, config.Workers) @@ -1583,12 +1584,12 @@ func main() { cursorTimeout = &config.MongoCursorTimeout } if config.ClusterName != "" { - if err = EnsureClusterTTL(mongo); err == nil { + if err = ensureClusterTTL(mongo); err == nil { infoLog.Printf("Joined cluster %s", config.ClusterName) } else { errorLog.Panicf("Unable to enable cluster mode: %s", err) } - enabled, err = IsEnabledProcess(mongo, config) + enabled, err = isEnabledProcess(mongo, config) if err != nil { errorLog.Panicf("Unable to determine enabled cluster process: %s", err) } @@ -1659,7 +1660,7 @@ func main() { bulkStats.Stop() } if config.ClusterName != "" { - ResetClusterState(mongo, config) + resetClusterState(mongo, config) } mongo.Close() os.Exit(exitStatus) @@ -1668,18 +1669,18 @@ func main() { break } if enabled { - enabled = IsEnabledProcessId(mongo, config) + enabled = isEnabledProcessID(mongo, config) if !enabled { infoLog.Printf("Pausing work for cluster %s", config.ClusterName) gtmCtx.Pause() bulk.Stop() } } else { - if enabled, err = IsEnabledProcess(mongo, config); err == nil { + if enabled, err = isEnabledProcess(mongo, config); err == nil { if enabled { infoLog.Printf("Resuming work for cluster %s", config.ClusterName) bulk.Start(context.Background()) - ResumeWork(gtmCtx, mongo, config) + resumeWork(gtmCtx, mongo, config) } } else { gtmCtx.ErrC <- err @@ -1690,7 +1691,7 @@ func main() { break } if config.IndexStats { - if err := DoIndexStats(bulkStats, bulk.Stats()); err != nil { + if err := doIndexStats(bulkStats, bulk.Stats()); 
err != nil { errorLog.Printf("Error indexing statistics: %s", err) } } else { @@ -1717,17 +1718,17 @@ func main() { } if op.IsDrop() { bulk.Flush() - if err = DoDrop(mongo, elasticClient, op, config); err != nil { + if err = doDrop(mongo, elasticClient, op, config); err != nil { gtmCtx.ErrC <- err } } else if op.IsDelete() { - DoDelete(mongo, bulk, op) + doDelete(mongo, bulk, op) } else if op.Data != nil { ingestAttachment := false - if ingestAttachment, err = DoFileContent(mongo, op, config); err != nil { + if ingestAttachment, err = doFileContent(mongo, op, config); err != nil { gtmCtx.ErrC <- err } - if err = DoIndex(config, mongo, bulk, elasticClient, op, ingestAttachment); err != nil { + if err = doIndex(config, mongo, bulk, elasticClient, op, ingestAttachment); err != nil { gtmCtx.ErrC <- err } } diff --git a/monstache_test.go b/monstache_test.go index 352e924..ef9e7ac 100644 --- a/monstache_test.go +++ b/monstache_test.go @@ -56,51 +56,51 @@ func ValidateDocResponse(t *testing.T, doc map[string]string, resp *elastic.GetR func TestSetElasticClientScheme(t *testing.T) { c := &configOptions{ - ElasticUrls: []string { "https://example.com:9200" }, + ElasticUrls: []string{"https://example.com:9200"}, } - if c.NeedsSecureScheme() == false { + if c.needsSecureScheme() == false { t.Fatalf("secure scheme should be required") } c = &configOptions{ - ElasticUrls: []string { "http://example.com:9200" }, + ElasticUrls: []string{"http://example.com:9200"}, } - if c.NeedsSecureScheme() { + if c.needsSecureScheme() { t.Fatalf("secure scheme should not be required") } c = &configOptions{} - if c.NeedsSecureScheme() { + if c.needsSecureScheme() { t.Fatalf("secure scheme should not be required") } } func TestParseSecureMongoUrl(t *testing.T) { - c := &configOptions{MongoUrl: "mongo://host:47/db?a=b&ssl=true&c=d"} - c.SetDefaults() - if c.MongoUrl != "mongo://host:47/db?a=b&c=d" { + c := &configOptions{MongoURL: "mongo://host:47/db?a=b&ssl=true&c=d"} + c.setDefaults() + if 
c.MongoURL != "mongo://host:47/db?a=b&c=d" { t.Fatalf("ssl param not removed from url") } if c.MongoDialSettings.Ssl == false { t.Fatalf("ssl not enabled") } - c = &configOptions{MongoUrl: "mongo://host:47/db?a=b&c=d&ssl=true"} - c.SetDefaults() - if c.MongoUrl != "mongo://host:47/db?a=b&c=d" { + c = &configOptions{MongoURL: "mongo://host:47/db?a=b&c=d&ssl=true"} + c.setDefaults() + if c.MongoURL != "mongo://host:47/db?a=b&c=d" { t.Fatalf("ssl param not removed from url") } if c.MongoDialSettings.Ssl == false { t.Fatalf("ssl not enabled") } - c = &configOptions{MongoUrl: "mongo://host:47/db?ssl=true"} - c.SetDefaults() - if c.MongoUrl != "mongo://host:47/db" { + c = &configOptions{MongoURL: "mongo://host:47/db?ssl=true"} + c.setDefaults() + if c.MongoURL != "mongo://host:47/db" { t.Fatalf("ssl param not removed from url") } if c.MongoDialSettings.Ssl == false { t.Fatalf("ssl not enabled") } - c = &configOptions{MongoUrl: "mongo://host:47/db?ssl=true&a=b"} - c.SetDefaults() - if c.MongoUrl != "mongo://host:47/db?a=b" { + c = &configOptions{MongoURL: "mongo://host:47/db?ssl=true&a=b"} + c.setDefaults() + if c.MongoURL != "mongo://host:47/db?a=b" { t.Fatalf("ssl param not removed from url") } if c.MongoDialSettings.Ssl == false { diff --git a/monstachemap/plugin.go b/monstachemap/plugin.go index 3a33fcb..ad40b6b 100644 --- a/monstachemap/plugin.go +++ b/monstachemap/plugin.go @@ -1,27 +1,29 @@ -package monstachemap - -// plugins must import this package -// import "github.com/rwynn/monstache/monstachemap" - -// plugins must implement a function named "Map" with the following signature -// func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) - -// plugins can be compiled using go build -buildmode=plugin -o myplugin.so myplugin.go -// to enable the plugin start with monstache -mapper-plugin-path /path/to/myplugin.so - -type MapperPluginInput struct { - Document map[string]interface{} // the original document from MongoDB - 
Database string // the origin database in MongoDB - Collection string // the origin collection in MongoDB - Namespace string // the entire namespace for the original document - Operation string // "i" for a insert or "u" for update -} - -type MapperPluginOutput struct { - Document map[string]interface{} // an updated document to index into Elasticsearch - Index string // the name of the index to use - Type string // the document type - Routing string // the routing value to use - Drop bool // set to true to indicate that the document should not be indexed - Passthrough bool // set to true to indicate the original document should be indexed unchanged -} +package monstachemap + +// plugins must import this package +// import "github.com/rwynn/monstache/monstachemap" + +// plugins must implement a function named "Map" with the following signature +// func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) + +// plugins can be compiled using go build -buildmode=plugin -o myplugin.so myplugin.go +// to enable the plugin start with monstache -mapper-plugin-path /path/to/myplugin.so + +// MapperPluginInput is the input to the Map function +type MapperPluginInput struct { + Document map[string]interface{} // the original document from MongoDB + Database string // the origin database in MongoDB + Collection string // the origin collection in MongoDB + Namespace string // the entire namespace for the original document + Operation string // "i" for an insert or "u" for update +} + +// MapperPluginOutput is the output of the Map function +type MapperPluginOutput struct { + Document map[string]interface{} // an updated document to index into Elasticsearch + Index string // the name of the index to use + Type string // the document type + Routing string // the routing value to use + Drop bool // set to true to indicate that the document should not be indexed + Passthrough bool // set to true to indicate the original document should be 
indexed unchanged +}