diff --git a/agent.go b/agent.go index 7ac924f63..00c8d3bc6 100644 --- a/agent.go +++ b/agent.go @@ -5,18 +5,21 @@ import ( "log" "net/url" "os" + "regexp" "sort" "sync" "time" "github.com/influxdb/influxdb/client" "github.com/influxdb/telegraf/plugins" + "github.com/prometheus/client_golang/prometheus" ) type runningPlugin struct { name string plugin plugins.Plugin config *ConfiguredPlugin + mu sync.Mutex } type Agent struct { @@ -53,6 +56,7 @@ func NewAgent(config *Config) (*Agent, error) { } config.Tags["host"] = agent.Hostname + prometheus.MustRegister(agent) return agent, nil } @@ -97,7 +101,7 @@ func (a *Agent) LoadPlugins() ([]string, error) { return nil, err } - a.plugins = append(a.plugins, &runningPlugin{name, plugin, config}) + a.plugins = append(a.plugins, &runningPlugin{name, plugin, config, sync.Mutex{}}) names = append(names, name) } @@ -125,7 +129,9 @@ func (a *Agent) crankParallel() error { acc.Prefix = plugin.name + "_" acc.Config = plugin.config + plugin.mu.Lock() plugin.plugin.Gather(&acc) + plugin.mu.Unlock() points <- &acc }(plugin) @@ -156,7 +162,9 @@ func (a *Agent) crank() error { for _, plugin := range a.plugins { acc.Prefix = plugin.name + "_" acc.Config = plugin.config + plugin.mu.Lock() err := plugin.plugin.Gather(&acc) + plugin.mu.Unlock() if err != nil { return err } @@ -180,7 +188,9 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err acc.Prefix = plugin.name + "_" acc.Config = plugin.config + plugin.mu.Lock() err := plugin.plugin.Gather(&acc) + plugin.mu.Unlock() if err != nil { return err } @@ -200,6 +210,56 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err } } +// Implements prometheus.Collector +func (a *Agent) Describe(ch chan<- *prometheus.Desc) { + prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch) +} + +// Implements prometheus.Collector +func (a *Agent) Collect(ch chan<- prometheus.Metric) { + var wg sync.WaitGroup + + invalidNameCharRE := regexp.MustCompile(`[^a-zA-Z0-9_]`) + + for _, plugin := range a.plugins { + wg.Add(1) + go func(plugin *runningPlugin) { + defer wg.Done() + var acc BatchPoints + acc.Prefix = plugin.name + "_" + + plugin.mu.Lock() + err := plugin.plugin.Gather(&acc) + plugin.mu.Unlock() + if err != nil { + return + } + + for _, point := range acc.Points { + var value float64 + switch point.Fields["value"].(type) { + case float64, int64: + value = point.Fields["value"].(float64) + default: + continue + } + tags := map[string]string{} + for k, v := range point.Tags { + tags[invalidNameCharRE.ReplaceAllString(k, "_")] = v + } + desc := prometheus.NewDesc(invalidNameCharRE.ReplaceAllString(point.Measurement, "_"), point.Measurement, nil, tags) + metric, err := prometheus.NewConstMetric(desc, prometheus.UntypedValue, value) + if err == nil { + ch <- metric + } + } + + }(plugin) + } + + wg.Wait() +} + func (a *Agent) TestAllPlugins() error { var names []string diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index f36693ebc..f4cd6c693 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -4,12 +4,14 @@ import ( "flag" "fmt" "log" + "net/http" "os" "os/signal" "strings" "github.com/influxdb/telegraf" _ "github.com/influxdb/telegraf/plugins/all" + "github.com/prometheus/client_golang/prometheus" ) var fDebug = flag.Bool("debug", false, "show metrics as they're generated to stdout") @@ -117,5 +119,21 @@ func main() { f.Close() } + pc := &telegraf.PrometheusCollector{ListenAddress: ""} + err = config.ApplyPrometheusCollector(pc) + if err != nil { + log.Fatal(err) + } + if pc.ListenAddress != "" { + http.Handle("/metrics", prometheus.Handler()) + log.Printf("Listening on %s for Prometheus", pc.ListenAddress) + go func() { + err := http.ListenAndServe(pc.ListenAddress, nil) + if err != nil { + log.Fatal(err) + } + }() + } + ag.Run(shutdown) } diff --git a/config.go b/config.go index 013b8d281..6953195ba 100644 --- a/config.go +++ b/config.go @@ -36,8 +36,13 @@ type Config struct { UserAgent string Tags map[string]string - agent *ast.Table - plugins map[string]*ast.Table + agent *ast.Table + plugins map[string]*ast.Table + prometheusCollector *ast.Table +} + +type PrometheusCollector struct { + ListenAddress string } func (c *Config) Plugins() map[string]*ast.Table { @@ -85,6 +90,14 @@ func (c *Config) ApplyAgent(v interface{}) error { return nil } +func (c *Config) ApplyPrometheusCollector(v interface{}) error { + if c.prometheusCollector != nil { + return toml.UnmarshalTable(c.prometheusCollector, v) + } + + return nil +} + func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, error) { cp := &ConfiguredPlugin{Name: name} @@ -183,6 +196,8 @@ func LoadConfig(path string) (*Config, error) { } case "agent": c.agent = subtbl + case "prometheus_collector": + c.prometheusCollector = subtbl default: c.plugins[name] = subtbl } @@ -261,6 +276,10 @@ database = "telegraf" # required. # debug = false # hostname = "prod3241" +[prometheus_collector] +# If set, expose all metrics on this address for Prometheus +# listen_address = ":9115" + # PLUGINS `