From 5520f662baeba7389758d8a2087cd5db7e74589d Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 22 Aug 2017 16:52:26 -0700 Subject: [PATCH] Don't retry points beyond retention policy (#3155) --- plugins/outputs/influxdb/influxdb.go | 28 ++++- plugins/outputs/influxdb/influxdb_test.go | 119 ++++++++++++++++++---- 2 files changed, 126 insertions(+), 21 deletions(-) diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 23f567820..b4567af99 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -88,7 +88,7 @@ var sampleConfig = ` ## HTTP Proxy Config # http_proxy = "http://corporate.proxy:3128" - + ## Compress each HTTP request payload using GZIP. # content_encoding = "gzip" ` @@ -199,6 +199,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { i.Database) } } + if strings.Contains(e.Error(), "field type conflict") { log.Printf("E! Field type conflict, dropping conflicted points: %s", e) // setting err to nil, otherwise we will keep retrying and points @@ -206,6 +207,31 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { err = nil break } + + if strings.Contains(e.Error(), "points beyond retention policy") { + log.Printf("W! Points beyond retention policy: %s", e) + // This error is indicates the point is older than the + // retention policy permits, and is probably not a cause for + // concern. Retrying will not help unless the retention + // policy is modified. + err = nil + break + } + + if strings.Contains(e.Error(), "unable to parse") { + log.Printf("E! Parse error; dropping points: %s", e) + // This error indicates a bug in Telegraf or InfluxDB parsing + // of line protocol. Retries will not be successful. + err = nil + break + } + + if strings.Contains(e.Error(), "hinted handoff queue not empty") { + // This is an informational message + err = nil + break + } + // Log write failure log.Printf("E! InfluxDB Output Error: %s", e) } else { diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 5dfa63c66..50c86125d 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -178,28 +178,107 @@ func TestHTTPError_DatabaseNotFound(t *testing.T) { require.NoError(t, i.Close()) } -// field type conflict does not return an error, instead we -func TestHTTPError_FieldTypeConflict(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/write": - w.WriteHeader(http.StatusNotFound) - w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, `{"results":[{}],"error":"field type conflict: input field \"value\" on measurement \"test\" is type integer, already exists as type float dropped=1"}`) - } - })) - defer ts.Close() - - i := InfluxDB{ - URLs: []string{ts.URL}, - Database: "test", +func TestHTTPError_WriteErrors(t *testing.T) { + var testCases = []struct { + name string + status int + contentType string + body string + err error + }{ + { + // HTTP/1.1 400 Bad Request + // Content-Type: application/json + // X-Influxdb-Version: 1.3.3 + // + // { + // "error": "partial write: points beyond retention policy dropped=1" + // } + name: "beyond retention policy is not an error", + status: http.StatusBadRequest, + contentType: "application/json", + body: `{"error":"partial write: points beyond retention policy dropped=1"}`, + err: nil, + }, + { + // HTTP/1.1 400 Bad Request + // Content-Type: application/json + // X-Influxdb-Version: 1.3.3 + // + // { + // "error": "unable to parse 'foo bar=': missing field value" + // } + name: "unable to parse is not an error", + status: http.StatusBadRequest, + contentType: "application/json", + body: `{"error":"unable to parse 'foo bar=': missing field value"}`, + err: nil, + }, + { + // HTTP/1.1 400 Bad Request + // Content-Type: application/json + // X-Influxdb-Version: 1.3.3 + // + // { + // "error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1" + // } + name: "field type conflict is not an error", + status: http.StatusBadRequest, + contentType: "application/json", + body: `{"error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1"}`, + err: nil, + }, + { + // HTTP/1.1 500 Internal Server Error + // Content-Type: application/json + // X-Influxdb-Version: 1.3.3-c1.3.3 + // + // { + // "error": "write failed: hinted handoff queue not empty" + // } + name: "hinted handoff queue not empty is not an error", + status: http.StatusInternalServerError, + contentType: "application/json", + body: `{"error":"write failed: hinted handoff queue not empty"}`, + err: nil, + }, + { + // HTTP/1.1 500 Internal Server Error + // Content-Type: application/json + // X-Influxdb-Version: 1.3.3-c1.3.3 + // + // { + // "error": "partial write" + // } + name: "plain partial write is an error", + status: http.StatusInternalServerError, + contentType: "application/json", + body: `{"error":"partial write"}`, + err: fmt.Errorf("Could not write to any InfluxDB server in cluster"), + }, } - err := i.Connect() - require.NoError(t, err) - err = i.Write(testutil.MockMetrics()) - require.NoError(t, err) - require.NoError(t, i.Close()) + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.WriteHeader(tt.status) + rw.Header().Set("Content-Type", tt.contentType) + fmt.Fprintln(rw, tt.body) + })) + defer ts.Close() + + influx := InfluxDB{ + URLs: []string{ts.URL}, + Database: "test", + } + + err := influx.Connect() + require.NoError(t, err) + err = influx.Write(testutil.MockMetrics()) + require.Equal(t, tt.err, err) + require.NoError(t, influx.Close()) + }) + } } type MockClient struct {