Add aerospike plugin support

- Does not use the aerospike client, but sends the stats command
  using the aerospike required format
- Queries available namespaces and gets stats for all of them

closes #300
This commit is contained in:
Tait Clarridge 2015-10-21 15:05:14 -04:00 committed by Cameron Sparr
parent 21622a1a17
commit c5f200917a
5 changed files with 718 additions and 0 deletions

265
plugins/aerospike/README.md Normal file
View File

@ -0,0 +1,265 @@
## Telegraf Plugin: Aerospike
#### Plugin arguments:
- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000)
#### Description
The aerospike plugin queries aerospike server(s) and get node statistics. It also collects stats for
all the configured namespaces.
For what the measurements mean, please consult the [Aerospike Metrics Reference Docs](http://www.aerospike.com/docs/reference/metrics).
The metric names, to make it less complicated in querying, have replaced all `-` with `_` as Aerospike metrics come in both forms (no idea why).
# Measurements:
#### Aerospike Statistics [values]:
Meta:
- units: Integer
Measurement names:
- batch_index_queue
- batch_index_unused_buffers
- batch_queue
- batch_tree_count
- client_connections
- data_used_bytes_memory
- index_used_bytes_memory
- info_queue
- migrate_progress_recv
- migrate_progress_send
- migrate_rx_objs
- migrate_tx_objs
- objects
- ongoing_write_reqs
- partition_absent
- partition_actual
- partition_desync
- partition_object_count
- partition_ref_count
- partition_replica
- proxy_in_progress
- query_agg_avg_rec_count
- query_avg_rec_count
- query_lookup_avg_rec_count
- queue
- record_locks
- record_refs
- sindex_used_bytes_memory
- sindex_gc_garbage_cleaned
- system_free_mem_pct
- total_bytes_disk
- total_bytes_memory
- tree_count
- scans_active
- uptime
- used_bytes_disk
- used_bytes_memory
- cluster_size
- waiting_transactions
#### Aerospike Statistics [cumulative]:
Meta:
- units: Integer
Measurement names:
- batch_errors
- batch_index_complete
- batch_index_errors
- batch_index_initiate
- batch_index_timeout
- batch_initiate
- batch_timeout
- err_duplicate_proxy_request
- err_out_of_space
- err_replica_non_null_node
- err_replica_null_node
- err_rw_cant_put_unique
- err_rw_pending_limit
- err_rw_request_not_found
- err_storage_queue_full
- err_sync_copy_null_master
- err_sync_copy_null_node
- err_tsvc_requests
- err_write_fail_bin_exists
- err_write_fail_generation
- err_write_fail_generation_xdr
- err_write_fail_incompatible_type
- err_write_fail_key_exists
- err_write_fail_key_mismatch
- err_write_fail_not_found
- err_write_fail_noxdr
- err_write_fail_parameter
- err_write_fail_prole_delete
- err_write_fail_prole_generation
- err_write_fail_prole_unknown
- err_write_fail_unknown
- fabric_msgs_rcvd
- fabric_msgs_sent
- heartbeat_received_foreign
- heartbeat_received_self
- migrate_msgs_recv
- migrate_msgs_sent
- migrate_num_incoming_accepted
- migrate_num_incoming_refused
- proxy_action
- proxy_initiate
- proxy_retry
- proxy_retry_new_dest
- proxy_retry_q_full
- proxy_retry_same_dest
- proxy_unproxy
- query_abort
- query_agg
- query_agg_abort
- query_agg_err
- query_agg_success
- query_bad_records
- query_fail
- query_long_queue_full
- query_long_running
- query_lookup_abort
- query_lookup_err
- query_lookups
- query_lookup_success
- query_reqs
- query_short_queue_full
- query_short_running
- query_success
- query_tracked
- read_dup_prole
- reaped_fds
- rw_err_ack_badnode
- rw_err_ack_internal
- rw_err_ack_nomatch
- rw_err_dup_cluster_key
- rw_err_dup_internal
- rw_err_dup_send
- rw_err_write_cluster_key
- rw_err_write_internal
- rw_err_write_send
- sindex_ucgarbage_found
- sindex_gc_locktimedout
- sindex_gc_inactivity_dur
- sindex_gc_activity_dur
- sindex_gc_list_creation_time
- sindex_gc_list_deletion_time
- sindex_gc_objects_validated
- sindex_gc_garbage_found
- stat_cluster_key_err_ack_dup_trans_reenqueue
- stat_cluster_key_err_ack_rw_trans_reenqueue
- stat_cluster_key_prole_retry
- stat_cluster_key_regular_processed
- stat_cluster_key_trans_to_proxy_retry
- stat_deleted_set_object
- stat_delete_success
- stat_duplicate_operation
- stat_evicted_objects
- stat_evicted_objects_time
- stat_evicted_set_objects
- stat_expired_objects
- stat_nsup_deletes_not_shipped
- stat_proxy_errs
- stat_proxy_reqs
- stat_proxy_reqs_xdr
- stat_proxy_success
- stat_read_errs_notfound
- stat_read_errs_other
- stat_read_reqs
- stat_read_reqs_xdr
- stat_read_success
- stat_rw_timeout
- stat_slow_trans_queue_batch_pop
- stat_slow_trans_queue_pop
- stat_slow_trans_queue_push
- stat_write_errs
- stat_write_errs_notfound
- stat_write_errs_other
- stat_write_reqs
- stat_write_reqs_xdr
- stat_write_success
- stat_xdr_pipe_miss
- stat_xdr_pipe_writes
- stat_zero_bin_records
- storage_defrag_corrupt_record
- storage_defrag_wait
- transactions
- basic_scans_succeeded
- basic_scans_failed
- aggr_scans_succeeded
- aggr_scans_failed
- udf_bg_scans_succeeded
- udf_bg_scans_failed
- udf_delete_err_others
- udf_delete_reqs
- udf_delete_success
- udf_lua_errs
- udf_query_rec_reqs
- udf_read_errs_other
- udf_read_reqs
- udf_read_success
- udf_replica_writes
- udf_scan_rec_reqs
- udf_write_err_others
- udf_write_reqs
- udf_write_success
- write_master
- write_prole
#### Aerospike Statistics [percentage]:
Meta:
- units: percent (out of 100)
Measurement names:
- free_pct_disk
- free_pct_memory
# Measurements:
#### Aerospike Namespace Statistics [values]:
Meta:
- units: Integer
- tags: `namespace=<namespace>`
Measurement names:
- available_bin_names
- available_pct
- current_time
- data_used_bytes_memory
- index_used_bytes_memory
- master_objects
- max_evicted_ttl
- max_void_time
- non_expirable_objects
- objects
- prole_objects
- sindex_used_bytes_memory
- total_bytes_disk
- total_bytes_memory
- used_bytes_disk
- used_bytes_memory
#### Aerospike Namespace Statistics [cumulative]:
Meta:
- units: Integer
- tags: `namespace=<namespace>`
Measurement names:
- evicted_objects
- expired_objects
- set_deleted_objects
- set_evicted_objects
#### Aerospike Namespace Statistics [percentage]:
Meta:
- units: percent (out of 100)
- tags: `namespace=<namespace>`
Measurement names:
- free_pct_disk
- free_pct_memory

View File

@ -0,0 +1,335 @@
package aerospike
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/influxdb/telegraf/plugins"
"net"
"strconv"
"strings"
"sync"
)
const (
MSG_HEADER_SIZE = 8
MSG_TYPE = 1 // Info is 1
MSG_VERSION = 2
)
var (
STATISTICS_COMMAND = []byte("statistics\n")
NAMESPACES_COMMAND = []byte("namespaces\n")
)
type aerospikeMessageHeader struct {
Version uint8
Type uint8
DataLen [6]byte
}
type aerospikeMessage struct {
aerospikeMessageHeader
Data []byte
}
// Taken from aerospike-client-go/types/message.go
func (msg *aerospikeMessage) Serialize() []byte {
msg.DataLen = msgLenToBytes(int64(len(msg.Data)))
buf := bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader)
binary.Write(buf, binary.BigEndian, msg.Data[:])
return buf.Bytes()
}
type aerospikeInfoCommand struct {
msg *aerospikeMessage
}
// Taken from aerospike-client-go/info.go
func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) {
responses := make(map[string]string)
offset := int64(0)
begin := int64(0)
dataLen := int64(len(nfo.msg.Data))
// Create reusable StringBuilder for performance.
for offset < dataLen {
b := nfo.msg.Data[offset]
if b == '\t' {
name := nfo.msg.Data[begin:offset]
offset++
begin = offset
// Parse field value.
for offset < dataLen {
if nfo.msg.Data[offset] == '\n' {
break
}
offset++
}
if offset > begin {
value := nfo.msg.Data[begin:offset]
responses[string(name)] = string(value)
} else {
responses[string(name)] = ""
}
offset++
begin = offset
} else if b == '\n' {
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
offset++
begin = offset
} else {
offset++
}
}
if offset > begin {
name := nfo.msg.Data[begin:offset]
responses[string(name)] = ""
}
return responses, nil
}
type Aerospike struct {
Servers []string
}
var sampleConfig = `
# Aerospike servers to connect to (with port)
# Default: servers = ["127.0.0.1:3000"]
#
# This plugin will query all namespaces the aerospike
# server has configured and get stats for them.
servers = ["aerospike01:3000"]
`
func (a *Aerospike) SampleConfig() string {
return sampleConfig
}
func (a *Aerospike) Description() string {
return "Read stats from an aerospike server"
}
func (a *Aerospike) Gather(acc plugins.Accumulator) error {
if len(a.Servers) == 0 {
return a.gatherServer("127.0.0.1:3000", acc)
}
var wg sync.WaitGroup
var outerr error
for _, server := range a.Servers {
wg.Add(1)
go func(server string) {
defer wg.Done()
outerr = a.gatherServer(server, acc)
}(server)
}
wg.Wait()
return outerr
}
func (a *Aerospike) gatherServer(host string, acc plugins.Accumulator) error {
aerospikeInfo, err := getMap(STATISTICS_COMMAND, host)
if err != nil {
return fmt.Errorf("Aerospike info failed: %s", err)
}
readAerospikeStats(aerospikeInfo, acc, host, "")
namespaces, err := getList(NAMESPACES_COMMAND, host)
if err != nil {
return fmt.Errorf("Aerospike namespace list failed: %s", err)
}
for ix := range namespaces {
nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host)
if err != nil {
return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err)
}
readAerospikeStats(nsInfo, acc, host, namespaces[ix])
}
return nil
}
func getMap(key []byte, host string) (map[string]string, error) {
data, err := get(key, host)
if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err)
}
parsed, err := unmarshalMapInfo(data, string(key))
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func getList(key []byte, host string) ([]string, error) {
data, err := get(key, host)
if err != nil {
return nil, fmt.Errorf("Failed to get data: %s", err)
}
parsed, err := unmarshalListInfo(data, string(key))
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal data: %s", err)
}
return parsed, nil
}
func get(key []byte, host string) (map[string]string, error) {
var err error
var data map[string]string
asInfo := &aerospikeInfoCommand{
msg: &aerospikeMessage{
aerospikeMessageHeader: aerospikeMessageHeader{
Version: uint8(MSG_VERSION),
Type: uint8(MSG_TYPE),
DataLen: msgLenToBytes(int64(len(key))),
},
Data: key,
},
}
cmd := asInfo.msg.Serialize()
addr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return data, fmt.Errorf("Lookup failed for '%s': %s", host, err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return data, fmt.Errorf("Connection failed for '%s': %s", host, err)
}
defer conn.Close()
_, err = conn.Write(cmd)
if err != nil {
return data, fmt.Errorf("Failed to send to '%s': %s", host, err)
}
msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE))
_, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE)
if err != nil {
return data, fmt.Errorf("Failed to read header: %s", err)
}
err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader)
if err != nil {
return data, fmt.Errorf("Failed to unmarshal header: %s", err)
}
msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen)
if int64(len(asInfo.msg.Data)) != msgLen {
asInfo.msg.Data = make([]byte, msgLen)
}
_, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data))
if err != nil {
return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err)
}
data, err = asInfo.parseMultiResponse()
if err != nil {
return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err)
}
return data, err
}
func readAerospikeStats(stats map[string]string, acc plugins.Accumulator, host, namespace string) {
for key, value := range stats {
tags := map[string]string{
"host": host,
}
if namespace != "" {
tags["namespace"] = namespace
}
// We are going to ignore all string based keys
val, err := strconv.ParseInt(value, 10, 64)
if err == nil {
if strings.Contains(key, "-") {
key = strings.Replace(key, "-", "_", -1)
}
acc.Add(key, val, tags)
}
}
}
func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) {
key = strings.TrimSuffix(key, "\n")
res := map[string]string{}
v, exists := infoMap[key]
if !exists {
return res, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
for i := range values {
kv := strings.Split(values[i], "=")
if len(kv) > 1 {
res[kv[0]] = kv[1]
}
}
return res, nil
}
func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) {
key = strings.TrimSuffix(key, "\n")
v, exists := infoMap[key]
if !exists {
return []string{}, fmt.Errorf("Key '%s' missing from info", key)
}
values := strings.Split(v, ";")
return values, nil
}
func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) {
var r int
for total < length {
r, err = c.Read(buffer[total:length])
total += r
if err != nil {
break
}
}
return
}
// Taken from aerospike-client-go/types/message.go
func msgLenToBytes(DataLen int64) [6]byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(DataLen))
res := [6]byte{}
copy(res[:], b[2:])
return res
}
// Taken from aerospike-client-go/types/message.go
func msgLenFromBytes(buf [6]byte) int64 {
nbytes := append([]byte{0, 0}, buf[:]...)
DataLen := binary.BigEndian.Uint64(nbytes)
return int64(DataLen)
}
func init() {
plugins.Add("aerospike", func() plugins.Plugin {
return &Aerospike{}
})
}

