Compare commits
8 Commits
release-1.
...
1.2.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b6ffb344e | ||
|
|
c9d359f21b | ||
|
|
45ea321e4b | ||
|
|
853178c7dc | ||
|
|
b2c1d98cff | ||
|
|
10d0a78b96 | ||
|
|
828231193f | ||
|
|
c3ae75730b |
@@ -67,7 +67,10 @@ plugins, not just statsd.
|
|||||||
- [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses.
|
- [#1973](https://github.com/influxdata/telegraf/issues/1973): Partial fix: logparser CLF pattern with IPv6 addresses.
|
||||||
- [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin.
|
- [#1975](https://github.com/influxdata/telegraf/issues/1975) & [#2102](https://github.com/influxdata/telegraf/issues/2102): Fix thread-safety when using multiple instances of the statsd input plugin.
|
||||||
- [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix.
|
- [#2027](https://github.com/influxdata/telegraf/issues/2027): docker input: interface conversion panic fix.
|
||||||
- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages
|
- [#1814](https://github.com/influxdata/telegraf/issues/1814): snmp: ensure proper context is present on error messages.
|
||||||
|
- [#2299](https://github.com/influxdata/telegraf/issues/2299): opentsdb: add tcp:// prefix if no scheme provided.
|
||||||
|
- [#2297](https://github.com/influxdata/telegraf/issues/2297): influx parser: parse line-protocol without newlines.
|
||||||
|
- [#2245](https://github.com/influxdata/telegraf/issues/2245): influxdb output: fix field type conflict blocking output buffer.
|
||||||
|
|
||||||
## v1.1.2 [2016-12-12]
|
## v1.1.2 [2016-12-12]
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,9 @@ machine:
|
|||||||
post:
|
post:
|
||||||
- sudo service zookeeper stop
|
- sudo service zookeeper stop
|
||||||
- go version
|
- go version
|
||||||
- go version | grep 1.7.4 || sudo rm -rf /usr/local/go
|
- go version | grep 1.7.5 || sudo rm -rf /usr/local/go
|
||||||
- wget https://storage.googleapis.com/golang/go1.7.4.linux-amd64.tar.gz
|
- wget https://storage.googleapis.com/golang/go1.7.5.linux-amd64.tar.gz
|
||||||
- sudo tar -C /usr/local -xzf go1.7.4.linux-amd64.tar.gz
|
- sudo tar -C /usr/local -xzf go1.7.5.linux-amd64.tar.gz
|
||||||
- go version
|
- go version
|
||||||
|
|
||||||
dependencies:
|
dependencies:
|
||||||
|
|||||||
@@ -90,6 +90,9 @@ func NewRunningOutput(
|
|||||||
// AddMetric adds a metric to the output. This function can also write cached
|
// AddMetric adds a metric to the output. This function can also write cached
|
||||||
// points if FlushBufferWhenFull is true.
|
// points if FlushBufferWhenFull is true.
|
||||||
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
func (ro *RunningOutput) AddMetric(m telegraf.Metric) {
|
||||||
|
if m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
// Filter any tagexclude/taginclude parameters before adding metric
|
// Filter any tagexclude/taginclude parameters before adding metric
|
||||||
if ro.Config.Filter.IsActive() {
|
if ro.Config.Filter.IsActive() {
|
||||||
// In order to filter out tags, we need to create a new metric, since
|
// In order to filter out tags, we need to create a new metric, since
|
||||||
|
|||||||
@@ -75,6 +75,23 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddingNilMetric(t *testing.T) {
|
||||||
|
conf := &OutputConfig{
|
||||||
|
Filter: Filter{},
|
||||||
|
}
|
||||||
|
|
||||||
|
m := &mockOutput{}
|
||||||
|
ro := NewRunningOutput("test", m, conf, 1000, 10000)
|
||||||
|
|
||||||
|
ro.AddMetric(nil)
|
||||||
|
ro.AddMetric(nil)
|
||||||
|
ro.AddMetric(nil)
|
||||||
|
|
||||||
|
err := ro.Write()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, m.Metrics(), 0)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that NameDrop filters ger properly applied.
|
// Test that NameDrop filters ger properly applied.
|
||||||
func TestRunningOutput_DropFilter(t *testing.T) {
|
func TestRunningOutput_DropFilter(t *testing.T) {
|
||||||
conf := &OutputConfig{
|
conf := &OutputConfig{
|
||||||
|
|||||||
@@ -263,7 +263,7 @@ func (m *metric) Fields() map[string]interface{} {
|
|||||||
case '"':
|
case '"':
|
||||||
// string field
|
// string field
|
||||||
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
fieldMap[unescape(string(m.fields[i:][0:i1]), "fieldkey")] = unescape(string(m.fields[i:][i2+1:i3-1]), "fieldval")
|
||||||
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
case '-', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
||||||
// number field
|
// number field
|
||||||
switch m.fields[i:][i3-1] {
|
switch m.fields[i:][i3-1] {
|
||||||
case 'i':
|
case 'i':
|
||||||
|
|||||||
@@ -44,6 +44,9 @@ cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
|||||||
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
cpu,host=foo,datacenter=us-east idle=99,busy=1i,b=true,s="string"
|
||||||
`
|
`
|
||||||
|
|
||||||
|
const negMetrics = `weather,host=local temp=-99i,temp_float=-99.4 1465839830100400200
|
||||||
|
`
|
||||||
|
|
||||||
// some metrics are invalid
|
// some metrics are invalid
|
||||||
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
const someInvalid = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
|
||||||
@@ -85,6 +88,26 @@ func TestParse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseNegNumbers(t *testing.T) {
|
||||||
|
metrics, err := Parse([]byte(negMetrics))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
|
||||||
|
assert.Equal(t,
|
||||||
|
map[string]interface{}{
|
||||||
|
"temp": int64(-99),
|
||||||
|
"temp_float": float64(-99.4),
|
||||||
|
},
|
||||||
|
metrics[0].Fields(),
|
||||||
|
)
|
||||||
|
assert.Equal(t,
|
||||||
|
map[string]string{
|
||||||
|
"host": "local",
|
||||||
|
},
|
||||||
|
metrics[0].Tags(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseErrors(t *testing.T) {
|
func TestParseErrors(t *testing.T) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
metrics, err := Parse([]byte(someInvalid))
|
metrics, err := Parse([]byte(someInvalid))
|
||||||
|
|||||||
@@ -300,9 +300,6 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
func (h *HTTPListener) parse(b []byte, t time.Time) error {
|
||||||
if !bytes.HasSuffix(b, []byte("\n")) {
|
|
||||||
b = append(b, '\n')
|
|
||||||
}
|
|
||||||
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
metrics, err := h.parser.ParseWithDefaultTime(b, t)
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
|
testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257\n"
|
||||||
|
testMsgNeg = "cpu_load_short,host=server01 value=-23422.0 1422568543702900257\n"
|
||||||
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
|
testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
|
||||||
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
||||||
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
|
invalidMsg = "cpu_load_short,host=server01 1422568543702900257\n"
|
||||||
@@ -76,13 +77,28 @@ func TestPersistentClientIDFail(t *testing.T) {
|
|||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that the parser parses NATS messages into metrics
|
|
||||||
func TestRunParser(t *testing.T) {
|
func TestRunParser(t *testing.T) {
|
||||||
n, in := newTestMQTTConsumer()
|
n, in := newTestMQTTConsumer()
|
||||||
acc := testutil.Accumulator{}
|
acc := testutil.Accumulator{}
|
||||||
n.acc = &acc
|
n.acc = &acc
|
||||||
defer close(n.done)
|
defer close(n.done)
|
||||||
|
|
||||||
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
|
go n.receiver()
|
||||||
|
in <- mqttMsg(testMsgNeg)
|
||||||
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
|
if a := acc.NFields(); a != 1 {
|
||||||
|
t.Errorf("got %v, expected %v", a, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunParserNegativeNumber(t *testing.T) {
|
||||||
|
n, in := newTestMQTTConsumer()
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
n.acc = &acc
|
||||||
|
defer close(n.done)
|
||||||
|
|
||||||
n.parser, _ = parsers.NewInfluxParser()
|
n.parser, _ = parsers.NewInfluxParser()
|
||||||
go n.receiver()
|
go n.receiver()
|
||||||
in <- mqttMsg(testMsg)
|
in <- mqttMsg(testMsg)
|
||||||
|
|||||||
@@ -200,8 +200,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
|||||||
p := rand.Perm(len(i.conns))
|
p := rand.Perm(len(i.conns))
|
||||||
for _, n := range p {
|
for _, n := range p {
|
||||||
if e := i.conns[n].Write(bp); e != nil {
|
if e := i.conns[n].Write(bp); e != nil {
|
||||||
// Log write failure
|
|
||||||
log.Printf("E! InfluxDB Output Error: %s", e)
|
|
||||||
// If the database was not found, try to recreate it
|
// If the database was not found, try to recreate it
|
||||||
if strings.Contains(e.Error(), "database not found") {
|
if strings.Contains(e.Error(), "database not found") {
|
||||||
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
|
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
|
||||||
@@ -209,6 +207,15 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
|
|||||||
i.Database)
|
i.Database)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if strings.Contains(e.Error(), "field type conflict") {
|
||||||
|
log.Printf("E! Field type conflict, dropping conflicted points: %s", e)
|
||||||
|
// setting err to nil, otherwise we will keep retrying and points
|
||||||
|
// w/ conflicting types will get stuck in the buffer forever.
|
||||||
|
err = nil
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Log write failure
|
||||||
|
log.Printf("E! InfluxDB Output Error: %s", e)
|
||||||
} else {
|
} else {
|
||||||
err = nil
|
err = nil
|
||||||
break
|
break
|
||||||
|
|||||||
@@ -59,6 +59,9 @@ func ToLineFormat(tags map[string]string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (o *OpenTSDB) Connect() error {
|
func (o *OpenTSDB) Connect() error {
|
||||||
|
if !strings.HasPrefix(o.Host, "http") && !strings.HasPrefix(o.Host, "tcp") {
|
||||||
|
o.Host = "tcp://" + o.Host
|
||||||
|
}
|
||||||
// Test Connection to OpenTSDB Server
|
// Test Connection to OpenTSDB Server
|
||||||
u, err := url.Parse(o.Host)
|
u, err := url.Parse(o.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -68,11 +71,11 @@ func (o *OpenTSDB) Connect() error {
|
|||||||
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
uri := fmt.Sprintf("%s:%d", u.Host, o.Port)
|
||||||
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB: TCP address cannot be resolved")
|
return fmt.Errorf("OpenTSDB TCP address cannot be resolved: %s", err)
|
||||||
}
|
}
|
||||||
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
connection, err := net.DialTCP("tcp", nil, tcpAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("OpenTSDB: Telnet connect fail")
|
return fmt.Errorf("OpenTSDB Telnet connect fail: %s", err)
|
||||||
}
|
}
|
||||||
defer connection.Close()
|
defer connection.Close()
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ type InfluxParser struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
func (p *InfluxParser) ParseWithDefaultTime(buf []byte, t time.Time) ([]telegraf.Metric, error) {
|
||||||
|
if !bytes.HasSuffix(buf, []byte("\n")) {
|
||||||
|
buf = append(buf, '\n')
|
||||||
|
}
|
||||||
// parse even if the buffer begins with a newline
|
// parse even if the buffer begins with a newline
|
||||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
||||||
metrics, err := metric.ParseWithDefaultTime(buf, t)
|
metrics, err := metric.ParseWithDefaultTime(buf, t)
|
||||||
|
|||||||
@@ -18,10 +18,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
validInflux = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||||
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
negativeFloat = "cpu_load_short,cpu=cpu0 value=-13.4 1257894000000000000\n"
|
||||||
invalidInflux = "I don't think this is line protocol\n"
|
validInfluxNewline = "\ncpu_load_short,cpu=cpu0 value=10 1257894000000000000\n"
|
||||||
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
validInfluxNoNewline = "cpu_load_short,cpu=cpu0 value=10 1257894000000000000"
|
||||||
|
invalidInflux = "I don't think this is line protocol\n"
|
||||||
|
invalidInflux2 = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
const influxMulti = `
|
const influxMulti = `
|
||||||
@@ -69,6 +71,30 @@ func TestParseValidInflux(t *testing.T) {
|
|||||||
"cpu": "cpu0",
|
"cpu": "cpu0",
|
||||||
}, metrics[0].Tags())
|
}, metrics[0].Tags())
|
||||||
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||||
|
|
||||||
|
metrics, err = parser.Parse([]byte(validInfluxNoNewline))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "cpu_load_short", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"value": float64(10),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}, metrics[0].Tags())
|
||||||
|
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||||
|
|
||||||
|
metrics, err = parser.Parse([]byte(negativeFloat))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, metrics, 1)
|
||||||
|
assert.Equal(t, "cpu_load_short", metrics[0].Name())
|
||||||
|
assert.Equal(t, map[string]interface{}{
|
||||||
|
"value": float64(-13.4),
|
||||||
|
}, metrics[0].Fields())
|
||||||
|
assert.Equal(t, map[string]string{
|
||||||
|
"cpu": "cpu0",
|
||||||
|
}, metrics[0].Tags())
|
||||||
|
assert.Equal(t, exptime, metrics[0].Time().UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseLineValidInflux(t *testing.T) {
|
func TestParseLineValidInflux(t *testing.T) {
|
||||||
|
|||||||
@@ -83,9 +83,9 @@ targets = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
supported_builds = {
|
supported_builds = {
|
||||||
"windows": [ "amd64" ],
|
"windows": [ "amd64", "i386" ],
|
||||||
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
|
"linux": [ "amd64", "i386", "armhf", "armel", "arm64", "static_amd64" ],
|
||||||
"freebsd": [ "amd64" ]
|
"freebsd": [ "amd64", "i386" ]
|
||||||
}
|
}
|
||||||
|
|
||||||
supported_packages = {
|
supported_packages = {
|
||||||
|
|||||||
Reference in New Issue
Block a user