diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 1f9c79e45..7aef15f41 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -110,6 +110,51 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { return nil } +func (a *Elasticsearch) WriteOneMessage(metric telegraf.Metric) (string, error) { + + m := make(map[string]interface{}) + m["created"] = metric.Time() + + if host, ok := metric.Tags()["host"]; ok { + m["host"] = host + } + + // Elasticsearch 2.x does not support this dots-to-object transformation + // and so dots in field names are not allowed in versions 2.X. + // In this case, dots will be replaced with "_" + + for key, value := range metric.Tags() { + if key != "host" { + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", "_", -1)] = value + } else { + m[key] = value + } + } + } + + for key, value := range metric.Fields() { + if strings.HasPrefix(a.Version, "2.") { + m[strings.Replace(key, ".", "_", -1)] = value + } else { + m[key] = value + } + } + + put1, errMessage := a.Client.Index(). + Index(a.IndexName). + Type(metric.Name()). + BodyJson(m). + Do() + + if errMessage != nil { + return "",fmt.Errorf("FAILED to send Elasticsearch message to index %s : %s\n", a.IndexName, errMessage) + } + + return put1.Id,nil + +} + func (a *Elasticsearch) SampleConfig() string { return sampleConfig } diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index af5bf215c..be5f69c8a 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -2,9 +2,14 @@ package elasticsearch import ( "testing" + "time" + "encoding/json" + + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + ) func TestConnectAndWrite(t *testing.T) { @@ -26,4 +31,35 @@ func TestConnectAndWrite(t *testing.T) { // Verify that we can successfully write data to the ElasticSearch err = e.Write(testutil.MockMetrics()) require.NoError(t, err) + + // Verify if metric sent has same data on Elasticsearch + metrictest, _ := telegraf.NewMetric( + "my_measurement", + map[string]string{"host": "192.168.0.1"}, + map[string]interface{}{"value": float64(3.14)}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + messageid, err := e.WriteOneMessage(metrictest) + require.NoError(t, err) + + get1, errGet := e.Client.Get(). + Index(e.IndexName). + Type(metrictest.Name()). + Id(messageid). + Do() + require.NoError(t, errGet) + + require.Equal(t,true,get1.Found,"Message not found on Elasticsearch.") + + require.NotEqual(t,nil,get1.Source,"Source not found on Elasticsearch.") + + var dat map[string]interface{} + err = json.Unmarshal(*get1.Source, &dat) + require.NoError(t, err) + + require.Equal(t,"192.168.0.1",dat["host"],"Values of Host are not the same.") + require.Equal(t,"2010-11-10T23:00:00Z",dat["created"],"Values of Created are not the same.") + require.Equal(t,3.14,dat["value"],"Values of Value are not the same.") + }