Added authentication and enabled latency info collection

This commit is contained in:
azhar 2016-07-04 14:24:46 +05:30
parent c046232425
commit 6d2a993f04
3 changed files with 314 additions and 45 deletions

View File

@ -1,7 +1,13 @@
## Telegraf Plugin: Aerospike ## Telegraf Plugin: Aerospike
#### Plugin arguments: #### Plugin arguments:
- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000) - **server** string : Server endpoint in hostname:port format
- **enableAuth** bool : Use authentication when collecting stats
- **autoAuthDisable** bool : If auth is enabled and auth is not supported, disable authentication.
- **username** : Username
- **password** : Password
#### Description #### Description
@ -263,3 +269,10 @@ Meta:
Measurement names: Measurement names:
- free_pct_disk - free_pct_disk
- free_pct_memory - free_pct_memory
#### Aerospike Latency Histogram:
Measurement names:
- latency>1ms
- latency>8ms
- latency>64ms

View File

@ -4,23 +4,44 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/jameskeane/bcrypt"
) )
const ( const (
MSG_HEADER_SIZE = 8 MSG_HEADER_SIZE = 8
MSG_TYPE = 1 // Info is 1 MSG_TYPE_INFO = 1 // Info is 1
MSG_TYPE_AUTH = 2 //
MSG_VERSION = 2 MSG_VERSION = 2
// Field IDs
USER byte = 0
CREDENTIAL byte = 3
// Commands
AUTHENTICATE byte = 0
//constants from aerospike doc
ERR_PASSWORD = 62
ERR_USER = 60
ERR_NOT_ENABLED = 52
ERR_SCHEME = 53
ERR_EXPIRED_PASSWORD = 63
ERR_NOT_SUPPORTED = 51
) )
var errorCode2Msg map[int]string
var ( var (
STATISTICS_COMMAND = []byte("statistics\n") STATISTICS_COMMAND = []byte("statistics\n")
NAMESPACES_COMMAND = []byte("namespaces\n") NAMESPACES_COMMAND = []byte("namespaces\n")
LATENCY_COMMAND = []byte("latency:back=60;\n") //get latency of previous minute
) )
type aerospikeMessageHeader struct { type aerospikeMessageHeader struct {
@ -99,15 +120,49 @@ func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error)
return responses, nil return responses, nil
} }
//wrapper for field
type Field struct {
size int32
typeId byte
data []byte
}
//creates new field
func newField(typeId byte, data []byte) *Field {
return &Field{
size: int32(len(data) + 1),
typeId: typeId,
data: data,
}
}
//serializes field and writes it to buf
func (f *Field) WriteToBuf(buf *bytes.Buffer) {
binary.Write(buf, binary.BigEndian, f.size)
binary.Write(buf, binary.BigEndian, f.typeId)
buf.Write(f.data)
}
type Aerospike struct { type Aerospike struct {
Servers []string Server string
EnableAuth bool
AutoAuthDisable bool
Username string
Password string
} }
var sampleConfig = ` var sampleConfig = `
## Aerospike servers to connect to (with port) ## Aerospike servers to connect to (with port),
## provide username,password and set EnableAuth =true if authentication is enabled
## autoAuthDisable disables authentication if not supported
## This plugin will query all namespaces the aerospike ## This plugin will query all namespaces the aerospike
## server has configured and get stats for them. ## server has configured and get stats for them.
servers = ["localhost:3000"] server = "localhost:3000"
enableAuth = false
authAuthDisable = false
username = ""
password = ""
` `
func (a *Aerospike) SampleConfig() string { func (a *Aerospike) SampleConfig() string {
@ -119,48 +174,53 @@ func (a *Aerospike) Description() string {
} }
func (a *Aerospike) Gather(acc telegraf.Accumulator) error { func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
if len(a.Servers) == 0 {
return a.gatherServer("127.0.0.1:3000", acc)
}
var wg sync.WaitGroup host := a.Server
var outerr error aerospikeInfo, err := a.getMap(STATISTICS_COMMAND, host)
for _, server := range a.Servers {
wg.Add(1)
go func(server string) {
defer wg.Done()
outerr = a.gatherServer(server, acc)
}(server)
}
wg.Wait()
return outerr
}
func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error {
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike info failed: %s", err) return fmt.Errorf("Aerospike info failed: %s", err)
} }
readAerospikeStats(aerospikeInfo, acc, host, "") fields := make(map[string]interface{})
namespaces, err := getList(NAMESPACES_COMMAND, host) readAerospikeStats(aerospikeInfo, fields, host, "")
latencyInfo, err := a.get(LATENCY_COMMAND, host)
if err != nil {
fmt.Println("gathering latency failed ", err)
return fmt.Errorf("Latency info failed %s", err.Error())
}
tags := map[string]string{
"aerospike_host": host,
"namespace": "_service",
}
readAerospikeLatency(latencyInfo, fields, host)
acc.AddFields("aerospike", fields, tags)
namespaces, err := a.getList(NAMESPACES_COMMAND, host)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike namespace list failed: %s", err) return fmt.Errorf("Aerospike namespace list failed: %s", err)
} }
for ix := range namespaces { for ix := range namespaces {
nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) nsInfo, err := a.getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
if err != nil { if err != nil {
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
} }
readAerospikeStats(nsInfo, acc, host, namespaces[ix]) fields := make(map[string]interface{})
readAerospikeStats(nsInfo, fields, host, namespaces[ix])
tags["namespace"] = namespaces[ix]
acc.AddFields("aerospike", fields, tags)
} }
return nil return nil
} }
func getMap(key []byte, host string) (map[string]string, error) { func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) {
data, err := get(key, host) data, err := a.get(key, host)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err) return nil, fmt.Errorf("Failed to get data: %s", err)
} }
@ -172,8 +232,8 @@ func getMap(key []byte, host string) (map[string]string, error) {
return parsed, nil return parsed, nil
} }
func getList(key []byte, host string) ([]string, error) { func (a *Aerospike) getList(key []byte, host string) ([]string, error) {
data, err := get(key, host) data, err := a.get(key, host)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err) return nil, fmt.Errorf("Failed to get data: %s", err)
} }
@ -185,7 +245,7 @@ func getList(key []byte, host string) ([]string, error) {
return parsed, nil return parsed, nil
} }
func get(key []byte, host string) (map[string]string, error) { func (a *Aerospike) get(key []byte, host string) (map[string]string, error) {
var err error var err error
var data map[string]string var data map[string]string
@ -193,7 +253,7 @@ func get(key []byte, host string) (map[string]string, error) {
msg: &aerospikeMessage{ msg: &aerospikeMessage{
aerospikeMessageHeader: aerospikeMessageHeader{ aerospikeMessageHeader: aerospikeMessageHeader{
Version: uint8(MSG_VERSION), Version: uint8(MSG_VERSION),
Type: uint8(MSG_TYPE), Type: uint8(MSG_TYPE_INFO),
DataLen: msgLenToBytes(int64(len(key))), DataLen: msgLenToBytes(int64(len(key))),
}, },
Data: key, Data: key,
@ -212,6 +272,15 @@ func get(key []byte, host string) (map[string]string, error) {
} }
defer conn.Close() defer conn.Close()
if a.EnableAuth {
fmt.Println("Going to authenticate")
err = a.authenticate(conn)
if err != nil {
fmt.Println("Authentication failed with error ", err)
return data, err
}
}
_, err = conn.Write(cmd) _, err = conn.Write(cmd)
if err != nil { if err != nil {
return data, fmt.Errorf("Failed to send to '%s': %s", host, err) return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
@ -246,13 +315,97 @@ func get(key []byte, host string) (map[string]string, error) {
return data, err return data, err
} }
func (a *Aerospike) authenticate(conn *net.TCPConn) error {
buf := bytes.NewBuffer([]byte{})
header := make([]byte, 16)
for i := 0; i < 16; i++ {
header[i] = 0
}
header[2] = AUTHENTICATE
header[3] = byte(2) //field count
binary.Write(buf, binary.BigEndian, header)
usernameField := newField(USER, []byte(a.Username))
usernameField.WriteToBuf(buf)
pw, err := bcrypt.Hash(a.Password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO")
if err != nil {
fmt.Println("Failed to hash password", err)
return err
}
passwordField := newField(CREDENTIAL, []byte(pw))
passwordField.WriteToBuf(buf)
data := buf.Bytes()
asInfo := &aerospikeInfoCommand{
msg: &aerospikeMessage{
aerospikeMessageHeader: aerospikeMessageHeader{
Version: uint8(MSG_VERSION),
Type: uint8(MSG_TYPE_AUTH),
DataLen: msgLenToBytes(int64(len(data))),
},
Data: data,
},
}
cmd := asInfo.msg.Serialize()
_, err = conn.Write(cmd)
if err != nil {
return err
}
msgHeaderData := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
var msgHeader aerospikeMessageHeader
_, err = readLenFromConn(conn, msgHeaderData.Bytes(), MSG_HEADER_SIZE)
if err != nil {
return fmt.Errorf("Failed to read header: %s", err)
}
err = binary.Read(msgHeaderData, binary.BigEndian, &msgHeader)
if err != nil {
return fmt.Errorf("Failed to unmarshal header: %s", err)
}
msgLen := msgLenFromBytes(msgHeader.DataLen)
buffer := make([]byte, msgLen)
_, err = readLenFromConn(conn, buffer, int(msgLen))
if err != nil {
return fmt.Errorf("Failed to read from connection to '%s': ", err)
}
fmt.Println("Got: ", buffer)
errorCode := int(buffer[1])
if (errorCode == ERR_NOT_SUPPORTED || errorCode == ERR_NOT_ENABLED) && a.AutoAuthDisable {
fmt.Println("Disabling auth for ", a.Server)
a.EnableAuth = false
return nil
}
val, exist := errorCode2Msg[errorCode]
if exist {
return fmt.Errorf("Authentication failed: %s", val)
} else if errorCode != 0 {
return fmt.Errorf("Authentication request failed with errorcode %d", errorCode)
}
fmt.Println("Authenticated with return code ", errorCode)
return nil
}
func readAerospikeStats( func readAerospikeStats(
stats map[string]string, stats map[string]string,
acc telegraf.Accumulator, fields map[string]interface{},
host string, host string,
namespace string, namespace string,
) { ) {
fields := make(map[string]interface{})
tags := map[string]string{ tags := map[string]string{
"aerospike_host": host, "aerospike_host": host,
"namespace": "_service", "namespace": "_service",
@ -271,16 +424,66 @@ func readAerospikeStats(
fields[key] = val fields[key] = val
} }
} }
acc.AddFields("aerospike", fields, tags)
} }
func readAerospikeLatency(
stats map[string]string,
fields map[string]interface{},
host string,
) {
key := strings.TrimSuffix(string(LATENCY_COMMAND), "\n")
data := stats[key]
splitted := strings.Split(data, ";")
for i := 0; i < len(splitted)-1; i = i + 2 {
ind := strings.Index(splitted[i], ":")
if ind == -1 {
continue
}
metrName := splitted[i][0:ind]
spl1 := splitted[i][ind:]
ind = strings.Index(spl1, ",")
spl1 = spl1[ind+1:]
unitTimes := strings.Split(spl1, ",")
vals := strings.Split(splitted[i+1], ",")
//fmt.Println("Got ", metrName, " dime ", unitTimes, " vals ", vals)
for i := 1; i < len(unitTimes); i++ {
metric := metrName + "_" + unitTimes[i] //strings.Replace(unitTimes[i], ">", "_gt_", 1)
value, err := strconv.ParseFloat(vals[i], 64)
if err != nil {
fmt.Println("Failed to parse float when parsing latency ")
continue
}
fields[metric] = value
}
}
//fmt.Println("aerospike", "Tags", tags, "Fields", fields)
}
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
key = strings.TrimSuffix(key, "\n") key = strings.TrimSuffix(key, "\n")
res := map[string]string{} res := map[string]string{}
v, exists := infoMap[key] v, exists := infoMap[key]
if !exists { if !exists {
return res, fmt.Errorf("Key '%s' missing from info", key) errString := ""
for k, v := range infoMap {
if strings.HasPrefix(k, "ERROR:") || strings.HasPrefix(k, "Error:") {
errString = k + ": " + v
}
}
if errString == "" {
return res, fmt.Errorf("Key '%s' missing from info", key)
}
return res, fmt.Errorf("Key '%s' missing from info: Probable Error: '%s' ", key, errString)
} }
values := strings.Split(v, ";") values := strings.Split(v, ";")
@ -334,7 +537,21 @@ func msgLenFromBytes(buf [6]byte) int64 {
return int64(DataLen) return int64(DataLen)
} }
func populateErrorCode2Msg() {
errorCode2Msg = make(map[int]string)
errorCode2Msg[ERR_USER] = "No user supplied or unknown user."
errorCode2Msg[ERR_PASSWORD] = "Password does not exists or not recognized."
errorCode2Msg[ERR_NOT_ENABLED] = "Security functionality not enabled by connected server."
errorCode2Msg[ERR_SCHEME] = "Security scheme not supported."
errorCode2Msg[ERR_EXPIRED_PASSWORD] = "Expired password."
errorCode2Msg[ERR_NOT_SUPPORTED] = "Security functionality not supported by connected server."
}
func init() { func init() {
populateErrorCode2Msg()
inputs.Add("aerospike", func() telegraf.Input { inputs.Add("aerospike", func() telegraf.Input {
return &Aerospike{} return &Aerospike{}
}) })

View File

@ -1,6 +1,8 @@
package aerospike package aerospike
import ( import (
"bytes"
"encoding/binary"
"reflect" "reflect"
"testing" "testing"
@ -15,7 +17,7 @@ func TestAerospikeStatistics(t *testing.T) {
} }
a := &Aerospike{ a := &Aerospike{
Servers: []string{testutil.GetLocalHost() + ":3000"}, Server: testutil.GetLocalHost() + ":3000",
} }
var acc testutil.Accumulator var acc testutil.Accumulator
@ -49,7 +51,14 @@ func TestReadAerospikeStatsNoNamespace(t *testing.T) {
"stat-write-errs": "12345", "stat-write-errs": "12345",
"stat_read_reqs": "12345", "stat_read_reqs": "12345",
} }
readAerospikeStats(stats, &acc, "host1", "")
fields_ := make(map[string]interface{})
readAerospikeStats(stats, fields_, "host1", "")
tags_ := map[string]string{
"aerospike_host": "host1",
"namespace": "_service",
}
acc.AddFields("aerospike", fields_, tags_)
fields := map[string]interface{}{ fields := map[string]interface{}{
"stat_write_errs": int64(12345), "stat_write_errs": int64(12345),
@ -68,7 +77,15 @@ func TestReadAerospikeStatsNamespace(t *testing.T) {
"stat_write_errs": "12345", "stat_write_errs": "12345",
"stat_read_reqs": "12345", "stat_read_reqs": "12345",
} }
readAerospikeStats(stats, &acc, "host1", "test")
fields_ := make(map[string]interface{})
readAerospikeStats(stats, fields_, "host1", "test")
tags_ := map[string]string{
"aerospike_host": "host1",
"namespace": "test",
}
acc.AddFields("aerospike", fields_, tags_)
fields := map[string]interface{}{ fields := map[string]interface{}{
"stat_write_errs": int64(12345), "stat_write_errs": int64(12345),
@ -116,3 +133,25 @@ func TestAerospikeUnmarshalMap(t *testing.T) {
assert.True(t, err == nil) assert.True(t, err == nil)
assert.True(t, reflect.DeepEqual(m, expected)) assert.True(t, reflect.DeepEqual(m, expected))
} }
func TestFieldSerialization(t *testing.T) {
typeId := byte(12)
data := []byte("test string")
f := newField(typeId, data)
buf := bytes.Buffer{}
f.WriteToBuf(&buf)
var sz int32
var rType byte
binary.Read(&buf, binary.BigEndian, &sz)
binary.Read(&buf, binary.BigEndian, &rType)
assert.True(t, int(sz) == len(data)+1)
assert.True(t, rType == typeId)
assert.True(t, bytes.Equal(data, buf.Bytes()))
}