diff --git a/plugins/all/all.go b/plugins/all/all.go index b510d2521..db4480747 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -23,4 +23,5 @@ import ( _ "github.com/influxdb/telegraf/plugins/redis" _ "github.com/influxdb/telegraf/plugins/rethinkdb" _ "github.com/influxdb/telegraf/plugins/system" + _ "github.com/influxdb/telegraf/plugins/zookeeper" ) diff --git a/plugins/zookeeper/zookeeper.go b/plugins/zookeeper/zookeeper.go new file mode 100644 index 000000000..8619bc976 --- /dev/null +++ b/plugins/zookeeper/zookeeper.go @@ -0,0 +1,107 @@ +package zookeeper + +import ( + "bufio" + "fmt" + "net" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/influxdb/telegraf/plugins" +) + +// Zookeeper is a zookeeper plugin +type Zookeeper struct { + Servers []string +} + +var sampleConfig = ` + # An array of address to gather stats about. Specify an ip or hostname + # with port. ie localhost:2181, 10.0.0.1:2181, etc. + + # If no servers are specified, then localhost is used as the host. + # If no port is specified, 2181 is used + servers = [":2181"] +` + +var defaultTimeout = time.Second * time.Duration(5) + +// SampleConfig returns sample configuration message +func (z *Zookeeper) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Zookeeper plugin +func (z *Zookeeper) Description() string { + return `Reads 'mntr' stats from one or many zookeeper servers` +} + +// Gather reads stats from all configured servers accumulates stats +func (z *Zookeeper) Gather(acc plugins.Accumulator) error { + if len(z.Servers) == 0 { + return nil + } + + for _, serverAddress := range z.Servers { + if err := z.gatherServer(serverAddress, acc); err != nil { + return err + } + } + return nil +} + +func (z *Zookeeper) gatherServer(address string, acc plugins.Accumulator) error { + _, _, err := net.SplitHostPort(address) + if err != nil { + address = address + ":2181" + } + + c, err := net.DialTimeout("tcp", address, defaultTimeout) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return err + } + defer c.Close() + + fmt.Fprintf(c, "%s\n", "mntr") + + rdr := bufio.NewReader(c) + + scanner := bufio.NewScanner(rdr) + + for scanner.Scan() { + line := scanner.Text() + + re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`) + parts := re.FindStringSubmatch(string(line)) + + service := strings.Split(address, ":") + + if len(parts) != 3 || len(service) != 2 { + return fmt.Errorf("unexpected line in mntr response: %q", line) + } + + tags := map[string]string{"server": service[0], "port": service[1]} + + measurement := strings.TrimPrefix(parts[1], "zk_") + sValue := string(parts[2]) + + iVal, err := strconv.ParseInt(sValue, 10, 64) + if err == nil { + acc.Add(measurement, iVal, tags) + } else { + acc.Add(measurement, sValue, tags) + } + } + + return nil +} + +func init() { + plugins.Add("zookeeper", func() plugins.Plugin { + return &Zookeeper{} + }) +} diff --git a/plugins/zookeeper/zookeeper_test.go b/plugins/zookeeper/zookeeper_test.go new file mode 100644 index 000000000..2f8fba114 --- /dev/null +++ b/plugins/zookeeper/zookeeper_test.go @@ -0,0 +1,30 @@ +package zookeeper + +import ( + "testing" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemcachedGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + z := &Zookeeper{ + Servers: []string{testutil.GetLocalHost()}, + } + + var acc testutil.Accumulator + + err := z.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{"zookeeper_avg_latency", "zookeeper_packets_sent", "zookeeper_znode_count", "zookeeper_open_file_descriptor_count", "zookeeper_max_file_descriptor_count"} + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric), metric) + } +}