Add TLS support to zookeeper input (#3949)

This commit is contained in:
Daniel Nelson 2018-03-29 12:42:25 -07:00 committed by GitHub
parent 006ccbf05b
commit da4fcccd8f
3 changed files with 134 additions and 56 deletions

View File

@ -88,6 +88,7 @@
- [#3631](https://github.com/influxdata/telegraf/issues/3631): InfluxDB Line Protocol parser now accepts DOS line endings. - [#3631](https://github.com/influxdata/telegraf/issues/3631): InfluxDB Line Protocol parser now accepts DOS line endings.
- [#2496](https://github.com/influxdata/telegraf/issues/2496): An option has been added to skip database creation in the InfluxDB output. - [#2496](https://github.com/influxdata/telegraf/issues/2496): An option has been added to skip database creation in the InfluxDB output.
- [#3366](https://github.com/influxdata/telegraf/issues/3366): Add support for connecting to InfluxDB over a unix domain socket. - [#3366](https://github.com/influxdata/telegraf/issues/3366): Add support for connecting to InfluxDB over a unix domain socket.
- [#3946](https://github.com/influxdata/telegraf/pull/3946): Add optional unsigned integer support to the influx data format.
### Bugfixes ### Bugfixes

View File

@ -1,35 +1,11 @@
## Telegraf Plugin: Zookeeper ## Zookeeper Input Plugin
#### Description
The zookeeper plugin collects variables outputted from the 'mntr' command The zookeeper plugin collects variables outputted from the 'mntr' command
[Zookeeper Admin](https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html). [Zookeeper Admin](https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html).
``` ### Configuration
echo mntr | nc localhost 2181
zk_version 3.4.0 ```toml
zk_avg_latency 0
zk_max_latency 0
zk_min_latency 0
zk_packets_received 70
zk_packets_sent 69
zk_outstanding_requests 0
zk_server_state leader
zk_znode_count 4
zk_watch_count 0
zk_ephemerals_count 0
zk_approximate_data_size 27
zk_followers 4 - only exposed by the Leader
zk_synced_followers 4 - only exposed by the Leader
zk_pending_syncs 0 - only exposed by the Leader
zk_open_file_descriptor_count 23 - only available on Unix platforms
zk_max_file_descriptor_count 1024 - only available on Unix platforms
```
## Configuration
```
# Reads 'mntr' stats from one or many zookeeper servers # Reads 'mntr' stats from one or many zookeeper servers
[[inputs.zookeeper]] [[inputs.zookeeper]]
## An array of address to gather stats about. Specify an ip or hostname ## An array of address to gather stats about. Specify an ip or hostname
@ -38,28 +14,72 @@ echo mntr | nc localhost 2181
## If no servers are specified, then localhost is used as the host. ## If no servers are specified, then localhost is used as the host.
## If no port is specified, 2181 is used ## If no port is specified, 2181 is used
servers = [":2181"] servers = [":2181"]
## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
## Optional SSL Config
# enable_ssl = true
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## If false, skip chain & host verification
# insecure_skip_verify = true
``` ```
## InfluxDB Measurement: ### Metrics:
Exact field names are based on Zookeeper response and may vary between
configuration, platform, and version.
- zookeeper
- tags:
- server
- port
- state
- fields:
- approximate_data_size (integer)
- avg_latency (integer)
- ephemerals_count (integer)
- max_file_descriptor_count (integer)
- max_latency (integer)
- min_latency (integer)
- num_alive_connections (integer)
- open_file_descriptor_count (integer)
- outstanding_requests (integer)
- packets_received (integer)
- packets_sent (integer)
- version (string)
- watch_count (integer)
- znode_count (integer)
- followers (integer, leader only)
- synced_followers (integer, leader only)
- pending_syncs (integer, leader only)
### Debugging:
If you have any issues please check the direct Zookeeper output using netcat:
```sh
$ echo mntr | nc localhost 2181
zk_version 3.4.9-3--1, built on Thu, 01 Jun 2017 16:26:44 -0700
zk_avg_latency 0
zk_max_latency 0
zk_min_latency 0
zk_packets_received 8
zk_packets_sent 7
zk_num_alive_connections 1
zk_outstanding_requests 0
zk_server_state standalone
zk_znode_count 129
zk_watch_count 0
zk_ephemerals_count 0
zk_approximate_data_size 10044
zk_open_file_descriptor_count 44
zk_max_file_descriptor_count 4096
```
### Example Output
``` ```
M zookeeper zookeeper,server=localhost,port=2181,state=standalone ephemerals_count=0i,approximate_data_size=10044i,open_file_descriptor_count=44i,max_latency=0i,packets_received=7i,outstanding_requests=0i,znode_count=129i,max_file_descriptor_count=4096i,version="3.4.9-3--1",avg_latency=0i,packets_sent=6i,num_alive_connections=1i,watch_count=0i,min_latency=0i 1522351112000000000
T host
T port
T state
F approximate_data_size integer
F avg_latency integer
F ephemerals_count integer
F max_file_descriptor_count integer
F max_latency integer
F min_latency integer
F num_alive_connections integer
F open_file_descriptor_count integer
F outstanding_requests integer
F packets_received integer
F packets_sent integer
F version string
F watch_count integer
F znode_count integer
``` ```

View File

@ -2,21 +2,33 @@ package zookeeper
import ( import (
"bufio" "bufio"
"context"
"crypto/tls"
"fmt" "fmt"
"net" "net"
"os"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
// Zookeeper is a zookeeper plugin // Zookeeper is a zookeeper plugin
type Zookeeper struct { type Zookeeper struct {
Servers []string Servers []string
Timeout internal.Duration
EnableSSL bool `toml:"enable_ssl"`
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
InsecureSkipVerify bool `toml:"insecure_skip_verify"`
initialized bool
tlsConfig *tls.Config
} }
var sampleConfig = ` var sampleConfig = `
@ -26,9 +38,20 @@ var sampleConfig = `
## If no servers are specified, then localhost is used as the host. ## If no servers are specified, then localhost is used as the host.
## If no port is specified, 2181 is used ## If no port is specified, 2181 is used
servers = [":2181"] servers = [":2181"]
## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
## Optional SSL Config
# enable_ssl = true
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## If false, skip chain & host verification
# insecure_skip_verify = true
` `
var defaultTimeout = time.Second * time.Duration(5) var defaultTimeout = 5 * time.Second
// SampleConfig returns sample configuration message // SampleConfig returns sample configuration message
func (z *Zookeeper) SampleConfig() string { func (z *Zookeeper) SampleConfig() string {
@ -40,34 +63,68 @@ func (z *Zookeeper) Description() string {
return `Reads 'mntr' stats from one or many zookeeper servers` return `Reads 'mntr' stats from one or many zookeeper servers`
} }
func (z *Zookeeper) dial(ctx context.Context, addr string) (net.Conn, error) {
var dialer net.Dialer
if z.EnableSSL {
deadline, ok := ctx.Deadline()
if ok {
dialer.Deadline = deadline
}
return tls.DialWithDialer(&dialer, "tcp", addr, z.tlsConfig)
} else {
return dialer.DialContext(ctx, "tcp", addr)
}
}
// Gather reads stats from all configured servers accumulates stats // Gather reads stats from all configured servers accumulates stats
func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { func (z *Zookeeper) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()
if !z.initialized {
tlsConfig, err := internal.GetTLSConfig(
z.SSLCert, z.SSLKey, z.SSLCA, z.InsecureSkipVerify)
if err != nil {
return err
}
z.tlsConfig = tlsConfig
z.initialized = true
}
if z.Timeout.Duration < 1*time.Second {
z.Timeout.Duration = defaultTimeout
}
ctx, cancel := context.WithTimeout(ctx, z.Timeout.Duration)
defer cancel()
if len(z.Servers) == 0 { if len(z.Servers) == 0 {
z.Servers = []string{":2181"} z.Servers = []string{":2181"}
} }
for _, serverAddress := range z.Servers { for _, serverAddress := range z.Servers {
acc.AddError(z.gatherServer(serverAddress, acc)) acc.AddError(z.gatherServer(ctx, serverAddress, acc))
} }
return nil return nil
} }
func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { func (z *Zookeeper) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error {
var zookeeper_state string var zookeeper_state string
_, _, err := net.SplitHostPort(address) _, _, err := net.SplitHostPort(address)
if err != nil { if err != nil {
address = address + ":2181" address = address + ":2181"
} }
c, err := net.DialTimeout("tcp", address, defaultTimeout) c, err := z.dial(ctx, address)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, err)
return err return err
} }
defer c.Close() defer c.Close()
// Extend connection // Apply deadline to connection
c.SetDeadline(time.Now().Add(defaultTimeout)) deadline, ok := ctx.Deadline()
if ok {
c.SetDeadline(deadline)
}
fmt.Fprintf(c, "%s\n", "mntr") fmt.Fprintf(c, "%s\n", "mntr")
rdr := bufio.NewReader(c) rdr := bufio.NewReader(c)