Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#780 from mwielgus/influxdb-conn…
Browse files Browse the repository at this point in the history
…ection

Better connection handling for InfluxDB:
  • Loading branch information
vishh committed Dec 11, 2015
2 parents b9ab6fe + afb8260 commit b19c032
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
11 changes: 10 additions & 1 deletion sinks/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,16 @@ func (esm *externalSinkManager) store() error {
var errors []string
for i := 0; i < errorsLen; i++ {
if err := <-errorsChan; err != nil {
errors = append(errors, fmt.Sprintf("%v ", err))
strError := fmt.Sprintf("%v ", err)
found := false
for _, otherError := range errors {
if otherError == strError {
found = true
}
}
if !found {
errors = append(errors, strError)
}
}
}
if len(errors) > 0 {
Expand Down
57 changes: 46 additions & 11 deletions sinks/influxdb/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ var (
)

func (sink *influxdbSink) Register(metrics []sink_api.MetricDescriptor) error {
sink.Lock()
defer sink.Unlock()

if err := sink.createDatabase(); err != nil {
glog.Error(err)
}
Expand Down Expand Up @@ -109,6 +112,9 @@ func (sink *influxdbSink) metricToPoint(timeseries *sink_api.Timeseries) influxd

// Stores events into the backend.
func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error {
sink.Lock()
defer sink.Unlock()

if err := sink.createDatabase(); err != nil {
return err
}
Expand All @@ -126,7 +132,10 @@ func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error {
}
if _, err = sink.client.Write(bp); err != nil {
if strings.Contains(err.Error(), dbNotFoundError) {
sink.dbExists = false
sink.resetConnection()
} else if _, _, err := sink.client.Ping(); err != nil {
glog.Errorf("InfluxDB ping failed: %v", err)
sink.resetConnection()
}
glog.Errorf("failed to write events to influxDB - %s", err)
sink.recordWriteFailure()
Expand All @@ -136,6 +145,12 @@ func (sink *influxdbSink) StoreEvents(events []kube_api.Event) error {
return nil
}

func (sink *influxdbSink) resetConnection() {
glog.Infof("Influxdb connection reset")
sink.dbExists = false
sink.client = nil
}

func (sink *influxdbSink) eventsToPoints(events []kube_api.Event) ([]influxdb.Point, error) {
if events == nil || len(events) <= 0 {
return nil, nil
Expand Down Expand Up @@ -168,6 +183,9 @@ func (sink *influxdbSink) eventsToPoints(events []kube_api.Event) ([]influxdb.Po
}

func (sink *influxdbSink) StoreTimeseries(timeseries []sink_api.Timeseries) error {
sink.Lock()
defer sink.Unlock()

var err error
if err = sink.createDatabase(); err != nil {
return err
Expand All @@ -184,10 +202,14 @@ func (sink *influxdbSink) StoreTimeseries(timeseries []sink_api.Timeseries) erro
// TODO: Record the average time taken to flush data.
if _, err = sink.client.Write(bp); err != nil {
if strings.Contains(err.Error(), dbNotFoundError) {
sink.dbExists = false
sink.resetConnection()
}
glog.Errorf("failed to write stats to influxDB - %v", err)
sink.recordWriteFailure()
if _, _, err := sink.client.Ping(); err != nil {
glog.Errorf("InfluxDB ping failed: %v", err)
sink.resetConnection()
}
return err
}
glog.V(4).Info("flushed stats to influxDB")
Expand All @@ -204,18 +226,17 @@ func getEventValue(event *kube_api.Event) (string, error) {
}

func (sink *influxdbSink) recordWriteFailure() {
sink.Lock()
defer sink.Unlock()
sink.writeFailures++
}

func (sink *influxdbSink) getState() string {
sink.RLock()
defer sink.RUnlock()
return fmt.Sprintf("\tNumber of write failures: %d\n", sink.writeFailures)
}

func (sink *influxdbSink) DebugInfo() string {
sink.RLock()
defer sink.RUnlock()

desc := "Sink Type: InfluxDB\n"
desc += fmt.Sprintf("\tclient: Host %q, Database %q\n", sink.c.host, sink.c.dbName)
desc += sink.getState()
Expand All @@ -228,8 +249,14 @@ func (sink *influxdbSink) Name() string {
}

func (sink *influxdbSink) createDatabase() error {
sink.Lock()
defer sink.Unlock()
if sink.client == nil {
client, err := newClient(sink.c)
if err != nil {
return err
}
sink.client = client
}

if sink.dbExists {
return nil
}
Expand All @@ -246,8 +273,7 @@ func (sink *influxdbSink) createDatabase() error {
return nil
}

// Returns a thread-compatible implementation of influxdb interactions.
func new(c config) (sink_api.ExternalSink, error) {
func newClient(c config) (influxdbClient, error) {
url := &url.URL{
Scheme: "http",
Host: c.host,
Expand All @@ -270,8 +296,17 @@ func new(c config) (sink_api.ExternalSink, error) {
if _, _, err := client.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping InfluxDB server at %q - %v", c.host, err)
}
return client, nil
}

// Returns a thread-compatible implementation of influxdb interactions.
func new(c config) (sink_api.ExternalSink, error) {
client, err := newClient(c)
if err != nil {
fmt.Errorf("issues while creating an InfluxDB sink: %v, will retry on use", err)
}
return &influxdbSink{
client: client,
client: client, // can be nil
c: c,
}, nil
}
Expand Down

0 comments on commit b19c032

Please sign in to comment.