Add support for tags in the index name in elasticsearch output (#3470)

This commit is contained in:
Leandro Piccilli 2017-11-21 01:25:36 +01:00 committed by Daniel Nelson
parent 612d81d689
commit 9e95d51648
3 changed files with 199 additions and 12 deletions

View File

@ -173,6 +173,11 @@ This plugin will format the events in the following way:
# %d - day of month (e.g., 01)
# %H - hour (00..23)
# %V - week of the year (ISO week) (01..53)
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the index name. If the tag does not exist,
## the default tag value will be used.
# index_name = "telegraf-{{host}}-%Y.%m.%d"
# default_tag_value = "none"
index_name = "telegraf-%Y.%m.%d" # required.
## Optional SSL Config
@ -202,7 +207,9 @@ This plugin will format the events in the following way:
%m - month (01..12)
%d - day of month (e.g., 01)
%H - hour (00..23)
%V - week of the year (ISO week) (01..53)
```
Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead.
### Optional parameters:

View File

@ -18,6 +18,8 @@ import (
type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
DefaultTagValue string
TagKeys []string
Username string
Password string
EnableSniffer bool
@ -38,7 +40,7 @@ var sampleConfig = `
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Elasticsearch client timeout, defaults to "5s" if not set.
## Elasticsearch client timeout, defaults to "5s" if not set.
timeout = "5s"
## Set to true to ask Elasticsearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option.
@ -60,6 +62,11 @@ var sampleConfig = `
# %d - day of month (e.g., 01)
# %H - hour (00..23)
# %V - week of the year (ISO week) (01..53)
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the index name. If the tag does not exist,
## the default tag value will be used.
# index_name = "telegraf-{{host}}-%Y.%m.%d"
# default_tag_value = "none"
index_name = "telegraf-%Y.%m.%d" # required.
## Optional SSL Config
@ -152,6 +159,8 @@ func (a *Elasticsearch) Connect() error {
}
}
a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
return nil
}
@ -167,7 +176,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time())
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
m := make(map[string]interface{})
@ -214,13 +223,21 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
}
templatePattern := a.IndexName + "*"
templatePattern := a.IndexName
if strings.Contains(a.IndexName, "%") {
templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*"
if strings.Contains(templatePattern, "%") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "%")]
}
if (a.OverwriteTemplate) || (!templateExists) {
if strings.Contains(templatePattern, "{{") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
}
if templatePattern == "" {
return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix")
}
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
// Create or update the template
tmpl := fmt.Sprintf(`
{
@ -278,7 +295,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
]
}
}
}`, templatePattern)
}`, templatePattern+"*")
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)
if errCreateTemplate != nil {
@ -295,7 +312,35 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return nil
}
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
tagKeys := []string{}
startTag := strings.Index(indexName, "{{")
for startTag >= 0 {
endTag := strings.Index(indexName, "}}")
if endTag < 0 {
startTag = -1
} else {
tagName := indexName[startTag+2 : endTag]
var tagReplacer = strings.NewReplacer(
"{{"+tagName+"}}", "%s",
)
indexName = tagReplacer.Replace(indexName)
tagKeys = append(tagKeys, (strings.TrimSpace(tagName)))
startTag = strings.Index(indexName, "{{")
}
}
return indexName, tagKeys
}
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagKeys []string, metricTags map[string]string) string {
if strings.Contains(indexName, "%") {
var dateReplacer = strings.NewReplacer(
"%Y", eventTime.UTC().Format("2006"),
@ -309,7 +354,18 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) stri
indexName = dateReplacer.Replace(indexName)
}
return indexName
tagValues := []interface{}{}
for _, key := range tagKeys {
if value, ok := metricTags[key]; ok {
tagValues = append(tagValues, value)
} else {
log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue)
tagValues = append(tagValues, a.DefaultTagValue)
}
}
return fmt.Sprintf(indexName, tagValues...)
}

View File

@ -2,6 +2,7 @@ package elasticsearch
import (
"context"
"reflect"
"testing"
"time"
@ -38,6 +39,10 @@ func TestConnectAndWrite(t *testing.T) {
}
func TestTemplateManagementEmptyTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
ctx := context.Background()
@ -82,54 +87,173 @@ func TestTemplateManagement(t *testing.T) {
require.NoError(t, err)
}
func TestTemplateInvalidIndexPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "{{host}}-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
}
err := e.Connect()
require.Error(t, err)
}
func TestGetTagKeys(t *testing.T) {
e := &Elasticsearch{
DefaultTagValue: "none",
}
var tests = []struct {
IndexName string
ExpectedIndexName string
ExpectedTagKeys []string
}{
{
"indexname",
"indexname",
[]string{},
}, {
"indexname-%Y",
"indexname-%Y",
[]string{},
}, {
"indexname-%Y-%m",
"indexname-%Y-%m",
[]string{},
}, {
"indexname-%Y-%m-%d",
"indexname-%Y-%m-%d",
[]string{},
}, {
"indexname-%Y-%m-%d-%H",
"indexname-%Y-%m-%d-%H",
[]string{},
}, {
"indexname-%y-%m",
"indexname-%y-%m",
[]string{},
}, {
"indexname-{{tag1}}-%y-%m",
"indexname-%s-%y-%m",
[]string{"tag1"},
}, {
"indexname-{{tag1}}-{{tag2}}-%y-%m",
"indexname-%s-%s-%y-%m",
[]string{"tag1", "tag2"},
}, {
"indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m",
"indexname-%s-%s-%s-%y-%m",
[]string{"tag1", "tag2", "tag3"},
},
}
for _, test := range tests {
indexName, tagKeys := e.GetTagKeys(test.IndexName)
if indexName != test.ExpectedIndexName {
t.Errorf("Expected indexname %s, got %s\n", test.ExpectedIndexName, indexName)
}
if !reflect.DeepEqual(tagKeys, test.ExpectedTagKeys) {
t.Errorf("Expected tagKeys %s, got %s\n", test.ExpectedTagKeys, tagKeys)
}
}
}
func TestGetIndexName(t *testing.T) {
e := &Elasticsearch{}
e := &Elasticsearch{
DefaultTagValue: "none",
}
var tests = []struct {
EventTime time.Time
Tags map[string]string
TagKeys []string
IndexName string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname",
"indexname",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y",
"indexname-2014",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m",
"indexname-2014-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m-%d",
"indexname-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m-%d-%H",
"indexname-2014-12-01-23",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%y-%m",
"indexname-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%V",
"indexname-2014-49",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1"},
"indexname-%s-%y-%m",
"indexname-value1-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1", "tag2"},
"indexname-%s-%s-%y-%m",
"indexname-value1-value2-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1", "tag2", "tag3"},
"indexname-%s-%s-%s-%y-%m",
"indexname-value1-value2-none-14-12",
},
}
for _, test := range tests {
indexName := e.GetIndexName(test.IndexName, test.EventTime)
indexName := e.GetIndexName(test.IndexName, test.EventTime, test.TagKeys, test.Tags)
if indexName != test.Expected {
t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected)
t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName)
}
}
}