2015-10-22 16:17:57 +00:00
|
|
|
package prometheus_client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2015-12-01 17:08:38 +00:00
|
|
|
"log"
|
2015-10-28 22:19:13 +00:00
|
|
|
"net/http"
|
2016-03-22 16:34:33 +00:00
|
|
|
"regexp"
|
2016-07-10 13:47:47 +00:00
|
|
|
"sync"
|
2016-11-15 11:33:39 +00:00
|
|
|
"time"
|
2015-10-28 22:19:13 +00:00
|
|
|
|
2016-01-27 21:21:36 +00:00
|
|
|
"github.com/influxdata/telegraf"
|
2016-11-15 11:33:39 +00:00
|
|
|
"github.com/influxdata/telegraf/internal"
|
2017-02-04 15:58:02 +00:00
|
|
|
"github.com/influxdata/telegraf/registry/outputs"
|
2015-10-22 16:17:57 +00:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
)
|
|
|
|
|
2016-07-20 08:24:34 +00:00
|
|
|
// invalidNameCharRE matches any single character that is not an ASCII letter,
// a digit or an underscore. Write uses it to replace such characters with "_"
// in measurement names, tag keys and field names.
var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
|
2016-03-22 16:34:33 +00:00
|
|
|
|
2016-11-15 11:33:39 +00:00
|
|
|
type MetricWithExpiration struct {
|
|
|
|
Metric prometheus.Metric
|
|
|
|
Expiration time.Time
|
|
|
|
}
|
|
|
|
|
2015-10-22 16:17:57 +00:00
|
|
|
type PrometheusClient struct {
|
2016-11-15 11:33:39 +00:00
|
|
|
Listen string
|
|
|
|
ExpirationInterval internal.Duration `toml:"expiration_interval"`
|
2016-07-10 13:47:47 +00:00
|
|
|
|
2016-11-15 11:33:39 +00:00
|
|
|
metrics map[string]*MetricWithExpiration
|
2016-07-10 13:47:47 +00:00
|
|
|
|
|
|
|
sync.Mutex
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// sampleConfig is the commented example configuration returned by
// SampleConfig.
//
// NOTE(review): the sample shows ":9126" while Start falls back to
// "localhost:9126" when listen is unset - confirm which one is intended.
var sampleConfig = `
  ## Address to listen on
  # listen = ":9126"

  ## Interval to expire metrics and not deliver to prometheus, 0 == no expiration
  # expiration_interval = "60s"
`
|
|
|
|
|
|
|
|
func (p *PrometheusClient) Start() error {
|
2016-11-15 11:33:39 +00:00
|
|
|
p.metrics = make(map[string]*MetricWithExpiration)
|
2016-09-12 09:09:13 +00:00
|
|
|
prometheus.Register(p)
|
2016-06-10 16:18:38 +00:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
// recovering from panic here because there is no way to stop a
|
|
|
|
// running http go server except by a kill signal. Since the server
|
|
|
|
// does not stop on SIGHUP, Start() will panic when the process
|
|
|
|
// is reloaded.
|
|
|
|
}
|
|
|
|
}()
|
2015-10-22 16:17:57 +00:00
|
|
|
if p.Listen == "" {
|
2015-10-28 22:19:13 +00:00
|
|
|
p.Listen = "localhost:9126"
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
2015-10-28 22:19:13 +00:00
|
|
|
|
2015-10-22 16:17:57 +00:00
|
|
|
http.Handle("/metrics", prometheus.Handler())
|
|
|
|
server := &http.Server{
|
2015-10-28 22:19:13 +00:00
|
|
|
Addr: p.Listen,
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
2015-10-28 22:19:13 +00:00
|
|
|
|
|
|
|
go server.ListenAndServe()
|
2015-10-22 16:17:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PrometheusClient) Stop() {
|
|
|
|
// TODO: Use a listener for http.Server that counts active connections
|
|
|
|
// that can be stopped and closed gracefully
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PrometheusClient) Connect() error {
|
|
|
|
// This service output does not need to make any further connections
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PrometheusClient) Close() error {
|
|
|
|
// This service output does not need to close any of its connections
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PrometheusClient) SampleConfig() string {
|
|
|
|
return sampleConfig
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PrometheusClient) Description() string {
|
|
|
|
return "Configuration for the Prometheus client to spawn"
|
|
|
|
}
|
|
|
|
|
2016-07-10 13:47:47 +00:00
|
|
|
// Implements prometheus.Collector
|
|
|
|
func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
|
|
|
|
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Implements prometheus.Collector
|
|
|
|
func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
|
|
|
|
p.Lock()
|
|
|
|
defer p.Unlock()
|
|
|
|
|
2016-11-15 11:33:39 +00:00
|
|
|
for key, m := range p.metrics {
|
|
|
|
if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
|
|
|
|
delete(p.metrics, key)
|
|
|
|
} else {
|
|
|
|
ch <- m.Metric
|
2016-09-16 14:43:53 +00:00
|
|
|
}
|
2016-07-10 13:47:47 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
|
2016-07-10 13:47:47 +00:00
|
|
|
p.Lock()
|
|
|
|
defer p.Unlock()
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
if len(metrics) == 0 {
|
2015-10-22 16:17:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-01-27 23:15:14 +00:00
|
|
|
for _, point := range metrics {
|
2015-12-01 17:08:38 +00:00
|
|
|
key := point.Name()
|
2016-07-18 10:45:25 +00:00
|
|
|
key = invalidNameCharRE.ReplaceAllString(key, "_")
|
2015-10-22 16:17:57 +00:00
|
|
|
|
2016-08-30 17:09:48 +00:00
|
|
|
// convert tags into prometheus labels
|
2016-03-22 16:34:33 +00:00
|
|
|
var labels []string
|
2015-10-22 16:17:57 +00:00
|
|
|
l := prometheus.Labels{}
|
2016-03-22 16:34:33 +00:00
|
|
|
for k, v := range point.Tags() {
|
2016-07-18 10:45:25 +00:00
|
|
|
k = invalidNameCharRE.ReplaceAllString(k, "_")
|
2016-03-22 16:34:33 +00:00
|
|
|
if len(k) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
labels = append(labels, k)
|
|
|
|
l[k] = v
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
|
|
|
|
2016-08-30 17:09:48 +00:00
|
|
|
// Get a type if it's available, defaulting to Untyped
|
|
|
|
var mType prometheus.ValueType
|
|
|
|
switch point.Type() {
|
|
|
|
case telegraf.Counter:
|
|
|
|
mType = prometheus.CounterValue
|
|
|
|
case telegraf.Gauge:
|
|
|
|
mType = prometheus.GaugeValue
|
|
|
|
default:
|
|
|
|
mType = prometheus.UntypedValue
|
|
|
|
}
|
|
|
|
|
fix prometheus output
if i understand the prometheus data model correctly, the current output
for this plugin is unusable
prometheus only accepts a single value per measurement. prior to this change, the range loop
causes a measurement to end up w/ a random value
for instance:
net,dc=sjc1,grp_dashboard=1,grp_home=1,grp_hwy_fetcher=1,grp_web_admin=1,host=sjc1-b4-8,hw=app,interface=docker0,state=live
bytes_recv=477596i,bytes_sent=152963303i,drop_in=0i,drop_out=0i,err_in=0i,err_out=0i,packets_recv=7231i,packets_sent=11460i
1457121990003778992
this 'net' measurent would have all it's tags copied to prometheus
labels, but any of 152963303, or 0, or 7231 as a value for
'net' depending on which field is last in the map iteration
this change expands the fields into new measurements by appending
the field name to the influxdb measurement name.
ie, the above example results with 'net' dropped and new measurements
to take it's place:
net_bytes_recv
net_bytes_sent
net_drop_in
net_err_in
net_packets_recv
net_packets_sent
i hope this can be merged, i love telegraf's composability of tags and
filtering
2016-03-04 20:05:10 +00:00
|
|
|
for n, val := range point.Fields() {
|
2016-03-23 14:57:05 +00:00
|
|
|
// Ignore string and bool fields.
|
|
|
|
switch val.(type) {
|
|
|
|
case string:
|
|
|
|
continue
|
|
|
|
case bool:
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// sanitize the measurement name
|
2016-07-18 10:45:25 +00:00
|
|
|
n = invalidNameCharRE.ReplaceAllString(n, "_")
|
2016-03-08 18:33:57 +00:00
|
|
|
var mname string
|
|
|
|
if n == "value" {
|
|
|
|
mname = key
|
|
|
|
} else {
|
|
|
|
mname = fmt.Sprintf("%s_%s", key, n)
|
|
|
|
}
|
2016-03-22 16:34:33 +00:00
|
|
|
|
2016-07-10 13:47:47 +00:00
|
|
|
desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
|
|
|
|
var metric prometheus.Metric
|
|
|
|
var err error
|
2016-08-30 17:09:48 +00:00
|
|
|
|
|
|
|
// switch for field type
|
2015-12-01 17:08:38 +00:00
|
|
|
switch val := val.(type) {
|
2015-10-22 16:17:57 +00:00
|
|
|
case int64:
|
2016-08-30 17:09:48 +00:00
|
|
|
metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
|
2015-10-22 16:17:57 +00:00
|
|
|
case float64:
|
2016-08-30 17:09:48 +00:00
|
|
|
metric, err = prometheus.NewConstMetric(desc, mType, val)
|
2016-03-23 14:57:05 +00:00
|
|
|
default:
|
|
|
|
continue
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
2016-07-10 13:47:47 +00:00
|
|
|
if err != nil {
|
2016-09-30 21:37:56 +00:00
|
|
|
log.Printf("E! Error creating prometheus metric, "+
|
2016-07-10 13:47:47 +00:00
|
|
|
"key: %s, labels: %v,\nerr: %s\n",
|
|
|
|
mname, l, err.Error())
|
|
|
|
}
|
2016-11-15 11:33:39 +00:00
|
|
|
|
|
|
|
p.metrics[desc.String()] = &MetricWithExpiration{
|
|
|
|
Metric: metric,
|
|
|
|
Expiration: time.Now().Add(p.ExpirationInterval.Duration),
|
|
|
|
}
|
2015-10-22 16:17:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func init() {
|
2016-01-27 21:21:36 +00:00
|
|
|
outputs.Add("prometheus_client", func() telegraf.Output {
|
2016-11-15 11:33:39 +00:00
|
|
|
return &PrometheusClient{
|
|
|
|
ExpirationInterval: internal.Duration{Duration: time.Second * 60},
|
|
|
|
}
|
2015-10-22 16:17:57 +00:00
|
|
|
})
|
|
|
|
}
|