Refactor aerospike plugin to use client lib
This commit is contained in:
parent
300d9adbd0
commit
704d9ad76c
|
@ -2,6 +2,12 @@
|
||||||
|
|
||||||
### Release Notes
|
### Release Notes
|
||||||
|
|
||||||
|
**Breaking Change**: Aerospike main server node measurements have been renamed
|
||||||
|
aerospike_node. Aerospike namespace measurements have been renamed to
|
||||||
|
aerospike_namespace. They will also now be tagged with the node_name
|
||||||
|
that they correspond to. This has been done to differentiate measurements
|
||||||
|
that pertain to node vs. namespace statistics.
|
||||||
|
|
||||||
**Breaking Change**: users of github_webhooks must change to the new
|
**Breaking Change**: users of github_webhooks must change to the new
|
||||||
`[[inputs.webhooks]]` plugin.
|
`[[inputs.webhooks]]` plugin.
|
||||||
|
|
||||||
|
@ -35,6 +41,7 @@ should now look like:
|
||||||
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
|
- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
|
||||||
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
|
- [#1387](https://github.com/influxdata/telegraf/pull/1387): **Breaking Change** - Redis `role` tag renamed to `replication_role` to avoid global_tags override
|
||||||
- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL
|
- [#1437](https://github.com/influxdata/telegraf/pull/1437): Fetching Galera status metrics in MySQL
|
||||||
|
- [#1500](https://github.com/influxdata/telegraf/pull/1500): Aerospike plugin refactored to use official client lib.
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
2
Godeps
2
Godeps
|
@ -1,5 +1,6 @@
|
||||||
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
|
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
|
||||||
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
|
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
|
||||||
|
github.com/aerospike/aerospike-client-go 45863b7fd8640dc12f7fdd397104d97e1986f25a
|
||||||
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
|
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
|
||||||
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
|
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
|
||||||
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
|
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
|
||||||
|
@ -50,6 +51,7 @@ github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
|
||||||
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
|
||||||
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
|
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
|
||||||
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
|
||||||
|
github.com/yuin/gopher-lua bf3808abd44b1e55143a2d7f08571aaa80db1808
|
||||||
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
|
github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363
|
||||||
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
|
golang.org/x/crypto 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3
|
||||||
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172
|
golang.org/x/net 6acef71eb69611914f7a30939ea9f6e194c78172
|
||||||
|
|
|
@ -1,104 +1,19 @@
|
||||||
package aerospike
|
package aerospike
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal/errchan"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
|
||||||
|
as "github.com/aerospike/aerospike-client-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
MSG_HEADER_SIZE = 8
|
|
||||||
MSG_TYPE = 1 // Info is 1
|
|
||||||
MSG_VERSION = 2
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
STATISTICS_COMMAND = []byte("statistics\n")
|
|
||||||
NAMESPACES_COMMAND = []byte("namespaces\n")
|
|
||||||
)
|
|
||||||
|
|
||||||
type aerospikeMessageHeader struct {
|
|
||||||
Version uint8
|
|
||||||
Type uint8
|
|
||||||
DataLen [6]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type aerospikeMessage struct {
|
|
||||||
aerospikeMessageHeader
|
|
||||||
Data []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// Taken from aerospike-client-go/types/message.go
|
|
||||||
func (msg *aerospikeMessage) Serialize() []byte {
|
|
||||||
msg.DataLen = msgLenToBytes(int64(len(msg.Data)))
|
|
||||||
buf := bytes.NewBuffer([]byte{})
|
|
||||||
binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader)
|
|
||||||
binary.Write(buf, binary.BigEndian, msg.Data[:])
|
|
||||||
return buf.Bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
type aerospikeInfoCommand struct {
|
|
||||||
msg *aerospikeMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// Taken from aerospike-client-go/info.go
|
|
||||||
func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) {
|
|
||||||
responses := make(map[string]string)
|
|
||||||
offset := int64(0)
|
|
||||||
begin := int64(0)
|
|
||||||
|
|
||||||
dataLen := int64(len(nfo.msg.Data))
|
|
||||||
|
|
||||||
// Create reusable StringBuilder for performance.
|
|
||||||
for offset < dataLen {
|
|
||||||
b := nfo.msg.Data[offset]
|
|
||||||
|
|
||||||
if b == '\t' {
|
|
||||||
name := nfo.msg.Data[begin:offset]
|
|
||||||
offset++
|
|
||||||
begin = offset
|
|
||||||
|
|
||||||
// Parse field value.
|
|
||||||
for offset < dataLen {
|
|
||||||
if nfo.msg.Data[offset] == '\n' {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
offset++
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset > begin {
|
|
||||||
value := nfo.msg.Data[begin:offset]
|
|
||||||
responses[string(name)] = string(value)
|
|
||||||
} else {
|
|
||||||
responses[string(name)] = ""
|
|
||||||
}
|
|
||||||
offset++
|
|
||||||
begin = offset
|
|
||||||
} else if b == '\n' {
|
|
||||||
if offset > begin {
|
|
||||||
name := nfo.msg.Data[begin:offset]
|
|
||||||
responses[string(name)] = ""
|
|
||||||
}
|
|
||||||
offset++
|
|
||||||
begin = offset
|
|
||||||
} else {
|
|
||||||
offset++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset > begin {
|
|
||||||
name := nfo.msg.Data[begin:offset]
|
|
||||||
responses[string(name)] = ""
|
|
||||||
}
|
|
||||||
return responses, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Aerospike struct {
|
type Aerospike struct {
|
||||||
Servers []string
|
Servers []string
|
||||||
}
|
}
|
||||||
|
@ -115,7 +30,7 @@ func (a *Aerospike) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) Description() string {
|
func (a *Aerospike) Description() string {
|
||||||
return "Read stats from an aerospike server"
|
return "Read stats from aerospike server(s)"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
|
@ -124,214 +39,90 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
errChan := errchan.New(len(a.Servers))
|
||||||
var outerr error
|
wg.Add(len(a.Servers))
|
||||||
|
|
||||||
for _, server := range a.Servers {
|
for _, server := range a.Servers {
|
||||||
wg.Add(1)
|
go func(serv string) {
|
||||||
go func(server string) {
|
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
outerr = a.gatherServer(server, acc)
|
errChan.C <- a.gatherServer(serv, acc)
|
||||||
}(server)
|
}(server)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return outerr
|
return errChan.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Aerospike) gatherServer(host string, acc telegraf.Accumulator) error {
|
func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error {
|
||||||
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
|
host, port, err := net.SplitHostPort(hostport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike info failed: %s", err)
|
return err
|
||||||
}
|
}
|
||||||
readAerospikeStats(aerospikeInfo, acc, host, "")
|
|
||||||
namespaces, err := getList(NAMESPACES_COMMAND, host)
|
iport, err := strconv.Atoi(port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike namespace list failed: %s", err)
|
iport = 3000
|
||||||
}
|
}
|
||||||
for ix := range namespaces {
|
|
||||||
nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
|
c, err := as.NewClient(host, iport)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
|
return err
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
nodes := c.GetNodes()
|
||||||
|
for _, n := range nodes {
|
||||||
|
tags := map[string]string{
|
||||||
|
"node_name": n.GetName(),
|
||||||
|
"aerospike_host": hostport,
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
stats, err := as.RequestNodeStats(n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for k, v := range stats {
|
||||||
|
if iv, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||||
|
fields[strings.Replace(k, "-", "_", -1)] = iv
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acc.AddFields("aerospike_node", fields, tags, time.Now())
|
||||||
|
|
||||||
|
info, err := as.RequestNodeInfo(n, "namespaces")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
namespaces := strings.Split(info["namespaces"], ";")
|
||||||
|
|
||||||
|
for _, namespace := range namespaces {
|
||||||
|
nTags := copyTags(tags)
|
||||||
|
nTags["namespace"] = namespace
|
||||||
|
nFields := make(map[string]interface{})
|
||||||
|
info, err := as.RequestNodeInfo(n, "namespace/"+namespace)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stats := strings.Split(info["namespace/"+namespace], ";")
|
||||||
|
for _, stat := range stats {
|
||||||
|
parts := strings.Split(stat, "=")
|
||||||
|
if len(parts) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if iv, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
|
||||||
|
nFields[strings.Replace(parts[0], "-", "_", -1)] = iv
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())
|
||||||
}
|
}
|
||||||
readAerospikeStats(nsInfo, acc, host, namespaces[ix])
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMap(key []byte, host string) (map[string]string, error) {
|
func copyTags(m map[string]string) map[string]string {
|
||||||
data, err := get(key, host)
|
out := make(map[string]string)
|
||||||
if err != nil {
|
for k, v := range m {
|
||||||
return nil, fmt.Errorf("Failed to get data: %s", err)
|
out[k] = v
|
||||||
}
|
}
|
||||||
parsed, err := unmarshalMapInfo(data, string(key))
|
return out
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return parsed, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getList(key []byte, host string) ([]string, error) {
|
|
||||||
data, err := get(key, host)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to get data: %s", err)
|
|
||||||
}
|
|
||||||
parsed, err := unmarshalListInfo(data, string(key))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return parsed, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func get(key []byte, host string) (map[string]string, error) {
|
|
||||||
var err error
|
|
||||||
var data map[string]string
|
|
||||||
|
|
||||||
asInfo := &aerospikeInfoCommand{
|
|
||||||
msg: &aerospikeMessage{
|
|
||||||
aerospikeMessageHeader: aerospikeMessageHeader{
|
|
||||||
Version: uint8(MSG_VERSION),
|
|
||||||
Type: uint8(MSG_TYPE),
|
|
||||||
DataLen: msgLenToBytes(int64(len(key))),
|
|
||||||
},
|
|
||||||
Data: key,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := asInfo.msg.Serialize()
|
|
||||||
addr, err := net.ResolveTCPAddr("tcp", host)
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := net.DialTCP("tcp", nil, addr)
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
_, err = conn.Write(cmd)
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
|
|
||||||
_, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE)
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Failed to read header: %s", err)
|
|
||||||
}
|
|
||||||
err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader)
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Failed to unmarshal header: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen)
|
|
||||||
|
|
||||||
if int64(len(asInfo.msg.Data)) != msgLen {
|
|
||||||
asInfo.msg.Data = make([]byte, msgLen)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err = asInfo.parseMultiResponse()
|
|
||||||
if err != nil {
|
|
||||||
return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func readAerospikeStats(
|
|
||||||
stats map[string]string,
|
|
||||||
acc telegraf.Accumulator,
|
|
||||||
host string,
|
|
||||||
namespace string,
|
|
||||||
) {
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
tags := map[string]string{
|
|
||||||
"aerospike_host": host,
|
|
||||||
"namespace": "_service",
|
|
||||||
}
|
|
||||||
|
|
||||||
if namespace != "" {
|
|
||||||
tags["namespace"] = namespace
|
|
||||||
}
|
|
||||||
for key, value := range stats {
|
|
||||||
// We are going to ignore all string based keys
|
|
||||||
val, err := strconv.ParseInt(value, 10, 64)
|
|
||||||
if err == nil {
|
|
||||||
if strings.Contains(key, "-") {
|
|
||||||
key = strings.Replace(key, "-", "_", -1)
|
|
||||||
}
|
|
||||||
fields[key] = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
acc.AddFields("aerospike", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
|
|
||||||
key = strings.TrimSuffix(key, "\n")
|
|
||||||
res := map[string]string{}
|
|
||||||
|
|
||||||
v, exists := infoMap[key]
|
|
||||||
if !exists {
|
|
||||||
return res, fmt.Errorf("Key '%s' missing from info", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
values := strings.Split(v, ";")
|
|
||||||
for i := range values {
|
|
||||||
kv := strings.Split(values[i], "=")
|
|
||||||
if len(kv) > 1 {
|
|
||||||
res[kv[0]] = kv[1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) {
|
|
||||||
key = strings.TrimSuffix(key, "\n")
|
|
||||||
|
|
||||||
v, exists := infoMap[key]
|
|
||||||
if !exists {
|
|
||||||
return []string{}, fmt.Errorf("Key '%s' missing from info", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
values := strings.Split(v, ";")
|
|
||||||
return values, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) {
|
|
||||||
var r int
|
|
||||||
for total < length {
|
|
||||||
r, err = c.Read(buffer[total:length])
|
|
||||||
total += r
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Taken from aerospike-client-go/types/message.go
|
|
||||||
func msgLenToBytes(DataLen int64) [6]byte {
|
|
||||||
b := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint64(b, uint64(DataLen))
|
|
||||||
res := [6]byte{}
|
|
||||||
copy(res[:], b[2:])
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
// Taken from aerospike-client-go/types/message.go
|
|
||||||
func msgLenFromBytes(buf [6]byte) int64 {
|
|
||||||
nbytes := append([]byte{0, 0}, buf[:]...)
|
|
||||||
DataLen := binary.BigEndian.Uint64(nbytes)
|
|
||||||
return int64(DataLen)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package aerospike
|
package aerospike
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
@ -22,84 +21,30 @@ func TestAerospikeStatistics(t *testing.T) {
|
||||||
|
|
||||||
err := a.Gather(&acc)
|
err := a.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
|
assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAerospikeMsgLenFromToBytes(t *testing.T) {
|
func TestAerospikeStatisticsPartialErr(t *testing.T) {
|
||||||
var i int64 = 8
|
if testing.Short() {
|
||||||
assert.True(t, i == msgLenFromBytes(msgLenToBytes(i)))
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
a := &Aerospike{
|
||||||
|
Servers: []string{
|
||||||
|
testutil.GetLocalHost() + ":3000",
|
||||||
|
testutil.GetLocalHost() + ":9999",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReadAerospikeStatsNoNamespace(t *testing.T) {
|
|
||||||
// Also test for re-writing
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
stats := map[string]string{
|
|
||||||
"stat-write-errs": "12345",
|
|
||||||
"stat_read_reqs": "12345",
|
|
||||||
}
|
|
||||||
readAerospikeStats(stats, &acc, "host1", "")
|
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
err := a.Gather(&acc)
|
||||||
"stat_write_errs": int64(12345),
|
require.Error(t, err)
|
||||||
"stat_read_reqs": int64(12345),
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"aerospike_host": "host1",
|
|
||||||
"namespace": "_service",
|
|
||||||
}
|
|
||||||
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReadAerospikeStatsNamespace(t *testing.T) {
|
assert.True(t, acc.HasMeasurement("aerospike_node"))
|
||||||
var acc testutil.Accumulator
|
assert.True(t, acc.HasMeasurement("aerospike_namespace"))
|
||||||
stats := map[string]string{
|
assert.True(t, acc.HasIntField("aerospike_node", "batch_error"))
|
||||||
"stat_write_errs": "12345",
|
|
||||||
"stat_read_reqs": "12345",
|
|
||||||
}
|
|
||||||
readAerospikeStats(stats, &acc, "host1", "test")
|
|
||||||
|
|
||||||
fields := map[string]interface{}{
|
|
||||||
"stat_write_errs": int64(12345),
|
|
||||||
"stat_read_reqs": int64(12345),
|
|
||||||
}
|
|
||||||
tags := map[string]string{
|
|
||||||
"aerospike_host": "host1",
|
|
||||||
"namespace": "test",
|
|
||||||
}
|
|
||||||
acc.AssertContainsTaggedFields(t, "aerospike", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAerospikeUnmarshalList(t *testing.T) {
|
|
||||||
i := map[string]string{
|
|
||||||
"test": "one;two;three",
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := []string{"one", "two", "three"}
|
|
||||||
|
|
||||||
list, err := unmarshalListInfo(i, "test2")
|
|
||||||
assert.True(t, err != nil)
|
|
||||||
|
|
||||||
list, err = unmarshalListInfo(i, "test")
|
|
||||||
assert.True(t, err == nil)
|
|
||||||
equal := true
|
|
||||||
for ix := range expected {
|
|
||||||
if list[ix] != expected[ix] {
|
|
||||||
equal = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert.True(t, equal)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAerospikeUnmarshalMap(t *testing.T) {
|
|
||||||
i := map[string]string{
|
|
||||||
"test": "key1=value1;key2=value2",
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := map[string]string{
|
|
||||||
"key1": "value1",
|
|
||||||
"key2": "value2",
|
|
||||||
}
|
|
||||||
m, err := unmarshalMapInfo(i, "test")
|
|
||||||
assert.True(t, err == nil)
|
|
||||||
assert.True(t, reflect.DeepEqual(m, expected))
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue