optinally gather cluster and index health stats
This commit is contained in:
		
							parent
							
								
									d2fb065d0d
								
							
						
					
					
						commit
						cb8499c264
					
				|  | @ -4,12 +4,14 @@ import ( | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdb/telegraf/plugins" | 	"github.com/influxdb/telegraf/plugins" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const statsPath = "/_nodes/stats" | const statsPath = "/_nodes/stats" | ||||||
| const statsPathLocal = "/_nodes/_local/stats" | const statsPathLocal = "/_nodes/_local/stats" | ||||||
|  | const healthPath = "/_cluster/health" | ||||||
| 
 | 
 | ||||||
| type node struct { | type node struct { | ||||||
| 	Host       string            `json:"host"` | 	Host       string            `json:"host"` | ||||||
|  | @ -27,6 +29,31 @@ type node struct { | ||||||
| 	Breakers   interface{}       `json:"breakers"` | 	Breakers   interface{}       `json:"breakers"` | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type clusterHealth struct { | ||||||
|  | 	ClusterName         string                 `json:"cluster_name"` | ||||||
|  | 	Status              string                 `json:"status"` | ||||||
|  | 	TimedOut            bool                   `json:"timed_out"` | ||||||
|  | 	NumberOfNodes       int                    `json:"number_of_nodes"` | ||||||
|  | 	NumberOfDataNodes   int                    `json:"number_of_data_nodes"` | ||||||
|  | 	ActivePrimaryShards int                    `json:"active_primary_shards"` | ||||||
|  | 	ActiveShards        int                    `json:"active_shards"` | ||||||
|  | 	RelocatingShards    int                    `json:"relocating_shards"` | ||||||
|  | 	InitializingShards  int                    `json:"initializing_shards"` | ||||||
|  | 	UnassignedShards    int                    `json:"unassigned_shards"` | ||||||
|  | 	Indices             map[string]indexHealth `json:"indices"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type indexHealth struct { | ||||||
|  | 	Status              string `json:"status"` | ||||||
|  | 	NumberOfShards      int    `json:"number_of_shards"` | ||||||
|  | 	NumberOfReplicas    int    `json:"number_of_replicas"` | ||||||
|  | 	ActivePrimaryShards int    `json:"active_primary_shards"` | ||||||
|  | 	ActiveShards        int    `json:"active_shards"` | ||||||
|  | 	RelocatingShards    int    `json:"relocating_shards"` | ||||||
|  | 	InitializingShards  int    `json:"initializing_shards"` | ||||||
|  | 	UnassignedShards    int    `json:"unassigned_shards"` | ||||||
|  | } | ||||||
|  | 
 | ||||||
| const sampleConfig = ` | const sampleConfig = ` | ||||||
|   # specify a list of one or more Elasticsearch servers |   # specify a list of one or more Elasticsearch servers | ||||||
|   servers = ["http://localhost:9200"] |   servers = ["http://localhost:9200"] | ||||||
|  | @ -34,14 +61,18 @@ const sampleConfig = ` | ||||||
|   # set local to false when you want to read the indices stats from all nodes |   # set local to false when you want to read the indices stats from all nodes | ||||||
|   # within the cluster |   # within the cluster | ||||||
|   local = true |   local = true | ||||||
|  | 
 | ||||||
|  |   # set cluster_health to true when you want to also obtain cluster level stats | ||||||
|  |   cluster_health = false | ||||||
| ` | ` | ||||||
| 
 | 
 | ||||||
| // Elasticsearch is a plugin to read stats from one or many Elasticsearch
 | // Elasticsearch is a plugin to read stats from one or many Elasticsearch
 | ||||||
| // servers.
 | // servers.
 | ||||||
| type Elasticsearch struct { | type Elasticsearch struct { | ||||||
| 	Local   bool | 	Local         bool | ||||||
| 	Servers []string | 	Servers       []string | ||||||
| 	client  *http.Client | 	ClusterHealth bool | ||||||
|  | 	client        *http.Client | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewElasticsearch return a new instance of Elasticsearch
 | // NewElasticsearch return a new instance of Elasticsearch
 | ||||||
|  | @ -69,36 +100,30 @@ func (e *Elasticsearch) Gather(acc plugins.Accumulator) error { | ||||||
| 		} else { | 		} else { | ||||||
| 			url = serv + statsPath | 			url = serv + statsPath | ||||||
| 		} | 		} | ||||||
| 		if err := e.gatherUrl(url, acc); err != nil { | 		if err := e.gatherNodeStats(url, acc); err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  | 		if e.ClusterHealth { | ||||||
|  | 			e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error { | func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) error { | ||||||
| 	r, err := e.client.Get(url) | 	nodeStats := &struct { | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	if r.StatusCode != http.StatusOK { |  | ||||||
| 		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) |  | ||||||
| 	} |  | ||||||
| 	d := json.NewDecoder(r.Body) |  | ||||||
| 	esRes := &struct { |  | ||||||
| 		ClusterName string           `json:"cluster_name"` | 		ClusterName string           `json:"cluster_name"` | ||||||
| 		Nodes       map[string]*node `json:"nodes"` | 		Nodes       map[string]*node `json:"nodes"` | ||||||
| 	}{} | 	}{} | ||||||
| 	if err = d.Decode(esRes); err != nil { | 	if err := e.gatherData(url, nodeStats); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 	for id, n := range nodeStats.Nodes { | ||||||
| 	for id, n := range esRes.Nodes { |  | ||||||
| 		tags := map[string]string{ | 		tags := map[string]string{ | ||||||
| 			"node_id":      id, | 			"node_id":      id, | ||||||
| 			"node_host":    n.Host, | 			"node_host":    n.Host, | ||||||
| 			"node_name":    n.Name, | 			"node_name":    n.Name, | ||||||
| 			"cluster_name": esRes.ClusterName, | 			"cluster_name": nodeStats.ClusterName, | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		for k, v := range n.Attributes { | 		for k, v := range n.Attributes { | ||||||
|  | @ -124,7 +149,65 @@ func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error { | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|  | func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator) error { | ||||||
|  | 	clusterStats := &clusterHealth{} | ||||||
|  | 	if err := e.gatherData(url, clusterStats); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	measurementTime := time.Now() | ||||||
|  | 	clusterFields := map[string]interface{}{ | ||||||
|  | 		"status":                clusterStats.Status, | ||||||
|  | 		"timed_out":             clusterStats.TimedOut, | ||||||
|  | 		"number_of_nodes":       clusterStats.NumberOfNodes, | ||||||
|  | 		"number_of_data_nodes":  clusterStats.NumberOfDataNodes, | ||||||
|  | 		"active_primary_shards": clusterStats.ActivePrimaryShards, | ||||||
|  | 		"active_shards":         clusterStats.ActiveShards, | ||||||
|  | 		"relocating_shards":     clusterStats.RelocatingShards, | ||||||
|  | 		"initializing_shards":   clusterStats.InitializingShards, | ||||||
|  | 		"unassigned_shards":     clusterStats.UnassignedShards, | ||||||
|  | 	} | ||||||
|  | 	acc.AddFields( | ||||||
|  | 		"cluster_health", | ||||||
|  | 		clusterFields, | ||||||
|  | 		map[string]string{"name": clusterStats.ClusterName}, | ||||||
|  | 		measurementTime, | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	for name, health := range clusterStats.Indices { | ||||||
|  | 		indexFields := map[string]interface{}{ | ||||||
|  | 			"status":                health.Status, | ||||||
|  | 			"number_of_shards":      health.NumberOfShards, | ||||||
|  | 			"number_of_replicas":    health.NumberOfReplicas, | ||||||
|  | 			"active_primary_shards": health.ActivePrimaryShards, | ||||||
|  | 			"active_shards":         health.ActiveShards, | ||||||
|  | 			"relocating_shards":     health.RelocatingShards, | ||||||
|  | 			"initializing_shards":   health.InitializingShards, | ||||||
|  | 			"unassigned_shards":     health.UnassignedShards, | ||||||
|  | 		} | ||||||
|  | 		acc.AddFields( | ||||||
|  | 			"indices", | ||||||
|  | 			indexFields, | ||||||
|  | 			map[string]string{"index": name}, | ||||||
|  | 			measurementTime, | ||||||
|  | 		) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *Elasticsearch) gatherData(url string, v interface{}) error { | ||||||
|  | 	r, err := e.client.Get(url) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if r.StatusCode != http.StatusOK { | ||||||
|  | 		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK) | ||||||
|  | 	} | ||||||
|  | 	if err = json.NewDecoder(r.Body).Decode(v); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -8,6 +8,7 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"github.com/influxdb/telegraf/testutil" | 	"github.com/influxdb/telegraf/testutil" | ||||||
| 	"github.com/stretchr/testify/assert" | 	"github.com/stretchr/testify/assert" | ||||||
|  | 	"github.com/stretchr/testify/require" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type transportMock struct { | type transportMock struct { | ||||||
|  | @ -70,3 +71,39 @@ func TestElasticsearch(t *testing.T) { | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func TestGatherClusterStats(t *testing.T) { | ||||||
|  | 	es := NewElasticsearch() | ||||||
|  | 	es.Servers = []string{"http://example.com:9200"} | ||||||
|  | 	es.ClusterHealth = true | ||||||
|  | 	es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) | ||||||
|  | 
 | ||||||
|  | 	var acc testutil.Accumulator | ||||||
|  | 	require.NoError(t, es.Gather(&acc)) | ||||||
|  | 
 | ||||||
|  | 	var clusterHealthTests = []struct { | ||||||
|  | 		measurement string | ||||||
|  | 		fields      map[string]interface{} | ||||||
|  | 		tags        map[string]string | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			"cluster_health", | ||||||
|  | 			clusterHealthExpected, | ||||||
|  | 			map[string]string{"name": "elasticsearch_telegraf"}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"indices", | ||||||
|  | 			v1IndexExpected, | ||||||
|  | 			map[string]string{"index": "v1"}, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			"indices", | ||||||
|  | 			v2IndexExpected, | ||||||
|  | 			map[string]string{"index": "v2"}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, exp := range clusterHealthTests { | ||||||
|  | 		assert.NoError(t, acc.ValidateTaggedFields(exp.measurement, exp.fields, exp.tags)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | @ -1,5 +1,76 @@ | ||||||
| package elasticsearch | package elasticsearch | ||||||
| 
 | 
 | ||||||
|  | const clusterResponse = ` | ||||||
|  | { | ||||||
|  |    "cluster_name": "elasticsearch_telegraf", | ||||||
|  |    "status": "green", | ||||||
|  |    "timed_out": false, | ||||||
|  |    "number_of_nodes": 3, | ||||||
|  |    "number_of_data_nodes": 3, | ||||||
|  |    "active_primary_shards": 5, | ||||||
|  |    "active_shards": 15, | ||||||
|  |    "relocating_shards": 0, | ||||||
|  |    "initializing_shards": 0, | ||||||
|  |    "unassigned_shards": 0, | ||||||
|  |    "indices": { | ||||||
|  |       "v1": { | ||||||
|  |          "status": "green", | ||||||
|  |          "number_of_shards": 10, | ||||||
|  |          "number_of_replicas": 1, | ||||||
|  |          "active_primary_shards": 10, | ||||||
|  |          "active_shards": 20, | ||||||
|  |          "relocating_shards": 0, | ||||||
|  |          "initializing_shards": 0, | ||||||
|  |          "unassigned_shards": 0 | ||||||
|  |       }, | ||||||
|  |       "v2": { | ||||||
|  |          "status": "red", | ||||||
|  |          "number_of_shards": 10, | ||||||
|  |          "number_of_replicas": 1, | ||||||
|  |          "active_primary_shards": 0, | ||||||
|  |          "active_shards": 0, | ||||||
|  |          "relocating_shards": 0, | ||||||
|  |          "initializing_shards": 0, | ||||||
|  |          "unassigned_shards": 20 | ||||||
|  |       } | ||||||
|  |    } | ||||||
|  | } | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | var clusterHealthExpected = map[string]interface{}{ | ||||||
|  | 	"status":                "green", | ||||||
|  | 	"timed_out":             false, | ||||||
|  | 	"number_of_nodes":       3, | ||||||
|  | 	"number_of_data_nodes":  3, | ||||||
|  | 	"active_primary_shards": 5, | ||||||
|  | 	"active_shards":         15, | ||||||
|  | 	"relocating_shards":     0, | ||||||
|  | 	"initializing_shards":   0, | ||||||
|  | 	"unassigned_shards":     0, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var v1IndexExpected = map[string]interface{}{ | ||||||
|  | 	"status":                "green", | ||||||
|  | 	"number_of_shards":      10, | ||||||
|  | 	"number_of_replicas":    1, | ||||||
|  | 	"active_primary_shards": 10, | ||||||
|  | 	"active_shards":         20, | ||||||
|  | 	"relocating_shards":     0, | ||||||
|  | 	"initializing_shards":   0, | ||||||
|  | 	"unassigned_shards":     0, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var v2IndexExpected = map[string]interface{}{ | ||||||
|  | 	"status":                "red", | ||||||
|  | 	"number_of_shards":      10, | ||||||
|  | 	"number_of_replicas":    1, | ||||||
|  | 	"active_primary_shards": 0, | ||||||
|  | 	"active_shards":         0, | ||||||
|  | 	"relocating_shards":     0, | ||||||
|  | 	"initializing_shards":   0, | ||||||
|  | 	"unassigned_shards":     20, | ||||||
|  | } | ||||||
|  | 
 | ||||||
| const statsResponse = ` | const statsResponse = ` | ||||||
| { | { | ||||||
|   "cluster_name": "es-testcluster", |   "cluster_name": "es-testcluster", | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue