Improved Unit Test.
This commit is contained in:
parent
c8e4a0ab14
commit
3cf3737539
4
Makefile
4
Makefile
|
@ -51,6 +51,7 @@ docker-run:
|
|||
-e ADVERTISED_PORT=9092 \
|
||||
-p "2181:2181" -p "9092:9092" \
|
||||
-d spotify/kafka
|
||||
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:2
|
||||
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
|
||||
docker run --name memcached -p "11211:11211" -d memcached
|
||||
docker run --name postgres -p "5432:5432" -d postgres
|
||||
|
@ -65,10 +66,11 @@ docker-run:
|
|||
docker-run-circle:
|
||||
docker run --name aerospike -p "3000:3000" -d aerospike/aerospike-server:3.9.0
|
||||
docker run --name kafka \
|
||||
-e ADVERTISED_HOST=localhost \
|
||||
-e ADVERTISED_HOST=localhost \
|
||||
-e ADVERTISED_PORT=9092 \
|
||||
-p "2181:2181" -p "9092:9092" \
|
||||
-d spotify/kafka
|
||||
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:2
|
||||
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
|
||||
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
|
||||
docker run --name riemann -p "5555:5555" -d blalor/riemann
|
||||
|
|
|
@ -74,7 +74,6 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
|||
for _, metric := range metrics {
|
||||
|
||||
m := make(map[string]interface{})
|
||||
//m["created"] = metric.UnixNano() / 1000000
|
||||
m["created"] = time.Now()
|
||||
|
||||
if host, ok := metric.Tags()["host"]; ok {
|
||||
|
@ -171,6 +170,9 @@ func (a *Elasticsearch) Close() error {
|
|||
|
||||
func init() {
|
||||
outputs.Add("elasticsearch", func() telegraf.Output {
|
||||
return &Elasticsearch{}
|
||||
return &Elasticsearch{
|
||||
EnableSniffer: false,
|
||||
Separator: "_",
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,176 +1,28 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"gopkg.in/olivere/elastic.v2"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type Elasticsearch struct {
|
||||
ServerHost string
|
||||
IndexName string
|
||||
EnableSniffer bool
|
||||
Separator string
|
||||
Client *elastic.Client
|
||||
Version string
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
server_host = "http://10.10.10.10:19200" # required.
|
||||
index_name = "test" # required.
|
||||
enable_sniffer = false
|
||||
delimiter = "_"
|
||||
`
|
||||
|
||||
type TimeSeries struct {
|
||||
Series []*Metric `json:"series"`
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Metric string `json:"metric"`
|
||||
Points [1]Point `json:"metrics"`
|
||||
}
|
||||
|
||||
type Point [2]float64
|
||||
|
||||
func (a *Elasticsearch) Connect() error {
|
||||
if a.ServerHost == "" || a.IndexName == "" {
|
||||
return fmt.Errorf("server_host and index_name are required fields for elasticsearch output")
|
||||
func TestConnectAndWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
client, err := elastic.NewClient(
|
||||
elastic.SetHealthcheck(true),
|
||||
elastic.SetSniff(a.EnableSniffer),
|
||||
elastic.SetHealthcheckInterval(30*time.Second),
|
||||
elastic.SetURL(a.ServerHost),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("FAILED to connect to elasticsearch host %s : %s\n", a.ServerHost, err)
|
||||
serverhost := "http://" + testutil.GetLocalHost() + ":9200"
|
||||
e := &Elasticsearch{
|
||||
ServerHost: serverhost,
|
||||
IndexName: "littletest",
|
||||
}
|
||||
|
||||
a.Client = client
|
||||
// Verify that we can connect to the ElasticSearch
|
||||
err := e.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
version, errVersion := a.Client.ElasticsearchVersion(a.ServerHost)
|
||||
|
||||
if errVersion != nil {
|
||||
return fmt.Errorf("FAILED to get elasticsearch version : %s\n", errVersion)
|
||||
}
|
||||
|
||||
a.Version = version
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
|
||||
if len(metrics) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
|
||||
m := make(map[string]interface{})
|
||||
//m["created"] = metric.UnixNano() / 1000000
|
||||
m["created"] = time.Now()
|
||||
|
||||
if host, ok := metric.Tags()["host"]; ok {
|
||||
m["host"] = host
|
||||
} else {
|
||||
host, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m["host"] = host
|
||||
}
|
||||
|
||||
// Earlier versions of EL doesnt accept '.' in field name
|
||||
if len(a.Separator) > 1 {
|
||||
return fmt.Errorf("FAILED Separator exceed one character : %s\n", a.Separator)
|
||||
}
|
||||
|
||||
for key, value := range metric.Tags() {
|
||||
if key != "host" {
|
||||
if strings.HasPrefix(a.Version, "2.") {
|
||||
m[strings.Replace(key, ".", a.Separator, -1)] = value
|
||||
} else {
|
||||
m[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for key, value := range metric.Fields() {
|
||||
if strings.HasPrefix(a.Version, "2.") {
|
||||
m[strings.Replace(key, ".", a.Separator, -1)] = value
|
||||
} else {
|
||||
m[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
_, errMessage := a.Client.Index().
|
||||
Index(a.IndexName).
|
||||
Type(metric.Name()).
|
||||
BodyJson(m).
|
||||
Do()
|
||||
|
||||
if errMessage != nil {
|
||||
return fmt.Errorf("FAILED to send elasticsearch message to index %s : %s\n", a.IndexName, errMessage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Description() string {
|
||||
return "Configuration for Elasticsearch to send metrics to."
|
||||
}
|
||||
|
||||
func buildMetrics(m telegraf.Metric) (map[string]Point, error) {
|
||||
ms := make(map[string]Point)
|
||||
for k, v := range m.Fields() {
|
||||
var p Point
|
||||
if err := p.setValue(v); err != nil {
|
||||
return ms, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
|
||||
}
|
||||
p[0] = float64(m.Time().Unix())
|
||||
ms[k] = p
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func (p *Point) setValue(v interface{}) error {
|
||||
switch d := v.(type) {
|
||||
case int:
|
||||
p[1] = float64(int(d))
|
||||
case int32:
|
||||
p[1] = float64(int32(d))
|
||||
case int64:
|
||||
p[1] = float64(int64(d))
|
||||
case float32:
|
||||
p[1] = float64(d)
|
||||
case float64:
|
||||
p[1] = float64(d)
|
||||
default:
|
||||
return fmt.Errorf("undeterminable type")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Elasticsearch) Close() error {
|
||||
a.Client = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("elasticsearch", func() telegraf.Output {
|
||||
return &Elasticsearch{}
|
||||
})
|
||||
// Verify that we can successfully write data to the ElasticSearch
|
||||
err = e.Write(testutil.MockMetrics())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue