diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 2ad0bcbae..207b2288a 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -768,6 +768,12 @@ # # ssl_key = "/etc/telegraf/key.pem" # ## Use SSL but skip chain & host verification # # insecure_skip_verify = false +# ## Optional for authentication setup with Shield +# # username = "admin" +# # password = "admin123" + +## ## cluster node stats API . Defaults to all of the options below +# #stats = ["indices", "os", "process", "jvm", "thread_pool","fs","transport","http","breakers"] # # Read metrics from one or more commands that can output to stdout diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 896e03f2e..596529806 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -78,6 +78,14 @@ const sampleConfig = ` # ssl_key = "/etc/telegraf/key.pem" ## Use SSL but skip chain & host verification # insecure_skip_verify = false + + ### Optional for authentication setup with Shield + # username = "admin" + # password = "admin123" + + #### cluster node stats API . Defaults to all of the options below + # stats = ["indices", "os", "process", "jvm", "thread_pool","fs","transport","http","breakers"] + ` // Elasticsearch is a plugin to read stats from one or many Elasticsearch @@ -85,6 +93,7 @@ const sampleConfig = ` type Elasticsearch struct { Local bool Servers []string + Stats []string HttpTimeout internal.Duration ClusterHealth bool SSLCA string `toml:"ssl_ca"` // Path to CA file @@ -92,6 +101,8 @@ type Elasticsearch struct { SSLKey string `toml:"ssl_key"` // Path to cert key file InsecureSkipVerify bool // Use SSL but skip chain & host verification client *http.Client + Username string + Password string } // NewElasticsearch return a new instance of Elasticsearch @@ -122,7 +133,6 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } e.client = client } - errChan := errchan.New(len(e.Servers)) var wg sync.WaitGroup wg.Add(len(e.Servers)) @@ -131,6 +141,9 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { go func(s string, acc telegraf.Accumulator) { defer wg.Done() var url string + if e.ClusterHealth { + e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc) + } if e.Local { url = s + statsPathLocal } else { @@ -140,9 +153,6 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { errChan.C <- err return } - if e.ClusterHealth { - e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc) - } }(serv, acc) } @@ -159,6 +169,7 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) { ResponseHeaderTimeout: e.HttpTimeout.Duration, TLSClientConfig: tlsCfg, } + client := &http.Client{ Transport: tr, Timeout: e.HttpTimeout.Duration, @@ -168,45 +179,75 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) { } func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error { - nodeStats := &struct { - ClusterName string `json:"cluster_name"` - Nodes map[string]*node `json:"nodes"` - }{} - if err := e.gatherData(url, nodeStats); err != nil { - return err + var iurl string + if len(e.Stats) == 0 { + e.Stats = []string{"indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", "breakers"} } - for id, n := range nodeStats.Nodes { - tags := map[string]string{ - "node_id": id, - "node_host": n.Host, - "node_name": n.Name, - "cluster_name": nodeStats.ClusterName, + for _, s := range e.Stats { + iurl = url + switch s { + case "indices": + iurl = iurl + "/indices" + case "os": + iurl = iurl + "/os" + case "process": + iurl = iurl + "/process" + case "jvm": + iurl = iurl + "/jvm" + case "thread_pool": + iurl = iurl + "/thread_pool" + case "fs": + iurl = iurl + "/fs" + case "transport": + iurl = iurl + "/transport" + case "http": + iurl = iurl + "/http" + case "breakers": + iurl = iurl + "/breakers" + default: + return fmt.Errorf("elasticsearch: No matching NodeStats endpoint found in %s ", s) } - - for k, v := range n.Attributes { - tags["node_attribute_"+k] = v + nodeStats := &struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]*node `json:"nodes"` + }{} + if err := e.gatherData(iurl, nodeStats); err != nil { + return err } - - stats := map[string]interface{}{ - "indices": n.Indices, - "os": n.OS, - "process": n.Process, - "jvm": n.JVM, - "thread_pool": n.ThreadPool, - "fs": n.FS, - "transport": n.Transport, - "http": n.HTTP, - "breakers": n.Breakers, - } - - now := time.Now() - for p, s := range stats { - f := jsonparser.JSONFlattener{} - err := f.FlattenJSON("", s) - if err != nil { - return err + for id, n := range nodeStats.Nodes { + tags := map[string]string{ + "node_id": id, + "node_host": n.Host, + "node_name": n.Name, + "cluster_name": nodeStats.ClusterName, } - acc.AddFields("elasticsearch_"+p, f.Fields, tags, now) + + for k, v := range n.Attributes { + tags["node_attribute_"+k] = v + } + + stats := map[string]interface{}{ + "indices": n.Indices, + "os": n.OS, + "process": n.Process, + "jvm": n.JVM, + "thread_pool": n.ThreadPool, + "fs": n.FS, + "transport": n.Transport, + "http": n.HTTP, + "breakers": n.Breakers, + } + + now := time.Now() + for p, s := range stats { + f := jsonparser.JSONFlattener{} + err := f.FlattenJSON("", s) + if err != nil { + return err + } + acc.AddFields("elasticsearch_"+p, f.Fields, tags, now) + } + } } return nil @@ -258,7 +299,11 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) } func (e *Elasticsearch) gatherData(url string, v interface{}) error { - r, err := e.client.Get(url) + req, err := http.NewRequest("GET", url, nil) + if len(e.Username) > 0 && len(e.Password) > 0 { + req.SetBasicAuth(e.Username, e.Password) + } + r, err := e.client.Do(req) if err != nil { return err }