Add Elasticsearch 5.x output (#2332)

This commit is contained in:
Leandro Piccilli 2017-03-21 01:47:57 +01:00 committed by Daniel Nelson
parent 12adad6b54
commit a7e8bc1c02
7 changed files with 659 additions and 2 deletions

1
Godeps
View File

@ -59,4 +59,5 @@ golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3
gopkg.in/dancannon/gorethink.v1 edc7a6a68e2d8015f5ffe1b2560eed989f8a45be
gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc
gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655
gopkg.in/olivere/elastic.v5 ee3ebceab960cf68ab9a89ee6d78c031ef5b4a4e
gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6

View File

@ -51,6 +51,7 @@ docker-run:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
docker run --name memcached -p "11211:11211" -d memcached
docker run --name postgres -p "5432:5432" -d postgres
@ -69,6 +70,7 @@ docker-run-circle:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d stealthly/docker-riemann
@ -76,8 +78,8 @@ docker-run-circle:
# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run

View File

@ -211,6 +211,7 @@ Telegraf can also collect metrics via the following service plugins:
* [aws cloudwatch](./plugins/outputs/cloudwatch)
* [datadog](./plugins/outputs/datadog)
* [discard](./plugins/outputs/discard)
* [elasticsearch](./plugins/outputs/elasticsearch)
* [file](./plugins/outputs/file)
* [graphite](./plugins/outputs/graphite)
* [graylog](./plugins/outputs/graylog)

View File

@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"

View File

@ -0,0 +1,218 @@
## Elasticsearch Output Plugin for Telegraf
This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (http://olivere.github.io/elastic/).
Currently it only supports Elasticsearch 5.x series.
## Elasticsearch indexes and templates
### Indexes per time-frame
This plugin can manage indexes per time-frame, as commonly done in other tools with Elasticsearch.
The timestamp of the metric collected will be used to decide the index destination.
For more information about this usage on Elasticsearch, check https://www.elastic.co/guide/en/elasticsearch/guide/master/time-based.html#index-per-timeframe
### Template management
Index templates are used in Elasticsearch to define settings and mappings for the indexes and how the fields should be analyzed.
For more information on how this works, see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
This plugin can create a working template for use with telegraf metrics. It uses Elasticsearch dynamic templates feature to set proper types for the tags and metrics fields.
If the template specified already exists, it will not overwrite unless you configure this plugin to do so. Thus you can customize this template after its creation if necessary.
Example of an index template created by telegraf:
```json
{
"order": 0,
"template": "telegraf-*",
"settings": {
"index": {
"mapping": {
"total_fields": {
"limit": "5000"
}
},
"refresh_interval": "10s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"tags": {
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
},
"match_mapping_type": "string"
}
},
{
"metrics_long": {
"mapping": {
"index": false,
"type": "float"
},
"match_mapping_type": "long"
}
},
{
"metrics_double": {
"mapping": {
"index": false,
"type": "float"
},
"match_mapping_type": "double"
}
},
{
"text_fields": {
"mapping": {
"norms": false
},
"match": "*"
}
}
],
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"type": "date"
},
"measurement_name": {
"type": "keyword"
}
}
}
},
"aliases": {}
}
```
### Example events:
This plugin will format the events in the following way:
```json
{
"@timestamp": "2017-01-01T00:00:00+00:00",
"measurement_name": "cpu",
"cpu": {
"usage_guest": 0,
"usage_guest_nice": 0,
"usage_idle": 71.85413456197966,
"usage_iowait": 0.256805341656516,
"usage_irq": 0,
"usage_nice": 0,
"usage_softirq": 0.2054442732579466,
"usage_steal": 0,
"usage_system": 15.04879301548127,
"usage_user": 12.634822807288275
},
"tag": {
"cpu": "cpu-total",
"host": "elastichost",
"dc": "datacenter1"
}
}
```
```json
{
"@timestamp": "2017-01-01T00:00:00+00:00",
"measurement_name": "system",
"system": {
"load1": 0.78,
"load15": 0.8,
"load5": 0.8,
"n_cpus": 2,
"n_users": 2
},
"tag": {
"host": "elastichost",
"dc": "datacenter1"
}
}
```
### Configuration:
```toml
# Configuration for Elasticsearch to send metrics to.
[[outputs.elasticsearch]]
## The full HTTP endpoint URL for your Elasticsearch instance
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Elasticsearch client timeout, defaults to "5s" if not set.
timeout = "5s"
## Set to true to ask Elasticsearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option
enable_sniffer = false
## Set the interval to check if the Elasticsearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
health_check_interval = "10s"
## HTTP basic authentication details (eg. when using Shield)
# username = "telegraf"
# password = "mypassword"
## Index Config
## The target index for metrics (Elasticsearch will create if it not exists).
## You can use the date specifiers below to create indexes per time frame.
## The metric timestamp will be used to decide the destination index name
# %Y - year (2016)
# %y - last two digits of year (00..99)
# %m - month (01..12)
# %d - day of month (e.g., 01)
# %H - hour (00..23)
index_name = "telegraf-%Y.%m.%d" # required.
## Template Config
## Set to true if you want telegraf to manage its index template.
## If enabled it will create a recommended index template for telegraf indexes
manage_template = true
## The template name used for telegraf indexes
template_name = "telegraf"
## Set to true if you want telegraf to overwrite an existing template
overwrite_template = false
```
### Required parameters:
* `urls`: A list containing the full HTTP URL of one or more nodes from your Elasticsearch instance.
* `index_name`: The target index for metrics. You can use the date specifiers below to create indexes per time frame.
``` %Y - year (2017)
%y - last two digits of year (00..99)
%m - month (01..12)
%d - day of month (e.g., 01)
%H - hour (00..23)
```
### Optional parameters:
* `timeout`: Elasticsearch client timeout, defaults to "5s" if not set.
* `enable_sniffer`: Set to true to ask Elasticsearch a list of all cluster nodes, thus it is not necessary to list all nodes in the urls config option.
* `health_check_interval`: Set the interval to check if the nodes are available, in seconds. Setting to 0 will disable the health check (not recommended in production).
* `username`: The username for HTTP basic authentication details (eg. when using Shield).
* `password`: The password for HTTP basic authentication details (eg. when using Shield).
* `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes.
* `template_name`: The template name used for telegraf indexes.
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
## Known issues
Integer values collected that are bigger than 2^63 and smaller than 1e21 (or in this exact same window of their negative counterparts) are encoded by golang JSON encoder in decimal format and that is not fully supported by Elasticsearch dynamic field mapping. This causes the metrics with such values to be dropped in case a field mapping has not been created yet on the telegraf index. If that's the case you will see an exception on Elasticsearch side like this:
```{"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse"}],"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"illegal_state_exception","reason":"No matching token for number_type [BIG_INTEGER]"}},"status":400}```
The correct field mapping will be created on the telegraf index as soon as a supported JSON value is received by Elasticsearch, and subsequent insertions will work because the field mapping will already exist.
This issue is caused by the way Elasticsearch tries to detect integer fields, and by how golang encodes numbers in JSON. There is no clear workaround for this at the moment.

View File

@ -0,0 +1,308 @@
package elasticsearch
import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v5"
)
type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
Username string
Password string
EnableSniffer bool
Timeout internal.Duration
HealthCheckInterval internal.Duration
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
Client *elastic.Client
}
var sampleConfig = `
## The full HTTP endpoint URL for your Elasticsearch instance
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Elasticsearch client timeout, defaults to "5s" if not set.
timeout = "5s"
## Set to true to ask Elasticsearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option.
enable_sniffer = false
## Set the interval to check if the Elasticsearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
health_check_interval = "10s"
## HTTP basic authentication details (eg. when using Shield)
# username = "telegraf"
# password = "mypassword"
## Index Config
## The target index for metrics (Elasticsearch will create if it not exists).
## You can use the date specifiers below to create indexes per time frame.
## The metric timestamp will be used to decide the destination index name
# %Y - year (2016)
# %y - last two digits of year (00..99)
# %m - month (01..12)
# %d - day of month (e.g., 01)
# %H - hour (00..23)
index_name = "telegraf-%Y.%m.%d" # required.
## Template Config
## Set to true if you want telegraf to manage its index template.
## If enabled it will create a recommended index template for telegraf indexes
manage_template = true
## The template name used for telegraf indexes
template_name = "telegraf"
## Set to true if you want telegraf to overwrite an existing template
overwrite_template = false
`
func (a *Elasticsearch) Connect() error {
if a.URLs == nil || a.IndexName == "" {
return fmt.Errorf("Elasticsearch urls or index_name is not defined")
}
ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration)
defer cancel()
var clientOptions []elastic.ClientOptionFunc
clientOptions = append(clientOptions,
elastic.SetSniff(a.EnableSniffer),
elastic.SetURL(a.URLs...),
elastic.SetHealthcheckInterval(a.HealthCheckInterval.Duration),
)
if a.Username != "" && a.Password != "" {
clientOptions = append(clientOptions,
elastic.SetBasicAuth(a.Username, a.Password),
)
}
if a.HealthCheckInterval.Duration == 0 {
clientOptions = append(clientOptions,
elastic.SetHealthcheck(false),
)
log.Printf("D! Elasticsearch output: disabling health check")
}
client, err := elastic.NewClient(clientOptions...)
if err != nil {
return err
}
// check for ES version on first node
esVersion, err := client.ElasticsearchVersion(a.URLs[0])
if err != nil {
return fmt.Errorf("Elasticsearch version check failed: %s", err)
}
// quit if ES version is not supported
i, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
if err != nil || i < 5 {
return fmt.Errorf("Elasticsearch version not supported: %s", esVersion)
}
log.Println("I! Elasticsearch version: " + esVersion)
a.Client = client
if a.ManageTemplate {
err := a.manageTemplate(ctx)
if err != nil {
return err
}
}
return nil
}
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
bulkRequest := a.Client.Bulk()
for _, metric := range metrics {
var name = metric.Name()
// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time())
m := make(map[string]interface{})
m["@timestamp"] = metric.Time()
m["measurement_name"] = name
m["tag"] = metric.Tags()
m[name] = metric.Fields()
bulkRequest.Add(elastic.NewBulkIndexRequest().
Index(indexName).
Type("metrics").
Doc(m))
}
ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration)
defer cancel()
res, err := bulkRequest.Do(ctx)
if err != nil {
return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err)
}
if res.Errors {
for id, err := range res.Failed() {
log.Printf("E! Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"])
}
return fmt.Errorf("W! Elasticsearch failed to index %d metrics", len(res.Failed()))
}
return nil
}
func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
if a.TemplateName == "" {
return fmt.Errorf("Elasticsearch template_name configuration not defined")
}
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)
if errExists != nil {
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
}
templatePattern := a.IndexName + "*"
if strings.Contains(a.IndexName, "%") {
templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*"
}
if (a.OverwriteTemplate) || (!templateExists) {
// Create or update the template
tmpl := fmt.Sprintf(`
{
"template":"%s",
"settings": {
"index": {
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000
}
},
"mappings" : {
"_default_" : {
"_all": { "enabled": false },
"properties" : {
"@timestamp" : { "type" : "date" },
"measurement_name" : { "type" : "keyword" }
},
"dynamic_templates": [
{
"tags": {
"match_mapping_type": "string",
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
}
}
},
{
"metrics_long": {
"match_mapping_type": "long",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"metrics_double": {
"match_mapping_type": "double",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"text_fields": {
"match": "*",
"mapping": {
"norms": false
}
}
}
]
}
}
}`, templatePattern)
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)
if errCreateTemplate != nil {
return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
}
log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName)
} else {
log.Println("D! Found existing Elasticsearch template. Skipping template management")
}
return nil
}
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
if strings.Contains(indexName, "%") {
var dateReplacer = strings.NewReplacer(
"%Y", eventTime.UTC().Format("2006"),
"%y", eventTime.UTC().Format("06"),
"%m", eventTime.UTC().Format("01"),
"%d", eventTime.UTC().Format("02"),
"%H", eventTime.UTC().Format("15"),
)
indexName = dateReplacer.Replace(indexName)
}
return indexName
}
func (a *Elasticsearch) SampleConfig() string {
return sampleConfig
}
func (a *Elasticsearch) Description() string {
return "Configuration for Elasticsearch to send metrics to."
}
func (a *Elasticsearch) Close() error {
a.Client = nil
return nil
}
func init() {
outputs.Add("elasticsearch", func() telegraf.Output {
return &Elasticsearch{
Timeout: internal.Duration{Duration: time.Second * 5},
HealthCheckInterval: internal.Duration{Duration: time.Second * 10},
}
})
}

