Add Elasticsearch 5 and 2.X Output.

This commit is contained in:
Amanda Hager Lopes de Andrade 2016-10-10 17:48:13 -03:00
parent da6bee4aae
commit 1b80a842b7
3 changed files with 87 additions and 35 deletions

View File

@ -2,13 +2,19 @@
This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/). This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/).
Tested with: 5.0.0-beta1 and 2.4.0
### Configuration: ### Configuration:
```toml ```toml
# Configuration for Elasticsearch to send metrics to # Configuration for Elasticsearch to send metrics to
[[outputs.elasticsearch]] [[outputs.elasticsearch]]
## The full HTTP endpoint URL for your Elasticsearch. ## The full HTTP endpoint URL for your Elasticsearch. # required
server_host = "http://10.10.10.10:19200" server_host = "http://10.10.10.10:19200"
## The target index for metrics # required ## The target index for metrics # required
index_name = "twitter" index_name = "twitter"
## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically
enable_sniffer = false
## Earlier versions of EL doesn't accept "." in field name. Set delimiter with the character that you want instead.
delimiter = "_"

View File

@ -2,8 +2,9 @@ package elasticsearch
import ( import (
"fmt" "fmt"
"time"
"os" "os"
"time"
"strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v2" "gopkg.in/olivere/elastic.v2"
@ -12,12 +13,17 @@ import (
type Elasticsearch struct { type Elasticsearch struct {
ServerHost string ServerHost string
IndexName string IndexName string
EnableSniffer bool
Separator string
Client *elastic.Client Client *elastic.Client
Version string
} }
var sampleConfig = ` var sampleConfig = `
server_host = "http://10.10.10.10:19200" # required. server_host = "http://10.10.10.10:19200" # required.
index_name = "twitter" #required index_name = "test" # required.
enable_sniffer = false
delimiter = "_"
` `
type TimeSeries struct { type TimeSeries struct {
@ -38,18 +44,25 @@ func (a *Elasticsearch) Connect() error {
client, err := elastic.NewClient( client, err := elastic.NewClient(
elastic.SetHealthcheck(true), elastic.SetHealthcheck(true),
elastic.SetSniff(false), elastic.SetSniff(a.EnableSniffer),
elastic.SetHealthcheckInterval(30*time.Second), elastic.SetHealthcheckInterval(30*time.Second),
elastic.SetURL(a.ServerHost), elastic.SetURL(a.ServerHost),
) )
if err != nil { if err != nil {
// Handle error return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err)
panic(err)
} }
a.Client = client a.Client = client
version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost)
if errVersion != nil {
return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion)
}
a.Version = version
return nil return nil
} }
@ -61,11 +74,8 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics { for _, metric := range metrics {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["created"] = time.Now().UnixNano() / 1000000 //m["created"] = metric.UnixNano() / 1000000
m["version"] = "1.1" m["created"] = time.Now()
m["timestamp"] = metric.UnixNano() / 1000000
m["short_message"] = " "
m["name"] = metric.Name()
if host, ok := metric.Tags()["host"]; ok { if host, ok := metric.Tags()["host"]; ok {
m["host"] = host m["host"] = host
@ -77,25 +87,37 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
m["host"] = host m["host"] = host
} }
// Earlier versions of EL doesnt accept '.' in field name
if len(a.Separator) > 1 {
return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator)
}
for key, value := range metric.Tags() { for key, value := range metric.Tags() {
if key != "host" { if key != "host" {
m["_"+key] = value if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
} else {
m[key] = value
}
} }
} }
for key, value := range metric.Fields() { for key, value := range metric.Fields() {
m["_"+key] = value if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
} else {
m[key] = value
}
} }
_, err := a.Client.Index(). _, errMessage:= a.Client.Index().
Index(a.IndexName). Index(a.IndexName).
Type("stats2"). Type(metric.Name()).
BodyJson(m). BodyJson(m).
Do() Do()
if err != nil { if errMessage != nil {
// Handle error return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage)
panic(err)
} }
} }
@ -144,6 +166,7 @@ func (p *Point) setValue(v interface{}) error {
} }
func (a *Elasticsearch) Close() error { func (a *Elasticsearch) Close() error {
a.Client = nil
return nil return nil
} }

View File

@ -2,8 +2,9 @@ package elasticsearch
import ( import (
"fmt" "fmt"
"time"
"os" "os"
"time"
"strings"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v2" "gopkg.in/olivere/elastic.v2"
@ -12,12 +13,17 @@ import (
type Elasticsearch struct { type Elasticsearch struct {
ServerHost string ServerHost string
IndexName string IndexName string
EnableSniffer bool
Separator string
Client *elastic.Client Client *elastic.Client
Version string
} }
var sampleConfig = ` var sampleConfig = `
server_host = "http://10.10.10.10:19200" # required. server_host = "http://10.10.10.10:19200" # required.
index_name = "test" # required index_name = "test" # required.
enable_sniffer = false
delimiter = "_"
` `
type TimeSeries struct { type TimeSeries struct {
@ -38,18 +44,25 @@ func (a *Elasticsearch) Connect() error {
client, err := elastic.NewClient( client, err := elastic.NewClient(
elastic.SetHealthcheck(true), elastic.SetHealthcheck(true),
elastic.SetSniff(false), elastic.SetSniff(a.EnableSniffer),
elastic.SetHealthcheckInterval(30*time.Second), elastic.SetHealthcheckInterval(30*time.Second),
elastic.SetURL(a.ServerHost), elastic.SetURL(a.ServerHost),
) )
if err != nil { if err != nil {
// Handle error return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err)
panic(err)
} }
a.Client = client a.Client = client
version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost)
if errVersion != nil {
return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion)
}
a.Version = version
return nil return nil
} }
@ -61,11 +74,8 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics { for _, metric := range metrics {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["created"] = time.Now().UnixNano() / 1000000 //m["created"] = metric.UnixNano() / 1000000
m["version"] = "1.1" m["created"] = time.Now()
m["timestamp"] = metric.UnixNano() / 1000000
m["short_message"] = " "
m["name"] = metric.Name()
if host, ok := metric.Tags()["host"]; ok { if host, ok := metric.Tags()["host"]; ok {
m["host"] = host m["host"] = host
@ -77,25 +87,37 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
m["host"] = host m["host"] = host
} }
// Earlier versions of EL doesnt accept '.' in field name
if len(a.Separator) > 1 {
return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator)
}
for key, value := range metric.Tags() { for key, value := range metric.Tags() {
if key != "host" { if key != "host" {
m["_"+key] = value if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
} else {
m[key] = value
}
} }
} }
for key, value := range metric.Fields() { for key, value := range metric.Fields() {
m["_"+key] = value if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
} else {
m[key] = value
}
} }
_, err := a.Client.Index(). _, errMessage:= a.Client.Index().
Index(a.IndexName). Index(a.IndexName).
Type("stats2"). Type(metric.Name()).
BodyJson(m). BodyJson(m).
Do() Do()
if err != nil { if errMessage != nil {
// Handle error return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage)
panic(err)
} }
} }
@ -144,6 +166,7 @@ func (p *Point) setValue(v interface{}) error {
} }
func (a *Elasticsearch) Close() error { func (a *Elasticsearch) Close() error {
a.Client = nil
return nil return nil
} }