Add clusterstats to elasticsearch plugin (#1979)
* add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * add clusterstats to elasticsearch input plugin * responses to requested changes * remove unnecessary recommendation
This commit is contained in:
parent
a90afd95c6
commit
1392e73125
|
@ -784,13 +784,18 @@
|
||||||
# ## Timeout for HTTP requests to the elastic search server(s)
|
# ## Timeout for HTTP requests to the elastic search server(s)
|
||||||
# http_timeout = "5s"
|
# http_timeout = "5s"
|
||||||
#
|
#
|
||||||
# ## set local to false when you want to read the indices stats from all nodes
|
# ## When local is true (the default), the node will read only its own stats.
|
||||||
# ## within the cluster
|
# ## Set local to false when you want to read the node stats from all nodes
|
||||||
|
# ## of the cluster.
|
||||||
# local = true
|
# local = true
|
||||||
#
|
#
|
||||||
# ## set cluster_health to true when you want to also obtain cluster level stats
|
# ## set cluster_health to true when you want to also obtain cluster health stats
|
||||||
# cluster_health = false
|
# cluster_health = false
|
||||||
#
|
#
|
||||||
|
# ## Set cluster_stats to true when you want to obtain cluster stats from the
|
||||||
|
# ## Master node.
|
||||||
|
# cluster_stats = false
|
||||||
|
|
||||||
# ## Optional SSL Config
|
# ## Optional SSL Config
|
||||||
# # ssl_ca = "/etc/telegraf/ca.pem"
|
# # ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
# # ssl_cert = "/etc/telegraf/cert.pem"
|
# # ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
|
|
@ -2,7 +2,8 @@
|
||||||
|
|
||||||
The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
|
The [elasticsearch](https://www.elastic.co/) plugin queries endpoints to obtain
|
||||||
[node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
|
[node](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html)
|
||||||
and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html) stats.
|
and optionally [cluster-health](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html)
|
||||||
|
or [cluster-stats](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-stats.html) metrics.
|
||||||
|
|
||||||
### Configuration:
|
### Configuration:
|
||||||
|
|
||||||
|
@ -14,13 +15,18 @@ and optionally [cluster](https://www.elastic.co/guide/en/elasticsearch/reference
|
||||||
## Timeout for HTTP requests to the elastic search server(s)
|
## Timeout for HTTP requests to the elastic search server(s)
|
||||||
http_timeout = "5s"
|
http_timeout = "5s"
|
||||||
|
|
||||||
## set local to false when you want to read the indices stats from all nodes
|
## When local is true (the default), the node will read only its own stats.
|
||||||
## within the cluster
|
## Set local to false when you want to read the node stats from all nodes
|
||||||
|
## of the cluster.
|
||||||
local = true
|
local = true
|
||||||
|
|
||||||
## set cluster_health to true when you want to also obtain cluster level stats
|
## Set cluster_health to true when you want to also obtain cluster health stats
|
||||||
cluster_health = false
|
cluster_health = false
|
||||||
|
|
||||||
|
## Set cluster_stats to true when you want to obtain cluster stats from the
|
||||||
|
## Master node.
|
||||||
|
cluster_stats = false
|
||||||
|
|
||||||
## Optional SSL Config
|
## Optional SSL Config
|
||||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
# ssl_cert = "/etc/telegraf/cert.pem"
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
|
|
@ -12,13 +12,15 @@ import (
|
||||||
"github.com/influxdata/telegraf/internal/errchan"
|
"github.com/influxdata/telegraf/internal/errchan"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Nodestats are always generated, so simply define a constant for these endpoints
|
||||||
const statsPath = "/_nodes/stats"
|
const statsPath = "/_nodes/stats"
|
||||||
const statsPathLocal = "/_nodes/_local/stats"
|
const statsPathLocal = "/_nodes/_local/stats"
|
||||||
const healthPath = "/_cluster/health"
|
|
||||||
|
|
||||||
type node struct {
|
type nodeStat struct {
|
||||||
Host string `json:"host"`
|
Host string `json:"host"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Attributes map[string]string `json:"attributes"`
|
Attributes map[string]string `json:"attributes"`
|
||||||
|
@ -58,6 +60,20 @@ type indexHealth struct {
|
||||||
UnassignedShards int `json:"unassigned_shards"`
|
UnassignedShards int `json:"unassigned_shards"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type clusterStats struct {
|
||||||
|
NodeName string `json:"node_name"`
|
||||||
|
ClusterName string `json:"cluster_name"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Indices interface{} `json:"indices"`
|
||||||
|
Nodes interface{} `json:"nodes"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type catMaster struct {
|
||||||
|
NodeID string `json:"id"`
|
||||||
|
NodeIP string `json:"ip"`
|
||||||
|
NodeName string `json:"node"`
|
||||||
|
}
|
||||||
|
|
||||||
const sampleConfig = `
|
const sampleConfig = `
|
||||||
## specify a list of one or more Elasticsearch servers
|
## specify a list of one or more Elasticsearch servers
|
||||||
# you can add username and password to your url to use basic authentication:
|
# you can add username and password to your url to use basic authentication:
|
||||||
|
@ -67,13 +83,18 @@ const sampleConfig = `
|
||||||
## Timeout for HTTP requests to the elastic search server(s)
|
## Timeout for HTTP requests to the elastic search server(s)
|
||||||
http_timeout = "5s"
|
http_timeout = "5s"
|
||||||
|
|
||||||
## set local to false when you want to read the indices stats from all nodes
|
## When local is true (the default), the node will read only its own stats.
|
||||||
## within the cluster
|
## Set local to false when you want to read the node stats from all nodes
|
||||||
|
## of the cluster.
|
||||||
local = true
|
local = true
|
||||||
|
|
||||||
## set cluster_health to true when you want to also obtain cluster level stats
|
## Set cluster_health to true when you want to also obtain cluster health stats
|
||||||
cluster_health = false
|
cluster_health = false
|
||||||
|
|
||||||
|
## Set cluster_stats to true when you want to also obtain cluster stats from the
|
||||||
|
## Master node.
|
||||||
|
cluster_stats = false
|
||||||
|
|
||||||
## Optional SSL Config
|
## Optional SSL Config
|
||||||
# ssl_ca = "/etc/telegraf/ca.pem"
|
# ssl_ca = "/etc/telegraf/ca.pem"
|
||||||
# ssl_cert = "/etc/telegraf/cert.pem"
|
# ssl_cert = "/etc/telegraf/cert.pem"
|
||||||
|
@ -89,11 +110,14 @@ type Elasticsearch struct {
|
||||||
Servers []string
|
Servers []string
|
||||||
HttpTimeout internal.Duration
|
HttpTimeout internal.Duration
|
||||||
ClusterHealth bool
|
ClusterHealth bool
|
||||||
|
ClusterStats bool
|
||||||
SSLCA string `toml:"ssl_ca"` // Path to CA file
|
SSLCA string `toml:"ssl_ca"` // Path to CA file
|
||||||
SSLCert string `toml:"ssl_cert"` // Path to host cert file
|
SSLCert string `toml:"ssl_cert"` // Path to host cert file
|
||||||
SSLKey string `toml:"ssl_key"` // Path to cert key file
|
SSLKey string `toml:"ssl_key"` // Path to cert key file
|
||||||
InsecureSkipVerify bool // Use SSL but skip chain & host verification
|
InsecureSkipVerify bool // Use SSL but skip chain & host verification
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
catMasterResponseTokens []string
|
||||||
|
isMaster bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewElasticsearch return a new instance of Elasticsearch
|
// NewElasticsearch return a new instance of Elasticsearch
|
||||||
|
@ -138,12 +162,27 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
||||||
} else {
|
} else {
|
||||||
url = s + statsPath
|
url = s + statsPath
|
||||||
}
|
}
|
||||||
|
e.isMaster = false
|
||||||
|
|
||||||
|
if e.ClusterStats {
|
||||||
|
// get cat/master information here so NodeStats can determine
|
||||||
|
// whether this node is the Master
|
||||||
|
e.setCatMaster(s + "/_cat/master")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Always gather node states
|
||||||
if err := e.gatherNodeStats(url, acc); err != nil {
|
if err := e.gatherNodeStats(url, acc); err != nil {
|
||||||
errChan.C <- err
|
errChan.C <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.ClusterHealth {
|
if e.ClusterHealth {
|
||||||
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
|
url = s + "/_cluster/health?level=indices"
|
||||||
|
e.gatherClusterHealth(url, acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.ClusterStats && e.isMaster {
|
||||||
|
e.gatherClusterStats(s+"/_cluster/stats", acc)
|
||||||
}
|
}
|
||||||
}(serv, acc)
|
}(serv, acc)
|
||||||
}
|
}
|
||||||
|
@ -172,11 +211,12 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
|
||||||
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
|
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
|
||||||
nodeStats := &struct {
|
nodeStats := &struct {
|
||||||
ClusterName string `json:"cluster_name"`
|
ClusterName string `json:"cluster_name"`
|
||||||
Nodes map[string]*node `json:"nodes"`
|
Nodes map[string]*nodeStat `json:"nodes"`
|
||||||
}{}
|
}{}
|
||||||
if err := e.gatherData(url, nodeStats); err != nil {
|
if err := e.gatherJsonData(url, nodeStats); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for id, n := range nodeStats.Nodes {
|
for id, n := range nodeStats.Nodes {
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"node_id": id,
|
"node_id": id,
|
||||||
|
@ -185,6 +225,11 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
|
||||||
"cluster_name": nodeStats.ClusterName,
|
"cluster_name": nodeStats.ClusterName,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e.ClusterStats {
|
||||||
|
// check for master
|
||||||
|
e.isMaster = (id == e.catMasterResponseTokens[0])
|
||||||
|
}
|
||||||
|
|
||||||
for k, v := range n.Attributes {
|
for k, v := range n.Attributes {
|
||||||
tags["node_attribute_"+k] = v
|
tags["node_attribute_"+k] = v
|
||||||
}
|
}
|
||||||
|
@ -204,6 +249,7 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for p, s := range stats {
|
for p, s := range stats {
|
||||||
f := jsonparser.JSONFlattener{}
|
f := jsonparser.JSONFlattener{}
|
||||||
|
// parse Json, ignoring strings and bools
|
||||||
err := f.FlattenJSON("", s)
|
err := f.FlattenJSON("", s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -214,31 +260,31 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
|
func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
|
||||||
clusterStats := &clusterHealth{}
|
healthStats := &clusterHealth{}
|
||||||
if err := e.gatherData(url, clusterStats); err != nil {
|
if err := e.gatherJsonData(url, healthStats); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
measurementTime := time.Now()
|
measurementTime := time.Now()
|
||||||
clusterFields := map[string]interface{}{
|
clusterFields := map[string]interface{}{
|
||||||
"status": clusterStats.Status,
|
"status": healthStats.Status,
|
||||||
"timed_out": clusterStats.TimedOut,
|
"timed_out": healthStats.TimedOut,
|
||||||
"number_of_nodes": clusterStats.NumberOfNodes,
|
"number_of_nodes": healthStats.NumberOfNodes,
|
||||||
"number_of_data_nodes": clusterStats.NumberOfDataNodes,
|
"number_of_data_nodes": healthStats.NumberOfDataNodes,
|
||||||
"active_primary_shards": clusterStats.ActivePrimaryShards,
|
"active_primary_shards": healthStats.ActivePrimaryShards,
|
||||||
"active_shards": clusterStats.ActiveShards,
|
"active_shards": healthStats.ActiveShards,
|
||||||
"relocating_shards": clusterStats.RelocatingShards,
|
"relocating_shards": healthStats.RelocatingShards,
|
||||||
"initializing_shards": clusterStats.InitializingShards,
|
"initializing_shards": healthStats.InitializingShards,
|
||||||
"unassigned_shards": clusterStats.UnassignedShards,
|
"unassigned_shards": healthStats.UnassignedShards,
|
||||||
}
|
}
|
||||||
acc.AddFields(
|
acc.AddFields(
|
||||||
"elasticsearch_cluster_health",
|
"elasticsearch_cluster_health",
|
||||||
clusterFields,
|
clusterFields,
|
||||||
map[string]string{"name": clusterStats.ClusterName},
|
map[string]string{"name": healthStats.ClusterName},
|
||||||
measurementTime,
|
measurementTime,
|
||||||
)
|
)
|
||||||
|
|
||||||
for name, health := range clusterStats.Indices {
|
for name, health := range healthStats.Indices {
|
||||||
indexFields := map[string]interface{}{
|
indexFields := map[string]interface{}{
|
||||||
"status": health.Status,
|
"status": health.Status,
|
||||||
"number_of_shards": health.NumberOfShards,
|
"number_of_shards": health.NumberOfShards,
|
||||||
|
@ -259,7 +305,60 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Elasticsearch) gatherData(url string, v interface{}) error {
|
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
|
||||||
|
clusterStats := &clusterStats{}
|
||||||
|
if err := e.gatherJsonData(url, clusterStats); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
tags := map[string]string{
|
||||||
|
"node_name": clusterStats.NodeName,
|
||||||
|
"cluster_name": clusterStats.ClusterName,
|
||||||
|
"status": clusterStats.Status,
|
||||||
|
}
|
||||||
|
|
||||||
|
stats := map[string]interface{}{
|
||||||
|
"nodes": clusterStats.Nodes,
|
||||||
|
"indices": clusterStats.Indices,
|
||||||
|
}
|
||||||
|
|
||||||
|
for p, s := range stats {
|
||||||
|
f := jsonparser.JSONFlattener{}
|
||||||
|
// parse json, including bools and strings
|
||||||
|
err := f.FullFlattenJSON("", s, true, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
acc.AddFields("elasticsearch_clusterstats_"+p, f.Fields, tags, now)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Elasticsearch) setCatMaster(url string) error {
|
||||||
|
r, err := e.client.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer r.Body.Close()
|
||||||
|
if r.StatusCode != http.StatusOK {
|
||||||
|
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
||||||
|
// to let the underlying transport close the connection and re-establish a new one for
|
||||||
|
// future calls.
|
||||||
|
return fmt.Errorf("status-code %d, expected %d", r.StatusCode, http.StatusOK)
|
||||||
|
}
|
||||||
|
response, err := ioutil.ReadAll(r.Body)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.catMasterResponseTokens = strings.Split(string(response), " ")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
|
||||||
r, err := e.client.Get(url)
|
r, err := e.client.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -272,9 +371,11 @@ func (e *Elasticsearch) gatherData(url string, v interface{}) error {
|
||||||
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
|
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
|
||||||
r.StatusCode, http.StatusOK)
|
r.StatusCode, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
|
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,16 +39,13 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||||
func (t *transportMock) CancelRequest(_ *http.Request) {
|
func (t *transportMock) CancelRequest(_ *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestElasticsearch(t *testing.T) {
|
func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
|
||||||
es := newElasticsearchWithClient()
|
if es.isMaster != expected {
|
||||||
es.Servers = []string{"http://example.com:9200"}
|
msg := fmt.Sprintf("IsMaster set incorrectly")
|
||||||
es.client.Transport = newTransportMock(http.StatusOK, statsResponse)
|
assert.Fail(t, msg)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
if err := es.Gather(&acc); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"cluster_name": "es-testcluster",
|
"cluster_name": "es-testcluster",
|
||||||
"node_attribute_master": "true",
|
"node_attribute_master": "true",
|
||||||
|
@ -55,25 +54,55 @@ func TestElasticsearch(t *testing.T) {
|
||||||
"node_host": "test",
|
"node_host": "test",
|
||||||
}
|
}
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", indicesExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_os", osExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_os", nodestatsOsExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_process", processExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_process", nodestatsProcessExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", jvmExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_jvm", nodestatsJvmExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", threadPoolExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_thread_pool", nodestatsThreadPoolExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_fs", fsExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_fs", nodestatsFsExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_transport", transportExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_transport", nodestatsTransportExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_http", httpExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_http", nodestatsHttpExpected, tags)
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", breakersExpected, tags)
|
acc.AssertContainsTaggedFields(t, "elasticsearch_breakers", nodestatsBreakersExpected, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGatherClusterStats(t *testing.T) {
|
func TestGather(t *testing.T) {
|
||||||
|
es := newElasticsearchWithClient()
|
||||||
|
es.Servers = []string{"http://example.com:9200"}
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
if err := es.Gather(&acc); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkIsMaster(es, false, t)
|
||||||
|
checkNodeStatsResult(t, &acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherNodeStats(t *testing.T) {
|
||||||
|
es := newElasticsearchWithClient()
|
||||||
|
es.Servers = []string{"http://example.com:9200"}
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
if err := es.gatherNodeStats("junk", &acc); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkIsMaster(es, false, t)
|
||||||
|
checkNodeStatsResult(t, &acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherClusterHealth(t *testing.T) {
|
||||||
es := newElasticsearchWithClient()
|
es := newElasticsearchWithClient()
|
||||||
es.Servers = []string{"http://example.com:9200"}
|
es.Servers = []string{"http://example.com:9200"}
|
||||||
es.ClusterHealth = true
|
es.ClusterHealth = true
|
||||||
es.client.Transport = newTransportMock(http.StatusOK, clusterResponse)
|
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
require.NoError(t, es.Gather(&acc))
|
require.NoError(t, es.gatherClusterHealth("junk", &acc))
|
||||||
|
|
||||||
|
checkIsMaster(es, false, t)
|
||||||
|
|
||||||
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
|
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
|
||||||
clusterHealthExpected,
|
clusterHealthExpected,
|
||||||
|
@ -88,6 +117,77 @@ func TestGatherClusterStats(t *testing.T) {
|
||||||
map[string]string{"index": "v2"})
|
map[string]string{"index": "v2"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGatherClusterStatsMaster(t *testing.T) {
|
||||||
|
// This needs multiple steps to replicate the multiple calls internally.
|
||||||
|
es := newElasticsearchWithClient()
|
||||||
|
es.ClusterStats = true
|
||||||
|
es.Servers = []string{"http://example.com:9200"}
|
||||||
|
|
||||||
|
// first get catMaster
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult)
|
||||||
|
require.NoError(t, es.setCatMaster("junk"))
|
||||||
|
|
||||||
|
IsMasterResultTokens := strings.Split(string(IsMasterResult), " ")
|
||||||
|
if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] {
|
||||||
|
msg := fmt.Sprintf("catmaster is incorrect")
|
||||||
|
assert.Fail(t, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now get node status, which determines whether we're master
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
es.Local = true
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
|
||||||
|
if err := es.gatherNodeStats("junk", &acc); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkIsMaster(es, true, t)
|
||||||
|
checkNodeStatsResult(t, &acc)
|
||||||
|
|
||||||
|
// now test the clusterstats method
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, clusterStatsResponse)
|
||||||
|
require.NoError(t, es.gatherClusterStats("junk", &acc))
|
||||||
|
|
||||||
|
tags := map[string]string{
|
||||||
|
"cluster_name": "es-testcluster",
|
||||||
|
"node_name": "test.host.com",
|
||||||
|
"status": "red",
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_nodes", clusterstatsNodesExpected, tags)
|
||||||
|
acc.AssertContainsTaggedFields(t, "elasticsearch_clusterstats_indices", clusterstatsIndicesExpected, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherClusterStatsNonMaster(t *testing.T) {
|
||||||
|
// This needs multiple steps to replicate the multiple calls internally.
|
||||||
|
es := newElasticsearchWithClient()
|
||||||
|
es.ClusterStats = true
|
||||||
|
es.Servers = []string{"http://example.com:9200"}
|
||||||
|
|
||||||
|
// first get catMaster
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult)
|
||||||
|
require.NoError(t, es.setCatMaster("junk"))
|
||||||
|
|
||||||
|
IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ")
|
||||||
|
if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] {
|
||||||
|
msg := fmt.Sprintf("catmaster is incorrect")
|
||||||
|
assert.Fail(t, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now get node status, which determines whether we're master
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
es.Local = true
|
||||||
|
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
|
||||||
|
if err := es.gatherNodeStats("junk", &acc); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure flag is clear so Cluster Stats would not be done
|
||||||
|
checkIsMaster(es, false, t)
|
||||||
|
checkNodeStatsResult(t, &acc)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func newElasticsearchWithClient() *Elasticsearch {
|
func newElasticsearchWithClient() *Elasticsearch {
|
||||||
es := NewElasticsearch()
|
es := NewElasticsearch()
|
||||||
es.client = &http.Client{}
|
es.client = &http.Client{}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package elasticsearch
|
package elasticsearch
|
||||||
|
|
||||||
const clusterResponse = `
|
const clusterHealthResponse = `
|
||||||
{
|
{
|
||||||
"cluster_name": "elasticsearch_telegraf",
|
"cluster_name": "elasticsearch_telegraf",
|
||||||
"status": "green",
|
"status": "green",
|
||||||
|
@ -71,7 +71,7 @@ var v2IndexExpected = map[string]interface{}{
|
||||||
"unassigned_shards": 20,
|
"unassigned_shards": 20,
|
||||||
}
|
}
|
||||||
|
|
||||||
const statsResponse = `
|
const nodeStatsResponse = `
|
||||||
{
|
{
|
||||||
"cluster_name": "es-testcluster",
|
"cluster_name": "es-testcluster",
|
||||||
"nodes": {
|
"nodes": {
|
||||||
|
@ -489,7 +489,7 @@ const statsResponse = `
|
||||||
}
|
}
|
||||||
`
|
`
|
||||||
|
|
||||||
var indicesExpected = map[string]interface{}{
|
var nodestatsIndicesExpected = map[string]interface{}{
|
||||||
"id_cache_memory_size_in_bytes": float64(0),
|
"id_cache_memory_size_in_bytes": float64(0),
|
||||||
"completion_size_in_bytes": float64(0),
|
"completion_size_in_bytes": float64(0),
|
||||||
"suggest_total": float64(0),
|
"suggest_total": float64(0),
|
||||||
|
@ -561,7 +561,7 @@ var indicesExpected = map[string]interface{}{
|
||||||
"segments_fixed_bit_set_memory_in_bytes": float64(0),
|
"segments_fixed_bit_set_memory_in_bytes": float64(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
var osExpected = map[string]interface{}{
|
var nodestatsOsExpected = map[string]interface{}{
|
||||||
"load_average_0": float64(0.01),
|
"load_average_0": float64(0.01),
|
||||||
"load_average_1": float64(0.04),
|
"load_average_1": float64(0.04),
|
||||||
"load_average_2": float64(0.05),
|
"load_average_2": float64(0.05),
|
||||||
|
@ -576,7 +576,7 @@ var osExpected = map[string]interface{}{
|
||||||
"mem_used_in_bytes": float64(1621868544),
|
"mem_used_in_bytes": float64(1621868544),
|
||||||
}
|
}
|
||||||
|
|
||||||
var processExpected = map[string]interface{}{
|
var nodestatsProcessExpected = map[string]interface{}{
|
||||||
"mem_total_virtual_in_bytes": float64(4747890688),
|
"mem_total_virtual_in_bytes": float64(4747890688),
|
||||||
"timestamp": float64(1436460392945),
|
"timestamp": float64(1436460392945),
|
||||||
"open_file_descriptors": float64(160),
|
"open_file_descriptors": float64(160),
|
||||||
|
@ -586,7 +586,7 @@ var processExpected = map[string]interface{}{
|
||||||
"cpu_user_in_millis": float64(13610),
|
"cpu_user_in_millis": float64(13610),
|
||||||
}
|
}
|
||||||
|
|
||||||
var jvmExpected = map[string]interface{}{
|
var nodestatsJvmExpected = map[string]interface{}{
|
||||||
"timestamp": float64(1436460392945),
|
"timestamp": float64(1436460392945),
|
||||||
"uptime_in_millis": float64(202245),
|
"uptime_in_millis": float64(202245),
|
||||||
"mem_non_heap_used_in_bytes": float64(39634576),
|
"mem_non_heap_used_in_bytes": float64(39634576),
|
||||||
|
@ -621,7 +621,7 @@ var jvmExpected = map[string]interface{}{
|
||||||
"buffer_pools_mapped_total_capacity_in_bytes": float64(0),
|
"buffer_pools_mapped_total_capacity_in_bytes": float64(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
var threadPoolExpected = map[string]interface{}{
|
var nodestatsThreadPoolExpected = map[string]interface{}{
|
||||||
"merge_threads": float64(6),
|
"merge_threads": float64(6),
|
||||||
"merge_queue": float64(4),
|
"merge_queue": float64(4),
|
||||||
"merge_active": float64(5),
|
"merge_active": float64(5),
|
||||||
|
@ -726,7 +726,7 @@ var threadPoolExpected = map[string]interface{}{
|
||||||
"flush_completed": float64(3),
|
"flush_completed": float64(3),
|
||||||
}
|
}
|
||||||
|
|
||||||
var fsExpected = map[string]interface{}{
|
var nodestatsFsExpected = map[string]interface{}{
|
||||||
"data_0_total_in_bytes": float64(19507089408),
|
"data_0_total_in_bytes": float64(19507089408),
|
||||||
"data_0_free_in_bytes": float64(16909316096),
|
"data_0_free_in_bytes": float64(16909316096),
|
||||||
"data_0_available_in_bytes": float64(15894814720),
|
"data_0_available_in_bytes": float64(15894814720),
|
||||||
|
@ -736,7 +736,7 @@ var fsExpected = map[string]interface{}{
|
||||||
"total_total_in_bytes": float64(19507089408),
|
"total_total_in_bytes": float64(19507089408),
|
||||||
}
|
}
|
||||||
|
|
||||||
var transportExpected = map[string]interface{}{
|
var nodestatsTransportExpected = map[string]interface{}{
|
||||||
"server_open": float64(13),
|
"server_open": float64(13),
|
||||||
"rx_count": float64(6),
|
"rx_count": float64(6),
|
||||||
"rx_size_in_bytes": float64(1380),
|
"rx_size_in_bytes": float64(1380),
|
||||||
|
@ -744,12 +744,12 @@ var transportExpected = map[string]interface{}{
|
||||||
"tx_size_in_bytes": float64(1380),
|
"tx_size_in_bytes": float64(1380),
|
||||||
}
|
}
|
||||||
|
|
||||||
var httpExpected = map[string]interface{}{
|
var nodestatsHttpExpected = map[string]interface{}{
|
||||||
"current_open": float64(3),
|
"current_open": float64(3),
|
||||||
"total_opened": float64(3),
|
"total_opened": float64(3),
|
||||||
}
|
}
|
||||||
|
|
||||||
var breakersExpected = map[string]interface{}{
|
var nodestatsBreakersExpected = map[string]interface{}{
|
||||||
"fielddata_estimated_size_in_bytes": float64(0),
|
"fielddata_estimated_size_in_bytes": float64(0),
|
||||||
"fielddata_overhead": float64(1.03),
|
"fielddata_overhead": float64(1.03),
|
||||||
"fielddata_tripped": float64(0),
|
"fielddata_tripped": float64(0),
|
||||||
|
@ -763,3 +763,273 @@ var breakersExpected = map[string]interface{}{
|
||||||
"parent_limit_size_in_bytes": float64(727213670),
|
"parent_limit_size_in_bytes": float64(727213670),
|
||||||
"parent_estimated_size_in_bytes": float64(0),
|
"parent_estimated_size_in_bytes": float64(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const clusterStatsResponse = `
|
||||||
|
{
|
||||||
|
"host":"ip-10-0-1-214",
|
||||||
|
"log_type":"metrics",
|
||||||
|
"timestamp":1475767451229,
|
||||||
|
"log_level":"INFO",
|
||||||
|
"node_name":"test.host.com",
|
||||||
|
"cluster_name":"es-testcluster",
|
||||||
|
"status":"red",
|
||||||
|
"indices":{
|
||||||
|
"count":1,
|
||||||
|
"shards":{
|
||||||
|
"total":4,
|
||||||
|
"primaries":4,
|
||||||
|
"replication":0.0,
|
||||||
|
"index":{
|
||||||
|
"shards":{
|
||||||
|
"min":4,
|
||||||
|
"max":4,
|
||||||
|
"avg":4.0
|
||||||
|
},
|
||||||
|
"primaries":{
|
||||||
|
"min":4,
|
||||||
|
"max":4,
|
||||||
|
"avg":4.0
|
||||||
|
},
|
||||||
|
"replication":{
|
||||||
|
"min":0.0,
|
||||||
|
"max":0.0,
|
||||||
|
"avg":0.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"docs":{
|
||||||
|
"count":4,
|
||||||
|
"deleted":0
|
||||||
|
},
|
||||||
|
"store":{
|
||||||
|
"size_in_bytes":17084,
|
||||||
|
"throttle_time_in_millis":0
|
||||||
|
},
|
||||||
|
"fielddata":{
|
||||||
|
"memory_size_in_bytes":0,
|
||||||
|
"evictions":0
|
||||||
|
},
|
||||||
|
"query_cache":{
|
||||||
|
"memory_size_in_bytes":0,
|
||||||
|
"total_count":0,
|
||||||
|
"hit_count":0,
|
||||||
|
"miss_count":0,
|
||||||
|
"cache_size":0,
|
||||||
|
"cache_count":0,
|
||||||
|
"evictions":0
|
||||||
|
},
|
||||||
|
"completion":{
|
||||||
|
"size_in_bytes":0
|
||||||
|
},
|
||||||
|
"segments":{
|
||||||
|
"count":4,
|
||||||
|
"memory_in_bytes":11828,
|
||||||
|
"terms_memory_in_bytes":8932,
|
||||||
|
"stored_fields_memory_in_bytes":1248,
|
||||||
|
"term_vectors_memory_in_bytes":0,
|
||||||
|
"norms_memory_in_bytes":1280,
|
||||||
|
"doc_values_memory_in_bytes":368,
|
||||||
|
"index_writer_memory_in_bytes":0,
|
||||||
|
"index_writer_max_memory_in_bytes":2048000,
|
||||||
|
"version_map_memory_in_bytes":0,
|
||||||
|
"fixed_bit_set_memory_in_bytes":0
|
||||||
|
},
|
||||||
|
"percolate":{
|
||||||
|
"total":0,
|
||||||
|
"time_in_millis":0,
|
||||||
|
"current":0,
|
||||||
|
"memory_size_in_bytes":-1,
|
||||||
|
"memory_size":"-1b",
|
||||||
|
"queries":0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nodes":{
|
||||||
|
"count":{
|
||||||
|
"total":1,
|
||||||
|
"master_only":0,
|
||||||
|
"data_only":0,
|
||||||
|
"master_data":1,
|
||||||
|
"client":0
|
||||||
|
},
|
||||||
|
"versions":[
|
||||||
|
{
|
||||||
|
"version": "2.3.3"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"os":{
|
||||||
|
"available_processors":1,
|
||||||
|
"allocated_processors":1,
|
||||||
|
"mem":{
|
||||||
|
"total_in_bytes":593301504
|
||||||
|
},
|
||||||
|
"names":[
|
||||||
|
{
|
||||||
|
"name":"Linux",
|
||||||
|
"count":1
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"process":{
|
||||||
|
"cpu":{
|
||||||
|
"percent":0
|
||||||
|
},
|
||||||
|
"open_file_descriptors":{
|
||||||
|
"min":145,
|
||||||
|
"max":145,
|
||||||
|
"avg":145
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"jvm":{
|
||||||
|
"max_uptime_in_millis":11580527,
|
||||||
|
"versions":[
|
||||||
|
{
|
||||||
|
"version":"1.8.0_101",
|
||||||
|
"vm_name":"OpenJDK 64-Bit Server VM",
|
||||||
|
"vm_version":"25.101-b13",
|
||||||
|
"vm_vendor":"Oracle Corporation",
|
||||||
|
"count":1
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"mem":{
|
||||||
|
"heap_used_in_bytes":70550288,
|
||||||
|
"heap_max_in_bytes":1065025536
|
||||||
|
},
|
||||||
|
"threads":30
|
||||||
|
},
|
||||||
|
"fs":{
|
||||||
|
"total_in_bytes":8318783488,
|
||||||
|
"free_in_bytes":6447439872,
|
||||||
|
"available_in_bytes":6344785920
|
||||||
|
},
|
||||||
|
"plugins":[
|
||||||
|
{
|
||||||
|
"name":"cloud-aws",
|
||||||
|
"version":"2.3.3",
|
||||||
|
"description":"The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
|
||||||
|
"jvm":true,
|
||||||
|
"classname":"org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
|
||||||
|
"isolated":true,
|
||||||
|
"site":false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"kopf",
|
||||||
|
"version":"2.0.1",
|
||||||
|
"description":"kopf - simple web administration tool for Elasticsearch",
|
||||||
|
"url":"/_plugin/kopf/",
|
||||||
|
"jvm":false,
|
||||||
|
"site":true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"tr-metrics",
|
||||||
|
"version":"7bd5b4b",
|
||||||
|
"description":"Logs cluster and node stats for performance monitoring.",
|
||||||
|
"jvm":true,
|
||||||
|
"classname":"com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
|
||||||
|
"isolated":true,
|
||||||
|
"site":false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
var clusterstatsIndicesExpected = map[string]interface{}{
|
||||||
|
"completion_size_in_bytes": float64(0),
|
||||||
|
"count": float64(1),
|
||||||
|
"docs_count": float64(4),
|
||||||
|
"docs_deleted": float64(0),
|
||||||
|
"fielddata_evictions": float64(0),
|
||||||
|
"fielddata_memory_size_in_bytes": float64(0),
|
||||||
|
"percolate_current": float64(0),
|
||||||
|
"percolate_memory_size_in_bytes": float64(-1),
|
||||||
|
"percolate_queries": float64(0),
|
||||||
|
"percolate_time_in_millis": float64(0),
|
||||||
|
"percolate_total": float64(0),
|
||||||
|
"percolate_memory_size": "-1b",
|
||||||
|
"query_cache_cache_count": float64(0),
|
||||||
|
"query_cache_cache_size": float64(0),
|
||||||
|
"query_cache_evictions": float64(0),
|
||||||
|
"query_cache_hit_count": float64(0),
|
||||||
|
"query_cache_memory_size_in_bytes": float64(0),
|
||||||
|
"query_cache_miss_count": float64(0),
|
||||||
|
"query_cache_total_count": float64(0),
|
||||||
|
"segments_count": float64(4),
|
||||||
|
"segments_doc_values_memory_in_bytes": float64(368),
|
||||||
|
"segments_fixed_bit_set_memory_in_bytes": float64(0),
|
||||||
|
"segments_index_writer_max_memory_in_bytes": float64(2.048e+06),
|
||||||
|
"segments_index_writer_memory_in_bytes": float64(0),
|
||||||
|
"segments_memory_in_bytes": float64(11828),
|
||||||
|
"segments_norms_memory_in_bytes": float64(1280),
|
||||||
|
"segments_stored_fields_memory_in_bytes": float64(1248),
|
||||||
|
"segments_term_vectors_memory_in_bytes": float64(0),
|
||||||
|
"segments_terms_memory_in_bytes": float64(8932),
|
||||||
|
"segments_version_map_memory_in_bytes": float64(0),
|
||||||
|
"shards_index_primaries_avg": float64(4),
|
||||||
|
"shards_index_primaries_max": float64(4),
|
||||||
|
"shards_index_primaries_min": float64(4),
|
||||||
|
"shards_index_replication_avg": float64(0),
|
||||||
|
"shards_index_replication_max": float64(0),
|
||||||
|
"shards_index_replication_min": float64(0),
|
||||||
|
"shards_index_shards_avg": float64(4),
|
||||||
|
"shards_index_shards_max": float64(4),
|
||||||
|
"shards_index_shards_min": float64(4),
|
||||||
|
"shards_primaries": float64(4),
|
||||||
|
"shards_replication": float64(0),
|
||||||
|
"shards_total": float64(4),
|
||||||
|
"store_size_in_bytes": float64(17084),
|
||||||
|
"store_throttle_time_in_millis": float64(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
var clusterstatsNodesExpected = map[string]interface{}{
|
||||||
|
"count_client": float64(0),
|
||||||
|
"count_data_only": float64(0),
|
||||||
|
"count_master_data": float64(1),
|
||||||
|
"count_master_only": float64(0),
|
||||||
|
"count_total": float64(1),
|
||||||
|
"fs_available_in_bytes": float64(6.34478592e+09),
|
||||||
|
"fs_free_in_bytes": float64(6.447439872e+09),
|
||||||
|
"fs_total_in_bytes": float64(8.318783488e+09),
|
||||||
|
"jvm_max_uptime_in_millis": float64(1.1580527e+07),
|
||||||
|
"jvm_mem_heap_max_in_bytes": float64(1.065025536e+09),
|
||||||
|
"jvm_mem_heap_used_in_bytes": float64(7.0550288e+07),
|
||||||
|
"jvm_threads": float64(30),
|
||||||
|
"jvm_versions_0_count": float64(1),
|
||||||
|
"jvm_versions_0_version": "1.8.0_101",
|
||||||
|
"jvm_versions_0_vm_name": "OpenJDK 64-Bit Server VM",
|
||||||
|
"jvm_versions_0_vm_vendor": "Oracle Corporation",
|
||||||
|
"jvm_versions_0_vm_version": "25.101-b13",
|
||||||
|
"os_allocated_processors": float64(1),
|
||||||
|
"os_available_processors": float64(1),
|
||||||
|
"os_mem_total_in_bytes": float64(5.93301504e+08),
|
||||||
|
"os_names_0_count": float64(1),
|
||||||
|
"os_names_0_name": "Linux",
|
||||||
|
"process_cpu_percent": float64(0),
|
||||||
|
"process_open_file_descriptors_avg": float64(145),
|
||||||
|
"process_open_file_descriptors_max": float64(145),
|
||||||
|
"process_open_file_descriptors_min": float64(145),
|
||||||
|
"versions_0_version": "2.3.3",
|
||||||
|
"plugins_0_classname": "org.elasticsearch.plugin.cloud.aws.CloudAwsPlugin",
|
||||||
|
"plugins_0_description": "The Amazon Web Service (AWS) Cloud plugin allows to use AWS API for the unicast discovery mechanism and add S3 repositories.",
|
||||||
|
"plugins_0_isolated": true,
|
||||||
|
"plugins_0_jvm": true,
|
||||||
|
"plugins_0_name": "cloud-aws",
|
||||||
|
"plugins_0_site": false,
|
||||||
|
"plugins_0_version": "2.3.3",
|
||||||
|
"plugins_1_description": "kopf - simple web administration tool for Elasticsearch",
|
||||||
|
"plugins_1_jvm": false,
|
||||||
|
"plugins_1_name": "kopf",
|
||||||
|
"plugins_1_site": true,
|
||||||
|
"plugins_1_url": "/_plugin/kopf/",
|
||||||
|
"plugins_1_version": "2.0.1",
|
||||||
|
"plugins_2_classname": "com.trgr.elasticsearch.plugin.metrics.MetricsPlugin",
|
||||||
|
"plugins_2_description": "Logs cluster and node stats for performance monitoring.",
|
||||||
|
"plugins_2_isolated": true,
|
||||||
|
"plugins_2_jvm": true,
|
||||||
|
"plugins_2_name": "tr-metrics",
|
||||||
|
"plugins_2_site": false,
|
||||||
|
"plugins_2_version": "7bd5b4b",
|
||||||
|
}
|
||||||
|
|
||||||
|
const IsMasterResult = "SDFsfSDFsdfFSDSDfSFDSDF 10.206.124.66 10.206.124.66 test.host.com "
|
||||||
|
|
||||||
|
const IsNotMasterResult = "junk 10.206.124.66 10.206.124.66 test.junk.com "
|
||||||
|
|
|
@ -103,10 +103,22 @@ type JSONFlattener struct {
|
||||||
Fields map[string]interface{}
|
Fields map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FlattenJSON flattens nested maps/interfaces into a fields map
|
// FlattenJSON flattens nested maps/interfaces into a fields map (ignoring bools and string)
|
||||||
func (f *JSONFlattener) FlattenJSON(
|
func (f *JSONFlattener) FlattenJSON(
|
||||||
|
fieldname string,
|
||||||
|
v interface{}) error {
|
||||||
|
if f.Fields == nil {
|
||||||
|
f.Fields = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
return f.FullFlattenJSON(fieldname, v, false, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FullFlattenJSON flattens nested maps/interfaces into a fields map (including bools and string)
|
||||||
|
func (f *JSONFlattener) FullFlattenJSON(
|
||||||
fieldname string,
|
fieldname string,
|
||||||
v interface{},
|
v interface{},
|
||||||
|
convertString bool,
|
||||||
|
convertBool bool,
|
||||||
) error {
|
) error {
|
||||||
if f.Fields == nil {
|
if f.Fields == nil {
|
||||||
f.Fields = make(map[string]interface{})
|
f.Fields = make(map[string]interface{})
|
||||||
|
@ -115,7 +127,7 @@ func (f *JSONFlattener) FlattenJSON(
|
||||||
switch t := v.(type) {
|
switch t := v.(type) {
|
||||||
case map[string]interface{}:
|
case map[string]interface{}:
|
||||||
for k, v := range t {
|
for k, v := range t {
|
||||||
err := f.FlattenJSON(fieldname+"_"+k+"_", v)
|
err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -123,15 +135,28 @@ func (f *JSONFlattener) FlattenJSON(
|
||||||
case []interface{}:
|
case []interface{}:
|
||||||
for i, v := range t {
|
for i, v := range t {
|
||||||
k := strconv.Itoa(i)
|
k := strconv.Itoa(i)
|
||||||
err := f.FlattenJSON(fieldname+"_"+k+"_", v)
|
err := f.FullFlattenJSON(fieldname+"_"+k+"_", v, convertString, convertBool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case float64:
|
case float64:
|
||||||
f.Fields[fieldname] = t
|
f.Fields[fieldname] = t
|
||||||
case bool, string, nil:
|
case string:
|
||||||
|
if convertString {
|
||||||
|
f.Fields[fieldname] = v.(string)
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case bool:
|
||||||
|
if convertBool {
|
||||||
|
f.Fields[fieldname] = v.(bool)
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case nil:
|
||||||
// ignored types
|
// ignored types
|
||||||
|
fmt.Println("json parser ignoring " + fieldname)
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
|
return fmt.Errorf("JSON Flattener: got unexpected type %T with value %v (%s)",
|
||||||
|
|
Loading…
Reference in New Issue