diff --git a/plugins/inputs/elasticsearch/README.md b/plugins/inputs/elasticsearch/README.md index f3999dc30..5698cc7f0 100644 --- a/plugins/inputs/elasticsearch/README.md +++ b/plugins/inputs/elasticsearch/README.md @@ -33,6 +33,11 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre ## Master node. cluster_stats = false + ## node_stats is a list of sub-stats that you want to have gathered. Valid options + ## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", + ## "breakers". Per default, all stats are gathered. + # node_stats = ["jvm", "http"] + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 589d0fe3e..f5ddef5fb 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -103,6 +103,11 @@ const sampleConfig = ` ## Master node. cluster_stats = false + ## node_stats is a list of sub-stats that you want to have gathered. Valid options + ## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", + ## "breakers". Per default, all stats are gathered. + # node_stats = ["jvm", "http"] + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -120,6 +125,7 @@ type Elasticsearch struct { ClusterHealth bool ClusterHealthLevel string ClusterStats bool + NodeStats []string SSLCA string `toml:"ssl_ca"` // Path to CA file SSLCert string `toml:"ssl_cert"` // Path to host cert file SSLKey string `toml:"ssl_key"` // Path to cert key file @@ -165,12 +171,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { for _, serv := range e.Servers { go func(s string, acc telegraf.Accumulator) { defer wg.Done() - var url string - if e.Local { - url = s + statsPathLocal - } else { - url = s + statsPath - } + url := e.nodeStatsUrl(s) e.isMaster = false if e.ClusterStats { @@ -229,6 +230,22 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) { return client, nil } +func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string { + var url string + + if e.Local { + url = baseUrl + statsPathLocal + } else { + url = baseUrl + statsPath + } + + if len(e.NodeStats) == 0 { + return url + } + + return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ",")) +} + func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { nodeStats := &struct { ClusterName string `json:"cluster_name"` @@ -269,6 +286,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er now := time.Now() for p, s := range stats { + // if one of the individual node stats is not even in the + // original result + if s == nil { + continue + } f := jsonparser.JSONFlattener{} // parse Json, ignoring strings and bools err := f.FlattenJSON("", s) diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 41b578c79..1616bfeb2 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -13,6 +13,16 @@ import ( "github.com/stretchr/testify/require" ) +func defaultTags() map[string]string { + return map[string]string{ + "cluster_name": "es-testcluster", + "node_attribute_master": "true", + "node_id": "SDFsfSDFsdfFSDSDfSFDSDF", + "node_name": "test.host.com", + "node_host": "test", + } +} + type transportMock struct { statusCode int body string @@ -45,15 +55,9 @@ func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) { assert.Fail(t, msg) } } -func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) { - tags := map[string]string{ - "cluster_name": "es-testcluster", - "node_attribute_master": "true", - "node_id": "SDFsfSDFsdfFSDSDfSFDSDF", - "node_name": "test.host.com", - "node_host": "test", - } +func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) { + tags := defaultTags() acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags) @@ -79,6 +83,31 @@ func TestGather(t *testing.T) { checkNodeStatsResult(t, &acc) } +func TestGatherIndividualStats(t *testing.T) { + es := newElasticsearchWithClient() + es.Servers = []string{"http://example.com:9200"} + es.NodeStats = []string{"jvm", "process"} + es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponseJVMProcess) + + var acc testutil.Accumulator + if err := acc.GatherError(es.Gather); err != nil { + t.Fatal(err) + } + + checkIsMaster(es, false, t) + + tags := defaultTags() + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags) + acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags) + acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags) +} + func TestGatherNodeStats(t *testing.T) { es := newElasticsearchWithClient() es.Servers = []string{"http://example.com:9200"} diff --git a/plugins/inputs/elasticsearch/testdata_test.go b/plugins/inputs/elasticsearch/testdata_test.go index d43a3f9c5..6e487ff3f 100644 --- a/plugins/inputs/elasticsearch/testdata_test.go +++ b/plugins/inputs/elasticsearch/testdata_test.go @@ -504,6 +504,100 @@ const nodeStatsResponse = ` } ` +const nodeStatsResponseJVMProcess = ` +{ + "cluster_name": "es-testcluster", + "nodes": { + "SDFsfSDFsdfFSDSDfSFDSDF": { + "timestamp": 1436365550135, + "name": "test.host.com", + "transport_address": "inet[/127.0.0.1:9300]", + "host": "test", + "ip": [ + "inet[/127.0.0.1:9300]", + "NONE" + ], + "attributes": { + "master": "true" + }, + "process": { + "timestamp": 1436460392945, + "open_file_descriptors": 160, + "cpu": { + "percent": 2, + "sys_in_millis": 1870, + "user_in_millis": 13610, + "total_in_millis": 15480 + }, + "mem": { + "total_virtual_in_bytes": 4747890688 + } + }, + "jvm": { + "timestamp": 1436460392945, + "uptime_in_millis": 202245, + "mem": { + "heap_used_in_bytes": 52709568, + "heap_used_percent": 5, + "heap_committed_in_bytes": 259522560, + "heap_max_in_bytes": 1038876672, + "non_heap_used_in_bytes": 39634576, + "non_heap_committed_in_bytes": 40841216, + "pools": { + "young": { + "used_in_bytes": 32685760, + "max_in_bytes": 279183360, + "peak_used_in_bytes": 71630848, + "peak_max_in_bytes": 279183360 + }, + "survivor": { + "used_in_bytes": 8912880, + "max_in_bytes": 34865152, + "peak_used_in_bytes": 8912888, + "peak_max_in_bytes": 34865152 + }, + "old": { + "used_in_bytes": 11110928, + "max_in_bytes": 724828160, + "peak_used_in_bytes": 14354608, + "peak_max_in_bytes": 724828160 + } + } + }, + "threads": { + "count": 44, + "peak_count": 45 + }, + "gc": { + "collectors": { + "young": { + "collection_count": 2, + "collection_time_in_millis": 98 + }, + "old": { + "collection_count": 1, + "collection_time_in_millis": 24 + } + } + }, + "buffer_pools": { + "direct": { + "count": 40, + "used_in_bytes": 6304239, + "total_capacity_in_bytes": 6304239 + }, + "mapped": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + } + } + } + } +} +` + var nodestatsIndicesExpected = map[string]interface{}{ "id_cache_memory_size_in_bytes": float64(0), "completion_size_in_bytes": float64(0),