From e4caa347a28132f55e784068aee9af6680204ae6 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 4 Mar 2020 14:20:46 -0800 Subject: [PATCH] Add ClickHouse input plugin (#6441) --- .gitignore | 2 +- plugins/inputs/all/all.go | 1 + plugins/inputs/clickhouse/README.md | 119 ++++++ plugins/inputs/clickhouse/clickhouse.go | 390 ++++++++++++++++++ .../inputs/clickhouse/clickhouse_go1.11.go | 6 + .../inputs/clickhouse/clickhouse_go1.12.go | 8 + plugins/inputs/clickhouse/clickhouse_test.go | 161 ++++++++ plugins/inputs/clickhouse/dev/dhparam.pem | 13 + .../inputs/clickhouse/dev/docker-compose.yml | 16 + plugins/inputs/clickhouse/dev/telegraf.conf | 12 + .../inputs/clickhouse/dev/telegraf_ssl.conf | 16 + .../inputs/clickhouse/dev/tls_settings.xml | 4 + 12 files changed, 747 insertions(+), 1 deletion(-) create mode 100644 plugins/inputs/clickhouse/README.md create mode 100644 plugins/inputs/clickhouse/clickhouse.go create mode 100644 plugins/inputs/clickhouse/clickhouse_go1.11.go create mode 100644 plugins/inputs/clickhouse/clickhouse_go1.12.go create mode 100644 plugins/inputs/clickhouse/clickhouse_test.go create mode 100644 plugins/inputs/clickhouse/dev/dhparam.pem create mode 100644 plugins/inputs/clickhouse/dev/docker-compose.yml create mode 100644 plugins/inputs/clickhouse/dev/telegraf.conf create mode 100644 plugins/inputs/clickhouse/dev/telegraf_ssl.conf create mode 100644 plugins/inputs/clickhouse/dev/tls_settings.xml diff --git a/.gitignore b/.gitignore index 4176a0413..0ae500592 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ /telegraf /telegraf.exe /telegraf.gz -/vendor +/vendor \ No newline at end of file diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 274d7fd41..2484df614 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/chrony" _ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_gnmi" _ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_mdt" + _ "github.com/influxdata/telegraf/plugins/inputs/clickhouse" _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" _ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push" _ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" diff --git a/plugins/inputs/clickhouse/README.md b/plugins/inputs/clickhouse/README.md new file mode 100644 index 000000000..8eb478fbc --- /dev/null +++ b/plugins/inputs/clickhouse/README.md @@ -0,0 +1,119 @@ +# Telegraf Input Plugin: ClickHouse + +This plugin gathers the statistic data from [ClickHouse](https://github.com/ClickHouse/ClickHouse) server. + +### Configuration +```ini +# Read metrics from one or many ClickHouse servers +[[inputs.clickhouse]] + ## Username for authorization on ClickHouse server + ## example: user = "default" + user = "default" + + ## Password for authorization on ClickHouse server + ## example: password = "super_secret" + + ## HTTP(s) timeout while getting metrics values + ## The timeout includes connection time, any redirects, and reading the response body. + ## example: timeout = 1s + # timeout = 5s + + ## List of servers for metrics scraping + ## metrics scrape via HTTP(s) clickhouse interface + ## https://clickhouse.tech/docs/en/interfaces/http/ + ## example: servers = ["http://127.0.0.1:8123","https://custom-server.mdb.yandexcloud.net"] + servers = ["http://127.0.0.1:8123"] + + ## If "auto_discovery"" is "true" plugin tries to connect to all servers available in the cluster + ## with using same "user:password" described in "user" and "password" parameters + ## and get this server hostname list from "system.clusters" table + ## see + ## - https://clickhouse.tech/docs/en/operations/system_tables/#system-clusters + ## - https://clickhouse.tech/docs/en/operations/server_settings/settings/#server_settings_remote_servers + ## - https://clickhouse.tech/docs/en/operations/table_engines/distributed/ + ## - https://clickhouse.tech/docs/en/operations/table_engines/replication/#creating-replicated-tables + ## example: auto_discovery = false + # auto_discovery = true + + ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" + ## when this filter present then "WHERE cluster IN (...)" filter will apply + ## please use only full cluster names here, regexp and glob filters is not allowed + ## for "/etc/clickhouse-server/config.d/remote.xml" + ## + ## + ## + ## + ## clickhouse-ru-1.local9000 + ## clickhouse-ru-2.local9000 + ## + ## + ## clickhouse-eu-1.local9000 + ## clickhouse-eu-2.local9000 + ## + ## + ## + ## + ## + ## + ## example: cluster_include = ["my-own-cluster"] + # cluster_include = [] + + ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" + ## when this filter present then "WHERE cluster NOT IN (...)" filter will apply + ## example: cluster_exclude = ["my-internal-not-discovered-cluster"] + # cluster_exclude = [] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` + +### Metrics: +- clickhouse_events + - tags: + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - all rows from system.events, all metrics is COUNTER type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-events + +- clickhouse_metrics + - tags: + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - all rows from system.metrics, all metrics is GAUGE type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-metrics + +- clickhouse_asynchronous_metrics + - tags: + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - all rows from system.asynchronous_metrics, all metrics is GAUGE type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-asynchronous_metrics + +- clickhouse_tables + - tags: + - hostname (ClickHouse server hostname) + - table + - database + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) + - fields: + - bytes + - parts + - rows + +### Example Output + +``` +clickhouse_events,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 read_compressed_bytes=212i,arena_alloc_chunks=35i,function_execute=85i,merge_tree_data_writer_rows=3i,rw_lock_acquired_read_locks=421i,file_open=46i,io_buffer_alloc_bytes=86451985i,inserted_bytes=196i,regexp_created=3i,real_time_microseconds=116832i,query=23i,network_receive_elapsed_microseconds=268i,merge_tree_data_writer_compressed_bytes=1080i,arena_alloc_bytes=212992i,disk_write_elapsed_microseconds=556i,inserted_rows=3i,compressed_read_buffer_bytes=81i,read_buffer_from_file_descriptor_read_bytes=148i,write_buffer_from_file_descriptor_write=47i,merge_tree_data_writer_blocks=3i,soft_page_faults=896i,hard_page_faults=7i,select_query=21i,merge_tree_data_writer_uncompressed_bytes=196i,merge_tree_data_writer_blocks_already_sorted=3i,user_time_microseconds=40196i,compressed_read_buffer_blocks=5i,write_buffer_from_file_descriptor_write_bytes=3246i,io_buffer_allocs=296i,created_write_buffer_ordinary=12i,disk_read_elapsed_microseconds=59347044i,network_send_elapsed_microseconds=1538i,context_lock=1040i,insert_query=1i,system_time_microseconds=14582i,read_buffer_from_file_descriptor_read=3i 1569421000000000000 +clickhouse_asynchronous_metrics,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 jemalloc.metadata_thp=0i,replicas_max_relative_delay=0i,jemalloc.mapped=1803177984i,jemalloc.allocated=1724839256i,jemalloc.background_thread.run_interval=0i,jemalloc.background_thread.num_threads=0i,uncompressed_cache_cells=0i,replicas_max_absolute_delay=0i,mark_cache_bytes=0i,compiled_expression_cache_count=0i,replicas_sum_queue_size=0i,number_of_tables=35i,replicas_max_merges_in_queue=0i,replicas_max_inserts_in_queue=0i,replicas_sum_merges_in_queue=0i,replicas_max_queue_size=0i,mark_cache_files=0i,jemalloc.background_thread.num_runs=0i,jemalloc.active=1726210048i,uptime=158i,jemalloc.retained=380481536i,replicas_sum_inserts_in_queue=0i,uncompressed_cache_bytes=0i,number_of_databases=2i,jemalloc.metadata=9207704i,max_part_count_for_partition=1i,jemalloc.resident=1742442496i 1569421000000000000 +clickhouse_metrics,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 replicated_send=0i,write=0i,ephemeral_node=0i,zoo_keeper_request=0i,distributed_files_to_insert=0i,replicated_fetch=0i,background_schedule_pool_task=0i,interserver_connection=0i,leader_replica=0i,delayed_inserts=0i,global_thread_active=41i,merge=0i,readonly_replica=0i,memory_tracking_in_background_schedule_pool=0i,memory_tracking_for_merges=0i,zoo_keeper_session=0i,context_lock_wait=0i,storage_buffer_bytes=0i,background_pool_task=0i,send_external_tables=0i,zoo_keeper_watch=0i,part_mutation=0i,disk_space_reserved_for_merge=0i,distributed_send=0i,version_integer=19014003i,local_thread=0i,replicated_checks=0i,memory_tracking=0i,memory_tracking_in_background_processing_pool=0i,leader_election=0i,revision=54425i,open_file_for_read=0i,open_file_for_write=0i,storage_buffer_rows=0i,rw_lock_waiting_readers=0i,rw_lock_waiting_writers=0i,rw_lock_active_writers=0i,local_thread_active=0i,query_preempted=0i,tcp_connection=1i,http_connection=1i,read=2i,query_thread=0i,dict_cache_requests=0i,rw_lock_active_readers=1i,global_thread=43i,query=1i 1569421000000000000 +clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=system,host=kshvakov,hostname=localhost,shard_num=1,table=trace_log bytes=754i,parts=1i,rows=1i 1569421000000000000 +clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=default,host=kshvakov,hostname=localhost,shard_num=1,table=example bytes=326i,parts=2i,rows=2i 1569421000000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go new file mode 100644 index 000000000..c122af4df --- /dev/null +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -0,0 +1,390 @@ +package clickhouse + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +var defaultTimeout = 5 * time.Second + +var sampleConfig = ` + ## Username for authorization on ClickHouse server + ## example: user = "default"" + user = "default" + + ## Password for authorization on ClickHouse server + ## example: password = "super_secret" + + ## HTTP(s) timeout while getting metrics values + ## The timeout includes connection time, any redirects, and reading the response body. + ## example: timeout = 1s + # timeout = 5s + + ## List of servers for metrics scraping + ## metrics scrape via HTTP(s) clickhouse interface + ## https://clickhouse.tech/docs/en/interfaces/http/ + ## example: servers = ["http://127.0.0.1:8123","https://custom-server.mdb.yandexcloud.net"] + servers = ["http://127.0.0.1:8123"] + + ## If "auto_discovery"" is "true" plugin tries to connect to all servers available in the cluster + ## with using same "user:password" described in "user" and "password" parameters + ## and get this server hostname list from "system.clusters" table + ## see + ## - https://clickhouse.tech/docs/en/operations/system_tables/#system-clusters + ## - https://clickhouse.tech/docs/en/operations/server_settings/settings/#server_settings_remote_servers + ## - https://clickhouse.tech/docs/en/operations/table_engines/distributed/ + ## - https://clickhouse.tech/docs/en/operations/table_engines/replication/#creating-replicated-tables + ## example: auto_discovery = false + # auto_discovery = true + + ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" + ## when this filter present then "WHERE cluster IN (...)" filter will apply + ## please use only full cluster names here, regexp and glob filters is not allowed + ## for "/etc/clickhouse-server/config.d/remote.xml" + ## + ## + ## + ## + ## clickhouse-ru-1.local9000 + ## clickhouse-ru-2.local9000 + ## + ## + ## clickhouse-eu-1.local9000 + ## clickhouse-eu-2.local9000 + ## + ## + ## + ## + ## + ## + ## example: cluster_include = ["my-own-cluster"] + # cluster_include = [] + + ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" + ## when this filter present then "WHERE cluster NOT IN (...)" filter will apply + ## example: cluster_exclude = ["my-internal-not-discovered-cluster"] + # cluster_exclude = [] + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +type connect struct { + Cluster string `json:"cluster"` + ShardNum int `json:"shard_num"` + Hostname string `json:"host_name"` + url *url.URL +} + +func init() { + inputs.Add("clickhouse", func() telegraf.Input { + return &ClickHouse{ + AutoDiscovery: true, + ClientConfig: tls.ClientConfig{ + InsecureSkipVerify: false, + }, + Timeout: internal.Duration{Duration: defaultTimeout}, + } + }) +} + +// ClickHouse Telegraf Input Plugin +type ClickHouse struct { + User string `toml:"user"` + Password string `toml:"password"` + Servers []string `toml:"servers"` + AutoDiscovery bool `toml:"auto_discovery"` + ClusterInclude []string `toml:"cluster_include"` + ClusterExclude []string `toml:"cluster_exclude"` + Timeout internal.Duration `toml:"timeout"` + client http.Client + tls.ClientConfig +} + +// SampleConfig returns the sample config +func (*ClickHouse) SampleConfig() string { + return sampleConfig +} + +// Description return plugin description +func (*ClickHouse) Description() string { + return "Read metrics from one or many ClickHouse servers" +} + +// Start ClickHouse input service +func (ch *ClickHouse) Start(telegraf.Accumulator) error { + timeout := defaultTimeout + if ch.Timeout.Duration != 0 { + timeout = ch.Timeout.Duration + } + tlsCfg, err := ch.ClientConfig.TLSConfig() + if err != nil { + return err + } + + ch.client = http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + }, + } + return nil +} + +// Gather collect data from ClickHouse server +func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { + var ( + connects []connect + exists = func(host string) bool { + for _, c := range connects { + if c.Hostname == host { + return true + } + } + return false + } + ) + + for _, server := range ch.Servers { + u, err := url.Parse(server) + if err != nil { + return err + } + switch { + case ch.AutoDiscovery: + var conns []connect + if err := ch.execQuery(u, "SELECT cluster, shard_num, host_name FROM system.clusters "+ch.clusterIncludeExcludeFilter(), &conns); err != nil { + acc.AddError(err) + continue + } + for _, c := range conns { + if !exists(c.Hostname) { + c.url = &url.URL{ + Scheme: u.Scheme, + Host: net.JoinHostPort(c.Hostname, u.Port()), + } + connects = append(connects, c) + } + } + default: + connects = append(connects, connect{ + url: u, + }) + } + } + + for _, conn := range connects { + if err := ch.tables(acc, &conn); err != nil { + acc.AddError(err) + } + for metric := range commonMetrics { + if err := ch.commonMetrics(acc, &conn, metric); err != nil { + acc.AddError(err) + } + } + } + return nil +} + +func (ch *ClickHouse) clusterIncludeExcludeFilter() string { + if len(ch.ClusterInclude) == 0 && len(ch.ClusterExclude) == 0 { + return "" + } + var ( + escape = func(in string) string { + return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(in) + "'" + } + makeFilter = func(expr string, args []string) string { + in := make([]string, 0, len(args)) + for _, v := range args { + in = append(in, escape(v)) + } + return fmt.Sprintf("cluster %s (%s)", expr, strings.Join(in, ", ")) + } + includeFilter, excludeFilter string + ) + + if len(ch.ClusterInclude) != 0 { + includeFilter = makeFilter("IN", ch.ClusterInclude) + } + if len(ch.ClusterExclude) != 0 { + excludeFilter = makeFilter("NOT IN", ch.ClusterExclude) + } + if includeFilter != "" && excludeFilter != "" { + return "WHERE " + includeFilter + " OR " + excludeFilter + } + if includeFilter == "" && excludeFilter != "" { + return "WHERE " + excludeFilter + } + if includeFilter != "" && excludeFilter == "" { + return "WHERE " + includeFilter + } + return "" +} + +func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error { + var result []struct { + Metric string `json:"metric"` + Value chUInt64 `json:"value"` + } + if err := ch.execQuery(conn.url, commonMetrics[metric], &result); err != nil { + return err + } + + tags := map[string]string{ + "source": conn.Hostname, + } + if len(conn.Cluster) != 0 { + tags["cluster"] = conn.Cluster + } + if conn.ShardNum != 0 { + tags["shard_num"] = strconv.Itoa(conn.ShardNum) + } + + fields := make(map[string]interface{}) + for _, r := range result { + fields[internal.SnakeCase(r.Metric)] = uint64(r.Value) + } + + acc.AddFields("clickhouse_"+metric, fields, tags) + + return nil +} + +func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error { + var parts []struct { + Database string `json:"database"` + Table string `json:"table"` + Bytes chUInt64 `json:"bytes"` + Parts chUInt64 `json:"parts"` + Rows chUInt64 `json:"rows"` + } + + if err := ch.execQuery(conn.url, systemParts, &parts); err != nil { + return err + } + tags := map[string]string{ + "source": conn.Hostname, + } + if len(conn.Cluster) != 0 { + tags["cluster"] = conn.Cluster + } + if conn.ShardNum != 0 { + tags["shard_num"] = strconv.Itoa(conn.ShardNum) + } + for _, part := range parts { + tags["table"] = part.Table + tags["database"] = part.Database + acc.AddFields("clickhouse_tables", + map[string]interface{}{ + "bytes": uint64(part.Bytes), + "parts": uint64(part.Parts), + "rows": uint64(part.Rows), + }, + tags, + ) + } + return nil +} + +type clickhouseError struct { + StatusCode int + body []byte +} + +func (e *clickhouseError) Error() string { + return fmt.Sprintf("received error code %d: %s", e.StatusCode, e.body) +} + +func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error { + q := url.Query() + q.Set("query", query+" FORMAT JSON") + url.RawQuery = q.Encode() + req, _ := http.NewRequest("GET", url.String(), nil) + if ch.User != "" { + req.Header.Add("X-ClickHouse-User", ch.User) + } + if ch.Password != "" { + req.Header.Add("X-ClickHouse-Key", ch.Password) + } + resp, err := ch.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) + return &clickhouseError{ + StatusCode: resp.StatusCode, + body: body, + } + } + var response struct { + Data json.RawMessage + } + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return err + } + return json.Unmarshal(response.Data, i) +} + +// see https://clickhouse.yandex/docs/en/operations/settings/settings/#session_settings-output_format_json_quote_64bit_integers +type chUInt64 uint64 + +func (i *chUInt64) UnmarshalJSON(b []byte) error { + b = bytes.TrimPrefix(b, []byte(`"`)) + b = bytes.TrimSuffix(b, []byte(`"`)) + v, err := strconv.ParseUint(string(b), 10, 64) + if err != nil { + return err + } + *i = chUInt64(v) + return nil +} + +const ( + systemEventsSQL = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events" + systemMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.metrics" + systemAsyncMetricsSQL = "SELECT metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics" + systemParts = ` + SELECT + database, + table, + SUM(bytes) AS bytes, + COUNT(*) AS parts, + SUM(rows) AS rows + FROM system.parts + WHERE active = 1 + GROUP BY + database, table + ORDER BY + database, table + ` +) + +var commonMetrics = map[string]string{ + "events": systemEventsSQL, + "metrics": systemMetricsSQL, + "asynchronous_metrics": systemAsyncMetricsSQL, +} + +var _ telegraf.ServiceInput = &ClickHouse{} diff --git a/plugins/inputs/clickhouse/clickhouse_go1.11.go b/plugins/inputs/clickhouse/clickhouse_go1.11.go new file mode 100644 index 000000000..e043dd492 --- /dev/null +++ b/plugins/inputs/clickhouse/clickhouse_go1.11.go @@ -0,0 +1,6 @@ +// +build !go1.12 + +package clickhouse + +// Stop ClickHouse input service +func (ch *ClickHouse) Stop() {} diff --git a/plugins/inputs/clickhouse/clickhouse_go1.12.go b/plugins/inputs/clickhouse/clickhouse_go1.12.go new file mode 100644 index 000000000..86bb69e2b --- /dev/null +++ b/plugins/inputs/clickhouse/clickhouse_go1.12.go @@ -0,0 +1,8 @@ +// +build go1.12 + +package clickhouse + +// Stop ClickHouse input service +func (ch *ClickHouse) Stop() { + ch.client.CloseIdleConnections() +} diff --git a/plugins/inputs/clickhouse/clickhouse_test.go b/plugins/inputs/clickhouse/clickhouse_test.go new file mode 100644 index 000000000..382d2148a --- /dev/null +++ b/plugins/inputs/clickhouse/clickhouse_test.go @@ -0,0 +1,161 @@ +package clickhouse + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func TestClusterIncludeExcludeFilter(t *testing.T) { + ch := ClickHouse{} + if assert.Equal(t, "", ch.clusterIncludeExcludeFilter()) { + ch.ClusterExclude = []string{"test_cluster"} + assert.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) + + ch.ClusterExclude = []string{"test_cluster"} + ch.ClusterInclude = []string{"cluster"} + assert.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) + + ch.ClusterExclude = []string{} + ch.ClusterInclude = []string{"cluster1", "cluster2"} + assert.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) + + ch.ClusterExclude = []string{"cluster1", "cluster2"} + ch.ClusterInclude = []string{} + assert.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) + } +} + +func TestChInt64(t *testing.T) { + assets := map[string]uint64{ + `"1"`: 1, + "1": 1, + "42": 42, + `"42"`: 42, + "18446743937525109187": 18446743937525109187, + } + for src, expected := range assets { + var v chUInt64 + if err := v.UnmarshalJSON([]byte(src)); assert.NoError(t, err) { + assert.Equal(t, expected, uint64(v)) + } + } +} + +func TestGather(t *testing.T) { + var ( + ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + type result struct { + Data interface{} `json:"data"` + } + enc := json.NewEncoder(w) + switch query := r.URL.Query().Get("query"); { + case strings.Contains(query, "system.parts"): + enc.Encode(result{ + Data: []struct { + Database string `json:"database"` + Table string `json:"table"` + Bytes chUInt64 `json:"bytes"` + Parts chUInt64 `json:"parts"` + Rows chUInt64 `json:"rows"` + }{ + { + Database: "test_database", + Table: "test_table", + Bytes: 1, + Parts: 10, + Rows: 100, + }, + }, + }) + case strings.Contains(query, "system.events"): + enc.Encode(result{ + Data: []struct { + Metric string `json:"metric"` + Value chUInt64 `json:"value"` + }{ + { + Metric: "TestSystemEvent", + Value: 1000, + }, + { + Metric: "TestSystemEvent2", + Value: 2000, + }, + }, + }) + case strings.Contains(query, "system.metrics"): + enc.Encode(result{ + Data: []struct { + Metric string `json:"metric"` + Value chUInt64 `json:"value"` + }{ + { + Metric: "TestSystemMetric", + Value: 1000, + }, + { + Metric: "TestSystemMetric2", + Value: 2000, + }, + }, + }) + case strings.Contains(query, "system.asynchronous_metrics"): + enc.Encode(result{ + Data: []struct { + Metric string `json:"metric"` + Value chUInt64 `json:"value"` + }{ + { + Metric: "TestSystemAsynchronousMetric", + Value: 1000, + }, + { + Metric: "TestSystemAsynchronousMetric2", + Value: 2000, + }, + }, + }) + } + })) + ch = &ClickHouse{ + Servers: []string{ + ts.URL, + }, + } + acc = &testutil.Accumulator{} + ) + defer ts.Close() + ch.Gather(acc) + + acc.AssertContainsFields(t, "clickhouse_tables", + map[string]interface{}{ + "bytes": uint64(1), + "parts": uint64(10), + "rows": uint64(100), + }, + ) + acc.AssertContainsFields(t, "clickhouse_events", + map[string]interface{}{ + "test_system_event": uint64(1000), + "test_system_event2": uint64(2000), + }, + ) + acc.AssertContainsFields(t, "clickhouse_metrics", + map[string]interface{}{ + "test_system_metric": uint64(1000), + "test_system_metric2": uint64(2000), + }, + ) + acc.AssertContainsFields(t, "clickhouse_asynchronous_metrics", + map[string]interface{}{ + "test_system_asynchronous_metric": uint64(1000), + "test_system_asynchronous_metric2": uint64(2000), + }, + ) +} diff --git a/plugins/inputs/clickhouse/dev/dhparam.pem b/plugins/inputs/clickhouse/dev/dhparam.pem new file mode 100644 index 000000000..5ae6d7bbe --- /dev/null +++ b/plugins/inputs/clickhouse/dev/dhparam.pem @@ -0,0 +1,13 @@ +-----BEGIN DH PARAMETERS----- +MIICCAKCAgEAoo1x7wI5K57P1/AkHUmVWzKNfy46b/ni/QtClomTB78Ks1FP8dzs +CQBW/pfL8yidxTialNhMRCZO1J+uPjTvd8dG8SFZzVylkF41LBNrUD+MLyh/b6Nr +8uWf3tqYCtsiqsQsnq/oU7C29wn6UjhPPVbRRDPGyJUFOgp0ebPR0L2gOc5HhXSF +Tt0fuWnvgZJBKGvyodby3p2CSheu8K6ZteVc8ZgHuanhCQA30nVN+yNQzyozlB2H +B9jxTDPJy8+/4Mui3iiNyXg6FaiI9lWdH7xgKoZlHi8BWlLz5Se9JVNYg0dPrMTz +K0itQyyTKUlK73x+1uPm6q1AJwz08EZiCXNbk58/Sf+pdwDmAO2QSRrERC73vnvc +B1+4+Kf7RS7oYpAHknKm/MFnkCJLVIq1b6kikYcIgVCYe+Z1UytSmG1QfwdgL8QQ +TVYVHBg4w07+s3/IJ1ekvNhdxpkmmevYt7GjohWu8vKkip4se+reNdo+sqLsgFKf +1IuDMD36zn9FVukvs7e3BwZCTkdosGHvHGjA7zm2DwPPO16hCvJ4mE6ULLpp2NEw +EBYWm3Tv6M/xtrF5Afyh0gAh7eL767/qsarbx6jlqs+dnh3LptqsE3WerWK54+0B +3Hr5CVfgYbeXuW2HeFb+fS6CNUWmiAsq1XRiz5p16hpeMGYN/qyF1IsCAQI= +-----END DH PARAMETERS----- diff --git a/plugins/inputs/clickhouse/dev/docker-compose.yml b/plugins/inputs/clickhouse/dev/docker-compose.yml new file mode 100644 index 000000000..a8b22c34d --- /dev/null +++ b/plugins/inputs/clickhouse/dev/docker-compose.yml @@ -0,0 +1,16 @@ +version: '3' + +services: + clickhouse: + image: yandex/clickhouse-server:latest + volumes: + - ./dhparam.pem:/etc/clickhouse-server/dhparam.pem + - ./tls_settings.xml:/etc/clickhouse-server/config.d/00-tls_settings.xml + - ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key + - ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt + restart: always + ports: + - 8123:8123 + - 8443:8443 + - 9000:9000 + - 9009:9009 \ No newline at end of file diff --git a/plugins/inputs/clickhouse/dev/telegraf.conf b/plugins/inputs/clickhouse/dev/telegraf.conf new file mode 100644 index 000000000..883baf845 --- /dev/null +++ b/plugins/inputs/clickhouse/dev/telegraf.conf @@ -0,0 +1,12 @@ +### ClickHouse input plugin + +[[inputs.clickhouse]] + timeout = 2 + user = "default" + servers = ["http://127.0.0.1:8123"] + auto_discovery = true + cluster_include = [] + cluster_exclude = ["test_shard_localhost"] + +[[outputs.file]] + files = ["stdout"] diff --git a/plugins/inputs/clickhouse/dev/telegraf_ssl.conf b/plugins/inputs/clickhouse/dev/telegraf_ssl.conf new file mode 100644 index 000000000..21288d84f --- /dev/null +++ b/plugins/inputs/clickhouse/dev/telegraf_ssl.conf @@ -0,0 +1,16 @@ +### ClickHouse input plugin + +[[inputs.clickhouse]] + timeout = 2 + user = "default" + servers = ["https://127.0.0.1:8443"] + auto_discovery = true + cluster_include = [] + cluster_exclude = ["test_shard_localhost"] + insecure_skip_verify = false + tls_cert = "./testutil/pki/clientcert.pem" + tls_key = "./testutil/pki/clientkey.pem" + tls_ca = "./testutil/pki/cacert.pem" + +[[outputs.file]] + files = ["stdout"] diff --git a/plugins/inputs/clickhouse/dev/tls_settings.xml b/plugins/inputs/clickhouse/dev/tls_settings.xml new file mode 100644 index 000000000..cf6716b82 --- /dev/null +++ b/plugins/inputs/clickhouse/dev/tls_settings.xml @@ -0,0 +1,4 @@ + + 8443 + 9440 + \ No newline at end of file