diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 64dc25202..c4e5e4e7e 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -14,6 +14,19 @@ In this case, dots will be replaced with "_". ## The full HTTP endpoint URL for your Elasticsearch. # required server_host = "http://10.10.10.10:9200" ## The target index for metrics # required + # formats allowed on index_name after a prefix: + # %Y - year (2016) + # %y - last two digits of year (00..99) + # %m - month (01..12) + # %d - day of month (e.g., 01) + # %H - hour (00..23) index_name = "test" ## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically enable_sniffer = false + ## If index not exists, a template will be created and then the new index. + ## You can set number of shards and replicas for this template. + ## If the index's name uses formats ("myindex%Y%m%d"), the template's name will be the characters before + ## the first '%' ("myindex"). + number_of_shards = 5 + number_of_replicas = 1 + diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 67dc20f48..7536f76e0 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -6,22 +6,33 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" "gopkg.in/olivere/elastic.v2" "os" + "strconv" "strings" "time" ) type Elasticsearch struct { - ServerHost string - IndexName string - EnableSniffer bool - Client *elastic.Client - Version string + ServerHost string + IndexName string + EnableSniffer bool + NumberOfShards int + NumberOfReplicas int + Client *elastic.Client + Version string } var sampleConfig = ` server_host = "http://10.10.10.10:9200" # required. index_name = "test" # required. + # regex allowed on index_name: + # %Y - year (2016) + # %y - last two digits of year (00..99) + # %m - month (01..12) + # %d - day of month (e.g., 01) + # %H - hour (00..23) enable_sniffer = false + number_of_shards = 5 + number_of_replicas = 1 ` func (a *Elasticsearch) Connect() error { @@ -29,6 +40,11 @@ func (a *Elasticsearch) Connect() error { return fmt.Errorf("FAILED server_host and index_name are required fields for Elasticsearch output") } + // Check if index's name has a prefix + if strings.HasPrefix(a.IndexName, "%") { + return fmt.Errorf("FAILED Elasticsearch index's name must start with a prefix. \n") + } + client, err := elastic.NewClient( elastic.SetHealthcheck(true), elastic.SetSniff(a.EnableSniffer), @@ -50,6 +66,68 @@ func (a *Elasticsearch) Connect() error { a.Version = version + templateName := a.IndexName + + if strings.Contains(a.IndexName, "%") { + // Template's name its Index's name without date patterns + templateName = a.IndexName[0:strings.Index(a.IndexName, "%")] + + year := strconv.Itoa(time.Now().Year()) + a.IndexName = strings.Replace(a.IndexName, "%Y", year, -1) + a.IndexName = strings.Replace(a.IndexName, "%y", year[len(year)-2:], -1) + a.IndexName = strings.Replace(a.IndexName, "%m", strconv.Itoa(int(time.Now().Month())), -1) + a.IndexName = strings.Replace(a.IndexName, "%d", strconv.Itoa(time.Now().Day()), -1) + a.IndexName = strings.Replace(a.IndexName, "%H", strconv.Itoa(time.Now().Hour()), -1) + } + + exists, errExists := a.Client.IndexExists(a.IndexName).Do() + + if errExists != nil { + return fmt.Errorf("FAILED to check if Elasticsearch index %s exists : %s\n", a.IndexName, errExists) + } + + if !exists { + // First create a template for the new index + tmpl := fmt.Sprintf(`{ + "template":"%s*", + "settings" : { + "number_of_shards" : %s, + "number_of_replicas" : %s + }, + "mappings" : { + "_default_" : { + "properties" : { + "created" : { "type" : "date" }, + "host":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}} + }, + "dynamic_templates": [ + { "unknow": { + "match": "*", + "match_mapping_type": "unknow", + "mapping": { + "type":"string" + } + }} + ] + } + } + }`, templateName, strconv.Itoa(a.NumberOfShards), strconv.Itoa(a.NumberOfShards)) + + _, errCreateTemplate := a.Client.IndexPutTemplate(templateName).BodyString(tmpl).Do() + + if errCreateTemplate != nil { + return fmt.Errorf("FAILED to create Elasticsearch index template %s : %s\n", templateName, errCreateTemplate) + } + + // Now create the new index + _, errCreateIndex := a.Client.CreateIndex(a.IndexName).Do() + + if errCreateIndex != nil { + return fmt.Errorf("FAILED to create Elasticsearch index %s : %s\n", a.IndexName, errCreateIndex) + } + + } + return nil } diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index f370c4880..c2028a853 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -16,11 +16,14 @@ func TestConnectAndWrite(t *testing.T) { t.Skip("Skipping integration test in short mode") } - serverhost := "http://" + testutil.GetLocalHost() + ":9200" + //serverhost := "http://" + testutil.GetLocalHost() + ":19200" + serverhost := "http://10.200.83.10:19200" e := &Elasticsearch{ - ServerHost: serverhost, - IndexName: "littletest", + ServerHost: serverhost, + IndexName: "littletest3%Y%m%d", + NumberOfShards: 2, + NumberOfReplicas: 2, } // Verify that we can connect to the ElasticSearch