Use of formats to determine index name. New parameters number_of_shards and number_of_replicas.
This commit is contained in:
parent
cecc7cefae
commit
f5dbbd73c2
|
@ -14,6 +14,19 @@ In this case, dots will be replaced with "_".
|
||||||
## The full HTTP endpoint URL for your Elasticsearch. # required
|
## The full HTTP endpoint URL for your Elasticsearch. # required
|
||||||
server_host = "http://10.10.10.10:9200"
|
server_host = "http://10.10.10.10:9200"
|
||||||
## The target index for metrics # required
|
## The target index for metrics # required
|
||||||
|
# formats allowed on index_name after a prefix:
|
||||||
|
# %Y - year (2016)
|
||||||
|
# %y - last two digits of year (00..99)
|
||||||
|
# %m - month (01..12)
|
||||||
|
# %d - day of month (e.g., 01)
|
||||||
|
# %H - hour (00..23)
|
||||||
index_name = "test"
|
index_name = "test"
|
||||||
## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically
|
## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically
|
||||||
enable_sniffer = false
|
enable_sniffer = false
|
||||||
|
## If index not exists, a template will be created and then the new index.
|
||||||
|
## You can set number of shards and replicas for this template.
|
||||||
|
## If the index's name uses formats ("myindex%Y%m%d"), the template's name will be the characters before
|
||||||
|
## the first '%' ("myindex").
|
||||||
|
number_of_shards = 5
|
||||||
|
number_of_replicas = 1
|
||||||
|
|
||||||
|
|
|
@ -6,22 +6,33 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
"gopkg.in/olivere/elastic.v2"
|
"gopkg.in/olivere/elastic.v2"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Elasticsearch struct {
|
type Elasticsearch struct {
|
||||||
ServerHost string
|
ServerHost string
|
||||||
IndexName string
|
IndexName string
|
||||||
EnableSniffer bool
|
EnableSniffer bool
|
||||||
Client *elastic.Client
|
NumberOfShards int
|
||||||
Version string
|
NumberOfReplicas int
|
||||||
|
Client *elastic.Client
|
||||||
|
Version string
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
server_host = "http://10.10.10.10:9200" # required.
|
server_host = "http://10.10.10.10:9200" # required.
|
||||||
index_name = "test" # required.
|
index_name = "test" # required.
|
||||||
|
# regex allowed on index_name:
|
||||||
|
# %Y - year (2016)
|
||||||
|
# %y - last two digits of year (00..99)
|
||||||
|
# %m - month (01..12)
|
||||||
|
# %d - day of month (e.g., 01)
|
||||||
|
# %H - hour (00..23)
|
||||||
enable_sniffer = false
|
enable_sniffer = false
|
||||||
|
number_of_shards = 5
|
||||||
|
number_of_replicas = 1
|
||||||
`
|
`
|
||||||
|
|
||||||
func (a *Elasticsearch) Connect() error {
|
func (a *Elasticsearch) Connect() error {
|
||||||
|
@ -29,6 +40,11 @@ func (a *Elasticsearch) Connect() error {
|
||||||
return fmt.Errorf("FAILED server_host and index_name are required fields for Elasticsearch output")
|
return fmt.Errorf("FAILED server_host and index_name are required fields for Elasticsearch output")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if index's name has a prefix
|
||||||
|
if strings.HasPrefix(a.IndexName, "%") {
|
||||||
|
return fmt.Errorf("FAILED Elasticsearch index's name must start with a prefix. \n")
|
||||||
|
}
|
||||||
|
|
||||||
client, err := elastic.NewClient(
|
client, err := elastic.NewClient(
|
||||||
elastic.SetHealthcheck(true),
|
elastic.SetHealthcheck(true),
|
||||||
elastic.SetSniff(a.EnableSniffer),
|
elastic.SetSniff(a.EnableSniffer),
|
||||||
|
@ -50,6 +66,68 @@ func (a *Elasticsearch) Connect() error {
|
||||||
|
|
||||||
a.Version = version
|
a.Version = version
|
||||||
|
|
||||||
|
templateName := a.IndexName
|
||||||
|
|
||||||
|
if strings.Contains(a.IndexName, "%") {
|
||||||
|
// Template's name its Index's name without date patterns
|
||||||
|
templateName = a.IndexName[0:strings.Index(a.IndexName, "%")]
|
||||||
|
|
||||||
|
year := strconv.Itoa(time.Now().Year())
|
||||||
|
a.IndexName = strings.Replace(a.IndexName, "%Y", year, -1)
|
||||||
|
a.IndexName = strings.Replace(a.IndexName, "%y", year[len(year)-2:], -1)
|
||||||
|
a.IndexName = strings.Replace(a.IndexName, "%m", strconv.Itoa(int(time.Now().Month())), -1)
|
||||||
|
a.IndexName = strings.Replace(a.IndexName, "%d", strconv.Itoa(time.Now().Day()), -1)
|
||||||
|
a.IndexName = strings.Replace(a.IndexName, "%H", strconv.Itoa(time.Now().Hour()), -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
exists, errExists := a.Client.IndexExists(a.IndexName).Do()
|
||||||
|
|
||||||
|
if errExists != nil {
|
||||||
|
return fmt.Errorf("FAILED to check if Elasticsearch index %s exists : %s\n", a.IndexName, errExists)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
// First create a template for the new index
|
||||||
|
tmpl := fmt.Sprintf(`{
|
||||||
|
"template":"%s*",
|
||||||
|
"settings" : {
|
||||||
|
"number_of_shards" : %s,
|
||||||
|
"number_of_replicas" : %s
|
||||||
|
},
|
||||||
|
"mappings" : {
|
||||||
|
"_default_" : {
|
||||||
|
"properties" : {
|
||||||
|
"created" : { "type" : "date" },
|
||||||
|
"host":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}
|
||||||
|
},
|
||||||
|
"dynamic_templates": [
|
||||||
|
{ "unknow": {
|
||||||
|
"match": "*",
|
||||||
|
"match_mapping_type": "unknow",
|
||||||
|
"mapping": {
|
||||||
|
"type":"string"
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`, templateName, strconv.Itoa(a.NumberOfShards), strconv.Itoa(a.NumberOfShards))
|
||||||
|
|
||||||
|
_, errCreateTemplate := a.Client.IndexPutTemplate(templateName).BodyString(tmpl).Do()
|
||||||
|
|
||||||
|
if errCreateTemplate != nil {
|
||||||
|
return fmt.Errorf("FAILED to create Elasticsearch index template %s : %s\n", templateName, errCreateTemplate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now create the new index
|
||||||
|
_, errCreateIndex := a.Client.CreateIndex(a.IndexName).Do()
|
||||||
|
|
||||||
|
if errCreateIndex != nil {
|
||||||
|
return fmt.Errorf("FAILED to create Elasticsearch index %s : %s\n", a.IndexName, errCreateIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,11 +16,14 @@ func TestConnectAndWrite(t *testing.T) {
|
||||||
t.Skip("Skipping integration test in short mode")
|
t.Skip("Skipping integration test in short mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
serverhost := "http://" + testutil.GetLocalHost() + ":9200"
|
//serverhost := "http://" + testutil.GetLocalHost() + ":19200"
|
||||||
|
serverhost := "http://10.200.83.10:19200"
|
||||||
|
|
||||||
e := &Elasticsearch{
|
e := &Elasticsearch{
|
||||||
ServerHost: serverhost,
|
ServerHost: serverhost,
|
||||||
IndexName: "littletest",
|
IndexName: "littletest3%Y%m%d",
|
||||||
|
NumberOfShards: 2,
|
||||||
|
NumberOfReplicas: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that we can connect to the ElasticSearch
|
// Verify that we can connect to the ElasticSearch
|
||||||
|
|
Loading…
Reference in New Issue