Add cluster health level configuration to elasticsearch input (#3269)

This commit is contained in:
Christian Meilke 2017-10-05 00:29:32 +02:00 committed by Daniel Nelson
parent a4ea4c7a25
commit 0bb32570ba
4 changed files with 92 additions and 11 deletions

View File

@ -23,7 +23,13 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre
## Set cluster_health to true when you want to also obtain cluster health stats ## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false cluster_health = false
## Set cluster_stats to true when you want to obtain cluster stats from the ## Adjust cluster_health_level when you want to also obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node. ## Master node.
cluster_stats = false cluster_stats = false

View File

@ -3,17 +3,16 @@ package elasticsearch
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"regexp"
"sync"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"io/ioutil" "io/ioutil"
"net/http"
"regexp"
"strings" "strings"
"sync"
"time"
) )
// mask for masking username/password from error messages // mask for masking username/password from error messages
@ -94,6 +93,12 @@ const sampleConfig = `
## Set cluster_health to true when you want to also obtain cluster health stats ## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false cluster_health = false
## Adjust cluster_health_level when you want to also obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
## Set cluster_stats to true when you want to also obtain cluster stats from the ## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node. ## Master node.
cluster_stats = false cluster_stats = false
@ -113,6 +118,7 @@ type Elasticsearch struct {
Servers []string Servers []string
HttpTimeout internal.Duration HttpTimeout internal.Duration
ClusterHealth bool ClusterHealth bool
ClusterHealthLevel string
ClusterStats bool ClusterStats bool
SSLCA string `toml:"ssl_ca"` // Path to CA file SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file SSLCert string `toml:"ssl_cert"` // Path to host cert file
@ -127,6 +133,7 @@ type Elasticsearch struct {
func NewElasticsearch() *Elasticsearch { func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{ return &Elasticsearch{
HttpTimeout: internal.Duration{Duration: time.Second * 5}, HttpTimeout: internal.Duration{Duration: time.Second * 5},
ClusterHealthLevel: "indices",
} }
} }
@ -182,7 +189,10 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
} }
if e.ClusterHealth { if e.ClusterHealth {
url = s + "/_cluster/health?level=indices" url = s + "/_cluster/health"
if e.ClusterHealthLevel != "" {
url = url + "?level=" + e.ClusterHealthLevel
}
if err := e.gatherClusterHealth(url, acc); err != nil { if err := e.gatherClusterHealth(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return return

View File

@ -93,10 +93,11 @@ func TestGatherNodeStats(t *testing.T) {
checkNodeStatsResult(t, &acc) checkNodeStatsResult(t, &acc)
} }
func TestGatherClusterHealth(t *testing.T) { func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
es := newElasticsearchWithClient() es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"} es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true es.ClusterHealth = true
es.ClusterHealthLevel = ""
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse) es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
var acc testutil.Accumulator var acc testutil.Accumulator
@ -104,6 +105,56 @@ func TestGatherClusterHealth(t *testing.T) {
checkIsMaster(es, false, t) checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v1IndexExpected,
map[string]string{"index": "v1"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v2IndexExpected,
map[string]string{"index": "v2"})
}
func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true
es.ClusterHealthLevel = "cluster"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v1IndexExpected,
map[string]string{"index": "v1"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v2IndexExpected,
map[string]string{"index": "v2"})
}
func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true
es.ClusterHealthLevel = "indices"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponseWithIndices)
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected, clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"}) map[string]string{"name": "elasticsearch_telegraf"})
@ -185,7 +236,6 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
// ensure flag is clear so Cluster Stats would not be done // ensure flag is clear so Cluster Stats would not be done
checkIsMaster(es, false, t) checkIsMaster(es, false, t)
checkNodeStatsResult(t, &acc) checkNodeStatsResult(t, &acc)
} }
func newElasticsearchWithClient() *Elasticsearch { func newElasticsearchWithClient() *Elasticsearch {

View File

@ -1,6 +1,21 @@
package elasticsearch package elasticsearch
const clusterHealthResponse = ` const clusterHealthResponse = `
{
"cluster_name": "elasticsearch_telegraf",
"status": "green",
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"active_primary_shards": 5,
"active_shards": 15,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
}
`
const clusterHealthResponseWithIndices = `
{ {
"cluster_name": "elasticsearch_telegraf", "cluster_name": "elasticsearch_telegraf",
"status": "green", "status": "green",