View File

@ -0,0 +1,112 @@
package aerospike
import (
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"reflect"
"testing"
)
func TestAerospikeStatistics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
a := &Aerospike{
Servers: []string{testutil.GetLocalHost() + ":3000"},
}
var acc testutil.Accumulator
err := a.Gather(&acc)
require.NoError(t, err)
// Only use a few of the metrics
asMetrics := []string{
"transactions",
"stat_write_errs",
"stat_read_reqs",
"stat_write_reqs",
}
for _, metric := range asMetrics {
assert.True(t, acc.HasIntValue(metric), metric)
}
}
func TestAerospikeMsgLenFromToBytes(t *testing.T) {
var i int64 = 8
assert.True(t, i == msgLenFromBytes(msgLenToBytes(i)))
}
func TestReadAerospikeStatsNoNamespace(t *testing.T) {
// Also test for re-writing
var acc testutil.Accumulator
stats := map[string]string{
"stat-write-errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "")
for k := range stats {
if k == "stat-write-errs" {
k = "stat_write_errs"
}
assert.True(t, acc.HasMeasurement(k))
assert.True(t, acc.CheckValue(k, int64(12345)))
}
}
func TestReadAerospikeStatsNamespace(t *testing.T) {
var acc testutil.Accumulator
stats := map[string]string{
"stat_write_errs": "12345",
"stat_read_reqs": "12345",
}
readAerospikeStats(stats, &acc, "host1", "test")
tags := map[string]string{
"host": "host1",
"namespace": "test",
}
for k := range stats {
assert.True(t, acc.ValidateTaggedValue(k, int64(12345), tags) == nil)
}
}
func TestAerospikeUnmarshalList(t *testing.T) {
i := map[string]string{
"test": "one;two;three",
}
expected := []string{"one", "two", "three"}
list, err := unmarshalListInfo(i, "test2")
assert.True(t, err != nil)
list, err = unmarshalListInfo(i, "test")
assert.True(t, err == nil)
equal := true
for ix := range expected {
if list[ix] != expected[ix] {
equal = false
break
}
}
assert.True(t, equal)
}
func TestAerospikeUnmarshalMap(t *testing.T) {
i := map[string]string{
"test": "key1=value1;key2=value2",
}
expected := map[string]string{
"key1": "value1",
"key2": "value2",
}
m, err := unmarshalMapInfo(i, "test")
assert.True(t, err == nil)
assert.True(t, reflect.DeepEqual(m, expected))
}

View File

@ -1,6 +1,7 @@
package all package all
import ( import (
_ "github.com/influxdb/telegraf/plugins/aerospike"
_ "github.com/influxdb/telegraf/plugins/apache" _ "github.com/influxdb/telegraf/plugins/apache"
_ "github.com/influxdb/telegraf/plugins/bcache" _ "github.com/influxdb/telegraf/plugins/bcache"
_ "github.com/influxdb/telegraf/plugins/disque" _ "github.com/influxdb/telegraf/plugins/disque"

View File

@ -42,3 +42,8 @@ redis:
image: redis image: redis
ports: ports:
- "6379:6379" - "6379:6379"
aerospike:
image: aerospike/aerospike-server
ports:
- "3000:3000"