Support a telegraf.Metric.Split function
This commit is contained in:
parent
11bc82379c
commit
6b0e863556
|
@ -12,6 +12,7 @@
|
||||||
- [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats.
|
- [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats.
|
||||||
- [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself.
|
- [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself.
|
||||||
- [#2127](https://github.com/influxdata/telegraf/pull/2127): Update Go version to 1.7.4.
|
- [#2127](https://github.com/influxdata/telegraf/pull/2127): Update Go version to 1.7.4.
|
||||||
|
- [#2126](https://github.com/influxdata/telegraf/pull/2126): Support a metric.Split function.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,10 @@ type Metric interface {
|
||||||
Serialize() []byte
|
Serialize() []byte
|
||||||
String() string // convenience function for string(Serialize())
|
String() string // convenience function for string(Serialize())
|
||||||
Copy() Metric
|
Copy() Metric
|
||||||
|
// Split will attempt to return multiple metrics with the same timestamp
|
||||||
|
// whose string representations are no longer than maxSize.
|
||||||
|
// Metrics with a single field may exceed the requested size.
|
||||||
|
Split(maxSize int) []Metric
|
||||||
|
|
||||||
// Tag functions
|
// Tag functions
|
||||||
HasTag(key string) bool
|
HasTag(key string) bool
|
||||||
|
|
|
@ -178,6 +178,57 @@ func (m *metric) Serialize() []byte {
|
||||||
return tmp
|
return tmp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||||
|
if m.Len() < maxSize {
|
||||||
|
return []telegraf.Metric{m}
|
||||||
|
}
|
||||||
|
var out []telegraf.Metric
|
||||||
|
|
||||||
|
// constant number of bytes for each metric (in addition to field bytes)
|
||||||
|
constant := len(m.name) + len(m.tags) + len(m.t) + 3
|
||||||
|
// currently selected fields
|
||||||
|
fields := make([]byte, 0, maxSize)
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
if i >= len(m.fields) {
|
||||||
|
// hit the end of the field byte slice
|
||||||
|
if len(fields) > 0 {
|
||||||
|
out = append(out, copyWith(m.name, m.tags, fields, m.t))
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the end of the next field
|
||||||
|
j := indexUnescapedByte(m.fields[i:], ',')
|
||||||
|
if j == -1 {
|
||||||
|
j = len(m.fields)
|
||||||
|
} else {
|
||||||
|
j += i
|
||||||
|
}
|
||||||
|
|
||||||
|
// if true, then we need to create a metric _not_ including the currently
|
||||||
|
// selected field
|
||||||
|
if len(m.fields[i:j])+len(fields)+constant > maxSize {
|
||||||
|
// if false, then we'll create a metric including the currently
|
||||||
|
// selected field anyways. This means that the given maxSize is too
|
||||||
|
// small for a single field to fit.
|
||||||
|
if len(fields) > 0 {
|
||||||
|
out = append(out, copyWith(m.name, m.tags, fields, m.t))
|
||||||
|
}
|
||||||
|
|
||||||
|
fields = make([]byte, 0, maxSize)
|
||||||
|
}
|
||||||
|
if len(fields) > 0 {
|
||||||
|
fields = append(fields, ',')
|
||||||
|
}
|
||||||
|
fields = append(fields, m.fields[i:j]...)
|
||||||
|
|
||||||
|
i = j + 1
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
func (m *metric) Fields() map[string]interface{} {
|
func (m *metric) Fields() map[string]interface{} {
|
||||||
fieldMap := map[string]interface{}{}
|
fieldMap := map[string]interface{}{}
|
||||||
i := 0
|
i := 0
|
||||||
|
@ -380,17 +431,21 @@ func (m *metric) RemoveField(key string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metric) Copy() telegraf.Metric {
|
func (m *metric) Copy() telegraf.Metric {
|
||||||
mOut := metric{
|
return copyWith(m.name, m.tags, m.fields, m.t)
|
||||||
name: make([]byte, len(m.name)),
|
|
||||||
tags: make([]byte, len(m.tags)),
|
|
||||||
fields: make([]byte, len(m.fields)),
|
|
||||||
t: make([]byte, len(m.t)),
|
|
||||||
}
|
}
|
||||||
copy(mOut.name, m.name)
|
|
||||||
copy(mOut.tags, m.tags)
|
func copyWith(name, tags, fields, t []byte) telegraf.Metric {
|
||||||
copy(mOut.fields, m.fields)
|
out := metric{
|
||||||
copy(mOut.t, m.t)
|
name: make([]byte, len(name)),
|
||||||
return &mOut
|
tags: make([]byte, len(tags)),
|
||||||
|
fields: make([]byte, len(fields)),
|
||||||
|
t: make([]byte, len(t)),
|
||||||
|
}
|
||||||
|
copy(out.name, name)
|
||||||
|
copy(out.tags, tags)
|
||||||
|
copy(out.fields, fields)
|
||||||
|
copy(out.t, t)
|
||||||
|
return &out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metric) HashID() uint64 {
|
func (m *metric) HashID() uint64 {
|
||||||
|
|
|
@ -50,6 +50,21 @@ func BenchmarkAddTag(b *testing.B) {
|
||||||
s = string(mt.String())
|
s = string(mt.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkSplit(b *testing.B) {
|
||||||
|
var mt telegraf.Metric
|
||||||
|
mt = &metric{
|
||||||
|
name: []byte("cpu"),
|
||||||
|
tags: []byte(",host=localhost"),
|
||||||
|
fields: []byte("a=101,b=10i,c=10101,d=101010,e=42"),
|
||||||
|
t: []byte("1480614053000000000"),
|
||||||
|
}
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
metrics = mt.Split(60)
|
||||||
|
}
|
||||||
|
s = string(metrics[0].String())
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkTags(b *testing.B) {
|
func BenchmarkTags(b *testing.B) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
var mt, _ = New("test_metric",
|
var mt, _ = New("test_metric",
|
||||||
|
|
|
@ -3,6 +3,7 @@ package metric
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"regexp"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -434,6 +435,149 @@ func TestNewCounterMetric(t *testing.T) {
|
||||||
assert.Equal(t, now.UnixNano(), m.UnixNano())
|
assert.Equal(t, now.UnixNano(), m.UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test splitting metric into various max lengths
|
||||||
|
func TestSplitMetric(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
"int": int64(100001),
|
||||||
|
"bool": true,
|
||||||
|
"false": false,
|
||||||
|
"string": "test",
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
split80 := m.Split(80)
|
||||||
|
assert.Len(t, split80, 2)
|
||||||
|
|
||||||
|
split70 := m.Split(70)
|
||||||
|
assert.Len(t, split70, 3)
|
||||||
|
|
||||||
|
split60 := m.Split(60)
|
||||||
|
assert.Len(t, split60, 4)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test splitting metric into various max lengths
|
||||||
|
// use a simple regex check to verify that the split metrics are valid
|
||||||
|
func TestSplitMetric_RegexVerify(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"foo": float64(98934259085),
|
||||||
|
"bar": float64(19385292),
|
||||||
|
"number": float64(19385292),
|
||||||
|
"another": float64(19385292),
|
||||||
|
"n": float64(19385292),
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// verification regex
|
||||||
|
re := regexp.MustCompile(`cpu,host=localhost \w+=\d+(,\w+=\d+)* 1480940990034083306`)
|
||||||
|
|
||||||
|
split90 := m.Split(90)
|
||||||
|
assert.Len(t, split90, 2)
|
||||||
|
for _, splitM := range split90 {
|
||||||
|
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
split70 := m.Split(70)
|
||||||
|
assert.Len(t, split70, 3)
|
||||||
|
for _, splitM := range split70 {
|
||||||
|
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
split20 := m.Split(20)
|
||||||
|
assert.Len(t, split20, 5)
|
||||||
|
for _, splitM := range split20 {
|
||||||
|
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// test splitting metric even when given length is shorter than
|
||||||
|
// shortest possible length
|
||||||
|
// Split should split metric as short as possible, ie, 1 field per metric
|
||||||
|
func TestSplitMetric_TooShort(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
"int": int64(100001),
|
||||||
|
"bool": true,
|
||||||
|
"false": false,
|
||||||
|
"string": "test",
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
split := m.Split(10)
|
||||||
|
assert.Len(t, split, 5)
|
||||||
|
strings := make([]string, 5)
|
||||||
|
for i, splitM := range split {
|
||||||
|
strings[i] = splitM.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Contains(t, strings, "cpu,host=localhost float=100001 1480940990034083306\n")
|
||||||
|
assert.Contains(t, strings, "cpu,host=localhost int=100001i 1480940990034083306\n")
|
||||||
|
assert.Contains(t, strings, "cpu,host=localhost bool=true 1480940990034083306\n")
|
||||||
|
assert.Contains(t, strings, "cpu,host=localhost false=false 1480940990034083306\n")
|
||||||
|
assert.Contains(t, strings, "cpu,host=localhost string=\"test\" 1480940990034083306\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitMetric_NoOp(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
"int": int64(100001),
|
||||||
|
"bool": true,
|
||||||
|
"false": false,
|
||||||
|
"string": "test",
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
split := m.Split(1000)
|
||||||
|
assert.Len(t, split, 1)
|
||||||
|
assert.Equal(t, m, split[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitMetric_OneField(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", m.String())
|
||||||
|
|
||||||
|
split := m.Split(1000)
|
||||||
|
assert.Len(t, split, 1)
|
||||||
|
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
||||||
|
|
||||||
|
split = m.Split(1)
|
||||||
|
assert.Len(t, split, 1)
|
||||||
|
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
||||||
|
|
||||||
|
split = m.Split(40)
|
||||||
|
assert.Len(t, split, 1)
|
||||||
|
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewMetricAggregate(t *testing.T) {
|
func TestNewMetricAggregate(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue