// telegraf/plugins/elasticsearch/elasticsearch.go (167 lines, 4.3 KiB, Go)

package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"github.com/influxdb/telegraf/plugins"
)
// Paths of the Elasticsearch node-stats endpoints. Gather appends one of
// them to each configured server URL.
const (
	// indicesStatsPath is requested when Local is false.
	indicesStatsPath = "/_nodes/stats"
	// indicesStatsPathLocal is requested when Local is true.
	indicesStatsPathLocal = "/_nodes/_local/stats"
)
// node holds the per-node portion of a node-stats response.
//
// Host, Name and Attributes identify the node and are turned into tags.
// Every stats section is kept as an untyped decoded JSON value so that it
// can be walked generically by parseInterface; a section missing from the
// response leaves its field nil.
type node struct {
	Host       string            `json:"host"`
	Name       string            `json:"name"`
	Attributes map[string]string `json:"attributes"`
	Indices    interface{}       `json:"indices"`
	Os         interface{}       `json:"os"`
	Process    interface{}       `json:"process"`
	JVM        interface{}       `json:"jvm"`
	ThreadPool interface{}       `json:"thread_pool"`
	Network    interface{}       `json:"network"`
	FS         interface{}       `json:"fs"`
	Transport  interface{}       `json:"transport"`
	HTTP       interface{}       `json:"http"`
	Breakers   interface{}       `json:"breakers"`
}
// sampleConfig is the example configuration returned by SampleConfig.
const sampleConfig = `
# specify a list of one or more Elasticsearch servers
servers = ["http://localhost:9200"]
# set local to false when you want to read the indices stats from all nodes
# within the cluster
local = true
`
// Elasticsearch is the plugin that collects node statistics from one or
// more Elasticsearch servers.
type Elasticsearch struct {
	// Local selects which stats endpoint is queried: the "_local" one when
	// true, the all-nodes one when false.
	Local bool
	// Servers lists the base URLs of the servers to query.
	Servers []string

	// client performs the HTTP requests.
	client *http.Client
}
// NewElasticsearch returns a new Elasticsearch plugin whose requests are
// sent through http.DefaultClient.
func NewElasticsearch() *Elasticsearch {
	es := new(Elasticsearch)
	es.client = http.DefaultClient
	return es
}
// SampleConfig returns the example configuration snippet for this plugin,
// i.e. the package-level sampleConfig string.
func (e *Elasticsearch) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line, human-readable summary of what the
// plugin does.
func (e *Elasticsearch) Description() string {
	const summary = "Read indices stats from one or more Elasticsearch servers or clusters"
	return summary
}
// Gather queries each configured server for node stats and hands the
// results to acc.
//
// The "_local" stats endpoint is used when Local is true, the all-nodes
// endpoint otherwise. Servers are processed in order; the first failure
// aborts the run and its error is returned.
func (e *Elasticsearch) Gather(acc plugins.Accumulator) error {
	// The endpoint depends only on the plugin configuration, so pick it once.
	statsPath := indicesStatsPath
	if e.Local {
		statsPath = indicesStatsPathLocal
	}

	for _, server := range e.Servers {
		if err := e.gatherUrl(server+statsPath, acc); err != nil {
			return err
		}
	}
	return nil
}
func (e *Elasticsearch) gatherUrl(url string, acc plugins.Accumulator) error {
r, err := e.client.Get(url)
if err != nil {
return err
}
2015-07-08 19:28:25 +00:00
if r.StatusCode != http.StatusOK {
return fmt.Errorf("elasticsearch: API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
d := json.NewDecoder(r.Body)
esRes := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*node `json:"nodes"`
}{}
if err = d.Decode(esRes); err != nil {
return err
}
for id, n := range esRes.Nodes {
tags := map[string]string{
"node_id": id,
"node_host": n.Host,
2015-07-08 21:07:10 +00:00
"node_name": n.Name,
"cluster_name": esRes.ClusterName,
}
for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
2015-07-09 17:51:12 +00:00
if err := e.parseInterface(acc, "indices", tags, n.Indices); err != nil {
return err
}
2015-07-09 18:01:59 +00:00
if err := e.parseInterface(acc, "os", tags, n.Os); err != nil {
return err
}
2015-07-09 18:06:30 +00:00
if err := e.parseInterface(acc, "process", tags, n.Process); err != nil {
return err
}
2015-07-09 18:11:46 +00:00
if err := e.parseInterface(acc, "jvm", tags, n.JVM); err != nil {
return err
}
2015-07-09 18:18:24 +00:00
if err := e.parseInterface(acc, "thread_pool", tags, n.ThreadPool); err != nil {
return err
}
2015-07-09 18:23:04 +00:00
if err := e.parseInterface(acc, "network", tags, n.Network); err != nil {
return err
}
2015-07-09 18:32:56 +00:00
if err := e.parseInterface(acc, "fs", tags, n.FS); err != nil {
return err
}
2015-07-09 18:36:22 +00:00
if err := e.parseInterface(acc, "transport", tags, n.Transport); err != nil {
return err
}
2015-07-09 18:38:51 +00:00
if err := e.parseInterface(acc, "http", tags, n.HTTP); err != nil {
return err
}
2015-07-09 18:43:52 +00:00
if err := e.parseInterface(acc, "breakers", tags, n.Breakers); err != nil {
return err
}
}
return nil
}
2015-07-09 17:51:12 +00:00
func (e *Elasticsearch) parseInterface(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := e.parseInterface(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, t, tags)
2015-07-09 18:01:59 +00:00
case bool, string, []interface{}:
// ignored types
2015-07-09 17:51:12 +00:00
return nil
default:
return fmt.Errorf("elasticsearch: got unexpected type %T with value %v (%s)", t, t, prefix)
}
return nil
}
// init registers the plugin under the name "elasticsearch", with a factory
// that builds each instance via NewElasticsearch.
func init() {
	factory := func() plugins.Plugin {
		return NewElasticsearch()
	}
	plugins.Add("elasticsearch", factory)
}