Changes to make the plugin work with basic authentication (Shield, via new username and password options) and with older versions of ES (e.g. 0.19.9). The NodeStats API endpoint is now queried per statistic (configurable via the new stats option) rather than the default of all, since older versions need all=true specified explicitly.

snair 2016-10-28 14:39:20 -07:00
parent fc59757a1a
commit 3b3008ebd2
2 changed files with 91 additions and 40 deletions

View File

@@ -768,6 +768,12 @@
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
# ## Optional for authentication setup with Shield
# # username = "admin"
# # password = "admin123"
# ## Cluster node stats API. Defaults to all of the options below
# # stats = ["indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", "breakers"]
# # Read metrics from one or more commands that can output to stdout

View File

@@ -78,6 +78,14 @@ const sampleConfig = `
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## Optional for authentication setup with Shield
# username = "admin"
# password = "admin123"
## Cluster node stats API. Defaults to all of the options below
# stats = ["indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", "breakers"]
`
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
@@ -85,6 +93,7 @@ const sampleConfig = `
type Elasticsearch struct {
Local bool
Servers []string
Stats []string
HttpTimeout internal.Duration
ClusterHealth bool
SSLCA string `toml:"ssl_ca"` // Path to CA file
@@ -92,6 +101,8 @@ type Elasticsearch struct {
SSLKey string `toml:"ssl_key"` // Path to cert key file
InsecureSkipVerify bool // Use SSL but skip chain & host verification
client *http.Client
Username string
Password string
}
// NewElasticsearch returns a new instance of Elasticsearch
@@ -122,7 +133,6 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
e.client = client
}
errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))
@@ -131,6 +141,9 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
var url string
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
}
if e.Local {
url = s + statsPathLocal
} else {
@@ -140,9 +153,6 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan.C <- err
return
}
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
}
}(serv, acc)
}
@@ -159,6 +169,7 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
ResponseHeaderTimeout: e.HttpTimeout.Duration,
TLSClientConfig: tlsCfg,
}
client := &http.Client{
Transport: tr,
Timeout: e.HttpTimeout.Duration,
@@ -168,45 +179,75 @@ func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
}
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
}{}
if err := e.gatherData(url, nodeStats); err != nil {
return err
var iurl string
if len(e.Stats) == 0 {
e.Stats = []string{"indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http", "breakers"}
}
for id, n := range nodeStats.Nodes {
tags := map[string]string{
"node_id": id,
"node_host": n.Host,
"node_name": n.Name,
"cluster_name": nodeStats.ClusterName,
for _, s := range e.Stats {
iurl = url
switch s {
case "indices":
iurl = iurl + "/indices"
case "os":
iurl = iurl + "/os"
case "process":
iurl = iurl + "/process"
case "jvm":
iurl = iurl + "/jvm"
case "thread_pool":
iurl = iurl + "/thread_pool"
case "fs":
iurl = iurl + "/fs"
case "transport":
iurl = iurl + "/transport"
case "http":
iurl = iurl + "/http"
case "breakers":
iurl = iurl + "/breakers"
default:
return fmt.Errorf("elasticsearch: no matching NodeStats endpoint found for %s", s)
}
for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
}{}
if err := e.gatherData(iurl, nodeStats); err != nil {
return err
}
stats := map[string]interface{}{
"indices": n.Indices,
"os": n.OS,
"process": n.Process,
"jvm": n.JVM,
"thread_pool": n.ThreadPool,
"fs": n.FS,
"transport": n.Transport,
"http": n.HTTP,
"breakers": n.Breakers,
}
now := time.Now()
for p, s := range stats {
f := jsonparser.JSONFlattener{}
err := f.FlattenJSON("", s)
if err != nil {
return err
for id, n := range nodeStats.Nodes {
tags := map[string]string{
"node_id": id,
"node_host": n.Host,
"node_name": n.Name,
"cluster_name": nodeStats.ClusterName,
}
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
stats := map[string]interface{}{
"indices": n.Indices,
"os": n.OS,
"process": n.Process,
"jvm": n.JVM,
"thread_pool": n.ThreadPool,
"fs": n.FS,
"transport": n.Transport,
"http": n.HTTP,
"breakers": n.Breakers,
}
now := time.Now()
for p, s := range stats {
f := jsonparser.JSONFlattener{}
err := f.FlattenJSON("", s)
if err != nil {
return err
}
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
}
}
}
return nil
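
The loop above builds one NodeStats request per configured statistic by appending the statistic name to the base stats URL, rather than fetching everything in a single call. A minimal standalone sketch of the URLs this produces, assuming the local-node base path /_nodes/_local/stats and the default statistics list (the host is illustrative):

package main

import "fmt"

func main() {
	// Base NodeStats URL as built in Gather; host and path are illustrative.
	base := "http://localhost:9200/_nodes/_local/stats"

	// Default statistics, matching the fallback list in gatherNodeStats.
	stats := []string{"indices", "os", "process", "jvm", "thread_pool",
		"fs", "transport", "http", "breakers"}

	// One endpoint per statistic, so older ES versions do not need all=true.
	for _, s := range stats {
		fmt.Println(base + "/" + s)
	}
}
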
@@ -258,7 +299,11 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
}
func (e *Elasticsearch) gatherData(url string, v interface{}) error {
r, err := e.client.Get(url)
req, err := http.NewRequest("GET", url, nil)
if len(e.Username) > 0 && len(e.Password) > 0 {
req.SetBasicAuth(e.Username, e.Password)
}
r, err := e.client.Do(req)
if err != nil {
return err
}
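
Note that http.NewRequest can itself return an error (for example on a malformed URL) before SetBasicAuth is reached. A minimal standalone sketch of the same request pattern with that check added; the URL and credentials are illustrative, not part of the commit:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 5 * time.Second}

	// Build the request explicitly so Basic Auth credentials can be attached,
	// mirroring gatherData above.
	req, err := http.NewRequest("GET", "http://localhost:9200/_nodes/_local/stats/jvm", nil)
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}

	// Only set the Authorization header when both values are configured.
	username, password := "admin", "admin123"
	if len(username) > 0 && len(password) > 0 {
		req.SetBasicAuth(username, password)
	}

	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, len(body), "bytes")
}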