diff --git a/README.md b/README.md index 52435e8a7..6de2d94f8 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,7 @@ Telegraf currently has support for collecting metrics from: * Prometheus (client libraries and exporters) * PostgreSQL * Redis +* Elasticsearch (indices stats) We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API. diff --git a/plugins/all/all.go b/plugins/all/all.go index 1a46d5054..4840ead3a 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/elasticsearch" _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mysql" diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go new file mode 100644 index 000000000..79b2435c2 --- /dev/null +++ b/plugins/elasticsearch/elasticsearch.go @@ -0,0 +1,108 @@ +package elasticsearch + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/influxdb/telegraf/plugins" +) + +const indicesStatsPath = "/_nodes/stats/indices" +const indicesStatsPathLocal = "/_nodes/_local/stats/indices" + +type node struct { + Host string `json:"host"` + Indices map[string]map[string]interface{} `json:"indices"` +} + +const sampleConfig = ` +# specify a list of one or more Elasticsearch servers +servers = ["http://localhost:9200"] +# +# set local to false when you want to read the indices stats from all nodes +# within the cluster +local = true +` + +// Elasticsearch is a plugin to read stats from one or many Elasticsearch +// servers. +type Elasticsearch struct { + Local bool + Servers []string + client *http.Client +} + +// NewElasticsearch return a new instance of Elasticsearch +func NewElasticsearch() *Elasticsearch { + return &Elasticsearch{client: http.DefaultClient} +} + +// SampleConfig returns sample configuration for this plugin. +func (e *Elasticsearch) SampleConfig() string { + return sampleConfig +} + +// Description returns the plugin description. +func (e *Elasticsearch) Description() string { + return "Read indices stats from one or more Elasticsearch servers or clusters" +} + +// Gather reads the stats from Elasticsearch and writes it to the +// Accumulator. +func (e *Elasticsearch) Gather(acc plugins.Accumulator) error { + for _, serv := range e.Servers { + var url string + if e.Local { + url = serv + indicesStatsPathLocal + } else { + url = serv + indicesStatsPath + } + if err := e.gatherUrl(url, acc); err != nil { + return err + } + } + return nil +} + +func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error { + r, err := e.client.Get(url) + if err != nil { + return err + } + d := json.NewDecoder(r.Body) + esRes := &struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]*node `json:"nodes"` + }{} + if err = d.Decode(esRes); err != nil { + return err + } + + for _, n := range esRes.Nodes { + tags := map[string]string{ + "node_host": n.Host, + "cluster_name": esRes.ClusterName, + } + + for group, stats := range n.Indices { + for statName, value := range stats { + floatVal, ok := value.(float64) + if !ok { + // there are a couple of values that we can't cast to float, + // this is fine :-) + continue + } + acc.Add(fmt.Sprintf("indices_%s_%s", group, statName), int(floatVal), tags) + } + } + } + + return nil +} + +func init() { + plugins.Add("elasticsearch", func() plugins.Plugin { + return NewElasticsearch() + }) +} diff --git a/plugins/elasticsearch/elasticsearch_test.go b/plugins/elasticsearch/elasticsearch_test.go new file mode 100644 index 000000000..6f7944043 --- /dev/null +++ b/plugins/elasticsearch/elasticsearch_test.go @@ -0,0 +1,258 @@ +package elasticsearch + +import ( + "io/ioutil" + "net/http" + "strings" + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +const indicesResponse = ` +{ + "cluster_name": "es-testcluster", + "nodes": { + "SDFsfSDFsdfFSDSDfSFDSDF": { + "timestamp": 1436365550135, + "name": "test.host.com", + "transport_address": "inet[/127.0.0.1:9300]", + "host": "test", + "ip": [ + "inet[/127.0.0.1:9300]", + "NONE" + ], + "attributes": { + "master": "true" + }, + "indices": { + "docs": { + "count": 29652, + "deleted": 5229 + }, + "store": { + "size_in_bytes": 37715234, + "throttle_time_in_millis": 215 + }, + "indexing": { + "index_total": 84790, + "index_time_in_millis": 29680, + "index_current": 0, + "delete_total": 13879, + "delete_time_in_millis": 1139, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0 + }, + "get": { + "total": 1, + "time_in_millis": 2, + "exists_total": 0, + "exists_time_in_millis": 0, + "missing_total": 1, + "missing_time_in_millis": 2, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 1452, + "query_time_in_millis": 5695, + "query_current": 0, + "fetch_total": 414, + "fetch_time_in_millis": 146, + "fetch_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 133, + "total_time_in_millis": 21060, + "total_docs": 203672, + "total_size_in_bytes": 142900226 + }, + "refresh": { + "total": 1076, + "total_time_in_millis": 20078 + }, + "flush": { + "total": 115, + "total_time_in_millis": 2401 + }, + "warmer": { + "current": 0, + "total": 2319, + "total_time_in_millis": 448 + }, + "filter_cache": { + "memory_size_in_bytes": 7384, + "evictions": 0 + }, + "id_cache": { + "memory_size_in_bytes": 0 + }, + "fielddata": { + "memory_size_in_bytes": 12996, + "evictions": 0 + }, + "percolate": { + "total": 0, + "time_in_millis": 0, + "current": 0, + "memory_size_in_bytes": -1, + "memory_size": "-1b", + "queries": 0 + }, + "completion": { + "size_in_bytes": 0 + }, + "segments": { + "count": 134, + "memory_in_bytes": 1285212, + "index_writer_memory_in_bytes": 0, + "index_writer_max_memory_in_bytes": 172368955, + "version_map_memory_in_bytes": 611844, + "fixed_bit_set_memory_in_bytes": 0 + }, + "translog": { + "operations": 17702, + "size_in_bytes": 17 + }, + "suggest": { + "total": 0, + "time_in_millis": 0, + "current": 0 + }, + "query_cache": { + "memory_size_in_bytes": 0, + "evictions": 0, + "hit_count": 0, + "miss_count": 0 + }, + "recovery": { + "current_as_source": 0, + "current_as_target": 0, + "throttle_time_in_millis": 0 + } + } + } + } +} +` + +var indicesExpected = map[string]int{ + "indices_id_cache_memory_size_in_bytes": 0, + "indices_completion_size_in_bytes": 0, + "indices_suggest_total": 0, + "indices_suggest_time_in_millis": 0, + "indices_suggest_current": 0, + "indices_query_cache_memory_size_in_bytes": 0, + "indices_query_cache_evictions": 0, + "indices_query_cache_hit_count": 0, + "indices_query_cache_miss_count": 0, + "indices_store_size_in_bytes": 37715234, + "indices_store_throttle_time_in_millis": 215, + "indices_merges_current_docs": 0, + "indices_merges_current_size_in_bytes": 0, + "indices_merges_total": 133, + "indices_merges_total_time_in_millis": 21060, + "indices_merges_total_docs": 203672, + "indices_merges_total_size_in_bytes": 142900226, + "indices_merges_current": 0, + "indices_filter_cache_memory_size_in_bytes": 7384, + "indices_filter_cache_evictions": 0, + "indices_indexing_index_total": 84790, + "indices_indexing_index_time_in_millis": 29680, + "indices_indexing_index_current": 0, + "indices_indexing_noop_update_total": 0, + "indices_indexing_throttle_time_in_millis": 0, + "indices_indexing_delete_total": 13879, + "indices_indexing_delete_time_in_millis": 1139, + "indices_indexing_delete_current": 0, + "indices_get_exists_time_in_millis": 0, + "indices_get_missing_total": 1, + "indices_get_missing_time_in_millis": 2, + "indices_get_current": 0, + "indices_get_total": 1, + "indices_get_time_in_millis": 2, + "indices_get_exists_total": 0, + "indices_refresh_total": 1076, + "indices_refresh_total_time_in_millis": 20078, + "indices_percolate_current": 0, + "indices_percolate_memory_size_in_bytes": -1, + "indices_percolate_queries": 0, + "indices_percolate_total": 0, + "indices_percolate_time_in_millis": 0, + "indices_translog_operations": 17702, + "indices_translog_size_in_bytes": 17, + "indices_recovery_current_as_source": 0, + "indices_recovery_current_as_target": 0, + "indices_recovery_throttle_time_in_millis": 0, + "indices_docs_count": 29652, + "indices_docs_deleted": 5229, + "indices_flush_total_time_in_millis": 2401, + "indices_flush_total": 115, + "indices_fielddata_memory_size_in_bytes": 12996, + "indices_fielddata_evictions": 0, + "indices_search_fetch_current": 0, + "indices_search_open_contexts": 0, + "indices_search_query_total": 1452, + "indices_search_query_time_in_millis": 5695, + "indices_search_query_current": 0, + "indices_search_fetch_total": 414, + "indices_search_fetch_time_in_millis": 146, + "indices_warmer_current": 0, + "indices_warmer_total": 2319, + "indices_warmer_total_time_in_millis": 448, + "indices_segments_count": 134, + "indices_segments_memory_in_bytes": 1285212, + "indices_segments_index_writer_memory_in_bytes": 0, + "indices_segments_index_writer_max_memory_in_bytes": 172368955, + "indices_segments_version_map_memory_in_bytes": 611844, + "indices_segments_fixed_bit_set_memory_in_bytes": 0, +} + +type tranportMock struct { + statusCode int + body string +} + +func newTransportMock(statusCode int, body string) http.RoundTripper { + return &tranportMock{ + statusCode: statusCode, + body: body, + } +} + +func (t *tranportMock) RoundTrip(r *http.Request) (*http.Response, error) { + res := &http.Response{ + Header: make(http.Header), + Request: r, + StatusCode: t.statusCode, + } + res.Header.Set("Content-Type", "application/json") + res.Body = ioutil.NopCloser(strings.NewReader(t.body)) + return res, nil +} + +func TestElasticsearch(t *testing.T) { + es := NewElasticsearch() + es.Servers = []string{"http://example.com:9200"} + es.client.Transport = newTransportMock(http.StatusOK, indicesResponse) + + var acc testutil.Accumulator + if err := es.Gather(&acc); err != nil { + t.Fatal(err) + } + + tags := map[string]string{ + "node_host": "test", + "cluster_name": "es-testcluster", + } + + for key, val := range indicesExpected { + assert.NoError(t, acc.ValidateTaggedValue(key, val, tags)) + } +}