telegraf/plugins/elasticsearch/elasticsearch.go

240 lines
7.1 KiB
Go

package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/influxdb/telegraf/plugins"
)
const statsPath = "/_nodes/stats"
const statsPathLocal = "/_nodes/_local/stats"
const healthPath = "/_cluster/health"
type node struct {
Host string `json:"host"`
Name string `json:"name"`
Attributes map[string]string `json:"attributes"`
Indices interface{} `json:"indices"`
OS interface{} `json:"os"`
Process interface{} `json:"process"`
JVM interface{} `json:"jvm"`
ThreadPool interface{} `json:"thread_pool"`
FS interface{} `json:"fs"`
Transport interface{} `json:"transport"`
HTTP interface{} `json:"http"`
Breakers interface{} `json:"breakers"`
}
type clusterHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
Indices map[string]indexHealth `json:"indices"`
}
type indexHealth struct {
Status string `json:"status"`
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
}
const sampleConfig = `
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
# set local to false when you want to read the indices stats from all nodes
# within the cluster
local = true
# set cluster_health to true when you want to also obtain cluster level stats
cluster_health = false
`
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers.
type Elasticsearch struct {
Local bool
Servers []string
ClusterHealth bool
client *http.Client
}
// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{client: http.DefaultClient}
}
// SampleConfig returns sample configuration for this plugin.
func (e *Elasticsearch) SampleConfig() string {
return sampleConfig
}
// Description returns the plugin description.
func (e *Elasticsearch) Description() string {
return "Read stats from one or more Elasticsearch servers or clusters"
}
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc plugins.Accumulator) error {
for _, serv := range e.Servers {
var url string
if e.Local {
url = serv + statsPathLocal
} else {
url = serv + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
return err
}
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc)
}
}
return nil
}
func (e *Elasticsearch) gatherNodeStats(url string, acc plugins.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
}{}
if err := e.gatherData(url, nodeStats); err != nil {
return err
}
for id, n := range nodeStats.Nodes {
tags := map[string]string{
"node_id": id,
"node_host": n.Host,
"node_name": n.Name,
"cluster_name": nodeStats.ClusterName,
}
for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
stats := map[string]interface{}{
"indices": n.Indices,
"os": n.OS,
"process": n.Process,
"jvm": n.JVM,
"thread_pool": n.ThreadPool,
"fs": n.FS,
"transport": n.Transport,
"http": n.HTTP,
"breakers": n.Breakers,
}
for p, s := range stats {
if err := e.parseInterface(acc, p, tags, s); err != nil {
return err
}
}
}
return nil
}
func (e *Elasticsearch) gatherClusterStats(url string, acc plugins.Accumulator) error {
clusterStats := &clusterHealth{}
if err := e.gatherData(url, clusterStats); err != nil {
return err
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": clusterStats.Status,
"timed_out": clusterStats.TimedOut,
"number_of_nodes": clusterStats.NumberOfNodes,
"number_of_data_nodes": clusterStats.NumberOfDataNodes,
"active_primary_shards": clusterStats.ActivePrimaryShards,
"active_shards": clusterStats.ActiveShards,
"relocating_shards": clusterStats.RelocatingShards,
"initializing_shards": clusterStats.InitializingShards,
"unassigned_shards": clusterStats.UnassignedShards,
}
acc.AddFields(
"cluster_health",
clusterFields,
map[string]string{"name": clusterStats.ClusterName},
measurementTime,
)
for name, health := range clusterStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"number_of_shards": health.NumberOfShards,
"number_of_replicas": health.NumberOfReplicas,
"active_primary_shards": health.ActivePrimaryShards,
"active_shards": health.ActiveShards,
"relocating_shards": health.RelocatingShards,
"initializing_shards": health.InitializingShards,
"unassigned_shards": health.UnassignedShards,
}
acc.AddFields(
"indices",
indexFields,
map[string]string{"index": name},
measurementTime,
)
}
return nil
}
func (e *Elasticsearch) gatherData(url string, v interface{}) error {
r, err := e.client.Get(url)
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
if err = json.NewDecoder(r.Body).Decode(v); err != nil {
return err
}
return nil
}
func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, t, tags)
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
}
return nil
}
func init() {
plugins.Add("elasticsearch", func() plugins.Plugin {
return NewElasticsearch()
})
}