From 9e95d51648e20b8f50c1df72a327500391c404d5 Mon Sep 17 00:00:00 2001 From: Leandro Piccilli Date: Tue, 21 Nov 2017 01:25:36 +0100 Subject: [PATCH] Add support for tags in the index name in elasticsearch output (#3470) --- plugins/outputs/elasticsearch/README.md | 7 + .../outputs/elasticsearch/elasticsearch.go | 74 ++++++++-- .../elasticsearch/elasticsearch_test.go | 130 +++++++++++++++++- 3 files changed, 199 insertions(+), 12 deletions(-) diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index d2c84a8d3..b0d2e6f9b 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -173,6 +173,11 @@ This plugin will format the events in the following way: # %d - day of month (e.g., 01) # %H - hour (00..23) # %V - week of the year (ISO week) (01..53) + ## Additionally, you can specify a tag name using the notation {{tag_name}} + ## which will be used as part of the index name. If the tag does not exist, + ## the default tag value will be used. + # index_name = "telegraf-{{host}}-%Y.%m.%d" + # default_tag_value = "none" index_name = "telegraf-%Y.%m.%d" # required. ## Optional SSL Config @@ -202,7 +207,9 @@ This plugin will format the events in the following way: %m - month (01..12) %d - day of month (e.g., 01) %H - hour (00..23) + %V - week of the year (ISO week) (01..53) ``` +Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead. ### Optional parameters: diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index a9fd5a491..326def1d1 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -18,6 +18,8 @@ import ( type Elasticsearch struct { URLs []string `toml:"urls"` IndexName string + DefaultTagValue string + TagKeys []string Username string Password string EnableSniffer bool @@ -38,7 +40,7 @@ var sampleConfig = ` ## Multiple urls can be specified as part of the same cluster, ## this means that only ONE of the urls will be written to each interval. urls = [ "http://node1.es.example.com:9200" ] # required. - ## Elasticsearch client timeout, defaults to "5s" if not set. + ## Elasticsearch client timeout, defaults to "5s" if not set. timeout = "5s" ## Set to true to ask Elasticsearch a list of all cluster nodes, ## thus it is not necessary to list all nodes in the urls config option. @@ -60,6 +62,11 @@ var sampleConfig = ` # %d - day of month (e.g., 01) # %H - hour (00..23) # %V - week of the year (ISO week) (01..53) + ## Additionally, you can specify a tag name using the notation {{tag_name}} + ## which will be used as part of the index name. If the tag does not exist, + ## the default tag value will be used. + # index_name = "telegraf-{{host}}-%Y.%m.%d" + # default_tag_value = "none" index_name = "telegraf-%Y.%m.%d" # required. ## Optional SSL Config @@ -152,6 +159,8 @@ func (a *Elasticsearch) Connect() error { } } + a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName) + return nil } @@ -167,7 +176,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { // index name has to be re-evaluated each time for telegraf // to send the metric to the correct time-based index - indexName := a.GetIndexName(a.IndexName, metric.Time()) + indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags()) m := make(map[string]interface{}) @@ -214,13 +223,21 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) } - templatePattern := a.IndexName + "*" + templatePattern := a.IndexName - if strings.Contains(a.IndexName, "%") { - templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*" + if strings.Contains(templatePattern, "%") { + templatePattern = templatePattern[0:strings.Index(templatePattern, "%")] } - if (a.OverwriteTemplate) || (!templateExists) { + if strings.Contains(templatePattern, "{{") { + templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")] + } + + if templatePattern == "" { + return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix") + } + + if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") { // Create or update the template tmpl := fmt.Sprintf(` { @@ -278,7 +295,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { ] } } - }`, templatePattern) + }`, templatePattern+"*") _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx) if errCreateTemplate != nil { @@ -295,7 +312,35 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { return nil } -func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string { +func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) { + + tagKeys := []string{} + startTag := strings.Index(indexName, "{{") + + for startTag >= 0 { + endTag := strings.Index(indexName, "}}") + + if endTag < 0 { + startTag = -1 + + } else { + tagName := indexName[startTag+2 : endTag] + + var tagReplacer = strings.NewReplacer( + "{{"+tagName+"}}", "%s", + ) + + indexName = tagReplacer.Replace(indexName) + tagKeys = append(tagKeys, (strings.TrimSpace(tagName))) + + startTag = strings.Index(indexName, "{{") + } + } + + return indexName, tagKeys +} + +func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagKeys []string, metricTags map[string]string) string { if strings.Contains(indexName, "%") { var dateReplacer = strings.NewReplacer( "%Y", eventTime.UTC().Format("2006"), @@ -309,7 +354,18 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) stri indexName = dateReplacer.Replace(indexName) } - return indexName + tagValues := []interface{}{} + + for _, key := range tagKeys { + if value, ok := metricTags[key]; ok { + tagValues = append(tagValues, value) + } else { + log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue) + tagValues = append(tagValues, a.DefaultTagValue) + } + } + + return fmt.Sprintf(indexName, tagValues...) } diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index dadd94da9..e2a583402 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -2,6 +2,7 @@ package elasticsearch import ( "context" + "reflect" "testing" "time" @@ -38,6 +39,10 @@ func TestConnectAndWrite(t *testing.T) { } func TestTemplateManagementEmptyTemplate(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} ctx := context.Background() @@ -82,54 +87,173 @@ func TestTemplateManagement(t *testing.T) { require.NoError(t, err) } +func TestTemplateInvalidIndexPattern(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} + + e := &Elasticsearch{ + URLs: urls, + IndexName: "{{host}}-%Y.%m.%d", + Timeout: internal.Duration{Duration: time.Second * 5}, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + } + + err := e.Connect() + require.Error(t, err) +} + +func TestGetTagKeys(t *testing.T) { + e := &Elasticsearch{ + DefaultTagValue: "none", + } + + var tests = []struct { + IndexName string + ExpectedIndexName string + ExpectedTagKeys []string + }{ + { + "indexname", + "indexname", + []string{}, + }, { + "indexname-%Y", + "indexname-%Y", + []string{}, + }, { + "indexname-%Y-%m", + "indexname-%Y-%m", + []string{}, + }, { + "indexname-%Y-%m-%d", + "indexname-%Y-%m-%d", + []string{}, + }, { + "indexname-%Y-%m-%d-%H", + "indexname-%Y-%m-%d-%H", + []string{}, + }, { + "indexname-%y-%m", + "indexname-%y-%m", + []string{}, + }, { + "indexname-{{tag1}}-%y-%m", + "indexname-%s-%y-%m", + []string{"tag1"}, + }, { + "indexname-{{tag1}}-{{tag2}}-%y-%m", + "indexname-%s-%s-%y-%m", + []string{"tag1", "tag2"}, + }, { + "indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m", + "indexname-%s-%s-%s-%y-%m", + []string{"tag1", "tag2", "tag3"}, + }, + } + for _, test := range tests { + indexName, tagKeys := e.GetTagKeys(test.IndexName) + if indexName != test.ExpectedIndexName { + t.Errorf("Expected indexname %s, got %s\n", test.ExpectedIndexName, indexName) + } + if !reflect.DeepEqual(tagKeys, test.ExpectedTagKeys) { + t.Errorf("Expected tagKeys %s, got %s\n", test.ExpectedTagKeys, tagKeys) + } + } + +} + func TestGetIndexName(t *testing.T) { - e := &Elasticsearch{} + e := &Elasticsearch{ + DefaultTagValue: "none", + } var tests = []struct { EventTime time.Time + Tags map[string]string + TagKeys []string IndexName string Expected string }{ { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname", "indexname", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%Y", "indexname-2014", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%Y-%m", "indexname-2014-12", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%Y-%m-%d", "indexname-2014-12-01", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%Y-%m-%d-%H", "indexname-2014-12-01-23", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%y-%m", "indexname-14-12", }, { time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, "indexname-%Y-%V", "indexname-2014-49", }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1"}, + "indexname-%s-%y-%m", + "indexname-value1-14-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1", "tag2"}, + "indexname-%s-%s-%y-%m", + "indexname-value1-value2-14-12", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{"tag1", "tag2", "tag3"}, + "indexname-%s-%s-%s-%y-%m", + "indexname-value1-value2-none-14-12", + }, } for _, test := range tests { - indexName := e.GetIndexName(test.IndexName, test.EventTime) + indexName := e.GetIndexName(test.IndexName, test.EventTime, test.TagKeys, test.Tags) if indexName != test.Expected { - t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected) + t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName) } } }