Add CrateDB output plugin (#3210)
This commit is contained in:
parent
5234e4ff7b
commit
e1005ebfab
2
Godeps
2
Godeps
|
@ -32,7 +32,7 @@ github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f
|
||||||
github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea
|
github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea
|
||||||
github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
|
github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
|
||||||
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
|
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
|
||||||
github.com/jackc/pgx b84338d7d62598f75859b2b146d830b22f1b9ec8
|
github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917
|
||||||
github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d
|
github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d
|
||||||
github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413
|
github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413
|
||||||
github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893
|
github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893
|
||||||
|
|
16
Makefile
16
Makefile
|
@ -86,6 +86,12 @@ docker-run:
|
||||||
-e SLAPD_CONFIG_ROOTPW="secret" \
|
-e SLAPD_CONFIG_ROOTPW="secret" \
|
||||||
-p "389:389" -p "636:636" \
|
-p "389:389" -p "636:636" \
|
||||||
-d cobaugh/openldap-alpine
|
-d cobaugh/openldap-alpine
|
||||||
|
docker run --name cratedb \
|
||||||
|
-p "6543:5432" \
|
||||||
|
-d crate crate \
|
||||||
|
-Cnetwork.host=0.0.0.0 \
|
||||||
|
-Ctransport.host=localhost \
|
||||||
|
-Clicense.enterprise=false
|
||||||
|
|
||||||
# Run docker containers necessary for integration tests; skipping services provided
|
# Run docker containers necessary for integration tests; skipping services provided
|
||||||
# by CircleCI
|
# by CircleCI
|
||||||
|
@ -110,12 +116,18 @@ docker-run-circle:
|
||||||
-e SLAPD_CONFIG_ROOTPW="secret" \
|
-e SLAPD_CONFIG_ROOTPW="secret" \
|
||||||
-p "389:389" -p "636:636" \
|
-p "389:389" -p "636:636" \
|
||||||
-d cobaugh/openldap-alpine
|
-d cobaugh/openldap-alpine
|
||||||
|
docker run --name cratedb \
|
||||||
|
-p "6543:5432" \
|
||||||
|
-d crate crate \
|
||||||
|
-Cnetwork.host=0.0.0.0 \
|
||||||
|
-Ctransport.host=localhost \
|
||||||
|
-Clicense.enterprise=false
|
||||||
|
|
||||||
docker-kill:
|
docker-kill:
|
||||||
-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
|
-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
|
||||||
openldap postgres rabbitmq redis riemann zookeeper
|
openldap postgres rabbitmq redis riemann zookeeper cratedb
|
||||||
-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
|
-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
|
||||||
openldap postgres rabbitmq redis riemann zookeeper
|
openldap postgres rabbitmq redis riemann zookeeper cratedb
|
||||||
|
|
||||||
.PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \
|
.PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \
|
||||||
package clean docker-run docker-run-circle docker-kill docker-image
|
package clean docker-run docker-run-circle docker-kill docker-image
|
||||||
|
|
|
@ -267,6 +267,7 @@ formats may be used with input plugins supporting the `data_format` option:
|
||||||
* [amqp](./plugins/outputs/amqp) (rabbitmq)
|
* [amqp](./plugins/outputs/amqp) (rabbitmq)
|
||||||
* [aws kinesis](./plugins/outputs/kinesis)
|
* [aws kinesis](./plugins/outputs/kinesis)
|
||||||
* [aws cloudwatch](./plugins/outputs/cloudwatch)
|
* [aws cloudwatch](./plugins/outputs/cloudwatch)
|
||||||
|
* [cratedb](./plugins/outputs/cratedb)
|
||||||
* [datadog](./plugins/outputs/datadog)
|
* [datadog](./plugins/outputs/datadog)
|
||||||
* [discard](./plugins/outputs/discard)
|
* [discard](./plugins/outputs/discard)
|
||||||
* [elasticsearch](./plugins/outputs/elasticsearch)
|
* [elasticsearch](./plugins/outputs/elasticsearch)
|
||||||
|
|
|
@ -199,6 +199,19 @@
|
||||||
# namespace = "InfluxData/Telegraf"
|
# namespace = "InfluxData/Telegraf"
|
||||||
|
|
||||||
|
|
||||||
|
# # Configuration for CrateDB to send metrics to.
|
||||||
|
# [[outputs.cratedb]]
|
||||||
|
# # A github.com/jackc/pgx connection string.
|
||||||
|
# # See https://godoc.org/github.com/jackc/pgx#ParseDSN
|
||||||
|
# url = "postgres://user:password@localhost/schema?sslmode=disable"
|
||||||
|
# # Timeout for all CrateDB queries.
|
||||||
|
# timeout = "5s"
|
||||||
|
# # Name of the table to store metrics in.
|
||||||
|
# table = "metrics"
|
||||||
|
# # If true, and the metrics table does not exist, create it automatically.
|
||||||
|
# table_create = true
|
||||||
|
|
||||||
|
|
||||||
# # Configuration for DataDog API to send metrics to.
|
# # Configuration for DataDog API to send metrics to.
|
||||||
# [[outputs.datadog]]
|
# [[outputs.datadog]]
|
||||||
# ## Datadog API key
|
# ## Datadog API key
|
||||||
|
|
|
@ -51,12 +51,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
"checkpoints_req",
|
"checkpoints_req",
|
||||||
"checkpoints_timed",
|
"checkpoints_timed",
|
||||||
"maxwritten_clean",
|
"maxwritten_clean",
|
||||||
}
|
"datid",
|
||||||
|
|
||||||
int32Metrics := []string{
|
|
||||||
"numbackends",
|
"numbackends",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32Metrics := []string{}
|
||||||
|
|
||||||
floatMetrics := []string{
|
floatMetrics := []string{
|
||||||
"blk_read_time",
|
"blk_read_time",
|
||||||
"blk_write_time",
|
"blk_write_time",
|
||||||
|
@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
|
|
||||||
stringMetrics := []string{
|
stringMetrics := []string{
|
||||||
"datname",
|
"datname",
|
||||||
"datid",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metricsCounted := 0
|
metricsCounted := 0
|
||||||
|
|
|
@ -53,11 +53,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
"temp_files",
|
"temp_files",
|
||||||
"temp_bytes",
|
"temp_bytes",
|
||||||
"deadlocks",
|
"deadlocks",
|
||||||
|
"numbackends",
|
||||||
|
"datid",
|
||||||
}
|
}
|
||||||
|
|
||||||
int32Metrics := []string{
|
int32Metrics := []string{}
|
||||||
"numbackends",
|
|
||||||
}
|
|
||||||
|
|
||||||
floatMetrics := []string{
|
floatMetrics := []string{
|
||||||
"blk_read_time",
|
"blk_read_time",
|
||||||
|
@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
|
||||||
|
|
||||||
stringMetrics := []string{
|
stringMetrics := []string{
|
||||||
"datname",
|
"datname",
|
||||||
"datid",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metricsCounted := 0
|
metricsCounted := 0
|
||||||
|
@ -175,11 +174,11 @@ func TestPostgresqlFieldOutput(t *testing.T) {
|
||||||
"temp_files",
|
"temp_files",
|
||||||
"temp_bytes",
|
"temp_bytes",
|
||||||
"deadlocks",
|
"deadlocks",
|
||||||
|
"numbackends",
|
||||||
|
"datid",
|
||||||
}
|
}
|
||||||
|
|
||||||
int32Metrics := []string{
|
int32Metrics := []string{}
|
||||||
"numbackends",
|
|
||||||
}
|
|
||||||
|
|
||||||
floatMetrics := []string{
|
floatMetrics := []string{
|
||||||
"blk_read_time",
|
"blk_read_time",
|
||||||
|
@ -188,7 +187,6 @@ func TestPostgresqlFieldOutput(t *testing.T) {
|
||||||
|
|
||||||
stringMetrics := []string{
|
stringMetrics := []string{
|
||||||
"datname",
|
"datname",
|
||||||
"datid",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, field := range intMetrics {
|
for _, field := range intMetrics {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
|
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
|
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
# CrateDB Output Plugin for Telegraf
|
||||||
|
|
||||||
|
This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol](https://crate.io/docs/crate/reference/protocols/postgres.html).
|
||||||
|
|
||||||
|
## Table Schema
|
||||||
|
|
||||||
|
The plugin requires a table with the following schema.
|
||||||
|
|
||||||
|
|
||||||
|
```sql
|
||||||
|
CREATE TABLE my_metrics (
|
||||||
|
"hash_id" LONG INDEX OFF,
|
||||||
|
"timestamp" TIMESTAMP,
|
||||||
|
"name" STRING,
|
||||||
|
"tags" OBJECT(DYNAMIC),
|
||||||
|
"fields" OBJECT(DYNAMIC),
|
||||||
|
PRIMARY KEY ("timestamp", "hash_id","day")
|
||||||
|
) PARTITIONED BY("day");
|
||||||
|
```
|
||||||
|
|
||||||
|
The plugin can create this table for you automatically via the `table_create`
|
||||||
|
config option, see below.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Configuration for CrateDB to send metrics to.
|
||||||
|
[[outputs.cratedb]]
|
||||||
|
# A github.com/jackc/pgx connection string.
|
||||||
|
# See https://godoc.org/github.com/jackc/pgx#ParseDSN
|
||||||
|
url = "postgres://user:password@localhost/schema?sslmode=disable"
|
||||||
|
# Timeout for all CrateDB queries.
|
||||||
|
timeout = "5s"
|
||||||
|
# Name of the table to store metrics in.
|
||||||
|
table = "metrics"
|
||||||
|
# If true, and the metrics table does not exist, create it automatically.
|
||||||
|
table_create = true
|
||||||
|
```
|
|
@ -0,0 +1,230 @@
|
||||||
|
package cratedb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha512"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
_ "github.com/jackc/pgx/stdlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CrateDB struct {
|
||||||
|
URL string
|
||||||
|
Timeout internal.Duration
|
||||||
|
Table string
|
||||||
|
TableCreate bool `toml:"table_create"`
|
||||||
|
DB *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
# A github.com/jackc/pgx connection string.
|
||||||
|
# See https://godoc.org/github.com/jackc/pgx#ParseDSN
|
||||||
|
url = "postgres://user:password@localhost/schema?sslmode=disable"
|
||||||
|
# Timeout for all CrateDB queries.
|
||||||
|
timeout = "5s"
|
||||||
|
# Name of the table to store metrics in.
|
||||||
|
table = "metrics"
|
||||||
|
# If true, and the metrics table does not exist, create it automatically.
|
||||||
|
table_create = true
|
||||||
|
`
|
||||||
|
|
||||||
|
func (c *CrateDB) Connect() error {
|
||||||
|
db, err := sql.Open("pgx", c.URL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if c.TableCreate {
|
||||||
|
sql := `
|
||||||
|
CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
|
||||||
|
"hash_id" LONG INDEX OFF,
|
||||||
|
"timestamp" TIMESTAMP,
|
||||||
|
"name" STRING,
|
||||||
|
"tags" OBJECT(DYNAMIC),
|
||||||
|
"fields" OBJECT(DYNAMIC),
|
||||||
|
"day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
|
||||||
|
PRIMARY KEY ("timestamp", "hash_id","day")
|
||||||
|
) PARTITIONED BY("day");
|
||||||
|
`
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := db.ExecContext(ctx, sql); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.DB = db
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CrateDB) Write(metrics []telegraf.Metric) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
|
||||||
|
defer cancel()
|
||||||
|
if sql, err := insertSQL(c.Table, metrics); err != nil {
|
||||||
|
return err
|
||||||
|
} else if _, err := c.DB.ExecContext(ctx, sql); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func insertSQL(table string, metrics []telegraf.Metric) (string, error) {
|
||||||
|
rows := make([]string, len(metrics))
|
||||||
|
for i, m := range metrics {
|
||||||
|
|
||||||
|
cols := []interface{}{
|
||||||
|
hashID(m),
|
||||||
|
m.Time().UTC(),
|
||||||
|
m.Name(),
|
||||||
|
m.Tags(),
|
||||||
|
m.Fields(),
|
||||||
|
}
|
||||||
|
|
||||||
|
escapedCols := make([]string, len(cols))
|
||||||
|
for i, col := range cols {
|
||||||
|
escaped, err := escapeValue(col)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
escapedCols[i] = escaped
|
||||||
|
}
|
||||||
|
rows[i] = `(` + strings.Join(escapedCols, ", ") + `)`
|
||||||
|
}
|
||||||
|
sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields")
|
||||||
|
VALUES
|
||||||
|
` + strings.Join(rows, " ,\n") + `;`
|
||||||
|
return sql, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// escapeValue returns a string version of val that is suitable for being used
|
||||||
|
// inside of a VALUES expression or similar. Unsupported types return an error.
|
||||||
|
//
|
||||||
|
// Warning: This is not ideal from a security perspective, but unfortunately
|
||||||
|
// CrateDB does not support enough of the PostgreSQL wire protocol to allow
|
||||||
|
// using pgx with $1, $2 placeholders [1]. Security conscious users of this
|
||||||
|
// plugin should probably refrain from using it in combination with untrusted
|
||||||
|
// inputs.
|
||||||
|
//
|
||||||
|
// [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371
|
||||||
|
func escapeValue(val interface{}) (string, error) {
|
||||||
|
switch t := val.(type) {
|
||||||
|
case string:
|
||||||
|
return escapeString(t, `'`), nil
|
||||||
|
// We don't handle uint, uint32 and uint64 here because CrateDB doesn't
|
||||||
|
// seem to support unsigned types. But it seems like input plugins don't
|
||||||
|
// produce those types, so it's hopefully ok.
|
||||||
|
case int, int32, int64, float32, float64:
|
||||||
|
return fmt.Sprint(t), nil
|
||||||
|
case time.Time:
|
||||||
|
// see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp
|
||||||
|
return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"))
|
||||||
|
case map[string]string:
|
||||||
|
return escapeObject(convertMap(t))
|
||||||
|
case map[string]interface{}:
|
||||||
|
return escapeObject(t)
|
||||||
|
default:
|
||||||
|
// This might be panic worthy under normal circumstances, but it's probably
|
||||||
|
// better to not shut down the entire telegraf process because of one
|
||||||
|
// misbehaving plugin.
|
||||||
|
return "", fmt.Errorf("unexpected type: %T: %#v", t, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertMap converts m from map[string]string to map[string]interface{} by
|
||||||
|
// copying it. Generics, oh generics where art thou?
|
||||||
|
func convertMap(m map[string]string) map[string]interface{} {
|
||||||
|
c := make(map[string]interface{}, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
c[k] = v
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func escapeObject(m map[string]interface{}) (string, error) {
|
||||||
|
// There is a decent chance that the implementation below doesn't catch all
|
||||||
|
// edge cases, but it's hard to tell since the format seems to be a bit
|
||||||
|
// underspecified.
|
||||||
|
// See https://crate.io/docs/crate/reference/sql/data_types.html#object
|
||||||
|
|
||||||
|
// We find all keys and sort them first because iterating a map in go is
|
||||||
|
// randomized and we need consistent output for our unit tests.
|
||||||
|
keys := make([]string, 0, len(m))
|
||||||
|
for k, _ := range m {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
|
||||||
|
// Now we build our key = val pairs
|
||||||
|
pairs := make([]string, 0, len(m))
|
||||||
|
for _, k := range keys {
|
||||||
|
// escape the value of our key k (potentially recursive)
|
||||||
|
val, err := escapeValue(m[k])
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
pairs = append(pairs, escapeString(k, `"`)+" = "+val)
|
||||||
|
}
|
||||||
|
return `{` + strings.Join(pairs, ", ") + `}`, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// escapeString wraps s in the given quote string and replaces all occurences
|
||||||
|
// of it inside of s with a double quote.
|
||||||
|
func escapeString(s string, quote string) string {
|
||||||
|
return quote + strings.Replace(s, quote, quote+quote, -1) + quote
|
||||||
|
}
|
||||||
|
|
||||||
|
// hashID returns a cryptographic hash int64 hash that includes the metric name
|
||||||
|
// and tags. It's used instead of m.HashID() because it's not considered stable
|
||||||
|
// and because a cryptogtaphic hash makes more sense for the use case of
|
||||||
|
// deduplication.
|
||||||
|
// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
|
||||||
|
func hashID(m telegraf.Metric) int64 {
|
||||||
|
h := sha512.New()
|
||||||
|
h.Write([]byte(m.Name()))
|
||||||
|
tags := m.Tags()
|
||||||
|
tmp := make([]string, len(tags))
|
||||||
|
i := 0
|
||||||
|
for k, v := range tags {
|
||||||
|
tmp[i] = k + v
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
sort.Strings(tmp)
|
||||||
|
|
||||||
|
for _, s := range tmp {
|
||||||
|
h.Write([]byte(s))
|
||||||
|
}
|
||||||
|
sum := h.Sum(nil)
|
||||||
|
|
||||||
|
// Note: We have to convert from uint64 to int64 below because CrateDB only
|
||||||
|
// supports a signed 64 bit LONG type:
|
||||||
|
//
|
||||||
|
// CREATE TABLE my_long (val LONG);
|
||||||
|
// INSERT INTO my_long(val) VALUES (14305102049502225714);
|
||||||
|
// -> ERROR: SQLParseException: For input string: "14305102049502225714"
|
||||||
|
return int64(binary.LittleEndian.Uint64(sum))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CrateDB) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CrateDB) Description() string {
|
||||||
|
return "Configuration for CrateDB to send metrics to."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CrateDB) Close() error {
|
||||||
|
return c.DB.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("cratedb", func() telegraf.Output {
|
||||||
|
return &CrateDB{
|
||||||
|
Timeout: internal.Duration{Duration: time.Second * 5},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,215 @@
|
||||||
|
package cratedb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConnectAndWrite(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
url := testURL()
|
||||||
|
table := "test"
|
||||||
|
|
||||||
|
// dropSQL drops our table before each test. This simplifies changing the
|
||||||
|
// schema during development :).
|
||||||
|
dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`)
|
||||||
|
db, err := sql.Open("pgx", url)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = db.Exec(dropSQL)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
c := &CrateDB{
|
||||||
|
URL: url,
|
||||||
|
Table: table,
|
||||||
|
Timeout: internal.Duration{Duration: time.Second * 5},
|
||||||
|
TableCreate: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := testutil.MockMetrics()
|
||||||
|
require.NoError(t, c.Connect())
|
||||||
|
require.NoError(t, c.Write(metrics))
|
||||||
|
|
||||||
|
// The code below verifies that the metrics were written. We have to select
|
||||||
|
// the rows using their primary keys in order to take advantage of
|
||||||
|
// read-after-write consistency in CrateDB.
|
||||||
|
for _, m := range metrics {
|
||||||
|
hashIDVal, err := escapeValue(hashID(m))
|
||||||
|
require.NoError(t, err)
|
||||||
|
timestamp, err := escapeValue(m.Time())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var id int64
|
||||||
|
row := db.QueryRow(
|
||||||
|
"SELECT hash_id FROM " + escapeString(table, `"`) + " " +
|
||||||
|
"WHERE hash_id = " + hashIDVal + " " +
|
||||||
|
"AND timestamp = " + timestamp,
|
||||||
|
)
|
||||||
|
require.NoError(t, row.Scan(&id))
|
||||||
|
// We could check the whole row, but this is meant to be more of a smoke
|
||||||
|
// test, so just checking the HashID seems fine.
|
||||||
|
require.Equal(t, id, hashID(m))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, c.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_insertSQL(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
Metrics []telegraf.Metric
|
||||||
|
Want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Metrics: testutil.MockMetrics(),
|
||||||
|
Want: strings.TrimSpace(`
|
||||||
|
INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields")
|
||||||
|
VALUES
|
||||||
|
(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1});
|
||||||
|
`),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
if got, err := insertSQL("my_table", test.Metrics); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if got != test.Want {
|
||||||
|
t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_escapeValue(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
Val interface{}
|
||||||
|
Want string
|
||||||
|
}{
|
||||||
|
// string
|
||||||
|
{`foo`, `'foo'`},
|
||||||
|
{`foo'bar 'yeah`, `'foo''bar ''yeah'`},
|
||||||
|
// int types
|
||||||
|
{123, `123`}, // int
|
||||||
|
{int64(123), `123`},
|
||||||
|
{int32(123), `123`},
|
||||||
|
// float types
|
||||||
|
{123.456, `123.456`},
|
||||||
|
{float32(123.456), `123.456`}, // floating point SNAFU
|
||||||
|
{float64(123.456), `123.456`},
|
||||||
|
// time.Time
|
||||||
|
{time.Date(2017, 8, 7, 16, 44, 52, 123*1000*1000, time.FixedZone("Dreamland", 5400)), `'2017-08-07T16:44:52.123+0130'`},
|
||||||
|
// map[string]string
|
||||||
|
{map[string]string{}, `{}`},
|
||||||
|
{map[string]string(nil), `{}`},
|
||||||
|
{map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`},
|
||||||
|
{map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`},
|
||||||
|
// map[string]interface{}
|
||||||
|
{map[string]interface{}{}, `{}`},
|
||||||
|
{map[string]interface{}(nil), `{}`},
|
||||||
|
{map[string]interface{}{"foo": "bar"}, `{"foo" = 'bar'}`},
|
||||||
|
{map[string]interface{}{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`},
|
||||||
|
{map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`},
|
||||||
|
{map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`},
|
||||||
|
}
|
||||||
|
|
||||||
|
url := testURL()
|
||||||
|
db, err := sql.Open("pgx", url)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
got, err := escapeValue(test.Val)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("val: %#v: %s", test.Val, err)
|
||||||
|
} else if got != test.Want {
|
||||||
|
t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a smoke test that will blow up if our escaping causing a SQL
|
||||||
|
// syntax error, which may allow for an attack.
|
||||||
|
var reply interface{}
|
||||||
|
row := db.QueryRow("SELECT " + got)
|
||||||
|
require.NoError(t, row.Scan(&reply))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_hashID(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
Name string
|
||||||
|
Tags map[string]string
|
||||||
|
Fields map[string]interface{}
|
||||||
|
Want int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Name: "metric1",
|
||||||
|
Tags: map[string]string{"tag1": "val1", "tag2": "val2"},
|
||||||
|
Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
|
||||||
|
Want: 8973971082006474188,
|
||||||
|
},
|
||||||
|
|
||||||
|
// This metric has a different tag order (in a perhaps non-ideal attempt to
|
||||||
|
// trigger different pseudo-random map iteration)) and fields (none)
|
||||||
|
// compared to the previous metric, but should still get the same hash.
|
||||||
|
{
|
||||||
|
Name: "metric1",
|
||||||
|
Tags: map[string]string{"tag2": "val2", "tag1": "val1"},
|
||||||
|
Fields: map[string]interface{}{"field3": "val3"},
|
||||||
|
Want: 8973971082006474188,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Different metric name -> different hash
|
||||||
|
{
|
||||||
|
Name: "metric2",
|
||||||
|
Tags: map[string]string{"tag1": "val1", "tag2": "val2"},
|
||||||
|
Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
|
||||||
|
Want: 306487682448261783,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Different tag val -> different hash
|
||||||
|
{
|
||||||
|
Name: "metric1",
|
||||||
|
Tags: map[string]string{"tag1": "new-val", "tag2": "val2"},
|
||||||
|
Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
|
||||||
|
Want: 1938713695181062970,
|
||||||
|
},
|
||||||
|
|
||||||
|
// Different tag key -> different hash
|
||||||
|
{
|
||||||
|
Name: "metric1",
|
||||||
|
Tags: map[string]string{"new-key": "val1", "tag2": "val2"},
|
||||||
|
Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
|
||||||
|
Want: 7678889081527706328,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
m, err := metric.New(
|
||||||
|
test.Name,
|
||||||
|
test.Tags,
|
||||||
|
test.Fields,
|
||||||
|
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
if got := hashID(m); got != test.Want {
|
||||||
|
t.Errorf("test #%d: got=%d want=%d", i, got, test.Want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testURL() string {
|
||||||
|
url := os.Getenv("CRATE_URL")
|
||||||
|
if url == "" {
|
||||||
|
return "postgres://" + testutil.GetLocalHost() + ":6543/test?sslmode=disable"
|
||||||
|
}
|
||||||
|
return url
|
||||||
|
}
|
Loading…
Reference in New Issue