Add history and summary types to telegraf and prometheus plugins (#3337)
This commit is contained in:
parent
76fbe598a7
commit
43d69d805d
|
@ -28,6 +28,18 @@ type Accumulator interface {
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
t ...time.Time)
|
t ...time.Time)
|
||||||
|
|
||||||
|
// AddSummary is the same as AddFields, but will add the metric as a "Summary" type
|
||||||
|
AddSummary(measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
t ...time.Time)
|
||||||
|
|
||||||
|
// AddHistogram is the same as AddFields, but will add the metric as a "Histogram" type
|
||||||
|
AddHistogram(measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
t ...time.Time)
|
||||||
|
|
||||||
SetPrecision(precision, interval time.Duration)
|
SetPrecision(precision, interval time.Duration)
|
||||||
|
|
||||||
AddError(err error)
|
AddError(err error)
|
||||||
|
|
|
@ -76,6 +76,28 @@ func (ac *accumulator) AddCounter(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ac *accumulator) AddSummary(
|
||||||
|
measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
t ...time.Time,
|
||||||
|
) {
|
||||||
|
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil {
|
||||||
|
ac.metrics <- m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ac *accumulator) AddHistogram(
|
||||||
|
measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
t ...time.Time,
|
||||||
|
) {
|
||||||
|
if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil {
|
||||||
|
ac.metrics <- m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// AddError passes a runtime error to the accumulator.
|
// AddError passes a runtime error to the accumulator.
|
||||||
// The error will be tagged with the plugin name and written to the log.
|
// The error will be tagged with the plugin name and written to the log.
|
||||||
func (ac *accumulator) AddError(err error) {
|
func (ac *accumulator) AddError(err error) {
|
||||||
|
|
|
@ -13,6 +13,8 @@ const (
|
||||||
Counter
|
Counter
|
||||||
Gauge
|
Gauge
|
||||||
Untyped
|
Untyped
|
||||||
|
Summary
|
||||||
|
Histogram
|
||||||
)
|
)
|
||||||
|
|
||||||
type Metric interface {
|
type Metric interface {
|
||||||
|
|
|
@ -103,6 +103,10 @@ func valueType(mt dto.MetricType) telegraf.ValueType {
|
||||||
return telegraf.Counter
|
return telegraf.Counter
|
||||||
case dto.MetricType_GAUGE:
|
case dto.MetricType_GAUGE:
|
||||||
return telegraf.Gauge
|
return telegraf.Gauge
|
||||||
|
case dto.MetricType_SUMMARY:
|
||||||
|
return telegraf.Summary
|
||||||
|
case dto.MetricType_HISTOGRAM:
|
||||||
|
return telegraf.Histogram
|
||||||
default:
|
default:
|
||||||
return telegraf.Untyped
|
return telegraf.Untyped
|
||||||
}
|
}
|
||||||
|
@ -145,11 +149,11 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
|
||||||
fields["gauge"] = float64(m.GetGauge().GetValue())
|
fields["gauge"] = float64(m.GetGauge().GetValue())
|
||||||
}
|
}
|
||||||
} else if m.Counter != nil {
|
} else if m.Counter != nil {
|
||||||
if !math.IsNaN(m.GetGauge().GetValue()) {
|
if !math.IsNaN(m.GetCounter().GetValue()) {
|
||||||
fields["counter"] = float64(m.GetCounter().GetValue())
|
fields["counter"] = float64(m.GetCounter().GetValue())
|
||||||
}
|
}
|
||||||
} else if m.Untyped != nil {
|
} else if m.Untyped != nil {
|
||||||
if !math.IsNaN(m.GetGauge().GetValue()) {
|
if !math.IsNaN(m.GetUntyped().GetValue()) {
|
||||||
fields["value"] = float64(m.GetUntyped().GetValue())
|
fields["value"] = float64(m.GetUntyped().GetValue())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,6 +224,10 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
|
||||||
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
|
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
case telegraf.Gauge:
|
case telegraf.Gauge:
|
||||||
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
|
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
|
case telegraf.Summary:
|
||||||
|
acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
|
case telegraf.Histogram:
|
||||||
|
acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
default:
|
default:
|
||||||
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -28,8 +29,13 @@ type SampleID string
|
||||||
type Sample struct {
|
type Sample struct {
|
||||||
// Labels are the Prometheus labels.
|
// Labels are the Prometheus labels.
|
||||||
Labels map[string]string
|
Labels map[string]string
|
||||||
// Value is the value in the Prometheus output.
|
// Value is the value in the Prometheus output. Only one of these will populated.
|
||||||
Value float64
|
Value float64
|
||||||
|
HistogramValue map[float64]uint64
|
||||||
|
SummaryValue map[float64]float64
|
||||||
|
// Histograms and Summaries need a count and a sum
|
||||||
|
Count uint64
|
||||||
|
Sum float64
|
||||||
// Expiration is the deadline that this Sample is valid until.
|
// Expiration is the deadline that this Sample is valid until.
|
||||||
Expiration time.Time
|
Expiration time.Time
|
||||||
}
|
}
|
||||||
|
@ -38,8 +44,9 @@ type Sample struct {
|
||||||
type MetricFamily struct {
|
type MetricFamily struct {
|
||||||
// Samples are the Sample belonging to this MetricFamily.
|
// Samples are the Sample belonging to this MetricFamily.
|
||||||
Samples map[SampleID]*Sample
|
Samples map[SampleID]*Sample
|
||||||
// Type of the Value.
|
// Need the telegraf ValueType because there isn't a Prometheus ValueType
|
||||||
ValueType prometheus.ValueType
|
// representing Histogram or Summary
|
||||||
|
TelegrafValueType telegraf.ValueType
|
||||||
// LabelSet is the label counts for all Samples.
|
// LabelSet is the label counts for all Samples.
|
||||||
LabelSet map[string]int
|
LabelSet map[string]int
|
||||||
}
|
}
|
||||||
|
@ -189,7 +196,16 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
|
||||||
labels = append(labels, v)
|
labels = append(labels, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
|
var metric prometheus.Metric
|
||||||
|
var err error
|
||||||
|
switch family.TelegrafValueType {
|
||||||
|
case telegraf.Summary:
|
||||||
|
metric, err = prometheus.NewConstSummary(desc, sample.Count, sample.Sum, sample.SummaryValue, labels...)
|
||||||
|
case telegraf.Histogram:
|
||||||
|
metric, err = prometheus.NewConstHistogram(desc, sample.Count, sample.Sum, sample.HistogramValue, labels...)
|
||||||
|
default:
|
||||||
|
metric, err = prometheus.NewConstMetric(desc, getPromValueType(family.TelegrafValueType), sample.Value, labels...)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("E! Error creating prometheus metric, "+
|
log.Printf("E! Error creating prometheus metric, "+
|
||||||
"key: %s, labels: %v,\nerr: %s\n",
|
"key: %s, labels: %v,\nerr: %s\n",
|
||||||
|
@ -205,7 +221,7 @@ func sanitize(value string) string {
|
||||||
return invalidNameCharRE.ReplaceAllString(value, "_")
|
return invalidNameCharRE.ReplaceAllString(value, "_")
|
||||||
}
|
}
|
||||||
|
|
||||||
func valueType(tt telegraf.ValueType) prometheus.ValueType {
|
func getPromValueType(tt telegraf.ValueType) prometheus.ValueType {
|
||||||
switch tt {
|
switch tt {
|
||||||
case telegraf.Counter:
|
case telegraf.Counter:
|
||||||
return prometheus.CounterValue
|
return prometheus.CounterValue
|
||||||
|
@ -226,6 +242,30 @@ func CreateSampleID(tags map[string]string) SampleID {
|
||||||
return SampleID(strings.Join(pairs, ","))
|
return SampleID(strings.Join(pairs, ","))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addSample(fam *MetricFamily, sample *Sample, sampleID SampleID) {
|
||||||
|
|
||||||
|
for k, _ := range sample.Labels {
|
||||||
|
fam.LabelSet[k]++
|
||||||
|
}
|
||||||
|
|
||||||
|
fam.Samples[sampleID] = sample
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PrometheusClient) addMetricFamily(point telegraf.Metric, sample *Sample, mname string, sampleID SampleID) {
|
||||||
|
var fam *MetricFamily
|
||||||
|
var ok bool
|
||||||
|
if fam, ok = p.fam[mname]; !ok {
|
||||||
|
fam = &MetricFamily{
|
||||||
|
Samples: make(map[SampleID]*Sample),
|
||||||
|
TelegrafValueType: point.Type(),
|
||||||
|
LabelSet: make(map[string]int),
|
||||||
|
}
|
||||||
|
p.fam[mname] = fam
|
||||||
|
}
|
||||||
|
|
||||||
|
addSample(fam, sample, sampleID)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
@ -234,7 +274,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
for _, point := range metrics {
|
for _, point := range metrics {
|
||||||
tags := point.Tags()
|
tags := point.Tags()
|
||||||
vt := valueType(point.Type())
|
|
||||||
sampleID := CreateSampleID(tags)
|
sampleID := CreateSampleID(tags)
|
||||||
|
|
||||||
labels := make(map[string]string)
|
labels := make(map[string]string)
|
||||||
|
@ -251,77 +290,128 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for fn, fv := range point.Fields() {
|
switch point.Type() {
|
||||||
// Ignore string and bool fields.
|
case telegraf.Summary:
|
||||||
var value float64
|
|
||||||
switch fv := fv.(type) {
|
|
||||||
case int64:
|
|
||||||
value = float64(fv)
|
|
||||||
case float64:
|
|
||||||
value = fv
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
sample := &Sample{
|
|
||||||
Labels: labels,
|
|
||||||
Value: value,
|
|
||||||
Expiration: now.Add(p.ExpirationInterval.Duration),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Special handling of value field; supports passthrough from
|
|
||||||
// the prometheus input.
|
|
||||||
var mname string
|
var mname string
|
||||||
switch point.Type() {
|
var sum float64
|
||||||
case telegraf.Counter:
|
var count uint64
|
||||||
if fn == "counter" {
|
summaryvalue := make(map[float64]float64)
|
||||||
mname = sanitize(point.Name())
|
for fn, fv := range point.Fields() {
|
||||||
}
|
var value float64
|
||||||
case telegraf.Gauge:
|
switch fv := fv.(type) {
|
||||||
if fn == "gauge" {
|
case int64:
|
||||||
mname = sanitize(point.Name())
|
value = float64(fv)
|
||||||
}
|
case float64:
|
||||||
}
|
value = fv
|
||||||
if mname == "" {
|
default:
|
||||||
if fn == "value" {
|
continue
|
||||||
mname = sanitize(point.Name())
|
|
||||||
} else {
|
|
||||||
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var fam *MetricFamily
|
|
||||||
var ok bool
|
|
||||||
if fam, ok = p.fam[mname]; !ok {
|
|
||||||
fam = &MetricFamily{
|
|
||||||
Samples: make(map[SampleID]*Sample),
|
|
||||||
ValueType: vt,
|
|
||||||
LabelSet: make(map[string]int),
|
|
||||||
}
|
|
||||||
p.fam[mname] = fam
|
|
||||||
} else {
|
|
||||||
// Metrics can be untyped even though the corresponding plugin
|
|
||||||
// creates them with a type. This happens when the metric was
|
|
||||||
// transferred over the network in a format that does not
|
|
||||||
// preserve value type and received using an input such as a
|
|
||||||
// queue consumer. To avoid issues we automatically upgrade
|
|
||||||
// value type from untyped to a typed metric.
|
|
||||||
if fam.ValueType == prometheus.UntypedValue {
|
|
||||||
fam.ValueType = vt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if vt != prometheus.UntypedValue && fam.ValueType != vt {
|
switch fn {
|
||||||
// Don't return an error since this would be a permanent error
|
case "sum":
|
||||||
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
|
sum = value
|
||||||
break
|
case "count":
|
||||||
|
count = uint64(value)
|
||||||
|
default:
|
||||||
|
limit, err := strconv.ParseFloat(fn, 64)
|
||||||
|
if err == nil {
|
||||||
|
summaryvalue[limit] = value
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sample := &Sample{
|
||||||
for k, _ := range sample.Labels {
|
Labels: labels,
|
||||||
fam.LabelSet[k]++
|
SummaryValue: summaryvalue,
|
||||||
|
Count: count,
|
||||||
|
Sum: sum,
|
||||||
|
Expiration: now.Add(p.ExpirationInterval.Duration),
|
||||||
}
|
}
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
|
||||||
fam.Samples[sampleID] = sample
|
p.addMetricFamily(point, sample, mname, sampleID)
|
||||||
|
|
||||||
|
case telegraf.Histogram:
|
||||||
|
var mname string
|
||||||
|
var sum float64
|
||||||
|
var count uint64
|
||||||
|
histogramvalue := make(map[float64]uint64)
|
||||||
|
for fn, fv := range point.Fields() {
|
||||||
|
var value float64
|
||||||
|
switch fv := fv.(type) {
|
||||||
|
case int64:
|
||||||
|
value = float64(fv)
|
||||||
|
case float64:
|
||||||
|
value = fv
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch fn {
|
||||||
|
case "sum":
|
||||||
|
sum = value
|
||||||
|
case "count":
|
||||||
|
count = uint64(value)
|
||||||
|
default:
|
||||||
|
limit, err := strconv.ParseFloat(fn, 64)
|
||||||
|
if err == nil {
|
||||||
|
histogramvalue[limit] = uint64(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sample := &Sample{
|
||||||
|
Labels: labels,
|
||||||
|
HistogramValue: histogramvalue,
|
||||||
|
Count: count,
|
||||||
|
Sum: sum,
|
||||||
|
Expiration: now.Add(p.ExpirationInterval.Duration),
|
||||||
|
}
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
|
||||||
|
p.addMetricFamily(point, sample, mname, sampleID)
|
||||||
|
|
||||||
|
default:
|
||||||
|
for fn, fv := range point.Fields() {
|
||||||
|
// Ignore string and bool fields.
|
||||||
|
var value float64
|
||||||
|
switch fv := fv.(type) {
|
||||||
|
case int64:
|
||||||
|
value = float64(fv)
|
||||||
|
case float64:
|
||||||
|
value = fv
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sample := &Sample{
|
||||||
|
Labels: labels,
|
||||||
|
Value: value,
|
||||||
|
Expiration: now.Add(p.ExpirationInterval.Duration),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Special handling of value field; supports passthrough from
|
||||||
|
// the prometheus input.
|
||||||
|
var mname string
|
||||||
|
switch point.Type() {
|
||||||
|
case telegraf.Counter:
|
||||||
|
if fn == "counter" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
}
|
||||||
|
case telegraf.Gauge:
|
||||||
|
if fn == "gauge" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if mname == "" {
|
||||||
|
if fn == "value" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
} else {
|
||||||
|
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.addMetricFamily(point, sample, mname, sampleID)
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -45,7 +44,7 @@ func TestWrite_Basic(t *testing.T) {
|
||||||
|
|
||||||
fam, ok := client.fam["foo"]
|
fam, ok := client.fam["foo"]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
|
||||||
require.Equal(t, map[string]int{}, fam.LabelSet)
|
require.Equal(t, map[string]int{}, fam.LabelSet)
|
||||||
|
|
||||||
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
|
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
|
||||||
|
@ -119,7 +118,7 @@ func TestWrite_Counters(t *testing.T) {
|
||||||
args args
|
args args
|
||||||
err error
|
err error
|
||||||
metricName string
|
metricName string
|
||||||
promType prometheus.ValueType
|
valueType telegraf.ValueType
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "field named value is not added to metric name",
|
name: "field named value is not added to metric name",
|
||||||
|
@ -129,7 +128,7 @@ func TestWrite_Counters(t *testing.T) {
|
||||||
valueType: telegraf.Counter,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
promType: prometheus.CounterValue,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "field named counter is not added to metric name",
|
name: "field named counter is not added to metric name",
|
||||||
|
@ -139,7 +138,7 @@ func TestWrite_Counters(t *testing.T) {
|
||||||
valueType: telegraf.Counter,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
metricName: "foo",
|
metricName: "foo",
|
||||||
promType: prometheus.CounterValue,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "field with any other name is added to metric name",
|
name: "field with any other name is added to metric name",
|
||||||
|
@ -149,7 +148,7 @@ func TestWrite_Counters(t *testing.T) {
|
||||||
valueType: telegraf.Counter,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
metricName: "foo_other",
|
metricName: "foo_other",
|
||||||
promType: prometheus.CounterValue,
|
valueType: telegraf.Counter,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
@ -167,7 +166,7 @@ func TestWrite_Counters(t *testing.T) {
|
||||||
|
|
||||||
fam, ok := client.fam[tt.metricName]
|
fam, ok := client.fam[tt.metricName]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, tt.promType, fam.ValueType)
|
require.Equal(t, tt.valueType, fam.TelegrafValueType)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -196,20 +195,119 @@ func TestWrite_Sanitize(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_Gauge(t *testing.T) {
|
func TestWrite_Gauge(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
measurement string
|
||||||
|
tags map[string]string
|
||||||
|
fields map[string]interface{}
|
||||||
|
valueType telegraf.ValueType
|
||||||
|
}
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
err error
|
||||||
|
metricName string
|
||||||
|
valueType telegraf.ValueType
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "field named value is not added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"value": 42},
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
metricName: "foo",
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "field named gauge is not added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"gauge": 42},
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
metricName: "foo",
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "field with any other name is added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"other": 42},
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
metricName: "foo_other",
|
||||||
|
valueType: telegraf.Gauge,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
m, err := metric.New(
|
||||||
|
tt.args.measurement,
|
||||||
|
tt.args.tags,
|
||||||
|
tt.args.fields,
|
||||||
|
time.Now(),
|
||||||
|
tt.args.valueType,
|
||||||
|
)
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write([]telegraf.Metric{m})
|
||||||
|
require.Equal(t, tt.err, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam[tt.metricName]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, tt.valueType, fam.TelegrafValueType)
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Summary(t *testing.T) {
|
||||||
client := NewClient()
|
client := NewClient()
|
||||||
|
|
||||||
p1, err := metric.New(
|
p1, err := metric.New(
|
||||||
"foo",
|
"foo",
|
||||||
make(map[string]string),
|
make(map[string]string),
|
||||||
map[string]interface{}{"value": 42},
|
map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
|
||||||
time.Now(),
|
time.Now(),
|
||||||
telegraf.Gauge)
|
telegraf.Summary)
|
||||||
|
|
||||||
err = client.Write([]telegraf.Metric{p1})
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fam, ok := client.fam["foo"]
|
fam, ok := client.fam["foo"]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
|
|
||||||
|
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, 84.0, sample1.Sum)
|
||||||
|
require.Equal(t, uint64(42), sample1.Count)
|
||||||
|
require.Equal(t, 3, len(sample1.SummaryValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Histogram(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
|
||||||
|
time.Now(),
|
||||||
|
telegraf.Histogram)
|
||||||
|
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
|
|
||||||
|
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, 84.0, sample1.Sum)
|
||||||
|
require.Equal(t, uint64(42), sample1.Count)
|
||||||
|
require.Equal(t, 3, len(sample1.HistogramValue))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_MixedValueType(t *testing.T) {
|
func TestWrite_MixedValueType(t *testing.T) {
|
||||||
|
@ -307,7 +405,7 @@ func TestWrite_Tags(t *testing.T) {
|
||||||
|
|
||||||
fam, ok := client.fam["foo"]
|
fam, ok := client.fam["foo"]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
|
||||||
|
|
||||||
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
||||||
|
|
||||||
|
|
|
@ -122,6 +122,24 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) AddSummary(
|
||||||
|
measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
timestamp ...time.Time,
|
||||||
|
) {
|
||||||
|
a.AddFields(measurement, fields, tags, timestamp...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Accumulator) AddHistogram(
|
||||||
|
measurement string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
tags map[string]string,
|
||||||
|
timestamp ...time.Time,
|
||||||
|
) {
|
||||||
|
a.AddFields(measurement, fields, tags, timestamp...)
|
||||||
|
}
|
||||||
|
|
||||||
// AddError appends the given error to Accumulator.Errors.
|
// AddError appends the given error to Accumulator.Errors.
|
||||||
func (a *Accumulator) AddError(err error) {
|
func (a *Accumulator) AddError(err error) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
Loading…
Reference in New Issue