Add cluster health level configuration to elasticsearch input (#3269)

This commit is contained in:
Christian Meilke 2017-10-05 00:29:32 +02:00 committed by Daniel Nelson
parent ed9c2ccfa2
commit 4acee14f8a
4 changed files with 92 additions and 11 deletions

View File

@ -23,7 +23,13 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre
## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false
## Set cluster_stats to true when you want to obtain cluster stats from the
## Adjust cluster_health_level when you want to also obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node.
cluster_stats = false

View File

@ -3,17 +3,16 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"io/ioutil"
"net/http"
"regexp"
"strings"
"sync"
"time"
)
// mask for masking username/password from error messages
@ -94,6 +93,12 @@ const sampleConfig = `
## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false
## Adjust cluster_health_level when you want to also obtain detailed health stats
## The options are
## - indices (default)
## - cluster
# cluster_health_level = "indices"
## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node.
cluster_stats = false
@ -113,6 +118,7 @@ type Elasticsearch struct {
Servers []string
HttpTimeout internal.Duration
ClusterHealth bool
ClusterHealthLevel string
ClusterStats bool
SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLCert string `toml:"ssl_cert"` // Path to host cert file
@ -126,7 +132,8 @@ type Elasticsearch struct {
// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{
HttpTimeout: internal.Duration{Duration: time.Second * 5},
HttpTimeout: internal.Duration{Duration: time.Second * 5},
ClusterHealthLevel: "indices",
}
}
@ -182,7 +189,10 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
if e.ClusterHealth {
url = s + "/_cluster/health?level=indices"
url = s + "/_cluster/health"
if e.ClusterHealthLevel != "" {
url = url + "?level=" + e.ClusterHealthLevel
}
if err := e.gatherClusterHealth(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return

View File

@ -93,10 +93,11 @@ func TestGatherNodeStats(t *testing.T) {
checkNodeStatsResult(t, &acc)
}
func TestGatherClusterHealth(t *testing.T) {
func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true
es.ClusterHealthLevel = ""
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
var acc testutil.Accumulator
@ -104,6 +105,56 @@ func TestGatherClusterHealth(t *testing.T) {
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v1IndexExpected,
map[string]string{"index": "v1"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v2IndexExpected,
map[string]string{"index": "v2"})
}
func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true
es.ClusterHealthLevel = "cluster"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v1IndexExpected,
map[string]string{"index": "v1"})
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices",
v2IndexExpected,
map[string]string{"index": "v2"})
}
func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true
es.ClusterHealthLevel = "indices"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponseWithIndices)
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
map[string]string{"name": "elasticsearch_telegraf"})
@ -185,7 +236,6 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
// ensure flag is clear so Cluster Stats would not be done
checkIsMaster(es, false, t)
checkNodeStatsResult(t, &acc)
}
func newElasticsearchWithClient() *Elasticsearch {

View File

@ -1,6 +1,21 @@
package elasticsearch
const clusterHealthResponse = `
{
"cluster_name": "elasticsearch_telegraf",
"status": "green",
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"active_primary_shards": 5,
"active_shards": 15,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0
}
`
const clusterHealthResponseWithIndices = `
{
"cluster_name": "elasticsearch_telegraf",
"status": "green",