Add Elasticsearch output.
This commit is contained in:
parent
d627bdbbdb
commit
da6bee4aae
1
Godeps
1
Godeps
|
@ -62,4 +62,5 @@ golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
|
|||
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
|
||||
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
|
||||
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
|
||||
gopkg.in/olivere/elastic.v2 4ca0a93672aab0aacb92e0ed205bdf1d3a81f9ed
|
||||
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4
|
||||
|
|
|
@ -240,6 +240,7 @@ want to add support for another service or third-party API.
|
|||
* [aws kinesis](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/kinesis)
|
||||
* [aws cloudwatch](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/cloudwatch)
|
||||
* [datadog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/datadog)
|
||||
* [elasticsearch](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch)
|
||||
* [file](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/file)
|
||||
* [graphite](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graphite)
|
||||
* [graylog](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/graylog)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/file"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
|
||||
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
# Elasticsearch Output Plugin
|
||||
|
||||
This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/).
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
# Configuration for Elasticsearch to send metrics to
|
||||
[[outputs.elasticsearch]]
|
||||
## The full HTTP endpoint URL for your Elasticsearch.
|
||||
server_host = "http://10.10.10.10:19200"
|
||||
## The target index for metrics # required
|
||||
index_name = "twitter"
|
||||
|
|
@ -0,0 +1,154 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"os"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"gopkg.in/olivere/elastic.v2"
|
||||
)
|
||||
|
||||
type Elasticsearch struct {
|
||||
ServerHost string
|
||||
IndexName string
|
||||
Client *elastic.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
server_host = "http://10.10.10.10:19200" # required.
|
||||
index_name = "twitter" #required
|
||||
`
|
||||
|
||||
type TimeSeries struct {
|
||||
Series []*Metric `json:"series"`
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Metric string `json:"metric"`
|
||||
Points [1]Point `json:"metrics"`
|
||||
}
|
||||
|
||||
type Point [2]float64
|
||||
|
||||
func (a *Elasticsearch) Connect() error {
|
||||
if a.ServerHost == "" || a.IndexName == "" {
|
||||
return fmt.Errorf("server_host and index_name are required fields for elasticsearch output")
|
||||
}
|
||||
|
||||
client, err := elastic.NewClient(
|
||||
elastic.SetHealthcheck(true),
|
||||
elastic.SetSniff(false),
|
||||
elastic.SetHealthcheckInterval(30*time.Second),
|
||||
elastic.SetURL(a.ServerHost),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
// Handle error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
a.Client = client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
|
||||
m := make(map[string]interface{})
|
||||
m["created"] = time.Now().UnixNano() / 1000000
|
||||
m["version"] = "1.1"
|
||||
m["timestamp"] = metric.UnixNano() / 1000000
|
||||
m["short_message"] = " "
|
||||
m["name"] = metric.Name()
|
||||
|
||||
if host, ok := metric.Tags()["host"]; ok {
|
||||
m["host"] = host
|
||||
} else {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m["host"] = host
|
||||
}
|
||||
|
||||
for key, value := range metric.Tags() {
|
||||
if key != "host" {
|
||||
m["_"+key] = value
|
||||
}
|
||||
}
|
||||
|
||||
for key, value := range metric.Fields() {
|
||||
m["_"+key] = value
|
||||
}
|
||||
|
||||
_, err := a.Client.Index().
|
||||
Index(a.IndexName).
|
||||
Type("stats2").
|
||||
BodyJson(m).
|
||||
Do()
|
||||
|
||||
if err != nil {
|
||||
// Handle error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Description() string {
|
||||
return "Configuration for Elasticsearch to send metrics to."
|
||||
}
|
||||
|
||||
|
||||
func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
|
||||
ms := make(map[string]Point)
|
||||
for k, v := range m.Fields() {
|
||||
var p Point
|
||||
if err := p.setValue(v); err != nil {
|
||||
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||
}
|
||||
p[0] = float64(m.Time().Unix())
|
||||
ms[k] = p
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func (p *Point) setValue(v interface{}) error {
|
||||
switch d := v.(type) {
|
||||
case int:
|
||||
p[1] = float64(int(d))
|
||||
case int32:
|
||||
p[1] = float64(int32(d))
|
||||
case int64:
|
||||
p[1] = float64(int64(d))
|
||||
case float32:
|
||||
p[1] = float64(d)
|
||||
case float64:
|
||||
p[1] = float64(d)
|
||||
default:
|
||||
return fmt.Errorf("undeterminable type")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("elasticsearch", func() telegraf.Output {
|
||||
return &Elasticsearch{}
|
||||
})
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"os"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"gopkg.in/olivere/elastic.v2"
|
||||
)
|
||||
|
||||
type Elasticsearch struct {
|
||||
ServerHost string
|
||||
IndexName string
|
||||
Client *elastic.Client
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
server_host = "http://10.10.10.10:19200" # required.
|
||||
index_name = "test" # required
|
||||
`
|
||||
|
||||
type TimeSeries struct {
|
||||
Series []*Metric `json:"series"`
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Metric string `json:"metric"`
|
||||
Points [1]Point `json:"metrics"`
|
||||
}
|
||||
|
||||
type Point [2]float64
|
||||
|
||||
func (a *Elasticsearch) Connect() error {
|
||||
if a.ServerHost == "" || a.IndexName == "" {
|
||||
return fmt.Errorf("server_host and index_name are required fields for elasticsearch output")
|
||||
}
|
||||
|
||||
client, err := elastic.NewClient(
|
||||
elastic.SetHealthcheck(true),
|
||||
elastic.SetSniff(false),
|
||||
elastic.SetHealthcheckInterval(30*time.Second),
|
||||
elastic.SetURL(a.ServerHost),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
// Handle error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
a.Client = client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
|
||||
m := make(map[string]interface{})
|
||||
m["created"] = time.Now().UnixNano() / 1000000
|
||||
m["version"] = "1.1"
|
||||
m["timestamp"] = metric.UnixNano() / 1000000
|
||||
m["short_message"] = " "
|
||||
m["name"] = metric.Name()
|
||||
|
||||
if host, ok := metric.Tags()["host"]; ok {
|
||||
m["host"] = host
|
||||
} else {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m["host"] = host
|
||||
}
|
||||
|
||||
for key, value := range metric.Tags() {
|
||||
if key != "host" {
|
||||
m["_"+key] = value
|
||||
}
|
||||
}
|
||||
|
||||
for key, value := range metric.Fields() {
|
||||
m["_"+key] = value
|
||||
}
|
||||
|
||||
_, err := a.Client.Index().
|
||||
Index(a.IndexName).
|
||||
Type("stats2").
|
||||
BodyJson(m).
|
||||
Do()
|
||||
|
||||
if err != nil {
|
||||
// Handle error
|
||||
panic(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Description() string {
|
||||
return "Configuration for Elasticsearch to send metrics to."
|
||||
}
|
||||
|
||||
|
||||
func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
|
||||
ms := make(map[string]Point)
|
||||
for k, v := range m.Fields() {
|
||||
var p Point
|
||||
if err := p.setValue(v); err != nil {
|
||||
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||
}
|
||||
p[0] = float64(m.Time().Unix())
|
||||
ms[k] = p
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func (p *Point) setValue(v interface{}) error {
|
||||
switch d := v.(type) {
|
||||
case int:
|
||||
p[1] = float64(int(d))
|
||||
case int32:
|
||||
p[1] = float64(int32(d))
|
||||
case int64:
|
||||
p[1] = float64(int64(d))
|
||||
case float32:
|
||||
p[1] = float64(d)
|
||||
case float64:
|
||||
p[1] = float64(d)
|
||||
default:
|
||||
return fmt.Errorf("undeterminable type")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("elasticsearch", func() telegraf.Output {
|
||||
return &Elasticsearch{}
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue