2015-07-08 19:07:39 +00:00
package elasticsearch
import (
"encoding/json"
"fmt"
2016-01-27 21:21:36 +00:00
"github.com/influxdata/telegraf"
2016-06-22 15:23:49 +00:00
"github.com/influxdata/telegraf/internal"
2016-01-20 18:57:35 +00:00
"github.com/influxdata/telegraf/plugins/inputs"
2016-02-06 00:36:35 +00:00
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
2016-12-20 16:30:03 +00:00
"io/ioutil"
2017-10-04 22:29:32 +00:00
"net/http"
"regexp"
2016-12-20 16:30:03 +00:00
"strings"
2017-10-04 22:29:32 +00:00
"sync"
"time"
2015-07-08 19:07:39 +00:00
)
2016-12-20 19:32:04 +00:00
// mask matches the scheme and userinfo part of a URL (for example
// "http://user:pass@") so that credentials embedded in a server URL can be
// replaced before an error message is reported. The pattern must not be
// padded with spaces: the character following '@' in a URL is the host, so a
// trailing space in the pattern would prevent every match.
var mask = regexp.MustCompile(`https?:\/\/\S+:\S+@`)
2016-12-20 16:30:03 +00:00
// Node stats are gathered on every run, so their endpoints are fixed.
const (
	// statsPath is the node stats endpoint used when Local is false.
	statsPath = "/_nodes/stats"
	// statsPathLocal is the node stats endpoint used when Local is true.
	statsPathLocal = "/_nodes/_local/stats"
)
2015-07-08 19:07:39 +00:00
2016-12-20 16:30:03 +00:00
// nodeStat is one entry of the "nodes" object in a node stats response.
// Host, Name and Attributes become tags; every stat section is kept as an
// untyped JSON value and flattened into fields by gatherNodeStats. A section
// that is absent from the response stays nil.
type nodeStat struct {
	Host       string            `json:"host"`
	Name       string            `json:"name"`
	Attributes map[string]string `json:"attributes"`
	Indices    interface{}       `json:"indices"`
	OS         interface{}       `json:"os"`
	Process    interface{}       `json:"process"`
	JVM        interface{}       `json:"jvm"`
	ThreadPool interface{}       `json:"thread_pool"`
	FS         interface{}       `json:"fs"`
	Transport  interface{}       `json:"transport"`
	HTTP       interface{}       `json:"http"`
	Breakers   interface{}       `json:"breakers"`
}
2015-10-28 05:31:25 +00:00
// clusterHealth mirrors the JSON document decoded from the cluster health
// endpoint. Indices maps an index name to its health entry and is only
// filled when the response contains an "indices" object.
type clusterHealth struct {
	ClusterName         string                 `json:"cluster_name"`
	Status              string                 `json:"status"`
	TimedOut            bool                   `json:"timed_out"`
	NumberOfNodes       int                    `json:"number_of_nodes"`
	NumberOfDataNodes   int                    `json:"number_of_data_nodes"`
	ActivePrimaryShards int                    `json:"active_primary_shards"`
	ActiveShards        int                    `json:"active_shards"`
	RelocatingShards    int                    `json:"relocating_shards"`
	InitializingShards  int                    `json:"initializing_shards"`
	UnassignedShards    int                    `json:"unassigned_shards"`
	Indices             map[string]indexHealth `json:"indices"`
}
// indexHealth is the health entry of a single index inside a cluster health
// response.
type indexHealth struct {
	Status              string `json:"status"`
	NumberOfShards      int    `json:"number_of_shards"`
	NumberOfReplicas    int    `json:"number_of_replicas"`
	ActivePrimaryShards int    `json:"active_primary_shards"`
	ActiveShards        int    `json:"active_shards"`
	RelocatingShards    int    `json:"relocating_shards"`
	InitializingShards  int    `json:"initializing_shards"`
	UnassignedShards    int    `json:"unassigned_shards"`
}
2016-12-20 16:30:03 +00:00
// clusterStats mirrors the JSON document decoded from the cluster stats
// endpoint. NodeName, ClusterName and Status are used as tags; Indices and
// Nodes are kept untyped and flattened into fields by gatherClusterStats.
type clusterStats struct {
	NodeName    string      `json:"node_name"`
	ClusterName string      `json:"cluster_name"`
	Status      string      `json:"status"`
	Indices     interface{} `json:"indices"`
	Nodes       interface{} `json:"nodes"`
}
// catMaster describes the master node as a JSON object with "id", "ip" and
// "node" keys.
type catMaster struct {
	NodeID   string `json:"id"`
	NodeIP   string `json:"ip"`
	NodeName string `json:"node"`
}
2015-07-08 19:07:39 +00:00
// sampleConfig is the TOML snippet returned by SampleConfig. Lines starting
// with "##" are explanatory comments; lines starting with a single "#" are
// commented-out settings a user can enable.
const sampleConfig = `
  ## specify a list of one or more Elasticsearch servers
  # you can add username and password to your url to use basic authentication:
  # servers = ["http://user:pass@localhost:9200"]
  servers = ["http://localhost:9200"]

  ## Timeout for HTTP requests to the elastic search server(s)
  http_timeout = "5s"

  ## When local is true (the default), the node will read only its own stats.
  ## Set local to false when you want to read the node stats from all nodes
  ## of the cluster.
  local = true

  ## Set cluster_health to true when you want to also obtain cluster health stats
  cluster_health = false

  ## Adjust cluster_health_level when you want to also obtain detailed health stats
  ## The options are
  ##  - indices (default)
  ##  - cluster
  # cluster_health_level = "indices"

  ## Set cluster_stats to true when you want to also obtain cluster stats from the
  ## Master node.
  cluster_stats = false

  ## node_stats is a list of sub-stats that you want to have gathered. Valid options
  ## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
  ## "breakers". Per default, all stats are gathered.
  # node_stats = ["jvm", "http"]

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false
`
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers.
type Elasticsearch struct {
2016-12-20 16:30:03 +00:00
Local bool
Servers [ ] string
HttpTimeout internal . Duration
ClusterHealth bool
2017-10-04 22:29:32 +00:00
ClusterHealthLevel string
2016-12-20 16:30:03 +00:00
ClusterStats bool
2017-10-06 23:16:32 +00:00
NodeStats [ ] string
2016-12-20 16:30:03 +00:00
SSLCA string ` toml:"ssl_ca" ` // Path to CA file
SSLCert string ` toml:"ssl_cert" ` // Path to host cert file
SSLKey string ` toml:"ssl_key" ` // Path to cert key file
InsecureSkipVerify bool // Use SSL but skip chain & host verification
client * http . Client
catMasterResponseTokens [ ] string
isMaster bool
2015-07-08 19:07:39 +00:00
}
// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch ( ) * Elasticsearch {
2016-08-29 09:39:32 +00:00
return & Elasticsearch {
2017-10-04 22:29:32 +00:00
HttpTimeout : internal . Duration { Duration : time . Second * 5 } ,
ClusterHealthLevel : "indices" ,
2016-08-29 09:39:32 +00:00
}
2015-07-08 19:07:39 +00:00
}
// SampleConfig returns the example TOML configuration of the plugin.
func (e *Elasticsearch) SampleConfig() string {
	cfg := sampleConfig
	return cfg
}
// Description returns the plugin description.
func ( e * Elasticsearch ) Description ( ) string {
2015-07-09 19:02:19 +00:00
return "Read stats from one or more Elasticsearch servers or clusters"
2015-07-08 19:07:39 +00:00
}
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
2016-01-27 21:21:36 +00:00
func ( e * Elasticsearch ) Gather ( acc telegraf . Accumulator ) error {
2016-06-22 15:23:49 +00:00
if e . client == nil {
client , err := e . createHttpClient ( )
if err != nil {
return err
}
e . client = client
}
2016-01-21 23:52:49 +00:00
var wg sync . WaitGroup
wg . Add ( len ( e . Servers ) )
2015-07-08 19:07:39 +00:00
for _ , serv := range e . Servers {
2016-01-27 21:21:36 +00:00
go func ( s string , acc telegraf . Accumulator ) {
2016-01-21 23:52:49 +00:00
defer wg . Done ( )
2017-10-06 23:16:32 +00:00
url := e . nodeStatsUrl ( s )
2016-12-20 16:30:03 +00:00
e . isMaster = false
if e . ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
2017-06-26 22:23:53 +00:00
if err := e . setCatMaster ( s + "/_cat/master" ) ; err != nil {
acc . AddError ( fmt . Errorf ( mask . ReplaceAllString ( err . Error ( ) , "http(s)://XXX:XXX@" ) ) )
return
}
2016-12-20 16:30:03 +00:00
}
// Always gather node states
2016-01-21 23:52:49 +00:00
if err := e . gatherNodeStats ( url , acc ) ; err != nil {
2017-04-24 18:13:26 +00:00
acc . AddError ( fmt . Errorf ( mask . ReplaceAllString ( err . Error ( ) , "http(s)://XXX:XXX@" ) ) )
2016-01-21 23:52:49 +00:00
return
}
2016-12-20 16:30:03 +00:00
2016-01-21 23:52:49 +00:00
if e . ClusterHealth {
2017-10-04 22:29:32 +00:00
url = s + "/_cluster/health"
if e . ClusterHealthLevel != "" {
url = url + "?level=" + e . ClusterHealthLevel
}
2016-12-20 19:32:04 +00:00
if err := e . gatherClusterHealth ( url , acc ) ; err != nil {
2017-04-24 18:13:26 +00:00
acc . AddError ( fmt . Errorf ( mask . ReplaceAllString ( err . Error ( ) , "http(s)://XXX:XXX@" ) ) )
2016-12-20 19:32:04 +00:00
return
}
2016-12-20 16:30:03 +00:00
}
if e . ClusterStats && e . isMaster {
2016-12-20 19:32:04 +00:00
if err := e . gatherClusterStats ( s + "/_cluster/stats" , acc ) ; err != nil {
2017-04-24 18:13:26 +00:00
acc . AddError ( fmt . Errorf ( mask . ReplaceAllString ( err . Error ( ) , "http(s)://XXX:XXX@" ) ) )
2016-12-20 19:32:04 +00:00
return
}
2016-01-21 23:52:49 +00:00
}
} ( serv , acc )
}
wg . Wait ( )
2017-04-24 18:13:26 +00:00
return nil
2015-07-08 19:07:39 +00:00
}
2016-06-22 15:23:49 +00:00
func ( e * Elasticsearch ) createHttpClient ( ) ( * http . Client , error ) {
tlsCfg , err := internal . GetTLSConfig ( e . SSLCert , e . SSLKey , e . SSLCA , e . InsecureSkipVerify )
if err != nil {
return nil , err
}
tr := & http . Transport {
2016-08-29 09:39:32 +00:00
ResponseHeaderTimeout : e . HttpTimeout . Duration ,
2016-06-22 15:23:49 +00:00
TLSClientConfig : tlsCfg ,
}
client := & http . Client {
Transport : tr ,
2016-08-29 09:39:32 +00:00
Timeout : e . HttpTimeout . Duration ,
2016-06-22 15:23:49 +00:00
}
return client , nil
}
2017-10-06 23:16:32 +00:00
// nodeStatsUrl returns the node stats URL for the server at baseUrl. The
// local endpoint is chosen when Local is set, and a comma-separated list of
// the configured NodeStats sections is appended as an extra path element
// when that list is not empty.
func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
	path := statsPath
	if e.Local {
		path = statsPathLocal
	}

	endpoint := baseUrl + path
	if len(e.NodeStats) > 0 {
		endpoint += "/" + strings.Join(e.NodeStats, ",")
	}
	return endpoint
}
2016-01-27 21:21:36 +00:00
func ( e * Elasticsearch ) gatherNodeStats ( url string , acc telegraf . Accumulator ) error {
2015-10-28 05:31:25 +00:00
nodeStats := & struct {
2016-12-20 16:30:03 +00:00
ClusterName string ` json:"cluster_name" `
Nodes map [ string ] * nodeStat ` json:"nodes" `
2015-07-08 19:07:39 +00:00
} { }
2016-12-20 16:30:03 +00:00
if err := e . gatherJsonData ( url , nodeStats ) ; err != nil {
2015-07-08 19:07:39 +00:00
return err
}
2016-12-20 16:30:03 +00:00
2015-10-28 05:31:25 +00:00
for id , n := range nodeStats . Nodes {
2015-07-08 19:07:39 +00:00
tags := map [ string ] string {
2015-07-09 16:41:16 +00:00
"node_id" : id ,
2015-07-08 19:07:39 +00:00
"node_host" : n . Host ,
2015-07-08 21:07:10 +00:00
"node_name" : n . Name ,
2015-10-28 05:31:25 +00:00
"cluster_name" : nodeStats . ClusterName ,
2015-07-08 19:07:39 +00:00
}
2016-12-20 16:30:03 +00:00
if e . ClusterStats {
// check for master
e . isMaster = ( id == e . catMasterResponseTokens [ 0 ] )
}
2015-07-09 16:41:16 +00:00
for k , v := range n . Attributes {
tags [ "node_attribute_" + k ] = v
}
2015-07-09 18:58:54 +00:00
stats := map [ string ] interface { } {
"indices" : n . Indices ,
"os" : n . OS ,
"process" : n . Process ,
"jvm" : n . JVM ,
"thread_pool" : n . ThreadPool ,
"fs" : n . FS ,
"transport" : n . Transport ,
"http" : n . HTTP ,
"breakers" : n . Breakers ,
2015-07-09 18:23:04 +00:00
}
2015-07-09 18:58:54 +00:00
2015-12-14 22:15:51 +00:00
now := time . Now ( )
2015-07-09 18:58:54 +00:00
for p , s := range stats {
2017-10-06 23:16:32 +00:00
// if one of the individual node stats is not even in the
// original result
if s == nil {
continue
}
2016-02-06 00:36:35 +00:00
f := jsonparser . JSONFlattener { }
2016-12-20 16:30:03 +00:00
// parse Json, ignoring strings and bools
2015-12-14 22:15:51 +00:00
err := f . FlattenJSON ( "" , s )
if err != nil {
2015-07-09 18:58:54 +00:00
return err
}
2015-12-14 22:15:51 +00:00
acc . AddFields ( "elasticsearch_" + p , f . Fields , tags , now )
2015-07-09 18:43:52 +00:00
}
2015-07-08 19:07:39 +00:00
}
2015-10-28 05:31:25 +00:00
return nil
}
2015-07-08 19:07:39 +00:00
2016-12-20 16:30:03 +00:00
func ( e * Elasticsearch ) gatherClusterHealth ( url string , acc telegraf . Accumulator ) error {
healthStats := & clusterHealth { }
if err := e . gatherJsonData ( url , healthStats ) ; err != nil {
2015-10-28 05:31:25 +00:00
return err
}
measurementTime := time . Now ( )
clusterFields := map [ string ] interface { } {
2016-12-20 16:30:03 +00:00
"status" : healthStats . Status ,
"timed_out" : healthStats . TimedOut ,
"number_of_nodes" : healthStats . NumberOfNodes ,
"number_of_data_nodes" : healthStats . NumberOfDataNodes ,
"active_primary_shards" : healthStats . ActivePrimaryShards ,
"active_shards" : healthStats . ActiveShards ,
"relocating_shards" : healthStats . RelocatingShards ,
"initializing_shards" : healthStats . InitializingShards ,
"unassigned_shards" : healthStats . UnassignedShards ,
2015-10-28 05:31:25 +00:00
}
acc . AddFields (
2015-12-14 22:15:51 +00:00
"elasticsearch_cluster_health" ,
2015-10-28 05:31:25 +00:00
clusterFields ,
2016-12-20 16:30:03 +00:00
map [ string ] string { "name" : healthStats . ClusterName } ,
2015-10-28 05:31:25 +00:00
measurementTime ,
)
2016-12-20 16:30:03 +00:00
for name , health := range healthStats . Indices {
2015-10-28 05:31:25 +00:00
indexFields := map [ string ] interface { } {
"status" : health . Status ,
"number_of_shards" : health . NumberOfShards ,
"number_of_replicas" : health . NumberOfReplicas ,
"active_primary_shards" : health . ActivePrimaryShards ,
"active_shards" : health . ActiveShards ,
"relocating_shards" : health . RelocatingShards ,
"initializing_shards" : health . InitializingShards ,
"unassigned_shards" : health . UnassignedShards ,
}
acc . AddFields (
2015-12-14 22:15:51 +00:00
"elasticsearch_indices" ,
2015-10-28 05:31:25 +00:00
indexFields ,
map [ string ] string { "index" : name } ,
measurementTime ,
)
}
return nil
}
2016-12-20 16:30:03 +00:00
// gatherClusterStats fetches the cluster stats document at url and adds the
// flattened "nodes" and "indices" sections to acc as
// "elasticsearch_clusterstats_nodes" and "elasticsearch_clusterstats_indices",
// tagged with node name, cluster name and status.
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
	var doc clusterStats
	if err := e.gatherJsonData(url, &doc); err != nil {
		return err
	}

	ts := time.Now()
	tags := map[string]string{
		"node_name":    doc.NodeName,
		"cluster_name": doc.ClusterName,
		"status":       doc.Status,
	}

	sections := map[string]interface{}{
		"nodes":   doc.Nodes,
		"indices": doc.Indices,
	}
	for section, raw := range sections {
		flattener := jsonparser.JSONFlattener{}
		// parse json, including bools and strings
		if err := flattener.FullFlattenJSON("", raw, true, true); err != nil {
			return err
		}
		acc.AddFields("elasticsearch_clusterstats_"+section, flattener.Fields, tags, ts)
	}
	return nil
}
func ( e * Elasticsearch ) setCatMaster ( url string ) error {
r , err := e . client . Get ( url )
if err != nil {
return err
}
defer r . Body . Close ( )
if r . StatusCode != http . StatusOK {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
2017-06-26 22:23:53 +00:00
return fmt . Errorf ( "elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d" , r . StatusCode , http . StatusOK )
2016-12-20 16:30:03 +00:00
}
response , err := ioutil . ReadAll ( r . Body )
if err != nil {
return err
}
e . catMasterResponseTokens = strings . Split ( string ( response ) , " " )
return nil
}
func ( e * Elasticsearch ) gatherJsonData ( url string , v interface { } ) error {
2015-10-28 05:31:25 +00:00
r , err := e . client . Get ( url )
if err != nil {
return err
}
2015-12-17 21:12:37 +00:00
defer r . Body . Close ( )
2015-10-28 05:31:25 +00:00
if r . StatusCode != http . StatusOK {
2015-12-17 21:12:37 +00:00
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
2015-12-14 22:15:51 +00:00
return fmt . Errorf ( "elasticsearch: API responded with status-code %d, expected %d" ,
r . StatusCode , http . StatusOK )
2015-10-28 05:31:25 +00:00
}
2016-12-20 16:30:03 +00:00
2015-10-28 05:31:25 +00:00
if err = json . NewDecoder ( r . Body ) . Decode ( v ) ; err != nil {
return err
}
2016-12-20 16:30:03 +00:00
2015-07-08 19:07:39 +00:00
return nil
}
func init ( ) {
2016-01-27 21:21:36 +00:00
inputs . Add ( "elasticsearch" , func ( ) telegraf . Input {
2015-07-08 19:07:39 +00:00
return NewElasticsearch ( )
} )
}