diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 6f85a5ec1..bab7dfafe 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -2,13 +2,19 @@ This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/). +Tested with: 5.0.0-beta1 and 2.4.0 + ### Configuration: ```toml # Configuration for Elasticsearch to send metrics to [[outputs.elasticsearch]] - ## The full HTTP endpoint URL for your Elasticsearch. + ## The full HTTP endpoint URL for your Elasticsearch. # required server_host = "http://10.10.10.10:19200" ## The target index for metrics # required index_name = "twitter" + ## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically + enable_sniffer = false + ## Earlier versions of EL doesn't accept "." in field name. Set delimiter with the character that you want instead. + delimiter = "_" diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index f3ca79c27..bce7a4509 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -2,8 +2,9 @@ package elasticsearch import ( "fmt" - "time" "os" + "time" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" "gopkg.in/olivere/elastic.v2" @@ -12,12 +13,17 @@ import ( type Elasticsearch struct { ServerHost string IndexName string + EnableSniffer bool + Separator string Client *elastic.Client + Version string } var sampleConfig = ` server_host = "http://10.10.10.10:19200" # required. - index_name = "twitter" #required + index_name = "test" # required. + enable_sniffer = false + delimiter = "_" ` type TimeSeries struct { @@ -38,18 +44,25 @@ func (a *Elasticsearch) Connect() error { client, err := elastic.NewClient( elastic.SetHealthcheck(true), - elastic.SetSniff(false), + elastic.SetSniff(a.EnableSniffer), elastic.SetHealthcheckInterval(30*time.Second), elastic.SetURL(a.ServerHost), ) if err != nil { - // Handle error - panic(err) + return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err) } a.Client = client + version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost) + + if errVersion != nil { + return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion) + } + + a.Version = version + return nil } @@ -61,11 +74,8 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { m := make(map[string]interface{}) - m["created"] = time.Now().UnixNano() / 1000000 - m["version"] = "1.1" - m["timestamp"] = metric.UnixNano() / 1000000 - m["short_message"] = " " - m["name"] = metric.Name() + //m["created"] = metric.UnixNano() / 1000000 + m["created"] = time.Now() if host, ok := metric.Tags()["host"]; ok { m["host"] = host @@ -77,25 +87,37 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { m["host"] = host } + // Earlier versions of EL doesnt accept '.' in field name + if len(a.Separator) > 1 { + return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator) + } + for key, value := range metric.Tags() { if key != "host" { - m["_"+key] = value + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", a.Separator, -1)] = value + } else { + m[key] = value + } } } for key, value := range metric.Fields() { - m["_"+key] = value + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", a.Separator, -1)] = value + } else { + m[key] = value + } } - _, err := a.Client.Index(). + _, errMessage:= a.Client.Index(). Index(a.IndexName). - Type("stats2"). + Type(metric.Name()). BodyJson(m). Do() - if err != nil { - // Handle error - panic(err) + if errMessage != nil { + return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage) } } @@ -144,6 +166,7 @@ func (p *Point) setValue(v interface{}) error { } func (a *Elasticsearch) Close() error { + a.Client = nil return nil } diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index 38f6f5fff..bce7a4509 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -2,8 +2,9 @@ package elasticsearch import ( "fmt" - "time" "os" + "time" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" "gopkg.in/olivere/elastic.v2" @@ -12,12 +13,17 @@ import ( type Elasticsearch struct { ServerHost string IndexName string + EnableSniffer bool + Separator string Client *elastic.Client + Version string } var sampleConfig = ` server_host = "http://10.10.10.10:19200" # required. - index_name = "test" # required + index_name = "test" # required. + enable_sniffer = false + delimiter = "_" ` type TimeSeries struct { @@ -38,18 +44,25 @@ func (a *Elasticsearch) Connect() error { client, err := elastic.NewClient( elastic.SetHealthcheck(true), - elastic.SetSniff(false), + elastic.SetSniff(a.EnableSniffer), elastic.SetHealthcheckInterval(30*time.Second), elastic.SetURL(a.ServerHost), ) if err != nil { - // Handle error - panic(err) + return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err) } a.Client = client + version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost) + + if errVersion != nil { + return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion) + } + + a.Version = version + return nil } @@ -61,11 +74,8 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { m := make(map[string]interface{}) - m["created"] = time.Now().UnixNano() / 1000000 - m["version"] = "1.1" - m["timestamp"] = metric.UnixNano() / 1000000 - m["short_message"] = " " - m["name"] = metric.Name() + //m["created"] = metric.UnixNano() / 1000000 + m["created"] = time.Now() if host, ok := metric.Tags()["host"]; ok { m["host"] = host @@ -77,25 +87,37 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { m["host"] = host } + // Earlier versions of EL doesnt accept '.' in field name + if len(a.Separator) > 1 { + return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator) + } + for key, value := range metric.Tags() { if key != "host" { - m["_"+key] = value + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", a.Separator, -1)] = value + } else { + m[key] = value + } } } for key, value := range metric.Fields() { - m["_"+key] = value + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", a.Separator, -1)] = value + } else { + m[key] = value + } } - _, err := a.Client.Index(). + _, errMessage:= a.Client.Index(). Index(a.IndexName). - Type("stats2"). + Type(metric.Name()). BodyJson(m). Do() - if err != nil { - // Handle error - panic(err) + if errMessage != nil { + return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage) } } @@ -144,6 +166,7 @@ func (p *Point) setValue(v interface{}) error { } func (a *Elasticsearch) Close() error { + a.Client = nil return nil }