Converge to typed value in prometheus output (#3104)
This commit is contained in:
parent
b1eb240b18
commit
294b7322b4
|
@ -254,7 +254,17 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
p.fam[mname] = fam
|
p.fam[mname] = fam
|
||||||
} else {
|
} else {
|
||||||
if fam.ValueType != vt {
|
// Metrics can be untyped even though the corresponding plugin
|
||||||
|
// creates them with a type. This happens when the metric was
|
||||||
|
// transferred over the network in a format that does not
|
||||||
|
// preserve value type and received using an input such as a
|
||||||
|
// queue consumer. To avoid issues we automatically upgrade
|
||||||
|
// value type from untyped to a typed metric.
|
||||||
|
if fam.ValueType == prometheus.UntypedValue {
|
||||||
|
fam.ValueType = vt
|
||||||
|
}
|
||||||
|
|
||||||
|
if vt != prometheus.UntypedValue && fam.ValueType != vt {
|
||||||
// Don't return an error since this would be a permanent error
|
// Don't return an error since this would be a permanent error
|
||||||
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
|
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
|
||||||
break
|
break
|
||||||
|
|
|
@ -189,6 +189,56 @@ func TestWrite_MixedValueType(t *testing.T) {
|
||||||
require.Equal(t, 1, len(fam.Samples))
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWrite_MixedValueTypeUpgrade(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"a": "x"},
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
now,
|
||||||
|
telegraf.Untyped)
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"a": "y"},
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
now,
|
||||||
|
telegraf.Gauge)
|
||||||
|
var metrics = []telegraf.Metric{p1, p2}
|
||||||
|
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 2, len(fam.Samples))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MixedValueTypeDowngrade(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"a": "x"},
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
now,
|
||||||
|
telegraf.Gauge)
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"a": "y"},
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
now,
|
||||||
|
telegraf.Untyped)
|
||||||
|
var metrics = []telegraf.Metric{p1, p2}
|
||||||
|
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 2, len(fam.Samples))
|
||||||
|
}
|
||||||
|
|
||||||
func TestWrite_Tags(t *testing.T) {
|
func TestWrite_Tags(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
p1, err := metric.New(
|
p1, err := metric.New(
|
||||||
|
|
Loading…
Reference in New Issue