diff --git a/plugins/inputs/aerospike/README.md b/plugins/inputs/aerospike/README.md index a25ec39ee..73bec401e 100644 --- a/plugins/inputs/aerospike/README.md +++ b/plugins/inputs/aerospike/README.md @@ -1,13 +1,7 @@ ## Telegraf Plugin: Aerospike #### Plugin arguments: -- **server** string : Server endpoint in hostname:port format -- **enableAuth** bool : Use authentication when collecting stats -- **autoAuthDisable** bool : If auth is enabled and auth is not supported, disable authentication. -- **username** : Username -- **password** : Password - - +- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000). For authenticated servers use "user:password@host:port" #### Description @@ -270,9 +264,7 @@ Measurement names: - free_pct_disk - free_pct_memory - -#### Aerospike Latency Histogram: -Measurement names: +Latency Histogram: - latency>1ms - latency>8ms -- latency>64ms \ No newline at end of file +- latency>64ms diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 8bebd8235..455f1499d 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -7,6 +7,7 @@ import ( "net" "strconv" "strings" + "sync" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -144,25 +145,24 @@ func (f *Field) WriteToBuf(buf *bytes.Buffer) { } +type endPointInfo struct { + endpoint string + authEnabled bool + username string + password string +} + type Aerospike struct { - Server string - EnableAuth bool - AutoAuthDisable bool - Username string - Password string + Servers []string } var sampleConfig = ` ## Aerospike servers to connect to (with port), - ## provide username,password and set EnableAuth =true if authentication is enabled - ## autoAuthDisable disables authentication if not supported + ## if auth is enabled provide "user:password@host:port format" ## This plugin will query all namespaces the aerospike ## server has configured and get stats for them. - server = "localhost:3000" - enableAuth = false - autoAuthDisable = false - username = "" - password = "" + # servers = ["user:password@localhost:3000"] + servers = ["localhost:3000"] ` func (a *Aerospike) SampleConfig() string { @@ -175,41 +175,72 @@ func (a *Aerospike) Description() string { func (a *Aerospike) Gather(acc telegraf.Accumulator) error { - host := a.Server + if len(a.Servers) == 0 { + epInfo := &endPointInfo{ + endpoint: "localhost:3000", + authEnabled: false, + } - aerospikeInfo, err := a.getMap(STATISTICS_COMMAND, host) + return epInfo.gatherServer(acc) + } + + var wg sync.WaitGroup + + var outerr error + + for _, server := range a.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + epInfo, err := parseHostInfo(server) + if err != nil { + outerr = err + + } else { + outerr = epInfo.gatherServer(acc) + } + }(server) + } + + wg.Wait() + return outerr +} + +func (e *endPointInfo) gatherServer(acc telegraf.Accumulator) error { + + aerospikeInfo, err := e.getMap(STATISTICS_COMMAND) if err != nil { return fmt.Errorf("Aerospike info failed: %s", err) } fields := make(map[string]interface{}) - readAerospikeStats(aerospikeInfo, fields, host, "") + readAerospikeStats(aerospikeInfo, fields, e.endpoint, "") - latencyInfo, err := a.get(LATENCY_COMMAND, host) + latencyInfo, err := e.get(LATENCY_COMMAND) if err != nil { return fmt.Errorf("Latency info failed %s", err.Error()) } tags := map[string]string{ - "aerospike_host": host, + "aerospike_host": e.endpoint, "namespace": "_service", } - readAerospikeLatency(latencyInfo, fields, host) + readAerospikeLatency(latencyInfo, fields, e.endpoint) acc.AddFields("aerospike", fields, tags) - namespaces, err := a.getList(NAMESPACES_COMMAND, host) + namespaces, err := e.getList(NAMESPACES_COMMAND) if err != nil { return fmt.Errorf("Aerospike namespace list failed: %s", err) } for ix := range namespaces { - nsInfo, err := a.getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) + nsInfo, err := e.getMap([]byte("namespace/" + namespaces[ix] + "\n")) if err != nil { return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) } fields := make(map[string]interface{}) - readAerospikeStats(nsInfo, fields, host, namespaces[ix]) + readAerospikeStats(nsInfo, fields, e.endpoint, namespaces[ix]) tags["namespace"] = namespaces[ix] acc.AddFields("aerospike", fields, tags) @@ -218,8 +249,38 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { return nil } -func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) { - data, err := a.get(key, host) +func parseHostInfo(server string) (*endPointInfo, error) { + + indexOfAt := strings.LastIndex(server, "@") + + if indexOfAt == -1 { + return &endPointInfo{ + endpoint: server, + authEnabled: false, + username: "", + password: "", + }, nil + } + + endpoint := server[indexOfAt+1:] + + indexOfColon := strings.Index(server[:indexOfAt-1], ":") + if indexOfColon == -1 { + return nil, fmt.Errorf("Can't find username,password in '%s'", server) + } + username := server[0:indexOfColon] + password := server[indexOfColon+1 : indexOfAt] + + return &endPointInfo{ + endpoint: endpoint, + authEnabled: true, + username: username, + password: password, + }, nil +} + +func (e *endPointInfo) getMap(key []byte) (map[string]string, error) { + data, err := e.get(key) if err != nil { return nil, fmt.Errorf("Failed to get data: %s", err) } @@ -231,8 +292,8 @@ func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) { return parsed, nil } -func (a *Aerospike) getList(key []byte, host string) ([]string, error) { - data, err := a.get(key, host) +func (e *endPointInfo) getList(key []byte) ([]string, error) { + data, err := e.get(key) if err != nil { return nil, fmt.Errorf("Failed to get data: %s", err) } @@ -244,7 +305,7 @@ func (a *Aerospike) getList(key []byte, host string) ([]string, error) { return parsed, nil } -func (a *Aerospike) get(key []byte, host string) (map[string]string, error) { +func (e *endPointInfo) get(key []byte) (map[string]string, error) { var err error var data map[string]string @@ -260,28 +321,27 @@ func (a *Aerospike) get(key []byte, host string) (map[string]string, error) { } cmd := asInfo.msg.Serialize() - addr, err := net.ResolveTCPAddr("tcp", host) + addr, err := net.ResolveTCPAddr("tcp", e.endpoint) if err != nil { - return data, fmt.Errorf("Lookup failed for '%s': %s", host, err) + return data, fmt.Errorf("Lookup failed for '%s': %s", e.endpoint, err) } conn, err := net.DialTCP("tcp", nil, addr) if err != nil { - return data, fmt.Errorf("Connection failed for '%s': %s", host, err) + return data, fmt.Errorf("Connection failed for '%s': %s", e.endpoint, err) } defer conn.Close() - if a.EnableAuth { - err = a.authenticate(conn) + if e.authEnabled { + err = e.authenticate(conn) if err != nil { - //fmt.Println("Authentication failed with error ", err) return data, err } } _, err = conn.Write(cmd) if err != nil { - return data, fmt.Errorf("Failed to send to '%s': %s", host, err) + return data, fmt.Errorf("Failed to send to '%s': %s", e.endpoint, err) } msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE)) @@ -302,18 +362,18 @@ func (a *Aerospike) get(key []byte, host string) (map[string]string, error) { _, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data)) if err != nil { - return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err) + return data, fmt.Errorf("Failed to read from connection to '%s': %s", e.endpoint, err) } data, err = asInfo.parseMultiResponse() if err != nil { - return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err) + return data, fmt.Errorf("Failed to parse response from '%s': %s", e.endpoint, err) } return data, err } -func (a *Aerospike) authenticate(conn *net.TCPConn) error { +func (e *endPointInfo) authenticate(conn *net.TCPConn) error { buf := bytes.NewBuffer([]byte{}) header := make([]byte, 16) @@ -325,10 +385,10 @@ func (a *Aerospike) authenticate(conn *net.TCPConn) error { header[3] = byte(2) //field count binary.Write(buf, binary.BigEndian, header) - usernameField := newField(USER, []byte(a.Username)) + usernameField := newField(USER, []byte(e.username)) usernameField.WriteToBuf(buf) - pw, err := bcrypt.Hash(a.Password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO") + pw, err := bcrypt.Hash(e.password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO") if err != nil { return err } @@ -376,13 +436,6 @@ func (a *Aerospike) authenticate(conn *net.TCPConn) error { errorCode := int(buffer[1]) - if (errorCode == ERR_NOT_SUPPORTED || errorCode == ERR_NOT_ENABLED) && a.AutoAuthDisable { - fmt.Println("Disabling auth for ", a.Server) - a.EnableAuth = false - return nil - - } - val, exist := errorCode2Msg[errorCode] if exist { return fmt.Errorf("Authentication failed: %s", val) @@ -450,7 +503,7 @@ func readAerospikeLatency( metric := metrName + "_" + unitTimes[i] //strings.Replace(unitTimes[i], ">", "_gt_", 1) value, err := strconv.ParseFloat(vals[i], 64) if err != nil { - //fmt.Println("Failed to parse float when parsing latency ") + fmt.Println("Failed to parse float when parsing latency ") continue } fields[metric] = value diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 8c904cf75..f85f710c1 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -3,6 +3,7 @@ package aerospike import ( "bytes" "encoding/binary" + "fmt" "reflect" "testing" @@ -17,7 +18,7 @@ func TestAerospikeStatistics(t *testing.T) { } a := &Aerospike{ - Server: testutil.GetLocalHost() + ":3000", + Servers: []string{testutil.GetLocalHost() + ":3000"}, } var acc testutil.Accumulator @@ -155,3 +156,28 @@ func TestFieldSerialization(t *testing.T) { assert.True(t, bytes.Equal(data, buf.Bytes())) } + +func TestEndpointInfoParsing(t *testing.T) { + sinfo := "user:password@localhost:3212" + + ep, err := parseHostInfo(sinfo) + + fmt.Println(ep.username, " ", ep.password, " ", ep.endpoint) + + assert.True(t, err == nil) + assert.True(t, ep.username == "user") + assert.True(t, ep.password == "password") + assert.True(t, ep.endpoint == "localhost:3212") + assert.True(t, ep.authEnabled) + + sinfo = "eatyourpotato:4321" + + ep, err = parseHostInfo(sinfo) + + fmt.Println(ep.username, " ", ep.password, " ", ep.endpoint) + + assert.True(t, err == nil) + assert.True(t, ep.endpoint == "eatyourpotato:4321") + assert.False(t, ep.authEnabled) + +}