Plugin adapted to work with Elasticsearch 5.

This commit is contained in:
amandahla 2016-10-27 17:01:24 -02:00
parent e0d1beb669
commit c6ac0bc6af
4 changed files with 35 additions and 19 deletions

2
Godeps
View File

@ -62,5 +62,5 @@ golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
gopkg.in/olivere/elastic.v2 4ca0a93672aab0aacb92e0ed205bdf1d3a81f9ed
gopkg.in/olivere/elastic.v3 1ad75b5396674e9c783400e89106bc0c8da9d786
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4

View File

@ -23,10 +23,12 @@ In this case, dots will be replaced with "_".
index_name = "test"
## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically
enable_sniffer = false
## Enable health check
health_check = false
## If index not exists, a template will be created and then the new index.
## You can set number of shards and replicas for this template.
## If the index's name uses formats ("myindex%Y%m%d"), the template's name will be the characters before
## the first '%' ("myindex").
number_of_shards = 5
number_of_replicas = 1
number_of_shards = 1
number_of_replicas = 0

View File

@ -4,7 +4,7 @@ import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v2"
"gopkg.in/olivere/elastic.v3"
"os"
"strconv"
"strings"
@ -15,6 +15,7 @@ type Elasticsearch struct {
ServerHost string
IndexName string
EnableSniffer bool
HealthCheck bool
NumberOfShards int
NumberOfReplicas int
Client *elastic.Client
@ -31,8 +32,9 @@ var sampleConfig = `
# %d - day of month (e.g., 01)
# %H - hour (00..23)
enable_sniffer = false
number_of_shards = 5
number_of_replicas = 1
health_check = false
number_of_shards = 1
number_of_replicas = 0
`
func (a *Elasticsearch) Connect() error {
@ -46,7 +48,7 @@ func (a *Elasticsearch) Connect() error {
}
client, err := elastic.NewClient(
elastic.SetHealthcheck(true),
elastic.SetHealthcheck(a.HealthCheck),
elastic.SetSniff(a.EnableSniffer),
elastic.SetHealthcheckInterval(30*time.Second),
elastic.SetURL(a.ServerHost),
@ -88,6 +90,14 @@ func (a *Elasticsearch) Connect() error {
if !exists {
// First create a template for the new index
// The [string] type is removed in 5.0
typeHostandUnknow := "text"
if strings.HasPrefix(a.Version, "2.") {
typeHostandUnknow = "string"
}
tmpl := fmt.Sprintf(`{
"template":"%s*",
"settings" : {
@ -96,22 +106,25 @@ func (a *Elasticsearch) Connect() error {
},
"mappings" : {
"_default_" : {
"_all": {
"enabled": false
},
"properties" : {
"created" : { "type" : "date" },
"host":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}
"host":{"type":"%s"}
},
"dynamic_templates": [
{ "unknow": {
{ "unknowfields": {
"match": "*",
"match_mapping_type": "unknow",
"mapping": {
"type":"string"
"type":"%s"
}
}}
]
}
}
}`, templateName, strconv.Itoa(a.NumberOfShards), strconv.Itoa(a.NumberOfShards))
}`, templateName, strconv.Itoa(a.NumberOfShards), strconv.Itoa(a.NumberOfReplicas), typeHostandUnknow, typeHostandUnknow)
_, errCreateTemplate := a.Client.IndexPutTemplate(templateName).BodyString(tmpl).Do()
@ -250,6 +263,9 @@ func init() {
outputs.Add("elasticsearch", func() telegraf.Output {
return &Elasticsearch{
EnableSniffer: false,
HealthCheck: false,
NumberOfShards: 1,
NumberOfReplicas: 0,
}
})
}

View File

@ -2,13 +2,11 @@ package elasticsearch
import (
"encoding/json"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestConnectAndWrite(t *testing.T) {
@ -20,9 +18,9 @@ func TestConnectAndWrite(t *testing.T) {
e := &Elasticsearch{
ServerHost: serverhost,
IndexName: "littletest%Y%m%d",
NumberOfShards: 2,
NumberOfReplicas: 2,
IndexName: "littletest",
NumberOfShards: 1,
NumberOfReplicas: 0,
}
// Verify that we can connect to the ElasticSearch