diff --git a/plugins/inputs/aerospike/README.md b/plugins/inputs/aerospike/README.md index 6fb6bb189..a25ec39ee 100644 --- a/plugins/inputs/aerospike/README.md +++ b/plugins/inputs/aerospike/README.md @@ -1,7 +1,13 @@ ## Telegraf Plugin: Aerospike #### Plugin arguments: -- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000) +- **server** string : Server endpoint in hostname:port format +- **enableAuth** bool : Use authentication when collecting stats +- **autoAuthDisable** bool : If auth is enabled and auth is not supported, disable authentication. +- **username** : Username +- **password** : Password + + #### Description @@ -263,3 +269,10 @@ Meta: Measurement names: - free_pct_disk - free_pct_memory + + +#### Aerospike Latency Histogram: +Measurement names: +- latency>1ms +- latency>8ms +- latency>64ms \ No newline at end of file diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index cd2ebe25c..1c570b82e 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -4,23 +4,44 @@ import ( "bytes" "encoding/binary" "fmt" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs" "net" "strconv" "strings" - "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + + "github.com/jameskeane/bcrypt" ) const ( MSG_HEADER_SIZE = 8 - MSG_TYPE = 1 // Info is 1 + MSG_TYPE_INFO = 1 // Info is 1 + MSG_TYPE_AUTH = 2 // MSG_VERSION = 2 + + // Field IDs + USER byte = 0 + CREDENTIAL byte = 3 + + // Commands + AUTHENTICATE byte = 0 + + //constants from aerospike doc + ERR_PASSWORD = 62 + ERR_USER = 60 + ERR_NOT_ENABLED = 52 + ERR_SCHEME = 53 + ERR_EXPIRED_PASSWORD = 63 + ERR_NOT_SUPPORTED = 51 ) +var errorCode2Msg map[int]string + var ( STATISTICS_COMMAND = []byte("statistics\n") NAMESPACES_COMMAND = []byte("namespaces\n") + LATENCY_COMMAND = []byte("latency:back=60;\n") //get latency of previous minute ) type aerospikeMessageHeader struct { @@ -99,15 +120,49 @@ func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) return responses, nil } +//wrapper for field +type Field struct { + size int32 + typeId byte + data []byte +} + +//creates new field +func newField(typeId byte, data []byte) *Field { + return &Field{ + size: int32(len(data) + 1), + typeId: typeId, + data: data, + } +} + +//serializes field and writes it to buf +func (f *Field) WriteToBuf(buf *bytes.Buffer) { + binary.Write(buf, binary.BigEndian, f.size) + binary.Write(buf, binary.BigEndian, f.typeId) + buf.Write(f.data) + +} + type Aerospike struct { - Servers []string + Server string + EnableAuth bool + AutoAuthDisable bool + Username string + Password string } var sampleConfig = ` - ## Aerospike servers to connect to (with port) + ## Aerospike servers to connect to (with port), + ## provide username,password and set EnableAuth =true if authentication is enabled + ## autoAuthDisable disables authentication if not supported ## This plugin will query all namespaces the aerospike ## server has configured and get stats for them. - servers = ["localhost:3000"] + server = "localhost:3000" + enableAuth = false + authAuthDisable = false + username = "" + password = "" ` func (a *Aerospike) SampleConfig() string { @@ -119,48 +174,53 @@ func (a *Aerospike) Description() string { } func (a *Aerospike) Gather(acc telegraf.Accumulator) error { - if len(a.Servers) == 0 { - return a.gatherServer("127.0.0.1:3000", acc) - } - var wg sync.WaitGroup + host := a.Server - var outerr error - - for _, server := range a.Servers { - wg.Add(1) - go func(server string) { - defer wg.Done() - outerr = a.gatherServer(server, acc) - }(server) - } - - wg.Wait() - return outerr -} - -func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error { - aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) + aerospikeInfo, err := a.getMap(STATISTICS_COMMAND, host) if err != nil { return fmt.Errorf("Aerospike info failed: %s", err) } - readAerospikeStats(aerospikeInfo, acc, host, "") - namespaces, err := getList(NAMESPACES_COMMAND, host) + fields := make(map[string]interface{}) + readAerospikeStats(aerospikeInfo, fields, host, "") + + latencyInfo, err := a.get(LATENCY_COMMAND, host) + if err != nil { + fmt.Println("gathering latency failed ", err) + return fmt.Errorf("Latency info failed %s", err.Error()) + } + + tags := map[string]string{ + "aerospike_host": host, + "namespace": "_service", + } + + readAerospikeLatency(latencyInfo, fields, host) + + acc.AddFields("aerospike", fields, tags) + + namespaces, err := a.getList(NAMESPACES_COMMAND, host) if err != nil { return fmt.Errorf("Aerospike namespace list failed: %s", err) } + for ix := range namespaces { - nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) + nsInfo, err := a.getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) if err != nil { return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) } - readAerospikeStats(nsInfo, acc, host, namespaces[ix]) + fields := make(map[string]interface{}) + readAerospikeStats(nsInfo, fields, host, namespaces[ix]) + + tags["namespace"] = namespaces[ix] + acc.AddFields("aerospike", fields, tags) + } return nil } -func getMap(key []byte, host string) (map[string]string, error) { - data, err := get(key, host) +func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) { + data, err := a.get(key, host) if err != nil { return nil, fmt.Errorf("Failed to get data: %s", err) } @@ -172,8 +232,8 @@ func getMap(key []byte, host string) (map[string]string, error) { return parsed, nil } -func getList(key []byte, host string) ([]string, error) { - data, err := get(key, host) +func (a *Aerospike) getList(key []byte, host string) ([]string, error) { + data, err := a.get(key, host) if err != nil { return nil, fmt.Errorf("Failed to get data: %s", err) } @@ -185,7 +245,7 @@ func getList(key []byte, host string) ([]string, error) { return parsed, nil } -func get(key []byte, host string) (map[string]string, error) { +func (a *Aerospike) get(key []byte, host string) (map[string]string, error) { var err error var data map[string]string @@ -193,7 +253,7 @@ func get(key []byte, host string) (map[string]string, error) { msg: &aerospikeMessage{ aerospikeMessageHeader: aerospikeMessageHeader{ Version: uint8(MSG_VERSION), - Type: uint8(MSG_TYPE), + Type: uint8(MSG_TYPE_INFO), DataLen: msgLenToBytes(int64(len(key))), }, Data: key, @@ -212,6 +272,15 @@ func get(key []byte, host string) (map[string]string, error) { } defer conn.Close() + if a.EnableAuth { + fmt.Println("Going to authenticate") + err = a.authenticate(conn) + if err != nil { + fmt.Println("Authentication failed with error ", err) + return data, err + } + } + _, err = conn.Write(cmd) if err != nil { return data, fmt.Errorf("Failed to send to '%s': %s", host, err) @@ -246,13 +315,97 @@ func get(key []byte, host string) (map[string]string, error) { return data, err } +func (a *Aerospike) authenticate(conn *net.TCPConn) error { + + buf := bytes.NewBuffer([]byte{}) + header := make([]byte, 16) + + for i := 0; i < 16; i++ { + header[i] = 0 + } + header[2] = AUTHENTICATE + header[3] = byte(2) //field count + + binary.Write(buf, binary.BigEndian, header) + usernameField := newField(USER, []byte(a.Username)) + usernameField.WriteToBuf(buf) + + pw, err := bcrypt.Hash(a.Password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO") + if err != nil { + fmt.Println("Failed to hash password", err) + return err + } + + passwordField := newField(CREDENTIAL, []byte(pw)) + passwordField.WriteToBuf(buf) + + data := buf.Bytes() + + asInfo := &aerospikeInfoCommand{ + msg: &aerospikeMessage{ + aerospikeMessageHeader: aerospikeMessageHeader{ + Version: uint8(MSG_VERSION), + Type: uint8(MSG_TYPE_AUTH), + DataLen: msgLenToBytes(int64(len(data))), + }, + Data: data, + }, + } + + cmd := asInfo.msg.Serialize() + _, err = conn.Write(cmd) + if err != nil { + return err + } + + msgHeaderData := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE)) + var msgHeader aerospikeMessageHeader + + _, err = readLenFromConn(conn, msgHeaderData.Bytes(), MSG_HEADER_SIZE) + if err != nil { + return fmt.Errorf("Failed to read header: %s", err) + } + err = binary.Read(msgHeaderData, binary.BigEndian, &msgHeader) + if err != nil { + return fmt.Errorf("Failed to unmarshal header: %s", err) + } + + msgLen := msgLenFromBytes(msgHeader.DataLen) + buffer := make([]byte, msgLen) + _, err = readLenFromConn(conn, buffer, int(msgLen)) + if err != nil { + return fmt.Errorf("Failed to read from connection to '%s': ", err) + } + + fmt.Println("Got: ", buffer) + + errorCode := int(buffer[1]) + + if (errorCode == ERR_NOT_SUPPORTED || errorCode == ERR_NOT_ENABLED) && a.AutoAuthDisable { + fmt.Println("Disabling auth for ", a.Server) + a.EnableAuth = false + return nil + + } + + val, exist := errorCode2Msg[errorCode] + if exist { + return fmt.Errorf("Authentication failed: %s", val) + } else if errorCode != 0 { + return fmt.Errorf("Authentication request failed with errorcode %d", errorCode) + } + + fmt.Println("Authenticated with return code ", errorCode) + + return nil +} + func readAerospikeStats( stats map[string]string, - acc telegraf.Accumulator, + fields map[string]interface{}, host string, namespace string, ) { - fields := make(map[string]interface{}) tags := map[string]string{ "aerospike_host": host, "namespace": "_service", @@ -271,16 +424,66 @@ func readAerospikeStats( fields[key] = val } } - acc.AddFields("aerospike", fields, tags) } +func readAerospikeLatency( + stats map[string]string, + fields map[string]interface{}, + host string, +) { + + key := strings.TrimSuffix(string(LATENCY_COMMAND), "\n") + data := stats[key] + + splitted := strings.Split(data, ";") + + for i := 0; i < len(splitted)-1; i = i + 2 { + ind := strings.Index(splitted[i], ":") + if ind == -1 { + continue + } + + metrName := splitted[i][0:ind] + spl1 := splitted[i][ind:] + ind = strings.Index(spl1, ",") + spl1 = spl1[ind+1:] + unitTimes := strings.Split(spl1, ",") + + vals := strings.Split(splitted[i+1], ",") + + //fmt.Println("Got ", metrName, " dime ", unitTimes, " vals ", vals) + + for i := 1; i < len(unitTimes); i++ { + metric := metrName + "_" + unitTimes[i] //strings.Replace(unitTimes[i], ">", "_gt_", 1) + value, err := strconv.ParseFloat(vals[i], 64) + if err != nil { + fmt.Println("Failed to parse float when parsing latency ") + continue + } + fields[metric] = value + } + + } + //fmt.Println("aerospike", "Tags", tags, "Fields", fields) + +} func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { key = strings.TrimSuffix(key, "\n") res := map[string]string{} v, exists := infoMap[key] if !exists { - return res, fmt.Errorf("Key '%s' missing from info", key) + errString := "" + for k, v := range infoMap { + if strings.HasPrefix(k, "ERROR:") || strings.HasPrefix(k, "Error:") { + errString = k + ": " + v + } + } + if errString == "" { + return res, fmt.Errorf("Key '%s' missing from info", key) + } + return res, fmt.Errorf("Key '%s' missing from info: Probable Error: '%s' ", key, errString) + } values := strings.Split(v, ";") @@ -334,7 +537,21 @@ func msgLenFromBytes(buf [6]byte) int64 { return int64(DataLen) } +func populateErrorCode2Msg() { + errorCode2Msg = make(map[int]string) + + errorCode2Msg[ERR_USER] = "No user supplied or unknown user." + errorCode2Msg[ERR_PASSWORD] = "Password does not exists or not recognized." + errorCode2Msg[ERR_NOT_ENABLED] = "Security functionality not enabled by connected server." + errorCode2Msg[ERR_SCHEME] = "Security scheme not supported." + errorCode2Msg[ERR_EXPIRED_PASSWORD] = "Expired password." + errorCode2Msg[ERR_NOT_SUPPORTED] = "Security functionality not supported by connected server." +} + func init() { + + populateErrorCode2Msg() + inputs.Add("aerospike", func() telegraf.Input { return &Aerospike{} }) diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index 74b70eb1d..8c904cf75 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -1,6 +1,8 @@ package aerospike import ( + "bytes" + "encoding/binary" "reflect" "testing" @@ -15,7 +17,7 @@ func TestAerospikeStatistics(t *testing.T) { } a := &Aerospike{ - Servers: []string{testutil.GetLocalHost() + ":3000"}, + Server: testutil.GetLocalHost() + ":3000", } var acc testutil.Accumulator @@ -49,7 +51,14 @@ func TestReadAerospikeStatsNoNamespace(t *testing.T) { "stat-write-errs": "12345", "stat_read_reqs": "12345", } - readAerospikeStats(stats, &acc, "host1", "") + + fields_ := make(map[string]interface{}) + readAerospikeStats(stats, fields_, "host1", "") + tags_ := map[string]string{ + "aerospike_host": "host1", + "namespace": "_service", + } + acc.AddFields("aerospike", fields_, tags_) fields := map[string]interface{}{ "stat_write_errs": int64(12345), @@ -68,7 +77,15 @@ func TestReadAerospikeStatsNamespace(t *testing.T) { "stat_write_errs": "12345", "stat_read_reqs": "12345", } - readAerospikeStats(stats, &acc, "host1", "test") + + fields_ := make(map[string]interface{}) + readAerospikeStats(stats, fields_, "host1", "test") + tags_ := map[string]string{ + "aerospike_host": "host1", + "namespace": "test", + } + + acc.AddFields("aerospike", fields_, tags_) fields := map[string]interface{}{ "stat_write_errs": int64(12345), @@ -116,3 +133,25 @@ func TestAerospikeUnmarshalMap(t *testing.T) { assert.True(t, err == nil) assert.True(t, reflect.DeepEqual(m, expected)) } + +func TestFieldSerialization(t *testing.T) { + typeId := byte(12) + data := []byte("test string") + + f := newField(typeId, data) + + buf := bytes.Buffer{} + f.WriteToBuf(&buf) + + var sz int32 + var rType byte + + binary.Read(&buf, binary.BigEndian, &sz) + binary.Read(&buf, binary.BigEndian, &rType) + + assert.True(t, int(sz) == len(data)+1) + assert.True(t, rType == typeId) + + assert.True(t, bytes.Equal(data, buf.Bytes())) + +}