Fix metric splitting edge cases (#2896)

Metrics needing one extra byte to fit the output buffer would not be split, so we would emit lines without a line ending. Metrics which overflowed by exactly one field length would be split one field too late, causing truncated fields.
This commit is contained in:
Daniel Nelson 2017-06-07 13:37:54 -07:00 committed by GitHub
parent d3562b7730
commit f0c10b4012
2 changed files with 40 additions and 3 deletions

View File

@ -218,7 +218,7 @@ func (m *metric) SerializeTo(dst []byte) int {
} }
func (m *metric) Split(maxSize int) []telegraf.Metric { func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize { if m.Len() <= maxSize {
return []telegraf.Metric{m} return []telegraf.Metric{m}
} }
var out []telegraf.Metric var out []telegraf.Metric
@ -248,7 +248,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
// if true, then we need to create a metric _not_ including the currently // if true, then we need to create a metric _not_ including the currently
// selected field // selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize { if len(m.fields[i:j])+len(fields)+constant >= maxSize {
// if false, then we'll create a metric including the currently // if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too // selected field anyways. This means that the given maxSize is too
// small for a single field to fit. // small for a single field to fit.

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestNewMetric(t *testing.T) { func TestNewMetric(t *testing.T) {
@ -458,7 +459,7 @@ func TestSplitMetric(t *testing.T) {
assert.Len(t, split70, 3) assert.Len(t, split70, 3)
split60 := m.Split(60) split60 := m.Split(60)
assert.Len(t, split60, 4) assert.Len(t, split60, 5)
} }
// test splitting metric into various max lengths // test splitting metric into various max lengths
@ -578,6 +579,42 @@ func TestSplitMetric_OneField(t *testing.T) {
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
} }
func TestSplitMetric_ExactSize(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len())
// check that no copy was made
require.Equal(t, &m, &actual[0])
}
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)
actual := m.Split(m.Len() - 1)
require.Equal(t, 2, len(actual))
}
func TestNewMetricAggregate(t *testing.T) { func TestNewMetricAggregate(t *testing.T) {
now := time.Now() now := time.Now()