Changed config file format
This commit is contained in:
parent
0fc2a195da
commit
9a9c996efc
|
@ -1,13 +1,7 @@
|
||||||
## Telegraf Plugin: Aerospike
|
## Telegraf Plugin: Aerospike
|
||||||
|
|
||||||
#### Plugin arguments:
|
#### Plugin arguments:
|
||||||
- **server** string : Server endpoint in hostname:port format
|
- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000). For authenticated servers use "user:password@host:port"
|
||||||
- **enableAuth** bool : Use authentication when collecting stats
|
|
||||||
- **autoAuthDisable** bool : If auth is enabled and auth is not supported, disable authentication.
|
|
||||||
- **username** : Username
|
|
||||||
- **password** : Password
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#### Description
|
#### Description
|
||||||
|
|
||||||
|
@ -270,9 +264,7 @@ Measurement names:
|
||||||
- free_pct_disk
|
- free_pct_disk
|
||||||
- free_pct_memory
|
- free_pct_memory
|
||||||
|
|
||||||
|
Latency Histogram:
|
||||||
#### Aerospike Latency Histogram:
|
|
||||||
Measurement names:
|
|
||||||
- latency>1ms
|
- latency>1ms
|
||||||
- latency>8ms
|
- latency>8ms
|
||||||
- latency>64ms
|
- latency>64ms
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
@ -144,25 +145,24 @@ func (f *Field) WriteToBuf(buf *bytes.Buffer) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type endPointInfo struct {
|
||||||
|
endpoint string
|
||||||
|
authEnabled bool
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
}
|
||||||
|
|
||||||
type Aerospike struct {
|
type Aerospike struct {
|
||||||
Server string
|
Servers []string
|
||||||
EnableAuth bool
|
|
||||||
AutoAuthDisable bool
|
|
||||||
Username string
|
|
||||||
Password string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
## Aerospike servers to connect to (with port),
|
## Aerospike servers to connect to (with port),
|
||||||
## provide username,password and set EnableAuth =true if authentication is enabled
|
## if auth is enabled provide "user:password@host:port format"
|
||||||
## autoAuthDisable disables authentication if not supported
|
|
||||||
## This plugin will query all namespaces the aerospike
|
## This plugin will query all namespaces the aerospike
|
||||||
## server has configured and get stats for them.
|
## server has configured and get stats for them.
|
||||||
server = "localhost:3000"
|
# servers = ["user:password@localhost:3000"]
|
||||||
enableAuth = false
|
servers = ["localhost:3000"]
|
||||||
autoAuthDisable = false
|
|
||||||
username = ""
|
|
||||||
password = ""
|
|
||||||
`
|
`
|
||||||
|
|
||||||
func (a *Aerospike) SampleConfig() string {
|
func (a *Aerospike) SampleConfig() string {
|
||||||
|
@ -175,41 +175,72 @@ func (a *Aerospike) Description() string {
|
||||||
|
|
||||||
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
host := a.Server
|
if len(a.Servers) == 0 {
|
||||||
|
epInfo := &endPointInfo{
|
||||||
|
endpoint: "localhost:3000",
|
||||||
|
authEnabled: false,
|
||||||
|
}
|
||||||
|
|
||||||
aerospikeInfo, err := a.getMap(STATISTICS_COMMAND, host)
|
return epInfo.gatherServer(acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
var outerr error
|
||||||
|
|
||||||
|
for _, server := range a.Servers {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(server string) {
|
||||||
|
defer wg.Done()
|
||||||
|
epInfo, err := parseHostInfo(server)
|
||||||
|
if err != nil {
|
||||||
|
outerr = err
|
||||||
|
|
||||||
|
} else {
|
||||||
|
outerr = epInfo.gatherServer(acc)
|
||||||
|
}
|
||||||
|
}(server)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return outerr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *endPointInfo) gatherServer(acc telegraf.Accumulator) error {
|
||||||
|
|
||||||
|
aerospikeInfo, err := e.getMap(STATISTICS_COMMAND)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike info failed: %s", err)
|
return fmt.Errorf("Aerospike info failed: %s", err)
|
||||||
}
|
}
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
readAerospikeStats(aerospikeInfo, fields, host, "")
|
readAerospikeStats(aerospikeInfo, fields, e.endpoint, "")
|
||||||
|
|
||||||
latencyInfo, err := a.get(LATENCY_COMMAND, host)
|
latencyInfo, err := e.get(LATENCY_COMMAND)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Latency info failed %s", err.Error())
|
return fmt.Errorf("Latency info failed %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"aerospike_host": host,
|
"aerospike_host": e.endpoint,
|
||||||
"namespace": "_service",
|
"namespace": "_service",
|
||||||
}
|
}
|
||||||
|
|
||||||
readAerospikeLatency(latencyInfo, fields, host)
|
readAerospikeLatency(latencyInfo, fields, e.endpoint)
|
||||||
|
|
||||||
acc.AddFields("aerospike", fields, tags)
|
acc.AddFields("aerospike", fields, tags)
|
||||||
|
|
||||||
namespaces, err := a.getList(NAMESPACES_COMMAND, host)
|
namespaces, err := e.getList(NAMESPACES_COMMAND)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike namespace list failed: %s", err)
|
return fmt.Errorf("Aerospike namespace list failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for ix := range namespaces {
|
for ix := range namespaces {
|
||||||
nsInfo, err := a.getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
|
nsInfo, err := e.getMap([]byte("namespace/" + namespaces[ix] + "\n"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
|
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
|
||||||
}
|
}
|
||||||
fields := make(map[string]interface{})
|
fields := make(map[string]interface{})
|
||||||
readAerospikeStats(nsInfo, fields, host, namespaces[ix])
|
readAerospikeStats(nsInfo, fields, e.endpoint, namespaces[ix])
|
||||||
|
|
||||||
tags["namespace"] = namespaces[ix]
|
tags["namespace"] = namespaces[ix]
|
||||||
acc.AddFields("aerospike", fields, tags)
|
acc.AddFields("aerospike", fields, tags)
|
||||||
|
@ -218,8 +249,38 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) {
|
func parseHostInfo(server string) (*endPointInfo, error) {
|
||||||
data, err := a.get(key, host)
|
|
||||||
|
indexOfAt := strings.LastIndex(server, "@")
|
||||||
|
|
||||||
|
if indexOfAt == -1 {
|
||||||
|
return &endPointInfo{
|
||||||
|
endpoint: server,
|
||||||
|
authEnabled: false,
|
||||||
|
username: "",
|
||||||
|
password: "",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint := server[indexOfAt+1:]
|
||||||
|
|
||||||
|
indexOfColon := strings.Index(server[:indexOfAt-1], ":")
|
||||||
|
if indexOfColon == -1 {
|
||||||
|
return nil, fmt.Errorf("Can't find username,password in '%s'", server)
|
||||||
|
}
|
||||||
|
username := server[0:indexOfColon]
|
||||||
|
password := server[indexOfColon+1 : indexOfAt]
|
||||||
|
|
||||||
|
return &endPointInfo{
|
||||||
|
endpoint: endpoint,
|
||||||
|
authEnabled: true,
|
||||||
|
username: username,
|
||||||
|
password: password,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *endPointInfo) getMap(key []byte) (map[string]string, error) {
|
||||||
|
data, err := e.get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to get data: %s", err)
|
return nil, fmt.Errorf("Failed to get data: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -231,8 +292,8 @@ func (a *Aerospike) getMap(key []byte, host string) (map[string]string, error) {
|
||||||
return parsed, nil
|
return parsed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) getList(key []byte, host string) ([]string, error) {
|
func (e *endPointInfo) getList(key []byte) ([]string, error) {
|
||||||
data, err := a.get(key, host)
|
data, err := e.get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to get data: %s", err)
|
return nil, fmt.Errorf("Failed to get data: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -244,7 +305,7 @@ func (a *Aerospike) getList(key []byte, host string) ([]string, error) {
|
||||||
return parsed, nil
|
return parsed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) get(key []byte, host string) (map[string]string, error) {
|
func (e *endPointInfo) get(key []byte) (map[string]string, error) {
|
||||||
var err error
|
var err error
|
||||||
var data map[string]string
|
var data map[string]string
|
||||||
|
|
||||||
|
@ -260,28 +321,27 @@ func (a *Aerospike) get(key []byte, host string) (map[string]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := asInfo.msg.Serialize()
|
cmd := asInfo.msg.Serialize()
|
||||||
addr, err := net.ResolveTCPAddr("tcp", host)
|
addr, err := net.ResolveTCPAddr("tcp", e.endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
|
return data, fmt.Errorf("Lookup failed for '%s': %s", e.endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := net.DialTCP("tcp", nil, addr)
|
conn, err := net.DialTCP("tcp", nil, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
|
return data, fmt.Errorf("Connection failed for '%s': %s", e.endpoint, err)
|
||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
if a.EnableAuth {
|
if e.authEnabled {
|
||||||
err = a.authenticate(conn)
|
err = e.authenticate(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//fmt.Println("Authentication failed with error ", err)
|
|
||||||
return data, err
|
return data, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = conn.Write(cmd)
|
_, err = conn.Write(cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
|
return data, fmt.Errorf("Failed to send to '%s': %s", e.endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
|
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
|
||||||
|
@ -302,18 +362,18 @@ func (a *Aerospike) get(key []byte, host string) (map[string]string, error) {
|
||||||
|
|
||||||
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
|
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
|
return data, fmt.Errorf("Failed to read from connection to '%s': %s", e.endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err = asInfo.parseMultiResponse()
|
data, err = asInfo.parseMultiResponse()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
|
return data, fmt.Errorf("Failed to parse response from '%s': %s", e.endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return data, err
|
return data, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) authenticate(conn *net.TCPConn) error {
|
func (e *endPointInfo) authenticate(conn *net.TCPConn) error {
|
||||||
|
|
||||||
buf := bytes.NewBuffer([]byte{})
|
buf := bytes.NewBuffer([]byte{})
|
||||||
header := make([]byte, 16)
|
header := make([]byte, 16)
|
||||||
|
@ -325,10 +385,10 @@ func (a *Aerospike) authenticate(conn *net.TCPConn) error {
|
||||||
header[3] = byte(2) //field count
|
header[3] = byte(2) //field count
|
||||||
|
|
||||||
binary.Write(buf, binary.BigEndian, header)
|
binary.Write(buf, binary.BigEndian, header)
|
||||||
usernameField := newField(USER, []byte(a.Username))
|
usernameField := newField(USER, []byte(e.username))
|
||||||
usernameField.WriteToBuf(buf)
|
usernameField.WriteToBuf(buf)
|
||||||
|
|
||||||
pw, err := bcrypt.Hash(a.Password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO")
|
pw, err := bcrypt.Hash(e.password, "$2a$10$7EqJtq98hPqEX7fNZaFWoO")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -376,13 +436,6 @@ func (a *Aerospike) authenticate(conn *net.TCPConn) error {
|
||||||
|
|
||||||
errorCode := int(buffer[1])
|
errorCode := int(buffer[1])
|
||||||
|
|
||||||
if (errorCode == ERR_NOT_SUPPORTED || errorCode == ERR_NOT_ENABLED) && a.AutoAuthDisable {
|
|
||||||
fmt.Println("Disabling auth for ", a.Server)
|
|
||||||
a.EnableAuth = false
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
val, exist := errorCode2Msg[errorCode]
|
val, exist := errorCode2Msg[errorCode]
|
||||||
if exist {
|
if exist {
|
||||||
return fmt.Errorf("Authentication failed: %s", val)
|
return fmt.Errorf("Authentication failed: %s", val)
|
||||||
|
@ -450,7 +503,7 @@ func readAerospikeLatency(
|
||||||
metric := metrName + "_" + unitTimes[i] //strings.Replace(unitTimes[i], ">", "_gt_", 1)
|
metric := metrName + "_" + unitTimes[i] //strings.Replace(unitTimes[i], ">", "_gt_", 1)
|
||||||
value, err := strconv.ParseFloat(vals[i], 64)
|
value, err := strconv.ParseFloat(vals[i], 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//fmt.Println("Failed to parse float when parsing latency ")
|
fmt.Println("Failed to parse float when parsing latency ")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fields[metric] = value
|
fields[metric] = value
|
||||||
|
|
|
@ -3,6 +3,7 @@ package aerospike
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -17,7 +18,7 @@ func TestAerospikeStatistics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
a := &Aerospike{
|
a := &Aerospike{
|
||||||
Server: testutil.GetLocalHost() + ":3000",
|
Servers: []string{testutil.GetLocalHost() + ":3000"},
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
@ -155,3 +156,28 @@ func TestFieldSerialization(t *testing.T) {
|
||||||
assert.True(t, bytes.Equal(data, buf.Bytes()))
|
assert.True(t, bytes.Equal(data, buf.Bytes()))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEndpointInfoParsing(t *testing.T) {
|
||||||
|
sinfo := "user:password@localhost:3212"
|
||||||
|
|
||||||
|
ep, err := parseHostInfo(sinfo)
|
||||||
|
|
||||||
|
fmt.Println(ep.username, " ", ep.password, " ", ep.endpoint)
|
||||||
|
|
||||||
|
assert.True(t, err == nil)
|
||||||
|
assert.True(t, ep.username == "user")
|
||||||
|
assert.True(t, ep.password == "password")
|
||||||
|
assert.True(t, ep.endpoint == "localhost:3212")
|
||||||
|
assert.True(t, ep.authEnabled)
|
||||||
|
|
||||||
|
sinfo = "eatyourpotato:4321"
|
||||||
|
|
||||||
|
ep, err = parseHostInfo(sinfo)
|
||||||
|
|
||||||
|
fmt.Println(ep.username, " ", ep.password, " ", ep.endpoint)
|
||||||
|
|
||||||
|
assert.True(t, err == nil)
|
||||||
|
assert.True(t, ep.endpoint == "eatyourpotato:4321")
|
||||||
|
assert.False(t, ep.authEnabled)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue