Compare commits

...

3 Commits

Author SHA1 Message Date
Cameron Sparr
10d0a78b96 influxdb output: treat field type conflicts as a successful write
If we write a batch of points and get a "field type conflict" error
message in return, we should drop the entire batch of points because
this indicates that one or more points have a type that doesnt match the
database.

These errors will never go away on their own, and InfluxDB will
successfully write the points that dont have a conflict.

closes #2245
2017-01-23 18:26:37 -08:00
Cameron Sparr
828231193f Add newline to influx line-protocol if not present
closes #2297
2017-01-23 14:12:28 -08:00
Cameron Sparr
c3ae75730b opentsdb: add tcp:// prefix if not present
closes #2299
2017-01-23 13:45:56 -08:00
6 changed files with 38 additions and 12 deletions

View File

@@ -67,7 +67,10 @@ plugins, not just statsd.
- [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses.
- [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin.
- [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix.
- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages
- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages.
- [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided.
- [#2297](https://github.com/influxdata/telegraf/issues/2297): influx parser: parse line-protocol without newlines.
- [#2245](https://github.com/influxdata/telegraf/issues/2245): influxdb output: fix field type conflict blocking output buffer.
## v1.1.2 [2016-12-12]

View File

@@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
func (h *HTTPListener) parse(b []byte, t time.Time) error {
if !bytes.HasSuffix(b, []byte("\n")) {
b = append(b, '\n')
}
metrics, err := h.parser.ParseWithDefaultTime(b, t)
for _, m := range metrics {

View File

@@ -200,8 +200,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
p := rand.Perm(len(i.conns))
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
// Log write failure
log.Printf("E! InfluxDB Output Error: %s", e)
// If the database was not found, try to recreate it
if strings.Contains(e.Error(), "database not found") {
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
@@ -209,6 +207,15 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
i.Database)
}
}
if strings.Contains(e.Error(), "field type conflict") {
log.Printf("E! Field type conflict, dropping conflicted points: %s", e)
// setting err to nil, otherwise we will keep retrying and points
// w/ conflicting types will get stuck in the buffer forever.
err = nil
break
}
// Log write failure
log.Printf("E! InfluxDB Output Error: %s", e)
} else {
err = nil
break

View File

@@ -59,6 +59,9 @@ func ToLineFormat(tags map[string]string) string {
}
func (o *OpenTSDB) Connect() error {
if !strings.HasPrefix(o.Host, "http") && !strings.HasPrefix(o.Host, "tcp") {
o.Host = "tcp://" + o.Host
}
// Test Connection to OpenTSDB Server
u, err := url.Parse(o.Host)
if err != nil {
@@ -68,11 +71,11 @@ func (o *OpenTSDB) Connect() error {
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
return fmt.Errorf("OpenTSDB: TCP address cannot be resolved")
return fmt.Errorf("OpenTSDB TCP address cannot be resolved: %s", err)
}
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("OpenTSDB: Telnet connect fail")
return fmt.Errorf("OpenTSDB Telnet connect fail: %s", err)
}
defer connection.Close()
return nil

View File

@@ -16,6 +16,9 @@ type InfluxParser struct {
}
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
if !bytes.HasSuffix(buf, []byte("\n")) {
buf = append(buf, '\n')
}
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
metrics, err := metric.ParseWithDefaultTime(buf, t)

View File

@@ -18,10 +18,11 @@ var (
)
const (
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
invalidInflux = "I don't think this is line protocol\n"
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
invalidInflux = "I don't think this is line protocol\n"
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
)
const influxMulti = `
@@ -69,6 +70,18 @@ func TestParseValidInflux(t *testing.T) {
"cpu": "cpu0",
}, metrics[0].Tags())
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
metrics, err = parser.Parse([]byte(validInfluxNoNewline))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cpu_load_short", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"value": float64(10),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"cpu": "cpu0",
}, metrics[0].Tags())
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
}
func TestParseLineValidInflux(t *testing.T) {