package zookeeper import ( "bufio" "fmt" "net" "os" "regexp" "strconv" "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/registry/inputs" ) // Zookeeper is a zookeeper plugin type Zookeeper struct { Servers []string } var sampleConfig = ` ## An array of address to gather stats about. Specify an ip or hostname ## with port. ie localhost:2181, 10.0.0.1:2181, etc. ## If no servers are specified, then localhost is used as the host. ## If no port is specified, 2181 is used servers = [":2181"] ` var defaultTimeout = time.Second * time.Duration(5) // SampleConfig returns sample configuration message func (z *Zookeeper) SampleConfig() string { return sampleConfig } // Description returns description of Zookeeper plugin func (z *Zookeeper) Description() string { return `Reads 'mntr' stats from one or many zookeeper servers` } // Gather reads stats from all configured servers accumulates stats func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { if len(z.Servers) == 0 { return nil } for _, serverAddress := range z.Servers { if err := z.gatherServer(serverAddress, acc); err != nil { return err } } return nil } func (z *Zookeeper) gatherServer(address string, acc telegraf.Accumulator) error { var zookeeper_state string _, _, err := net.SplitHostPort(address) if err != nil { address = address + ":2181" } c, err := net.DialTimeout("tcp", address, defaultTimeout) if err != nil { fmt.Fprintln(os.Stderr, err) return err } defer c.Close() // Extend connection c.SetDeadline(time.Now().Add(defaultTimeout)) fmt.Fprintf(c, "%s\n", "mntr") rdr := bufio.NewReader(c) scanner := bufio.NewScanner(rdr) service := strings.Split(address, ":") if len(service) != 2 { return fmt.Errorf("Invalid service address: %s", address) } fields := make(map[string]interface{}) for scanner.Scan() { line := scanner.Text() re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`) parts := re.FindStringSubmatch(string(line)) if len(parts) != 3 { return fmt.Errorf("unexpected line in mntr response: %q", line) } measurement := strings.TrimPrefix(parts[1], "zk_") if measurement == "server_state" { zookeeper_state = parts[2] } else { sValue := string(parts[2]) iVal, err := strconv.ParseInt(sValue, 10, 64) if err == nil { fields[measurement] = iVal } else { fields[measurement] = sValue } } } tags := map[string]string{ "server": service[0], "port": service[1], "state": zookeeper_state, } acc.AddFields("zookeeper", fields, tags) return nil } func init() { inputs.Add("zookeeper", func() telegraf.Input { return &Zookeeper{} }) }