fix prometheus output

if i understand the prometheus data model correctly, the current output
for this plugin is unusable

prometheus only accepts a single value per measurement. prior to this change, the range loop
causes a measurement to end up w/ a random value

for instance:

net,dc=sjc1,grp_dashboard=1,grp_home=1,grp_hwy_fetcher=1,grp_web_admin=1,host=sjc1-b4-8,hw=app,interface=docker0,state=live
bytes_recv=477596i,bytes_sent=152963303i,drop_in=0i,drop_out=0i,err_in=0i,err_out=0i,packets_recv=7231i,packets_sent=11460i
1457121990003778992

this 'net' measurent  would have all it's tags copied to prometheus
labels, but any of 152963303, or 0, or 7231 as a value for
'net' depending on which field is last in the map iteration

this change expands the fields into new measurements by appending
the field name to the influxdb measurement name.

ie, the above example results with 'net' dropped and new measurements
to take it's place:
	net_bytes_recv
	net_bytes_sent
	net_drop_in
	net_err_in
	net_packets_recv
	net_packets_sent

i hope this can be merged, i love telegraf's composability of tags and
filtering
This commit is contained in:
david birdsong 2016-03-04 20:05:10 +00:00 committed by david birdsong
parent 35f1e28809
commit 13600366cf
2 changed files with 22 additions and 21 deletions

View File

@ -73,42 +73,43 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
} }
} }
if _, ok := p.metrics[key]; !ok {
p.metrics[key] = prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: key,
Help: fmt.Sprintf("Telegraf collected point '%s'", key),
},
labels,
)
prometheus.MustRegister(p.metrics[key])
}
l := prometheus.Labels{} l := prometheus.Labels{}
for tk, tv := range point.Tags() { for tk, tv := range point.Tags() {
l[tk] = tv l[tk] = tv
} }
for _, val := range point.Fields() { for n, val := range point.Fields() {
mname := fmt.Sprintf("%s_%s", key, n)
if _, ok := p.metrics[mname]; !ok {
p.metrics[mname] = prometheus.NewUntypedVec(
prometheus.UntypedOpts{
Name: mname,
Help: fmt.Sprintf("Telegraf collected point '%s'", mname),
},
labels,
)
prometheus.MustRegister(p.metrics[mname])
}
switch val := val.(type) { switch val := val.(type) {
default: default:
log.Printf("Prometheus output, unsupported type. key: %s, type: %T\n", log.Printf("Prometheus output, unsupported type. key: %s, type: %T\n",
key, val) mname, val)
case int64: case int64:
m, err := p.metrics[key].GetMetricWith(l) m, err := p.metrics[mname].GetMetricWith(l)
if err != nil { if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+ log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n", "key: %s, labels: %v,\nerr: %s\n",
key, l, err.Error()) mname, l, err.Error())
continue continue
} }
m.Set(float64(val)) m.Set(float64(val))
case float64: case float64:
m, err := p.metrics[key].GetMetricWith(l) m, err := p.metrics[mname].GetMetricWith(l)
if err != nil { if err != nil {
log.Printf("ERROR Getting metric in Prometheus output, "+ log.Printf("ERROR Getting metric in Prometheus output, "+
"key: %s, labels: %v,\nerr: %s\n", "key: %s, labels: %v,\nerr: %s\n",
key, l, err.Error()) mname, l, err.Error())
continue continue
} }
m.Set(val) m.Set(val)

View File

@ -46,8 +46,8 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
value float64 value float64
tags map[string]string tags map[string]string
}{ }{
{"test_point_1", 0.0, tags}, {"test_point_1_value", 0.0, tags},
{"test_point_2", 1.0, tags}, {"test_point_2_value", 1.0, tags},
} }
var acc testutil.Accumulator var acc testutil.Accumulator
@ -78,8 +78,8 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
name string name string
value float64 value float64
}{ }{
{"test_point_3", 0.0}, {"test_point_3_value", 0.0},
{"test_point_4", 1.0}, {"test_point_4_value", 1.0},
} }
require.NoError(t, p.Gather(&acc)) require.NoError(t, p.Gather(&acc))