View File

@ -0,0 +1,126 @@
package elasticsearch
import (
"context"
"testing"
"time"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestConnectAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: false,
HealthCheckInterval: internal.Duration{Duration: time.Second * 10},
}
// Verify that we can connect to Elasticsearch
err := e.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to Elasticsearch
err = e.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestTemplateManagementEmptyTemplate(t *testing.T) {
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
ctx := context.Background()
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "",
OverwriteTemplate: true,
}
err := e.manageTemplate(ctx)
require.Error(t, err)
}
func TestTemplateManagement(t *testing.T) {
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "test-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
}
ctx, cancel := context.WithTimeout(context.Background(), e.Timeout.Duration)
defer cancel()
err := e.Connect()
require.NoError(t, err)
err = e.manageTemplate(ctx)
require.NoError(t, err)
}
func TestGetIndexName(t *testing.T) {
e := &Elasticsearch{}
var tests = []struct {
EventTime time.Time
IndexName string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname",
"indexname",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname-%Y",
"indexname-2014",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname-%Y-%m",
"indexname-2014-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname-%Y-%m-%d",
"indexname-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname-%Y-%m-%d-%H",
"indexname-2014-12-01-23",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
"indexname-%y-%m",
"indexname-14-12",
},
}
for _, test := range tests {
indexName := e.GetIndexName(test.IndexName, test.EventTime)
if indexName != test.Expected {
t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected)
}
}
}