Ensure prometheus metrics have same set of labels (#2857)

This commit is contained in:
Daniel Nelson 2017-06-13 18:04:26 -07:00 committed by GitHub
parent 246f342e6a
commit 949072e8dc
2 changed files with 457 additions and 131 deletions

View File

@ -6,6 +6,8 @@ import (
"log" "log"
"net/http" "net/http"
"regexp" "regexp"
"sort"
"strings"
"sync" "sync"
"time" "time"
@ -17,19 +19,40 @@ import (
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
type MetricWithExpiration struct { // SampleID uniquely identifies a Sample
Metric prometheus.Metric type SampleID string
// Sample represents the current value of a series.
type Sample struct {
// Labels are the Prometheus labels.
Labels map[string]string
// Value is the value in the Prometheus output.
Value float64
// Expiration is the deadline that this Sample is valid until.
Expiration time.Time Expiration time.Time
} }
// MetricFamily contains the data required to build valid prometheus Metrics.
type MetricFamily struct {
// Samples are the Sample belonging to this MetricFamily.
Samples map[SampleID]*Sample
// Type of the Value.
ValueType prometheus.ValueType
// LabelSet is the label counts for all Samples.
LabelSet map[string]int
}
type PrometheusClient struct { type PrometheusClient struct {
Listen string Listen string
ExpirationInterval internal.Duration `toml:"expiration_interval"` ExpirationInterval internal.Duration `toml:"expiration_interval"`
server *http.Server server *http.Server
metrics map[string]*MetricWithExpiration
sync.Mutex sync.Mutex
// fam is the non-expired MetricFamily by Prometheus metric name.
fam map[string]*MetricFamily
// now returns the current time.
now func() time.Time
} }
var sampleConfig = ` var sampleConfig = `
@ -41,7 +64,6 @@ var sampleConfig = `
` `
func (p *PrometheusClient) Start() error { func (p *PrometheusClient) Start() error {
p.metrics = make(map[string]*MetricWithExpiration)
prometheus.Register(p) prometheus.Register(p)
if p.Listen == "" { if p.Listen == "" {
@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch) prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
} }
// Implements prometheus.Collector // Expire removes Samples that have expired.
func (p *PrometheusClient) Expire() {
now := p.now()
for name, family := range p.fam {
for key, sample := range family.Samples {
if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
for k, _ := range sample.Labels {
family.LabelSet[k]--
}
delete(family.Samples, key)
if len(family.Samples) == 0 {
delete(p.fam, name)
}
}
}
}
}
// Collect implements prometheus.Collector
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) { func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
for key, m := range p.metrics { p.Expire()
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
delete(p.metrics, key) for name, family := range p.fam {
} else { // Get list of all labels on MetricFamily
ch <- m.Metric var labelNames []string
for k, v := range family.LabelSet {
if v > 0 {
labelNames = append(labelNames, k)
} }
} }
desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
for _, sample := range family.Samples {
// Get labels for this sample; unset labels will be set to the
// empty string
var labels []string
for _, label := range labelNames {
v := sample.Labels[label]
labels = append(labels, v)
}
metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
if err != nil {
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
name, labels, err.Error())
}
ch <- metric
}
}
}
func sanitize(value string) string {
return invalidNameCharRE.ReplaceAllString(value, "_")
}
func valueType(tt telegraf.ValueType) prometheus.ValueType {
switch tt {
case telegraf.Counter:
return prometheus.CounterValue
case telegraf.Gauge:
return prometheus.GaugeValue
default:
return prometheus.UntypedValue
}
}
// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
func CreateSampleID(tags map[string]string) SampleID {
pairs := make([]string, 0, len(tags))
for k, v := range tags {
pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(pairs)
return SampleID(strings.Join(pairs, ","))
} }
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error { func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
if len(metrics) == 0 { now := p.now()
return nil
}
for _, point := range metrics { for _, point := range metrics {
key := point.Name() tags := point.Tags()
key = invalidNameCharRE.ReplaceAllString(key, "_") vt := valueType(point.Type())
sampleID := CreateSampleID(tags)
// convert tags into prometheus labels labels := make(map[string]string)
var labels []string for k, v := range tags {
l := prometheus.Labels{} labels[sanitize(k)] = sanitize(v)
for k, v := range point.Tags() {
k = invalidNameCharRE.ReplaceAllString(k, "_")
if len(k) == 0 {
continue
}
labels = append(labels, k)
l[k] = v
} }
// Get a type if it's available, defaulting to Untyped for fn, fv := range point.Fields() {
var mType prometheus.ValueType
switch point.Type() {
case telegraf.Counter:
mType = prometheus.CounterValue
case telegraf.Gauge:
mType = prometheus.GaugeValue
default:
mType = prometheus.UntypedValue
}
for n, val := range point.Fields() {
// Ignore string and bool fields. // Ignore string and bool fields.
switch val.(type) { var value float64
case string: switch fv := fv.(type) {
continue
case bool:
continue
}
// sanitize the measurement name
n = invalidNameCharRE.ReplaceAllString(n, "_")
var mname string
if n == "value" {
mname = key
} else {
mname = fmt.Sprintf("%s_%s", key, n)
}
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
var metric prometheus.Metric
var err error
// switch for field type
switch val := val.(type) {
case int64: case int64:
metric, err = prometheus.NewConstMetric(desc, mType, float64(val)) value = float64(fv)
case float64: case float64:
metric, err = prometheus.NewConstMetric(desc, mType, val) value = fv
default: default:
continue continue
} }
if err != nil {
log.Printf("E! Error creating prometheus metric, "+ sample := &Sample{
"key: %s, labels: %v,\nerr: %s\n", Labels: labels,
mname, l, err.Error()) Value: value,
Expiration: now.Add(p.ExpirationInterval.Duration),
} }
p.metrics[desc.String()] = &MetricWithExpiration{ // Special handling of value field; supports passthrough from
Metric: metric, // the prometheus input.
Expiration: time.Now().Add(p.ExpirationInterval.Duration), var mname string
if fn == "value" {
mname = sanitize(point.Name())
} else {
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
} }
var fam *MetricFamily
var ok bool
if fam, ok = p.fam[mname]; !ok {
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
ValueType: vt,
LabelSet: make(map[string]int),
}
p.fam[mname] = fam
} else {
if fam.ValueType != vt {
// Don't return an error since this would be a permanent error
log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
break
}
}
for k, _ := range sample.Labels {
fam.LabelSet[k]++
}
fam.Samples[sampleID] = sample
} }
} }
return nil return nil
@ -187,6 +266,8 @@ func init() {
outputs.Add("prometheus_client", func() telegraf.Output { outputs.Add("prometheus_client", func() telegraf.Output {
return &PrometheusClient{ return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60}, ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
} }
}) })
} }

View File

@ -4,16 +4,314 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs/prometheus" prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
) )
func setUnixTime(client *PrometheusClient, sec int64) {
client.now = func() time.Time {
return time.Unix(sec, 0)
}
}
// NewClient initializes a PrometheusClient.
func NewClient() *PrometheusClient {
return &PrometheusClient{
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
fam: make(map[string]*MetricFamily),
now: time.Now,
}
}
func TestWrite_Basic(t *testing.T) {
now := time.Now()
pt1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{
pt1,
}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{}, fam.LabelSet)
sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
require.True(t, ok)
require.Equal(t, 0.0, sample.Value)
require.True(t, now.Before(sample.Expiration))
}
func TestWrite_IntField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 42.0, v.Value)
}
}
func TestWrite_FieldNotValue(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"howdy": 0.0},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_howdy"]
require.True(t, ok)
for _, v := range fam.Samples {
require.Equal(t, 0.0, v.Value)
}
}
func TestWrite_SkipNonNumberField(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": "howdy"},
time.Now())
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
_, ok := client.fam["foo"]
require.False(t, ok)
}
func TestWrite_Counter(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.CounterValue, fam.ValueType)
}
func TestWrite_Sanitize(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo.bar",
map[string]string{"tag-with-dash": "localhost.local"},
map[string]interface{}{"field-with-dash": 42},
time.Now(),
telegraf.Counter)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo_bar_field_with_dash"]
require.True(t, ok)
require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, map[string]string{
"tag_with_dash": "localhost_local"}, sample1.Labels)
}
func TestWrite_Gauge(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 42},
time.Now(),
telegraf.Gauge)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.GaugeValue, fam.ValueType)
}
func TestWrite_MixedValueType(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now,
telegraf.Counter)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
now,
telegraf.Gauge)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 1, len(fam.Samples))
}
func TestWrite_Tags(t *testing.T) {
now := time.Now()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
now)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
now)
var metrics = []telegraf.Metric{p1, p2}
client := NewClient()
err = client.Write(metrics)
require.NoError(t, err)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, prometheus.UntypedValue, fam.ValueType)
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
require.True(t, ok)
require.Equal(t, 1.0, sample1.Value)
require.True(t, now.Before(sample1.Expiration))
sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
require.True(t, ok)
require.Equal(t, 2.0, sample2.Value)
require.True(t, now.Before(sample2.Expiration))
}
func TestExpire(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"bar",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
require.Equal(t, 2, len(client.fam))
client.Expire()
require.Equal(t, 1, len(client.fam))
}
func TestExpire_TagsNoDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
}
func TestExpire_TagsWithDecrement(t *testing.T) {
client := NewClient()
p1, err := metric.New(
"foo",
map[string]string{"host": "localhost"},
map[string]interface{}{"value": 1.0},
time.Now())
setUnixTime(client, 0)
err = client.Write([]telegraf.Metric{p1})
require.NoError(t, err)
p2, err := metric.New(
"foo",
make(map[string]string),
map[string]interface{}{"value": 2.0},
time.Now())
setUnixTime(client, 1)
err = client.Write([]telegraf.Metric{p2})
setUnixTime(client, 61)
fam, ok := client.fam["foo"]
require.True(t, ok)
require.Equal(t, 2, len(fam.Samples))
client.Expire()
require.Equal(t, 1, len(fam.Samples))
require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
}
var pTesting *PrometheusClient var pTesting *PrometheusClient
func TestPrometheusWritePointEmptyTag(t *testing.T) { func TestPrometheusWritePointEmptyTag(t *testing.T) {
@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
} }
} }
func TestPrometheusExpireOldMetrics(t *testing.T) { func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
pClient, p, err := setupPrometheus()
pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
require.NoError(t, err)
defer pClient.Stop()
now := time.Now()
tags := make(map[string]string)
pt1, _ := metric.New(
"test_point_1",
tags,
map[string]interface{}{"value": 0.0},
now)
var metrics = []telegraf.Metric{pt1}
require.NoError(t, pClient.Write(metrics))
for _, m := range pClient.metrics {
m.Expiration = now.Add(time.Duration(-15) * time.Second)
}
pt2, _ := metric.New(
"test_point_2",
tags,
map[string]interface{}{"value": 1.0},
now)
var metrics2 = []telegraf.Metric{pt2}
require.NoError(t, pClient.Write(metrics2))
expected := []struct {
name string
value float64
tags map[string]string
}{
{"test_point_2", 1.0, tags},
}
var acc testutil.Accumulator
require.NoError(t, p.Gather(&acc))
for _, e := range expected {
acc.AssertContainsFields(t, e.name,
map[string]interface{}{"value": e.value})
}
acc.AssertDoesNotContainMeasurement(t, "test_point_1")
// Confirm that it's not in the PrometheusClient map anymore
assert.Equal(t, 1, len(pClient.metrics))
}
func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
if pTesting == nil { if pTesting == nil {
pTesting = &PrometheusClient{Listen: "localhost:9127"} pTesting = NewClient()
pTesting.Listen = "localhost:9127"
err := pTesting.Start() err := pTesting.Start()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
} else { } else {
pTesting.metrics = make(map[string]*MetricWithExpiration) pTesting.fam = make(map[string]*MetricFamily)
} }
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
p := &prometheus.Prometheus{ p := &prometheus_input.Prometheus{
Urls: []string{"http://localhost:9127/metrics"}, Urls: []string{"http://localhost:9127/metrics"},
} }