Kapacitor input plugin (#2031)

This commit is contained in:
Ross McDonald 2017-04-27 13:47:22 -05:00 committed by Daniel Nelson
parent e1a734c525
commit a3feacbd2f
7 changed files with 537 additions and 1 deletions

View File

@ -77,6 +77,7 @@ be deprecated eventually.
- [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin
- [#2705](https://github.com/influxdata/telegraf/pull/2705): Kinesis output: add use_random_partitionkey option
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer
- [#2031](https://github.com/influxdata/telegraf/pull/2031): Add Kapacitor input plugin
### Bugfixes

View File

@ -35,6 +35,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/iptables"
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
_ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/kapacitor"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/leofs"
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"

View File

@ -80,4 +80,4 @@ internal_write,output=file,host=tyrion buffer_limit=10000i,write_time_ns=636609i
internal_gather,input=internal,host=tyrion metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000
internal_gather,input=http_listener,host=tyrion metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000
internal_http_listener,address=:8186,host=tyrion queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000
```
```

View File

@ -0,0 +1,149 @@
# Kapacitor Plugin
The Kapacitor plugin will collect metrics from the given Kapacitor instances.
### Configuration:
```toml
[[inputs.kapacitor]]
## Multiple URLs from which to read Kapacitor-formatted JSON
## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
urls = [
"http://localhost:9092/kapacitor/v1/debug/vars"
]
## Time limit for http requests
timeout = "5s"
```
### Measurements & Fields
- kapacitor
- num_enabled_tasks, integer
- num_subscriptions, integer
- num_tasks, integer
- kapacitor_edges
- collected, integer
- emitted, integer
- kapacitor_ingress
- points_received, integer
- kapacitor_memstats
- alloc_bytes, integer
- buck_hash_sys_bytes, integer
- frees, integer
- gcc_pu_fraction, float
- gc_sys_bytes, integer
- heap_alloc_bytes, integer
- heap_idle_bytes, integer
- heap_inuse_bytes, integer
- heap_objects, integer
- heap_released_bytes, integer
- heap_sys_bytes, integer
- last_gc_ns, integer
- lookups, integer
- mallocs, integer
- mcache_in_use_bytes, integer
- mcache_sys_bytes, integer
- mspan_in_use_bytes, integer
- mspan_sys_bytes, integer
- next_gc_ns, integer
- num_gc, integer
- other_sys_bytes, integer
- pause_total_ns, integer
- stack_in_use_bytes, integer
- stack_sys_bytes, integer
- sys_bytes, integer
- total_alloc_bytes, integer
- kapacitor_nodes
- alerts_triggered, integer
- avg_exec_time_ns, integer
- batches_queried, integer
- crits_triggered, integer
- eval_errors, integer
- fields_defaulted, integer
- infos_triggered, integer
- oks_triggered, integer
- points_queried, integer
- points_written, integer
- query_errors, integer
- tags_defaulted, integer
- warns_triggered, integer
- write_errors, integer
*Note:* The Kapacitor variables `host`, `cluster_id`, and `server_id`
are currently not recorded due to the potential high cardinality of
these values.
### Example Output:
```
$ telegraf -config /etc/telegraf.conf -input-filter kapacitor -test
* Plugin: inputs.kapacitor, Collection 1
> kapacitor_memstats,host=hostname.local,kap_version=1.1.0~rc2,url=http://localhost:9092/kapacitor/v1/debug/vars alloc_bytes=6974808i,buck_hash_sys_bytes=1452609i,frees=207281i,gc_sys_bytes=802816i,gcc_pu_fraction=0.00004693548939673313,heap_alloc_bytes=6974808i,heap_idle_bytes=6742016i,heap_in_use_bytes=9183232i,heap_objects=23216i,heap_released_bytes=0i,heap_sys_bytes=15925248i,last_gc_ns=1478791460012676997i,lookups=88i,mallocs=230497i,mcache_in_use_bytes=9600i,mcache_sys_bytes=16384i,mspan_in_use_bytes=98560i,mspan_sys_bytes=131072i,next_gc_ns=11467528i,num_gc=8i,other_sys_bytes=2236087i,pause_total_ns=2994110i,stack_in_use_bytes=1900544i,stack_sys_bytes=1900544i,sys_bytes=22464760i,total_alloc_bytes=35023600i 1478791462000000000
> kapacitor,host=hostname.local,kap_version=1.1.0~rc2,url=http://localhost:9092/kapacitor/v1/debug/vars num_enabled_tasks=5i,num_subscriptions=5i,num_tasks=5i 1478791462000000000
> kapacitor_edges,child=stream0,host=hostname.local,parent=stream,task=deadman-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=shard,retention_policy=monitor,task_master=main points_received=120 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=subscriber,retention_policy=monitor,task_master=main points_received=60 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=http_out,node=http_out3,task=sys-stats,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=window6,host=hostname.local,parent=derivative5,task=deadman-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=from,node=from1,task=sys-stats,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=stream,node=stream0,task=test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=window,node=window6,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=cq,retention_policy=monitor,task_master=main points_received=10 1478791462000000000
> kapacitor_edges,child=http_out3,host=hostname.local,parent=window2,task=sys-stats,type=batch collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=mean4,host=hostname.local,parent=log3,task=deadman-test,type=batch collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_kapacitor,host=hostname.local,measurement=nodes,retention_policy=autogen,task_master=main points_received=207 1478791462000000000
> kapacitor_edges,child=stream0,host=hostname.local,parent=stream,task=sys-stats,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=log6,host=hostname.local,parent=sum5,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=from1,host=hostname.local,parent=stream0,task=sys-stats,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=alert,node=alert2,task=test,type=stream alerts_triggered=0,avg_exec_time_ns=0i,crits_triggered=0,infos_triggered=0,oks_triggered=0,warns_triggered=0 1478791462000000000
> kapacitor_edges,child=log3,host=hostname.local,parent=derivative2,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_kapacitor,host=hostname.local,measurement=runtime,retention_policy=autogen,task_master=main points_received=9 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=tsm1_filestore,retention_policy=monitor,task_master=main points_received=120 1478791462000000000
> kapacitor_edges,child=derivative2,host=hostname.local,parent=from1,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=stream,node=stream0,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=queryExecutor,retention_policy=monitor,task_master=main points_received=10 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=tsm1_wal,retention_policy=monitor,task_master=main points_received=120 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=log,node=log6,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=stream,host=hostname.local,parent=stats,task=task_master:main,type=stream collected=598,emitted=598 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=write,retention_policy=monitor,task_master=main points_received=10 1478791462000000000
> kapacitor_edges,child=stream0,host=hostname.local,parent=stream,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=log,node=log3,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=from,node=from1,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_kapacitor,host=hostname.local,measurement=ingress,retention_policy=autogen,task_master=main points_received=148 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=eval,node=eval4,task=derivative-test,type=stream avg_exec_time_ns=0i,eval_errors=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=derivative,node=derivative2,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=runtime,retention_policy=monitor,task_master=main points_received=10 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=httpd,retention_policy=monitor,task_master=main points_received=10 1478791462000000000
> kapacitor_edges,child=sum5,host=hostname.local,parent=eval4,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_kapacitor,host=hostname.local,measurement=kapacitor,retention_policy=autogen,task_master=main points_received=9 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=from,node=from1,task=test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=tsm1_engine,retention_policy=monitor,task_master=main points_received=120 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=window,node=window2,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=stream,node=stream0,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=influxdb_out4,host=hostname.local,parent=http_out3,task=sys-stats,type=batch collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=window2,host=hostname.local,parent=from1,task=deadman-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=from,node=from1,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=from1,host=hostname.local,parent=stream0,task=deadman-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=database,retention_policy=monitor,task_master=main points_received=40 1478791462000000000
> kapacitor_edges,child=stream,host=hostname.local,parent=write_points,task=task_master:main,type=stream collected=750,emitted=750 1478791462000000000
> kapacitor_edges,child=log7,host=hostname.local,parent=window6,task=deadman-test,type=batch collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=window2,host=hostname.local,parent=from1,task=sys-stats,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=log,node=log7,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_ingress,database=_kapacitor,host=hostname.local,measurement=edges,retention_policy=autogen,task_master=main points_received=225 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=derivative,node=derivative5,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=from1,host=hostname.local,parent=stream0,task=test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=alert2,host=hostname.local,parent=from1,task=test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=log,node=log3,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=influxdb_out,node=influxdb_out4,task=sys-stats,type=stream avg_exec_time_ns=0i,points_written=0,write_errors=0 1478791462000000000
> kapacitor_edges,child=stream0,host=hostname.local,parent=stream,task=test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=log3,host=hostname.local,parent=window2,task=deadman-test,type=batch collected=0,emitted=0 1478791462000000000
> kapacitor_edges,child=derivative5,host=hostname.local,parent=mean4,task=deadman-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=stream,node=stream0,task=sys-stats,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=window,node=window2,task=sys-stats,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=mean,node=mean4,task=deadman-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=from1,host=hostname.local,parent=stream0,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
> kapacitor_ingress,database=_internal,host=hostname.local,measurement=tsm1_cache,retention_policy=monitor,task_master=main points_received=120 1478791462000000000
> kapacitor_nodes,host=hostname.local,kind=sum,node=sum5,task=derivative-test,type=stream avg_exec_time_ns=0i 1478791462000000000
> kapacitor_edges,child=eval4,host=hostname.local,parent=log3,task=derivative-test,type=stream collected=0,emitted=0 1478791462000000000
```

View File

@ -0,0 +1,228 @@
package kapacitor
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
defaultURL = "http://localhost:9092/kapacitor/v1/debug/vars"
)
type Kapacitor struct {
URLs []string `toml:"urls"`
Timeout internal.Duration
client *http.Client
}
func (*Kapacitor) Description() string {
return "Read Kapacitor-formatted JSON metrics from one or more HTTP endpoints"
}
func (*Kapacitor) SampleConfig() string {
return `
## Multiple URLs from which to read Kapacitor-formatted JSON
## Default is "http://localhost:9092/kapacitor/v1/debug/vars".
urls = [
"http://localhost:9092/kapacitor/v1/debug/vars"
]
## Time limit for http requests
timeout = "5s"
`
}
func (k *Kapacitor) Gather(acc telegraf.Accumulator) error {
if k.client == nil {
k.client = &http.Client{Timeout: k.Timeout.Duration}
}
var wg sync.WaitGroup
for _, u := range k.URLs {
wg.Add(1)
go func(url string) {
defer wg.Done()
if err := k.gatherURL(acc, url); err != nil {
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err))
}
}(u)
}
wg.Wait()
return nil
}
type object struct {
Name string `json:"name"`
Values map[string]interface{} `json:"values"`
Tags map[string]string `json:"tags"`
}
type memstats struct {
Alloc int64 `json:"Alloc"`
TotalAlloc int64 `json:"TotalAlloc"`
Sys int64 `json:"Sys"`
Lookups int64 `json:"Lookups"`
Mallocs int64 `json:"Mallocs"`
Frees int64 `json:"Frees"`
HeapAlloc int64 `json:"HeapAlloc"`
HeapSys int64 `json:"HeapSys"`
HeapIdle int64 `json:"HeapIdle"`
HeapInuse int64 `json:"HeapInuse"`
HeapReleased int64 `json:"HeapReleased"`
HeapObjects int64 `json:"HeapObjects"`
StackInuse int64 `json:"StackInuse"`
StackSys int64 `json:"StackSys"`
MSpanInuse int64 `json:"MSpanInuse"`
MSpanSys int64 `json:"MSpanSys"`
MCacheInuse int64 `json:"MCacheInuse"`
MCacheSys int64 `json:"MCacheSys"`
BuckHashSys int64 `json:"BuckHashSys"`
GCSys int64 `json:"GCSys"`
OtherSys int64 `json:"OtherSys"`
NextGC int64 `json:"NextGC"`
LastGC int64 `json:"LastGC"`
PauseTotalNs int64 `json:"PauseTotalNs"`
NumGC int64 `json:"NumGC"`
GCCPUFraction float64 `json:"GCCPUFraction"`
}
type stats struct {
CmdLine []string `json:"cmdline"`
ClusterID string `json:"cluster_id"`
Host string `json:"host"`
Kapacitor *map[string]object `json:"kapacitor"`
MemStats *memstats `json:"memstats"`
NumEnabledTasks int `json:"num_enabled_tasks"`
NumSubscriptions int `json:"num_subscriptions"`
NumTasks int `json:"num_tasks"`
Product string `json:"product"`
ServerID string `json:"server_id"`
Version string `json:"version"`
}
// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
// url : endpoint to send request to
//
// Returns:
// error: Any error that may have occurred
func (k *Kapacitor) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
now := time.Now()
resp, err := k.client.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
var s stats
err = dec.Decode(&s)
if err != nil {
return err
}
if s.MemStats != nil {
acc.AddFields("kapacitor_memstats",
map[string]interface{}{
"alloc_bytes": s.MemStats.Alloc,
"buck_hash_sys_bytes": s.MemStats.BuckHashSys,
"frees": s.MemStats.Frees,
"gcc_pu_fraction": s.MemStats.GCCPUFraction,
"gc_sys_bytes": s.MemStats.GCSys,
"heap_alloc_bytes": s.MemStats.HeapAlloc,
"heap_idle_bytes": s.MemStats.HeapIdle,
"heap_in_use_bytes": s.MemStats.HeapInuse,
"heap_objects": s.MemStats.HeapObjects,
"heap_released_bytes": s.MemStats.HeapReleased,
"heap_sys_bytes": s.MemStats.HeapSys,
"last_gc_ns": s.MemStats.LastGC,
"lookups": s.MemStats.Lookups,
"mallocs": s.MemStats.Mallocs,
"mcache_in_use_bytes": s.MemStats.MCacheInuse,
"mcache_sys_bytes": s.MemStats.MCacheSys,
"mspan_in_use_bytes": s.MemStats.MSpanInuse,
"mspan_sys_bytes": s.MemStats.MSpanSys,
"next_gc_ns": s.MemStats.NextGC,
"num_gc": s.MemStats.NumGC,
"other_sys_bytes": s.MemStats.OtherSys,
"pause_total_ns": s.MemStats.PauseTotalNs,
"stack_in_use_bytes": s.MemStats.StackInuse,
"stack_sys_bytes": s.MemStats.StackSys,
"sys_bytes": s.MemStats.Sys,
"total_alloc_bytes": s.MemStats.TotalAlloc,
},
map[string]string{
"kap_version": s.Version,
"url": url,
},
now)
}
acc.AddFields("kapacitor",
map[string]interface{}{
"num_enabled_tasks": s.NumEnabledTasks,
"num_subscriptions": s.NumSubscriptions,
"num_tasks": s.NumTasks,
},
map[string]string{
"kap_version": s.Version,
"url": url,
},
now)
if s.Kapacitor != nil {
for _, obj := range *s.Kapacitor {
// Strip out high-cardinality or duplicative tags
excludeTags := []string{"host", "cluster_id", "server_id"}
for _, key := range excludeTags {
if _, ok := obj.Tags[key]; ok {
delete(obj.Tags, key)
}
}
// Convert time-related string field to int
if _, ok := obj.Values["avg_exec_time_ns"]; ok {
d, err := time.ParseDuration(obj.Values["avg_exec_time_ns"].(string))
if err != nil {
continue
}
obj.Values["avg_exec_time_ns"] = d.Nanoseconds()
}
acc.AddFields(
"kapacitor_"+obj.Name,
obj.Values,
obj.Tags,
now,
)
}
}
return nil
}
func init() {
inputs.Add("kapacitor", func() telegraf.Input {
return &Kapacitor{
URLs: []string{defaultURL},
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}

File diff suppressed because one or more lines are too long

View File

@ -299,6 +299,22 @@ func (a *Accumulator) HasTimestamp(measurement string, timestamp time.Time) bool
return false
}
// HasField returns true if the given measurement has a field with the given
// name
func (a *Accumulator) HasField(measurement string, field string) bool {
a.Lock()
defer a.Unlock()
for _, p := range a.Metrics {
if p.Measurement == measurement {
if _, ok := p.Fields[field]; ok {
return true
}
}
}
return false
}
// HasIntField returns true if the measurement has an Int value
func (a *Accumulator) HasIntField(measurement string, field string) bool {
a.Lock()