Don't retry points beyond retention policy (#3155)
This commit is contained in:
		
							parent
							
								
									5224b526f4
								
							
						
					
					
						commit
						1f4a997164
					
				|  | @ -199,6 +199,7 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { | ||||||
| 						i.Database) | 						i.Database) | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  | 
 | ||||||
| 			if strings.Contains(e.Error(), "field type conflict") { | 			if strings.Contains(e.Error(), "field type conflict") { | ||||||
| 				log.Printf("E! Field type conflict, dropping conflicted points: %s", e) | 				log.Printf("E! Field type conflict, dropping conflicted points: %s", e) | ||||||
| 				// setting err to nil, otherwise we will keep retrying and points
 | 				// setting err to nil, otherwise we will keep retrying and points
 | ||||||
|  | @ -206,6 +207,31 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { | ||||||
| 				err = nil | 				err = nil | ||||||
| 				break | 				break | ||||||
| 			} | 			} | ||||||
|  | 
 | ||||||
|  | 			if strings.Contains(e.Error(), "points beyond retention policy") { | ||||||
|  | 				log.Printf("W! Points beyond retention policy: %s", e) | ||||||
|  | 				// This error is indicates the point is older than the
 | ||||||
|  | 				// retention policy permits, and is probably not a cause for
 | ||||||
|  | 				// concern.  Retrying will not help unless the retention
 | ||||||
|  | 				// policy is modified.
 | ||||||
|  | 				err = nil | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if strings.Contains(e.Error(), "unable to parse") { | ||||||
|  | 				log.Printf("E! Parse error; dropping points: %s", e) | ||||||
|  | 				// This error indicates a bug in Telegraf or InfluxDB parsing
 | ||||||
|  | 				// of line protocol.  Retries will not be successful.
 | ||||||
|  | 				err = nil | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if strings.Contains(e.Error(), "hinted handoff queue not empty") { | ||||||
|  | 				// This is an informational message
 | ||||||
|  | 				err = nil | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
| 			// Log write failure
 | 			// Log write failure
 | ||||||
| 			log.Printf("E! InfluxDB Output Error: %s", e) | 			log.Printf("E! InfluxDB Output Error: %s", e) | ||||||
| 		} else { | 		} else { | ||||||
|  |  | ||||||
|  | @ -178,28 +178,107 @@ func TestHTTPError_DatabaseNotFound(t *testing.T) { | ||||||
| 	require.NoError(t, i.Close()) | 	require.NoError(t, i.Close()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // field type conflict does not return an error, instead we
 | func TestHTTPError_WriteErrors(t *testing.T) { | ||||||
| func TestHTTPError_FieldTypeConflict(t *testing.T) { | 	var testCases = []struct { | ||||||
| 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 		name        string | ||||||
| 		switch r.URL.Path { | 		status      int | ||||||
| 		case "/write": | 		contentType string | ||||||
| 			w.WriteHeader(http.StatusNotFound) | 		body        string | ||||||
| 			w.Header().Set("Content-Type", "application/json") | 		err         error | ||||||
| 			fmt.Fprintln(w, `{"results":[{}],"error":"field type conflict: input field \"value\" on measurement \"test\" is type integer, already exists as type float dropped=1"}`) | 	}{ | ||||||
| 		} | 		{ | ||||||
| 	})) | 			// HTTP/1.1 400 Bad Request
 | ||||||
| 	defer ts.Close() | 			// Content-Type: application/json
 | ||||||
| 
 | 			// X-Influxdb-Version: 1.3.3
 | ||||||
| 	i := InfluxDB{ | 			//
 | ||||||
| 		URLs:     []string{ts.URL}, | 			// {
 | ||||||
| 		Database: "test", | 			//     "error": "partial write: points beyond retention policy dropped=1"
 | ||||||
|  | 			// }
 | ||||||
|  | 			name:        "beyond retention policy is not an error", | ||||||
|  | 			status:      http.StatusBadRequest, | ||||||
|  | 			contentType: "application/json", | ||||||
|  | 			body:        `{"error":"partial write: points beyond retention policy dropped=1"}`, | ||||||
|  | 			err:         nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			// HTTP/1.1 400 Bad Request
 | ||||||
|  | 			// Content-Type: application/json
 | ||||||
|  | 			// X-Influxdb-Version: 1.3.3
 | ||||||
|  | 			//
 | ||||||
|  | 			// {
 | ||||||
|  | 			//     "error": "unable to parse 'foo bar=': missing field value"
 | ||||||
|  | 			// }
 | ||||||
|  | 			name:        "unable to parse is not an error", | ||||||
|  | 			status:      http.StatusBadRequest, | ||||||
|  | 			contentType: "application/json", | ||||||
|  | 			body:        `{"error":"unable to parse 'foo bar=': missing field value"}`, | ||||||
|  | 			err:         nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			// HTTP/1.1 400 Bad Request
 | ||||||
|  | 			// Content-Type: application/json
 | ||||||
|  | 			// X-Influxdb-Version: 1.3.3
 | ||||||
|  | 			//
 | ||||||
|  | 			// {
 | ||||||
|  | 			//     "error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1"
 | ||||||
|  | 			// }
 | ||||||
|  | 			name:        "field type conflict is not an error", | ||||||
|  | 			status:      http.StatusBadRequest, | ||||||
|  | 			contentType: "application/json", | ||||||
|  | 			body:        `{"error": "partial write: field type conflict: input field \"bar\" on measurement \"foo\" is type float, already exists as type integer dropped=1"}`, | ||||||
|  | 			err:         nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			// HTTP/1.1 500 Internal Server Error
 | ||||||
|  | 			// Content-Type: application/json
 | ||||||
|  | 			// X-Influxdb-Version: 1.3.3-c1.3.3
 | ||||||
|  | 			//
 | ||||||
|  | 			// {
 | ||||||
|  | 			//     "error": "write failed: hinted handoff queue not empty"
 | ||||||
|  | 			// }
 | ||||||
|  | 			name:        "hinted handoff queue not empty is not an error", | ||||||
|  | 			status:      http.StatusInternalServerError, | ||||||
|  | 			contentType: "application/json", | ||||||
|  | 			body:        `{"error":"write failed: hinted handoff queue not empty"}`, | ||||||
|  | 			err:         nil, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			// HTTP/1.1 500 Internal Server Error
 | ||||||
|  | 			// Content-Type: application/json
 | ||||||
|  | 			// X-Influxdb-Version: 1.3.3-c1.3.3
 | ||||||
|  | 			//
 | ||||||
|  | 			// {
 | ||||||
|  | 			//     "error": "partial write"
 | ||||||
|  | 			// }
 | ||||||
|  | 			name:        "plain partial write is an error", | ||||||
|  | 			status:      http.StatusInternalServerError, | ||||||
|  | 			contentType: "application/json", | ||||||
|  | 			body:        `{"error":"partial write"}`, | ||||||
|  | 			err:         fmt.Errorf("Could not write to any InfluxDB server in cluster"), | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	err := i.Connect() | 	for _, tt := range testCases { | ||||||
| 	require.NoError(t, err) | 		t.Run(tt.name, func(t *testing.T) { | ||||||
| 	err = i.Write(testutil.MockMetrics()) | 			ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { | ||||||
| 	require.NoError(t, err) | 				rw.WriteHeader(tt.status) | ||||||
| 	require.NoError(t, i.Close()) | 				rw.Header().Set("Content-Type", tt.contentType) | ||||||
|  | 				fmt.Fprintln(rw, tt.body) | ||||||
|  | 			})) | ||||||
|  | 			defer ts.Close() | ||||||
|  | 
 | ||||||
|  | 			influx := InfluxDB{ | ||||||
|  | 				URLs:     []string{ts.URL}, | ||||||
|  | 				Database: "test", | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			err := influx.Connect() | ||||||
|  | 			require.NoError(t, err) | ||||||
|  | 			err = influx.Write(testutil.MockMetrics()) | ||||||
|  | 			require.Equal(t, tt.err, err) | ||||||
|  | 			require.NoError(t, influx.Close()) | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type MockClient struct { | type MockClient struct { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue