Requested changes applied.

This commit is contained in:
Amanda Hager Lopes de Andrade 2016-10-11 12:30:31 -03:00
parent 3cf3737539
commit 862f728635
3 changed files with 18 additions and 64 deletions

View File

@ -2,7 +2,9 @@
This plugin writes to [Elasticsearch](https://www.elastic.co) via Elastic (http://olivere.github.io/elastic/).
Tested with: 5.0.0-beta1 and 2.4.0
Attention:
Elasticsearch 2.x does not support this dots-to-object transformation and so dots in field names are not allowed in versions 2.X.
In this case, dots will be replaced with "_".
### Configuration:
@ -10,11 +12,8 @@ Tested with: 5.0.0-beta1 and 2.4.0
# Configuration for Elasticsearch to send metrics to
[[outputs.elasticsearch]]
## The full HTTP endpoint URL for your Elasticsearch. # required
server_host = "http://10.10.10.10:19200"
server_host = "http://10.10.10.10:9200"
## The target index for metrics # required
index_name = "twitter"
index_name = "test"
## ElasticSearch uses a sniffing process to find all nodes of your cluster by default, automatically
enable_sniffer = false
## Earlier versions of EL doesn't accept "." in field name. Set delimiter with the character that you want instead.
delimiter = "_"

View File

@ -14,32 +14,19 @@ type Elasticsearch struct {
ServerHost string
IndexName string
EnableSniffer bool
Separator string
Client *elastic.Client
Version string
}
var sampleConfig = `
server_host = "http://10.10.10.10:19200" # required.
server_host = "http://10.10.10.10:9200" # required.
index_name = "test" # required.
enable_sniffer = false
delimiter = "_"
`
type TimeSeries struct {
Series []*Metric `json:"series"`
}
type Metric struct {
Metric string `json:"metric"`
Points [1]Point `json:"metrics"`
}
type Point [2]float64
func (a *Elasticsearch) Connect() error {
if a.ServerHost == "" || a.IndexName == "" {
return fmt.Errorf("server_host and index_name are required fields for elasticsearch output")
return fmt.Errorf("FAILED server_host and index_name are required fields for Elasticsearch output")
}
client, err := elastic.NewClient(
@ -50,7 +37,7 @@ func (a *Elasticsearch) Connect() error {
)
if err != nil {
return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err)
return fmt.Errorf("FAILED to connect to Elasticsearch host %s : %s\n", a.ServerHost, err)
}
a.Client = client
@ -58,7 +45,7 @@ func (a *Elasticsearch) Connect() error {
version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost)
if errVersion != nil {
return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion)
return fmt.Errorf("FAILED to get Elasticsearch version : %s\n", errVersion)
}
a.Version = version
@ -74,27 +61,26 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
m := make(map[string]interface{})
m["created"] = time.Now()
m["created"] = metric.Time()
if host, ok := metric.Tags()["host"]; ok {
m["host"] = host
} else {
host, err := os.Hostname()
if err != nil {
panic(err)
return fmt.Errorf("FAILED Obtaining host to Elasticsearch output : %s\n", err)
}
m["host"] = host
}
// Earlier versions of EL doesnt accept '.' in field name
if len(a.Separator) > 1 {
return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator)
}
// Elasticsearch 2.x does not support this dots-to-object transformation
// and so dots in field names are not allowed in versions 2.X.
// In this case, dots will be replaced with "_"
for key, value := range metric.Tags() {
if key != "host" {
if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
m[strings.Replace(key, ".", "_", -1)] = value
} else {
m[key] = value
}
@ -103,7 +89,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
for key, value := range metric.Fields() {
if strings.HasPrefix(a.Version, "2.") {
m[strings.Replace(key, ".", a.Separator, -1)] = value
m[strings.Replace(key, ".", "_", -1)] = value
} else {
m[key] = value
}
@ -116,7 +102,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
Do()
if errMessage != nil {
return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage)
return fmt.Errorf("FAILED to send Elasticsearch message to index %s : %s\n", a.IndexName, errMessage)
}
}
@ -132,37 +118,6 @@ func (a *Elasticsearch) Description() string {
return "Configuration for Elasticsearch to send metrics to."
}
func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
ms := make(map[string]Point)
for k, v := range m.Fields() {
var p Point
if err := p.setValue(v); err != nil {
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
p[0] = float64(m.Time().Unix())
ms[k] = p
}
return ms, nil
}
func (p *Point) setValue(v interface{}) error {
switch d := v.(type) {
case int:
p[1] = float64(int(d))
case int32:
p[1] = float64(int32(d))
case int64:
p[1] = float64(int64(d))
case float32:
p[1] = float64(d)
case float64:
p[1] = float64(d)
default:
return fmt.Errorf("undeterminable type")
}
return nil
}
func (a *Elasticsearch) Close() error {
a.Client = nil
return nil
@ -172,7 +127,6 @@ func init() {
outputs.Add("elasticsearch", func() telegraf.Output {
return &Elasticsearch{
EnableSniffer: false,
Separator: "_",
}
})
}

View File

@ -13,6 +13,7 @@ func TestConnectAndWrite(t *testing.T) {
}
serverhost := "http://" + testutil.GetLocalHost() + ":9200"
e := &Elasticsearch{
ServerHost: serverhost,
IndexName: "littletest",