Add Indices stats to elasticsearch input (#6060)
This commit is contained in:
parent
364bf38b4a
commit
78d3b86581
|
@ -2267,6 +2267,12 @@
|
|||
# ## Only gather cluster_stats from the master node. To work this require local = true
|
||||
# cluster_stats_only_from_master = true
|
||||
#
|
||||
# ## Indices to collect; can be one or more indices names or _all
|
||||
# indices_include = ["_all"]
|
||||
#
|
||||
# ## One of "shards", "cluster", "indices"
|
||||
# indices_level = "shards"
|
||||
#
|
||||
# ## node_stats is a list of sub-stats that you want to have gathered. Valid options
|
||||
# ## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
|
||||
# ## "breaker". Per default, all stats are gathered.
|
||||
|
|
|
@ -1,15 +1,32 @@
|
|||
# Elasticsearch Input Plugin
|
||||
|
||||
The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
|
||||
[node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
|
||||
and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
|
||||
or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics.
|
||||
[Node Stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
|
||||
and optionally
|
||||
[Cluster-Health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
|
||||
metrics.
|
||||
|
||||
In addition, the following optional queries are only made by the master node:
|
||||
[Cluster Stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html)
|
||||
[Indices Stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html)
|
||||
[Shard Stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html)
|
||||
|
||||
Specific Elasticsearch endpoints that are queried:
|
||||
- Node: either /_nodes/stats or /_nodes/_local/stats depending on 'local' configuration setting
|
||||
- Cluster Heath: /_cluster/health?level=indices
|
||||
- Cluster Stats: /_cluster/stats
|
||||
- Indices Stats: /_all/_stats
|
||||
- Shard Stats: /_all/_stats?level=shards
|
||||
|
||||
Note that specific statistics information can change between Elassticsearch versions. In general, this plugin attempts to stay as version-generic as possible by tagging high-level categories only and using a generic json parser to make unique field names of whatever statistics names are provided at the mid-low level.
|
||||
|
||||
### Configuration
|
||||
|
||||
```toml
|
||||
[[inputs.elasticsearch]]
|
||||
## specify a list of one or more Elasticsearch servers
|
||||
## you can add username and password to your url to use basic authentication:
|
||||
## servers = ["http://user:pass@localhost:9200"]
|
||||
servers = ["http://localhost:9200"]
|
||||
|
||||
## Timeout for HTTP requests to the elastic search server(s)
|
||||
|
@ -20,21 +37,28 @@ or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/curre
|
|||
## of the cluster.
|
||||
local = true
|
||||
|
||||
## Set cluster_health to true when you want to also obtain cluster health stats
|
||||
## Set cluster_health to true when you want to obtain cluster health stats
|
||||
cluster_health = false
|
||||
|
||||
## Adjust cluster_health_level when you want to also obtain detailed health stats
|
||||
## Adjust cluster_health_level when you want to obtain detailed health stats
|
||||
## The options are
|
||||
## - indices (default)
|
||||
## - cluster
|
||||
# cluster_health_level = "indices"
|
||||
|
||||
## Set cluster_stats to true when you want to also obtain cluster stats.
|
||||
## Set cluster_stats to true when you want to obtain cluster stats.
|
||||
cluster_stats = false
|
||||
|
||||
## Only gather cluster_stats from the master node. To work this require local = true
|
||||
cluster_stats_only_from_master = true
|
||||
|
||||
## Indices to collect; can be one or more indices names or _all
|
||||
indices_include = ["_all"]
|
||||
|
||||
## One of "shards", "cluster", "indices"
|
||||
## Currently only "shards" is implemented
|
||||
indices_level = "shards"
|
||||
|
||||
## node_stats is a list of sub-stats that you want to have gathered. Valid options
|
||||
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
|
||||
## "breaker". Per default, all stats are gathered.
|
||||
|
@ -618,3 +642,211 @@ Emitted when the appropriate `node_stats` options are set.
|
|||
- write_queue (float)
|
||||
- write_rejected (float)
|
||||
- write_threads (float)
|
||||
|
||||
Emitted when the appropriate `indices_stats` options are set.
|
||||
|
||||
- elasticsearch_indices_stats_(primaries|total)
|
||||
- tags:
|
||||
- index_name
|
||||
- fields:
|
||||
- completion_size_in_bytes (float)
|
||||
- docs_count (float)
|
||||
- docs_deleted (float)
|
||||
- fielddata_evictions (float)
|
||||
- fielddata_memory_size_in_bytes (float)
|
||||
- flush_periodic (float)
|
||||
- flush_total (float)
|
||||
- flush_total_time_in_millis (float)
|
||||
- get_current (float)
|
||||
- get_exists_time_in_millis (float)
|
||||
- get_exists_total (float)
|
||||
- get_missing_time_in_millis (float)
|
||||
- get_missing_total (float)
|
||||
- get_time_in_millis (float)
|
||||
- get_total (float)
|
||||
- indexing_delete_current (float)
|
||||
- indexing_delete_time_in_millis (float)
|
||||
- indexing_delete_total (float)
|
||||
- indexing_index_current (float)
|
||||
- indexing_index_failed (float)
|
||||
- indexing_index_time_in_millis (float)
|
||||
- indexing_index_total (float)
|
||||
- indexing_is_throttled (float)
|
||||
- indexing_noop_update_total (float)
|
||||
- indexing_throttle_time_in_millis (float)
|
||||
- merges_current (float)
|
||||
- merges_current_docs (float)
|
||||
- merges_current_size_in_bytes (float)
|
||||
- merges_total (float)
|
||||
- merges_total_auto_throttle_in_bytes (float)
|
||||
- merges_total_docs (float)
|
||||
- merges_total_size_in_bytes (float)
|
||||
- merges_total_stopped_time_in_millis (float)
|
||||
- merges_total_throttled_time_in_millis (float)
|
||||
- merges_total_time_in_millis (float)
|
||||
- query_cache_cache_count (float)
|
||||
- query_cache_cache_size (float)
|
||||
- query_cache_evictions (float)
|
||||
- query_cache_hit_count (float)
|
||||
- query_cache_memory_size_in_bytes (float)
|
||||
- query_cache_miss_count (float)
|
||||
- query_cache_total_count (float)
|
||||
- recovery_current_as_source (float)
|
||||
- recovery_current_as_target (float)
|
||||
- recovery_throttle_time_in_millis (float)
|
||||
- refresh_external_total (float)
|
||||
- refresh_external_total_time_in_millis (float)
|
||||
- refresh_listeners (float)
|
||||
- refresh_total (float)
|
||||
- refresh_total_time_in_millis (float)
|
||||
- request_cache_evictions (float)
|
||||
- request_cache_hit_count (float)
|
||||
- request_cache_memory_size_in_bytes (float)
|
||||
- request_cache_miss_count (float)
|
||||
- search_fetch_current (float)
|
||||
- search_fetch_time_in_millis (float)
|
||||
- search_fetch_total (float)
|
||||
- search_open_contexts (float)
|
||||
- search_query_current (float)
|
||||
- search_query_time_in_millis (float)
|
||||
- search_query_total (float)
|
||||
- search_scroll_current (float)
|
||||
- search_scroll_time_in_millis (float)
|
||||
- search_scroll_total (float)
|
||||
- search_suggest_current (float)
|
||||
- search_suggest_time_in_millis (float)
|
||||
- search_suggest_total (float)
|
||||
- segments_count (float)
|
||||
- segments_doc_values_memory_in_bytes (float)
|
||||
- segments_fixed_bit_set_memory_in_bytes (float)
|
||||
- segments_index_writer_memory_in_bytes (float)
|
||||
- segments_max_unsafe_auto_id_timestamp (float)
|
||||
- segments_memory_in_bytes (float)
|
||||
- segments_norms_memory_in_bytes (float)
|
||||
- segments_points_memory_in_bytes (float)
|
||||
- segments_stored_fields_memory_in_bytes (float)
|
||||
- segments_term_vectors_memory_in_bytes (float)
|
||||
- segments_terms_memory_in_bytes (float)
|
||||
- segments_version_map_memory_in_bytes (float)
|
||||
- store_size_in_bytes (float)
|
||||
- translog_earliest_last_modified_age (float)
|
||||
- translog_operations (float)
|
||||
- translog_size_in_bytes (float)
|
||||
- translog_uncommitted_operations (float)
|
||||
- translog_uncommitted_size_in_bytes (float)
|
||||
- warmer_current (float)
|
||||
- warmer_total (float)
|
||||
- warmer_total_time_in_millis (float)
|
||||
|
||||
Emitted when the appropriate `shards_stats` options are set.
|
||||
|
||||
- elasticsearch_indices_stats_shards_total
|
||||
- fields:
|
||||
- failed (float)
|
||||
- successful (float)
|
||||
- total (float)
|
||||
|
||||
- elasticsearch_indices_stats_shards
|
||||
- tags:
|
||||
- index_name
|
||||
- node_name
|
||||
- shard_name
|
||||
- type
|
||||
- fields:
|
||||
- commit_generation (float)
|
||||
- commit_num_docs (float)
|
||||
- completion_size_in_bytes (float)
|
||||
- docs_count (float)
|
||||
- docs_deleted (float)
|
||||
- fielddata_evictions (float)
|
||||
- fielddata_memory_size_in_bytes (float)
|
||||
- flush_periodic (float)
|
||||
- flush_total (float)
|
||||
- flush_total_time_in_millis (float)
|
||||
- get_current (float)
|
||||
- get_exists_time_in_millis (float)
|
||||
- get_exists_total (float)
|
||||
- get_missing_time_in_millis (float)
|
||||
- get_missing_total (float)
|
||||
- get_time_in_millis (float)
|
||||
- get_total (float)
|
||||
- indexing_delete_current (float)
|
||||
- indexing_delete_time_in_millis (float)
|
||||
- indexing_delete_total (float)
|
||||
- indexing_index_current (float)
|
||||
- indexing_index_failed (float)
|
||||
- indexing_index_time_in_millis (float)
|
||||
- indexing_index_total (float)
|
||||
- indexing_is_throttled (bool)
|
||||
- indexing_noop_update_total (float)
|
||||
- indexing_throttle_time_in_millis (float)
|
||||
- merges_current (float)
|
||||
- merges_current_docs (float)
|
||||
- merges_current_size_in_bytes (float)
|
||||
- merges_total (float)
|
||||
- merges_total_auto_throttle_in_bytes (float)
|
||||
- merges_total_docs (float)
|
||||
- merges_total_size_in_bytes (float)
|
||||
- merges_total_stopped_time_in_millis (float)
|
||||
- merges_total_throttled_time_in_millis (float)
|
||||
- merges_total_time_in_millis (float)
|
||||
- query_cache_cache_count (float)
|
||||
- query_cache_cache_size (float)
|
||||
- query_cache_evictions (float)
|
||||
- query_cache_hit_count (float)
|
||||
- query_cache_memory_size_in_bytes (float)
|
||||
- query_cache_miss_count (float)
|
||||
- query_cache_total_count (float)
|
||||
- recovery_current_as_source (float)
|
||||
- recovery_current_as_target (float)
|
||||
- recovery_throttle_time_in_millis (float)
|
||||
- refresh_external_total (float)
|
||||
- refresh_external_total_time_in_millis (float)
|
||||
- refresh_listeners (float)
|
||||
- refresh_total (float)
|
||||
- refresh_total_time_in_millis (float)
|
||||
- request_cache_evictions (float)
|
||||
- request_cache_hit_count (float)
|
||||
- request_cache_memory_size_in_bytes (float)
|
||||
- request_cache_miss_count (float)
|
||||
- retention_leases_primary_term (float)
|
||||
- retention_leases_version (float)
|
||||
- routing_state (int) (UNASSIGNED = 1, INITIALIZING = 2, STARTED = 3, RELOCATING = 4, other = 0)
|
||||
- search_fetch_current (float)
|
||||
- search_fetch_time_in_millis (float)
|
||||
- search_fetch_total (float)
|
||||
- search_open_contexts (float)
|
||||
- search_query_current (float)
|
||||
- search_query_time_in_millis (float)
|
||||
- search_query_total (float)
|
||||
- search_scroll_current (float)
|
||||
- search_scroll_time_in_millis (float)
|
||||
- search_scroll_total (float)
|
||||
- search_suggest_current (float)
|
||||
- search_suggest_time_in_millis (float)
|
||||
- search_suggest_total (float)
|
||||
- segments_count (float)
|
||||
- segments_doc_values_memory_in_bytes (float)
|
||||
- segments_fixed_bit_set_memory_in_bytes (float)
|
||||
- segments_index_writer_memory_in_bytes (float)
|
||||
- segments_max_unsafe_auto_id_timestamp (float)
|
||||
- segments_memory_in_bytes (float)
|
||||
- segments_norms_memory_in_bytes (float)
|
||||
- segments_points_memory_in_bytes (float)
|
||||
- segments_stored_fields_memory_in_bytes (float)
|
||||
- segments_term_vectors_memory_in_bytes (float)
|
||||
- segments_terms_memory_in_bytes (float)
|
||||
- segments_version_map_memory_in_bytes (float)
|
||||
- seq_no_global_checkpoint (float)
|
||||
- seq_no_local_checkpoint (float)
|
||||
- seq_no_max_seq_no (float)
|
||||
- shard_path_is_custom_data_path (bool)
|
||||
- store_size_in_bytes (float)
|
||||
- translog_earliest_last_modified_age (float)
|
||||
- translog_operations (float)
|
||||
- translog_size_in_bytes (float)
|
||||
- translog_uncommitted_operations (float)
|
||||
- translog_uncommitted_size_in_bytes (float)
|
||||
- warmer_current (float)
|
||||
- warmer_total (float)
|
||||
- warmer_total_time_in_millis (float)
|
|
@ -79,10 +79,10 @@ type clusterStats struct {
|
|||
Nodes interface{} `json:"nodes"`
|
||||
}
|
||||
|
||||
type catMaster struct {
|
||||
NodeID string `json:"id"`
|
||||
NodeIP string `json:"ip"`
|
||||
NodeName string `json:"node"`
|
||||
type indexStat struct {
|
||||
Primaries interface{} `json:"primaries"`
|
||||
Total interface{} `json:"total"`
|
||||
Shards map[string][]interface{} `json:"shards"`
|
||||
}
|
||||
|
||||
const sampleConfig = `
|
||||
|
@ -114,6 +114,12 @@ const sampleConfig = `
|
|||
## Only gather cluster_stats from the master node. To work this require local = true
|
||||
cluster_stats_only_from_master = true
|
||||
|
||||
## Indices to collect; can be one or more indices names or _all
|
||||
indices_include = ["_all"]
|
||||
|
||||
## One of "shards", "cluster", "indices"
|
||||
indices_level = "shards"
|
||||
|
||||
## node_stats is a list of sub-stats that you want to have gathered. Valid options
|
||||
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
|
||||
## "breaker". Per default, all stats are gathered.
|
||||
|
@ -134,14 +140,16 @@ const sampleConfig = `
|
|||
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
|
||||
// servers.
|
||||
type Elasticsearch struct {
|
||||
Local bool
|
||||
Servers []string
|
||||
HttpTimeout internal.Duration
|
||||
ClusterHealth bool
|
||||
ClusterHealthLevel string
|
||||
ClusterStats bool
|
||||
ClusterStatsOnlyFromMaster bool
|
||||
NodeStats []string
|
||||
Local bool `toml:"local"`
|
||||
Servers []string `toml:"servers"`
|
||||
HTTPTimeout internal.Duration `toml:"http_timeout"`
|
||||
ClusterHealth bool `toml:"cluster_health"`
|
||||
ClusterHealthLevel string `toml:"cluster_health_level"`
|
||||
ClusterStats bool `toml:"cluster_stats"`
|
||||
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
|
||||
IndicesInclude []string `toml:"indices_include"`
|
||||
IndicesLevel string `toml:"indices_level"`
|
||||
NodeStats []string `toml:"node_stats"`
|
||||
Username string `toml:"username"`
|
||||
Password string `toml:"password"`
|
||||
tls.ClientConfig
|
||||
|
@ -162,7 +170,7 @@ func (i serverInfo) isMaster() bool {
|
|||
// NewElasticsearch return a new instance of Elasticsearch
|
||||
func NewElasticsearch() *Elasticsearch {
|
||||
return &Elasticsearch{
|
||||
HttpTimeout: internal.Duration{Duration: time.Second * 5},
|
||||
HTTPTimeout: internal.Duration{Duration: time.Second * 5},
|
||||
ClusterStatsOnlyFromMaster: true,
|
||||
ClusterHealthLevel: "indices",
|
||||
}
|
||||
|
@ -181,6 +189,21 @@ func mapHealthStatusToCode(s string) int {
|
|||
return 0
|
||||
}
|
||||
|
||||
// perform shard status mapping
|
||||
func mapShardStatusToCode(s string) int {
|
||||
switch strings.ToUpper(s) {
|
||||
case "UNASSIGNED":
|
||||
return 1
|
||||
case "INITIALIZING":
|
||||
return 2
|
||||
case "STARTED":
|
||||
return 3
|
||||
case "RELOCATING":
|
||||
return 4
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// SampleConfig returns sample configuration for this plugin.
|
||||
func (e *Elasticsearch) SampleConfig() string {
|
||||
return sampleConfig
|
||||
|
@ -195,7 +218,7 @@ func (e *Elasticsearch) Description() string {
|
|||
// Accumulator.
|
||||
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
||||
if e.client == nil {
|
||||
client, err := e.createHttpClient()
|
||||
client, err := e.createHTTPClient()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -203,7 +226,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
|||
e.client = client
|
||||
}
|
||||
|
||||
if e.ClusterStats {
|
||||
if e.ClusterStats || len(e.IndicesInclude) > 0 || len(e.IndicesLevel) > 0 {
|
||||
var wgC sync.WaitGroup
|
||||
wgC.Add(len(e.Servers))
|
||||
|
||||
|
@ -243,7 +266,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
|||
for _, serv := range e.Servers {
|
||||
go func(s string, acc telegraf.Accumulator) {
|
||||
defer wg.Done()
|
||||
url := e.nodeStatsUrl(s)
|
||||
url := e.nodeStatsURL(s)
|
||||
|
||||
// Always gather node stats
|
||||
if err := e.gatherNodeStats(url, acc); err != nil {
|
||||
|
@ -268,6 +291,20 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
|||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(e.IndicesInclude) > 0 && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
|
||||
if e.IndicesLevel != "shards" {
|
||||
if err := e.gatherIndicesStats(s+"/"+strings.Join(e.IndicesInclude, ",")+"/_stats", acc); err != nil {
|
||||
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := e.gatherIndicesStats(s+"/"+strings.Join(e.IndicesInclude, ",")+"/_stats?level=shards", acc); err != nil {
|
||||
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}(serv, acc)
|
||||
}
|
||||
|
||||
|
@ -275,30 +312,30 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
|
||||
func (e *Elasticsearch) createHTTPClient() (*http.Client, error) {
|
||||
tlsCfg, err := e.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr := &http.Transport{
|
||||
ResponseHeaderTimeout: e.HttpTimeout.Duration,
|
||||
ResponseHeaderTimeout: e.HTTPTimeout.Duration,
|
||||
TLSClientConfig: tlsCfg,
|
||||
}
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: e.HttpTimeout.Duration,
|
||||
Timeout: e.HTTPTimeout.Duration,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
|
||||
func (e *Elasticsearch) nodeStatsURL(baseURL string) string {
|
||||
var url string
|
||||
|
||||
if e.Local {
|
||||
url = baseUrl + statsPathLocal
|
||||
url = baseURL + statsPathLocal
|
||||
} else {
|
||||
url = baseUrl + statsPath
|
||||
url = baseURL + statsPath
|
||||
}
|
||||
|
||||
if len(e.NodeStats) == 0 {
|
||||
|
@ -313,7 +350,7 @@ func (e *Elasticsearch) gatherNodeID(url string) (string, error) {
|
|||
ClusterName string `json:"cluster_name"`
|
||||
Nodes map[string]*nodeStat `json:"nodes"`
|
||||
}{}
|
||||
if err := e.gatherJsonData(url, nodeStats); err != nil {
|
||||
if err := e.gatherJSONData(url, nodeStats); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
|
@ -329,7 +366,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
|
|||
ClusterName string `json:"cluster_name"`
|
||||
Nodes map[string]*nodeStat `json:"nodes"`
|
||||
}{}
|
||||
if err := e.gatherJsonData(url, nodeStats); err != nil {
|
||||
if err := e.gatherJSONData(url, nodeStats); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -380,7 +417,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
|
|||
|
||||
func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
|
||||
healthStats := &clusterHealth{}
|
||||
if err := e.gatherJsonData(url, healthStats); err != nil {
|
||||
if err := e.gatherJSONData(url, healthStats); err != nil {
|
||||
return err
|
||||
}
|
||||
measurementTime := time.Now()
|
||||
|
@ -432,7 +469,7 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
|
|||
|
||||
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
|
||||
clusterStats := &clusterStats{}
|
||||
if err := e.gatherJsonData(url, clusterStats); err != nil {
|
||||
if err := e.gatherJSONData(url, clusterStats); err != nil {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
|
@ -460,6 +497,102 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) gatherIndicesStats(url string, acc telegraf.Accumulator) error {
|
||||
indicesStats := &struct {
|
||||
Shards map[string]interface{} `json:"_shards"`
|
||||
All map[string]interface{} `json:"_all"`
|
||||
Indices map[string]indexStat `json:"indices"`
|
||||
}{}
|
||||
|
||||
if err := e.gatherJSONData(url, indicesStats); err != nil {
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
|
||||
// Total Shards Stats
|
||||
shardsStats := map[string]interface{}{}
|
||||
for k, v := range indicesStats.Shards {
|
||||
shardsStats[k] = v
|
||||
}
|
||||
acc.AddFields("elasticsearch_indices_stats_shards_total", shardsStats, map[string]string{}, now)
|
||||
|
||||
// All Stats
|
||||
for m, s := range indicesStats.All {
|
||||
// parse Json, ignoring strings and bools
|
||||
jsonParser := jsonparser.JSONFlattener{}
|
||||
err := jsonParser.FullFlattenJSON("_", s, true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
acc.AddFields("elasticsearch_indices_stats_"+m, jsonParser.Fields, map[string]string{"index_name": "_all"}, now)
|
||||
}
|
||||
|
||||
// Individual Indices stats
|
||||
for id, index := range indicesStats.Indices {
|
||||
indexTag := map[string]string{"index_name": id}
|
||||
stats := map[string]interface{}{
|
||||
"primaries": index.Primaries,
|
||||
"total": index.Total,
|
||||
}
|
||||
for m, s := range stats {
|
||||
f := jsonparser.JSONFlattener{}
|
||||
// parse Json, getting strings and bools
|
||||
err := f.FullFlattenJSON("", s, true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
acc.AddFields("elasticsearch_indices_stats_"+m, f.Fields, indexTag, now)
|
||||
}
|
||||
|
||||
if e.IndicesLevel == "shards" {
|
||||
for shardNumber, shard := range index.Shards {
|
||||
if len(shard) > 0 {
|
||||
// Get Shard Stats
|
||||
flattened := jsonparser.JSONFlattener{}
|
||||
err := flattened.FullFlattenJSON("", shard[0], true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// determine shard tag and primary/replica designation
|
||||
shardType := "replica"
|
||||
if flattened.Fields["routing_primary"] == true {
|
||||
shardType = "primary"
|
||||
}
|
||||
delete(flattened.Fields, "routing_primary")
|
||||
|
||||
routingState, ok := flattened.Fields["routing_state"].(string)
|
||||
if ok {
|
||||
flattened.Fields["routing_state"] = mapShardStatusToCode(routingState)
|
||||
}
|
||||
|
||||
routingNode, _ := flattened.Fields["routing_node"].(string)
|
||||
shardTags := map[string]string{
|
||||
"index_name": id,
|
||||
"node_id": routingNode,
|
||||
"shard_name": string(shardNumber),
|
||||
"type": shardType,
|
||||
}
|
||||
|
||||
for key, field := range flattened.Fields {
|
||||
switch field.(type) {
|
||||
case string, bool:
|
||||
delete(flattened.Fields, key)
|
||||
}
|
||||
}
|
||||
|
||||
acc.AddFields("elasticsearch_indices_stats_shards",
|
||||
flattened.Fields,
|
||||
shardTags,
|
||||
now)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
|
@ -492,7 +625,7 @@ func (e *Elasticsearch) getCatMaster(url string) (string, error) {
|
|||
return masterID, nil
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
|
||||
func (e *Elasticsearch) gatherJSONData(url string, v interface{}) error {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -70,7 +70,7 @@ func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
|
|||
acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHTTPExpected, tags)
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
|
||||
}
|
||||
|
||||
|
@ -113,7 +113,7 @@ func TestGatherIndividualStats(t *testing.T) {
|
|||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_http", nodestatsHTTPExpected, tags)
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
|
||||
}
|
||||
|
||||
|
@ -292,6 +292,54 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
|
|||
checkNodeStatsResult(t, &acc)
|
||||
}
|
||||
|
||||
func TestGatherClusterIndicesStats(t *testing.T) {
|
||||
es := newElasticsearchWithClient()
|
||||
es.IndicesInclude = []string{"_all"}
|
||||
es.Servers = []string{"http://example.com:9200"}
|
||||
es.client.Transport = newTransportMock(http.StatusOK, clusterIndicesResponse)
|
||||
es.serverInfo = make(map[string]serverInfo)
|
||||
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
||||
|
||||
var acc testutil.Accumulator
|
||||
if err := es.gatherIndicesStats("junk", &acc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
||||
clusterIndicesExpected,
|
||||
map[string]string{"index_name": "twitter"})
|
||||
}
|
||||
|
||||
func TestGatherClusterIndiceShardsStats(t *testing.T) {
|
||||
es := newElasticsearchWithClient()
|
||||
es.IndicesLevel = "shards"
|
||||
es.Servers = []string{"http://example.com:9200"}
|
||||
es.client.Transport = newTransportMock(http.StatusOK, clusterIndicesShardsResponse)
|
||||
es.serverInfo = make(map[string]serverInfo)
|
||||
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
|
||||
|
||||
var acc testutil.Accumulator
|
||||
if err := es.gatherIndicesStats("junk", &acc); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
|
||||
clusterIndicesExpected,
|
||||
map[string]string{"index_name": "twitter"})
|
||||
|
||||
tags := map[string]string{
|
||||
"index_name": "twitter",
|
||||
"node_id": "oqvR8I1dTpONvwRM30etww",
|
||||
"shard_name": "1",
|
||||
"type": "replica",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_shards",
|
||||
clusterIndicesShardsExpected,
|
||||
tags)
|
||||
|
||||
}
|
||||
|
||||
func newElasticsearchWithClient() *Elasticsearch {
|
||||
es := NewElasticsearch()
|
||||
es.client = &http.Client{}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue