diff --git a/Godeps b/Godeps index 76dc1673e..3ef0c5fac 100644 --- a/Godeps +++ b/Godeps @@ -62,4 +62,5 @@ golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34 gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715 gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886 +gopkg.in/olivere/elastic.v2 4ca0a93672aab0aacb92e0ed205bdf1d3a81f9ed gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4 diff --git a/README.md b/README.md index 737a3fe07..958aea14e 100644 --- a/README.md +++ b/README.md @@ -240,6 +240,7 @@ want to add support for another service or third-party API. * [aws kinesis](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kinesis) * [aws cloudwatch](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/cloudwatch) * [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog) +* [elasticsearch](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) * [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file) * [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite) * [graylog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graylog) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 28354e7e4..5af00f0fa 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" + _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md new file mode 100644 index 000000000..6f85a5ec1 --- /dev/null +++ b/plugins/outputs/elasticsearch/README.md @@ -0,0 +1,14 @@ +# Elasticsearch Output Plugin + +This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/). + +### Configuration: + +```toml +# Configuration for Elasticsearch to send metrics to +[[outputs.elasticsearch]] + ## The full HTTP endpoint URL for your Elasticsearch. + server_host = "http://10.10.10.10:19200" + ## The target index for metrics # required + index_name = "twitter" + diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go new file mode 100644 index 000000000..f3ca79c27 --- /dev/null +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -0,0 +1,154 @@ +package elasticsearch + +import ( + "fmt" + "time" + "os" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "gopkg.in/olivere/elastic.v2" +) + +type Elasticsearch struct { + ServerHost string + IndexName string + Client *elastic.Client +} + +var sampleConfig = ` + server_host = "http://10.10.10.10:19200" # required. + index_name = "twitter" #required +` + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"metrics"` +} + +type Point [2]float64 + +func (a *Elasticsearch) Connect() error { + if a.ServerHost == "" || a.IndexName == "" { + return fmt.Errorf("server_host and index_name are required fields for elasticsearch output") + } + + client, err := elastic.NewClient( + elastic.SetHealthcheck(true), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(30*time.Second), + elastic.SetURL(a.ServerHost), + ) + + if err != nil { + // Handle error + panic(err) + } + + a.Client = client + + return nil +} + +func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + + m := make(map[string]interface{}) + m["created"] = time.Now().UnixNano() / 1000000 + m["version"] = "1.1" + m["timestamp"] = metric.UnixNano() / 1000000 + m["short_message"] = " " + m["name"] = metric.Name() + + if host, ok := metric.Tags()["host"]; ok { + m["host"] = host + } else { + host, err := os.Hostname() + if err != nil { + panic(err) + } + m["host"] = host + } + + for key, value := range metric.Tags() { + if key != "host" { + m["_"+key] = value + } + } + + for key, value := range metric.Fields() { + m["_"+key] = value + } + + _, err := a.Client.Index(). + Index(a.IndexName). + Type("stats2"). + BodyJson(m). + Do() + + if err != nil { + // Handle error + panic(err) + } + + } + + return nil +} + +func (a *Elasticsearch) SampleConfig() string { + return sampleConfig +} + +func (a *Elasticsearch) Description() string { + return "Configuration for Elasticsearch to send metrics to." +} + + +func buildMetrics(m telegraf.Metric) (map[string]Point, error) { + ms := make(map[string]Point) + for k, v := range m.Fields() { + var p Point + if err := p.setValue(v); err != nil { + return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + p[0] = float64(m.Time().Unix()) + ms[k] = p + } + return ms, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (a *Elasticsearch) Close() error { + return nil +} + +func init() { + outputs.Add("elasticsearch", func() telegraf.Output { + return &Elasticsearch{} + }) +} diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go new file mode 100644 index 000000000..38f6f5fff --- /dev/null +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -0,0 +1,154 @@ +package elasticsearch + +import ( + "fmt" + "time" + "os" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "gopkg.in/olivere/elastic.v2" +) + +type Elasticsearch struct { + ServerHost string + IndexName string + Client *elastic.Client +} + +var sampleConfig = ` + server_host = "http://10.10.10.10:19200" # required. + index_name = "test" # required +` + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"metrics"` +} + +type Point [2]float64 + +func (a *Elasticsearch) Connect() error { + if a.ServerHost == "" || a.IndexName == "" { + return fmt.Errorf("server_host and index_name are required fields for elasticsearch output") + } + + client, err := elastic.NewClient( + elastic.SetHealthcheck(true), + elastic.SetSniff(false), + elastic.SetHealthcheckInterval(30*time.Second), + elastic.SetURL(a.ServerHost), + ) + + if err != nil { + // Handle error + panic(err) + } + + a.Client = client + + return nil +} + +func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + for _, metric := range metrics { + + m := make(map[string]interface{}) + m["created"] = time.Now().UnixNano() / 1000000 + m["version"] = "1.1" + m["timestamp"] = metric.UnixNano() / 1000000 + m["short_message"] = " " + m["name"] = metric.Name() + + if host, ok := metric.Tags()["host"]; ok { + m["host"] = host + } else { + host, err := os.Hostname() + if err != nil { + panic(err) + } + m["host"] = host + } + + for key, value := range metric.Tags() { + if key != "host" { + m["_"+key] = value + } + } + + for key, value := range metric.Fields() { + m["_"+key] = value + } + + _, err := a.Client.Index(). + Index(a.IndexName). + Type("stats2"). + BodyJson(m). + Do() + + if err != nil { + // Handle error + panic(err) + } + + } + + return nil +} + +func (a *Elasticsearch) SampleConfig() string { + return sampleConfig +} + +func (a *Elasticsearch) Description() string { + return "Configuration for Elasticsearch to send metrics to." +} + + +func buildMetrics(m telegraf.Metric) (map[string]Point, error) { + ms := make(map[string]Point) + for k, v := range m.Fields() { + var p Point + if err := p.setValue(v); err != nil { + return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + p[0] = float64(m.Time().Unix()) + ms[k] = p + } + return ms, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (a *Elasticsearch) Close() error { + return nil +} + +func init() { + outputs.Add("elasticsearch", func() telegraf.Output { + return &Elasticsearch{} + }) +}