Fix metric splitting edge cases (#2896)
Metrics needing one extra byte to fit the output buffer would not be split, so we would emit lines without a line ending. Metrics which overflowed by exactly one field length would be split one field too late, causing truncated fields.
This commit is contained in:
parent
de7fb2acfe
commit
a275e6792a
|
@ -218,7 +218,7 @@ func (m *metric) SerializeTo(dst []byte) int {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||||
if m.Len() < maxSize {
|
if m.Len() <= maxSize {
|
||||||
return []telegraf.Metric{m}
|
return []telegraf.Metric{m}
|
||||||
}
|
}
|
||||||
var out []telegraf.Metric
|
var out []telegraf.Metric
|
||||||
|
@ -248,7 +248,7 @@ func (m *metric) Split(maxSize int) []telegraf.Metric {
|
||||||
|
|
||||||
// if true, then we need to create a metric _not_ including the currently
|
// if true, then we need to create a metric _not_ including the currently
|
||||||
// selected field
|
// selected field
|
||||||
if len(m.fields[i:j])+len(fields)+constant > maxSize {
|
if len(m.fields[i:j])+len(fields)+constant >= maxSize {
|
||||||
// if false, then we'll create a metric including the currently
|
// if false, then we'll create a metric including the currently
|
||||||
// selected field anyways. This means that the given maxSize is too
|
// selected field anyways. This means that the given maxSize is too
|
||||||
// small for a single field to fit.
|
// small for a single field to fit.
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNewMetric(t *testing.T) {
|
func TestNewMetric(t *testing.T) {
|
||||||
|
@ -458,7 +459,7 @@ func TestSplitMetric(t *testing.T) {
|
||||||
assert.Len(t, split70, 3)
|
assert.Len(t, split70, 3)
|
||||||
|
|
||||||
split60 := m.Split(60)
|
split60 := m.Split(60)
|
||||||
assert.Len(t, split60, 4)
|
assert.Len(t, split60, 5)
|
||||||
}
|
}
|
||||||
|
|
||||||
// test splitting metric into various max lengths
|
// test splitting metric into various max lengths
|
||||||
|
@ -578,6 +579,42 @@ func TestSplitMetric_OneField(t *testing.T) {
|
||||||
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSplitMetric_ExactSize(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
"int": int64(100001),
|
||||||
|
"bool": true,
|
||||||
|
"false": false,
|
||||||
|
"string": "test",
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
actual := m.Split(m.Len())
|
||||||
|
// check that no copy was made
|
||||||
|
require.Equal(t, &m, &actual[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitMetric_NoRoomForNewline(t *testing.T) {
|
||||||
|
now := time.Unix(0, 1480940990034083306)
|
||||||
|
tags := map[string]string{
|
||||||
|
"host": "localhost",
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"float": float64(100001),
|
||||||
|
"int": int64(100001),
|
||||||
|
"bool": true,
|
||||||
|
"false": false,
|
||||||
|
}
|
||||||
|
m, err := New("cpu", tags, fields, now)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
actual := m.Split(m.Len() - 1)
|
||||||
|
require.Equal(t, 2, len(actual))
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewMetricAggregate(t *testing.T) {
|
func TestNewMetricAggregate(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue