From cb8499c26446a213b9f066588c019ff490d147d4 Mon Sep 17 00:00:00 2001 From: JP Date: Tue, 27 Oct 2015 22:31:25 -0700 Subject: [PATCH] optionally gather cluster and index health stats --- plugins/elasticsearch/elasticsearch.go | 119 +++++++++++++++++--- plugins/elasticsearch/elasticsearch_test.go | 37 ++++++ plugins/elasticsearch/testdata_test.go | 71 ++++++++++++ 3 files changed, 209 insertions(+), 18 deletions(-) diff --git a/plugins/elasticsearch/elasticsearch.go b/plugins/elasticsearch/elasticsearch.go index e5236f42b..3150e6f5c 100644 --- a/plugins/elasticsearch/elasticsearch.go +++ b/plugins/elasticsearch/elasticsearch.go @@ -4,12 +4,14 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/influxdb/telegraf/plugins" ) const statsPath = "/_nodes/stats" const statsPathLocal = "/_nodes/_local/stats" +const healthPath = "/_cluster/health" type node struct { Host string `json:"host"` @@ -27,6 +29,31 @@ type node struct { Breakers interface{} `json:"breakers"` } +type clusterHealth struct { + ClusterName string `json:"cluster_name"` + Status string `json:"status"` + TimedOut bool `json:"timed_out"` + NumberOfNodes int `json:"number_of_nodes"` + NumberOfDataNodes int `json:"number_of_data_nodes"` + ActivePrimaryShards int `json:"active_primary_shards"` + ActiveShards int `json:"active_shards"` + RelocatingShards int `json:"relocating_shards"` + InitializingShards int `json:"initializing_shards"` + UnassignedShards int `json:"unassigned_shards"` + Indices map[string]indexHealth `json:"indices"` +} + +type indexHealth struct { + Status string `json:"status"` + NumberOfShards int `json:"number_of_shards"` + NumberOfReplicas int `json:"number_of_replicas"` + ActivePrimaryShards int `json:"active_primary_shards"` + ActiveShards int `json:"active_shards"` + RelocatingShards int `json:"relocating_shards"` + InitializingShards int `json:"initializing_shards"` + UnassignedShards int `json:"unassigned_shards"` +} + const sampleConfig = ` # specify a list of one or 
more Elasticsearch servers servers = ["http://localhost:9200"] @@ -34,14 +61,18 @@ const sampleConfig = ` # set local to false when you want to read the indices stats from all nodes # within the cluster local = true + + # set cluster_health to true when you want to also obtain cluster level stats + cluster_health = false ` // Elasticsearch is a plugin to read stats from one or many Elasticsearch // servers. type Elasticsearch struct { - Local bool - Servers []string - client *http.Client + Local bool + Servers []string + ClusterHealth bool + client *http.Client } // NewElasticsearch return a new instance of Elasticsearch @@ -69,36 +100,30 @@ func (e *Elasticsearch) Gather(acc plugins.Accumulator) error { } else { url = serv + statsPath } - if err := e.gatherUrl(url, acc); err != nil { + if err := e.gatherNodeStats(url, acc); err != nil { return err } + if e.ClusterHealth { + e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc) + } } return nil } -func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error { - r, err := e.client.Get(url) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) - } - d := json.NewDecoder(r.Body) - esRes := &struct { +func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) error { + nodeStats := &struct { ClusterName string `json:"cluster_name"` Nodes map[string]*node `json:"nodes"` }{} - if err = d.Decode(esRes); err != nil { + if err := e.gatherData(url, nodeStats); err != nil { return err } - - for id, n := range esRes.Nodes { + for id, n := range nodeStats.Nodes { tags := map[string]string{ "node_id": id, "node_host": n.Host, "node_name": n.Name, - "cluster_name": esRes.ClusterName, + "cluster_name": nodeStats.ClusterName, } for k, v := range n.Attributes { @@ -124,7 +149,65 @@ func (e *Elasticsearch) gatherUrl(url string, acc 
plugins.Accumulator) error { } } } + return nil +} +func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator) error { + clusterStats := &clusterHealth{} + if err := e.gatherData(url, clusterStats); err != nil { + return err + } + measurementTime := time.Now() + clusterFields := map[string]interface{}{ + "status": clusterStats.Status, + "timed_out": clusterStats.TimedOut, + "number_of_nodes": clusterStats.NumberOfNodes, + "number_of_data_nodes": clusterStats.NumberOfDataNodes, + "active_primary_shards": clusterStats.ActivePrimaryShards, + "active_shards": clusterStats.ActiveShards, + "relocating_shards": clusterStats.RelocatingShards, + "initializing_shards": clusterStats.InitializingShards, + "unassigned_shards": clusterStats.UnassignedShards, + } + acc.AddFields( + "cluster_health", + clusterFields, + map[string]string{"name": clusterStats.ClusterName}, + measurementTime, + ) + + for name, health := range clusterStats.Indices { + indexFields := map[string]interface{}{ + "status": health.Status, + "number_of_shards": health.NumberOfShards, + "number_of_replicas": health.NumberOfReplicas, + "active_primary_shards": health.ActivePrimaryShards, + "active_shards": health.ActiveShards, + "relocating_shards": health.RelocatingShards, + "initializing_shards": health.InitializingShards, + "unassigned_shards": health.UnassignedShards, + } + acc.AddFields( + "indices", + indexFields, + map[string]string{"index": name}, + measurementTime, + ) + } + return nil +} + +func (e *Elasticsearch) gatherData(url string, v interface{}) error { + r, err := e.client.Get(url) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) + } + if err = json.NewDecoder(r.Body).Decode(v); err != nil { + return err + } return nil } diff --git a/plugins/elasticsearch/elasticsearch_test.go b/plugins/elasticsearch/elasticsearch_test.go index 
9e1cf66c9..e29dd4ddb 100644 --- a/plugins/elasticsearch/elasticsearch_test.go +++ b/plugins/elasticsearch/elasticsearch_test.go @@ -8,6 +8,7 @@ import ( "github.com/influxdb/telegraf/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type transportMock struct { @@ -70,3 +71,39 @@ func TestElasticsearch(t *testing.T) { } } } + +func TestGatherClusterStats(t *testing.T) { + es := NewElasticsearch() + es.Servers = []string{"http://example.com:9200"} + es.ClusterHealth = true + es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) + + var acc testutil.Accumulator + require.NoError(t, es.Gather(&acc)) + + var clusterHealthTests = []struct { + measurement string + fields map[string]interface{} + tags map[string]string + }{ + { + "cluster_health", + clusterHealthExpected, + map[string]string{"name": "elasticsearch_telegraf"}, + }, + { + "indices", + v1IndexExpected, + map[string]string{"index": "v1"}, + }, + { + "indices", + v2IndexExpected, + map[string]string{"index": "v2"}, + }, + } + + for _, exp := range clusterHealthTests { + assert.NoError(t, acc.ValidateTaggedFields(exp.measurement, exp.fields, exp.tags)) + } +} diff --git a/plugins/elasticsearch/testdata_test.go b/plugins/elasticsearch/testdata_test.go index 87bd15692..19a719c05 100644 --- a/plugins/elasticsearch/testdata_test.go +++ b/plugins/elasticsearch/testdata_test.go @@ -1,5 +1,76 @@ package elasticsearch +const clusterResponse = ` +{ + "cluster_name": "elasticsearch_telegraf", + "status": "green", + "timed_out": false, + "number_of_nodes": 3, + "number_of_data_nodes": 3, + "active_primary_shards": 5, + "active_shards": 15, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0, + "indices": { + "v1": { + "status": "green", + "number_of_shards": 10, + "number_of_replicas": 1, + "active_primary_shards": 10, + "active_shards": 20, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0 + }, + "v2": { + 
"status": "red", + "number_of_shards": 10, + "number_of_replicas": 1, + "active_primary_shards": 0, + "active_shards": 0, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 20 + } + } +} +` + +var clusterHealthExpected = map[string]interface{}{ + "status": "green", + "timed_out": false, + "number_of_nodes": 3, + "number_of_data_nodes": 3, + "active_primary_shards": 5, + "active_shards": 15, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0, +} + +var v1IndexExpected = map[string]interface{}{ + "status": "green", + "number_of_shards": 10, + "number_of_replicas": 1, + "active_primary_shards": 10, + "active_shards": 20, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0, +} + +var v2IndexExpected = map[string]interface{}{ + "status": "red", + "number_of_shards": 10, + "number_of_replicas": 1, + "active_primary_shards": 0, + "active_shards": 0, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 20, +} + const statsResponse = ` { "cluster_name": "es-testcluster",