Zookeeper plugin
Created a zookeeper plugin that fetches from the ‘mntr’ command will output measurements that are int and string based
This commit is contained in:
		
							parent
							
								
									181c3cdc28
								
							
						
					
					
						commit
						8fd06b96d7
					
				|  | @ -23,4 +23,5 @@ import ( | |||
| 	_ "github.com/influxdb/telegraf/plugins/redis" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/rethinkdb" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/system" | ||||
| 	_ "github.com/influxdb/telegraf/plugins/zookeeper" | ||||
| ) | ||||
|  |  | |||
|  | @ -0,0 +1,107 @@ | |||
| package zookeeper | ||||
| 
 | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"os" | ||||
| 	"regexp" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/influxdb/telegraf/plugins" | ||||
| ) | ||||
| 
 | ||||
| // Zookeeper is a zookeeper plugin
 | ||||
| type Zookeeper struct { | ||||
| 	Servers []string | ||||
| } | ||||
| 
 | ||||
| var sampleConfig = ` | ||||
| 	# An array of address to gather stats about. Specify an ip or hostname | ||||
| 	# with port. ie localhost:2181, 10.0.0.1:2181, etc. | ||||
| 
 | ||||
| 	# If no servers are specified, then localhost is used as the host. | ||||
| 	# If no port is specified, 2181 is used | ||||
| 	servers = [":2181"] | ||||
| ` | ||||
| 
 | ||||
| var defaultTimeout = time.Second * time.Duration(5) | ||||
| 
 | ||||
| // SampleConfig returns sample configuration message
 | ||||
| func (z *Zookeeper) SampleConfig() string { | ||||
| 	return sampleConfig | ||||
| } | ||||
| 
 | ||||
| // Description returns description of Zookeeper plugin
 | ||||
| func (z *Zookeeper) Description() string { | ||||
| 	return `Reads 'mntr' stats from one or many zookeeper servers` | ||||
| } | ||||
| 
 | ||||
| // Gather reads stats from all configured servers accumulates stats
 | ||||
| func (z *Zookeeper) Gather(acc plugins.Accumulator) error { | ||||
| 	if len(z.Servers) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	for _, serverAddress := range z.Servers { | ||||
| 		if err := z.gatherServer(serverAddress, acc); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (z *Zookeeper) gatherServer(address string, acc plugins.Accumulator) error { | ||||
| 	_, _, err := net.SplitHostPort(address) | ||||
| 	if err != nil { | ||||
| 		address = address + ":2181" | ||||
| 	} | ||||
| 
 | ||||
| 	c, err := net.DialTimeout("tcp", address, defaultTimeout) | ||||
| 	if err != nil { | ||||
| 		fmt.Fprintln(os.Stderr, err) | ||||
| 		return err | ||||
| 	} | ||||
| 	defer c.Close() | ||||
| 
 | ||||
| 	fmt.Fprintf(c, "%s\n", "mntr") | ||||
| 
 | ||||
| 	rdr := bufio.NewReader(c) | ||||
| 
 | ||||
| 	scanner := bufio.NewScanner(rdr) | ||||
| 
 | ||||
| 	for scanner.Scan() { | ||||
| 		line := scanner.Text() | ||||
| 
 | ||||
| 		re := regexp.MustCompile(`^zk_(\w+)\s+([\w\.\-]+)`) | ||||
| 		parts := re.FindStringSubmatch(string(line)) | ||||
| 
 | ||||
| 		service := strings.Split(address, ":") | ||||
| 
 | ||||
| 		if len(parts) != 3 || len(service) != 2 { | ||||
| 			return fmt.Errorf("unexpected line in mntr response: %q", line) | ||||
| 		} | ||||
| 
 | ||||
| 		tags := map[string]string{"server": service[0], "port": service[1]} | ||||
| 
 | ||||
| 		measurement := strings.TrimPrefix(parts[1], "zk_") | ||||
| 		sValue := string(parts[2]) | ||||
| 
 | ||||
| 		iVal, err := strconv.ParseInt(sValue, 10, 64) | ||||
| 		if err == nil { | ||||
| 			acc.Add(measurement, iVal, tags) | ||||
| 		} else { | ||||
| 			acc.Add(measurement, sValue, tags) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func init() { | ||||
| 	plugins.Add("zookeeper", func() plugins.Plugin { | ||||
| 		return &Zookeeper{} | ||||
| 	}) | ||||
| } | ||||
|  | @ -0,0 +1,30 @@ | |||
| package zookeeper | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/influxdb/telegraf/testutil" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| ) | ||||
| 
 | ||||
| func TestMemcachedGeneratesMetrics(t *testing.T) { | ||||
| 	if testing.Short() { | ||||
| 		t.Skip("Skipping integration test in short mode") | ||||
| 	} | ||||
| 
 | ||||
| 	z := &Zookeeper{ | ||||
| 		Servers: []string{testutil.GetLocalHost()}, | ||||
| 	} | ||||
| 
 | ||||
| 	var acc testutil.Accumulator | ||||
| 
 | ||||
| 	err := z.Gather(&acc) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	intMetrics := []string{"zookeeper_avg_latency", "zookeeper_packets_sent", "zookeeper_znode_count", "zookeeper_open_file_descriptor_count", "zookeeper_max_file_descriptor_count"} | ||||
| 
 | ||||
| 	for _, metric := range intMetrics { | ||||
| 		assert.True(t, acc.HasIntValue(metric), metric) | ||||
| 	} | ||||
| } | ||||
		Loading…
	
		Reference in New Issue