diff --git a/Godeps b/Godeps
index 6b8ba5533..7e90dd061 100644
--- a/Godeps
+++ b/Godeps
@@ -32,7 +32,7 @@ github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f
 github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea
 github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f
 github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
-github.com/jackc/pgx b84338d7d62598f75859b2b146d830b22f1b9ec8
+github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917
 github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d
 github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413
 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893
diff --git a/Makefile b/Makefile
index d316cbb8b..d8ccf2faf 100644
--- a/Makefile
+++ b/Makefile
@@ -86,6 +86,12 @@ docker-run:
 		-e SLAPD_CONFIG_ROOTPW="secret" \
 		-p "389:389" -p "636:636" \
 		-d cobaugh/openldap-alpine
+	docker run --name cratedb \
+		-p "6543:5432" \
+		-d crate crate \
+		-Cnetwork.host=0.0.0.0 \
+		-Ctransport.host=localhost \
+		-Clicense.enterprise=false

 # Run docker containers necessary for integration tests; skipping services provided
 # by CircleCI
@@ -110,12 +116,18 @@
 		-e SLAPD_CONFIG_ROOTPW="secret" \
 		-p "389:389" -p "636:636" \
 		-d cobaugh/openldap-alpine
+	docker run --name cratedb \
+		-p "6543:5432" \
+		-d crate crate \
+		-Cnetwork.host=0.0.0.0 \
+		-Ctransport.host=localhost \
+		-Clicense.enterprise=false

 docker-kill:
 	-docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
-		openldap postgres rabbitmq redis riemann zookeeper
+		openldap postgres rabbitmq redis riemann zookeeper cratedb
 	-docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \
-		openldap postgres rabbitmq redis riemann zookeeper
+		openldap postgres rabbitmq redis riemann zookeeper cratedb

 .PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \
 	package clean docker-run docker-run-circle docker-kill docker-image
diff --git a/README.md b/README.md
index a5c6f08e4..c013e4af3 100644
--- a/README.md
+++ b/README.md
@@ -267,6 +267,7 @@ formats may be used with input plugins supporting the `data_format` option:
 * [amqp](./plugins/outputs/amqp) (rabbitmq)
 * [aws kinesis](./plugins/outputs/kinesis)
 * [aws cloudwatch](./plugins/outputs/cloudwatch)
+* [cratedb](./plugins/outputs/cratedb)
 * [datadog](./plugins/outputs/datadog)
 * [discard](./plugins/outputs/discard)
 * [elasticsearch](./plugins/outputs/elasticsearch)
diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index 63ce2ebfe..3ab0afd51 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -199,6 +199,19 @@
 #   namespace = "InfluxData/Telegraf"

+
+# # Configuration for CrateDB to send metrics to.
+# [[outputs.cratedb]]
+#   # A github.com/jackc/pgx connection string.
+#   # See https://godoc.org/github.com/jackc/pgx#ParseDSN
+#   url = "postgres://user:password@localhost/schema?sslmode=disable"
+#   # Timeout for all CrateDB queries.
+#   timeout = "5s"
+#   # Name of the table to store metrics in.
+#   table = "metrics"
+#   # If true, and the metrics table does not exist, create it automatically.
+#   table_create = true
+

 # # Configuration for DataDog API to send metrics to.
 # [[outputs.datadog]]
 #   ## Datadog API key
diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go
index 03c09936d..410b9b421 100644
--- a/plugins/inputs/postgresql/postgresql_test.go
+++ b/plugins/inputs/postgresql/postgresql_test.go
@@ -51,12 +51,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"checkpoints_req",
 		"checkpoints_timed",
 		"maxwritten_clean",
-	}
-
-	int32Metrics := []string{
+		"datid",
 		"numbackends",
 	}

+	int32Metrics := []string{}
+
 	floatMetrics := []string{
 		"blk_read_time",
 		"blk_write_time",
@@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {

 	stringMetrics := []string{
 		"datname",
-		"datid",
 	}

 	metricsCounted := 0
diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
index 466cdfd98..4545a2478 100644
--- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
+++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go
@@ -53,11 +53,11 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
 		"temp_files",
 		"temp_bytes",
 		"deadlocks",
+		"numbackends",
+		"datid",
 	}

-	int32Metrics := []string{
-		"numbackends",
-	}
+	int32Metrics := []string{}

 	floatMetrics := []string{
 		"blk_read_time",
@@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {

 	stringMetrics := []string{
 		"datname",
-		"datid",
 	}

 	metricsCounted := 0
@@ -175,11 +174,11 @@ func TestPostgresqlFieldOutput(t *testing.T) {
 		"temp_files",
 		"temp_bytes",
 		"deadlocks",
+		"numbackends",
+		"datid",
 	}

-	int32Metrics := []string{
-		"numbackends",
-	}
+	int32Metrics := []string{}

 	floatMetrics := []string{
 		"blk_read_time",
@@ -188,7 +187,6 @@ func TestPostgresqlFieldOutput(t *testing.T) {

 	stringMetrics := []string{
 		"datname",
-		"datid",
 	}

 	for _, field := range intMetrics {
diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go
index 53d1cb8ca..5e74eb796 100644
--- a/plugins/outputs/all/all.go
+++ b/plugins/outputs/all/all.go
@@ -4,6 +4,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/outputs/amon"
 	_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
 	_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
+	_ "github.com/influxdata/telegraf/plugins/outputs/cratedb"
 	_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
 	_ "github.com/influxdata/telegraf/plugins/outputs/discard"
 	_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md
new file mode 100644
index 000000000..a8a01fdfe
--- /dev/null
+++ b/plugins/outputs/cratedb/README.md
@@ -0,0 +1,39 @@
+# CrateDB Output Plugin for Telegraf
+
+This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol](https://crate.io/docs/crate/reference/protocols/postgres.html).
+
+## Table Schema
+
+The plugin requires a table with the following schema.
+
+```sql
+CREATE TABLE my_metrics (
+  "hash_id" LONG INDEX OFF,
+  "timestamp" TIMESTAMP,
+  "name" STRING,
+  "tags" OBJECT(DYNAMIC),
+  "fields" OBJECT(DYNAMIC),
+  "day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
+  PRIMARY KEY ("timestamp", "hash_id","day")
+) PARTITIONED BY("day");
+```
+
+The plugin can create this table for you automatically via the `table_create`
+config option; see below.
+
+## Configuration
+
+```toml
+# Configuration for CrateDB to send metrics to.
+[[outputs.cratedb]]
+  # A github.com/jackc/pgx connection string.
+  # See https://godoc.org/github.com/jackc/pgx#ParseDSN
+  url = "postgres://user:password@localhost/schema?sslmode=disable"
+  # Timeout for all CrateDB queries.
+  timeout = "5s"
+  # Name of the table to store metrics in.
+  table = "metrics"
+  # If true, and the metrics table does not exist, create it automatically.
+  table_create = true
+```
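+
+## Example
+
+To give a rough idea of what gets written, the mock metric used in this
+plugin's unit tests (name `test1`, tag `tag1=value1`, field `value=1`) is
+flattened into a single `INSERT` statement roughly like the one below, with
+one row per metric in a batch. The table name here assumes the default
+`metrics` setting, and the `hash_id` is derived from the metric's name and
+tags:
+
+```sql
+INSERT INTO metrics ("hash_id", "timestamp", "name", "tags", "fields")
+VALUES
+(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1});
+```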
diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go
new file mode 100644
index 000000000..5a5987c77
--- /dev/null
+++ b/plugins/outputs/cratedb/cratedb.go
@@ -0,0 +1,230 @@
+package cratedb
+
+import (
+    "context"
+    "crypto/sha512"
+    "database/sql"
+    "encoding/binary"
+    "fmt"
+    "sort"
+    "strings"
+    "time"
+
+    "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/internal"
+    "github.com/influxdata/telegraf/plugins/outputs"
+    _ "github.com/jackc/pgx/stdlib"
+)
+
+type CrateDB struct {
+    URL         string
+    Timeout     internal.Duration
+    Table       string
+    TableCreate bool `toml:"table_create"`
+    DB          *sql.DB
+}
+
+var sampleConfig = `
+  # A github.com/jackc/pgx connection string.
+  # See https://godoc.org/github.com/jackc/pgx#ParseDSN
+  url = "postgres://user:password@localhost/schema?sslmode=disable"
+  # Timeout for all CrateDB queries.
+  timeout = "5s"
+  # Name of the table to store metrics in.
+  table = "metrics"
+  # If true, and the metrics table does not exist, create it automatically.
+  table_create = true
+`
+
+func (c *CrateDB) Connect() error {
+    db, err := sql.Open("pgx", c.URL)
+    if err != nil {
+        return err
+    } else if c.TableCreate {
+        sql := `
+CREATE TABLE IF NOT EXISTS ` + c.Table + ` (
+    "hash_id" LONG INDEX OFF,
+    "timestamp" TIMESTAMP,
+    "name" STRING,
+    "tags" OBJECT(DYNAMIC),
+    "fields" OBJECT(DYNAMIC),
+    "day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"),
+    PRIMARY KEY ("timestamp", "hash_id","day")
+) PARTITIONED BY("day");
+`
+        ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
+        defer cancel()
+        if _, err := db.ExecContext(ctx, sql); err != nil {
+            return err
+        }
+    }
+    c.DB = db
+    return nil
+}
+
+func (c *CrateDB) Write(metrics []telegraf.Metric) error {
+    ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration)
+    defer cancel()
+    if sql, err := insertSQL(c.Table, metrics); err != nil {
+        return err
+    } else if _, err := c.DB.ExecContext(ctx, sql); err != nil {
+        return err
+    }
+    return nil
+}
+
+func insertSQL(table string, metrics []telegraf.Metric) (string, error) {
+    rows := make([]string, len(metrics))
+    for i, m := range metrics {
+
+        cols := []interface{}{
+            hashID(m),
+            m.Time().UTC(),
+            m.Name(),
+            m.Tags(),
+            m.Fields(),
+        }
+
+        escapedCols := make([]string, len(cols))
+        for i, col := range cols {
+            escaped, err := escapeValue(col)
+            if err != nil {
+                return "", err
+            }
+            escapedCols[i] = escaped
+        }
+        rows[i] = `(` + strings.Join(escapedCols, ", ") + `)`
+    }
+    sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields")
+VALUES
+` + strings.Join(rows, " ,\n") + `;`
+    return sql, nil
+}
+
+// escapeValue returns a string version of val that is suitable for being used
+// inside of a VALUES expression or similar. Unsupported types return an error.
+//
+// Warning: This is not ideal from a security perspective, but unfortunately
+// CrateDB does not support enough of the PostgreSQL wire protocol to allow
+// using pgx with $1, $2 placeholders [1]. Security conscious users of this
+// plugin should probably refrain from using it in combination with untrusted
+// inputs.
+//
+// [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371
+func escapeValue(val interface{}) (string, error) {
+    switch t := val.(type) {
+    case string:
+        return escapeString(t, `'`), nil
+    // We don't handle uint, uint32 and uint64 here because CrateDB doesn't
+    // seem to support unsigned types. But it seems like input plugins don't
+    // produce those types, so it's hopefully ok.
+    case int, int32, int64, float32, float64:
+        return fmt.Sprint(t), nil
+    case time.Time:
+        // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp
+        return escapeValue(t.Format("2006-01-02T15:04:05.999-0700"))
+    case map[string]string:
+        return escapeObject(convertMap(t))
+    case map[string]interface{}:
+        return escapeObject(t)
+    default:
+        // This might be panic worthy under normal circumstances, but it's
+        // probably better to not shut down the entire telegraf process because
+        // of one misbehaving plugin.
+        return "", fmt.Errorf("unexpected type: %T: %#v", t, t)
+    }
+}
+
+// convertMap converts m from map[string]string to map[string]interface{} by
+// copying it. Generics, oh generics where art thou?
+func convertMap(m map[string]string) map[string]interface{} {
+    c := make(map[string]interface{}, len(m))
+    for k, v := range m {
+        c[k] = v
+    }
+    return c
+}
+
+func escapeObject(m map[string]interface{}) (string, error) {
+    // There is a decent chance that the implementation below doesn't catch all
+    // edge cases, but it's hard to tell since the format seems to be a bit
+    // underspecified.
+    // See https://crate.io/docs/crate/reference/sql/data_types.html#object
+
+    // We find all keys and sort them first because iterating a map in go is
+    // randomized and we need consistent output for our unit tests.
+    keys := make([]string, 0, len(m))
+    for k := range m {
+        keys = append(keys, k)
+    }
+    sort.Strings(keys)
+
+    // Now we build our key = val pairs
+    pairs := make([]string, 0, len(m))
+    for _, k := range keys {
+        // escape the value of our key k (potentially recursive)
+        val, err := escapeValue(m[k])
+        if err != nil {
+            return "", err
+        }
+        pairs = append(pairs, escapeString(k, `"`)+" = "+val)
+    }
+    return `{` + strings.Join(pairs, ", ") + `}`, nil
+}
+
+// escapeString wraps s in the given quote string and doubles all occurrences
+// of the quote inside of s.
+func escapeString(s string, quote string) string {
+    return quote + strings.Replace(s, quote, quote+quote, -1) + quote
+}
+
+// hashID returns an int64 hash that includes the metric's name and tags. It
+// is used instead of m.HashID() because that hash is not considered stable
+// and because a cryptographic hash makes more sense for the use case of
+// deduplication [1].
+//
+// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
+func hashID(m telegraf.Metric) int64 {
+    h := sha512.New()
+    h.Write([]byte(m.Name()))
+    tags := m.Tags()
+    tmp := make([]string, len(tags))
+    i := 0
+    for k, v := range tags {
+        tmp[i] = k + v
+        i++
+    }
+    sort.Strings(tmp)
+
+    for _, s := range tmp {
+        h.Write([]byte(s))
+    }
+    sum := h.Sum(nil)
+
+    // Note: We have to convert from uint64 to int64 below because CrateDB only
+    // supports a signed 64 bit LONG type:
+    //
+    //   CREATE TABLE my_long (val LONG);
+    //   INSERT INTO my_long(val) VALUES (14305102049502225714);
+    //   -> ERROR: SQLParseException: For input string: "14305102049502225714"
+    return int64(binary.LittleEndian.Uint64(sum))
+}
+
+func (c *CrateDB) SampleConfig() string {
+    return sampleConfig
+}
+
+func (c *CrateDB) Description() string {
+    return "Configuration for CrateDB to send metrics to."
+}
+
+func (c *CrateDB) Close() error {
+    return c.DB.Close()
+}
+
+func init() {
+    outputs.Add("cratedb", func() telegraf.Output {
+        return &CrateDB{
+            Timeout: internal.Duration{Duration: time.Second * 5},
+        }
+    })
+}
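For reference, a minimal sketch of how the plugin's exported API is exercised; it mirrors the integration test in cratedb_test.go below. The connection URL is only a placeholder and must point at a running CrateDB instance, for example the one started by the `docker-run` Makefile target above.

```go
package main

import (
	"log"
	"time"

	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/outputs/cratedb"
	"github.com/influxdata/telegraf/testutil"
)

func main() {
	// Placeholder URL; adjust host, port and credentials to your setup.
	c := &cratedb.CrateDB{
		URL:         "postgres://localhost:6543/test?sslmode=disable",
		Table:       "metrics",
		Timeout:     internal.Duration{Duration: 5 * time.Second},
		TableCreate: true,
	}
	if err := c.Connect(); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Write a small batch of mock metrics, as the integration test does.
	if err := c.Write(testutil.MockMetrics()); err != nil {
		log.Fatal(err)
	}
}
```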
diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go
new file mode 100644
index 000000000..590098834
--- /dev/null
+++ b/plugins/outputs/cratedb/cratedb_test.go
@@ -0,0 +1,215 @@
+package cratedb
+
+import (
+    "database/sql"
+    "os"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/internal"
+    "github.com/influxdata/telegraf/metric"
+    "github.com/influxdata/telegraf/testutil"
+    "github.com/stretchr/testify/require"
+)
+
+func TestConnectAndWrite(t *testing.T) {
+    if testing.Short() {
+        t.Skip("Skipping integration test in short mode")
+    }
+
+    url := testURL()
+    table := "test"
+
+    // dropSQL drops our table before each test. This simplifies changing the
+    // schema during development :).
+    dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`)
+    db, err := sql.Open("pgx", url)
+    require.NoError(t, err)
+    _, err = db.Exec(dropSQL)
+    require.NoError(t, err)
+    defer db.Close()
+
+    c := &CrateDB{
+        URL:         url,
+        Table:       table,
+        Timeout:     internal.Duration{Duration: time.Second * 5},
+        TableCreate: true,
+    }
+
+    metrics := testutil.MockMetrics()
+    require.NoError(t, c.Connect())
+    require.NoError(t, c.Write(metrics))
+
+    // The code below verifies that the metrics were written. We have to select
+    // the rows using their primary keys in order to take advantage of
+    // read-after-write consistency in CrateDB.
+    for _, m := range metrics {
+        hashIDVal, err := escapeValue(hashID(m))
+        require.NoError(t, err)
+        timestamp, err := escapeValue(m.Time())
+        require.NoError(t, err)
+
+        var id int64
+        row := db.QueryRow(
+            "SELECT hash_id FROM " + escapeString(table, `"`) + " " +
+                "WHERE hash_id = " + hashIDVal + " " +
+                "AND timestamp = " + timestamp,
+        )
+        require.NoError(t, row.Scan(&id))
+        // We could check the whole row, but this is meant to be more of a smoke
+        // test, so just checking the HashID seems fine.
+        require.Equal(t, id, hashID(m))
+    }
+
+    require.NoError(t, c.Close())
+}
+
+func Test_insertSQL(t *testing.T) {
+    tests := []struct {
+        Metrics []telegraf.Metric
+        Want    string
+    }{
+        {
+            Metrics: testutil.MockMetrics(),
+            Want: strings.TrimSpace(`
+INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields")
+VALUES
+(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1});
+`),
+        },
+    }
+
+    for _, test := range tests {
+        if got, err := insertSQL("my_table", test.Metrics); err != nil {
+            t.Error(err)
+        } else if got != test.Want {
+            t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
+        }
+    }
+}
+
+func Test_escapeValue(t *testing.T) {
+    tests := []struct {
+        Val  interface{}
+        Want string
+    }{
+        // string
+        {`foo`, `'foo'`},
+        {`foo'bar 'yeah`, `'foo''bar ''yeah'`},
+        // int types
+        {123, `123`}, // int
+        {int64(123), `123`},
+        {int32(123), `123`},
+        // float types
+        {123.456, `123.456`},
+        {float32(123.456), `123.456`}, // floating point SNAFU
+        {float64(123.456), `123.456`},
+        // time.Time
+        {time.Date(2017, 8, 7, 16, 44, 52, 123*1000*1000, time.FixedZone("Dreamland", 5400)), `'2017-08-07T16:44:52.123+0130'`},
+        // map[string]string
+        {map[string]string{}, `{}`},
+        {map[string]string(nil), `{}`},
+        {map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`},
+        {map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`},
+        // map[string]interface{}
+        {map[string]interface{}{}, `{}`},
+        {map[string]interface{}(nil), `{}`},
+        {map[string]interface{}{"foo": "bar"}, `{"foo" = 'bar'}`},
+        {map[string]interface{}{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`},
+        {map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`},
+        {map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`},
+    }
+
+    url := testURL()
+    db, err := sql.Open("pgx", url)
+    require.NoError(t, err)
+    defer db.Close()
+
+    for _, test := range tests {
+        got, err := escapeValue(test.Val)
+        if err != nil {
+            t.Errorf("val: %#v: %s", test.Val, err)
+        } else if got != test.Want {
+            t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want)
+        }
+
+        // This is a smoke test that will blow up if our escaping causes a SQL
+        // syntax error, which may allow for an attack.
+        var reply interface{}
+        row := db.QueryRow("SELECT " + got)
+        require.NoError(t, row.Scan(&reply))
+    }
+}
+
+func Test_hashID(t *testing.T) {
+    tests := []struct {
+        Name   string
+        Tags   map[string]string
+        Fields map[string]interface{}
+        Want   int64
+    }{
+        {
+            Name:   "metric1",
+            Tags:   map[string]string{"tag1": "val1", "tag2": "val2"},
+            Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
+            Want:   8973971082006474188,
+        },
+
+        // This metric has a different tag order (in a perhaps non-ideal attempt
+        // to trigger different pseudo-random map iteration) and different
+        // fields (none in common) compared to the previous metric, but it
+        // should still get the same hash.
+        {
+            Name:   "metric1",
+            Tags:   map[string]string{"tag2": "val2", "tag1": "val1"},
+            Fields: map[string]interface{}{"field3": "val3"},
+            Want:   8973971082006474188,
+        },
+
+        // Different metric name -> different hash
+        {
+            Name:   "metric2",
+            Tags:   map[string]string{"tag1": "val1", "tag2": "val2"},
+            Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
+            Want:   306487682448261783,
+        },
+
+        // Different tag val -> different hash
+        {
+            Name:   "metric1",
+            Tags:   map[string]string{"tag1": "new-val", "tag2": "val2"},
+            Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
+            Want:   1938713695181062970,
+        },
+
+        // Different tag key -> different hash
+        {
+            Name:   "metric1",
+            Tags:   map[string]string{"new-key": "val1", "tag2": "val2"},
+            Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
+            Want:   7678889081527706328,
+        },
+    }
+
+    for i, test := range tests {
+        m, err := metric.New(
+            test.Name,
+            test.Tags,
+            test.Fields,
+            time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
+        )
+        require.NoError(t, err)
+        if got := hashID(m); got != test.Want {
+            t.Errorf("test #%d: got=%d want=%d", i, got, test.Want)
+        }
+    }
+}
+
+func testURL() string {
+    url := os.Getenv("CRATE_URL")
+    if url == "" {
+        return "postgres://" + testutil.GetLocalHost() + ":6543/test?sslmode=disable"
+    }
+    return url
+}
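As a closing illustration that is not part of the patch: once metrics are flowing, CrateDB's subscript syntax can pull individual tags and fields out of the dynamic `tags` and `fields` objects. The metric, tag, and field names below (`cpu`, `host`, `usage_idle`) are placeholders; substitute whatever your inputs actually produce.

```sql
-- Placeholder names: adjust the table, metric, tag and field names to your data.
SELECT "timestamp", "tags"['host'] AS host, "fields"['usage_idle'] AS usage_idle
FROM metrics
WHERE "name" = 'cpu'
ORDER BY "timestamp" DESC
LIMIT 10;
```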