Add ability to limit node stats in elasticsearch input (#3304)
parent 1cb21d2bda
commit 4783b872ea

@@ -33,6 +33,11 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre
 ## Master node.
 cluster_stats = false
 
+## node_stats is a list of sub-stats that you want to have gathered. Valid options
+## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
+## "breakers". Per default, all stats are gathered.
+# node_stats = ["jvm", "http"]
+
 ## Optional SSL Config
 # ssl_ca = "/etc/telegraf/ca.pem"
 # ssl_cert = "/etc/telegraf/cert.pem"
@@ -103,6 +103,11 @@ const sampleConfig = `
 ## Master node.
 cluster_stats = false
 
+## node_stats is a list of sub-stats that you want to have gathered. Valid options
+## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
+## "breakers". Per default, all stats are gathered.
+# node_stats = ["jvm", "http"]
+
 ## Optional SSL Config
 # ssl_ca = "/etc/telegraf/ca.pem"
 # ssl_cert = "/etc/telegraf/cert.pem"
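
The added config lines document the new node_stats option: it limits which node-stats groups the input gathers, and omitting it keeps the previous behaviour of collecting everything. Below is a minimal sketch (not part of this commit) of the request that the documented example node_stats = ["jvm", "http"] boils down to; Elasticsearch's nodes-stats API takes the metric groups as a comma-separated path segment, and the server address is an assumption.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	server := "http://localhost:9200"    // assumed Elasticsearch address
	nodeStats := []string{"jvm", "http"} // mirrors the sample config above

	// Elasticsearch exposes node stats under /_nodes/stats/<group1>,<group2>,...
	url := server + "/_nodes/stats/" + strings.Join(nodeStats, ",")

	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("GET %s -> %d (%d bytes)\n", url, resp.StatusCode, len(body))
}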
@@ -120,6 +125,7 @@ type Elasticsearch struct {
 	ClusterHealth bool
 	ClusterHealthLevel string
 	ClusterStats bool
+	NodeStats []string
 	SSLCA string `toml:"ssl_ca"` // Path to CA file
 	SSLCert string `toml:"ssl_cert"` // Path to host cert file
 	SSLKey string `toml:"ssl_key"` // Path to cert key file
@@ -165,12 +171,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
 	for _, serv := range e.Servers {
 		go func(s string, acc telegraf.Accumulator) {
 			defer wg.Done()
-			var url string
-			if e.Local {
-				url = s + statsPathLocal
-			} else {
-				url = s + statsPath
-			}
+			url := e.nodeStatsUrl(s)
 			e.isMaster = false
 
 			if e.ClusterStats {
@@ -229,6 +230,22 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
 	return client, nil
 }
 
+func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
+	var url string
+
+	if e.Local {
+		url = baseUrl + statsPathLocal
+	} else {
+		url = baseUrl + statsPath
+	}
+
+	if len(e.NodeStats) == 0 {
+		return url
+	}
+
+	return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
+}
+
 func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
 	nodeStats := &struct {
 		ClusterName string `json:"cluster_name"`
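
The URL construction that used to live inline in Gather now sits in nodeStatsUrl: it picks the local or cluster-wide stats path and, when NodeStats is non-empty, appends the comma-joined list as an extra path segment. A standalone sketch of that behaviour follows; the two path constants are assumptions, since their values are not shown in this diff.

package main

import (
	"fmt"
	"strings"
)

const (
	statsPath      = "/_nodes/stats"        // assumed value of the plugin's constant
	statsPathLocal = "/_nodes/_local/stats" // assumed value of the plugin's constant
)

// nodeStatsUrl mirrors the helper added above, with the plugin fields passed in directly.
func nodeStatsUrl(baseUrl string, local bool, nodeStats []string) string {
	var url string
	if local {
		url = baseUrl + statsPathLocal
	} else {
		url = baseUrl + statsPath
	}
	if len(nodeStats) == 0 {
		return url
	}
	return fmt.Sprintf("%s/%s", url, strings.Join(nodeStats, ","))
}

func main() {
	// Default: no node_stats configured, query everything.
	fmt.Println(nodeStatsUrl("http://example.com:9200", false, nil))
	// Restricted: local node only, jvm and process groups.
	fmt.Println(nodeStatsUrl("http://example.com:9200", true, []string{"jvm", "process"}))
}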
@@ -269,6 +286,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
 	now := time.Now()
 	for p, s := range stats {
+		// if one of the individual node stats is not even in the
+		// original result
+		if s == nil {
+			continue
+		}
 		f := jsonparser.JSONFlattener{}
 		// parse Json, ignoring strings and bools
 		err := f.FlattenJSON("", s)
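
When node_stats restricts the request, the groups that were not asked for are simply missing from the JSON Elasticsearch returns, so the corresponding entries end up nil after unmarshalling; the added check skips them instead of flattening a nil document. A minimal sketch of that effect, using a struct merely shaped like the plugin's per-node type (the real type is not shown in this hunk):

package main

import (
	"encoding/json"
	"fmt"
)

// nodeStat is an assumed, simplified shape for illustration only.
type nodeStat struct {
	JVM     *json.RawMessage `json:"jvm"`
	Process *json.RawMessage `json:"process"`
	Indices *json.RawMessage `json:"indices"`
}

func main() {
	// A response gathered with node_stats = ["jvm", "process"] carries no "indices" key.
	payload := []byte(`{"jvm": {"uptime_in_millis": 1}, "process": {"open_file_descriptors": 2}}`)

	var n nodeStat
	if err := json.Unmarshal(payload, &n); err != nil {
		panic(err)
	}

	for name, doc := range map[string]*json.RawMessage{
		"jvm": n.JVM, "process": n.Process, "indices": n.Indices,
	} {
		if doc == nil {
			// Same situation the new nil check guards against.
			fmt.Println(name, "absent from response, skipped")
			continue
		}
		fmt.Println(name, "present:", string(*doc))
	}
}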
@@ -13,6 +13,16 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func defaultTags() map[string]string {
+	return map[string]string{
+		"cluster_name": "es-testcluster",
+		"node_attribute_master": "true",
+		"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
+		"node_name": "test.host.com",
+		"node_host": "test",
+	}
+}
+
 type transportMock struct {
 	statusCode int
 	body string
@@ -45,15 +55,9 @@ func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
 		assert.Fail(t, msg)
 	}
 }
-func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
-	tags := map[string]string{
-		"cluster_name": "es-testcluster",
-		"node_attribute_master": "true",
-		"node_id": "SDFsfSDFsdfFSDSDfSFDSDF",
-		"node_name": "test.host.com",
-		"node_host": "test",
-	}
 
+func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
+	tags := defaultTags()
 	acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
 	acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
 	acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
@@ -79,6 +83,31 @@ func TestGather(t *testing.T) {
 	checkNodeStatsResult(t, &acc)
 }
 
+func TestGatherIndividualStats(t *testing.T) {
+	es := newElasticsearchWithClient()
+	es.Servers = []string{"http://example.com:9200"}
+	es.NodeStats = []string{"jvm", "process"}
+	es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponseJVMProcess)
+
+	var acc testutil.Accumulator
+	if err := acc.GatherError(es.Gather); err != nil {
+		t.Fatal(err)
+	}
+
+	checkIsMaster(es, false, t)
+
+	tags := defaultTags()
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
+	acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
+	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
+}
+
 func TestGatherNodeStats(t *testing.T) {
 	es := newElasticsearchWithClient()
 	es.Servers = []string{"http://example.com:9200"}
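
TestGatherIndividualStats drives the plugin with a canned response that contains only the jvm and process sections (the nodeStatsResponseJVMProcess fixture added below) and asserts that just those two measurements are reported while the others are absent. The mocked transport it plugs into the client follows the usual canned-RoundTripper pattern; the sketch below is a generic stand-alone illustration of that pattern, not the test file's newTransportMock itself.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// cannedTransport returns a fixed status code and body for every request.
type cannedTransport struct {
	statusCode int
	body       string
}

func (t *cannedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: t.statusCode,
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader(t.body)),
		Request:    req,
	}, nil
}

func main() {
	client := &http.Client{Transport: &cannedTransport{
		statusCode: http.StatusOK,
		body:       `{"cluster_name": "es-testcluster", "nodes": {}}`,
	}}

	// The URL is never actually contacted; the transport answers directly.
	resp, err := client.Get("http://example.com:9200/_nodes/stats/jvm,process")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	payload, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(payload))
}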
@@ -504,6 +504,100 @@ const nodeStatsResponse = `
 }
 `
 
+const nodeStatsResponseJVMProcess = `
+{
+  "cluster_name": "es-testcluster",
+  "nodes": {
+    "SDFsfSDFsdfFSDSDfSFDSDF": {
+      "timestamp": 1436365550135,
+      "name": "test.host.com",
+      "transport_address": "inet[/127.0.0.1:9300]",
+      "host": "test",
+      "ip": [
+        "inet[/127.0.0.1:9300]",
+        "NONE"
+      ],
+      "attributes": {
+        "master": "true"
+      },
+      "process": {
+        "timestamp": 1436460392945,
+        "open_file_descriptors": 160,
+        "cpu": {
+          "percent": 2,
+          "sys_in_millis": 1870,
+          "user_in_millis": 13610,
+          "total_in_millis": 15480
+        },
+        "mem": {
+          "total_virtual_in_bytes": 4747890688
+        }
+      },
+      "jvm": {
+        "timestamp": 1436460392945,
+        "uptime_in_millis": 202245,
+        "mem": {
+          "heap_used_in_bytes": 52709568,
+          "heap_used_percent": 5,
+          "heap_committed_in_bytes": 259522560,
+          "heap_max_in_bytes": 1038876672,
+          "non_heap_used_in_bytes": 39634576,
+          "non_heap_committed_in_bytes": 40841216,
+          "pools": {
+            "young": {
+              "used_in_bytes": 32685760,
+              "max_in_bytes": 279183360,
+              "peak_used_in_bytes": 71630848,
+              "peak_max_in_bytes": 279183360
+            },
+            "survivor": {
+              "used_in_bytes": 8912880,
+              "max_in_bytes": 34865152,
+              "peak_used_in_bytes": 8912888,
+              "peak_max_in_bytes": 34865152
+            },
+            "old": {
+              "used_in_bytes": 11110928,
+              "max_in_bytes": 724828160,
+              "peak_used_in_bytes": 14354608,
+              "peak_max_in_bytes": 724828160
+            }
+          }
+        },
+        "threads": {
+          "count": 44,
+          "peak_count": 45
+        },
+        "gc": {
+          "collectors": {
+            "young": {
+              "collection_count": 2,
+              "collection_time_in_millis": 98
+            },
+            "old": {
+              "collection_count": 1,
+              "collection_time_in_millis": 24
+            }
+          }
+        },
+        "buffer_pools": {
+          "direct": {
+            "count": 40,
+            "used_in_bytes": 6304239,
+            "total_capacity_in_bytes": 6304239
+          },
+          "mapped": {
+            "count": 0,
+            "used_in_bytes": 0,
+            "total_capacity_in_bytes": 0
+          }
+        }
+      }
+    }
+  }
+}
+`
+
 var nodestatsIndicesExpected = map[string]interface{}{
 	"id_cache_memory_size_in_bytes": float64(0),
 	"completion_size_in_bytes": float64(0),