Fix prometheus passthrough for existing value types (#3351)
This commit is contained in:
parent
66376d54a2
commit
01cfe1d505
|
@ -86,7 +86,7 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
|||
} else {
|
||||
t = time.Now()
|
||||
}
|
||||
metric, err := metric.New(metricName, tags, fields, t)
|
||||
metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType()))
|
||||
if err == nil {
|
||||
metrics = append(metrics, metric)
|
||||
}
|
||||
|
@ -97,6 +97,17 @@ func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) {
|
|||
return metrics, err
|
||||
}
|
||||
|
||||
func valueType(mt dto.MetricType) telegraf.ValueType {
|
||||
switch mt {
|
||||
case dto.MetricType_COUNTER:
|
||||
return telegraf.Counter
|
||||
case dto.MetricType_GAUGE:
|
||||
return telegraf.Gauge
|
||||
default:
|
||||
return telegraf.Untyped
|
||||
}
|
||||
}
|
||||
|
||||
// Get Quantiles from summary metric
|
||||
func makeQuantiles(m *dto.Metric) map[string]interface{} {
|
||||
fields := make(map[string]interface{})
|
||||
|
|
|
@ -218,7 +218,15 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
|
|||
if url.Address != "" {
|
||||
tags["address"] = url.Address
|
||||
}
|
||||
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||
|
||||
switch metric.Type() {
|
||||
case telegraf.Counter:
|
||||
acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||
case telegraf.Gauge:
|
||||
acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||
default:
|
||||
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
||||
|
@ -46,6 +48,7 @@ type PrometheusClient struct {
|
|||
Listen string
|
||||
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
||||
Path string `toml:"path"`
|
||||
CollectorsExclude []string `toml:"collectors_exclude"`
|
||||
|
||||
server *http.Server
|
||||
|
||||
|
@ -62,11 +65,26 @@ var sampleConfig = `
|
|||
|
||||
## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
|
||||
# expiration_interval = "60s"
|
||||
|
||||
## Collectors to enable, valid entries are "gocollector" and "process".
|
||||
## If unset, both are enabled.
|
||||
collectors_exclude = ["gocollector", "process"]
|
||||
`
|
||||
|
||||
func (p *PrometheusClient) Start() error {
|
||||
prometheus.Register(p)
|
||||
|
||||
for _, collector := range p.CollectorsExclude {
|
||||
switch collector {
|
||||
case "gocollector":
|
||||
prometheus.Unregister(prometheus.NewGoCollector())
|
||||
case "process":
|
||||
prometheus.Unregister(prometheus.NewProcessCollector(os.Getpid(), ""))
|
||||
default:
|
||||
return fmt.Errorf("unrecognized collector %s", collector)
|
||||
}
|
||||
}
|
||||
|
||||
if p.Listen == "" {
|
||||
p.Listen = "localhost:9273"
|
||||
}
|
||||
|
@ -76,7 +94,9 @@ func (p *PrometheusClient) Start() error {
|
|||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(p.Path, prometheus.Handler())
|
||||
mux.Handle(p.Path, promhttp.HandlerFor(
|
||||
prometheus.DefaultGatherer,
|
||||
promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
|
||||
|
||||
p.server = &http.Server{
|
||||
Addr: p.Listen,
|
||||
|
@ -243,10 +263,22 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
|||
// Special handling of value field; supports passthrough from
|
||||
// the prometheus input.
|
||||
var mname string
|
||||
if fn == "value" {
|
||||
mname = sanitize(point.Name())
|
||||
} else {
|
||||
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||
switch point.Type() {
|
||||
case telegraf.Counter:
|
||||
if fn == "counter" {
|
||||
mname = sanitize(point.Name())
|
||||
}
|
||||
case telegraf.Gauge:
|
||||
if fn == "gauge" {
|
||||
mname = sanitize(point.Name())
|
||||
}
|
||||
}
|
||||
if mname == "" {
|
||||
if fn == "value" {
|
||||
mname = sanitize(point.Name())
|
||||
} else {
|
||||
mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
|
||||
}
|
||||
}
|
||||
|
||||
var fam *MetricFamily
|
||||
|
|
|
@ -107,21 +107,69 @@ func TestWrite_SkipNonNumberField(t *testing.T) {
|
|||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestWrite_Counter(t *testing.T) {
|
||||
client := NewClient()
|
||||
func TestWrite_Counters(t *testing.T) {
|
||||
type args struct {
|
||||
measurement string
|
||||
tags map[string]string
|
||||
fields map[string]interface{}
|
||||
valueType telegraf.ValueType
|
||||
}
|
||||
var tests = []struct {
|
||||
name string
|
||||
args args
|
||||
err error
|
||||
metricName string
|
||||
promType prometheus.ValueType
|
||||
}{
|
||||
{
|
||||
name: "field named value is not added to metric name",
|
||||
args: args{
|
||||
measurement: "foo",
|
||||
fields: map[string]interface{}{"value": 42},
|
||||
valueType: telegraf.Counter,
|
||||
},
|
||||
metricName: "foo",
|
||||
promType: prometheus.CounterValue,
|
||||
},
|
||||
{
|
||||
name: "field named counter is not added to metric name",
|
||||
args: args{
|
||||
measurement: "foo",
|
||||
fields: map[string]interface{}{"counter": 42},
|
||||
valueType: telegraf.Counter,
|
||||
},
|
||||
metricName: "foo",
|
||||
promType: prometheus.CounterValue,
|
||||
},
|
||||
{
|
||||
name: "field with any other name is added to metric name",
|
||||
args: args{
|
||||
measurement: "foo",
|
||||
fields: map[string]interface{}{"other": 42},
|
||||
valueType: telegraf.Counter,
|
||||
},
|
||||
metricName: "foo_other",
|
||||
promType: prometheus.CounterValue,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m, err := metric.New(
|
||||
tt.args.measurement,
|
||||
tt.args.tags,
|
||||
tt.args.fields,
|
||||
time.Now(),
|
||||
tt.args.valueType,
|
||||
)
|
||||
client := NewClient()
|
||||
err = client.Write([]telegraf.Metric{m})
|
||||
require.Equal(t, tt.err, err)
|
||||
|
||||
p1, err := metric.New(
|
||||
"foo",
|
||||
make(map[string]string),
|
||||
map[string]interface{}{"value": 42},
|
||||
time.Now(),
|
||||
telegraf.Counter)
|
||||
err = client.Write([]telegraf.Metric{p1})
|
||||
require.NoError(t, err)
|
||||
|
||||
fam, ok := client.fam["foo"]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, prometheus.CounterValue, fam.ValueType)
|
||||
fam, ok := client.fam[tt.metricName]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, tt.promType, fam.ValueType)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrite_Sanitize(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue