Compare commits


22 Commits

Author SHA1 Message Date
Daniel Nelson
2de7aa23d7 Set 1.4.1 release date in changelog
(cherry picked from commit fd702e6bb8)
2017-09-26 14:19:51 -07:00
Daniel Nelson
52cd38150c Update changelog
(cherry picked from commit 0048bf2120)
2017-09-18 14:25:57 -07:00
Daniel Nelson
c08f492f78 Fix arm64 packages contain 32-bit executable (#3246)
(cherry picked from commit b8e134cd37)
2017-09-18 14:25:57 -07:00
Daniel Nelson
66cfe80e37 Update changelog
(cherry picked from commit b94cda6b46)
2017-09-14 15:30:51 -07:00
Trevor Pounds
ba5e5ec283 Fix panic in statsd p100 calculation (#3230)
(cherry picked from commit 73372872c2)
2017-09-14 15:30:51 -07:00
Daniel Nelson
259f8e4002 Update changelog
(cherry picked from commit 875ab3c4b7)
2017-09-14 15:05:38 -07:00
Mark Wilkinson - m82labs
558ab0c730 Fix duplicate keys in perf counters sqlserver query (#3175)
(cherry picked from commit 1c5ebd4be3)
2017-09-14 15:05:38 -07:00
Daniel Nelson
8d4fbe29e7 Update changelog
(cherry picked from commit 103d24bfba)
2017-09-14 15:01:28 -07:00
Daniel Nelson
72337a1c97 Fix skipped line with empty target in iptables (#3235)
(cherry picked from commit d5f48e3e96)
2017-09-14 15:01:21 -07:00
Daniel Nelson
86537899b2 Update changelog
(cherry picked from commit 7a41d2c586)
2017-09-14 13:07:30 -07:00
Trevor Pounds
a727d5d1f0 Fix counter and gauge metric types. (#3232)
(cherry picked from commit fa1982323a)
2017-09-14 13:07:30 -07:00
Daniel Nelson
7ec194a482 Update changelog
(cherry picked from commit cdf63c5776)
2017-09-13 17:32:03 -07:00
Daniel Nelson
5a77d28837 Whitelist allowed char classes for opentsdb output. (#3227)
(cherry picked from commit 0a8c2e0b3b)
2017-09-13 17:32:03 -07:00
Daniel Nelson
47927c353d Fix fluentd test
(cherry picked from commit eebee9759f)
2017-09-12 17:58:29 -07:00
Daniel Nelson
b9e7fa27aa Update changelog
(cherry picked from commit c5cfde667a)
2017-09-12 17:18:29 -07:00
Daniel Nelson
0d437140bd Fix optional field types in fluentd input
(cherry picked from commit 8a68e7424c)
2017-09-12 17:18:29 -07:00
Daniel Nelson
36969a63c2 Update changelog
(cherry picked from commit cc63b3b667)
2017-09-11 12:28:37 -07:00
DanKans
e9a12bb694 Fix MQTT input exits if Broker is not available on startup (#3202)
(cherry picked from commit 5488f4b3ac)
2017-09-11 12:28:12 -07:00
Daniel Nelson
34b7a4c361 Add 1.4.0 release date
(cherry picked from commit ab1c11b06d)
2017-09-05 17:15:06 -07:00
Daniel Nelson
f46370d982 Sort metrics before comparing in graphite test
(cherry picked from commit 98e784faf3)
2017-09-05 12:50:55 -07:00
Daniel Nelson
07b7e09749 Update changelog
(cherry picked from commit f43af72785)
2017-08-31 13:44:05 -07:00
Daniel Nelson
e54795795d Fix panic when handling string fields with escapes (#3188)
(cherry picked from commit 28d16188b3)
2017-08-30 21:17:10 -07:00
20 changed files with 320 additions and 138 deletions

View File

@@ -1,4 +1,17 @@
## v1.4 [unreleased]
## v1.4.1 [2017-09-26]
### Bugfixes
- [#3167](https://github.com/influxdata/telegraf/issues/3167): Fix MQTT input exits if Broker is not available on startup.
- [#3217](https://github.com/influxdata/telegraf/issues/3217): Fix optional field value conversions in fluentd input.
- [#3227](https://github.com/influxdata/telegraf/issues/3227): Whitelist allowed char classes for opentsdb output.
- [#3232](https://github.com/influxdata/telegraf/issues/3232): Fix counter and gauge metric types.
- [#3235](https://github.com/influxdata/telegraf/issues/3235): Fix skipped line with empty target in iptables.
- [#3175](https://github.com/influxdata/telegraf/issues/3175): Fix duplicate keys in perf counters sqlserver query.
- [#3230](https://github.com/influxdata/telegraf/issues/3230): Fix panic in statsd p100 calculation.
- [#3242](https://github.com/influxdata/telegraf/issues/3242): Fix arm64 packages contain 32-bit executable.
## v1.4 [2017-09-05]
### Release Notes
@@ -103,6 +116,7 @@
- [#2672](https://github.com/influxdata/telegraf/issues/2672): Fix NSQ input plugin when used with version 1.0.0-compat.
- [#2523](https://github.com/influxdata/telegraf/issues/2523): Added CloudWatch metric constraint validation.
- [#3179](https://github.com/influxdata/telegraf/issues/3179): Skip non-numerical values in graphite format.
- [#3187](https://github.com/influxdata/telegraf/issues/3187): Fix panic when handling string fields with escapes.
## v1.3.5 [2017-07-26]

View File

@@ -150,12 +150,6 @@ func makemetric(
continue
}
case string:
if strings.HasSuffix(val, `\`) {
log.Printf("D! Measurement [%s] field [%s] has a value "+
"ending with a backslash, skipping", measurement, k)
delete(fields, k)
continue
}
fields[k] = v
default:
fields[k] = v

View File

@@ -370,16 +370,17 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
expectedTags: map[string]string{},
},
{
name: "Field value with trailing slash dropped",
name: "Field value with trailing slash okay",
measurement: `cpu`,
fields: map[string]interface{}{
"value": int64(42),
"bad": `xyzzy\`,
"ok": `xyzzy\`,
},
tags: map[string]string{},
expectedMeasurement: `cpu`,
expectedFields: map[string]interface{}{
"value": int64(42),
"ok": `xyzzy\`,
},
expectedTags: map[string]string{},
},
@@ -387,7 +388,7 @@ func TestMakeMetric_TrailingSlash(t *testing.T) {
name: "Must have one field after dropped",
measurement: `cpu`,
fields: map[string]interface{}{
"bad": `xyzzy\`,
"bad": math.NaN(),
},
tags: map[string]string{},
expectedNil: true,

View File

@@ -21,14 +21,14 @@ func New(
t time.Time,
mType ...telegraf.ValueType,
) (telegraf.Metric, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("Metric cannot be made without any fields")
}
if len(name) == 0 {
return nil, fmt.Errorf("Metric cannot be made with an empty name")
return nil, fmt.Errorf("missing measurement name")
}
if len(fields) == 0 {
return nil, fmt.Errorf("%s: must have one or more fields", name)
}
if strings.HasSuffix(name, `\`) {
return nil, fmt.Errorf("Metric cannot have measurement name ending with a backslash")
return nil, fmt.Errorf("%s: measurement name cannot end with a backslash", name)
}
var thisType telegraf.ValueType
@@ -49,10 +49,10 @@ func New(
taglen := 0
for k, v := range tags {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have tag key ending with a backslash")
return nil, fmt.Errorf("%s: tag key cannot end with a backslash: %s", name, k)
}
if strings.HasSuffix(v, `\`) {
return nil, fmt.Errorf("Metric cannot have tag value ending with a backslash")
return nil, fmt.Errorf("%s: tag value cannot end with a backslash: %s", name, v)
}
if len(k) == 0 || len(v) == 0 {
@@ -79,7 +79,7 @@ func New(
fieldlen := 0
for k, _ := range fields {
if strings.HasSuffix(k, `\`) {
return nil, fmt.Errorf("Metric cannot have field key ending with a backslash")
return nil, fmt.Errorf("%s: field key cannot end with a backslash: %s", name, k)
}
// 10 bytes is completely arbitrary, but will at least prevent some
@@ -102,7 +102,8 @@ func New(
}
// indexUnescapedByte finds the index of the first byte equal to b in buf that
// is not escaped. Returns -1 if not found.
// is not escaped. Does not allow the escape char to be escaped. Returns -1 if
// not found.
func indexUnescapedByte(buf []byte, b byte) int {
var keyi int
for {
@@ -122,6 +123,46 @@ func indexUnescapedByte(buf []byte, b byte) int {
return keyi
}
// indexUnescapedByteBackslashEscaping finds the index of the first byte equal
// to b in buf that is not escaped. Allows for the escape char `\` to be
// escaped. Returns -1 if not found.
func indexUnescapedByteBackslashEscaping(buf []byte, b byte) int {
var keyi int
for {
i := bytes.IndexByte(buf[keyi:], b)
if i == -1 {
return -1
} else if i == 0 {
break
}
keyi += i
if countBackslashes(buf, keyi-1)%2 == 0 {
break
} else {
keyi++
}
}
return keyi
}
// countBackslashes counts the number of preceding backslashes starting at
// the 'start' index.
func countBackslashes(buf []byte, index int) int {
var count int
for {
if index < 0 {
return count
}
if buf[index] == '\\' {
count++
index--
} else {
break
}
}
return count
}
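
The two helpers above implement a parity rule: a byte counts as escaped only when it is preceded by an odd number of backslashes, which is what lets a field value such as `x\` (serialized as "x\\") terminate correctly at the closing quote. Below is a minimal standalone sketch of that rule; indexUnescaped is a stand-in name, not the function from the diff.

package main

import (
	"bytes"
	"fmt"
)

// indexUnescaped finds the first occurrence of b in buf that is preceded by
// an even number of backslashes (i.e. not itself escaped). Returns -1 if not found.
func indexUnescaped(buf []byte, b byte) int {
	var keyi int
	for {
		i := bytes.IndexByte(buf[keyi:], b)
		if i == -1 {
			return -1
		} else if i == 0 {
			break
		}
		keyi += i
		// count preceding backslashes; an even count means b is not escaped
		n := 0
		for j := keyi - 1; j >= 0 && buf[j] == '\\'; j-- {
			n++
		}
		if n%2 == 0 {
			break
		}
		keyi++
	}
	return keyi
}

func main() {
	// field value `x\` serializes as "x\\": the closing quote is preceded by
	// two backslashes (even), so it terminates the value.
	fmt.Println(indexUnescaped([]byte(`x\\" rest`), '"')) // 3
	// an escaped quote inside the value (odd backslash count) is skipped.
	fmt.Println(indexUnescaped([]byte(`x\"y" rest`), '"')) // 4
}

The first call is the case that previously panicked: without double-backslash awareness the closing quote looked escaped and the parser ran past the end of the value.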
type metric struct {
name []byte
tags []byte
@@ -283,7 +324,7 @@ func (m *metric) Fields() map[string]interface{} {
// end index of field value
var i3 int
if m.fields[i:][i2] == '"' {
i3 = indexUnescapedByte(m.fields[i:][i2+1:], '"')
i3 = indexUnescapedByteBackslashEscaping(m.fields[i:][i2+1:], '"')
if i3 == -1 {
i3 = len(m.fields[i:])
}

View File

@@ -258,6 +258,7 @@ func TestNewMetric_Fields(t *testing.T) {
"quote_string": `x"y`,
"backslash_quote_string": `x\"y`,
"backslash": `x\y`,
"ends_with_backslash": `x\`,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

View File

@@ -148,15 +148,15 @@ func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
}
if p.BufferQueueLength != nil {
tmpFields["buffer_queue_length"] = p.BufferQueueLength
tmpFields["buffer_queue_length"] = *p.BufferQueueLength
}
if p.RetryCount != nil {
tmpFields["retry_count"] = p.RetryCount
tmpFields["retry_count"] = *p.RetryCount
}
if p.BufferTotalQueuedSize != nil {
tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize
tmpFields["buffer_total_queued_size"] = *p.BufferTotalQueuedSize
}
if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {
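
The fluentd fix stores the dereferenced value instead of the pointer, so optional fields that are present end up as plain numbers in the metric. A small sketch of the pattern, assuming the plugin's optional fields decode as *float64 (the exact types are not shown in this hunk):

package main

import (
	"encoding/json"
	"fmt"
)

// Optional fields in the fluentd /api/plugins.json payload decode into
// pointers so that absent values can be told apart from zero values.
type pluginStats struct {
	RetryCount        *float64 `json:"retry_count"`
	BufferQueueLength *float64 `json:"buffer_queue_length"`
}

func main() {
	var p pluginStats
	json.Unmarshal([]byte(`{"retry_count": 3}`), &p)

	fields := map[string]interface{}{}
	if p.RetryCount != nil {
		// store the dereferenced value so the field is a float64, not a *float64
		fields["retry_count"] = *p.RetryCount
	}
	if p.BufferQueueLength != nil {
		fields["buffer_queue_length"] = *p.BufferQueueLength
	}
	fmt.Println(fields) // map[retry_count:3]
}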

View File

@@ -122,12 +122,6 @@ func Test_parse(t *testing.T) {
}
func Test_Gather(t *testing.T) {
if testing.Short() {
t.Skip("Skipping Gather function test")
}
t.Log("Testing Gather function")
t.Logf("Start HTTP mock (%s) with sampleJSON", fluentdTest.Endpoint)
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -157,13 +151,13 @@ func Test_Gather(t *testing.T) {
assert.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"])
assert.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"])
assert.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"])
assert.Equal(t, expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
assert.Equal(t, *expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
assert.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"])
assert.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"])
assert.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"])
assert.Equal(t, expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
assert.Equal(t, expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
assert.Equal(t, expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
assert.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
assert.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
assert.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
}

View File

@@ -95,7 +95,7 @@ const measurement = "iptables"
var errParse = errors.New("Cannot parse iptables list information")
var chainNameRe = regexp.MustCompile(`^Chain\s+(\S+)`)
var fieldsHeaderRe = regexp.MustCompile(`^\s*pkts\s+bytes\s+`)
var commentRe = regexp.MustCompile(`\s*/\*\s*(.+?)\s*\*/\s*`)
var valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+.*?/\*\s*(.+?)\s*\*/\s*`)
func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error {
lines := strings.Split(data, "\n")
@@ -110,21 +110,14 @@ func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error
return errParse
}
for _, line := range lines[2:] {
tokens := strings.Fields(line)
if len(tokens) < 10 {
matches := valuesRe.FindStringSubmatch(line)
if len(matches) != 4 {
continue
}
pkts := tokens[0]
bytes := tokens[1]
end := strings.Join(tokens[9:], " ")
matches := commentRe.FindStringSubmatch(end)
if matches == nil {
continue
}
comment := matches[1]
pkts := matches[1]
bytes := matches[2]
comment := matches[3]
tags := map[string]string{"table": ipt.Table, "chain": mchain[1], "ruleid": comment}
fields := make(map[string]interface{})
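
The rewritten loop relies on a single regular expression instead of splitting on whitespace, so a rule whose target column is empty (and therefore has fewer than ten tokens) is no longer skipped. A standalone sketch using the same valuesRe pattern; the sample lines are illustrative, and the second mirrors the new all_recv test case below:

package main

import (
	"fmt"
	"regexp"
)

// Matches packet and byte counters plus the /* ruleid */ comment in one pass,
// independent of how many columns appear in between.
var valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+.*?/\*\s*(.+?)\s*\*/\s*`)

func main() {
	lines := []string{
		// rule with a target
		`  123   456 RETURN     all  --  eth0   *       0.0.0.0/0            0.0.0.0/0            /* foo */`,
		// rule with an empty target column, previously skipped
		`  123   456            all  --  eth0   *       0.0.0.0/0            0.0.0.0/0            /* all_recv */`,
	}
	for _, line := range lines {
		if m := valuesRe.FindStringSubmatch(line); len(m) == 4 {
			fmt.Printf("pkts=%s bytes=%s ruleid=%q\n", m[1], m[2], m[3])
		}
	}
}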

View File

@@ -154,68 +154,85 @@ func TestIptables_Gather(t *testing.T) {
tags: []map[string]string{},
fields: [][]map[string]interface{}{},
},
{ // 11 - all target and ports
table: "all_recv",
chains: []string{"accountfwd"},
values: []string{
`Chain accountfwd (1 references)
pkts bytes target prot opt in out source destination
123 456 all -- eth0 * 0.0.0.0/0 0.0.0.0/0 /* all_recv */
`},
tags: []map[string]string{
map[string]string{"table": "all_recv", "chain": "accountfwd", "ruleid": "all_recv"},
},
fields: [][]map[string]interface{}{
{map[string]interface{}{"pkts": uint64(123), "bytes": uint64(456)}},
},
},
}
for i, tt := range tests {
i++
ipt := &Iptables{
Table: tt.table,
Chains: tt.chains,
lister: func(table, chain string) (string, error) {
if len(tt.values) > 0 {
v := tt.values[0]
tt.values = tt.values[1:]
return v, nil
}
return "", nil
},
}
acc := new(testutil.Accumulator)
err := acc.GatherError(ipt.Gather)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err)
}
if tt.table == "" {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty table got %d", i, n)
t.Run(tt.table, func(t *testing.T) {
i++
ipt := &Iptables{
Table: tt.table,
Chains: tt.chains,
lister: func(table, chain string) (string, error) {
if len(tt.values) > 0 {
v := tt.values[0]
tt.values = tt.values[1:]
return v, nil
}
return "", nil
},
}
continue
}
if len(tt.chains) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty chains got %d", i, n)
acc := new(testutil.Accumulator)
err := acc.GatherError(ipt.Gather)
if !reflect.DeepEqual(tt.err, err) {
t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err)
}
continue
}
if len(tt.tags) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 values got %d", i, n)
if tt.table == "" {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty table got %d", i, n)
}
return
}
continue
}
n := 0
for j, tags := range tt.tags {
for k, fields := range tt.fields[j] {
if len(acc.Metrics) < n+1 {
t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics))
break
if len(tt.chains) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 fields if empty chains got %d", i, n)
}
m := acc.Metrics[n]
if !reflect.DeepEqual(m.Measurement, measurement) {
t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement)
}
if !reflect.DeepEqual(m.Tags, tags) {
t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags)
}
if !reflect.DeepEqual(m.Fields, fields) {
t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields)
}
n++
return
}
}
if len(tt.tags) == 0 {
n := acc.NFields()
if n != 0 {
t.Errorf("%d: expected 0 values got %d", i, n)
}
return
}
n := 0
for j, tags := range tt.tags {
for k, fields := range tt.fields[j] {
if len(acc.Metrics) < n+1 {
t.Errorf("%d: expected at least %d values got %d", i, n+1, len(acc.Metrics))
break
}
m := acc.Metrics[n]
if !reflect.DeepEqual(m.Measurement, measurement) {
t.Errorf("%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement)
}
if !reflect.DeepEqual(m.Tags, tags) {
t.Errorf("%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags)
}
if !reflect.DeepEqual(m.Fields, fields) {
t.Errorf("%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields)
}
n++
}
}
})
}
}

View File

@@ -13,6 +13,8 @@ The plugin expects messages in the
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = 30
## Topics to subscribe to
topics = [

View File

@@ -16,11 +16,12 @@ import (
)
type MQTTConsumer struct {
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
Servers []string
Topics []string
Username string
Password string
QoS int `toml:"qos"`
ConnectionTimeout internal.Duration `toml:"connection_timeout"`
parser parsers.Parser
@@ -48,13 +49,15 @@ type MQTTConsumer struct {
// keep the accumulator internally:
acc telegraf.Accumulator
started bool
connected bool
}
var sampleConfig = `
servers = ["localhost:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = 30
## Topics to subscribe to
topics = [
@@ -103,7 +106,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
m.started = false
m.connected = false
if m.PersistentSession && m.ClientID == "" {
return fmt.Errorf("ERROR MQTT Consumer: When using persistent_session" +
@@ -115,26 +118,40 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS)
}
if int(m.ConnectionTimeout.Duration) <= 0 {
return fmt.Errorf("MQTT Consumer, invalid connection_timeout value: %d", m.ConnectionTimeout)
}
opts, err := m.createOpts()
if err != nil {
return err
}
m.client = mqtt.NewClient(opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
m.in = make(chan mqtt.Message, 1000)
m.done = make(chan struct{})
m.connect()
return nil
}
func (m *MQTTConsumer) connect() error {
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
err := token.Error()
log.Printf("D! MQTT Consumer, connection error - %v", err)
return err
}
go m.receiver()
return nil
}
func (m *MQTTConsumer) onConnect(c mqtt.Client) {
log.Printf("I! MQTT Client Connected")
if !m.PersistentSession || !m.started {
if !m.PersistentSession || !m.connected {
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
@@ -145,7 +162,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}
m.started = true
m.connected = true
}
return
}
@@ -186,18 +203,27 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
func (m *MQTTConsumer) Stop() {
m.Lock()
defer m.Unlock()
close(m.done)
m.client.Disconnect(200)
m.started = false
if m.connected {
close(m.done)
m.client.Disconnect(200)
m.connected = false
}
}
func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {
if !m.connected {
m.connect()
}
return nil
}
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()
opts.ConnectTimeout = m.ConnectionTimeout.Duration
if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
@@ -238,6 +264,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.SetCleanSession(!m.PersistentSession)
opts.SetOnConnectHandler(m.onConnect)
opts.SetConnectionLostHandler(m.onConnectionLost)
return opts, nil
}
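
The net effect of these changes is that a broker outage at startup no longer aborts the plugin: Start logs the failed connection attempt instead of returning it, and each Gather call retries while connected is false. A minimal sketch of that pattern follows; the broker type here is a stand-in for the paho MQTT client, not the plugin's actual API.

package main

import (
	"errors"
	"fmt"
)

// Stand-in broker; the real plugin uses the paho MQTT client.
type broker struct{ up bool }

func (b *broker) Connect() error {
	if !b.up {
		return errors.New("broker unavailable")
	}
	return nil
}

type consumer struct {
	client    *broker
	connected bool
}

// Start does not propagate the connection error, so the agent keeps running
// even when the broker is down at startup.
func (c *consumer) Start() {
	if err := c.connect(); err != nil {
		fmt.Println("D! MQTT Consumer, connection error -", err)
	}
}

func (c *consumer) connect() error {
	if err := c.client.Connect(); err != nil {
		return err
	}
	c.connected = true
	return nil
}

// Gather retries the connection on every collection interval until it succeeds.
func (c *consumer) Gather() {
	if !c.connected {
		c.connect()
	}
}

func main() {
	c := &consumer{client: &broker{up: false}}
	c.Start()  // logs the failure, does not abort
	c.Gather() // broker still down, retried silently
	c.client.up = true
	c.Gather()
	fmt.Println("connected:", c.connected) // connected: true
}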

View File

@@ -22,11 +22,13 @@ const (
func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) {
in := make(chan mqtt.Message, 100)
n := &MQTTConsumer{
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
in: in,
done: make(chan struct{}),
Topics: []string{"telegraf"},
Servers: []string{"localhost:1883"},
in: in,
done: make(chan struct{}),
connected: true,
}
return n, in
}
@@ -131,6 +133,7 @@ func TestRunParserAndGather(t *testing.T) {
n, in := newTestMQTTConsumer()
acc := testutil.Accumulator{}
n.acc = &acc
defer close(n.done)
n.parser, _ = parsers.NewInfluxParser()

View File

@@ -1022,7 +1022,7 @@ CREATE TABLE #PCounters
Primary Key(object_name, counter_name, instance_name)
);
INSERT #PCounters
SELECT RTrim(spi.object_name) object_name
SELECT DISTINCT RTrim(spi.object_name) object_name
, RTrim(spi.counter_name) counter_name
, RTrim(spi.instance_name) instance_name
, spi.cntr_value
@@ -1044,7 +1044,7 @@ CREATE TABLE #CCounters
Primary Key(object_name, counter_name, instance_name)
);
INSERT #CCounters
SELECT RTrim(spi.object_name) object_name
SELECT DISTINCT RTrim(spi.object_name) object_name
, RTrim(spi.counter_name) counter_name
, RTrim(spi.instance_name) instance_name
, spi.cntr_value

View File

@@ -101,8 +101,15 @@ func (rs *RunningStats) Percentile(n int) float64 {
}
i := int(float64(len(rs.perc)) * float64(n) / float64(100))
if i < 0 {
i = 0
}
return rs.perc[i]
return rs.perc[clamp(i, 0, len(rs.perc)-1)]
}
func clamp(i int, min int, max int) int {
if i < min {
return min
}
if i > max {
return max
}
return i
}
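
Percentile(100) previously computed an index equal to len(rs.perc), one past the end of the slice, which panicked; clamp bounds the index to the valid range. A small self-contained sketch of the fixed computation, assuming the slice is already sorted as it is in the plugin:

package main

import "fmt"

// clamp bounds i to [min, max].
func clamp(i, min, max int) int {
	if i < min {
		return min
	}
	if i > max {
		return max
	}
	return i
}

// percentile mirrors RunningStats.Percentile: the raw index for n=100 equals
// len(perc), so it must be clamped to len(perc)-1.
func percentile(perc []float64, n int) float64 {
	i := int(float64(len(perc)) * float64(n) / float64(100))
	return perc[clamp(i, 0, len(perc)-1)]
}

func main() {
	sorted := []float64{5, 11, 32, 45}
	fmt.Println(percentile(sorted, 100)) // 45 (raw index 4 would panic)
	fmt.Println(percentile(sorted, 50))  // 32
	fmt.Println(percentile(sorted, 0))   // 5
}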

View File

@@ -23,12 +23,18 @@ func TestRunningStats_Single(t *testing.T) {
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(100) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(100))
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Percentile(0) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(0))
}
if rs.Count() != 1 {
t.Errorf("Expected %v, got %v", 1, rs.Count())
}
@@ -58,12 +64,18 @@ func TestRunningStats_Duplicate(t *testing.T) {
if rs.Lower() != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Lower())
}
if rs.Percentile(100) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(100))
}
if rs.Percentile(90) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(90))
}
if rs.Percentile(50) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(50))
}
if rs.Percentile(0) != 10.1 {
t.Errorf("Expected %v, got %v", 10.1, rs.Percentile(0))
}
if rs.Count() != 4 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}
@@ -93,12 +105,18 @@ func TestRunningStats(t *testing.T) {
if rs.Lower() != 5 {
t.Errorf("Expected %v, got %v", 5, rs.Lower())
}
if rs.Percentile(100) != 45 {
t.Errorf("Expected %v, got %v", 45, rs.Percentile(100))
}
if rs.Percentile(90) != 32 {
t.Errorf("Expected %v, got %v", 32, rs.Percentile(90))
}
if rs.Percentile(50) != 11 {
t.Errorf("Expected %v, got %v", 11, rs.Percentile(50))
}
if rs.Percentile(0) != 5 {
t.Errorf("Expected %v, got %v", 5, rs.Percentile(0))
}
if rs.Count() != 16 {
t.Errorf("Expected %v, got %v", 4, rs.Count())
}

View File

@@ -251,14 +251,14 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, metric := range s.gauges {
acc.AddFields(metric.name, metric.fields, metric.tags, now)
acc.AddGauge(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}
for _, metric := range s.counters {
acc.AddFields(metric.name, metric.fields, metric.tags, now)
acc.AddCounter(metric.name, metric.fields, metric.tags, now)
}
if s.DeleteCounters {
s.counters = make(map[string]cachedcounter)
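
Switching from AddFields to AddGauge and AddCounter attaches the statsd metric type to each cached value, so outputs that understand value types (the prometheus_client output, for example) can map them correctly instead of treating everything as untyped. A rough sketch of the distinction, with a toy accumulator standing in for telegraf.Accumulator:

package main

import "fmt"

// Toy accumulator: AddGauge and AddCounter record a value type,
// AddFields leaves the metric untyped.
type ValueType int

const (
	Untyped ValueType = iota
	Counter
	Gauge
)

type accumulator struct{ types []ValueType }

func (a *accumulator) AddFields(name string, fields map[string]interface{})  { a.types = append(a.types, Untyped) }
func (a *accumulator) AddGauge(name string, fields map[string]interface{})   { a.types = append(a.types, Gauge) }
func (a *accumulator) AddCounter(name string, fields map[string]interface{}) { a.types = append(a.types, Counter) }

func main() {
	acc := &accumulator{}
	// after the fix, statsd caches are flushed with their type attached
	acc.AddGauge("statsd_gauge", map[string]interface{}{"value": 1.0})
	acc.AddCounter("statsd_counter", map[string]interface{}{"value": int64(2)})
	fmt.Println(acc.types) // [2 1] -> Gauge, Counter
}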

View File

@@ -5,6 +5,7 @@ import (
"log"
"net"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
@@ -13,6 +14,16 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
)
var (
allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)
hypenChars = strings.NewReplacer(
"@", "-",
"*", "-",
`%`, "-",
"#", "-",
"$", "-")
)
type OpenTSDB struct {
Prefix string
@@ -24,9 +35,6 @@ type OpenTSDB struct {
Debug bool
}
var sanitizedChars = strings.NewReplacer("@", "-", "*", "-", " ", "_",
`%`, "-", "#", "-", "$", "-", ":", "_")
var sampleConfig = `
## prefix for metrics keys
prefix = "my.specific.prefix."
@@ -125,8 +133,7 @@ func (o *OpenTSDB) WriteHttp(metrics []telegraf.Metric, u *url.URL) error {
}
metric := &HttpMetric{
Metric: sanitizedChars.Replace(fmt.Sprintf("%s%s_%s",
o.Prefix, m.Name(), fieldName)),
Metric: sanitize(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
Tags: tags,
Timestamp: now,
Value: value,
@@ -176,7 +183,7 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
}
messageLine := fmt.Sprintf("put %s %v %s %s\n",
sanitizedChars.Replace(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
sanitize(fmt.Sprintf("%s%s_%s", o.Prefix, m.Name(), fieldName)),
now, metricValue, tags)
_, err := connection.Write([]byte(messageLine))
@@ -192,7 +199,7 @@ func (o *OpenTSDB) WriteTelnet(metrics []telegraf.Metric, u *url.URL) error {
func cleanTags(tags map[string]string) map[string]string {
tagSet := make(map[string]string, len(tags))
for k, v := range tags {
tagSet[sanitizedChars.Replace(k)] = sanitizedChars.Replace(v)
tagSet[sanitize(k)] = sanitize(v)
}
return tagSet
}
@@ -236,6 +243,13 @@ func (o *OpenTSDB) Close() error {
return nil
}
func sanitize(value string) string {
// Apply special hypenation rules to preserve backwards compatibility
value = hypenChars.Replace(value)
// Replace any remaining illegal chars
return allowedChars.ReplaceAllLiteralString(value, "_")
}
func init() {
outputs.Add("opentsdb", func() telegraf.Output {
return &OpenTSDB{}
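
sanitize applies two stages: the legacy hyphen substitutions first, then a whitelist regex that turns any remaining disallowed character into an underscore while letting Unicode letters through via \p{L}. A standalone sketch of the same logic (hyphenChars here is a renamed copy of the diff's hypenChars replacer):

package main

import (
	"fmt"
	"regexp"
	"strings"
)

var (
	// characters outside this whitelist are replaced with underscores
	allowedChars = regexp.MustCompile(`[^a-zA-Z0-9-_./\p{L}]`)
	// legacy substitutions kept for backwards compatibility
	hyphenChars = strings.NewReplacer("@", "-", "*", "-", "%", "-", "#", "-", "$", "-")
)

func sanitize(value string) string {
	value = hyphenChars.Replace(value)
	return allowedChars.ReplaceAllLiteralString(value, "_")
}

func main() {
	fmt.Println(sanitize("Sp%ci@l Chars["))      // Sp-ci-l_Chars_ (matches the updated TestCleanTags case)
	fmt.Println(sanitize("μnicodε_letters"))     // μnicodε_letters (Unicode letters pass \p{L})
	fmt.Println(sanitize("my.prefix.cpu_usage")) // unchanged: . _ - / are whitelisted
}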

View File

@@ -10,9 +10,10 @@ import (
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
//"github.com/stretchr/testify/require"
)
func TestCleanTags(t *testing.T) {
@@ -29,8 +30,16 @@ func TestCleanTags(t *testing.T) {
map[string]string{"aaa": "bbb"},
},
{
map[string]string{"Sp%ci@l Chars": "g$t repl#ced"},
map[string]string{"Sp-ci-l_Chars": "g-t_repl-ced"},
map[string]string{"Sp%ci@l Chars[": "g$t repl#ce)d"},
map[string]string{"Sp-ci-l_Chars_": "g-t_repl-ce_d"},
},
{
map[string]string{"μnicodε_letters": "okαy"},
map[string]string{"μnicodε_letters": "okαy"},
},
{
map[string]string{"n☺": "emojies☠"},
map[string]string{"n_": "emojies_"},
},
{
map[string]string{},
@@ -75,6 +84,47 @@ func TestBuildTagsTelnet(t *testing.T) {
}
}
func TestSanitize(t *testing.T) {
tests := []struct {
name string
value string
expected string
}{
{
name: "Ascii letters and numbers allowed",
value: "ascii 123",
expected: "ascii_123",
},
{
name: "Allowed punct",
value: "-_./",
expected: "-_./",
},
{
name: "Special conversions to hyphen",
value: "@*%#$!",
expected: "-----_",
},
{
name: "Unicode Letters allowed",
value: "μnicodε_letters",
expected: "μnicodε_letters",
},
{
name: "Other Unicode not allowed",
value: "“☢”",
expected: "___",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := sanitize(tt.value)
require.Equal(t, tt.expected, actual)
})
}
}
func BenchmarkHttpSend(b *testing.B) {
const BatchSize = 50
const MetricsCount = 4 * BatchSize

View File

@@ -212,6 +212,8 @@ func TestSerializeValueBoolean(t *testing.T) {
fmt.Sprintf("localhost.enabled.cpu0.us-west-2.cpu 1 %d", now.Unix()),
fmt.Sprintf("localhost.disabled.cpu0.us-west-2.cpu 0 %d", now.Unix()),
}
sort.Strings(mS)
sort.Strings(expS)
assert.Equal(t, expS, mS)
}

View File

@@ -274,6 +274,8 @@ def get_system_arch():
arch = "amd64"
elif arch == "386":
arch = "i386"
elif "arm64" in arch:
arch = "arm64"
elif 'arm' in arch:
# Prevent uname from reporting full ARM arch (eg 'armv7l')
arch = "arm"
@@ -446,6 +448,8 @@ def build(version=None,
# Handle variations in architecture output
if arch == "i386" or arch == "i686":
arch = "386"
elif "arm64" in arch:
arch = "arm64"
elif "arm" in arch:
arch = "arm"
build_command += "GOOS={} GOARCH={} ".format(platform, arch)