From a7e8bc1c020d7cc9918b2b0e35a9dae16b574e99 Mon Sep 17 00:00:00 2001 From: Leandro Piccilli Date: Tue, 21 Mar 2017 01:47:57 +0100 Subject: [PATCH] Add Elasticsearch 5.x output (#2332) --- Godeps | 1 + Makefile | 6 +- README.md | 1 + plugins/outputs/all/all.go | 1 + plugins/outputs/elasticsearch/README.md | 218 +++++++++++++ .../outputs/elasticsearch/elasticsearch.go | 308 ++++++++++++++++++ .../elasticsearch/elasticsearch_test.go | 126 +++++++ 7 files changed, 659 insertions(+), 2 deletions(-) create mode 100644 plugins/outputs/elasticsearch/README.md create mode 100644 plugins/outputs/elasticsearch/elasticsearch.go create mode 100644 plugins/outputs/elasticsearch/elasticsearch_test.go diff --git a/Godeps b/Godeps index 2d0419ef6..6cbe9efa7 100644 --- a/Godeps +++ b/Godeps @@ -59,4 +59,5 @@ golang.org/x/text 506f9d5c962f284575e88337e7d9296d27e729d3 gopkg.in/dancannon/gorethink.v1 edc7a6a68e2d8015f5ffe1b2560eed989f8a45be gopkg.in/fatih/pool.v2 6e328e67893eb46323ad06f0e92cb9536babbabc gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 +gopkg.in/olivere/elastic.v5 ee3ebceab960cf68ab9a89ee6d78c031ef5b4a4e gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 diff --git a/Makefile b/Makefile index 79276f887..d2bad656d 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,7 @@ docker-run: -e ADVERTISED_PORT=9092 \ -p "2181:2181" -p "9092:9092" \ -d spotify/kafka + docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql docker run --name memcached -p "11211:11211" -d memcached docker run --name postgres -p "5432:5432" -d postgres @@ -69,6 +70,7 @@ docker-run-circle: -e ADVERTISED_PORT=9092 \ -p "2181:2181" -p "9092:9092" \ -d spotify/kafka + docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5 docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt docker run --name riemann -p 
"5555:5555" -d stealthly/docker-riemann @@ -76,8 +78,8 @@ docker-run-circle: # Kill all docker containers, ignore errors docker-kill: - -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats - -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats + -docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch + -docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch # Run full unit tests using docker containers (includes setup and teardown) test: vet docker-kill docker-run diff --git a/README.md b/README.md index 915c7b761..906862714 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,7 @@ Telegraf can also collect metrics via the following service plugins: * [aws cloudwatch](./plugins/outputs/cloudwatch) * [datadog](./plugins/outputs/datadog) * [discard](./plugins/outputs/discard) +* [elasticsearch](./plugins/outputs/elasticsearch) * [file](./plugins/outputs/file) * [graphite](./plugins/outputs/graphite) * [graylog](./plugins/outputs/graylog) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index eec2b95e3..089a56909 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/discard" + _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md new file mode 100644 index 000000000..620d5a82c --- /dev/null +++ b/plugins/outputs/elasticsearch/README.md @@ -0,0 +1,218 @@ +## Elasticsearch Output Plugin for 
Telegraf + +This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (http://olivere.github.io/elastic/). + +Currently it only supports Elasticsearch 5.x series. + +## Elasticsearch indexes and templates + +### Indexes per time-frame + +This plugin can manage indexes per time-frame, as commonly done in other tools with Elasticsearch. + +The timestamp of the metric collected will be used to decide the index destination. + +For more information about this usage on Elasticsearch, check https://www.elastic.co/guide/en/elasticsearch/guide/master/time-based.html#index-per-timeframe + +### Template management + +Index templates are used in Elasticsearch to define settings and mappings for the indexes and how the fields should be analyzed. +For more information on how this works, see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html + +This plugin can create a working template for use with telegraf metrics. It uses Elasticsearch dynamic templates feature to set proper types for the tags and metrics fields. +If the template specified already exists, it will not overwrite unless you configure this plugin to do so. Thus you can customize this template after its creation if necessary. 
+ +Example of an index template created by telegraf: + +```json +{ + "order": 0, + "template": "telegraf-*", + "settings": { + "index": { + "mapping": { + "total_fields": { + "limit": "5000" + } + }, + "refresh_interval": "10s" + } + }, + "mappings": { + "_default_": { + "dynamic_templates": [ + { + "tags": { + "path_match": "tag.*", + "mapping": { + "ignore_above": 512, + "type": "keyword" + }, + "match_mapping_type": "string" + } + }, + { + "metrics_long": { + "mapping": { + "index": false, + "type": "float" + }, + "match_mapping_type": "long" + } + }, + { + "metrics_double": { + "mapping": { + "index": false, + "type": "float" + }, + "match_mapping_type": "double" + } + }, + { + "text_fields": { + "mapping": { + "norms": false + }, + "match": "*" + } + } + ], + "_all": { + "enabled": false + }, + "properties": { + "@timestamp": { + "type": "date" + }, + "measurement_name": { + "type": "keyword" + } + } + } + }, + "aliases": {} +} + +``` + +### Example events: + +This plugin will format the events in the following way: + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "cpu", + "cpu": { + "usage_guest": 0, + "usage_guest_nice": 0, + "usage_idle": 71.85413456197966, + "usage_iowait": 0.256805341656516, + "usage_irq": 0, + "usage_nice": 0, + "usage_softirq": 0.2054442732579466, + "usage_steal": 0, + "usage_system": 15.04879301548127, + "usage_user": 12.634822807288275 + }, + "tag": { + "cpu": "cpu-total", + "host": "elastichost", + "dc": "datacenter1" + } +} +``` + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "system", + "system": { + "load1": 0.78, + "load15": 0.8, + "load5": 0.8, + "n_cpus": 2, + "n_users": 2 + }, + "tag": { + "host": "elastichost", + "dc": "datacenter1" + } +} +``` + +### Configuration: + +```toml +# Configuration for Elasticsearch to send metrics to. 
+[[outputs.elasticsearch]] + ## The full HTTP endpoint URL for your Elasticsearch instance + ## Multiple urls can be specified as part of the same cluster, + ## this means that only ONE of the urls will be written to each interval. + urls = [ "http://node1.es.example.com:9200" ] # required. + ## Elasticsearch client timeout, defaults to "5s" if not set. + timeout = "5s" + ## Set to true to ask Elasticsearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option + enable_sniffer = false + ## Set the interval to check if the Elasticsearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + health_check_interval = "10s" + ## HTTP basic authentication details (eg. when using Shield) + # username = "telegraf" + # password = "mypassword" + + ## Index Config + ## The target index for metrics (Elasticsearch will create if it not exists). + ## You can use the date specifiers below to create indexes per time frame. + ## The metric timestamp will be used to decide the destination index name + # %Y - year (2016) + # %y - last two digits of year (00..99) + # %m - month (01..12) + # %d - day of month (e.g., 01) + # %H - hour (00..23) + index_name = "telegraf-%Y.%m.%d" # required. + + ## Template Config + ## Set to true if you want telegraf to manage its index template. + ## If enabled it will create a recommended index template for telegraf indexes + manage_template = true + ## The template name used for telegraf indexes + template_name = "telegraf" + ## Set to true if you want telegraf to overwrite an existing template + overwrite_template = false +``` + +### Required parameters: + +* `urls`: A list containing the full HTTP URL of one or more nodes from your Elasticsearch instance. +* `index_name`: The target index for metrics. You can use the date specifiers below to create indexes per time frame. 
+ +``` %Y - year (2017) + %y - last two digits of year (00..99) + %m - month (01..12) + %d - day of month (e.g., 01) + %H - hour (00..23) +``` + +### Optional parameters: + +* `timeout`: Elasticsearch client timeout, defaults to "5s" if not set. +* `enable_sniffer`: Set to true to ask Elasticsearch a list of all cluster nodes, thus it is not necessary to list all nodes in the urls config option. +* `health_check_interval`: Set the interval to check if the nodes are available, in seconds. Setting to 0 will disable the health check (not recommended in production). +* `username`: The username for HTTP basic authentication details (eg. when using Shield). +* `password`: The password for HTTP basic authentication details (eg. when using Shield). +* `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes. +* `template_name`: The template name used for telegraf indexes. +* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template. + +## Known issues + +Integer values collected that are bigger than 2^63 and smaller than 1e21 (or in this exact same window of their negative counterparts) are encoded by golang JSON encoder in decimal format and that is not fully supported by Elasticsearch dynamic field mapping. This causes the metrics with such values to be dropped in case a field mapping has not been created yet on the telegraf index. 
If that's the case you will see an exception on Elasticsearch side like this: + +```{"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse"}],"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"illegal_state_exception","reason":"No matching token for number_type [BIG_INTEGER]"}},"status":400}``` + +The correct field mapping will be created on the telegraf index as soon as a supported JSON value is received by Elasticsearch, and subsequent insertions will work because the field mapping will already exist. + +This issue is caused by the way Elasticsearch tries to detect integer fields, and by how golang encodes numbers in JSON. There is no clear workaround for this at the moment. \ No newline at end of file diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go new file mode 100644 index 000000000..dbd359b90 --- /dev/null +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -0,0 +1,308 @@ +package elasticsearch + +import ( + "context" + "fmt" + "log" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "gopkg.in/olivere/elastic.v5" +) + +type Elasticsearch struct { + URLs []string `toml:"urls"` + IndexName string + Username string + Password string + EnableSniffer bool + Timeout internal.Duration + HealthCheckInterval internal.Duration + ManageTemplate bool + TemplateName string + OverwriteTemplate bool + Client *elastic.Client +} + +var sampleConfig = ` + ## The full HTTP endpoint URL for your Elasticsearch instance + ## Multiple urls can be specified as part of the same cluster, + ## this means that only ONE of the urls will be written to each interval. + urls = [ "http://node1.es.example.com:9200" ] # required. + ## Elasticsearch client timeout, defaults to "5s" if not set. 
+ timeout = "5s" + ## Set to true to ask Elasticsearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option. + enable_sniffer = false + ## Set the interval to check if the Elasticsearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + health_check_interval = "10s" + ## HTTP basic authentication details (eg. when using Shield) + # username = "telegraf" + # password = "mypassword" + + ## Index Config + ## The target index for metrics (Elasticsearch will create if it not exists). + ## You can use the date specifiers below to create indexes per time frame. + ## The metric timestamp will be used to decide the destination index name + # %Y - year (2016) + # %y - last two digits of year (00..99) + # %m - month (01..12) + # %d - day of month (e.g., 01) + # %H - hour (00..23) + index_name = "telegraf-%Y.%m.%d" # required. + + ## Template Config + ## Set to true if you want telegraf to manage its index template. 
+ ## If enabled it will create a recommended index template for telegraf indexes + manage_template = true + ## The template name used for telegraf indexes + template_name = "telegraf" + ## Set to true if you want telegraf to overwrite an existing template + overwrite_template = false +` + +func (a *Elasticsearch) Connect() error { + if a.URLs == nil || a.IndexName == "" { + return fmt.Errorf("Elasticsearch urls or index_name is not defined") + } + + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration) + defer cancel() + + var clientOptions []elastic.ClientOptionFunc + + clientOptions = append(clientOptions, + elastic.SetSniff(a.EnableSniffer), + elastic.SetURL(a.URLs...), + elastic.SetHealthcheckInterval(a.HealthCheckInterval.Duration), + ) + + if a.Username != "" && a.Password != "" { + clientOptions = append(clientOptions, + elastic.SetBasicAuth(a.Username, a.Password), + ) + } + + if a.HealthCheckInterval.Duration == 0 { + clientOptions = append(clientOptions, + elastic.SetHealthcheck(false), + ) + log.Printf("D! Elasticsearch output: disabling health check") + } + + client, err := elastic.NewClient(clientOptions...) + + if err != nil { + return err + } + + // check for ES version on first node + esVersion, err := client.ElasticsearchVersion(a.URLs[0]) + + if err != nil { + return fmt.Errorf("Elasticsearch version check failed: %s", err) + } + + // quit if ES version is not supported + i, err := strconv.Atoi(strings.Split(esVersion, ".")[0]) + if err != nil || i < 5 { + return fmt.Errorf("Elasticsearch version not supported: %s", esVersion) + } + + log.Println("I! 
Elasticsearch version: " + esVersion) + + a.Client = client + + if a.ManageTemplate { + err := a.manageTemplate(ctx) + if err != nil { + return err + } + } + + return nil +} + +func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + bulkRequest := a.Client.Bulk() + + for _, metric := range metrics { + var name = metric.Name() + + // index name has to be re-evaluated each time for telegraf + // to send the metric to the correct time-based index + indexName := a.GetIndexName(a.IndexName, metric.Time()) + + m := make(map[string]interface{}) + + m["@timestamp"] = metric.Time() + m["measurement_name"] = name + m["tag"] = metric.Tags() + m[name] = metric.Fields() + + bulkRequest.Add(elastic.NewBulkIndexRequest(). + Index(indexName). + Type("metrics"). + Doc(m)) + + } + + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration) + defer cancel() + + res, err := bulkRequest.Do(ctx) + + if err != nil { + return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err) + } + + if res.Errors { + for id, err := range res.Failed() { + log.Printf("E! Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"]) + } + return fmt.Errorf("W! 
Elasticsearch failed to index %d metrics", len(res.Failed())) + } + + return nil + +} + +func (a *Elasticsearch) manageTemplate(ctx context.Context) error { + if a.TemplateName == "" { + return fmt.Errorf("Elasticsearch template_name configuration not defined") + } + + templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx) + + if errExists != nil { + return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) + } + + templatePattern := a.IndexName + "*" + + if strings.Contains(a.IndexName, "%") { + templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*" + } + + if (a.OverwriteTemplate) || (!templateExists) { + // Create or update the template + tmpl := fmt.Sprintf(` + { + "template":"%s", + "settings": { + "index": { + "refresh_interval": "10s", + "mapping.total_fields.limit": 5000 + } + }, + "mappings" : { + "_default_" : { + "_all": { "enabled": false }, + "properties" : { + "@timestamp" : { "type" : "date" }, + "measurement_name" : { "type" : "keyword" } + }, + "dynamic_templates": [ + { + "tags": { + "match_mapping_type": "string", + "path_match": "tag.*", + "mapping": { + "ignore_above": 512, + "type": "keyword" + } + } + }, + { + "metrics_long": { + "match_mapping_type": "long", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "metrics_double": { + "match_mapping_type": "double", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "text_fields": { + "match": "*", + "mapping": { + "norms": false + } + } + } + ] + } + } + }`, templatePattern) + _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx) + + if errCreateTemplate != nil { + return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate) + } + + log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName) + + } else { + + log.Println("D! 
Found existing Elasticsearch template. Skipping template management") + + } + return nil +} + +func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string { + if strings.Contains(indexName, "%") { + var dateReplacer = strings.NewReplacer( + "%Y", eventTime.UTC().Format("2006"), + "%y", eventTime.UTC().Format("06"), + "%m", eventTime.UTC().Format("01"), + "%d", eventTime.UTC().Format("02"), + "%H", eventTime.UTC().Format("15"), + ) + + indexName = dateReplacer.Replace(indexName) + } + + return indexName + +} + +func (a *Elasticsearch) SampleConfig() string { + return sampleConfig +} + +func (a *Elasticsearch) Description() string { + return "Configuration for Elasticsearch to send metrics to." +} + +func (a *Elasticsearch) Close() error { + a.Client = nil + return nil +} + +func init() { + outputs.Add("elasticsearch", func() telegraf.Output { + return &Elasticsearch{ + Timeout: internal.Duration{Duration: time.Second * 5}, + HealthCheckInterval: internal.Duration{Duration: time.Second * 10}, + } + }) +} diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go new file mode 100644 index 000000000..9163a2bbe --- /dev/null +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -0,0 +1,126 @@ +package elasticsearch + +import ( + "context" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: internal.Duration{Duration: time.Second * 5}, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: internal.Duration{Duration: time.Second * 10}, + } + + // Verify that we can 
connect to Elasticsearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to Elasticsearch + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) + +} + +func TestTemplateManagementEmptyTemplate(t *testing.T) { + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + ctx := context.Background() + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: internal.Duration{Duration: time.Second * 5}, + ManageTemplate: true, + TemplateName: "", + OverwriteTemplate: true, + } + + err := e.manageTemplate(ctx) + require.Error(t, err) + +} + +func TestTemplateManagement(t *testing.T) { + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "test-%Y.%m.%d", + Timeout: internal.Duration{Duration: time.Second * 5}, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + } + + ctx, cancel := context.WithTimeout(context.Background(), e.Timeout.Duration) + defer cancel() + + err := e.Connect() + require.NoError(t, err) + + err = e.manageTemplate(ctx) + require.NoError(t, err) +} + +func TestGetIndexName(t *testing.T) { + e := &Elasticsearch{} + + var tests = []struct { + EventTime time.Time + IndexName string + Expected string + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname", + "indexname", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname-%Y", + "indexname-2014", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname-%Y-%m", + "indexname-2014-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname-%Y-%m-%d", + "indexname-2014-12-01", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname-%Y-%m-%d-%H", + "indexname-2014-12-01-23", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + "indexname-%y-%m", + "indexname-14-12", + }, + } + for _, test := range tests { + indexName := 
e.GetIndexName(test.IndexName, test.EventTime) + if indexName != test.Expected { + t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName) + } + } +}