From afb82609350433eda9415e8203a29b140f9d4294 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 11 Dec 2015 17:47:36 +0100 Subject: [PATCH] Better connection handling for InfluxDB: --- sinks/external.go | 11 +++++++- sinks/influxdb/driver.go | 57 ++++++++++++++++++++++++++++++++-------- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/sinks/external.go b/sinks/external.go index 9d0205021a..131c0c442a 100644 --- a/sinks/external.go +++ b/sinks/external.go @@ -146,7 +146,16 @@ func (esm *externalSinkManager) store() error { var errors []string for i := 0; i < errorsLen; i++ { if err := <-errorsChan; err != nil { - errors = append(errors, fmt.Sprintf("%v ", err)) + strError := fmt.Sprintf("%v ", err) + found := false + for _, otherError := range errors { + if otherError == strError { + found = true + } + } + if !found { + errors = append(errors, strError) + } } } if len(errors) > 0 { diff --git a/sinks/influxdb/driver.go b/sinks/influxdb/driver.go index dbf9834461..8fcbb1a3e6 100644 --- a/sinks/influxdb/driver.go +++ b/sinks/influxdb/driver.go @@ -70,6 +70,9 @@ var ( ) func (sink *influxdbSink) Register(metrics []sink_api.MetricDescriptor) error { + sink.Lock() + defer sink.Unlock() + if err := sink.createDatabase(); err != nil { glog.Error(err) } @@ -109,6 +112,9 @@ func (sink *influxdbSink) metricToPoint(timeseries *sink_api.Timeseries) influxd // Stores events into the backend. func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error { + sink.Lock() + defer sink.Unlock() + if err := sink.createDatabase(); err != nil { return err } @@ -126,7 +132,10 @@ func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error { } if _, err = sink.client.Write(bp); err != nil { if strings.Contains(err.Error(), dbNotFoundError) { - sink.dbExists = false + sink.resetConnection() + } else if _, _, err := sink.client.Ping(); err != nil { + glog.Errorf("InfluxDB ping failed: %v", err) + sink.resetConnection() } glog.Errorf("failed to write events to influxDB - %s", err) sink.recordWriteFailure() @@ -136,6 +145,12 @@ func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error { return nil } +func (sink *influxdbSink) resetConnection() { + glog.Infof("Influxdb connection reset") + sink.dbExists = false + sink.client = nil +} + func (sink *influxdbSink) eventsToPoints(events []kube_api.Event) ([]influxdb.Point, error) { if events == nil || len(events) <= 0 { return nil, nil @@ -168,6 +183,9 @@ func (sink *influxdbSink) eventsToPoints(events []kube_api.Event) ([]influxdb.Po } func (sink *influxdbSink) StoreTimeseries(timeseries []sink_api.Timeseries) error { + sink.Lock() + defer sink.Unlock() + var err error if err = sink.createDatabase(); err != nil { return err @@ -184,10 +202,14 @@ func (sink *influxdbSink) StoreTimeseries(timeseries []sink_api.Timeseries) erro // TODO: Record the average time taken to flush data. if _, err = sink.client.Write(bp); err != nil { if strings.Contains(err.Error(), dbNotFoundError) { - sink.dbExists = false + sink.resetConnection() } glog.Errorf("failed to write stats to influxDB - %v", err) sink.recordWriteFailure() + if _, _, err := sink.client.Ping(); err != nil { + glog.Errorf("InfluxDB ping failed: %v", err) + sink.resetConnection() + } return err } glog.V(4).Info("flushed stats to influxDB") @@ -204,18 +226,17 @@ func getEventValue(event *kube_api.Event) (string, error) { } func (sink *influxdbSink) recordWriteFailure() { - sink.Lock() - defer sink.Unlock() sink.writeFailures++ } func (sink *influxdbSink) getState() string { - sink.RLock() - defer sink.RUnlock() return fmt.Sprintf("\tNumber of write failures: %d\n", sink.writeFailures) } func (sink *influxdbSink) DebugInfo() string { + sink.RLock() + defer sink.RUnlock() + desc := "Sink Type: InfluxDB\n" desc += fmt.Sprintf("\tclient: Host %q, Database %q\n", sink.c.host, sink.c.dbName) desc += sink.getState() @@ -228,8 +249,14 @@ func (sink *influxdbSink) Name() string { } func (sink *influxdbSink) createDatabase() error { - sink.Lock() - defer sink.Unlock() + if sink.client == nil { + client, err := newClient(sink.c) + if err != nil { + return err + } + sink.client = client + } + if sink.dbExists { return nil } @@ -246,8 +273,7 @@ func (sink *influxdbSink) createDatabase() error { return nil } -// Returns a thread-compatible implementation of influxdb interactions. -func new(c config) (sink_api.ExternalSink, error) { +func newClient(c config) (influxdbClient, error) { url := &url.URL{ Scheme: "http", Host: c.host, @@ -270,8 +296,17 @@ func new(c config) (sink_api.ExternalSink, error) { if _, _, err := client.Ping(); err != nil { return nil, fmt.Errorf("failed to ping InfluxDB server at %q - %v", c.host, err) } + return client, nil +} + +// Returns a thread-compatible implementation of influxdb interactions. +func new(c config) (sink_api.ExternalSink, error) { + client, err := newClient(c) + if err != nil { + fmt.Errorf("issues while creating an InfluxDB sink: %v, will retry on use", err) + } return &influxdbSink{ - client: client, + client: client, // can be nil c: c, }, nil }