From c5f200917a4b6244d6e5ee05be7cd9d827c808cc Mon Sep 17 00:00:00 2001 From: Tait Clarridge Date: Wed, 21 Oct 2015 15:05:14 -0400 Subject: [PATCH] Add aerospike plugin support - Does not use the aerospike client, but sends the stats command using the aerospike required format - Queries available namespaces and gets stats for all of them closes #300 --- plugins/aerospike/README.md | 265 ++++++++++++++++++++++ plugins/aerospike/aerospike.go | 335 ++++++++++++++++++++++++++++ plugins/aerospike/aerospike_test.go | 112 ++++++++++ plugins/all/all.go | 1 + scripts/docker-compose.yml | 5 + 5 files changed, 718 insertions(+) create mode 100644 plugins/aerospike/README.md create mode 100644 plugins/aerospike/aerospike.go create mode 100644 plugins/aerospike/aerospike_test.go diff --git a/plugins/aerospike/README.md b/plugins/aerospike/README.md new file mode 100644 index 000000000..6fb6bb189 --- /dev/null +++ b/plugins/aerospike/README.md @@ -0,0 +1,265 @@ +## Telegraf Plugin: Aerospike + +#### Plugin arguments: +- **servers** string array: List of aerospike servers to query (def: 127.0.0.1:3000) + +#### Description + +The aerospike plugin queries aerospike server(s) and get node statistics. It also collects stats for +all the configured namespaces. + +For what the measurements mean, please consult the [Aerospike Metrics Reference Docs](http://www.aerospike.com/docs/reference/metrics). + +The metric names, to make it less complicated in querying, have replaced all `-` with `_` as Aerospike metrics come in both forms (no idea why). + +# Measurements: +#### Aerospike Statistics [values]: + +Meta: +- units: Integer + +Measurement names: +- batch_index_queue +- batch_index_unused_buffers +- batch_queue +- batch_tree_count +- client_connections +- data_used_bytes_memory +- index_used_bytes_memory +- info_queue +- migrate_progress_recv +- migrate_progress_send +- migrate_rx_objs +- migrate_tx_objs +- objects +- ongoing_write_reqs +- partition_absent +- partition_actual +- partition_desync +- partition_object_count +- partition_ref_count +- partition_replica +- proxy_in_progress +- query_agg_avg_rec_count +- query_avg_rec_count +- query_lookup_avg_rec_count +- queue +- record_locks +- record_refs +- sindex_used_bytes_memory +- sindex_gc_garbage_cleaned +- system_free_mem_pct +- total_bytes_disk +- total_bytes_memory +- tree_count +- scans_active +- uptime +- used_bytes_disk +- used_bytes_memory +- cluster_size +- waiting_transactions + +#### Aerospike Statistics [cumulative]: + +Meta: +- units: Integer + +Measurement names: +- batch_errors +- batch_index_complete +- batch_index_errors +- batch_index_initiate +- batch_index_timeout +- batch_initiate +- batch_timeout +- err_duplicate_proxy_request +- err_out_of_space +- err_replica_non_null_node +- err_replica_null_node +- err_rw_cant_put_unique +- err_rw_pending_limit +- err_rw_request_not_found +- err_storage_queue_full +- err_sync_copy_null_master +- err_sync_copy_null_node +- err_tsvc_requests +- err_write_fail_bin_exists +- err_write_fail_generation +- err_write_fail_generation_xdr +- err_write_fail_incompatible_type +- err_write_fail_key_exists +- err_write_fail_key_mismatch +- err_write_fail_not_found +- err_write_fail_noxdr +- err_write_fail_parameter +- err_write_fail_prole_delete +- err_write_fail_prole_generation +- err_write_fail_prole_unknown +- err_write_fail_unknown +- fabric_msgs_rcvd +- fabric_msgs_sent +- heartbeat_received_foreign +- heartbeat_received_self +- migrate_msgs_recv +- migrate_msgs_sent +- migrate_num_incoming_accepted +- migrate_num_incoming_refused +- proxy_action +- proxy_initiate +- proxy_retry +- proxy_retry_new_dest +- proxy_retry_q_full +- proxy_retry_same_dest +- proxy_unproxy +- query_abort +- query_agg +- query_agg_abort +- query_agg_err +- query_agg_success +- query_bad_records +- query_fail +- query_long_queue_full +- query_long_running +- query_lookup_abort +- query_lookup_err +- query_lookups +- query_lookup_success +- query_reqs +- query_short_queue_full +- query_short_running +- query_success +- query_tracked +- read_dup_prole +- reaped_fds +- rw_err_ack_badnode +- rw_err_ack_internal +- rw_err_ack_nomatch +- rw_err_dup_cluster_key +- rw_err_dup_internal +- rw_err_dup_send +- rw_err_write_cluster_key +- rw_err_write_internal +- rw_err_write_send +- sindex_ucgarbage_found +- sindex_gc_locktimedout +- sindex_gc_inactivity_dur +- sindex_gc_activity_dur +- sindex_gc_list_creation_time +- sindex_gc_list_deletion_time +- sindex_gc_objects_validated +- sindex_gc_garbage_found +- stat_cluster_key_err_ack_dup_trans_reenqueue +- stat_cluster_key_err_ack_rw_trans_reenqueue +- stat_cluster_key_prole_retry +- stat_cluster_key_regular_processed +- stat_cluster_key_trans_to_proxy_retry +- stat_deleted_set_object +- stat_delete_success +- stat_duplicate_operation +- stat_evicted_objects +- stat_evicted_objects_time +- stat_evicted_set_objects +- stat_expired_objects +- stat_nsup_deletes_not_shipped +- stat_proxy_errs +- stat_proxy_reqs +- stat_proxy_reqs_xdr +- stat_proxy_success +- stat_read_errs_notfound +- stat_read_errs_other +- stat_read_reqs +- stat_read_reqs_xdr +- stat_read_success +- stat_rw_timeout +- stat_slow_trans_queue_batch_pop +- stat_slow_trans_queue_pop +- stat_slow_trans_queue_push +- stat_write_errs +- stat_write_errs_notfound +- stat_write_errs_other +- stat_write_reqs +- stat_write_reqs_xdr +- stat_write_success +- stat_xdr_pipe_miss +- stat_xdr_pipe_writes +- stat_zero_bin_records +- storage_defrag_corrupt_record +- storage_defrag_wait +- transactions +- basic_scans_succeeded +- basic_scans_failed +- aggr_scans_succeeded +- aggr_scans_failed +- udf_bg_scans_succeeded +- udf_bg_scans_failed +- udf_delete_err_others +- udf_delete_reqs +- udf_delete_success +- udf_lua_errs +- udf_query_rec_reqs +- udf_read_errs_other +- udf_read_reqs +- udf_read_success +- udf_replica_writes +- udf_scan_rec_reqs +- udf_write_err_others +- udf_write_reqs +- udf_write_success +- write_master +- write_prole + +#### Aerospike Statistics [percentage]: + +Meta: +- units: percent (out of 100) + +Measurement names: +- free_pct_disk +- free_pct_memory + +# Measurements: +#### Aerospike Namespace Statistics [values]: + +Meta: +- units: Integer +- tags: `namespace=` + +Measurement names: +- available_bin_names +- available_pct +- current_time +- data_used_bytes_memory +- index_used_bytes_memory +- master_objects +- max_evicted_ttl +- max_void_time +- non_expirable_objects +- objects +- prole_objects +- sindex_used_bytes_memory +- total_bytes_disk +- total_bytes_memory +- used_bytes_disk +- used_bytes_memory + +#### Aerospike Namespace Statistics [cumulative]: + +Meta: +- units: Integer +- tags: `namespace=` + +Measurement names: +- evicted_objects +- expired_objects +- set_deleted_objects +- set_evicted_objects + +#### Aerospike Namespace Statistics [percentage]: + +Meta: +- units: percent (out of 100) +- tags: `namespace=` + +Measurement names: +- free_pct_disk +- free_pct_memory diff --git a/plugins/aerospike/aerospike.go b/plugins/aerospike/aerospike.go new file mode 100644 index 000000000..128c583bb --- /dev/null +++ b/plugins/aerospike/aerospike.go @@ -0,0 +1,335 @@ +package aerospike + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/influxdb/telegraf/plugins" + "net" + "strconv" + "strings" + "sync" +) + +const ( + MSG_HEADER_SIZE = 8 + MSG_TYPE = 1 // Info is 1 + MSG_VERSION = 2 +) + +var ( + STATISTICS_COMMAND = []byte("statistics\n") + NAMESPACES_COMMAND = []byte("namespaces\n") +) + +type aerospikeMessageHeader struct { + Version uint8 + Type uint8 + DataLen [6]byte +} + +type aerospikeMessage struct { + aerospikeMessageHeader + Data []byte +} + +// Taken from aerospike-client-go/types/message.go +func (msg *aerospikeMessage) Serialize() []byte { + msg.DataLen = msgLenToBytes(int64(len(msg.Data))) + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, msg.aerospikeMessageHeader) + binary.Write(buf, binary.BigEndian, msg.Data[:]) + return buf.Bytes() +} + +type aerospikeInfoCommand struct { + msg *aerospikeMessage +} + +// Taken from aerospike-client-go/info.go +func (nfo *aerospikeInfoCommand) parseMultiResponse() (map[string]string, error) { + responses := make(map[string]string) + offset := int64(0) + begin := int64(0) + + dataLen := int64(len(nfo.msg.Data)) + + // Create reusable StringBuilder for performance. + for offset < dataLen { + b := nfo.msg.Data[offset] + + if b == '\t' { + name := nfo.msg.Data[begin:offset] + offset++ + begin = offset + + // Parse field value. + for offset < dataLen { + if nfo.msg.Data[offset] == '\n' { + break + } + offset++ + } + + if offset > begin { + value := nfo.msg.Data[begin:offset] + responses[string(name)] = string(value) + } else { + responses[string(name)] = "" + } + offset++ + begin = offset + } else if b == '\n' { + if offset > begin { + name := nfo.msg.Data[begin:offset] + responses[string(name)] = "" + } + offset++ + begin = offset + } else { + offset++ + } + } + + if offset > begin { + name := nfo.msg.Data[begin:offset] + responses[string(name)] = "" + } + return responses, nil +} + +type Aerospike struct { + Servers []string +} + +var sampleConfig = ` + # Aerospike servers to connect to (with port) + # Default: servers = ["127.0.0.1:3000"] + # + # This plugin will query all namespaces the aerospike + # server has configured and get stats for them. + servers = ["aerospike01:3000"] + ` + +func (a *Aerospike) SampleConfig() string { + return sampleConfig +} + +func (a *Aerospike) Description() string { + return "Read stats from an aerospike server" +} + +func (a *Aerospike) Gather(acc plugins.Accumulator) error { + if len(a.Servers) == 0 { + return a.gatherServer("127.0.0.1:3000", acc) + } + + var wg sync.WaitGroup + + var outerr error + + for _, server := range a.Servers { + wg.Add(1) + go func(server string) { + defer wg.Done() + outerr = a.gatherServer(server, acc) + }(server) + } + + wg.Wait() + return outerr +} + +func (a *Aerospike) gatherServer(host string, acc plugins.Accumulator) error { + aerospikeInfo, err := getMap(STATISTICS_COMMAND, host) + if err != nil { + return fmt.Errorf("Aerospike info failed: %s", err) + } + readAerospikeStats(aerospikeInfo, acc, host, "") + namespaces, err := getList(NAMESPACES_COMMAND, host) + if err != nil { + return fmt.Errorf("Aerospike namespace list failed: %s", err) + } + for ix := range namespaces { + nsInfo, err := getMap([]byte("namespace/"+namespaces[ix]+"\n"), host) + if err != nil { + return fmt.Errorf("Aerospike namespace '%s' query failed: %s", namespaces[ix], err) + } + readAerospikeStats(nsInfo, acc, host, namespaces[ix]) + } + return nil +} + +func getMap(key []byte, host string) (map[string]string, error) { + data, err := get(key, host) + if err != nil { + return nil, fmt.Errorf("Failed to get data: %s", err) + } + parsed, err := unmarshalMapInfo(data, string(key)) + if err != nil { + return nil, fmt.Errorf("Failed to unmarshal data: %s", err) + } + + return parsed, nil +} + +func getList(key []byte, host string) ([]string, error) { + data, err := get(key, host) + if err != nil { + return nil, fmt.Errorf("Failed to get data: %s", err) + } + parsed, err := unmarshalListInfo(data, string(key)) + if err != nil { + return nil, fmt.Errorf("Failed to unmarshal data: %s", err) + } + + return parsed, nil +} + +func get(key []byte, host string) (map[string]string, error) { + var err error + var data map[string]string + + asInfo := &aerospikeInfoCommand{ + msg: &aerospikeMessage{ + aerospikeMessageHeader: aerospikeMessageHeader{ + Version: uint8(MSG_VERSION), + Type: uint8(MSG_TYPE), + DataLen: msgLenToBytes(int64(len(key))), + }, + Data: key, + }, + } + + cmd := asInfo.msg.Serialize() + addr, err := net.ResolveTCPAddr("tcp", host) + if err != nil { + return data, fmt.Errorf("Lookup failed for '%s': %s", host, err) + } + + conn, err := net.DialTCP("tcp", nil, addr) + if err != nil { + return data, fmt.Errorf("Connection failed for '%s': %s", host, err) + } + defer conn.Close() + + _, err = conn.Write(cmd) + if err != nil { + return data, fmt.Errorf("Failed to send to '%s': %s", host, err) + } + + msgHeader := bytes.NewBuffer(make([]byte, MSG_HEADER_SIZE)) + _, err = readLenFromConn(conn, msgHeader.Bytes(), MSG_HEADER_SIZE) + if err != nil { + return data, fmt.Errorf("Failed to read header: %s", err) + } + err = binary.Read(msgHeader, binary.BigEndian, &asInfo.msg.aerospikeMessageHeader) + if err != nil { + return data, fmt.Errorf("Failed to unmarshal header: %s", err) + } + + msgLen := msgLenFromBytes(asInfo.msg.aerospikeMessageHeader.DataLen) + + if int64(len(asInfo.msg.Data)) != msgLen { + asInfo.msg.Data = make([]byte, msgLen) + } + + _, err = readLenFromConn(conn, asInfo.msg.Data, len(asInfo.msg.Data)) + if err != nil { + return data, fmt.Errorf("Failed to read from connection to '%s': %s", host, err) + } + + data, err = asInfo.parseMultiResponse() + if err != nil { + return data, fmt.Errorf("Failed to parse response from '%s': %s", host, err) + } + + return data, err +} + +func readAerospikeStats(stats map[string]string, acc plugins.Accumulator, host, namespace string) { + for key, value := range stats { + tags := map[string]string{ + "host": host, + } + + if namespace != "" { + tags["namespace"] = namespace + } + + // We are going to ignore all string based keys + val, err := strconv.ParseInt(value, 10, 64) + if err == nil { + if strings.Contains(key, "-") { + key = strings.Replace(key, "-", "_", -1) + } + acc.Add(key, val, tags) + } + } +} + +func unmarshalMapInfo(infoMap map[string]string, key string) (map[string]string, error) { + key = strings.TrimSuffix(key, "\n") + res := map[string]string{} + + v, exists := infoMap[key] + if !exists { + return res, fmt.Errorf("Key '%s' missing from info", key) + } + + values := strings.Split(v, ";") + for i := range values { + kv := strings.Split(values[i], "=") + if len(kv) > 1 { + res[kv[0]] = kv[1] + } + } + + return res, nil +} + +func unmarshalListInfo(infoMap map[string]string, key string) ([]string, error) { + key = strings.TrimSuffix(key, "\n") + + v, exists := infoMap[key] + if !exists { + return []string{}, fmt.Errorf("Key '%s' missing from info", key) + } + + values := strings.Split(v, ";") + return values, nil +} + +func readLenFromConn(c net.Conn, buffer []byte, length int) (total int, err error) { + var r int + for total < length { + r, err = c.Read(buffer[total:length]) + total += r + if err != nil { + break + } + } + return +} + +// Taken from aerospike-client-go/types/message.go +func msgLenToBytes(DataLen int64) [6]byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(DataLen)) + res := [6]byte{} + copy(res[:], b[2:]) + return res +} + +// Taken from aerospike-client-go/types/message.go +func msgLenFromBytes(buf [6]byte) int64 { + nbytes := append([]byte{0, 0}, buf[:]...) + DataLen := binary.BigEndian.Uint64(nbytes) + return int64(DataLen) +} + +func init() { + plugins.Add("aerospike", func() plugins.Plugin { + return &Aerospike{} + }) +} diff --git a/plugins/aerospike/aerospike_test.go b/plugins/aerospike/aerospike_test.go new file mode 100644 index 000000000..badd305a2 --- /dev/null +++ b/plugins/aerospike/aerospike_test.go @@ -0,0 +1,112 @@ +package aerospike + +import ( + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "reflect" + "testing" +) + +func TestAerospikeStatistics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + a := &Aerospike{ + Servers: []string{testutil.GetLocalHost() + ":3000"}, + } + + var acc testutil.Accumulator + + err := a.Gather(&acc) + require.NoError(t, err) + + // Only use a few of the metrics + asMetrics := []string{ + "transactions", + "stat_write_errs", + "stat_read_reqs", + "stat_write_reqs", + } + + for _, metric := range asMetrics { + assert.True(t, acc.HasIntValue(metric), metric) + } + +} + +func TestAerospikeMsgLenFromToBytes(t *testing.T) { + var i int64 = 8 + assert.True(t, i == msgLenFromBytes(msgLenToBytes(i))) +} + +func TestReadAerospikeStatsNoNamespace(t *testing.T) { + // Also test for re-writing + var acc testutil.Accumulator + stats := map[string]string{ + "stat-write-errs": "12345", + "stat_read_reqs": "12345", + } + readAerospikeStats(stats, &acc, "host1", "") + for k := range stats { + if k == "stat-write-errs" { + k = "stat_write_errs" + } + assert.True(t, acc.HasMeasurement(k)) + assert.True(t, acc.CheckValue(k, int64(12345))) + } +} + +func TestReadAerospikeStatsNamespace(t *testing.T) { + var acc testutil.Accumulator + stats := map[string]string{ + "stat_write_errs": "12345", + "stat_read_reqs": "12345", + } + readAerospikeStats(stats, &acc, "host1", "test") + + tags := map[string]string{ + "host": "host1", + "namespace": "test", + } + for k := range stats { + assert.True(t, acc.ValidateTaggedValue(k, int64(12345), tags) == nil) + } +} + +func TestAerospikeUnmarshalList(t *testing.T) { + i := map[string]string{ + "test": "one;two;three", + } + + expected := []string{"one", "two", "three"} + + list, err := unmarshalListInfo(i, "test2") + assert.True(t, err != nil) + + list, err = unmarshalListInfo(i, "test") + assert.True(t, err == nil) + equal := true + for ix := range expected { + if list[ix] != expected[ix] { + equal = false + break + } + } + assert.True(t, equal) +} + +func TestAerospikeUnmarshalMap(t *testing.T) { + i := map[string]string{ + "test": "key1=value1;key2=value2", + } + + expected := map[string]string{ + "key1": "value1", + "key2": "value2", + } + m, err := unmarshalMapInfo(i, "test") + assert.True(t, err == nil) + assert.True(t, reflect.DeepEqual(m, expected)) +} diff --git a/plugins/all/all.go b/plugins/all/all.go index 3f4163d17..f29a4987b 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/aerospike" _ "github.com/influxdb/telegraf/plugins/apache" _ "github.com/influxdb/telegraf/plugins/bcache" _ "github.com/influxdb/telegraf/plugins/disque" diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index a04d9ff91..2fabb72a0 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -42,3 +42,8 @@ redis: image: redis ports: - "6379:6379" + +aerospike: + image: aerospike/aerospike-server + ports: + - "3000:3000"