530 lines
16 KiB
Go
530 lines
16 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/internal"
|
|
"github.com/influxdata/telegraf/internal/tls"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
|
|
)
|
|
|
|
// mask for masking username/password from error messages
|
|
var mask = regexp.MustCompile(`https?:\/\/\S+:\S+@`)
|
|
|
|
// Nodestats are always generated, so simply define a constant for these endpoints
|
|
const statsPath = "/_nodes/stats"
|
|
const statsPathLocal = "/_nodes/_local/stats"
|
|
|
|
type nodeStat struct {
|
|
Host string `json:"host"`
|
|
Name string `json:"name"`
|
|
Roles []string `json:"roles"`
|
|
Attributes map[string]string `json:"attributes"`
|
|
Indices interface{} `json:"indices"`
|
|
OS interface{} `json:"os"`
|
|
Process interface{} `json:"process"`
|
|
JVM interface{} `json:"jvm"`
|
|
ThreadPool interface{} `json:"thread_pool"`
|
|
FS interface{} `json:"fs"`
|
|
Transport interface{} `json:"transport"`
|
|
HTTP interface{} `json:"http"`
|
|
Breakers interface{} `json:"breakers"`
|
|
}
|
|
|
|
type clusterHealth struct {
|
|
ActivePrimaryShards int `json:"active_primary_shards"`
|
|
ActiveShards int `json:"active_shards"`
|
|
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
|
|
ClusterName string `json:"cluster_name"`
|
|
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
|
|
InitializingShards int `json:"initializing_shards"`
|
|
NumberOfDataNodes int `json:"number_of_data_nodes"`
|
|
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
|
|
NumberOfNodes int `json:"number_of_nodes"`
|
|
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
|
|
RelocatingShards int `json:"relocating_shards"`
|
|
Status string `json:"status"`
|
|
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
|
|
TimedOut bool `json:"timed_out"`
|
|
UnassignedShards int `json:"unassigned_shards"`
|
|
Indices map[string]indexHealth `json:"indices"`
|
|
}
|
|
|
|
type indexHealth struct {
|
|
ActivePrimaryShards int `json:"active_primary_shards"`
|
|
ActiveShards int `json:"active_shards"`
|
|
InitializingShards int `json:"initializing_shards"`
|
|
NumberOfReplicas int `json:"number_of_replicas"`
|
|
NumberOfShards int `json:"number_of_shards"`
|
|
RelocatingShards int `json:"relocating_shards"`
|
|
Status string `json:"status"`
|
|
UnassignedShards int `json:"unassigned_shards"`
|
|
}
|
|
|
|
type clusterStats struct {
|
|
NodeName string `json:"node_name"`
|
|
ClusterName string `json:"cluster_name"`
|
|
Status string `json:"status"`
|
|
Indices interface{} `json:"indices"`
|
|
Nodes interface{} `json:"nodes"`
|
|
}
|
|
|
|
type catMaster struct {
|
|
NodeID string `json:"id"`
|
|
NodeIP string `json:"ip"`
|
|
NodeName string `json:"node"`
|
|
}
|
|
|
|
const sampleConfig = `
|
|
## specify a list of one or more Elasticsearch servers
|
|
# you can add username and password to your url to use basic authentication:
|
|
# servers = ["http://user:pass@localhost:9200"]
|
|
servers = ["http://localhost:9200"]
|
|
|
|
## Timeout for HTTP requests to the elastic search server(s)
|
|
http_timeout = "5s"
|
|
|
|
## When local is true (the default), the node will read only its own stats.
|
|
## Set local to false when you want to read the node stats from all nodes
|
|
## of the cluster.
|
|
local = true
|
|
|
|
## Set cluster_health to true when you want to also obtain cluster health stats
|
|
cluster_health = false
|
|
|
|
## Adjust cluster_health_level when you want to also obtain detailed health stats
|
|
## The options are
|
|
## - indices (default)
|
|
## - cluster
|
|
# cluster_health_level = "indices"
|
|
|
|
## Set cluster_stats to true when you want to also obtain cluster stats.
|
|
cluster_stats = false
|
|
|
|
## Only gather cluster_stats from the master node. To work this require local = true
|
|
cluster_stats_only_from_master = true
|
|
|
|
## node_stats is a list of sub-stats that you want to have gathered. Valid options
|
|
## are "indices", "os", "process", "jvm", "thread_pool", "fs", "transport", "http",
|
|
## "breaker". Per default, all stats are gathered.
|
|
# node_stats = ["jvm", "http"]
|
|
|
|
## HTTP Basic Authentication username and password.
|
|
# username = ""
|
|
# password = ""
|
|
|
|
## Optional TLS Config
|
|
# tls_ca = "/etc/telegraf/ca.pem"
|
|
# tls_cert = "/etc/telegraf/cert.pem"
|
|
# tls_key = "/etc/telegraf/key.pem"
|
|
## Use TLS but skip chain & host verification
|
|
# insecure_skip_verify = false
|
|
`
|
|
|
|
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
|
|
// servers.
|
|
type Elasticsearch struct {
|
|
Local bool
|
|
Servers []string
|
|
HttpTimeout internal.Duration
|
|
ClusterHealth bool
|
|
ClusterHealthLevel string
|
|
ClusterStats bool
|
|
ClusterStatsOnlyFromMaster bool
|
|
NodeStats []string
|
|
Username string `toml:"username"`
|
|
Password string `toml:"password"`
|
|
tls.ClientConfig
|
|
|
|
client *http.Client
|
|
serverInfo map[string]serverInfo
|
|
serverInfoMutex sync.Mutex
|
|
}
|
|
type serverInfo struct {
|
|
nodeID string
|
|
masterID string
|
|
}
|
|
|
|
func (i serverInfo) isMaster() bool {
|
|
return i.nodeID == i.masterID
|
|
}
|
|
|
|
// NewElasticsearch return a new instance of Elasticsearch
|
|
func NewElasticsearch() *Elasticsearch {
|
|
return &Elasticsearch{
|
|
HttpTimeout: internal.Duration{Duration: time.Second * 5},
|
|
ClusterStatsOnlyFromMaster: true,
|
|
ClusterHealthLevel: "indices",
|
|
}
|
|
}
|
|
|
|
// perform status mapping
|
|
func mapHealthStatusToCode(s string) int {
|
|
switch strings.ToLower(s) {
|
|
case "green":
|
|
return 1
|
|
case "yellow":
|
|
return 2
|
|
case "red":
|
|
return 3
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// SampleConfig returns sample configuration for this plugin.
|
|
func (e *Elasticsearch) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description returns the plugin description.
|
|
func (e *Elasticsearch) Description() string {
|
|
return "Read stats from one or more Elasticsearch servers or clusters"
|
|
}
|
|
|
|
// Gather reads the stats from Elasticsearch and writes it to the
|
|
// Accumulator.
|
|
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
|
|
if e.client == nil {
|
|
client, err := e.createHttpClient()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.client = client
|
|
}
|
|
|
|
if e.ClusterStats {
|
|
var wgC sync.WaitGroup
|
|
wgC.Add(len(e.Servers))
|
|
|
|
e.serverInfo = make(map[string]serverInfo)
|
|
for _, serv := range e.Servers {
|
|
go func(s string, acc telegraf.Accumulator) {
|
|
defer wgC.Done()
|
|
info := serverInfo{}
|
|
|
|
var err error
|
|
|
|
// Gather node ID
|
|
if info.nodeID, err = e.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
|
|
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
|
return
|
|
}
|
|
|
|
// get cat/master information here so NodeStats can determine
|
|
// whether this node is the Master
|
|
if info.masterID, err = e.getCatMaster(s + "/_cat/master"); err != nil {
|
|
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
|
return
|
|
}
|
|
|
|
e.serverInfoMutex.Lock()
|
|
e.serverInfo[s] = info
|
|
e.serverInfoMutex.Unlock()
|
|
|
|
}(serv, acc)
|
|
}
|
|
wgC.Wait()
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(e.Servers))
|
|
|
|
for _, serv := range e.Servers {
|
|
go func(s string, acc telegraf.Accumulator) {
|
|
defer wg.Done()
|
|
url := e.nodeStatsUrl(s)
|
|
|
|
// Always gather node stats
|
|
if err := e.gatherNodeStats(url, acc); err != nil {
|
|
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
|
return
|
|
}
|
|
|
|
if e.ClusterHealth {
|
|
url = s + "/_cluster/health"
|
|
if e.ClusterHealthLevel != "" {
|
|
url = url + "?level=" + e.ClusterHealthLevel
|
|
}
|
|
if err := e.gatherClusterHealth(url, acc); err != nil {
|
|
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
|
return
|
|
}
|
|
}
|
|
|
|
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
|
|
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
|
|
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
|
|
return
|
|
}
|
|
}
|
|
}(serv, acc)
|
|
}
|
|
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (e *Elasticsearch) createHttpClient() (*http.Client, error) {
|
|
tlsCfg, err := e.ClientConfig.TLSConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tr := &http.Transport{
|
|
ResponseHeaderTimeout: e.HttpTimeout.Duration,
|
|
TLSClientConfig: tlsCfg,
|
|
}
|
|
client := &http.Client{
|
|
Transport: tr,
|
|
Timeout: e.HttpTimeout.Duration,
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
|
|
var url string
|
|
|
|
if e.Local {
|
|
url = baseUrl + statsPathLocal
|
|
} else {
|
|
url = baseUrl + statsPath
|
|
}
|
|
|
|
if len(e.NodeStats) == 0 {
|
|
return url
|
|
}
|
|
|
|
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
|
|
}
|
|
|
|
func (e *Elasticsearch) gatherNodeID(url string) (string, error) {
|
|
nodeStats := &struct {
|
|
ClusterName string `json:"cluster_name"`
|
|
Nodes map[string]*nodeStat `json:"nodes"`
|
|
}{}
|
|
if err := e.gatherJsonData(url, nodeStats); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Only 1 should be returned
|
|
for id := range nodeStats.Nodes {
|
|
return id, nil
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
|
|
nodeStats := &struct {
|
|
ClusterName string `json:"cluster_name"`
|
|
Nodes map[string]*nodeStat `json:"nodes"`
|
|
}{}
|
|
if err := e.gatherJsonData(url, nodeStats); err != nil {
|
|
return err
|
|
}
|
|
|
|
for id, n := range nodeStats.Nodes {
|
|
sort.Strings(n.Roles)
|
|
tags := map[string]string{
|
|
"node_id": id,
|
|
"node_host": n.Host,
|
|
"node_name": n.Name,
|
|
"cluster_name": nodeStats.ClusterName,
|
|
"node_roles": strings.Join(n.Roles, ","),
|
|
}
|
|
|
|
for k, v := range n.Attributes {
|
|
tags["node_attribute_"+k] = v
|
|
}
|
|
|
|
stats := map[string]interface{}{
|
|
"indices": n.Indices,
|
|
"os": n.OS,
|
|
"process": n.Process,
|
|
"jvm": n.JVM,
|
|
"thread_pool": n.ThreadPool,
|
|
"fs": n.FS,
|
|
"transport": n.Transport,
|
|
"http": n.HTTP,
|
|
"breakers": n.Breakers,
|
|
}
|
|
|
|
now := time.Now()
|
|
for p, s := range stats {
|
|
// if one of the individual node stats is not even in the
|
|
// original result
|
|
if s == nil {
|
|
continue
|
|
}
|
|
f := jsonparser.JSONFlattener{}
|
|
// parse Json, ignoring strings and bools
|
|
err := f.FlattenJSON("", s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
acc.AddFields("elasticsearch_"+p, f.Fields, tags, now)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator) error {
|
|
healthStats := &clusterHealth{}
|
|
if err := e.gatherJsonData(url, healthStats); err != nil {
|
|
return err
|
|
}
|
|
measurementTime := time.Now()
|
|
clusterFields := map[string]interface{}{
|
|
"active_primary_shards": healthStats.ActivePrimaryShards,
|
|
"active_shards": healthStats.ActiveShards,
|
|
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
|
|
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
|
|
"initializing_shards": healthStats.InitializingShards,
|
|
"number_of_data_nodes": healthStats.NumberOfDataNodes,
|
|
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
|
|
"number_of_nodes": healthStats.NumberOfNodes,
|
|
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
|
|
"relocating_shards": healthStats.RelocatingShards,
|
|
"status": healthStats.Status,
|
|
"status_code": mapHealthStatusToCode(healthStats.Status),
|
|
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
|
|
"timed_out": healthStats.TimedOut,
|
|
"unassigned_shards": healthStats.UnassignedShards,
|
|
}
|
|
acc.AddFields(
|
|
"elasticsearch_cluster_health",
|
|
clusterFields,
|
|
map[string]string{"name": healthStats.ClusterName},
|
|
measurementTime,
|
|
)
|
|
|
|
for name, health := range healthStats.Indices {
|
|
indexFields := map[string]interface{}{
|
|
"active_primary_shards": health.ActivePrimaryShards,
|
|
"active_shards": health.ActiveShards,
|
|
"initializing_shards": health.InitializingShards,
|
|
"number_of_replicas": health.NumberOfReplicas,
|
|
"number_of_shards": health.NumberOfShards,
|
|
"relocating_shards": health.RelocatingShards,
|
|
"status": health.Status,
|
|
"status_code": mapHealthStatusToCode(health.Status),
|
|
"unassigned_shards": health.UnassignedShards,
|
|
}
|
|
acc.AddFields(
|
|
"elasticsearch_cluster_health_indices",
|
|
indexFields,
|
|
map[string]string{"index": name, "name": healthStats.ClusterName},
|
|
measurementTime,
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator) error {
|
|
clusterStats := &clusterStats{}
|
|
if err := e.gatherJsonData(url, clusterStats); err != nil {
|
|
return err
|
|
}
|
|
now := time.Now()
|
|
tags := map[string]string{
|
|
"node_name": clusterStats.NodeName,
|
|
"cluster_name": clusterStats.ClusterName,
|
|
"status": clusterStats.Status,
|
|
}
|
|
|
|
stats := map[string]interface{}{
|
|
"nodes": clusterStats.Nodes,
|
|
"indices": clusterStats.Indices,
|
|
}
|
|
|
|
for p, s := range stats {
|
|
f := jsonparser.JSONFlattener{}
|
|
// parse json, including bools and strings
|
|
err := f.FullFlattenJSON("", s, true, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
acc.AddFields("elasticsearch_clusterstats_"+p, f.Fields, tags, now)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if e.Username != "" || e.Password != "" {
|
|
req.SetBasicAuth(e.Username, e.Password)
|
|
}
|
|
|
|
r, err := e.client.Do(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer r.Body.Close()
|
|
if r.StatusCode != http.StatusOK {
|
|
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
|
// to let the underlying transport close the connection and re-establish a new one for
|
|
// future calls.
|
|
return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
|
|
}
|
|
response, err := ioutil.ReadAll(r.Body)
|
|
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
masterID := strings.Split(string(response), " ")[0]
|
|
|
|
return masterID, nil
|
|
}
|
|
|
|
func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if e.Username != "" || e.Password != "" {
|
|
req.SetBasicAuth(e.Username, e.Password)
|
|
}
|
|
|
|
r, err := e.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Body.Close()
|
|
if r.StatusCode != http.StatusOK {
|
|
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
|
|
// to let the underlying transport close the connection and re-establish a new one for
|
|
// future calls.
|
|
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d",
|
|
r.StatusCode, http.StatusOK)
|
|
}
|
|
|
|
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("elasticsearch", func() telegraf.Input {
|
|
return NewElasticsearch()
|
|
})
|
|
}
|