Fix prometheus passthrough for existing value types (#3351)
This commit is contained in:
parent
9b59cdd10e
commit
6e5915c59f
|
@ -86,7 +86,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
||||||
} else {
|
} else {
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
}
|
}
|
||||||
metric, err := metric.New(metricName, tags, fields, t)
|
metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType()))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
metrics = append(metrics, metric)
|
metrics = append(metrics, metric)
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,17 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
||||||
return metrics, err
|
return metrics, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func valueType(mt dto.MetricType) telegraf.ValueType {
|
||||||
|
switch mt {
|
||||||
|
case dto.MetricType_COUNTER:
|
||||||
|
return telegraf.Counter
|
||||||
|
case dto.MetricType_GAUGE:
|
||||||
|
return telegraf.Gauge
|
||||||
|
default:
|
||||||
|
return telegraf.Untyped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get Quantiles from summary metric
|
// Get Quantiles from summary metric
|
||||||
func makeQuantiles(m *dto.Metric) map[string]interface{} {
|
func makeQuantiles(m *dto.Metric) map[string]interface{} {
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
|
|
|
@ -218,8 +218,16 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
|
||||||
if url.Address != "" {
|
if url.Address != "" {
|
||||||
tags["address"] = url.Address
|
tags["address"] = url.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch metric.Type() {
|
||||||
|
case telegraf.Counter:
|
||||||
|
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
|
case telegraf.Gauge:
|
||||||
|
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
|
default:
|
||||||
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -15,6 +16,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||||
|
@ -46,6 +48,7 @@ type PrometheusClient struct {
|
||||||
Listen string
|
Listen string
|
||||||
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
||||||
Path string `toml:"path"`
|
Path string `toml:"path"`
|
||||||
|
CollectorsExclude []string `toml:"collectors_exclude"`
|
||||||
|
|
||||||
server *http.Server
|
server *http.Server
|
||||||
|
|
||||||
|
@ -62,11 +65,26 @@ var sampleConfig = `
|
||||||
|
|
||||||
## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
|
## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
|
||||||
# expiration_interval = "60s"
|
# expiration_interval = "60s"
|
||||||
|
|
||||||
|
## Collectors to enable, valid entries are "gocollector" and "process".
|
||||||
|
## If unset, both are enabled.
|
||||||
|
collectors_exclude = ["gocollector", "process"]
|
||||||
`
|
`
|
||||||
|
|
||||||
func (p *PrometheusClient) Start() error {
|
func (p *PrometheusClient) Start() error {
|
||||||
prometheus.Register(p)
|
prometheus.Register(p)
|
||||||
|
|
||||||
|
for _, collector := range p.CollectorsExclude {
|
||||||
|
switch collector {
|
||||||
|
case "gocollector":
|
||||||
|
prometheus.Unregister(prometheus.NewGoCollector())
|
||||||
|
case "process":
|
||||||
|
prometheus.Unregister(prometheus.NewProcessCollector(os.Getpid(), ""))
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unrecognized collector %s", collector)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if p.Listen == "" {
|
if p.Listen == "" {
|
||||||
p.Listen = "localhost:9273"
|
p.Listen = "localhost:9273"
|
||||||
}
|
}
|
||||||
|
@ -76,7 +94,9 @@ func (p *PrometheusClient) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle(p.Path, prometheus.Handler())
|
mux.Handle(p.Path, promhttp.HandlerFor(
|
||||||
|
prometheus.DefaultGatherer,
|
||||||
|
promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
|
||||||
|
|
||||||
p.server = &http.Server{
|
p.server = &http.Server{
|
||||||
Addr: p.Listen,
|
Addr: p.Listen,
|
||||||
|
@ -243,11 +263,23 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
||||||
// Special handling of value field; supports passthrough from
|
// Special handling of value field; supports passthrough from
|
||||||
// the prometheus input.
|
// the prometheus input.
|
||||||
var mname string
|
var mname string
|
||||||
|
switch point.Type() {
|
||||||
|
case telegraf.Counter:
|
||||||
|
if fn == "counter" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
}
|
||||||
|
case telegraf.Gauge:
|
||||||
|
if fn == "gauge" {
|
||||||
|
mname = sanitize(point.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if mname == "" {
|
||||||
if fn == "value" {
|
if fn == "value" {
|
||||||
mname = sanitize(point.Name())
|
mname = sanitize(point.Name())
|
||||||
} else {
|
} else {
|
||||||
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var fam *MetricFamily
|
var fam *MetricFamily
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
|
@ -107,21 +107,69 @@ func TestWrite_SkipNonNumberField(t *testing.T) {
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_Counter(t *testing.T) {
|
func TestWrite_Counters(t *testing.T) {
|
||||||
client := NewClient()
|
type args struct {
|
||||||
|
measurement string
|
||||||
p1, err := metric.New(
|
tags map[string]string
|
||||||
"foo",
|
fields map[string]interface{}
|
||||||
make(map[string]string),
|
valueType telegraf.ValueType
|
||||||
map[string]interface{}{"value": 42},
|
}
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
err error
|
||||||
|
metricName string
|
||||||
|
promType prometheus.ValueType
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "field named value is not added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"value": 42},
|
||||||
|
valueType: telegraf.Counter,
|
||||||
|
},
|
||||||
|
metricName: "foo",
|
||||||
|
promType: prometheus.CounterValue,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "field named counter is not added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"counter": 42},
|
||||||
|
valueType: telegraf.Counter,
|
||||||
|
},
|
||||||
|
metricName: "foo",
|
||||||
|
promType: prometheus.CounterValue,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "field with any other name is added to metric name",
|
||||||
|
args: args{
|
||||||
|
measurement: "foo",
|
||||||
|
fields: map[string]interface{}{"other": 42},
|
||||||
|
valueType: telegraf.Counter,
|
||||||
|
},
|
||||||
|
metricName: "foo_other",
|
||||||
|
promType: prometheus.CounterValue,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
m, err := metric.New(
|
||||||
|
tt.args.measurement,
|
||||||
|
tt.args.tags,
|
||||||
|
tt.args.fields,
|
||||||
time.Now(),
|
time.Now(),
|
||||||
telegraf.Counter)
|
tt.args.valueType,
|
||||||
err = client.Write([]telegraf.Metric{p1})
|
)
|
||||||
require.NoError(t, err)
|
client := NewClient()
|
||||||
|
err = client.Write([]telegraf.Metric{m})
|
||||||
|
require.Equal(t, tt.err, err)
|
||||||
|
|
||||||
fam, ok := client.fam["foo"]
|
fam, ok := client.fam[tt.metricName]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, prometheus.CounterValue, fam.ValueType)
|
require.Equal(t, tt.promType, fam.ValueType)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_Sanitize(t *testing.T) {
|
func TestWrite_Sanitize(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue