Add clusterstats to elasticsearch plugin (#1979)

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* add clusterstats to elasticsearch input plugin

* responses to requested changes

* remove unnecessary recommendation
This commit is contained in:
Matt O'Hara 2016-12-20 10:30:03 -06:00 committed by Cameron Sparr
parent 48fc28331e
commit b0609beb2b
6 changed files with 583 additions and 76 deletions

View File

@ -784,13 +784,18 @@
# ## Timeout for HTTP requests to the elastic search server(s) # ## Timeout for HTTP requests to the elastic search server(s)
# http_timeout = "5s" # http_timeout = "5s"
# #
# ## set local to false when you want to read the indices stats from all nodes # ## When local is true (the default), the node will read only its own stats.
# ## within the cluster # ## Set local to false when you want to read the node stats from all nodes
# ## of the cluster.
# local = true # local = true
# #
# ## set cluster_health to true when you want to also obtain cluster level stats # ## set cluster_health to true when you want to also obtain cluster health stats
# cluster_health = false # cluster_health = false
# #
# ## Set cluster_stats to true when you want to obtain cluster stats from the
# ## Master node.
# cluster_stats = false
# ## Optional SSL Config # ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem" # # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem" # # ssl_cert = "/etc/telegraf/cert.pem"

View File

@ -2,7 +2,8 @@
The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
[node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html) [node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) stats. and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics.
### Configuration: ### Configuration:
@ -14,13 +15,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference
## Timeout for HTTP requests to the elastic search server(s) ## Timeout for HTTP requests to the elastic search server(s)
http_timeout = "5s" http_timeout = "5s"
## set local to false when you want to read the indices stats from all nodes ## When local is true (the default), the node will read only its own stats.
## within the cluster ## Set local to false when you want to read the node stats from all nodes
## of the cluster.
local = true local = true
## set cluster_health to true when you want to also obtain cluster level stats ## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false cluster_health = false
## Set cluster_stats to true when you want to obtain cluster stats from the
## Master node.
cluster_stats = false
## Optional SSL Config ## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem" # ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem" # ssl_cert = "/etc/telegraf/cert.pem"

View File

@ -12,13 +12,15 @@ import (
"github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"io/ioutil"
"strings"
) )
// Nodestats are always generated, so simply define a constant for these endpoints
const statsPath = "/_nodes/stats" const statsPath = "/_nodes/stats"
const statsPathLocal = "/_nodes/_local/stats" const statsPathLocal = "/_nodes/_local/stats"
const healthPath = "/_cluster/health"
type node struct { type nodeStat struct {
Host string `json:"host"` Host string `json:"host"`
Name string `json:"name"` Name string `json:"name"`
Attributes map[string]string `json:"attributes"` Attributes map[string]string `json:"attributes"`
@ -58,6 +60,20 @@ type indexHealth struct {
UnassignedShards int `json:"unassigned_shards"` UnassignedShards int `json:"unassigned_shards"`
} }
type clusterStats struct {
NodeName string `json:"node_name"`
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
Indices interface{} `json:"indices"`
Nodes interface{} `json:"nodes"`
}
type catMaster struct {
NodeID string `json:"id"`
NodeIP string `json:"ip"`
NodeName string `json:"node"`
}
const sampleConfig = ` const sampleConfig = `
## specify a list of one or more Elasticsearch servers ## specify a list of one or more Elasticsearch servers
# you can add username and password to your url to use basic authentication: # you can add username and password to your url to use basic authentication:
@ -67,13 +83,18 @@ const sampleConfig = `
## Timeout for HTTP requests to the elastic search server(s) ## Timeout for HTTP requests to the elastic search server(s)
http_timeout = "5s" http_timeout = "5s"
## set local to false when you want to read the indices stats from all nodes ## When local is true (the default), the node will read only its own stats.
## within the cluster ## Set local to false when you want to read the node stats from all nodes
## of the cluster.
local = true local = true
## set cluster_health to true when you want to also obtain cluster level stats ## Set cluster_health to true when you want to also obtain cluster health stats
cluster_health = false cluster_health = false
## Set cluster_stats to true when you want to also obtain cluster stats from the
## Master node.
cluster_stats = false
## Optional SSL Config ## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem" # ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem" # ssl_cert = "/etc/telegraf/cert.pem"
@ -85,15 +106,18 @@ const sampleConfig = `
// Elasticsearch is a plugin to read stats from one or many Elasticsearch // Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers. // servers.
type Elasticsearch struct { type Elasticsearch struct {
Local bool Local bool
Servers []string Servers []string
HttpTimeout internal.Duration HttpTimeout internal.Duration
ClusterHealth bool ClusterHealth bool
SSLCA string `toml:"ssl_ca"` // Path to CA file ClusterStats bool
SSLCert string `toml:"ssl_cert"` // Path to host cert file SSLCA string `toml:"ssl_ca"` // Path to CA file
SSLKey string `toml:"ssl_key"` // Path to cert key file SSLCert string `toml:"ssl_cert"` // Path to host cert file
InsecureSkipVerify bool // Use SSL but skip chain & host verification SSLKey string `toml:"ssl_key"` // Path to cert key file
client *http.Client InsecureSkipVerify bool // Use SSL but skip chain & host verification
client *http.Client
catMasterResponseTokens []string
isMaster bool
} }
// NewElasticsearch return a new instance of Elasticsearch // NewElasticsearch return a new instance of Elasticsearch
@ -138,12 +162,27 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
} else { } else {
url = s + statsPath url = s + statsPath
} }
e.isMaster = false
if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
e.setCatMaster(s + "/_cat/master")
}
// Always gather node states
if err := e.gatherNodeStats(url, acc); err != nil { if err := e.gatherNodeStats(url, acc); err != nil {
errChan.C <- err errChan.C <- err
return return
} }
if e.ClusterHealth { if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc) url = s + "/_cluster/health?level=indices"
e.gatherClusterHealth(url, acc)
}
if e.ClusterStats && e.isMaster {
e.gatherClusterStats(s+"/_cluster/stats", acc)
} }
}(serv, acc) }(serv, acc)
} }
@ -171,12 +210,13 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct { nodeStats := &struct {
ClusterName string `json:"cluster_name"` ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"` Nodes map[string]*nodeStat `json:"nodes"`
}{} }{}
if err := e.gatherData(url, nodeStats); err != nil { if err := e.gatherJsonData(url, nodeStats); err != nil {
return err return err
} }
for id, n := range nodeStats.Nodes { for id, n := range nodeStats.Nodes {
tags := map[string]string{ tags := map[string]string{
"node_id": id, "node_id": id,
@ -185,6 +225,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName, "cluster_name": nodeStats.ClusterName,
} }
if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}
for k, v := range n.Attributes { for k, v := range n.Attributes {
tags["node_attribute_"+k] = v tags["node_attribute_"+k] = v
} }
@ -204,6 +249,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
now := time.Now() now := time.Now()
for p, s := range stats { for p, s := range stats {
f := jsonparser.JSONFlattener{} f := jsonparser.JSONFlattener{}
// parse Json, ignoring strings and bools
err := f.FlattenJSON("", s) err := f.FlattenJSON("", s)
if err != nil { if err != nil {
return err return err
@ -214,31 +260,31 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
return nil return nil
} }
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error { func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterHealth{} healthStats := &clusterHealth{}
if err := e.gatherData(url, clusterStats); err != nil { if err := e.gatherJsonData(url, healthStats); err != nil {
return err return err
} }
measurementTime := time.Now() measurementTime := time.Now()
clusterFields := map[string]interface{}{ clusterFields := map[string]interface{}{
"status": clusterStats.Status, "status": healthStats.Status,
"timed_out": clusterStats.TimedOut, "timed_out": healthStats.TimedOut,
"number_of_nodes": clusterStats.NumberOfNodes, "number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": clusterStats.NumberOfDataNodes, "number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": clusterStats.ActivePrimaryShards, "active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": clusterStats.ActiveShards, "active_shards": healthStats.ActiveShards,
"relocating_shards": clusterStats.RelocatingShards, "relocating_shards": healthStats.RelocatingShards,
"initializing_shards": clusterStats.InitializingShards, "initializing_shards": healthStats.InitializingShards,
"unassigned_shards": clusterStats.UnassignedShards, "unassigned_shards": healthStats.UnassignedShards,
} }
acc.AddFields( acc.AddFields(
"elasticsearch_cluster_health", "elasticsearch_cluster_health",
clusterFields, clusterFields,
map[string]string{"name": clusterStats.ClusterName}, map[string]string{"name": healthStats.ClusterName},
measurementTime, measurementTime,
) )
for name, health := range clusterStats.Indices { for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{ indexFields := map[string]interface{}{
"status": health.Status, "status": health.Status,
"number_of_shards": health.NumberOfShards, "number_of_shards": health.NumberOfShards,
@ -259,7 +305,60 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil return nil
} }
func (e *Elasticsearch) gatherData(url string, v interface{}) error { func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
clusterStats := &clusterStats{}
if err := e.gatherJsonData(url, clusterStats); err != nil {
return err
}
now := time.Now()
tags := map[string]string{
"node_name": clusterStats.NodeName,
"cluster_name": clusterStats.ClusterName,
"status": clusterStats.Status,
}
stats := map[string]interface{}{
"nodes": clusterStats.Nodes,
"indices": clusterStats.Indices,
}
for p, s := range stats {
f := jsonparser.JSONFlattener{}
// parse json, including bools and strings
err := f.FullFlattenJSON("", s, true, true)
if err != nil {
return err
}
acc.AddFields("elasticsearch_clusterstats_"+p, f.Fields, tags, now)
}
return nil
}
func (e *Elasticsearch) setCatMaster(url string) error {
r, err := e.client.Get(url)
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
response, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
e.catMasterResponseTokens = strings.Split(string(response), " ")
return nil
}
func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
r, err := e.client.Get(url) r, err := e.client.Get(url)
if err != nil { if err != nil {
return err return err
@ -272,9 +371,11 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
r.StatusCode, http.StatusOK) r.StatusCode, http.StatusOK)
} }
if err = json.NewDecoder(r.Body).Decode(v); err != nil { if err = json.NewDecoder(r.Body).Decode(v); err != nil {
return err return err
} }
return nil return nil
} }

View File

@ -8,6 +8,8 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -37,16 +39,13 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
func (t *transportMock) CancelRequest(_ *http.Request) { func (t *transportMock) CancelRequest(_ *http.Request) {
} }
func TestElasticsearch(t *testing.T) { func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
es := newElasticsearchWithClient() if es.isMaster != expected {
es.Servers = []string{"http://example.com:9200"} msg := fmt.Sprintf("IsMaster set incorrectly")
es.client.Transport = newTransportMock(http.StatusOK, statsResponse) assert.Fail(t, msg)
var acc testutil.Accumulator
if err := es.Gather(&acc); err != nil {
t.Fatal(err)
} }
}
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
tags := map[string]string{ tags := map[string]string{
"cluster_name": "es-testcluster", "cluster_name": "es-testcluster",
"node_attribute_master": "true", "node_attribute_master": "true",
@ -55,25 +54,55 @@ func TestElasticsearch(t *testing.T) {
"node_host": "test", "node_host": "test",
} }
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", indicesExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_os", osExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_process", processExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", jvmExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", threadPoolExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_fs", fsExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_transport", transportExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_http", httpExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", breakersExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
} }
func TestGatherClusterStats(t *testing.T) { func TestGather(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
var acc testutil.Accumulator
if err := es.Gather(&acc); err != nil {
t.Fatal(err)
}
checkIsMaster(es, false, t)
checkNodeStatsResult(t, &acc)
}
func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
var acc testutil.Accumulator
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
checkIsMaster(es, false, t)
checkNodeStatsResult(t, &acc)
}
func TestGatherClusterHealth(t *testing.T) {
es := newElasticsearchWithClient() es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"} es.Servers = []string{"http://example.com:9200"}
es.ClusterHealth = true es.ClusterHealth = true
es.client.Transport = newTransportMock(http.StatusOK, clusterResponse) es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, es.Gather(&acc)) require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, false, t)
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health", acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected, clusterHealthExpected,
@ -88,6 +117,77 @@ func TestGatherClusterStats(t *testing.T) {
map[string]string{"index": "v2"}) map[string]string{"index": "v2"})
} }
func TestGatherClusterStatsMaster(t *testing.T) {
// This needs multiple steps to replicate the multiple calls internally.
es := newElasticsearchWithClient()
es.ClusterStats = true
es.Servers = []string{"http://example.com:9200"}
// first get catMaster
es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult)
require.NoError(t, es.setCatMaster("junk"))
IsMasterResultTokens := strings.Split(string(IsMasterResult), " ")
if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] {
msg := fmt.Sprintf("catmaster is incorrect")
assert.Fail(t, msg)
}
// now get node status, which determines whether we're master
var acc testutil.Accumulator
es.Local = true
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
checkIsMaster(es, true, t)
checkNodeStatsResult(t, &acc)
// now test the clusterstats method
es.client.Transport = newTransportMock(http.StatusOK, clusterStatsResponse)
require.NoError(t, es.gatherClusterStats("junk", &acc))
tags := map[string]string{
"cluster_name": "es-testcluster",
"node_name": "test.host.com",
"status": "red",
}
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
}
func TestGatherClusterStatsNonMaster(t *testing.T) {
// This needs multiple steps to replicate the multiple calls internally.
es := newElasticsearchWithClient()
es.ClusterStats = true
es.Servers = []string{"http://example.com:9200"}
// first get catMaster
es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult)
require.NoError(t, es.setCatMaster("junk"))
IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ")
if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] {
msg := fmt.Sprintf("catmaster is incorrect")
assert.Fail(t, msg)
}
// now get node status, which determines whether we're master
var acc testutil.Accumulator
es.Local = true
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
// ensure flag is clear so Cluster Stats would not be done
checkIsMaster(es, false, t)
checkNodeStatsResult(t, &acc)
}
func newElasticsearchWithClient() *Elasticsearch { func newElasticsearchWithClient() *Elasticsearch {
es := NewElasticsearch() es := NewElasticsearch()
es.client = &http.Client{} es.client = &http.Client{}

View File

@ -1,6 +1,6 @@
package elasticsearch package elasticsearch
const clusterResponse = ` const clusterHealthResponse = `
{ {
"cluster_name": "elasticsearch_telegraf", "cluster_name": "elasticsearch_telegraf",
"status": "green", "status": "green",
@ -71,7 +71,7 @@ var v2IndexExpected = map[string]interface{}{
"unassigned_shards": 20, "unassigned_shards": 20,
} }
const statsResponse = ` const nodeStatsResponse = `
{ {
"cluster_name": "es-testcluster", "cluster_name": "es-testcluster",
"nodes": { "nodes": {
@ -489,7 +489,7 @@ const statsResponse = `
} }
` `
var indicesExpected = map[string]interface{}{ var nodestatsIndicesExpected = map[string]interface{}{
"id_cache_memory_size_in_bytes": float64(0), "id_cache_memory_size_in_bytes": float64(0),
"completion_size_in_bytes": float64(0), "completion_size_in_bytes": float64(0),
"suggest_total": float64(0), "suggest_total": float64(0),
@ -561,7 +561,7 @@ var indicesExpected = map[string]interface{}{
"segments_fixed_bit_set_memory_in_bytes": float64(0), "segments_fixed_bit_set_memory_in_bytes": float64(0),
} }
var osExpected = map[string]interface{}{ var nodestatsOsExpected = map[string]interface{}{
"load_average_0": float64(0.01), "load_average_0": float64(0.01),
"load_average_1": float64(0.04), "load_average_1": float64(0.04),
"load_average_2": float64(0.05), "load_average_2": float64(0.05),
@ -576,7 +576,7 @@ var osExpected = map[string]interface{}{
"mem_used_in_bytes": float64(1621868544), "mem_used_in_bytes": float64(1621868544),
} }
var processExpected = map[string]interface{}{ var nodestatsProcessExpected = map[string]interface{}{
"mem_total_virtual_in_bytes": float64(4747890688), "mem_total_virtual_in_bytes": float64(4747890688),
"timestamp": float64(1436460392945), "timestamp": float64(1436460392945),
"open_file_descriptors": float64(160), "open_file_descriptors": float64(160),
@ -586,7 +586,7 @@ var processExpected = map[string]interface{}{
"cpu_user_in_millis": float64(13610), "cpu_user_in_millis": float64(13610),
} }
var jvmExpected = map[string]interface{}{ var nodestatsJvmExpected = map[string]interface{}{
"timestamp": float64(1436460392945), "timestamp": float64(1436460392945),
"uptime_in_millis": float64(202245), "uptime_in_millis": float64(202245),
"mem_non_heap_used_in_bytes": float64(39634576), "mem_non_heap_used_in_bytes": float64(39634576),
@ -621,7 +621,7 @@ var jvmExpected = map[string]interface{}{
"buffer_pools_mapped_total_capacity_in_bytes": float64(0), "buffer_pools_mapped_total_capacity_in_bytes": float64(0),
} }
var threadPoolExpected = map[string]interface{}{ var nodestatsThreadPoolExpected = map[string]interface{}{
"merge_threads": float64(6), "merge_threads": float64(6),
"merge_queue": float64(4), "merge_queue": float64(4),
"merge_active": float64(5), "merge_active": float64(5),
@ -726,7 +726,7 @@ var threadPoolExpected = map[string]interface{}{
"flush_completed": float64(3), "flush_completed": float64(3),
} }
var fsExpected = map[string]interface{}{ var nodestatsFsExpected = map[string]interface{}{
"data_0_total_in_bytes": float64(19507089408), "data_0_total_in_bytes": float64(19507089408),
"data_0_free_in_bytes": float64(16909316096), "data_0_free_in_bytes": float64(16909316096),
"data_0_available_in_bytes": float64(15894814720), "data_0_available_in_bytes": float64(15894814720),
@ -736,7 +736,7 @@ var fsExpected = map[string]interface{}{
"total_total_in_bytes": float64(19507089408), "total_total_in_bytes": float64(19507089408),
} }
var transportExpected = map[string]interface{}{ var nodestatsTransportExpected = map[string]interface{}{
"server_open": float64(13), "server_open": float64(13),
"rx_count": float64(6), "rx_count": float64(6),
"rx_size_in_bytes": float64(1380), "rx_size_in_bytes": float64(1380),
@ -744,12 +744,12 @@ var transportExpected = map[string]interface{}{
"tx_size_in_bytes": float64(1380), "tx_size_in_bytes": float64(1380),
} }
var httpExpected = map[string]interface{}{ var nodestatsHttpExpected = map[string]interface{}{
"current_open": float64(3), "current_open": float64(3),
"total_opened": float64(3), "total_opened": float64(3),
} }
var breakersExpected = map[string]interface{}{ var nodestatsBreakersExpected = map[string]interface{}{
"fielddata_estimated_size_in_bytes": float64(0), "fielddata_estimated_size_in_bytes": float64(0),
"fielddata_overhead": float64(1.03), "fielddata_overhead": float64(1.03),
"fielddata_tripped": float64(0), "fielddata_tripped": float64(0),
@ -763,3 +763,273 @@ var breakersExpected = map[string]interface{}{
"parent_limit_size_in_bytes": float64(727213670), "parent_limit_size_in_bytes": float64(727213670),
"parent_estimated_size_in_bytes": float64(0), "parent_estimated_size_in_bytes": float64(0),
} }
const clusterStatsResponse = `
{
"host":"ip-10-0-1-214",
"log_type":"metrics",
"timestamp":1475767451229,
"log_level":"INFO",
"node_name":"test.host.com",
"cluster_name":"es-testcluster",
"status":"red",
"indices":{
"count":1,
"shards":{
"total":4,
"primaries":4,
"replication":0.0,
"index":{
"shards":{
"min":4,
"max":4,
"avg":4.0
},
"primaries":{
"min":4,
"max":4,
"avg":4.0
},
"replication":{
"min":0.0,
"max":0.0,
"avg":0.0
}
}
},
"docs":{
"count":4,
"deleted":0
},
"store":{
"size_in_bytes":17084,
"throttle_time_in_millis":0
},
"fielddata":{
"memory_size_in_bytes":0,
"evictions":0
},
"query_cache":{
"memory_size_in_bytes":0,
"total_count":0,
"hit_count":0,
"miss_count":0,
"cache_size":0,
"cache_count":0,
"evictions":0
},
"completion":{
"size_in_bytes":0
},
"segments":{
"count":4,
"memory_in_bytes":11828,
"terms_memory_in_bytes":8932,
"stored_fields_memory_in_bytes":1248,
"term_vectors_memory_in_bytes":0,
"norms_memory_in_bytes":1280,
"doc_values_memory_in_bytes":368,
"index_writer_memory_in_bytes":0,
"index_writer_max_memory_in_bytes":2048000,
"version_map_memory_in_bytes":0,
"fixed_bit_set_memory_in_bytes":0
},
"percolate":{
"total":0,
"time_in_millis":0,
"current":0,
"memory_size_in_bytes":-1,
"memory_size":"-1b",
"queries":0
}
},
"nodes":{
"count":{
"total":1,
"master_only":0,
"data_only":0,
"master_data":1,
"client":0
},
"versions":[
{
"version": "2.3.3"
}
],
"os":{
"available_processors":1,
"allocated_processors":1,
"mem":{
"total_in_bytes":593301504
},
"names":[
{
"name":"Linux",
"count":1
}
]
},
"process":{
"cpu":{
"percent":0
},
"open_file_descriptors":{
"min":145,
"max":145,
"avg":145
}
},
"jvm":{
"max_uptime_in_millis":11580527,
"versions":[
{
"version":"1.8.0_101",
"vm_name":"OpenJDK 64-Bit Server VM",
"vm_version":"25.101-b13",
"vm_vendor":"Oracle Corporation",
"count":1
}
],
"mem":{
"heap_used_in_bytes":70550288,
"heap_max_in_bytes":1065025536
},
"threads":30
},
"fs":{
"total_in_bytes":8318783488,
"free_in_bytes":6447439872,
"available_in_bytes":6344785920
},
"plugins":[
{
"name":"cloud-aws",
"version":"2.3.3",
"description":"The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
"jvm":true,
"classname":"org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
"isolated":true,
"site":false
},
{
"name":"kopf",
"version":"2.0.1",
"description":"kopf - simple web administration tool for Elasticsearch",
"url":"/_plugin/kopf/",
"jvm":false,
"site":true
},
{
"name":"tr-metrics",
"version":"7bd5b4b",
"description":"Logs cluster and node stats for performance monitoring.",
"jvm":true,
"classname":"com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
"isolated":true,
"site":false
}
]
}
}
`
var clusterstatsIndicesExpected = map[string]interface{}{
"completion_size_in_bytes": float64(0),
"count": float64(1),
"docs_count": float64(4),
"docs_deleted": float64(0),
"fielddata_evictions": float64(0),
"fielddata_memory_size_in_bytes": float64(0),
"percolate_current": float64(0),
"percolate_memory_size_in_bytes": float64(-1),
"percolate_queries": float64(0),
"percolate_time_in_millis": float64(0),
"percolate_total": float64(0),
"percolate_memory_size": "-1b",
"query_cache_cache_count": float64(0),
"query_cache_cache_size": float64(0),
"query_cache_evictions": float64(0),
"query_cache_hit_count": float64(0),
"query_cache_memory_size_in_bytes": float64(0),
"query_cache_miss_count": float64(0),
"query_cache_total_count": float64(0),
"segments_count": float64(4),
"segments_doc_values_memory_in_bytes": float64(368),
"segments_fixed_bit_set_memory_in_bytes": float64(0),
"segments_index_writer_max_memory_in_bytes": float64(2.048e+06),
"segments_index_writer_memory_in_bytes": float64(0),
"segments_memory_in_bytes": float64(11828),
"segments_norms_memory_in_bytes": float64(1280),
"segments_stored_fields_memory_in_bytes": float64(1248),
"segments_term_vectors_memory_in_bytes": float64(0),
"segments_terms_memory_in_bytes": float64(8932),
"segments_version_map_memory_in_bytes": float64(0),
"shards_index_primaries_avg": float64(4),
"shards_index_primaries_max": float64(4),
"shards_index_primaries_min": float64(4),
"shards_index_replication_avg": float64(0),
"shards_index_replication_max": float64(0),
"shards_index_replication_min": float64(0),
"shards_index_shards_avg": float64(4),
"shards_index_shards_max": float64(4),
"shards_index_shards_min": float64(4),
"shards_primaries": float64(4),
"shards_replication": float64(0),
"shards_total": float64(4),
"store_size_in_bytes": float64(17084),
"store_throttle_time_in_millis": float64(0),
}
var clusterstatsNodesExpected = map[string]interface{}{
"count_client": float64(0),
"count_data_only": float64(0),
"count_master_data": float64(1),
"count_master_only": float64(0),
"count_total": float64(1),
"fs_available_in_bytes": float64(6.34478592e+09),
"fs_free_in_bytes": float64(6.447439872e+09),
"fs_total_in_bytes": float64(8.318783488e+09),
"jvm_max_uptime_in_millis": float64(1.1580527e+07),
"jvm_mem_heap_max_in_bytes": float64(1.065025536e+09),
"jvm_mem_heap_used_in_bytes": float64(7.0550288e+07),
"jvm_threads": float64(30),
"jvm_versions_0_count": float64(1),
"jvm_versions_0_version": "1.8.0_101",
"jvm_versions_0_vm_name": "OpenJDK 64-Bit Server VM",
"jvm_versions_0_vm_vendor": "Oracle Corporation",
"jvm_versions_0_vm_version": "25.101-b13",
"os_allocated_processors": float64(1),
"os_available_processors": float64(1),
"os_mem_total_in_bytes": float64(5.93301504e+08),
"os_names_0_count": float64(1),
"os_names_0_name": "Linux",
"process_cpu_percent": float64(0),
"process_open_file_descriptors_avg": float64(145),
"process_open_file_descriptors_max": float64(145),
"process_open_file_descriptors_min": float64(145),
"versions_0_version": "2.3.3",
"plugins_0_classname": "org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
"plugins_0_description": "The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
"plugins_0_isolated": true,
"plugins_0_jvm": true,
"plugins_0_name": "cloud-aws",
"plugins_0_site": false,
"plugins_0_version": "2.3.3",
"plugins_1_description": "kopf - simple web administration tool for Elasticsearch",
"plugins_1_jvm": false,
"plugins_1_name": "kopf",
"plugins_1_site": true,
"plugins_1_url": "/_plugin/kopf/",
"plugins_1_version": "2.0.1",
"plugins_2_classname": "com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
"plugins_2_description": "Logs cluster and node stats for performance monitoring.",
"plugins_2_isolated": true,
"plugins_2_jvm": true,
"plugins_2_name": "tr-metrics",
"plugins_2_site": false,
"plugins_2_version": "7bd5b4b",
}
const IsMasterResult = "SDFsfSDFsdfFSDSDfSFDSDF 10.206.124.66 10.206.124.66 test.host.com "
const IsNotMasterResult = "junk 10.206.124.66 10.206.124.66 test.junk.com "

View File

@ -103,10 +103,22 @@ type JSONFlattener struct {
Fields map[string]interface{} Fields map[string]interface{}
} }
// FlattenJSON flattens nested maps/interfaces into a fields map // FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string)
func (f *JSONFlattener) FlattenJSON( func (f *JSONFlattener) FlattenJSON(
fieldname string,
v interface{}) error {
if f.Fields == nil {
f.Fields = make(map[string]interface{})
}
return f.FullFlattenJSON(fieldname, v, false, false)
}
// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string)
func (f *JSONFlattener) FullFlattenJSON(
fieldname string, fieldname string,
v interface{}, v interface{},
convertString bool,
convertBool bool,
) error { ) error {
if f.Fields == nil { if f.Fields == nil {
f.Fields = make(map[string]interface{}) f.Fields = make(map[string]interface{})
@ -115,7 +127,7 @@ func (f *JSONFlattener) FlattenJSON(
switch t := v.(type) { switch t := v.(type) {
case map[string]interface{}: case map[string]interface{}:
for k, v := range t { for k, v := range t {
err := f.FlattenJSON(fieldname+"_"+k+"_", v) err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
if err != nil { if err != nil {
return err return err
} }
@ -123,15 +135,28 @@ func (f *JSONFlattener) FlattenJSON(
case []interface{}: case []interface{}:
for i, v := range t { for i, v := range t {
k := strconv.Itoa(i) k := strconv.Itoa(i)
err := f.FlattenJSON(fieldname+"_"+k+"_", v) err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
if err != nil { if err != nil {
return nil return nil
} }
} }
case float64: case float64:
f.Fields[fieldname] = t f.Fields[fieldname] = t
case bool, string, nil: case string:
if convertString {
f.Fields[fieldname] = v.(string)
} else {
return nil
}
case bool:
if convertBool {
f.Fields[fieldname] = v.(bool)
} else {
return nil
}
case nil:
// ignored types // ignored types
fmt.Println("json parser ignoring " + fieldname)
return nil return nil
default: default:
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)", return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",