Add ClickHouse input plugin (#6441)
This commit is contained in:
		
							parent
							
								
									b6892378a0
								
							
						
					
					
						commit
						e4caa347a2
					
				|  | @ -2,4 +2,4 @@ | ||||||
| /telegraf | /telegraf | ||||||
| /telegraf.exe | /telegraf.exe | ||||||
| /telegraf.gz | /telegraf.gz | ||||||
| /vendor | /vendor | ||||||
|  | @ -19,6 +19,7 @@ import ( | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/chrony" | 	_ "github.com/influxdata/telegraf/plugins/inputs/chrony" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_gnmi" | 	_ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_gnmi" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_mdt" | 	_ "github.com/influxdata/telegraf/plugins/inputs/cisco_telemetry_mdt" | ||||||
|  | 	_ "github.com/influxdata/telegraf/plugins/inputs/clickhouse" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" | 	_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push" | 	_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push" | ||||||
| 	_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" | 	_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch" | ||||||
|  |  | ||||||
|  | @ -0,0 +1,119 @@ | ||||||
|  | # Telegraf Input Plugin: ClickHouse | ||||||
|  | 
 | ||||||
|  | This plugin gathers the statistic data from [ClickHouse](https://github.com/ClickHouse/ClickHouse)  server. | ||||||
|  | 
 | ||||||
|  | ### Configuration | ||||||
|  | ```ini | ||||||
|  | # Read metrics from one or many ClickHouse servers | ||||||
|  | [[inputs.clickhouse]] | ||||||
|  |   ## Username for authorization on ClickHouse server | ||||||
|  |   ## example: user = "default" | ||||||
|  |   user = "default" | ||||||
|  |      | ||||||
|  |   ## Password for authorization on ClickHouse server | ||||||
|  |   ## example: password = "super_secret" | ||||||
|  | 
 | ||||||
|  |   ## HTTP(s) timeout while getting metrics values  | ||||||
|  |   ## The timeout includes connection time, any redirects, and reading the response body. | ||||||
|  |   ##   example: timeout = 1s | ||||||
|  |   # timeout = 5s  | ||||||
|  | 
 | ||||||
|  |   ## List of servers for metrics scraping | ||||||
|  |   ## metrics scrape via HTTP(s) clickhouse interface | ||||||
|  |   ## https://clickhouse.tech/docs/en/interfaces/http/ | ||||||
|  |   ##    example: servers = ["http://127.0.0.1:8123","https://custom-server.mdb.yandexcloud.net"] | ||||||
|  |   servers         = ["http://127.0.0.1:8123"] | ||||||
|  | 
 | ||||||
|  |   ## If "auto_discovery"" is "true" plugin tries to connect to all servers available in the cluster | ||||||
|  |   ## with using same "user:password" described in "user" and "password" parameters | ||||||
|  |   ## and get this server hostname list from "system.clusters" table | ||||||
|  |   ## see | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/system_tables/#system-clusters | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/server_settings/settings/#server_settings_remote_servers | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/table_engines/distributed/ | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/table_engines/replication/#creating-replicated-tables | ||||||
|  |   ##    example: auto_discovery = false | ||||||
|  |   # auto_discovery = true | ||||||
|  | 
 | ||||||
|  |   ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" | ||||||
|  |   ## when this filter present then "WHERE cluster IN (...)" filter will apply | ||||||
|  |   ## please use only full cluster names here, regexp and glob filters is not allowed | ||||||
|  |   ## for "/etc/clickhouse-server/config.d/remote.xml" | ||||||
|  |   ## <yandex> | ||||||
|  |   ##  <remote_servers> | ||||||
|  |   ##    <my-own-cluster> | ||||||
|  |   ##        <shard> | ||||||
|  |   ##          <replica><host>clickhouse-ru-1.local</host><port>9000</port></replica> | ||||||
|  |   ##          <replica><host>clickhouse-ru-2.local</host><port>9000</port></replica> | ||||||
|  |   ##        </shard> | ||||||
|  |   ##        <shard> | ||||||
|  |   ##          <replica><host>clickhouse-eu-1.local</host><port>9000</port></replica> | ||||||
|  |   ##          <replica><host>clickhouse-eu-2.local</host><port>9000</port></replica> | ||||||
|  |   ##        </shard> | ||||||
|  |   ##    </my-onw-cluster> | ||||||
|  |   ##  </remote_servers> | ||||||
|  |   ## | ||||||
|  |   ## </yandex> | ||||||
|  |   ##  | ||||||
|  |   ## example: cluster_include = ["my-own-cluster"] | ||||||
|  |   # cluster_include = [] | ||||||
|  | 
 | ||||||
|  |   ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" | ||||||
|  |   ## when this filter present then "WHERE cluster NOT IN (...)" filter will apply | ||||||
|  |   ##    example: cluster_exclude = ["my-internal-not-discovered-cluster"] | ||||||
|  |   # cluster_exclude = [] | ||||||
|  | 
 | ||||||
|  |   ## Optional TLS Config | ||||||
|  |   # tls_ca = "/etc/telegraf/ca.pem" | ||||||
|  |   # tls_cert = "/etc/telegraf/cert.pem" | ||||||
|  |   # tls_key = "/etc/telegraf/key.pem" | ||||||
|  |   ## Use TLS but skip chain & host verification | ||||||
|  |   # insecure_skip_verify = false | ||||||
|  | ``` | ||||||
|  | 
 | ||||||
|  | ### Metrics: | ||||||
|  | - clickhouse_events | ||||||
|  |   - tags: | ||||||
|  |     - hostname (ClickHouse server hostname) | ||||||
|  |     - cluster (Name of the cluster [optional]) | ||||||
|  |     - shard_num (Shard number in the cluster [optional]) | ||||||
|  |   - fields: | ||||||
|  |     - all rows from system.events, all metrics is COUNTER type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-events | ||||||
|  | 
 | ||||||
|  | - clickhouse_metrics | ||||||
|  |   - tags: | ||||||
|  |     - hostname (ClickHouse server hostname) | ||||||
|  |     - cluster (Name of the cluster [optional]) | ||||||
|  |     - shard_num (Shard number in the cluster [optional]) | ||||||
|  |   - fields: | ||||||
|  |     - all rows from system.metrics, all metrics is GAUGE type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-metrics | ||||||
|  | 
 | ||||||
|  | - clickhouse_asynchronous_metrics | ||||||
|  |   - tags: | ||||||
|  |     - hostname (ClickHouse server hostname) | ||||||
|  |     - cluster (Name of the cluster [optional]) | ||||||
|  |     - shard_num (Shard number in the cluster [optional]) | ||||||
|  |   - fields: | ||||||
|  |     - all rows from system.asynchronous_metrics, all metrics is GAUGE type, look https://clickhouse.tech/docs/en/operations/system_tables/#system_tables-asynchronous_metrics | ||||||
|  | 
 | ||||||
|  | - clickhouse_tables | ||||||
|  |   - tags: | ||||||
|  |     - hostname (ClickHouse server hostname) | ||||||
|  |     - table | ||||||
|  |     - database | ||||||
|  |     - cluster (Name of the cluster [optional]) | ||||||
|  |     - shard_num (Shard number in the cluster [optional]) | ||||||
|  |   - fields: | ||||||
|  |     - bytes | ||||||
|  |     - parts | ||||||
|  |     - rows | ||||||
|  | 
 | ||||||
|  | ### Example Output | ||||||
|  | 
 | ||||||
|  | ``` | ||||||
|  | clickhouse_events,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 read_compressed_bytes=212i,arena_alloc_chunks=35i,function_execute=85i,merge_tree_data_writer_rows=3i,rw_lock_acquired_read_locks=421i,file_open=46i,io_buffer_alloc_bytes=86451985i,inserted_bytes=196i,regexp_created=3i,real_time_microseconds=116832i,query=23i,network_receive_elapsed_microseconds=268i,merge_tree_data_writer_compressed_bytes=1080i,arena_alloc_bytes=212992i,disk_write_elapsed_microseconds=556i,inserted_rows=3i,compressed_read_buffer_bytes=81i,read_buffer_from_file_descriptor_read_bytes=148i,write_buffer_from_file_descriptor_write=47i,merge_tree_data_writer_blocks=3i,soft_page_faults=896i,hard_page_faults=7i,select_query=21i,merge_tree_data_writer_uncompressed_bytes=196i,merge_tree_data_writer_blocks_already_sorted=3i,user_time_microseconds=40196i,compressed_read_buffer_blocks=5i,write_buffer_from_file_descriptor_write_bytes=3246i,io_buffer_allocs=296i,created_write_buffer_ordinary=12i,disk_read_elapsed_microseconds=59347044i,network_send_elapsed_microseconds=1538i,context_lock=1040i,insert_query=1i,system_time_microseconds=14582i,read_buffer_from_file_descriptor_read=3i 1569421000000000000 | ||||||
|  | clickhouse_asynchronous_metrics,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 jemalloc.metadata_thp=0i,replicas_max_relative_delay=0i,jemalloc.mapped=1803177984i,jemalloc.allocated=1724839256i,jemalloc.background_thread.run_interval=0i,jemalloc.background_thread.num_threads=0i,uncompressed_cache_cells=0i,replicas_max_absolute_delay=0i,mark_cache_bytes=0i,compiled_expression_cache_count=0i,replicas_sum_queue_size=0i,number_of_tables=35i,replicas_max_merges_in_queue=0i,replicas_max_inserts_in_queue=0i,replicas_sum_merges_in_queue=0i,replicas_max_queue_size=0i,mark_cache_files=0i,jemalloc.background_thread.num_runs=0i,jemalloc.active=1726210048i,uptime=158i,jemalloc.retained=380481536i,replicas_sum_inserts_in_queue=0i,uncompressed_cache_bytes=0i,number_of_databases=2i,jemalloc.metadata=9207704i,max_part_count_for_partition=1i,jemalloc.resident=1742442496i 1569421000000000000 | ||||||
|  | clickhouse_metrics,cluster=test_cluster_two_shards_localhost,host=kshvakov,hostname=localhost,shard_num=1 replicated_send=0i,write=0i,ephemeral_node=0i,zoo_keeper_request=0i,distributed_files_to_insert=0i,replicated_fetch=0i,background_schedule_pool_task=0i,interserver_connection=0i,leader_replica=0i,delayed_inserts=0i,global_thread_active=41i,merge=0i,readonly_replica=0i,memory_tracking_in_background_schedule_pool=0i,memory_tracking_for_merges=0i,zoo_keeper_session=0i,context_lock_wait=0i,storage_buffer_bytes=0i,background_pool_task=0i,send_external_tables=0i,zoo_keeper_watch=0i,part_mutation=0i,disk_space_reserved_for_merge=0i,distributed_send=0i,version_integer=19014003i,local_thread=0i,replicated_checks=0i,memory_tracking=0i,memory_tracking_in_background_processing_pool=0i,leader_election=0i,revision=54425i,open_file_for_read=0i,open_file_for_write=0i,storage_buffer_rows=0i,rw_lock_waiting_readers=0i,rw_lock_waiting_writers=0i,rw_lock_active_writers=0i,local_thread_active=0i,query_preempted=0i,tcp_connection=1i,http_connection=1i,read=2i,query_thread=0i,dict_cache_requests=0i,rw_lock_active_readers=1i,global_thread=43i,query=1i 1569421000000000000 | ||||||
|  | clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=system,host=kshvakov,hostname=localhost,shard_num=1,table=trace_log bytes=754i,parts=1i,rows=1i 1569421000000000000 | ||||||
|  | clickhouse_tables,cluster=test_cluster_two_shards_localhost,database=default,host=kshvakov,hostname=localhost,shard_num=1,table=example bytes=326i,parts=2i,rows=2i 1569421000000000000 | ||||||
|  | ``` | ||||||
|  | @ -0,0 +1,390 @@ | ||||||
|  | package clickhouse | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"io/ioutil" | ||||||
|  | 	"net" | ||||||
|  | 	"net/http" | ||||||
|  | 	"net/url" | ||||||
|  | 	"strconv" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf" | ||||||
|  | 	"github.com/influxdata/telegraf/internal" | ||||||
|  | 	"github.com/influxdata/telegraf/internal/tls" | ||||||
|  | 	"github.com/influxdata/telegraf/plugins/inputs" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var defaultTimeout = 5 * time.Second | ||||||
|  | 
 | ||||||
|  | var sampleConfig = ` | ||||||
|  |   ## Username for authorization on ClickHouse server | ||||||
|  |   ## example: user = "default"" | ||||||
|  |   user = "default" | ||||||
|  |      | ||||||
|  |   ## Password for authorization on ClickHouse server | ||||||
|  |   ## example: password = "super_secret" | ||||||
|  | 
 | ||||||
|  |   ## HTTP(s) timeout while getting metrics values  | ||||||
|  |   ## The timeout includes connection time, any redirects, and reading the response body. | ||||||
|  |   ##   example: timeout = 1s | ||||||
|  |   # timeout = 5s  | ||||||
|  | 
 | ||||||
|  |   ## List of servers for metrics scraping | ||||||
|  |   ## metrics scrape via HTTP(s) clickhouse interface | ||||||
|  |   ## https://clickhouse.tech/docs/en/interfaces/http/
 | ||||||
|  |   ##    example: servers = ["http://127.0.0.1:8123","https://custom-server.mdb.yandexcloud.net"] | ||||||
|  |   servers         = ["http://127.0.0.1:8123"] | ||||||
|  | 
 | ||||||
|  |   ## If "auto_discovery"" is "true" plugin tries to connect to all servers available in the cluster | ||||||
|  |   ## with using same "user:password" described in "user" and "password" parameters | ||||||
|  |   ## and get this server hostname list from "system.clusters" table | ||||||
|  |   ## see | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/system_tables/#system-clusters
 | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/server_settings/settings/#server_settings_remote_servers
 | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/table_engines/distributed/
 | ||||||
|  |   ## - https://clickhouse.tech/docs/en/operations/table_engines/replication/#creating-replicated-tables
 | ||||||
|  |   ##    example: auto_discovery = false | ||||||
|  |   # auto_discovery = true | ||||||
|  | 
 | ||||||
|  |   ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" | ||||||
|  |   ## when this filter present then "WHERE cluster IN (...)" filter will apply | ||||||
|  |   ## please use only full cluster names here, regexp and glob filters is not allowed | ||||||
|  |   ## for "/etc/clickhouse-server/config.d/remote.xml" | ||||||
|  |   ## <yandex> | ||||||
|  |   ##  <remote_servers> | ||||||
|  |   ##    <my-own-cluster> | ||||||
|  |   ##        <shard> | ||||||
|  |   ##          <replica><host>clickhouse-ru-1.local</host><port>9000</port></replica> | ||||||
|  |   ##          <replica><host>clickhouse-ru-2.local</host><port>9000</port></replica> | ||||||
|  |   ##        </shard> | ||||||
|  |   ##        <shard> | ||||||
|  |   ##          <replica><host>clickhouse-eu-1.local</host><port>9000</port></replica> | ||||||
|  |   ##          <replica><host>clickhouse-eu-2.local</host><port>9000</port></replica> | ||||||
|  |   ##        </shard> | ||||||
|  |   ##    </my-onw-cluster> | ||||||
|  |   ##  </remote_servers> | ||||||
|  |   ## | ||||||
|  |   ## </yandex> | ||||||
|  |   ##  | ||||||
|  |   ## example: cluster_include = ["my-own-cluster"] | ||||||
|  |   # cluster_include = [] | ||||||
|  | 
 | ||||||
|  |   ## Filter cluster names in "system.clusters" when "auto_discovery" is "true" | ||||||
|  |   ## when this filter present then "WHERE cluster NOT IN (...)" filter will apply | ||||||
|  |   ##    example: cluster_exclude = ["my-internal-not-discovered-cluster"] | ||||||
|  |   # cluster_exclude = [] | ||||||
|  | 
 | ||||||
|  |   ## Optional TLS Config | ||||||
|  |   # tls_ca = "/etc/telegraf/ca.pem" | ||||||
|  |   # tls_cert = "/etc/telegraf/cert.pem" | ||||||
|  |   # tls_key = "/etc/telegraf/key.pem" | ||||||
|  |   ## Use TLS but skip chain & host verification | ||||||
|  |   # insecure_skip_verify = false | ||||||
|  | ` | ||||||
|  | 
 | ||||||
|  | type connect struct { | ||||||
|  | 	Cluster  string `json:"cluster"` | ||||||
|  | 	ShardNum int    `json:"shard_num"` | ||||||
|  | 	Hostname string `json:"host_name"` | ||||||
|  | 	url      *url.URL | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func init() { | ||||||
|  | 	inputs.Add("clickhouse", func() telegraf.Input { | ||||||
|  | 		return &ClickHouse{ | ||||||
|  | 			AutoDiscovery: true, | ||||||
|  | 			ClientConfig: tls.ClientConfig{ | ||||||
|  | 				InsecureSkipVerify: false, | ||||||
|  | 			}, | ||||||
|  | 			Timeout: internal.Duration{Duration: defaultTimeout}, | ||||||
|  | 		} | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ClickHouse Telegraf Input Plugin
 | ||||||
|  | type ClickHouse struct { | ||||||
|  | 	User           string            `toml:"user"` | ||||||
|  | 	Password       string            `toml:"password"` | ||||||
|  | 	Servers        []string          `toml:"servers"` | ||||||
|  | 	AutoDiscovery  bool              `toml:"auto_discovery"` | ||||||
|  | 	ClusterInclude []string          `toml:"cluster_include"` | ||||||
|  | 	ClusterExclude []string          `toml:"cluster_exclude"` | ||||||
|  | 	Timeout        internal.Duration `toml:"timeout"` | ||||||
|  | 	client         http.Client | ||||||
|  | 	tls.ClientConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // SampleConfig returns the sample config
 | ||||||
|  | func (*ClickHouse) SampleConfig() string { | ||||||
|  | 	return sampleConfig | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Description return plugin description
 | ||||||
|  | func (*ClickHouse) Description() string { | ||||||
|  | 	return "Read metrics from one or many ClickHouse servers" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Start ClickHouse input service
 | ||||||
|  | func (ch *ClickHouse) Start(telegraf.Accumulator) error { | ||||||
|  | 	timeout := defaultTimeout | ||||||
|  | 	if ch.Timeout.Duration != 0 { | ||||||
|  | 		timeout = ch.Timeout.Duration | ||||||
|  | 	} | ||||||
|  | 	tlsCfg, err := ch.ClientConfig.TLSConfig() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ch.client = http.Client{ | ||||||
|  | 		Timeout: timeout, | ||||||
|  | 		Transport: &http.Transport{ | ||||||
|  | 			TLSClientConfig: tlsCfg, | ||||||
|  | 			Proxy:           http.ProxyFromEnvironment, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Gather collect data from ClickHouse server
 | ||||||
|  | func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { | ||||||
|  | 	var ( | ||||||
|  | 		connects []connect | ||||||
|  | 		exists   = func(host string) bool { | ||||||
|  | 			for _, c := range connects { | ||||||
|  | 				if c.Hostname == host { | ||||||
|  | 					return true | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	for _, server := range ch.Servers { | ||||||
|  | 		u, err := url.Parse(server) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		switch { | ||||||
|  | 		case ch.AutoDiscovery: | ||||||
|  | 			var conns []connect | ||||||
|  | 			if err := ch.execQuery(u, "SELECT cluster, shard_num, host_name FROM system.clusters "+ch.clusterIncludeExcludeFilter(), &conns); err != nil { | ||||||
|  | 				acc.AddError(err) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			for _, c := range conns { | ||||||
|  | 				if !exists(c.Hostname) { | ||||||
|  | 					c.url = &url.URL{ | ||||||
|  | 						Scheme: u.Scheme, | ||||||
|  | 						Host:   net.JoinHostPort(c.Hostname, u.Port()), | ||||||
|  | 					} | ||||||
|  | 					connects = append(connects, c) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		default: | ||||||
|  | 			connects = append(connects, connect{ | ||||||
|  | 				url: u, | ||||||
|  | 			}) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, conn := range connects { | ||||||
|  | 		if err := ch.tables(acc, &conn); err != nil { | ||||||
|  | 			acc.AddError(err) | ||||||
|  | 		} | ||||||
|  | 		for metric := range commonMetrics { | ||||||
|  | 			if err := ch.commonMetrics(acc, &conn, metric); err != nil { | ||||||
|  | 				acc.AddError(err) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ch *ClickHouse) clusterIncludeExcludeFilter() string { | ||||||
|  | 	if len(ch.ClusterInclude) == 0 && len(ch.ClusterExclude) == 0 { | ||||||
|  | 		return "" | ||||||
|  | 	} | ||||||
|  | 	var ( | ||||||
|  | 		escape = func(in string) string { | ||||||
|  | 			return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(in) + "'" | ||||||
|  | 		} | ||||||
|  | 		makeFilter = func(expr string, args []string) string { | ||||||
|  | 			in := make([]string, 0, len(args)) | ||||||
|  | 			for _, v := range args { | ||||||
|  | 				in = append(in, escape(v)) | ||||||
|  | 			} | ||||||
|  | 			return fmt.Sprintf("cluster %s (%s)", expr, strings.Join(in, ", ")) | ||||||
|  | 		} | ||||||
|  | 		includeFilter, excludeFilter string | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	if len(ch.ClusterInclude) != 0 { | ||||||
|  | 		includeFilter = makeFilter("IN", ch.ClusterInclude) | ||||||
|  | 	} | ||||||
|  | 	if len(ch.ClusterExclude) != 0 { | ||||||
|  | 		excludeFilter = makeFilter("NOT IN", ch.ClusterExclude) | ||||||
|  | 	} | ||||||
|  | 	if includeFilter != "" && excludeFilter != "" { | ||||||
|  | 		return "WHERE " + includeFilter + " OR " + excludeFilter | ||||||
|  | 	} | ||||||
|  | 	if includeFilter == "" && excludeFilter != "" { | ||||||
|  | 		return "WHERE " + excludeFilter | ||||||
|  | 	} | ||||||
|  | 	if includeFilter != "" && excludeFilter == "" { | ||||||
|  | 		return "WHERE " + includeFilter | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ch *ClickHouse) commonMetrics(acc telegraf.Accumulator, conn *connect, metric string) error { | ||||||
|  | 	var result []struct { | ||||||
|  | 		Metric string   `json:"metric"` | ||||||
|  | 		Value  chUInt64 `json:"value"` | ||||||
|  | 	} | ||||||
|  | 	if err := ch.execQuery(conn.url, commonMetrics[metric], &result); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	tags := map[string]string{ | ||||||
|  | 		"source": conn.Hostname, | ||||||
|  | 	} | ||||||
|  | 	if len(conn.Cluster) != 0 { | ||||||
|  | 		tags["cluster"] = conn.Cluster | ||||||
|  | 	} | ||||||
|  | 	if conn.ShardNum != 0 { | ||||||
|  | 		tags["shard_num"] = strconv.Itoa(conn.ShardNum) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fields := make(map[string]interface{}) | ||||||
|  | 	for _, r := range result { | ||||||
|  | 		fields[internal.SnakeCase(r.Metric)] = uint64(r.Value) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	acc.AddFields("clickhouse_"+metric, fields, tags) | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ch *ClickHouse) tables(acc telegraf.Accumulator, conn *connect) error { | ||||||
|  | 	var parts []struct { | ||||||
|  | 		Database string   `json:"database"` | ||||||
|  | 		Table    string   `json:"table"` | ||||||
|  | 		Bytes    chUInt64 `json:"bytes"` | ||||||
|  | 		Parts    chUInt64 `json:"parts"` | ||||||
|  | 		Rows     chUInt64 `json:"rows"` | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := ch.execQuery(conn.url, systemParts, &parts); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	tags := map[string]string{ | ||||||
|  | 		"source": conn.Hostname, | ||||||
|  | 	} | ||||||
|  | 	if len(conn.Cluster) != 0 { | ||||||
|  | 		tags["cluster"] = conn.Cluster | ||||||
|  | 	} | ||||||
|  | 	if conn.ShardNum != 0 { | ||||||
|  | 		tags["shard_num"] = strconv.Itoa(conn.ShardNum) | ||||||
|  | 	} | ||||||
|  | 	for _, part := range parts { | ||||||
|  | 		tags["table"] = part.Table | ||||||
|  | 		tags["database"] = part.Database | ||||||
|  | 		acc.AddFields("clickhouse_tables", | ||||||
|  | 			map[string]interface{}{ | ||||||
|  | 				"bytes": uint64(part.Bytes), | ||||||
|  | 				"parts": uint64(part.Parts), | ||||||
|  | 				"rows":  uint64(part.Rows), | ||||||
|  | 			}, | ||||||
|  | 			tags, | ||||||
|  | 		) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type clickhouseError struct { | ||||||
|  | 	StatusCode int | ||||||
|  | 	body       []byte | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (e *clickhouseError) Error() string { | ||||||
|  | 	return fmt.Sprintf("received error code %d: %s", e.StatusCode, e.body) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (ch *ClickHouse) execQuery(url *url.URL, query string, i interface{}) error { | ||||||
|  | 	q := url.Query() | ||||||
|  | 	q.Set("query", query+" FORMAT JSON") | ||||||
|  | 	url.RawQuery = q.Encode() | ||||||
|  | 	req, _ := http.NewRequest("GET", url.String(), nil) | ||||||
|  | 	if ch.User != "" { | ||||||
|  | 		req.Header.Add("X-ClickHouse-User", ch.User) | ||||||
|  | 	} | ||||||
|  | 	if ch.Password != "" { | ||||||
|  | 		req.Header.Add("X-ClickHouse-Key", ch.Password) | ||||||
|  | 	} | ||||||
|  | 	resp, err := ch.client.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer resp.Body.Close() | ||||||
|  | 	if resp.StatusCode >= 300 { | ||||||
|  | 		body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 200)) | ||||||
|  | 		return &clickhouseError{ | ||||||
|  | 			StatusCode: resp.StatusCode, | ||||||
|  | 			body:       body, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	var response struct { | ||||||
|  | 		Data json.RawMessage | ||||||
|  | 	} | ||||||
|  | 	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return json.Unmarshal(response.Data, i) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // see https://clickhouse.yandex/docs/en/operations/settings/settings/#session_settings-output_format_json_quote_64bit_integers
 | ||||||
|  | type chUInt64 uint64 | ||||||
|  | 
 | ||||||
|  | func (i *chUInt64) UnmarshalJSON(b []byte) error { | ||||||
|  | 	b = bytes.TrimPrefix(b, []byte(`"`)) | ||||||
|  | 	b = bytes.TrimSuffix(b, []byte(`"`)) | ||||||
|  | 	v, err := strconv.ParseUint(string(b), 10, 64) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	*i = chUInt64(v) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	systemEventsSQL       = "SELECT event AS metric, CAST(value AS UInt64) AS value FROM system.events" | ||||||
|  | 	systemMetricsSQL      = "SELECT          metric, CAST(value AS UInt64) AS value FROM system.metrics" | ||||||
|  | 	systemAsyncMetricsSQL = "SELECT          metric, CAST(value AS UInt64) AS value FROM system.asynchronous_metrics" | ||||||
|  | 	systemParts           = ` | ||||||
|  | 		SELECT | ||||||
|  | 			database, | ||||||
|  | 			table, | ||||||
|  | 			SUM(bytes) AS bytes, | ||||||
|  | 			COUNT(*)   AS parts, | ||||||
|  | 			SUM(rows)  AS rows | ||||||
|  | 		FROM system.parts | ||||||
|  | 		WHERE active = 1 | ||||||
|  | 		GROUP BY | ||||||
|  | 			database, table | ||||||
|  | 		ORDER BY | ||||||
|  | 			database, table | ||||||
|  | 	` | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | var commonMetrics = map[string]string{ | ||||||
|  | 	"events":               systemEventsSQL, | ||||||
|  | 	"metrics":              systemMetricsSQL, | ||||||
|  | 	"asynchronous_metrics": systemAsyncMetricsSQL, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | var _ telegraf.ServiceInput = &ClickHouse{} | ||||||
|  | @ -0,0 +1,6 @@ | ||||||
|  | // +build !go1.12
 | ||||||
|  | 
 | ||||||
|  | package clickhouse | ||||||
|  | 
 | ||||||
|  | // Stop ClickHouse input service
 | ||||||
|  | func (ch *ClickHouse) Stop() {} | ||||||
|  | @ -0,0 +1,8 @@ | ||||||
|  | // +build go1.12
 | ||||||
|  | 
 | ||||||
|  | package clickhouse | ||||||
|  | 
 | ||||||
|  | // Stop ClickHouse input service
 | ||||||
|  | func (ch *ClickHouse) Stop() { | ||||||
|  | 	ch.client.CloseIdleConnections() | ||||||
|  | } | ||||||
|  | @ -0,0 +1,161 @@ | ||||||
|  | package clickhouse | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"net/http" | ||||||
|  | 	"net/http/httptest" | ||||||
|  | 	"strings" | ||||||
|  | 	"testing" | ||||||
|  | 
 | ||||||
|  | 	"github.com/influxdata/telegraf/testutil" | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | func TestClusterIncludeExcludeFilter(t *testing.T) { | ||||||
|  | 	ch := ClickHouse{} | ||||||
|  | 	if assert.Equal(t, "", ch.clusterIncludeExcludeFilter()) { | ||||||
|  | 		ch.ClusterExclude = []string{"test_cluster"} | ||||||
|  | 		assert.Equal(t, "WHERE cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) | ||||||
|  | 
 | ||||||
|  | 		ch.ClusterExclude = []string{"test_cluster"} | ||||||
|  | 		ch.ClusterInclude = []string{"cluster"} | ||||||
|  | 		assert.Equal(t, "WHERE cluster IN ('cluster') OR cluster NOT IN ('test_cluster')", ch.clusterIncludeExcludeFilter()) | ||||||
|  | 
 | ||||||
|  | 		ch.ClusterExclude = []string{} | ||||||
|  | 		ch.ClusterInclude = []string{"cluster1", "cluster2"} | ||||||
|  | 		assert.Equal(t, "WHERE cluster IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) | ||||||
|  | 
 | ||||||
|  | 		ch.ClusterExclude = []string{"cluster1", "cluster2"} | ||||||
|  | 		ch.ClusterInclude = []string{} | ||||||
|  | 		assert.Equal(t, "WHERE cluster NOT IN ('cluster1', 'cluster2')", ch.clusterIncludeExcludeFilter()) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestChInt64(t *testing.T) { | ||||||
|  | 	assets := map[string]uint64{ | ||||||
|  | 		`"1"`:                  1, | ||||||
|  | 		"1":                    1, | ||||||
|  | 		"42":                   42, | ||||||
|  | 		`"42"`:                 42, | ||||||
|  | 		"18446743937525109187": 18446743937525109187, | ||||||
|  | 	} | ||||||
|  | 	for src, expected := range assets { | ||||||
|  | 		var v chUInt64 | ||||||
|  | 		if err := v.UnmarshalJSON([]byte(src)); assert.NoError(t, err) { | ||||||
|  | 			assert.Equal(t, expected, uint64(v)) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func TestGather(t *testing.T) { | ||||||
|  | 	var ( | ||||||
|  | 		ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 			type result struct { | ||||||
|  | 				Data interface{} `json:"data"` | ||||||
|  | 			} | ||||||
|  | 			enc := json.NewEncoder(w) | ||||||
|  | 			switch query := r.URL.Query().Get("query"); { | ||||||
|  | 			case strings.Contains(query, "system.parts"): | ||||||
|  | 				enc.Encode(result{ | ||||||
|  | 					Data: []struct { | ||||||
|  | 						Database string   `json:"database"` | ||||||
|  | 						Table    string   `json:"table"` | ||||||
|  | 						Bytes    chUInt64 `json:"bytes"` | ||||||
|  | 						Parts    chUInt64 `json:"parts"` | ||||||
|  | 						Rows     chUInt64 `json:"rows"` | ||||||
|  | 					}{ | ||||||
|  | 						{ | ||||||
|  | 							Database: "test_database", | ||||||
|  | 							Table:    "test_table", | ||||||
|  | 							Bytes:    1, | ||||||
|  | 							Parts:    10, | ||||||
|  | 							Rows:     100, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}) | ||||||
|  | 			case strings.Contains(query, "system.events"): | ||||||
|  | 				enc.Encode(result{ | ||||||
|  | 					Data: []struct { | ||||||
|  | 						Metric string   `json:"metric"` | ||||||
|  | 						Value  chUInt64 `json:"value"` | ||||||
|  | 					}{ | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemEvent", | ||||||
|  | 							Value:  1000, | ||||||
|  | 						}, | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemEvent2", | ||||||
|  | 							Value:  2000, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}) | ||||||
|  | 			case strings.Contains(query, "system.metrics"): | ||||||
|  | 				enc.Encode(result{ | ||||||
|  | 					Data: []struct { | ||||||
|  | 						Metric string   `json:"metric"` | ||||||
|  | 						Value  chUInt64 `json:"value"` | ||||||
|  | 					}{ | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemMetric", | ||||||
|  | 							Value:  1000, | ||||||
|  | 						}, | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemMetric2", | ||||||
|  | 							Value:  2000, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}) | ||||||
|  | 			case strings.Contains(query, "system.asynchronous_metrics"): | ||||||
|  | 				enc.Encode(result{ | ||||||
|  | 					Data: []struct { | ||||||
|  | 						Metric string   `json:"metric"` | ||||||
|  | 						Value  chUInt64 `json:"value"` | ||||||
|  | 					}{ | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemAsynchronousMetric", | ||||||
|  | 							Value:  1000, | ||||||
|  | 						}, | ||||||
|  | 						{ | ||||||
|  | 							Metric: "TestSystemAsynchronousMetric2", | ||||||
|  | 							Value:  2000, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}) | ||||||
|  | 			} | ||||||
|  | 		})) | ||||||
|  | 		ch = &ClickHouse{ | ||||||
|  | 			Servers: []string{ | ||||||
|  | 				ts.URL, | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  | 		acc = &testutil.Accumulator{} | ||||||
|  | 	) | ||||||
|  | 	defer ts.Close() | ||||||
|  | 	ch.Gather(acc) | ||||||
|  | 
 | ||||||
|  | 	acc.AssertContainsFields(t, "clickhouse_tables", | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"bytes": uint64(1), | ||||||
|  | 			"parts": uint64(10), | ||||||
|  | 			"rows":  uint64(100), | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 	acc.AssertContainsFields(t, "clickhouse_events", | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"test_system_event":  uint64(1000), | ||||||
|  | 			"test_system_event2": uint64(2000), | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 	acc.AssertContainsFields(t, "clickhouse_metrics", | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"test_system_metric":  uint64(1000), | ||||||
|  | 			"test_system_metric2": uint64(2000), | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | 	acc.AssertContainsFields(t, "clickhouse_asynchronous_metrics", | ||||||
|  | 		map[string]interface{}{ | ||||||
|  | 			"test_system_asynchronous_metric":  uint64(1000), | ||||||
|  | 			"test_system_asynchronous_metric2": uint64(2000), | ||||||
|  | 		}, | ||||||
|  | 	) | ||||||
|  | } | ||||||
|  | @ -0,0 +1,13 @@ | ||||||
|  | -----BEGIN DH PARAMETERS----- | ||||||
|  | MIICCAKCAgEAoo1x7wI5K57P1/AkHUmVWzKNfy46b/ni/QtClomTB78Ks1FP8dzs | ||||||
|  | CQBW/pfL8yidxTialNhMRCZO1J+uPjTvd8dG8SFZzVylkF41LBNrUD+MLyh/b6Nr | ||||||
|  | 8uWf3tqYCtsiqsQsnq/oU7C29wn6UjhPPVbRRDPGyJUFOgp0ebPR0L2gOc5HhXSF | ||||||
|  | Tt0fuWnvgZJBKGvyodby3p2CSheu8K6ZteVc8ZgHuanhCQA30nVN+yNQzyozlB2H | ||||||
|  | B9jxTDPJy8+/4Mui3iiNyXg6FaiI9lWdH7xgKoZlHi8BWlLz5Se9JVNYg0dPrMTz | ||||||
|  | K0itQyyTKUlK73x+1uPm6q1AJwz08EZiCXNbk58/Sf+pdwDmAO2QSRrERC73vnvc | ||||||
|  | B1+4+Kf7RS7oYpAHknKm/MFnkCJLVIq1b6kikYcIgVCYe+Z1UytSmG1QfwdgL8QQ | ||||||
|  | TVYVHBg4w07+s3/IJ1ekvNhdxpkmmevYt7GjohWu8vKkip4se+reNdo+sqLsgFKf | ||||||
|  | 1IuDMD36zn9FVukvs7e3BwZCTkdosGHvHGjA7zm2DwPPO16hCvJ4mE6ULLpp2NEw | ||||||
|  | EBYWm3Tv6M/xtrF5Afyh0gAh7eL767/qsarbx6jlqs+dnh3LptqsE3WerWK54+0B | ||||||
|  | 3Hr5CVfgYbeXuW2HeFb+fS6CNUWmiAsq1XRiz5p16hpeMGYN/qyF1IsCAQI= | ||||||
|  | -----END DH PARAMETERS----- | ||||||
|  | @ -0,0 +1,16 @@ | ||||||
|  | version: '3' | ||||||
|  | 
 | ||||||
|  | services: | ||||||
|  |   clickhouse: | ||||||
|  |     image: yandex/clickhouse-server:latest | ||||||
|  |     volumes: | ||||||
|  |       - ./dhparam.pem:/etc/clickhouse-server/dhparam.pem | ||||||
|  |       - ./tls_settings.xml:/etc/clickhouse-server/config.d/00-tls_settings.xml | ||||||
|  |       - ../../../../testutil/pki/serverkey.pem:/etc/clickhouse-server/server.key | ||||||
|  |       - ../../../../testutil/pki/servercert.pem:/etc/clickhouse-server/server.crt | ||||||
|  |     restart: always | ||||||
|  |     ports: | ||||||
|  |       - 8123:8123 | ||||||
|  |       - 8443:8443 | ||||||
|  |       - 9000:9000 | ||||||
|  |       - 9009:9009 | ||||||
|  | @ -0,0 +1,12 @@ | ||||||
|  | ### ClickHouse input plugin | ||||||
|  | 
 | ||||||
|  | [[inputs.clickhouse]] | ||||||
|  |   timeout         = 2 | ||||||
|  |   user            = "default" | ||||||
|  |   servers         = ["http://127.0.0.1:8123"] | ||||||
|  |   auto_discovery  = true | ||||||
|  |   cluster_include = [] | ||||||
|  |   cluster_exclude = ["test_shard_localhost"] | ||||||
|  | 
 | ||||||
|  | [[outputs.file]] | ||||||
|  |   files = ["stdout"] | ||||||
|  | @ -0,0 +1,16 @@ | ||||||
|  | ### ClickHouse input plugin | ||||||
|  | 
 | ||||||
|  | [[inputs.clickhouse]] | ||||||
|  |   timeout         = 2 | ||||||
|  |   user            = "default" | ||||||
|  |   servers         = ["https://127.0.0.1:8443"] | ||||||
|  |   auto_discovery  = true | ||||||
|  |   cluster_include = [] | ||||||
|  |   cluster_exclude = ["test_shard_localhost"] | ||||||
|  |   insecure_skip_verify = false | ||||||
|  |   tls_cert = "./testutil/pki/clientcert.pem" | ||||||
|  |   tls_key = "./testutil/pki/clientkey.pem" | ||||||
|  |   tls_ca = "./testutil/pki/cacert.pem" | ||||||
|  | 
 | ||||||
|  | [[outputs.file]] | ||||||
|  |   files = ["stdout"] | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | <yandex> | ||||||
|  |   <https_port>8443</https_port> | ||||||
|  |   <tcp_port_secure>9440</tcp_port_secure> | ||||||
|  | </yandex> | ||||||
		Loading…
	
		Reference in New Issue