Add CrateDB output plugin (#3210)
This commit is contained in:
		
							parent
							
								
									07297e80a8
								
							
						
					
					
						commit
						62ec3e50d9
					
				
							
								
								
									
										2
									
								
								Godeps
								
								
								
								
							
							
						
						
									
										2
									
								
								Godeps
								
								
								
								
							|  | @ -32,7 +32,7 @@ github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f | ||||||
| github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea | github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea | ||||||
| github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f | github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f | ||||||
| github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec | github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec | ||||||
| github.com/jackc/pgx b84338d7d62598f75859b2b146d830b22f1b9ec8 | github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917 | ||||||
| github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d | github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d | ||||||
| github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 | github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 | ||||||
| github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 | github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 | ||||||
|  |  | ||||||
							
								
								
									
										16
									
								
								Makefile
								
								
								
								
							
							
						
						
									
										16
									
								
								Makefile
								
								
								
								
							|  | @ -86,6 +86,12 @@ docker-run: | ||||||
| 		-e SLAPD_CONFIG_ROOTPW="secret" \
 | 		-e SLAPD_CONFIG_ROOTPW="secret" \
 | ||||||
| 		-p "389:389" -p "636:636" \
 | 		-p "389:389" -p "636:636" \
 | ||||||
| 		-d cobaugh/openldap-alpine | 		-d cobaugh/openldap-alpine | ||||||
|  | 	docker run --name cratedb \
 | ||||||
|  | 		-p "6543:5432" \
 | ||||||
|  | 		-d crate crate \
 | ||||||
|  | 		-Cnetwork.host=0.0.0.0 \
 | ||||||
|  | 		-Ctransport.host=localhost \
 | ||||||
|  | 		-Clicense.enterprise=false | ||||||
| 
 | 
 | ||||||
| # Run docker containers necessary for integration tests; skipping services provided
 | # Run docker containers necessary for integration tests; skipping services provided
 | ||||||
| # by CircleCI
 | # by CircleCI
 | ||||||
|  | @ -110,12 +116,18 @@ docker-run-circle: | ||||||
| 		-e SLAPD_CONFIG_ROOTPW="secret" \
 | 		-e SLAPD_CONFIG_ROOTPW="secret" \
 | ||||||
| 		-p "389:389" -p "636:636" \
 | 		-p "389:389" -p "636:636" \
 | ||||||
| 		-d cobaugh/openldap-alpine | 		-d cobaugh/openldap-alpine | ||||||
|  | 	docker run --name cratedb \
 | ||||||
|  | 		-p "6543:5432" \
 | ||||||
|  | 		-d crate crate \
 | ||||||
|  | 		-Cnetwork.host=0.0.0.0 \
 | ||||||
|  | 		-Ctransport.host=localhost \
 | ||||||
|  | 		-Clicense.enterprise=false | ||||||
| 
 | 
 | ||||||
| docker-kill: | docker-kill: | ||||||
| 	-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
 | 	-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
 | ||||||
| 		openldap postgres rabbitmq redis riemann zookeeper | 		openldap postgres rabbitmq redis riemann zookeeper cratedb | ||||||
| 	-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
 | 	-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
 | ||||||
| 		openldap postgres rabbitmq redis riemann zookeeper | 		openldap postgres rabbitmq redis riemann zookeeper cratedb | ||||||
| 
 | 
 | ||||||
| .PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \ | .PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \ | ||||||
| 	package clean docker-run docker-run-circle docker-kill docker-image | 	package clean docker-run docker-run-circle docker-kill docker-image | ||||||
|  |  | ||||||
|  | @ -267,6 +267,7 @@ formats may be used with input plugins supporting the `data_format` option: | ||||||
| * [amqp](./plugins/outputs/amqp) (rabbitmq) | * [amqp](./plugins/outputs/amqp) (rabbitmq) | ||||||
| * [aws kinesis](./plugins/outputs/kinesis) | * [aws kinesis](./plugins/outputs/kinesis) | ||||||
| * [aws cloudwatch](./plugins/outputs/cloudwatch) | * [aws cloudwatch](./plugins/outputs/cloudwatch) | ||||||
|  | * [cratedb](./plugins/outputs/cratedb) | ||||||
| * [datadog](./plugins/outputs/datadog) | * [datadog](./plugins/outputs/datadog) | ||||||
| * [discard](./plugins/outputs/discard) | * [discard](./plugins/outputs/discard) | ||||||
| * [elasticsearch](./plugins/outputs/elasticsearch) | * [elasticsearch](./plugins/outputs/elasticsearch) | ||||||
|  |  | ||||||
|  | @ -199,6 +199,19 @@ | ||||||
| #   namespace = "InfluxData/Telegraf" | #   namespace = "InfluxData/Telegraf" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # # Configuration for CrateDB to send metrics to. | ||||||
|  | # [[outputs.cratedb]] | ||||||
|  | #   # A github.com/jackc/pgx connection string. | ||||||
|  | #   # See https://godoc.org/github.com/jackc/pgx#ParseDSN | ||||||
|  | #   url = "postgres://user:password@localhost/schema?sslmode=disable" | ||||||
|  | #   # Timeout for all CrateDB queries. | ||||||
|  | #   timeout = "5s" | ||||||
|  | #   # Name of the table to store metrics in. | ||||||
|  | #   table = "metrics" | ||||||
|  | #   # If true, and the metrics table does not exist, create it automatically. | ||||||
|  | #   table_create = true | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # # Configuration for DataDog API to send metrics to. | # # Configuration for DataDog API to send metrics to. | ||||||
| # [[outputs.datadog]] | # [[outputs.datadog]] | ||||||
| #   ## Datadog API key | #   ## Datadog API key | ||||||
|  |  | ||||||
|  | @ -51,12 +51,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { | ||||||
| 		"checkpoints_req", | 		"checkpoints_req", | ||||||
| 		"checkpoints_timed", | 		"checkpoints_timed", | ||||||
| 		"maxwritten_clean", | 		"maxwritten_clean", | ||||||
| 	} | 		"datid", | ||||||
| 
 |  | ||||||
| 	int32Metrics := []string{ |  | ||||||
| 		"numbackends", | 		"numbackends", | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	int32Metrics := []string{} | ||||||
|  | 
 | ||||||
| 	floatMetrics := []string{ | 	floatMetrics := []string{ | ||||||
| 		"blk_read_time", | 		"blk_read_time", | ||||||
| 		"blk_write_time", | 		"blk_write_time", | ||||||
|  | @ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	stringMetrics := []string{ | 	stringMetrics := []string{ | ||||||
| 		"datname", | 		"datname", | ||||||
| 		"datid", |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	metricsCounted := 0 | 	metricsCounted := 0 | ||||||
|  |  | ||||||
|  | @ -53,11 +53,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { | ||||||
| 		"temp_files", | 		"temp_files", | ||||||
| 		"temp_bytes", | 		"temp_bytes", | ||||||
| 		"deadlocks", | 		"deadlocks", | ||||||
|  | 		"numbackends", | ||||||
|  | 		"datid", | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	int32Metrics := []string{ | 	int32Metrics := []string{} | ||||||
| 		"numbackends", |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	floatMetrics := []string{ | 	floatMetrics := []string{ | ||||||
| 		"blk_read_time", | 		"blk_read_time", | ||||||
|  | @ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	stringMetrics := []string{ | 	stringMetrics := []string{ | ||||||
| 		"datname", | 		"datname", | ||||||
| 		"datid", |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	metricsCounted := 0 | 	metricsCounted := 0 | ||||||
|  | @ -175,11 +174,11 @@ func TestPostgresqlFieldOutput(t *testing.T) { | ||||||
| 		"temp_files", | 		"temp_files", | ||||||
| 		"temp_bytes", | 		"temp_bytes", | ||||||
| 		"deadlocks", | 		"deadlocks", | ||||||
|  | 		"numbackends", | ||||||
|  | 		"datid", | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	int32Metrics := []string{ | 	int32Metrics := []string{} | ||||||
| 		"numbackends", |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	floatMetrics := []string{ | 	floatMetrics := []string{ | ||||||
| 		"blk_read_time", | 		"blk_read_time", | ||||||
|  | @ -188,7 +187,6 @@ func TestPostgresqlFieldOutput(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	stringMetrics := []string{ | 	stringMetrics := []string{ | ||||||
| 		"datname", | 		"datname", | ||||||
| 		"datid", |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for _, field := range intMetrics { | 	for _, field := range intMetrics { | ||||||
|  |  | ||||||
|  | @ -4,6 +4,7 @@ import ( | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/amon" | 	_ "github.com/influxdata/telegraf/plugins/outputs/amon" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/amqp" | 	_ "github.com/influxdata/telegraf/plugins/outputs/amqp" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" | 	_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" | ||||||
|  | 	_ "github.com/influxdata/telegraf/plugins/outputs/cratedb" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/datadog" | 	_ "github.com/influxdata/telegraf/plugins/outputs/datadog" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/discard" | 	_ "github.com/influxdata/telegraf/plugins/outputs/discard" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" | 	_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" | ||||||
|  |  | ||||||
|  | @ -0,0 +1,38 @@ | ||||||
|  | # CrateDB Output Plugin for Telegraf | ||||||
|  | 
 | ||||||
|  | This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol](https://crate.io/docs/crate/reference/protocols/postgres.html). | ||||||
|  | 
 | ||||||
|  | ## Table Schema | ||||||
|  | 
 | ||||||
|  | The plugin requires a table with the following schema. | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | ```sql | ||||||
|  | CREATE TABLE my_metrics ( | ||||||
|  |   "hash_id" LONG INDEX OFF, | ||||||
|  |   "timestamp" TIMESTAMP, | ||||||
|  |   "name" STRING, | ||||||
|  |   "tags" OBJECT(DYNAMIC), | ||||||
|  |   "fields" OBJECT(DYNAMIC), | ||||||
|  |   PRIMARY KEY ("timestamp", "hash_id","day") | ||||||
|  | ) PARTITIONED BY("day"); | ||||||
|  | ``` | ||||||
|  | 
 | ||||||
|  | The plugin can create this table for you automatically via the `table_create` | ||||||
|  | config option, see below. | ||||||
|  | 
 | ||||||
|  | ## Configuration | ||||||
|  | 
 | ||||||
|  | ```toml | ||||||
|  | # Configuration for CrateDB to send metrics to. | ||||||
|  | [[outputs.cratedb]] | ||||||
|  |   # A github.com/jackc/pgx connection string. | ||||||
|  |   # See https://godoc.org/github.com/jackc/pgx#ParseDSN | ||||||
|  |   url = "postgres://user:password@localhost/schema?sslmode=disable" | ||||||
|  |   # Timeout for all CrateDB queries. | ||||||
|  |   timeout = "5s" | ||||||
|  |   # Name of the table to store metrics in. | ||||||
|  |   table = "metrics" | ||||||
|  |   # If true, and the metrics table does not exist, create it automatically. | ||||||
|  |   table_create = true | ||||||
|  | ``` | ||||||
|  | @ -0,0 +1,230 @@ | ||||||
|  | package cratedb | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"crypto/sha512" | ||||||
|  | 	"database/sql" | ||||||
|  | 	"encoding/binary" | ||||||
|  | 	"fmt" | ||||||
|  | 	"sort" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
|  | 	"github.com/influxdata/telegraf/plugins/outputs" | ||||||
|  | 	_ "github.com/jackc/pgx/stdlib" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type CrateDB struct { | ||||||
|  | 	URL         string | ||||||
|  | 	Timeout     internal.Duration | ||||||
|  | 	Table       string | ||||||
|  | 	TableCreate bool `toml:"table_create"` | ||||||
|  | 	DB          *sql.DB | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var sampleConfig = ` | ||||||
|  |   # A github.com/jackc/pgx connection string. | ||||||
|  |   # See https://godoc.org/github.com/jackc/pgx#ParseDSN
 | ||||||
|  |   url = "postgres://user:password@localhost/schema?sslmode=disable" | ||||||
|  |   # Timeout for all CrateDB queries. | ||||||
|  |   timeout = "5s" | ||||||
|  |   # Name of the table to store metrics in. | ||||||
|  |   table = "metrics" | ||||||
|  |   # If true, and the metrics table does not exist, create it automatically. | ||||||
|  |   table_create = true | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | func (c *CrateDB) Connect() error { | ||||||
|  | 	db, err := sql.Open("pgx", c.URL) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} else if c.TableCreate { | ||||||
|  | 		sql := ` | ||||||
|  | CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( | ||||||
|  | 	"hash_id" LONG INDEX OFF, | ||||||
|  | 	"timestamp" TIMESTAMP, | ||||||
|  | 	"name" STRING, | ||||||
|  | 	"tags" OBJECT(DYNAMIC), | ||||||
|  | 	"fields" OBJECT(DYNAMIC), | ||||||
|  | 	"day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"), | ||||||
|  | 	PRIMARY KEY ("timestamp", "hash_id","day") | ||||||
|  | ) PARTITIONED BY("day"); | ||||||
|  | ` | ||||||
|  | 		ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) | ||||||
|  | 		defer cancel() | ||||||
|  | 		if _, err := db.ExecContext(ctx, sql); err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	c.DB = db | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *CrateDB) Write(metrics []telegraf.Metric) error { | ||||||
|  | 	ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) | ||||||
|  | 	defer cancel() | ||||||
|  | 	if sql, err := insertSQL(c.Table, metrics); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} else if _, err := c.DB.ExecContext(ctx, sql); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func insertSQL(table string, metrics []telegraf.Metric) (string, error) { | ||||||
|  | 	rows := make([]string, len(metrics)) | ||||||
|  | 	for i, m := range metrics { | ||||||
|  | 
 | ||||||
|  | 		cols := []interface{}{ | ||||||
|  | 			hashID(m), | ||||||
|  | 			m.Time().UTC(), | ||||||
|  | 			m.Name(), | ||||||
|  | 			m.Tags(), | ||||||
|  | 			m.Fields(), | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		escapedCols := make([]string, len(cols)) | ||||||
|  | 		for i, col := range cols { | ||||||
|  | 			escaped, err := escapeValue(col) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return "", err | ||||||
|  | 			} | ||||||
|  | 			escapedCols[i] = escaped | ||||||
|  | 		} | ||||||
|  | 		rows[i] = `(` + strings.Join(escapedCols, ", ") + `)` | ||||||
|  | 	} | ||||||
|  | 	sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields") | ||||||
|  | VALUES | ||||||
|  | ` + strings.Join(rows, " ,\n") + `;` | ||||||
|  | 	return sql, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // escapeValue returns a string version of val that is suitable for being used
 | ||||||
|  | // inside of a VALUES expression or similar. Unsupported types return an error.
 | ||||||
|  | //
 | ||||||
|  | // Warning: This is not ideal from a security perspective, but unfortunately
 | ||||||
|  | // CrateDB does not support enough of the PostgreSQL wire protocol to allow
 | ||||||
|  | // using pgx with $1, $2 placeholders [1]. Security conscious users of this
 | ||||||
|  | // plugin should probably refrain from using it in combination with untrusted
 | ||||||
|  | // inputs.
 | ||||||
|  | //
 | ||||||
|  | // [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371
 | ||||||
|  | func escapeValue(val interface{}) (string, error) { | ||||||
|  | 	switch t := val.(type) { | ||||||
|  | 	case string: | ||||||
|  | 		return escapeString(t, `'`), nil | ||||||
|  | 	// We don't handle uint, uint32 and uint64 here because CrateDB doesn't
 | ||||||
|  | 	// seem to support unsigned types. But it seems like input plugins don't
 | ||||||
|  | 	// produce those types, so it's hopefully ok.
 | ||||||
|  | 	case int, int32, int64, float32, float64: | ||||||
|  | 		return fmt.Sprint(t), nil | ||||||
|  | 	case time.Time: | ||||||
|  | 		// see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp
 | ||||||
|  | 		return escapeValue(t.Format("2006-01-02T15:04:05.999-0700")) | ||||||
|  | 	case map[string]string: | ||||||
|  | 		return escapeObject(convertMap(t)) | ||||||
|  | 	case map[string]interface{}: | ||||||
|  | 		return escapeObject(t) | ||||||
|  | 	default: | ||||||
|  | 		// This might be panic worthy under normal circumstances, but it's probably
 | ||||||
|  | 		// better to not shut down the entire telegraf process because of one
 | ||||||
|  | 		// misbehaving plugin.
 | ||||||
|  | 		return "", fmt.Errorf("unexpected type: %T: %#v", t, t) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // convertMap converts m from map[string]string to map[string]interface{} by
 | ||||||
|  | // copying it. Generics, oh generics where art thou?
 | ||||||
|  | func convertMap(m map[string]string) map[string]interface{} { | ||||||
|  | 	c := make(map[string]interface{}, len(m)) | ||||||
|  | 	for k, v := range m { | ||||||
|  | 		c[k] = v | ||||||
|  | 	} | ||||||
|  | 	return c | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func escapeObject(m map[string]interface{}) (string, error) { | ||||||
|  | 	// There is a decent chance that the implementation below doesn't catch all
 | ||||||
|  | 	// edge cases, but it's hard to tell since the format seems to be a bit
 | ||||||
|  | 	// underspecified.
 | ||||||
|  | 	// See https://crate.io/docs/crate/reference/sql/data_types.html#object
 | ||||||
|  | 
 | ||||||
|  | 	// We find all keys and sort them first because iterating a map in go is
 | ||||||
|  | 	// randomized and we need consistent output for our unit tests.
 | ||||||
|  | 	keys := make([]string, 0, len(m)) | ||||||
|  | 	for k, _ := range m { | ||||||
|  | 		keys = append(keys, k) | ||||||
|  | 	} | ||||||
|  | 	sort.Strings(keys) | ||||||
|  | 
 | ||||||
|  | 	// Now we build our key = val pairs
 | ||||||
|  | 	pairs := make([]string, 0, len(m)) | ||||||
|  | 	for _, k := range keys { | ||||||
|  | 		// escape the value of our key k (potentially recursive)
 | ||||||
|  | 		val, err := escapeValue(m[k]) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return "", err | ||||||
|  | 		} | ||||||
|  | 		pairs = append(pairs, escapeString(k, `"`)+" = "+val) | ||||||
|  | 	} | ||||||
|  | 	return `{` + strings.Join(pairs, ", ") + `}`, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // escapeString wraps s in the given quote string and replaces all occurences
 | ||||||
|  | // of it inside of s with a double quote.
 | ||||||
|  | func escapeString(s string, quote string) string { | ||||||
|  | 	return quote + strings.Replace(s, quote, quote+quote, -1) + quote | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // hashID returns a cryptographic hash int64 hash that includes the metric name
 | ||||||
|  | // and tags. It's used instead of m.HashID() because it's not considered stable
 | ||||||
|  | // and because a cryptogtaphic hash makes more sense for the use case of
 | ||||||
|  | // deduplication.
 | ||||||
|  | // [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
 | ||||||
|  | func hashID(m telegraf.Metric) int64 { | ||||||
|  | 	h := sha512.New() | ||||||
|  | 	h.Write([]byte(m.Name())) | ||||||
|  | 	tags := m.Tags() | ||||||
|  | 	tmp := make([]string, len(tags)) | ||||||
|  | 	i := 0 | ||||||
|  | 	for k, v := range tags { | ||||||
|  | 		tmp[i] = k + v | ||||||
|  | 		i++ | ||||||
|  | 	} | ||||||
|  | 	sort.Strings(tmp) | ||||||
|  | 
 | ||||||
|  | 	for _, s := range tmp { | ||||||
|  | 		h.Write([]byte(s)) | ||||||
|  | 	} | ||||||
|  | 	sum := h.Sum(nil) | ||||||
|  | 
 | ||||||
|  | 	// Note: We have to convert from uint64 to int64 below because CrateDB only
 | ||||||
|  | 	// supports a signed 64 bit LONG type:
 | ||||||
|  | 	//
 | ||||||
|  | 	// CREATE TABLE my_long (val LONG);
 | ||||||
|  | 	// INSERT INTO my_long(val) VALUES (14305102049502225714);
 | ||||||
|  | 	// -> ERROR:  SQLParseException: For input string: "14305102049502225714"
 | ||||||
|  | 	return int64(binary.LittleEndian.Uint64(sum)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *CrateDB) SampleConfig() string { | ||||||
|  | 	return sampleConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *CrateDB) Description() string { | ||||||
|  | 	return "Configuration for CrateDB to send metrics to." | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *CrateDB) Close() error { | ||||||
|  | 	return c.DB.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	outputs.Add("cratedb", func() telegraf.Output { | ||||||
|  | 		return &CrateDB{ | ||||||
|  | 			Timeout: internal.Duration{Duration: time.Second * 5}, | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | @ -0,0 +1,215 @@ | ||||||
|  | package cratedb | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"database/sql" | ||||||
|  | 	"os" | ||||||
|  | 	"strings" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
|  | 	"github.com/influxdata/telegraf/metric" | ||||||
|  | 	"github.com/influxdata/telegraf/testutil" | ||||||
|  | 	"github.com/stretchr/testify/require" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestConnectAndWrite(t *testing.T) { | ||||||
|  | 	if testing.Short() { | ||||||
|  | 		t.Skip("Skipping integration test in short mode") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	url := testURL() | ||||||
|  | 	table := "test" | ||||||
|  | 
 | ||||||
|  | 	// dropSQL drops our table before each test. This simplifies changing the
 | ||||||
|  | 	// schema during development :).
 | ||||||
|  | 	dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`) | ||||||
|  | 	db, err := sql.Open("pgx", url) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	_, err = db.Exec(dropSQL) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer db.Close() | ||||||
|  | 
 | ||||||
|  | 	c := &CrateDB{ | ||||||
|  | 		URL:         url, | ||||||
|  | 		Table:       table, | ||||||
|  | 		Timeout:     internal.Duration{Duration: time.Second * 5}, | ||||||
|  | 		TableCreate: true, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	metrics := testutil.MockMetrics() | ||||||
|  | 	require.NoError(t, c.Connect()) | ||||||
|  | 	require.NoError(t, c.Write(metrics)) | ||||||
|  | 
 | ||||||
|  | 	// The code below verifies that the metrics were written. We have to select
 | ||||||
|  | 	// the rows using their primary keys in order to take advantage of
 | ||||||
|  | 	// read-after-write consistency in CrateDB.
 | ||||||
|  | 	for _, m := range metrics { | ||||||
|  | 		hashIDVal, err := escapeValue(hashID(m)) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		timestamp, err := escapeValue(m.Time()) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 		var id int64 | ||||||
|  | 		row := db.QueryRow( | ||||||
|  | 			"SELECT hash_id FROM " + escapeString(table, `"`) + " " + | ||||||
|  | 				"WHERE hash_id = " + hashIDVal + " " + | ||||||
|  | 				"AND timestamp = " + timestamp, | ||||||
|  | 		) | ||||||
|  | 		require.NoError(t, row.Scan(&id)) | ||||||
|  | 		// We could check the whole row, but this is meant to be more of a smoke
 | ||||||
|  | 		// test, so just checking the HashID seems fine.
 | ||||||
|  | 		require.Equal(t, id, hashID(m)) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	require.NoError(t, c.Close()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func Test_insertSQL(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		Metrics []telegraf.Metric | ||||||
|  | 		Want    string | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			Metrics: testutil.MockMetrics(), | ||||||
|  | 			Want: strings.TrimSpace(` | ||||||
|  | INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields") | ||||||
|  | VALUES | ||||||
|  | (-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); | ||||||
|  | `), | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		if got, err := insertSQL("my_table", test.Metrics); err != nil { | ||||||
|  | 			t.Error(err) | ||||||
|  | 		} else if got != test.Want { | ||||||
|  | 			t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func Test_escapeValue(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		Val  interface{} | ||||||
|  | 		Want string | ||||||
|  | 	}{ | ||||||
|  | 		// string
 | ||||||
|  | 		{`foo`, `'foo'`}, | ||||||
|  | 		{`foo'bar 'yeah`, `'foo''bar ''yeah'`}, | ||||||
|  | 		// int types
 | ||||||
|  | 		{123, `123`}, // int
 | ||||||
|  | 		{int64(123), `123`}, | ||||||
|  | 		{int32(123), `123`}, | ||||||
|  | 		// float types
 | ||||||
|  | 		{123.456, `123.456`}, | ||||||
|  | 		{float32(123.456), `123.456`}, // floating point SNAFU
 | ||||||
|  | 		{float64(123.456), `123.456`}, | ||||||
|  | 		// time.Time
 | ||||||
|  | 		{time.Date(2017, 8, 7, 16, 44, 52, 123*1000*1000, time.FixedZone("Dreamland", 5400)), `'2017-08-07T16:44:52.123+0130'`}, | ||||||
|  | 		// map[string]string
 | ||||||
|  | 		{map[string]string{}, `{}`}, | ||||||
|  | 		{map[string]string(nil), `{}`}, | ||||||
|  | 		{map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`}, | ||||||
|  | 		{map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, | ||||||
|  | 		// map[string]interface{}
 | ||||||
|  | 		{map[string]interface{}{}, `{}`}, | ||||||
|  | 		{map[string]interface{}(nil), `{}`}, | ||||||
|  | 		{map[string]interface{}{"foo": "bar"}, `{"foo" = 'bar'}`}, | ||||||
|  | 		{map[string]interface{}{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, | ||||||
|  | 		{map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`}, | ||||||
|  | 		{map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	url := testURL() | ||||||
|  | 	db, err := sql.Open("pgx", url) | ||||||
|  | 	require.NoError(t, err) | ||||||
|  | 	defer db.Close() | ||||||
|  | 
 | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		got, err := escapeValue(test.Val) | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Errorf("val: %#v: %s", test.Val, err) | ||||||
|  | 		} else if got != test.Want { | ||||||
|  | 			t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// This is a smoke test that will blow up if our escaping causing a SQL
 | ||||||
|  | 		// syntax error, which may allow for an attack.
 | ||||||
|  | 		var reply interface{} | ||||||
|  | 		row := db.QueryRow("SELECT " + got) | ||||||
|  | 		require.NoError(t, row.Scan(&reply)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func Test_hashID(t *testing.T) { | ||||||
|  | 	tests := []struct { | ||||||
|  | 		Name   string | ||||||
|  | 		Tags   map[string]string | ||||||
|  | 		Fields map[string]interface{} | ||||||
|  | 		Want   int64 | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			Name:   "metric1", | ||||||
|  | 			Tags:   map[string]string{"tag1": "val1", "tag2": "val2"}, | ||||||
|  | 			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, | ||||||
|  | 			Want:   8973971082006474188, | ||||||
|  | 		}, | ||||||
|  | 
 | ||||||
|  | 		// This metric has a different tag order (in a perhaps non-ideal attempt to
 | ||||||
|  | 		// trigger different pseudo-random map iteration)) and fields (none)
 | ||||||
|  | 		// compared to the previous metric, but should still get the same hash.
 | ||||||
|  | 		{ | ||||||
|  | 			Name:   "metric1", | ||||||
|  | 			Tags:   map[string]string{"tag2": "val2", "tag1": "val1"}, | ||||||
|  | 			Fields: map[string]interface{}{"field3": "val3"}, | ||||||
|  | 			Want:   8973971082006474188, | ||||||
|  | 		}, | ||||||
|  | 
 | ||||||
|  | 		// Different metric name -> different hash
 | ||||||
|  | 		{ | ||||||
|  | 			Name:   "metric2", | ||||||
|  | 			Tags:   map[string]string{"tag1": "val1", "tag2": "val2"}, | ||||||
|  | 			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, | ||||||
|  | 			Want:   306487682448261783, | ||||||
|  | 		}, | ||||||
|  | 
 | ||||||
|  | 		// Different tag val -> different hash
 | ||||||
|  | 		{ | ||||||
|  | 			Name:   "metric1", | ||||||
|  | 			Tags:   map[string]string{"tag1": "new-val", "tag2": "val2"}, | ||||||
|  | 			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, | ||||||
|  | 			Want:   1938713695181062970, | ||||||
|  | 		}, | ||||||
|  | 
 | ||||||
|  | 		// Different tag key -> different hash
 | ||||||
|  | 		{ | ||||||
|  | 			Name:   "metric1", | ||||||
|  | 			Tags:   map[string]string{"new-key": "val1", "tag2": "val2"}, | ||||||
|  | 			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, | ||||||
|  | 			Want:   7678889081527706328, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for i, test := range tests { | ||||||
|  | 		m, err := metric.New( | ||||||
|  | 			test.Name, | ||||||
|  | 			test.Tags, | ||||||
|  | 			test.Fields, | ||||||
|  | 			time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), | ||||||
|  | 		) | ||||||
|  | 		require.NoError(t, err) | ||||||
|  | 		if got := hashID(m); got != test.Want { | ||||||
|  | 			t.Errorf("test #%d: got=%d want=%d", i, got, test.Want) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func testURL() string { | ||||||
|  | 	url := os.Getenv("CRATE_URL") | ||||||
|  | 	if url == "" { | ||||||
|  | 		return "postgres://" + testutil.GetLocalHost() + ":6543/test?sslmode=disable" | ||||||
|  | 	} | ||||||
|  | 	return url | ||||||
|  | } | ||||||
		Loading…
	
		Reference in New Issue