Ensure prometheus metrics have same set of labels (#2857)
This commit is contained in:
parent
e2ab598a4a
commit
6ebeeef452
|
@ -6,6 +6,8 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -17,19 +19,40 @@ import (
|
||||||
|
|
||||||
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||||
|
|
||||||
type MetricWithExpiration struct {
|
// SampleID uniquely identifies a Sample
|
||||||
Metric prometheus.Metric
|
type SampleID string
|
||||||
|
|
||||||
|
// Sample represents the current value of a series.
|
||||||
|
type Sample struct {
|
||||||
|
// Labels are the Prometheus labels.
|
||||||
|
Labels map[string]string
|
||||||
|
// Value is the value in the Prometheus output.
|
||||||
|
Value float64
|
||||||
|
// Expiration is the deadline that this Sample is valid until.
|
||||||
Expiration time.Time
|
Expiration time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MetricFamily contains the data required to build valid prometheus Metrics.
|
||||||
|
type MetricFamily struct {
|
||||||
|
// Samples are the Sample belonging to this MetricFamily.
|
||||||
|
Samples map[SampleID]*Sample
|
||||||
|
// Type of the Value.
|
||||||
|
ValueType prometheus.ValueType
|
||||||
|
// LabelSet is the label counts for all Samples.
|
||||||
|
LabelSet map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
type PrometheusClient struct {
|
type PrometheusClient struct {
|
||||||
Listen string
|
Listen string
|
||||||
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
||||||
server *http.Server
|
|
||||||
|
|
||||||
metrics map[string]*MetricWithExpiration
|
server *http.Server
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
// fam is the non-expired MetricFamily by Prometheus metric name.
|
||||||
|
fam map[string]*MetricFamily
|
||||||
|
// now returns the current time.
|
||||||
|
now func() time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
@ -41,7 +64,6 @@ var sampleConfig = `
|
||||||
`
|
`
|
||||||
|
|
||||||
func (p *PrometheusClient) Start() error {
|
func (p *PrometheusClient) Start() error {
|
||||||
p.metrics = make(map[string]*MetricWithExpiration)
|
|
||||||
prometheus.Register(p)
|
prometheus.Register(p)
|
||||||
|
|
||||||
if p.Listen == "" {
|
if p.Listen == "" {
|
||||||
|
@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
|
||||||
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
|
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements prometheus.Collector
|
// Expire removes Samples that have expired.
|
||||||
|
func (p *PrometheusClient) Expire() {
|
||||||
|
now := p.now()
|
||||||
|
for name, family := range p.fam {
|
||||||
|
for key, sample := range family.Samples {
|
||||||
|
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
|
||||||
|
for k, _ := range sample.Labels {
|
||||||
|
family.LabelSet[k]--
|
||||||
|
}
|
||||||
|
delete(family.Samples, key)
|
||||||
|
|
||||||
|
if len(family.Samples) == 0 {
|
||||||
|
delete(p.fam, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect implements prometheus.Collector
|
||||||
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
|
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
for key, m := range p.metrics {
|
p.Expire()
|
||||||
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
|
|
||||||
delete(p.metrics, key)
|
for name, family := range p.fam {
|
||||||
} else {
|
// Get list of all labels on MetricFamily
|
||||||
ch <- m.Metric
|
var labelNames []string
|
||||||
|
for k, v := range family.LabelSet {
|
||||||
|
if v > 0 {
|
||||||
|
labelNames = append(labelNames, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
|
||||||
|
|
||||||
|
for _, sample := range family.Samples {
|
||||||
|
// Get labels for this sample; unset labels will be set to the
|
||||||
|
// empty string
|
||||||
|
var labels []string
|
||||||
|
for _, label := range labelNames {
|
||||||
|
v := sample.Labels[label]
|
||||||
|
labels = append(labels, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("E! Error creating prometheus metric, "+
|
||||||
|
"key: %s, labels: %v,\nerr: %s\n",
|
||||||
|
name, labels, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
ch <- metric
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sanitize(value string) string {
|
||||||
|
return invalidNameCharRE.ReplaceAllString(value, "_")
|
||||||
|
}
|
||||||
|
|
||||||
|
func valueType(tt telegraf.ValueType) prometheus.ValueType {
|
||||||
|
switch tt {
|
||||||
|
case telegraf.Counter:
|
||||||
|
return prometheus.CounterValue
|
||||||
|
case telegraf.Gauge:
|
||||||
|
return prometheus.GaugeValue
|
||||||
|
default:
|
||||||
|
return prometheus.UntypedValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
|
||||||
|
func CreateSampleID(tags map[string]string) SampleID {
|
||||||
|
pairs := make([]string, 0, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
|
||||||
|
}
|
||||||
|
sort.Strings(pairs)
|
||||||
|
return SampleID(strings.Join(pairs, ","))
|
||||||
|
}
|
||||||
|
|
||||||
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
|
|
||||||
if len(metrics) == 0 {
|
now := p.now()
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, point := range metrics {
|
for _, point := range metrics {
|
||||||
key := point.Name()
|
tags := point.Tags()
|
||||||
key = invalidNameCharRE.ReplaceAllString(key, "_")
|
vt := valueType(point.Type())
|
||||||
|
sampleID := CreateSampleID(tags)
|
||||||
|
|
||||||
// convert tags into prometheus labels
|
labels := make(map[string]string)
|
||||||
var labels []string
|
for k, v := range tags {
|
||||||
l := prometheus.Labels{}
|
labels[sanitize(k)] = sanitize(v)
|
||||||
for k, v := range point.Tags() {
|
|
||||||
k = invalidNameCharRE.ReplaceAllString(k, "_")
|
|
||||||
if len(k) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
labels = append(labels, k)
|
|
||||||
l[k] = v
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a type if it's available, defaulting to Untyped
|
for fn, fv := range point.Fields() {
|
||||||
var mType prometheus.ValueType
|
|
||||||
switch point.Type() {
|
|
||||||
case telegraf.Counter:
|
|
||||||
mType = prometheus.CounterValue
|
|
||||||
case telegraf.Gauge:
|
|
||||||
mType = prometheus.GaugeValue
|
|
||||||
default:
|
|
||||||
mType = prometheus.UntypedValue
|
|
||||||
}
|
|
||||||
|
|
||||||
for n, val := range point.Fields() {
|
|
||||||
// Ignore string and bool fields.
|
// Ignore string and bool fields.
|
||||||
switch val.(type) {
|
var value float64
|
||||||
case string:
|
switch fv := fv.(type) {
|
||||||
continue
|
|
||||||
case bool:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// sanitize the measurement name
|
|
||||||
n = invalidNameCharRE.ReplaceAllString(n, "_")
|
|
||||||
var mname string
|
|
||||||
if n == "value" {
|
|
||||||
mname = key
|
|
||||||
} else {
|
|
||||||
mname = fmt.Sprintf("%s_%s", key, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
|
|
||||||
var metric prometheus.Metric
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// switch for field type
|
|
||||||
switch val := val.(type) {
|
|
||||||
case int64:
|
case int64:
|
||||||
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
|
value = float64(fv)
|
||||||
case float64:
|
case float64:
|
||||||
metric, err = prometheus.NewConstMetric(desc, mType, val)
|
value = fv
|
||||||
default:
|
default:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
log.Printf("E! Error creating prometheus metric, "+
|
sample := &Sample{
|
||||||
"key: %s, labels: %v,\nerr: %s\n",
|
Labels: labels,
|
||||||
mname, l, err.Error())
|
Value: value,
|
||||||
|
Expiration: now.Add(p.ExpirationInterval.Duration),
|
||||||
}
|
}
|
||||||
|
|
||||||
p.metrics[desc.String()] = &MetricWithExpiration{
|
// Special handling of value field; supports passthrough from
|
||||||
Metric: metric,
|
// the prometheus input.
|
||||||
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
|
var mname string
|
||||||
|
if fn == "value" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
} else {
|
||||||
|
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var fam *MetricFamily
|
||||||
|
var ok bool
|
||||||
|
if fam, ok = p.fam[mname]; !ok {
|
||||||
|
fam = &MetricFamily{
|
||||||
|
Samples: make(map[SampleID]*Sample),
|
||||||
|
ValueType: vt,
|
||||||
|
LabelSet: make(map[string]int),
|
||||||
|
}
|
||||||
|
p.fam[mname] = fam
|
||||||
|
} else {
|
||||||
|
if fam.ValueType != vt {
|
||||||
|
// Don't return an error since this would be a permanent error
|
||||||
|
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, _ := range sample.Labels {
|
||||||
|
fam.LabelSet[k]++
|
||||||
|
}
|
||||||
|
|
||||||
|
fam.Samples[sampleID] = sample
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -187,6 +266,8 @@ func init() {
|
||||||
outputs.Add("prometheus_client", func() telegraf.Output {
|
outputs.Add("prometheus_client", func() telegraf.Output {
|
||||||
return &PrometheusClient{
|
return &PrometheusClient{
|
||||||
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
||||||
|
fam: make(map[string]*MetricFamily),
|
||||||
|
now: time.Now,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,16 +4,314 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func setUnixTime(client *PrometheusClient, sec int64) {
|
||||||
|
client.now = func() time.Time {
|
||||||
|
return time.Unix(sec, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient initializes a PrometheusClient.
|
||||||
|
func NewClient() *PrometheusClient {
|
||||||
|
return &PrometheusClient{
|
||||||
|
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
||||||
|
fam: make(map[string]*MetricFamily),
|
||||||
|
now: time.Now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Basic(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
pt1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 0.0},
|
||||||
|
now)
|
||||||
|
var metrics = []telegraf.Metric{
|
||||||
|
pt1,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
||||||
|
require.Equal(t, map[string]int{}, fam.LabelSet)
|
||||||
|
|
||||||
|
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, 0.0, sample.Value)
|
||||||
|
require.True(t, now.Before(sample.Expiration))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_IntField(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 42},
|
||||||
|
time.Now())
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
for _, v := range fam.Samples {
|
||||||
|
require.Equal(t, 42.0, v.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_FieldNotValue(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"howdy": 0.0},
|
||||||
|
time.Now())
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo_howdy"]
|
||||||
|
require.True(t, ok)
|
||||||
|
for _, v := range fam.Samples {
|
||||||
|
require.Equal(t, 0.0, v.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_SkipNonNumberField(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": "howdy"},
|
||||||
|
time.Now())
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, ok := client.fam["foo"]
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Counter(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 42},
|
||||||
|
time.Now(),
|
||||||
|
telegraf.Counter)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, prometheus.CounterValue, fam.ValueType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Sanitize(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo.bar",
|
||||||
|
map[string]string{"tag-with-dash": "localhost.local"},
|
||||||
|
map[string]interface{}{"field-with-dash": 42},
|
||||||
|
time.Now(),
|
||||||
|
telegraf.Counter)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo_bar_field_with_dash"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
|
||||||
|
|
||||||
|
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, map[string]string{
|
||||||
|
"tag_with_dash": "localhost_local"}, sample1.Labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Gauge(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 42},
|
||||||
|
time.Now(),
|
||||||
|
telegraf.Gauge)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MixedValueType(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
now,
|
||||||
|
telegraf.Counter)
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
now,
|
||||||
|
telegraf.Gauge)
|
||||||
|
var metrics = []telegraf.Metric{p1, p2}
|
||||||
|
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_Tags(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
now)
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
now)
|
||||||
|
var metrics = []telegraf.Metric{p1, p2}
|
||||||
|
|
||||||
|
client := NewClient()
|
||||||
|
err = client.Write(metrics)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
|
||||||
|
|
||||||
|
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
||||||
|
|
||||||
|
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, 1.0, sample1.Value)
|
||||||
|
require.True(t, now.Before(sample1.Expiration))
|
||||||
|
|
||||||
|
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, 2.0, sample2.Value)
|
||||||
|
require.True(t, now.Before(sample2.Expiration))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExpire(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 0)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p2, err := metric.New(
|
||||||
|
"bar",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 1)
|
||||||
|
err = client.Write([]telegraf.Metric{p2})
|
||||||
|
|
||||||
|
setUnixTime(client, 61)
|
||||||
|
require.Equal(t, 2, len(client.fam))
|
||||||
|
client.Expire()
|
||||||
|
require.Equal(t, 1, len(client.fam))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExpire_TagsNoDecrement(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 0)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 1)
|
||||||
|
err = client.Write([]telegraf.Metric{p2})
|
||||||
|
|
||||||
|
setUnixTime(client, 61)
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 2, len(fam.Samples))
|
||||||
|
client.Expire()
|
||||||
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
|
|
||||||
|
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExpire_TagsWithDecrement(t *testing.T) {
|
||||||
|
client := NewClient()
|
||||||
|
|
||||||
|
p1, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
map[string]string{"host": "localhost"},
|
||||||
|
map[string]interface{}{"value": 1.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 0)
|
||||||
|
err = client.Write([]telegraf.Metric{p1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p2, err := metric.New(
|
||||||
|
"foo",
|
||||||
|
make(map[string]string),
|
||||||
|
map[string]interface{}{"value": 2.0},
|
||||||
|
time.Now())
|
||||||
|
setUnixTime(client, 1)
|
||||||
|
err = client.Write([]telegraf.Metric{p2})
|
||||||
|
|
||||||
|
setUnixTime(client, 61)
|
||||||
|
fam, ok := client.fam["foo"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 2, len(fam.Samples))
|
||||||
|
client.Expire()
|
||||||
|
require.Equal(t, 1, len(fam.Samples))
|
||||||
|
|
||||||
|
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
|
||||||
|
}
|
||||||
|
|
||||||
var pTesting *PrometheusClient
|
var pTesting *PrometheusClient
|
||||||
|
|
||||||
func TestPrometheusWritePointEmptyTag(t *testing.T) {
|
func TestPrometheusWritePointEmptyTag(t *testing.T) {
|
||||||
|
@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPrometheusExpireOldMetrics(t *testing.T) {
|
func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("Skipping integration test in short mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
pClient, p, err := setupPrometheus()
|
|
||||||
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer pClient.Stop()
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
tags := make(map[string]string)
|
|
||||||
pt1, _ := metric.New(
|
|
||||||
"test_point_1",
|
|
||||||
tags,
|
|
||||||
map[string]interface{}{"value": 0.0},
|
|
||||||
now)
|
|
||||||
var metrics = []telegraf.Metric{pt1}
|
|
||||||
require.NoError(t, pClient.Write(metrics))
|
|
||||||
|
|
||||||
for _, m := range pClient.metrics {
|
|
||||||
m.Expiration = now.Add(time.Duration(-15) * time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
pt2, _ := metric.New(
|
|
||||||
"test_point_2",
|
|
||||||
tags,
|
|
||||||
map[string]interface{}{"value": 1.0},
|
|
||||||
now)
|
|
||||||
var metrics2 = []telegraf.Metric{pt2}
|
|
||||||
require.NoError(t, pClient.Write(metrics2))
|
|
||||||
|
|
||||||
expected := []struct {
|
|
||||||
name string
|
|
||||||
value float64
|
|
||||||
tags map[string]string
|
|
||||||
}{
|
|
||||||
{"test_point_2", 1.0, tags},
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
require.NoError(t, p.Gather(&acc))
|
|
||||||
for _, e := range expected {
|
|
||||||
acc.AssertContainsFields(t, e.name,
|
|
||||||
map[string]interface{}{"value": e.value})
|
|
||||||
}
|
|
||||||
|
|
||||||
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
|
|
||||||
|
|
||||||
// Confirm that it's not in the PrometheusClient map anymore
|
|
||||||
assert.Equal(t, 1, len(pClient.metrics))
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
|
|
||||||
if pTesting == nil {
|
if pTesting == nil {
|
||||||
pTesting = &PrometheusClient{Listen: "localhost:9127"}
|
pTesting = NewClient()
|
||||||
|
pTesting.Listen = "localhost:9127"
|
||||||
err := pTesting.Start()
|
err := pTesting.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pTesting.metrics = make(map[string]*MetricWithExpiration)
|
pTesting.fam = make(map[string]*MetricFamily)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 200)
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
|
||||||
p := &prometheus.Prometheus{
|
p := &prometheus_input.Prometheus{
|
||||||
Urls: []string{"http://localhost:9127/metrics"},
|
Urls: []string{"http://localhost:9127/metrics"},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue