diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go
index 0de9cb13b..369795e7f 100644
--- a/plugins/inputs/ceph/ceph.go
+++ b/plugins/inputs/ceph/ceph.go
@@ -294,6 +294,7 @@ func flatten(data interface{}) []*metric {
 	return metrics
 }
 
+// exec executes the 'ceph' command with the supplied arguments, returning JSON formatted output
 func (c *Ceph) exec(command string) (string, error) {
 	cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"}
 	cmdArgs = append(cmdArgs, strings.Split(command, " ")...)
@@ -317,145 +318,145 @@ func (c *Ceph) exec(command string) (string, error) {
 	return output, nil
 }
 
+// CephStatus is used to unmarshal "ceph -s" output
+type CephStatus struct {
+	OSDMap struct {
+		OSDMap struct {
+			Epoch          float64 `json:"epoch"`
+			NumOSDs        float64 `json:"num_osds"`
+			NumUpOSDs      float64 `json:"num_up_osds"`
+			NumInOSDs      float64 `json:"num_in_osds"`
+			Full           bool    `json:"full"`
+			NearFull       bool    `json:"nearfull"`
+			NumRemappedPGs float64 `json:"num_remapped_pgs"`
+		} `json:"osdmap"`
+	} `json:"osdmap"`
+	PGMap struct {
+		PGsByState []struct {
+			StateName string  `json:"state_name"`
+			Count     float64 `json:"count"`
+		} `json:"pgs_by_state"`
+		Version       float64 `json:"version"`
+		NumPGs        float64 `json:"num_pgs"`
+		DataBytes     float64 `json:"data_bytes"`
+		BytesUsed     float64 `json:"bytes_used"`
+		BytesAvail    float64 `json:"bytes_avail"`
+		BytesTotal    float64 `json:"bytes_total"`
+		ReadBytesSec  float64 `json:"read_bytes_sec"`
+		WriteBytesSec float64 `json:"write_bytes_sec"`
+		OpPerSec      float64 `json:"op_per_sec"`
+	} `json:"pgmap"`
+}
+
+// decodeStatus decodes the output of 'ceph -s'
 func decodeStatus(acc telegraf.Accumulator, input string) error {
-	data := make(map[string]interface{})
-	err := json.Unmarshal([]byte(input), &data)
-	if err != nil {
+	data := &CephStatus{}
+	if err := json.Unmarshal([]byte(input), data); err != nil {
 		return fmt.Errorf("failed to parse json: '%s': %v", input, err)
 	}
 
-	err = decodeStatusOsdmap(acc, data)
-	if err != nil {
-		return err
+	decoders := []func(telegraf.Accumulator, *CephStatus) error{
+		decodeStatusOsdmap,
+		decodeStatusPgmap,
+		decodeStatusPgmapState,
 	}
 
-	err = decodeStatusPgmap(acc, data)
-	if err != nil {
-		return err
-	}
-
-	err = decodeStatusPgmapState(acc, data)
-	if err != nil {
-		return err
+	for _, decoder := range decoders {
+		if err := decoder(acc, data); err != nil {
+			return err
+		}
 	}
 
 	return nil
 }
 
-func decodeStatusOsdmap(acc telegraf.Accumulator, data map[string]interface{}) error {
-	osdmap, ok := data["osdmap"].(map[string]interface{})
-	if !ok {
-		return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement)
-	}
-	fields, ok := osdmap["osdmap"].(map[string]interface{})
-	if !ok {
-		return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement)
+// decodeStatusOsdmap decodes the OSD map portion of the output of 'ceph -s'
+func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error {
+	fields := map[string]interface{}{
+		"epoch":            data.OSDMap.OSDMap.Epoch,
+		"num_osds":         data.OSDMap.OSDMap.NumOSDs,
+		"num_up_osds":      data.OSDMap.OSDMap.NumUpOSDs,
+		"num_in_osds":      data.OSDMap.OSDMap.NumInOSDs,
+		"full":             data.OSDMap.OSDMap.Full,
+		"nearfull":         data.OSDMap.OSDMap.NearFull,
+		"num_remapped_pgs": data.OSDMap.OSDMap.NumRemappedPGs,
 	}
 	acc.AddFields("ceph_osdmap", fields, map[string]string{})
 	return nil
 }
 
-func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) error {
-	pgmap, ok := data["pgmap"].(map[string]interface{})
-	if !ok {
-		return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
-	}
-	fields := make(map[string]interface{})
-	for key, value := range pgmap {
-		switch value.(type) {
-		case float64:
-			fields[key] = value
-		}
+// decodeStatusPgmap decodes the PG map portion of the output of 'ceph -s'
+func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error {
+	fields := map[string]interface{}{
+		"version":         data.PGMap.Version,
+		"num_pgs":         data.PGMap.NumPGs,
+		"data_bytes":      data.PGMap.DataBytes,
+		"bytes_used":      data.PGMap.BytesUsed,
+		"bytes_avail":     data.PGMap.BytesAvail,
+		"bytes_total":     data.PGMap.BytesTotal,
+		"read_bytes_sec":  data.PGMap.ReadBytesSec,
+		"write_bytes_sec": data.PGMap.WriteBytesSec,
+		"op_per_sec":      data.PGMap.OpPerSec,
 	}
 	acc.AddFields("ceph_pgmap", fields, map[string]string{})
 	return nil
 }
 
-func extractPgmapStates(data map[string]interface{}) ([]interface{}, error) {
-	const key = "pgs_by_state"
-
-	pgmap, ok := data["pgmap"].(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
-	}
-
-	s, ok := pgmap[key]
-	if !ok {
-		return nil, fmt.Errorf("WARNING %s - pgmap is missing the %s field", measurement, key)
-	}
-
-	states, ok := s.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("WARNING %s - pgmap[%s] is not a list", measurement, key)
-	}
-	return states, nil
-}
-
-func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
-	states, err := extractPgmapStates(data)
-	if err != nil {
-		return err
-	}
-	for _, state := range states {
-		stateMap, ok := state.(map[string]interface{})
-		if !ok {
-			return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
-		}
-		stateName, ok := stateMap["state_name"].(string)
-		if !ok {
-			return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
-		}
-		stateCount, ok := stateMap["count"].(float64)
-		if !ok {
-			return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
-		}
-
+// decodeStatusPgmapState decodes the PG map state portion of the output of 'ceph -s'
+func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error {
+	for _, pgState := range data.PGMap.PGsByState {
 		tags := map[string]string{
-			"state": stateName,
+			"state": pgState.StateName,
 		}
 		fields := map[string]interface{}{
-			"count": stateCount,
+			"count": pgState.Count,
 		}
 		acc.AddFields("ceph_pgmap_state", fields, tags)
 	}
 	return nil
 }
 
+// CephDf is used to unmarshal 'ceph df' output
+type CephDf struct {
+	Stats struct {
+		TotalSpace float64 `json:"total_space"`
+		TotalUsed  float64 `json:"total_used"`
+		TotalAvail float64 `json:"total_avail"`
+	} `json:"stats"`
+	Pools []struct {
+		Name  string `json:"name"`
+		Stats struct {
+			KBUsed    float64 `json:"kb_used"`
+			BytesUsed float64 `json:"bytes_used"`
+			Objects   float64 `json:"objects"`
+		} `json:"stats"`
+	} `json:"pools"`
+}
+
+// decodeDf decodes the output of 'ceph df'
 func decodeDf(acc telegraf.Accumulator, input string) error {
-	data := make(map[string]interface{})
-	err := json.Unmarshal([]byte(input), &data)
-	if err != nil {
+	data := &CephDf{}
+	if err := json.Unmarshal([]byte(input), data); err != nil {
 		return fmt.Errorf("failed to parse json: '%s': %v", input, err)
 	}
 
 	// ceph.usage: records global utilization and number of objects
-	stats_fields, ok := data["stats"].(map[string]interface{})
-	if !ok {
-		return fmt.Errorf("WARNING %s - unable to decode df stats", measurement)
+	fields := map[string]interface{}{
+		"total_space": data.Stats.TotalSpace,
+		"total_used":  data.Stats.TotalUsed,
"total_avail": data.Stats.TotalAvail, } - acc.AddFields("ceph_usage", stats_fields, map[string]string{}) + acc.AddFields("ceph_usage", fields, map[string]string{}) // ceph.pool.usage: records per pool utilization and number of objects - pools, ok := data["pools"].([]interface{}) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode df pools", measurement) - } - - for _, pool := range pools { - pool_map, ok := pool.(map[string]interface{}) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode df pool", measurement) - } - pool_name, ok := pool_map["name"].(string) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode df pool name", measurement) - } - fields, ok := pool_map["stats"].(map[string]interface{}) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode df pool stats", measurement) - } + for _, pool := range data.Pools { tags := map[string]string{ - "name": pool_name, + "name": pool.Name, + } + fields := map[string]interface{}{ + "kb_used": pool.Stats.KBUsed, + "bytes_used": pool.Stats.BytesUsed, + "objects": pool.Stats.Objects, } acc.AddFields("ceph_pool_usage", fields, tags) } @@ -463,36 +464,40 @@ func decodeDf(acc telegraf.Accumulator, input string) error { return nil } +// CephOSDPoolStats is used to unmarshal 'ceph osd pool stats' output +type CephOSDPoolStats []struct { + PoolName string `json:"pool_name"` + ClientIORate struct { + ReadBytesSec float64 `json:"read_bytes_sec"` + WriteBytesSec float64 `json:"write_bytes_sec"` + OpPerSec float64 `json:"op_per_sec"` + } `json:"client_io_rate"` + RecoveryRate struct { + RecoveringObjectsPerSec float64 `json:"recovering_objects_per_sec"` + RecoveringBytesPerSec float64 `json:"recovering_bytes_per_sec"` + RecoveringKeysPerSec float64 `json:"recovering_keys_per_sec"` + } `json:"recovery_rate"` +} + +// decodeOsdPoolStats decodes the output of 'ceph osd pool stats' func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { - data := make([]map[string]interface{}, 0) - err := json.Unmarshal([]byte(input), &data) - if err != nil { + data := CephOSDPoolStats{} + if err := json.Unmarshal([]byte(input), &data); err != nil { return fmt.Errorf("failed to parse json: '%s': %v", input, err) } // ceph.pool.stats: records pre pool IO and recovery throughput for _, pool := range data { - pool_name, ok := pool["pool_name"].(string) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode osd pool stats name", measurement) - } - // Note: the 'recovery' object looks broken (in hammer), so it's omitted - objects := []string{ - "client_io_rate", - "recovery_rate", - } - fields := make(map[string]interface{}) - for _, object := range objects { - perfdata, ok := pool[object].(map[string]interface{}) - if !ok { - return fmt.Errorf("WARNING %s - unable to decode osd pool stats", measurement) - } - for key, value := range perfdata { - fields[key] = value - } - } tags := map[string]string{ - "name": pool_name, + "name": pool.PoolName, + } + fields := map[string]interface{}{ + "read_bytes_sec": pool.ClientIORate.ReadBytesSec, + "write_bytes_sec": pool.ClientIORate.WriteBytesSec, + "op_per_sec": pool.ClientIORate.OpPerSec, + "recovering_objects_per_sec": pool.RecoveryRate.RecoveringObjectsPerSec, + "recovering_bytes_per_sec": pool.RecoveryRate.RecoveringBytesPerSec, + "recovering_keys_per_sec": pool.RecoveryRate.RecoveringKeysPerSec, } acc.AddFields("ceph_pool_stats", fields, tags) } diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go index f4a3ebb83..9f3ded529 100644 --- 
--- a/plugins/inputs/ceph/ceph_test.go
+++ b/plugins/inputs/ceph/ceph_test.go
@@ -1,7 +1,6 @@
 package ceph
 
 import (
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -18,6 +17,12 @@ const (
 	epsilon = float64(0.00000001)
 )
 
+type expectedResult struct {
+	metric string
+	fields map[string]interface{}
+	tags   map[string]string
+}
+
 func TestParseSockId(t *testing.T) {
 	s := parseSockId(sockFile(osdPrefix, 1), osdPrefix, sockSuffix)
 	assert.Equal(t, s, "1")
@@ -37,26 +42,33 @@ func TestParseOsdDump(t *testing.T) {
 	assert.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
 }
 
-func TestDecodeStatusPgmapState(t *testing.T) {
-	data := make(map[string]interface{})
-	err := json.Unmarshal([]byte(clusterStatusDump), &data)
-	assert.NoError(t, err)
-
+func TestDecodeStatus(t *testing.T) {
 	acc := &testutil.Accumulator{}
-	err = decodeStatusPgmapState(acc, data)
+	err := decodeStatus(acc, clusterStatusDump)
 	assert.NoError(t, err)
 
-	var results = []struct {
-		fields map[string]interface{}
-		tags   map[string]string
-	}{
-		{map[string]interface{}{"count": float64(2560)}, map[string]string{"state": "active+clean"}},
-		{map[string]interface{}{"count": float64(10)}, map[string]string{"state": "active+scrubbing"}},
-		{map[string]interface{}{"count": float64(5)}, map[string]string{"state": "active+backfilling"}},
+	for _, r := range cephStatusResults {
+		acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
 	}
+}
 
-	for _, r := range results {
-		acc.AssertContainsTaggedFields(t, "ceph_pgmap_state", r.fields, r.tags)
+func TestDecodeDf(t *testing.T) {
+	acc := &testutil.Accumulator{}
+	err := decodeDf(acc, cephDFDump)
+	assert.NoError(t, err)
+
+	for _, r := range cephDfResults {
+		acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
+	}
+}
+
+func TestDecodeOSDPoolStats(t *testing.T) {
+	acc := &testutil.Accumulator{}
+	err := decodeOsdPoolStats(acc, cephOSDPoolStatsDump)
+	assert.NoError(t, err)
+
+	for _, r := range cephOSDPoolStatsResults {
+		acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
 	}
 }
@@ -834,3 +846,203 @@ var clusterStatusDump = `
 }
 }
 `
+
+var cephStatusResults = []expectedResult{
+	{
+		metric: "ceph_osdmap",
+		fields: map[string]interface{}{
+			"epoch":            float64(21734),
+			"num_osds":         float64(24),
+			"num_up_osds":      float64(24),
+			"num_in_osds":      float64(24),
+			"full":             false,
+			"nearfull":         false,
+			"num_remapped_pgs": float64(0),
+		},
+		tags: map[string]string{},
+	},
+	{
+		metric: "ceph_pgmap",
+		fields: map[string]interface{}{
+			"version":         float64(52314277),
+			"num_pgs":         float64(2560),
+			"data_bytes":      float64(2700031960713),
+			"bytes_used":      float64(7478347665408),
+			"bytes_avail":     float64(9857462382592),
+			"bytes_total":     float64(17335810048000),
+			"read_bytes_sec":  float64(0),
+			"write_bytes_sec": float64(367217),
+			"op_per_sec":      float64(98),
+		},
+		tags: map[string]string{},
+	},
+	{
+		metric: "ceph_pgmap_state",
+		fields: map[string]interface{}{
+			"count": float64(2560),
+		},
+		tags: map[string]string{
+			"state": "active+clean",
+		},
+	},
+	{
+		metric: "ceph_pgmap_state",
+		fields: map[string]interface{}{
+			"count": float64(10),
+		},
+		tags: map[string]string{
+			"state": "active+scrubbing",
+		},
+	},
+	{
+		metric: "ceph_pgmap_state",
+		fields: map[string]interface{}{
+			"count": float64(5),
+		},
+		tags: map[string]string{
+			"state": "active+backfilling",
+		},
+	},
+}
+
+var cephDFDump = `
+{ "stats": { "total_space": 472345880,
+      "total_used": 71058504,
+      "total_avail": 377286864},
+  "pools": [
+        { "name": "data",
+          "id": 0,
+          "stats": { "kb_used": 0,
"bytes_used": 0, + "objects": 0}}, + { "name": "metadata", + "id": 1, + "stats": { "kb_used": 25, + "bytes_used": 25052, + "objects": 53}}, + { "name": "rbd", + "id": 2, + "stats": { "kb_used": 0, + "bytes_used": 0, + "objects": 0}}, + { "name": "test", + "id": 3, + "stats": { "kb_used": 55476, + "bytes_used": 56806602, + "objects": 1}}]}` + +var cephDfResults = []expectedResult{ + { + metric: "ceph_usage", + fields: map[string]interface{}{ + "total_space": float64(472345880), + "total_used": float64(71058504), + "total_avail": float64(377286864), + }, + tags: map[string]string{}, + }, + { + metric: "ceph_pool_usage", + fields: map[string]interface{}{ + "kb_used": float64(0), + "bytes_used": float64(0), + "objects": float64(0), + }, + tags: map[string]string{ + "name": "data", + }, + }, + { + metric: "ceph_pool_usage", + fields: map[string]interface{}{ + "kb_used": float64(25), + "bytes_used": float64(25052), + "objects": float64(53), + }, + tags: map[string]string{ + "name": "metadata", + }, + }, + { + metric: "ceph_pool_usage", + fields: map[string]interface{}{ + "kb_used": float64(0), + "bytes_used": float64(0), + "objects": float64(0), + }, + tags: map[string]string{ + "name": "rbd", + }, + }, + { + metric: "ceph_pool_usage", + fields: map[string]interface{}{ + "kb_used": float64(55476), + "bytes_used": float64(56806602), + "objects": float64(1), + }, + tags: map[string]string{ + "name": "test", + }, + }, +} + +var cephODSPoolStatsDump = ` +[ + { "pool_name": "data", + "pool_id": 0, + "recovery": {}, + "recovery_rate": {}, + "client_io_rate": {}}, + { "pool_name": "metadata", + "pool_id": 1, + "recovery": {}, + "recovery_rate": {}, + "client_io_rate": {}}, + { "pool_name": "rbd", + "pool_id": 2, + "recovery": {}, + "recovery_rate": {}, + "client_io_rate": {}}, + { "pool_name": "pbench", + "pool_id": 3, + "recovery": { "degraded_objects": 18446744073709551562, + "degraded_total": 412, + "degrated_ratio": "-13.107"}, + "recovery_rate": { "recovering_objects_per_sec": 279, + "recovering_bytes_per_sec": 176401059, + "recovering_keys_per_sec": 0}, + "client_io_rate": { "read_bytes_sec": 10566067, + "write_bytes_sec": 15165220376, + "op_per_sec": 9828}}]` + +var cephOSDPoolStatsResults = []expectedResult{ + { + metric: "ceph_pool_stats", + fields: map[string]interface{}{ + "read_bytes_sec": float64(0), + "write_bytes_sec": float64(0), + "op_per_sec": float64(0), + "recovering_objects_per_sec": float64(0), + "recovering_bytes_per_sec": float64(0), + "recovering_keys_per_sec": float64(0), + }, + tags: map[string]string{ + "name": "data", + }, + }, + { + metric: "ceph_pool_stats", + fields: map[string]interface{}{ + "read_bytes_sec": float64(10566067), + "write_bytes_sec": float64(15165220376), + "op_per_sec": float64(9828), + "recovering_objects_per_sec": float64(279), + "recovering_bytes_per_sec": float64(176401059), + "recovering_keys_per_sec": float64(0), + }, + tags: map[string]string{ + "name": "pbench", + }, + }, +}