Ceph: represent pgmap states using tags (#2229)

* ceph: maps are already refs, no need to use a pointer

* ceph: pgmap_states are represented in a single metric "count", differenciated by tag

* Update CHANGELOG
This commit is contained in:
Jérôme Vizcaino
2017-02-01 15:47:23 +01:00
committed by Cameron Sparr
parent 97050e9669
commit c0bbde03ea
4 changed files with 228 additions and 47 deletions

View File

@@ -4,13 +4,14 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io/ioutil"
"log"
"os/exec"
"path/filepath"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
@@ -108,7 +109,7 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err)
continue
}
for tag, metrics := range *data {
for tag, metrics := range data {
acc.AddFields(measurement,
map[string]interface{}(metrics),
map[string]string{"type": s.sockType, "id": s.sockId, "collection": tag})
@@ -244,25 +245,19 @@ type taggedMetricMap map[string]metricMap
// Parses a raw JSON string into a taggedMetricMap
// Delegates the actual parsing to newTaggedMetricMap(..)
func parseDump(dump string) (*taggedMetricMap, error) {
func parseDump(dump string) (taggedMetricMap, error) {
data := make(map[string]interface{})
err := json.Unmarshal([]byte(dump), &data)
if err != nil {
return nil, fmt.Errorf("failed to parse json: '%s': %v", dump, err)
}
tmm := newTaggedMetricMap(data)
if err != nil {
return nil, fmt.Errorf("failed to tag dataset: '%v': %v", tmm, err)
}
return tmm, nil
return newTaggedMetricMap(data), nil
}
// Builds a TaggedMetricMap out of a generic string map.
// The top-level key is used as a tag and all sub-keys are flattened into metrics
func newTaggedMetricMap(data map[string]interface{}) *taggedMetricMap {
func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
tmm := make(taggedMetricMap)
for tag, datapoints := range data {
mm := make(metricMap)
@@ -271,7 +266,7 @@ func newTaggedMetricMap(data map[string]interface{}) *taggedMetricMap {
}
tmm[tag] = mm
}
return &tmm
return tmm
}
// Recursively flattens any k-v hierarchy present in data.
@@ -376,36 +371,53 @@ func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) er
return nil
}
func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
func extractPgmapStates(data map[string]interface{}) ([]interface{}, error) {
const key = "pgs_by_state"
pgmap, ok := data["pgmap"].(map[string]interface{})
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
return nil, fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
}
fields := make(map[string]interface{})
for key, value := range pgmap {
switch value.(type) {
case []interface{}:
if key != "pgs_by_state" {
continue
}
for _, state := range value.([]interface{}) {
state_map, ok := state.(map[string]interface{})
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
}
state_name, ok := state_map["state_name"].(string)
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
}
state_count, ok := state_map["count"].(float64)
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
}
fields[state_name] = state_count
}
s, ok := pgmap[key]
if !ok {
return nil, fmt.Errorf("WARNING %s - pgmap is missing the %s field", measurement, key)
}
states, ok := s.([]interface{})
if !ok {
return nil, fmt.Errorf("WARNING %s - pgmap[%s] is not a list", measurement, key)
}
return states, nil
}
func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
states, err := extractPgmapStates(data)
if err != nil {
return err
}
for _, state := range states {
stateMap, ok := state.(map[string]interface{})
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
}
stateName, ok := stateMap["state_name"].(string)
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
}
stateCount, ok := stateMap["count"].(float64)
if !ok {
return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
}
tags := map[string]string{
"state": stateName,
}
fields := map[string]interface{}{
"count": stateCount,
}
acc.AddFields("ceph_pgmap_state", fields, tags)
}
acc.AddFields("ceph_pgmap_state", fields, map[string]string{})
return nil
}