optionally gather cluster and index health stats
This commit is contained in:
parent 889c0a50a4
commit 0dab4db0d3
@@ -4,12 +4,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"time"

 	"github.com/influxdb/telegraf/plugins"
 )

 const statsPath = "/_nodes/stats"
 const statsPathLocal = "/_nodes/_local/stats"
+const healthPath = "/_cluster/health"

 type node struct {
 	Host string `json:"host"`
@@ -27,6 +29,31 @@ type node struct {
 	Breakers interface{} `json:"breakers"`
 }

+type clusterHealth struct {
+	ClusterName         string                 `json:"cluster_name"`
+	Status              string                 `json:"status"`
+	TimedOut            bool                   `json:"timed_out"`
+	NumberOfNodes       int                    `json:"number_of_nodes"`
+	NumberOfDataNodes   int                    `json:"number_of_data_nodes"`
+	ActivePrimaryShards int                    `json:"active_primary_shards"`
+	ActiveShards        int                    `json:"active_shards"`
+	RelocatingShards    int                    `json:"relocating_shards"`
+	InitializingShards  int                    `json:"initializing_shards"`
+	UnassignedShards    int                    `json:"unassigned_shards"`
+	Indices             map[string]indexHealth `json:"indices"`
+}
+
+type indexHealth struct {
+	Status              string `json:"status"`
+	NumberOfShards      int    `json:"number_of_shards"`
+	NumberOfReplicas    int    `json:"number_of_replicas"`
+	ActivePrimaryShards int    `json:"active_primary_shards"`
+	ActiveShards        int    `json:"active_shards"`
+	RelocatingShards    int    `json:"relocating_shards"`
+	InitializingShards  int    `json:"initializing_shards"`
+	UnassignedShards    int    `json:"unassigned_shards"`
+}
+
 const sampleConfig = `
 # specify a list of one or more Elasticsearch servers
 servers = ["http://localhost:9200"]
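As a side note, here is a minimal standalone sketch of how these two structs line up with a `?level=indices` health response: each key of the "indices" object becomes a key of the Indices map. The demo payload, the trimmed-down field set, and the names below are illustrative only, not taken from this commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down copies of the plugin's structs (subset of fields) for illustration.
type indexHealth struct {
	Status           string `json:"status"`
	NumberOfShards   int    `json:"number_of_shards"`
	UnassignedShards int    `json:"unassigned_shards"`
}

type clusterHealth struct {
	ClusterName string                 `json:"cluster_name"`
	Status      string                 `json:"status"`
	Indices     map[string]indexHealth `json:"indices"`
}

func main() {
	// Example payload in the shape of GET /_cluster/health?level=indices (values are made up).
	payload := []byte(`{
		"cluster_name": "demo",
		"status": "yellow",
		"indices": {
			"logs": {"status": "yellow", "number_of_shards": 5, "unassigned_shards": 5}
		}
	}`)

	var health clusterHealth
	if err := json.Unmarshal(payload, &health); err != nil {
		panic(err)
	}
	// Each per-index object lands in the Indices map keyed by index name, which is
	// what lets the plugin range over the indices and tag each point accordingly.
	for name, idx := range health.Indices {
		fmt.Println(health.ClusterName, name, idx.Status, idx.UnassignedShards)
	}
}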
@@ -34,6 +61,9 @@ const sampleConfig = `
 # set local to false when you want to read the indices stats from all nodes
 # within the cluster
 local = true
+
+# set cluster_health to true when you want to also obtain cluster level stats
+cluster_health = false
 `

 // Elasticsearch is a plugin to read stats from one or many Elasticsearch
@@ -41,6 +71,7 @@ const sampleConfig = `
 type Elasticsearch struct {
 	Local         bool
 	Servers       []string
+	ClusterHealth bool
 	client        *http.Client
 }

@@ -69,36 +100,30 @@ func (e *Elasticsearch) Gather(acc plugins.Accumulator) error {
 		} else {
 			url = serv + statsPath
 		}
-		if err := e.gatherUrl(url, acc); err != nil {
+		if err := e.gatherNodeStats(url, acc); err != nil {
 			return err
 		}
+		if e.ClusterHealth {
+			e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc)
+		}
 	}
 	return nil
 }

-func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error {
-	r, err := e.client.Get(url)
-	if err != nil {
-		return err
-	}
-	if r.StatusCode != http.StatusOK {
-		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
-	}
-	d := json.NewDecoder(r.Body)
-	esRes := &struct {
+func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) error {
+	nodeStats := &struct {
 		ClusterName string           `json:"cluster_name"`
 		Nodes       map[string]*node `json:"nodes"`
 	}{}
-	if err = d.Decode(esRes); err != nil {
+	if err := e.gatherData(url, nodeStats); err != nil {
 		return err
 	}
-	for id, n := range esRes.Nodes {
+	for id, n := range nodeStats.Nodes {
 		tags := map[string]string{
 			"node_id":      id,
 			"node_host":    n.Host,
 			"node_name":    n.Name,
-			"cluster_name": esRes.ClusterName,
+			"cluster_name": nodeStats.ClusterName,
 		}

 		for k, v := range n.Attributes {
@@ -124,7 +149,65 @@ func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error {
 			}
 		}
 	}
+	return nil
+}
+
+func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator) error {
+	clusterStats := &clusterHealth{}
+	if err := e.gatherData(url, clusterStats); err != nil {
+		return err
+	}
+	measurementTime := time.Now()
+	clusterFields := map[string]interface{}{
+		"status":                clusterStats.Status,
+		"timed_out":             clusterStats.TimedOut,
+		"number_of_nodes":       clusterStats.NumberOfNodes,
+		"number_of_data_nodes":  clusterStats.NumberOfDataNodes,
+		"active_primary_shards": clusterStats.ActivePrimaryShards,
+		"active_shards":         clusterStats.ActiveShards,
+		"relocating_shards":     clusterStats.RelocatingShards,
+		"initializing_shards":   clusterStats.InitializingShards,
+		"unassigned_shards":     clusterStats.UnassignedShards,
+	}
+	acc.AddFields(
+		"cluster_health",
+		clusterFields,
+		map[string]string{"name": clusterStats.ClusterName},
+		measurementTime,
+	)
+
+	for name, health := range clusterStats.Indices {
+		indexFields := map[string]interface{}{
+			"status":                health.Status,
+			"number_of_shards":      health.NumberOfShards,
+			"number_of_replicas":    health.NumberOfReplicas,
+			"active_primary_shards": health.ActivePrimaryShards,
+			"active_shards":         health.ActiveShards,
+			"relocating_shards":     health.RelocatingShards,
+			"initializing_shards":   health.InitializingShards,
+			"unassigned_shards":     health.UnassignedShards,
+		}
+		acc.AddFields(
+			"indices",
+			indexFields,
+			map[string]string{"index": name},
+			measurementTime,
+		)
+	}
+	return nil
+}
+
+func (e *Elasticsearch) gatherData(url string, v interface{}) error {
+	r, err := e.client.Get(url)
+	if err != nil {
+		return err
+	}
+	if r.StatusCode != http.StatusOK {
+		return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
+	}
+	if err = json.NewDecoder(r.Body).Decode(v); err != nil {
+		return err
+	}
 	return nil
 }

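For context, a minimal self-contained sketch of the gatherData pattern this refactor introduces: GET a URL, insist on a 200, and JSON-decode the body into whatever pointer the caller hands in. The helper name fetchJSON, the httptest server, and the sample payload are assumptions for illustration and are not part of the plugin.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// fetchJSON mirrors the gatherData idea: fetch url, check the status code, decode into v.
func fetchJSON(client *http.Client, url string, v interface{}) error {
	r, err := client.Get(url)
	if err != nil {
		return err
	}
	defer r.Body.Close()
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status-code %d, expected %d", r.StatusCode, http.StatusOK)
	}
	return json.NewDecoder(r.Body).Decode(v)
}

func main() {
	// Stand-in for an Elasticsearch node answering /_cluster/health.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, `{"cluster_name":"demo","status":"green","number_of_nodes":3}`)
	}))
	defer ts.Close()

	health := struct {
		ClusterName   string `json:"cluster_name"`
		Status        string `json:"status"`
		NumberOfNodes int    `json:"number_of_nodes"`
	}{}
	if err := fetchJSON(&http.Client{}, ts.URL, &health); err != nil {
		panic(err)
	}
	fmt.Println(health.ClusterName, health.Status, health.NumberOfNodes) // demo green 3
}

Decoding into a caller-supplied interface{} is what lets gatherNodeStats and gatherClusterStats share the same HTTP and error handling.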
@@ -8,6 +8,7 @@ import (

 	"github.com/influxdb/telegraf/testutil"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )

 type transportMock struct {
@@ -70,3 +71,39 @@ func TestElasticsearch(t *testing.T) {
 		}
 	}
 }
+
+func TestGatherClusterStats(t *testing.T) {
+	es := NewElasticsearch()
+	es.Servers = []string{"http://example.com:9200"}
+	es.ClusterHealth = true
+	es.client.Transport = newTransportMock(http.StatusOK, clusterResponse)
+
+	var acc testutil.Accumulator
+	require.NoError(t, es.Gather(&acc))
+
+	var clusterHealthTests = []struct {
+		measurement string
+		fields      map[string]interface{}
+		tags        map[string]string
+	}{
+		{
+			"cluster_health",
+			clusterHealthExpected,
+			map[string]string{"name": "elasticsearch_telegraf"},
+		},
+		{
+			"indices",
+			v1IndexExpected,
+			map[string]string{"index": "v1"},
+		},
+		{
+			"indices",
+			v2IndexExpected,
+			map[string]string{"index": "v2"},
+		},
+	}
+
+	for _, exp := range clusterHealthTests {
+		assert.NoError(t, acc.ValidateTaggedFields(exp.measurement, exp.fields, exp.tags))
+	}
+}
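The test above injects a canned response through es.client.Transport, but transportMock and newTransportMock themselves are defined elsewhere in the test file and are not part of this diff. A hedged sketch of how such a mock RoundTripper might look:

package elasticsearch

import (
	"io/ioutil"
	"net/http"
	"strings"
)

// transportMock returns a canned HTTP response instead of talking to a server,
// so e.client.Get(url) inside the plugin sees exactly the fixture the test provides.
type transportMock struct {
	statusCode int
	body       string
}

func newTransportMock(statusCode int, body string) http.RoundTripper {
	return &transportMock{statusCode: statusCode, body: body}
}

func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
	res := &http.Response{
		Header:     make(http.Header),
		Request:    r,
		StatusCode: t.statusCode,
	}
	res.Header.Set("Content-Type", "application/json")
	res.Body = ioutil.NopCloser(strings.NewReader(t.body))
	return res, nil
}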
@@ -1,5 +1,76 @@
 package elasticsearch
+
+const clusterResponse = `
+{
+	"cluster_name": "elasticsearch_telegraf",
+	"status": "green",
+	"timed_out": false,
+	"number_of_nodes": 3,
+	"number_of_data_nodes": 3,
+	"active_primary_shards": 5,
+	"active_shards": 15,
+	"relocating_shards": 0,
+	"initializing_shards": 0,
+	"unassigned_shards": 0,
+	"indices": {
+		"v1": {
+			"status": "green",
+			"number_of_shards": 10,
+			"number_of_replicas": 1,
+			"active_primary_shards": 10,
+			"active_shards": 20,
+			"relocating_shards": 0,
+			"initializing_shards": 0,
+			"unassigned_shards": 0
+		},
+		"v2": {
+			"status": "red",
+			"number_of_shards": 10,
+			"number_of_replicas": 1,
+			"active_primary_shards": 0,
+			"active_shards": 0,
+			"relocating_shards": 0,
+			"initializing_shards": 0,
+			"unassigned_shards": 20
+		}
+	}
+}
+`
+
+var clusterHealthExpected = map[string]interface{}{
+	"status":                "green",
+	"timed_out":             false,
+	"number_of_nodes":       3,
+	"number_of_data_nodes":  3,
+	"active_primary_shards": 5,
+	"active_shards":         15,
+	"relocating_shards":     0,
+	"initializing_shards":   0,
+	"unassigned_shards":     0,
+}
+
+var v1IndexExpected = map[string]interface{}{
+	"status":                "green",
+	"number_of_shards":      10,
+	"number_of_replicas":    1,
+	"active_primary_shards": 10,
+	"active_shards":         20,
+	"relocating_shards":     0,
+	"initializing_shards":   0,
+	"unassigned_shards":     0,
+}
+
+var v2IndexExpected = map[string]interface{}{
+	"status":                "red",
+	"number_of_shards":      10,
+	"number_of_replicas":    1,
+	"active_primary_shards": 0,
+	"active_shards":         0,
+	"relocating_shards":     0,
+	"initializing_shards":   0,
+	"unassigned_shards":     20,
+}

 const statsResponse = `
 {
 	"cluster_name": "es-testcluster",