From 698826c6325f865db06442fa976522102d7ea962 Mon Sep 17 00:00:00 2001 From: madz Date: Wed, 1 Jul 2015 10:29:01 +0800 Subject: [PATCH 1/6] ceph-metrics plugin for telegraf. :) Test to follow --- plugins/all/all.go | 1 + plugins/ceph/ceph.go | 435 ++++++++++++++++++++++++++++++++++++++ plugins/ceph/ceph_data.go | 171 +++++++++++++++ plugins/ceph/ceph_test.go | 106 ++++++++++ 4 files changed, 713 insertions(+) create mode 100644 plugins/ceph/ceph.go create mode 100644 plugins/ceph/ceph_data.go create mode 100644 plugins/ceph/ceph_test.go diff --git a/plugins/all/all.go b/plugins/all/all.go index 8acedb33f..0b0f6236d 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + "github.com/influxdb/telegraf/plugins/ceph" _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" _ "github.com/influxdb/telegraf/plugins/redis" diff --git a/plugins/ceph/ceph.go b/plugins/ceph/ceph.go new file mode 100644 index 000000000..8eb816cbe --- /dev/null +++ b/plugins/ceph/ceph.go @@ -0,0 +1,435 @@ +package ceph + +import ( + "encoding/json" + "fmt" + "github.com/influxdb/telegraf/plugins" + "os/exec" + "strconv" + "strings" + "time" +) + +var sampleConfig = ` +# Gather metrics for CEPH +# Specify cluster name +#cluster="ceph" +# Specify CEPH Bin Location +#binLocation="/usr/bin/ceph" +# Specify CEPH Socket Directory +#socketDir="/var/run/ceph" +` + +type CephMetrics struct { + Cluster string + BinLocation string + SocketDir string +} + +func (_ *CephMetrics) SampleConfig() string { + return sampleConfig +} + +func (_ *CephMetrics) Description() string { + return "Reading Ceph Metrics" +} + +func (ceph *CephMetrics) Gather(acc plugins.Accumulator) error { + + if ceph.Cluster == "" { + ceph.Cluster = "ceph" + } + + if ceph.BinLocation == "" { + ceph.BinLocation = "/usr/bin/ceph" + } + + if ceph.SocketDir == "" { + ceph.SocketDir = "/var/run/ceph" + } + + ceph.gatherMetrics(acc) + return nil +} + +func init() { + plugins.Add("ceph", func() plugins.Plugin { + return &CephMetrics{} + }) +} + +func (ceph *CephMetrics) gatherMetrics(acc plugins.Accumulator) { + + var quorum QuorumStat + + out, err := exec.Command("/bin/hostname").Output() + + if err != nil { + return + } + + hostname := string(out) + + if err := ceph.cephCommand(&quorum, "quorum_status"); err != nil { + return + } + + ceph.getOSDPerf(acc) + + if strings.TrimSpace(hostname) != strings.TrimSpace(quorum.LeaderName) { + fmt.Printf("Not a leader: quorum leader %s, host %s", quorum.LeaderName, hostname) + return + } + + ceph.getCommon(acc) + ceph.getPool(acc) + ceph.getPg(acc) + ceph.getOSDDaemon(acc) + +} + +func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { + var health CephHealth + var quorum QuorumStat + var poolStatsList []PoolStats + var cephDf CephDF + var cephStatus CephStatus + + if err := ceph.cephCommand(&cephStatus, "status"); err != nil { + return + } + + if err := ceph.cephCommand(&health, "health"); err != nil { + return + } + + if err := ceph.cephCommand(&quorum, "quorum_status"); err != nil { + return + } + + if err := ceph.cephCommand(&poolStatsList, "osd", "pool", "stats"); err != nil { + return + } + + if err := ceph.cephCommand(&cephDf, "df"); err != nil { + return + } + + tags := map[string]string{"cluster": ceph.Cluster} + + //Monitors + var monitorStr string + monitors := quorum.MonitorMap.Mons + monitorValueMap := make(map[string]interface{}) + monitorValueMap["value"] = len(monitors) + + for _, value := range monitors { + monitorStr = 
fmt.Sprint(monitorStr, ",", value.Name)
+	}
+	monitorValueMap["name"] = monitorStr
+
+	//Quorum Names
+	var memberStr string
+	quorum_name := quorum.QuorumName
+	quorumValueMap := make(map[string]interface{})
+	quorumValueMap["value"] = len(quorum_name)
+
+	for _, value := range quorum_name {
+		memberStr = fmt.Sprint(memberStr, ",", value)
+	}
+	quorumValueMap["members"] = memberStr
+
+	//clientIOs
+	sumOps := 0
+	sumWrs := 0
+	for _, stat := range poolStatsList {
+		sumOps += stat.ClientIoRate.OpsPerSec
+		sumWrs += stat.ClientIoRate.WriteBytesPerSecond / 1024
+	}
+
+	// OSD Epoch
+	epoch := cephStatus.OSDMap.OSDMap.Epoch
+	acc.Add("osd_epoch", fmt.Sprintf("%d", epoch), map[string]string{"cluster": ceph.Cluster})
+	acc.Add("health", health.OverallStatus, tags)
+	acc.Add("total_storage", cephDf.Stats.TotalBytes, tags)
+	acc.Add("used", cephDf.Stats.TotalUsedBytes, tags)
+	acc.Add("available", cephDf.Stats.TotalAvailableBytes, tags)
+	acc.Add("client_io_kbs", fmt.Sprintf("%d", sumWrs), tags)
+	acc.Add("client_io_ops", fmt.Sprintf("%d", sumOps), tags)
+	acc.AddValuesWithTime("monitor", monitorValueMap, tags, time.Now())
+	acc.AddValuesWithTime("quorum", quorumValueMap, tags, time.Now())
+}
+
+func (ceph *CephMetrics) getPool(acc plugins.Accumulator) {
+	var cephDf CephDF
+	var poolStatsList []PoolStats
+	var pgDump PgDump
+
+	if err := ceph.cephCommand(&poolStatsList, "osd", "pool", "stats"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&cephDf, "df"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil {
+		return
+	}
+
+	for _, pool := range cephDf.Pools {
+		poolId := pool.Id
+		numObjects := pool.Stats.Objects
+		numBytes := pool.Stats.UsedBytes
+		maxAvail := pool.Stats.Available
+		numKb := pool.Stats.UsedKb
+
+		utilized := 0.0
+		if numBytes != 0 {
+			utilized = (float64(numBytes) / float64(maxAvail)) * 100.0
+		}
+
+		var quota PoolQuota
+
+		err := ceph.cephCommand(&quota, "osd", "pool", "get-quota", pool.Name)
+		if err != nil {
+			continue
+		}
+		maxObjects := quota.MaxObjects
+		maxBytes := quota.MaxBytes
+
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)}
+		acc.Add("pool_objects", numObjects, tags)
+		acc.Add("pool_used", numBytes, tags)
+		acc.Add("pool_usedKb", numKb, tags)
+		acc.Add("pool_max_objects", maxObjects, tags)
+		acc.Add("pool_maxbytes", maxBytes, tags)
+		acc.Add("pool_utilization", utilized, tags)
+	}
+
+	acc.Add("pool", fmt.Sprintf("%d", len(cephDf.Pools)), map[string]string{"cluster": ceph.Cluster})
+
+	for _, stat := range poolStatsList {
+		poolId := stat.PoolId
+		kbs := stat.ClientIoRate.WriteBytesPerSecond / 1024
+		ops := stat.ClientIoRate.OpsPerSec
+
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)}
+		acc.Add("pool_io_kbs", kbs, tags)
+		acc.Add("pool_io_ops", ops, tags)
+	}
+
+	for _, pgPoolStat := range pgDump.PoolStats {
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", pgPoolStat.PoolId)}
+		for k, v := range pgPoolStat.StatSum {
+			acc.Add(fmt.Sprintf("pool_%s", k), fmt.Sprint(v), tags)
+		}
+	}
+
+}
+
+func (ceph *CephMetrics) getPg(acc plugins.Accumulator) {
+
+	var cephStatus CephStatus
+	if err := ceph.cephCommand(&cephStatus, "status"); err != nil {
+		return
+	}
+
+	pgMap := cephStatus.PgMap
+
+	for _, value := range pgMap.PgByState {
+		tags := map[string]string{"cluster": ceph.Cluster, "state": value.Name}
+		acc.Add("pg_count", value.Count, tags)
+	}
+
+	tags := map[string]string{"cluster": ceph.Cluster}
+	
acc.Add("pg_map_count", pgMap.PgCount, tags) + acc.Add("pg_data_bytes", pgMap.DataBytes, tags) + acc.Add("pg_data_avail", pgMap.BytesAvail, tags) + acc.Add("pg_data_total", pgMap.BytesTotal, tags) + acc.Add("pg_data_used", pgMap.BytesUsed, tags) + + var pgDump PgDump + if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil { + return + } + + poolOsdPgMap := make(PoolOsdPgMap, len(pgDump.PoolStats)) + totalOsdPgs := make(map[int]int, len(pgDump.OsdStats)) + + for _, pgStat := range pgDump.PgStats { + poolId, _ := strconv.Atoi(strings.Split(pgStat.PgId, ".")[0]) + + osdPgMap := poolOsdPgMap[poolId] + if osdPgMap == nil { + osdPgMap = make(map[int]int, len(pgDump.OsdStats)) + poolOsdPgMap[poolId] = osdPgMap + } + + for _, osd := range pgStat.Up { + osdPgMap[osd] = osdPgMap[osd] + 1 + totalOsdPgs[osd] = totalOsdPgs[osd] + 1 + } + } + + for poolId, osdPgMap := range poolOsdPgMap { + poolPg := 0 + for osdId, pgs := range osdPgMap { + tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId), "osd": fmt.Sprintf("%d", osdId)} + poolPg += pgs + acc.Add("pg_distribution", pgs, tags) + } + + tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)} + acc.Add("pg_distribution_pool", poolPg, tags) + } + + for osd, pg := range totalOsdPgs { + tags := map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osd)} + acc.Add("pg_distribution_osd", pg, tags) + } + + clusterTag := map[string]string{"cluster": ceph.Cluster} + for k, v := range pgDump.PgStatSum.StatSum { + acc.Add(fmt.Sprintf("pg_stats_%s", k), fmt.Sprintf("%d", v), clusterTag) + } + +} + +func (ceph *CephMetrics) getOSDDaemon(acc plugins.Accumulator) { + + var osd OsdDump + var osdPerf OsdPerf + var pgDump PgDump + + if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil { + return + } + + if err := ceph.cephCommand(&osdPerf, "osd", "perf"); err != nil { + return + } + + if err := ceph.cephCommand(&osd, "osd", "dump"); err != nil { + return + } + + up := 0 + in := 0 + down := 0 + out := 0 + osds := osd.Osds + + for _, osdStat := range osds { + + if osdStat.Up == 1 { + up += 1 + } else { + down += 1 + } + + if osdStat.In == 1 { + in += 1 + } else { + out += 1 + } + } + + acc.Add("osd_count", len(osd.Osds), map[string]string{"cluster": ceph.Cluster}) + acc.Add("osd_count", in, map[string]string{"cluster": ceph.Cluster, "state": "in"}) + acc.Add("osd_count", out, map[string]string{"cluster": ceph.Cluster, "state": "out"}) + acc.Add("osd_count", up, map[string]string{"cluster": ceph.Cluster, "state": "up"}) + acc.Add("osd_count", down, map[string]string{"cluster": ceph.Cluster, "state": "down"}) + + // OSD Utilization + for _, osdStat := range pgDump.OsdStats { + osdNum := osdStat.Osd + used := float64(osdStat.UsedKb) + total := float64(osdStat.TotalKb) + utilized := (used / total) * 100.0 + + tag := map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)} + acc.Add("osd_utilization", utilized, tag) + acc.Add("osd_used", utilized, tag) + acc.Add("osd_total", total, tag) + } + + //OSD Commit and Apply Latency + for _, perf := range osdPerf.PerfInfo { + osdNum := perf.Id + commit := perf.Stats.CommitLatency + apply := perf.Stats.ApplyLatency + + acc.Add("osd_latency_commit", commit, map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)}) + acc.Add("osd_latency_apply", apply, map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)}) + } + +} + +func (ceph *CephMetrics) getOSDPerf(acc plugins.Accumulator) { + 
var osdPerf OsdPerfDump + + out, err := exec.Command("/bin/ls", ceph.SocketDir).Output() + if err != nil { + return + } + fileStr := string(out) + osdsArray := strings.Split(fileStr, "\n") + osds := make([]string, len(osdsArray)) + index := 0 + + for _, value := range osdsArray { + if !strings.Contains(value, "osd") { + continue + } + osds[index] = value + index++ + location := fmt.Sprint(ceph.SocketDir, "/", value) + args := []string{"--admin-daemon", location, "perf", "dump"} + if err := ceph.cephCommand(&osdPerf, args...); err != nil { + return + } + osdId := string(value[strings.LastIndex(value, ".")-1]) + tags := map[string]string{"cluster": ceph.Cluster, "osd": osdId} + osd := osdPerf.Osd + + // osd-.osd.recovery_ops ? + acc.Add("op_wip", osd.OpWip, tags) + acc.Add("op", osd.Op, tags) + acc.Add("op_in_bytes", osd.OpInBytes, tags) + acc.Add("op_out_bytes", osd.OpOutBytes, tags) + acc.Add("op_r", osd.OpRead, tags) + acc.Add("op_r_out_bytes", osd.OpReadOutBytes, tags) + acc.Add("op_w", osd.OpWrite, tags) + acc.Add("op_w_in_bytes", osd.OpWriteInBytes, tags) + acc.Add("op_rw", osd.OpReadWrite, tags) + acc.Add("op_rw_in_bytes", osd.OpReadWriteInBytes, tags) + acc.Add("op_rw_out_bytes", osd.OpReadWriteOutBytes, tags) + acc.AddValuesWithTime("op_latency", getOSDLatencyCalc(&osd.OpLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_process_latency", getOSDLatencyCalc(&osd.OpProcessLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_r", getOSDLatencyCalc(&osd.OpReadLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_r_process_latency", getOSDLatencyCalc(&osd.OpReadProcessLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_w_latency", getOSDLatencyCalc(&osd.OpWriteLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_w_process_latency", getOSDLatencyCalc(&osd.OpWriteProcessLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_rw_latency", getOSDLatencyCalc(&osd.OpReadWriteLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_rw_process_latency", getOSDLatencyCalc(&osd.OpReadWriteProcessLatency.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_rw_rlat", getOSDLatencyCalc(&osd.OpReadWriteRlat.OSDLatencyCalc), tags, time.Now()) + acc.AddValuesWithTime("op_w_rlat", getOSDLatencyCalc(&osd.OpWriteRlat.OSDLatencyCalc), tags, time.Now()) + } +} + +func getOSDLatencyCalc(osdLatency *OSDLatency) map[string]interface{} { + latencyMap := make(map[string]interface{}) + latencyMap["avgcount"] = osdLatency.AvgCount + latencyMap["sum"] = osdLatency.Sum + return latencyMap +} + +func (ceph *CephMetrics) cephCommand(v interface{}, args ...string) error { + args = append(args, "-f", "json") + out, err := exec.Command(ceph.BinLocation, args...).Output() + if err != nil { + return err + } + return json.Unmarshal(out, v) +} diff --git a/plugins/ceph/ceph_data.go b/plugins/ceph/ceph_data.go new file mode 100644 index 000000000..f89b2d924 --- /dev/null +++ b/plugins/ceph/ceph_data.go @@ -0,0 +1,171 @@ +package ceph + +type QuorumStat struct { + LeaderName string `json:"quorum_leader_name"` + QuorumName []string `json:"quorum_names"` + MonitorMap struct { + Epoch int `json:"election_epoch"` + Mons []struct { + Name string `json:"name"` + Address string `json:"addr"` + } `json:"mons"` + } `json:"monmap"` +} + +type CephHealth struct { + OverallStatus string `json:"overall_status"` +} +type CephStatus struct { + Quorum []int `json:"quorum"` + OSDMap struct { + OSDMap struct { + Epoch 
int `json:"epoch"` + } `json:"osdmap"` + } `json:"osdmap"` + Health struct { + OverallStatus string `json:"overall_status"` + } `json:"health"` + PgMap struct { + PgByState []struct { + Name string `json:"state_name"` + Count int `json:"count"` + } `json:"pgs_by_state"` + PgCount int `json:"num_pgs"` + DataBytes int64 `json:"data_bytes"` + BytesUsed int64 `json:"bytes_used"` + BytesAvail int64 `json:"bytes_avail"` + BytesTotal int64 `json:"bytes_total"` + } `json:"pgmap"` +} + +type CephDF struct { + Stats struct { + TotalBytes int64 `json:"total_bytes"` + TotalUsedBytes int64 `json:"total_used_bytes"` + TotalAvailableBytes int64 `json:"total_avail_bytes"` + } `json:"stats"` + Pools []struct { + Name string `json:"name"` + Id int `json:"id"` + Stats struct { + UsedKb int64 `json:"kb_used"` + UsedBytes int64 `json:"bytes_used"` + Available int64 `json:"max_avail"` + Objects int64 `json::"objects"` + } `json:"stats"` + } `json:"pools"` +} + +type PoolStats struct { + PoolName string `json:"pool_name"` + PoolId int `json:"pool_id"` + ClientIoRate struct { + WriteBytesPerSecond int `json:"write_bytes_sec"` + OpsPerSec int `json:"op_per_sec"` + } `json:"client_io_rate"` +} + +type PoolQuota struct { + PoolName string `json:"pool_name"` + PoolId int `json:"pool_id"` + MaxObjects int64 `json:"quota_max_objects"` + MaxBytes int64 `json:"quota_max_bytes"` +} + +type OsdDump struct { + Osds []struct { + OsdNum int `json:"osd"` + Uuid string `json:"uuid"` + Up int `json:"up"` + In int `json:"in"` + OsdState []string `json:"state"` + } `json:"osds"` +} + +type OsdPerf struct { + PerfInfo []struct { + Id int `json:"id"` + Stats struct { + CommitLatency int `json:"commit_latency_ms"` + ApplyLatency int `json:"apply_latency_ms"` + } `json:"perf_stats"` + } `json:"osd_perf_infos"` +} + +type PgDump struct { + PgStatSum struct { + StatSum map[string]int64 `json:"stat_sum"` + } `json:"pg_stats_sum"` + PoolStats []struct { + PoolId int `json:"poolid"` + StatSum map[string]interface{} `json:"stat_sum"` + } `json:"pool_stats"` + PgStats []struct { + PgId string `json:"pgid"` + Up []int `json:"up"` + Acting []int `json:"acting"` + UpPrimary int `json:"up_primary"` + ActingPrimary int `json:"acting_primary"` + } `json:"pg_stats"` + OsdStats []struct { + Osd int `json:"osd"` + TotalKb int64 `json:"kb"` + UsedKb int64 `json:"kb_used"` + AvailableKb int64 `json:"kb_avail"` + } `json:"osd_stats"` +} + +type OsdPerfDump struct { + Osd struct { + RecoveryOps int + OpWip int `json:"op_wip"` + Op int `json:"op"` + OpInBytes int `json:"op_in_bytes"` + OpOutBytes int `json:"op_out_bytes"` + OpRead int `json:"op_r"` + OpReadOutBytes int `json:"op_r_out_bytes"` + OpWrite int `json:"op_w"` + OpWriteInBytes int `json:"op_w_in_bytes"` + OpReadWrite int `json:"op_rw"` + OpReadWriteInBytes int `json:"op_rw_in_btyes"` + OpReadWriteOutBytes int `json:"op_rw_out_bytes"` + + OpLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_latency"` + OpProcessLatency struct { + OSDLatencyCalc OSDLatency + } + OpReadLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_r_latency"` + OpReadProcessLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_r_process_latency"` + OpWriteRlat struct { + OSDLatencyCalc OSDLatency + } `json:"op_w_rlat"` + OpWriteLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_w_latency"` + OpWriteProcessLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_w_process_latency"` + OpReadWriteRlat struct { + OSDLatencyCalc OSDLatency + } `json:"op_rw_rlat"` + OpReadWriteLatency struct { + 
OSDLatencyCalc OSDLatency + } `json:"op_rw_latency"` + OpReadWriteProcessLatency struct { + OSDLatencyCalc OSDLatency + } `json:"op_rw_process_latency"` + } `json:"osd"` +} + +type OSDLatency struct { + AvgCount int `json:"avgcount"` + Sum float64 `json:"sum"` +} + +type PoolOsdPgMap map[int]map[int]int diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go new file mode 100644 index 000000000..569529288 --- /dev/null +++ b/plugins/ceph/ceph_test.go @@ -0,0 +1,106 @@ +package ceph + +import ( + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestCephGenerateMetrics(t *testing.T) { + p := &CephMetrics{ + Cluster: "ceph", + BinLocation: "/usr/bin/ceph", + SocketDir: "/var/run/ceph", + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + intMetrics := []string{ + "ceph_pool", + // "osd_count", + // "osd_utilization", + // "total_storage", + // "used", + // "available", + // "client_io_kbs", + // "client_io_ops", + // "pool_used", + // "pool_usedKb", + // "pool_maxbytes", + // "pool_utilization", + + // "osd_used", + // "osd_total", + // "osd_epoch", + + // "osd_latency_commit", + // "osd_latency_apply", + + // "op", + // "op_in_bytes", + // "op_out_bytes", + // "op_r", + // "op_r_out_byes", + // "op_w", + // "op_w_in_bytes", + // "op_rw", + // "op_rw_in_bytes", + // "op_rw_out_bytes", + + // "pool_objects", + // "pg_map_count", + // "pg_data_bytes", + // "pg_data_avail", + // "pg_data_total", + // "pg_data_used", + // "pg_distribution", + // "pg_distribution_pool", + // "pg_distribution_osd", + } + + floatMetrics := []string{ + "sum", + } + + for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric)) + } + + for _, metric := range floatMetrics { + assert.True(t, acc.HasFloatValue(metric)) + } +} + +func TestCephGenerateMetricsDefault(t *testing.T) { + p := &CephMetrics{} + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + point, ok := acc.Get("ceph_osd_count") + require.True(t, ok) + assert.Equal(t, "ceph", point.Tags["cluster"]) + +} + +func TestCephTagsMetricsWithDifferentCluster(t *testing.T) { + p := &CephMetrics{ + Cluster: "cephTest", + } + + var acc testutil.Accumulator + + err := p.Gather(&acc) + require.NoError(t, err) + + point, ok := acc.Get("cephTest_osd_count") + require.True(t, ok) + + assert.Equal(t, "cephTest", point.Tags["cluster"]) +} From 97b4f68d725f985e4b85092655f769eac088d7d9 Mon Sep 17 00:00:00 2001 From: madz Date: Thu, 2 Jul 2015 18:28:58 +0800 Subject: [PATCH 2/6] add fixes on the comment --- plugins/ceph/ceph.go | 54 ++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/plugins/ceph/ceph.go b/plugins/ceph/ceph.go index 8eb816cbe..e8df1ad46 100644 --- a/plugins/ceph/ceph.go +++ b/plugins/ceph/ceph.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "github.com/influxdb/telegraf/plugins" + "io/ioutil" + "os" "os/exec" "strconv" "strings" @@ -62,14 +64,12 @@ func (ceph *CephMetrics) gatherMetrics(acc plugins.Accumulator) { var quorum QuorumStat - out, err := exec.Command("/bin/hostname").Output() + hostname, err := os.Hostname() if err != nil { return } - hostname := string(out) - if err := ceph.cephCommand(&quorum, "quorum_status"); err != nil { return } @@ -77,7 +77,7 @@ func (ceph *CephMetrics) gatherMetrics(acc plugins.Accumulator) { ceph.getOSDPerf(acc) if strings.TrimSpace(hostname) != 
strings.TrimSpace(quorum.LeaderName) { - fmt.Printf("Not a leader: quorum leader %s, host %s", quorum.LeaderName, hostname) + fmt.Printf("Not a leader: Quorum leader %s, Host %s", quorum.LeaderName, hostname) return } @@ -118,26 +118,22 @@ func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { tags := map[string]string{"cluster": ceph.Cluster} //Monitors - var monitorStr string monitors := quorum.MonitorMap.Mons + monitorNames := make([]string, len(monitors)) monitorValueMap := make(map[string]interface{}) monitorValueMap["value"] = len(monitors) - for _, value := range monitors { - monitorStr = fmt.Sprint(monitorStr, ",", value.Name) + for i, value := range monitors { + monitorNames[i] = value.Name } - monitorValueMap["name"] = monitorStr + + monitorValueMap["name"] = strings.Join(monitorNames, ",") //Quorum Names - var memberStr string quorum_name := quorum.QuorumName quorumValueMap := make(map[string]interface{}) quorumValueMap["value"] = len(quorum_name) - - for _, value := range quorum_name { - memberStr = fmt.Sprint(memberStr, ",", value) - } - quorumValueMap["members"] = memberStr + quorumValueMap["members"] = strings.Join(quorum_name, ",") //clientIOs sumOps := 0 @@ -149,13 +145,13 @@ func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { // OSD Epoch epoch := cephStatus.OSDMap.OSDMap.Epoch - acc.Add("osd_epoch", fmt.Sprintf("%d", epoch), map[string]string{"cluster": ceph.Cluster}) + acc.Add("osd_epoch", epoch, map[string]string{"cluster": ceph.Cluster}) acc.Add("health", health.OverallStatus, tags) acc.Add("total_storage", cephDf.Stats.TotalBytes, tags) acc.Add("used", cephDf.Stats.TotalUsedBytes, tags) acc.Add("available", cephDf.Stats.TotalAvailableBytes, tags) - acc.Add("client_io_kbs", fmt.Sprintf("%d", sumWrs), tags) - acc.Add("client_io_ops", fmt.Sprintf("%d", sumOps), tags) + acc.Add("client_io_kbs", sumWrs, tags) + acc.Add("client_io_ops", sumOps, tags) acc.AddValuesWithTime("monitor", monitorValueMap, tags, time.Now()) acc.AddValuesWithTime("quorum", quorumValueMap, tags, time.Now()) } @@ -369,27 +365,27 @@ func (ceph *CephMetrics) getOSDDaemon(acc plugins.Accumulator) { func (ceph *CephMetrics) getOSDPerf(acc plugins.Accumulator) { var osdPerf OsdPerfDump - out, err := exec.Command("/bin/ls", ceph.SocketDir).Output() + osdsArray, err := ioutil.ReadDir(ceph.SocketDir) if err != nil { return } - fileStr := string(out) - osdsArray := strings.Split(fileStr, "\n") - osds := make([]string, len(osdsArray)) - index := 0 - for _, value := range osdsArray { - if !strings.Contains(value, "osd") { + for _, file := range osdsArray { + name := file.Name() + + if !strings.HasPrefix(name, ceph.Cluster+"-osd") { continue } - osds[index] = value - index++ - location := fmt.Sprint(ceph.SocketDir, "/", value) + + location := fmt.Sprint(ceph.SocketDir, "/", name) args := []string{"--admin-daemon", location, "perf", "dump"} + if err := ceph.cephCommand(&osdPerf, args...); err != nil { + fmt.Println("error ", err) return } - osdId := string(value[strings.LastIndex(value, ".")-1]) + + osdId := string(name[strings.LastIndex(name, ".")-1]) tags := map[string]string{"cluster": ceph.Cluster, "osd": osdId} osd := osdPerf.Osd @@ -426,7 +422,7 @@ func getOSDLatencyCalc(osdLatency *OSDLatency) map[string]interface{} { } func (ceph *CephMetrics) cephCommand(v interface{}, args ...string) error { - args = append(args, "-f", "json") + args = append(args, "-f", "json", "--cluster="+ceph.Cluster) out, err := exec.Command(ceph.BinLocation, args...).Output() if err != nil { return err From 
a8f4e1d443e902437d7e519aef20227d23b9592a Mon Sep 17 00:00:00 2001 From: madz Date: Fri, 3 Jul 2015 14:08:34 +0800 Subject: [PATCH 3/6] ceph metrics test --- plugins/ceph/ceph_test.go | 112 +++++++++++++++----------------------- 1 file changed, 45 insertions(+), 67 deletions(-) diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go index 569529288..0f27f0965 100644 --- a/plugins/ceph/ceph_test.go +++ b/plugins/ceph/ceph_test.go @@ -19,60 +19,53 @@ func TestCephGenerateMetrics(t *testing.T) { err := p.Gather(&acc) require.NoError(t, err) - intMetrics := []string{ - "ceph_pool", - // "osd_count", - // "osd_utilization", - // "total_storage", - // "used", - // "available", - // "client_io_kbs", - // "client_io_ops", - // "pool_used", - // "pool_usedKb", - // "pool_maxbytes", - // "pool_utilization", + sample := p.SampleConfig() + assert.NotNil(t, sample) + assert.Equal(t, p.Cluster, "ceph", "Same Cluster") - // "osd_used", - // "osd_total", - // "osd_epoch", - - // "osd_latency_commit", - // "osd_latency_apply", - - // "op", - // "op_in_bytes", - // "op_out_bytes", - // "op_r", - // "op_r_out_byes", - // "op_w", - // "op_w_in_bytes", - // "op_rw", - // "op_rw_in_bytes", - // "op_rw_out_bytes", - - // "pool_objects", - // "pg_map_count", - // "pg_data_bytes", - // "pg_data_avail", - // "pg_data_total", - // "pg_data_used", - // "pg_distribution", - // "pg_distribution_pool", - // "pg_distribution_osd", - } - - floatMetrics := []string{ - "sum", - } + intMetrics := []string{"pg_map_count"} + // "pg_data_avail", + // "osd_count", + // "osd_utilization", + // "total_storage", + // "used", + // "available", + // "client_io_kbs", + // "client_io_ops", + // "pool_used", + // "pool_usedKb", + // "pool_maxbytes", + // "pool_utilization", + // "osd_used", + // "osd_total", + // "osd_epoch", + // "osd_latency_commit", + // "osd_latency_apply", + // "op", + // "op_in_bytes", + // "op_out_bytes", + // "op_r", + // "op_r_out_byes", + // "op_w", + // "op_w_in_bytes", + // "op_rw", + // "op_rw_in_bytes", + // "op_rw_out_bytes", + // "pool_objects", + // "pg_map_count", + // "pg_data_bytes", + // "pg_data_avail", + // "pg_data_total", + // "pg_data_used", + // "pg_distribution", + // "pg_distribution_pool", + // "pg_distribution_osd", + // } for _, metric := range intMetrics { assert.True(t, acc.HasIntValue(metric)) } - for _, metric := range floatMetrics { - assert.True(t, acc.HasFloatValue(metric)) - } } func TestCephGenerateMetricsDefault(t *testing.T) { @@ -82,25 +75,10 @@ func TestCephGenerateMetricsDefault(t *testing.T) { err := p.Gather(&acc) require.NoError(t, err) + assert.True(t, len(acc.Points) > 0) - point, ok := acc.Get("ceph_osd_count") - require.True(t, ok) - assert.Equal(t, "ceph", point.Tags["cluster"]) + // point, ok := acc.Get("ceph_op_wip") + // require.True(t, ok) + // assert.Equal(t, "ceph", point.Tags["cluster"]) } - -func TestCephTagsMetricsWithDifferentCluster(t *testing.T) { - p := &CephMetrics{ - Cluster: "cephTest", - } - - var acc testutil.Accumulator - - err := p.Gather(&acc) - require.NoError(t, err) - - point, ok := acc.Get("cephTest_osd_count") - require.True(t, ok) - - assert.Equal(t, "cephTest", point.Tags["cluster"]) -} From 5ae390066f6083c5bd572365d3e36a703f52ab65 Mon Sep 17 00:00:00 2001 From: madz Date: Fri, 3 Jul 2015 15:30:08 +0800 Subject: [PATCH 4/6] completed test. change int to int64. 
--- plugins/ceph/ceph.go | 31 ++++++++------- plugins/ceph/ceph_data.go | 74 ++++++++++++++++++------------------ plugins/ceph/ceph_test.go | 80 +++++++++++++++++++-------------------- 3 files changed, 91 insertions(+), 94 deletions(-) diff --git a/plugins/ceph/ceph.go b/plugins/ceph/ceph.go index e8df1ad46..197756a8f 100644 --- a/plugins/ceph/ceph.go +++ b/plugins/ceph/ceph.go @@ -136,11 +136,11 @@ func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { quorumValueMap["members"] = strings.Join(quorum_name, ",") //clientIOs - sumOps := 0 - sumWrs := 0 + sumOps := int64(0) + sumWrs := int64(0) for _, stat := range poolStatsList { - sumOps += stat.ClientIoRate.OpsPerSec - sumWrs += stat.ClientIoRate.WriteBytesPerSecond / 1024 + sumOps += int64(stat.ClientIoRate.OpsPerSec) + sumWrs += int64(stat.ClientIoRate.WriteBytesPerSecond) / 1024 } // OSD Epoch @@ -148,8 +148,8 @@ func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { acc.Add("osd_epoch", epoch, map[string]string{"cluster": ceph.Cluster}) acc.Add("health", health.OverallStatus, tags) acc.Add("total_storage", cephDf.Stats.TotalBytes, tags) - acc.Add("used", cephDf.Stats.TotalUsedBytes, tags) - acc.Add("available", cephDf.Stats.TotalAvailableBytes, tags) + acc.Add("used_storage", cephDf.Stats.TotalUsedBytes, tags) + acc.Add("available_storage", cephDf.Stats.TotalAvailableBytes, tags) acc.Add("client_io_kbs", sumWrs, tags) acc.Add("client_io_ops", sumOps, tags) acc.AddValuesWithTime("monitor", monitorValueMap, tags, time.Now()) @@ -241,9 +241,9 @@ func (ceph *CephMetrics) getPg(acc plugins.Accumulator) { tags := map[string]string{"cluster": ceph.Cluster} acc.Add("pg_map_count", pgMap.PgCount, tags) acc.Add("pg_data_bytes", pgMap.DataBytes, tags) - acc.Add("pg_data_avail", pgMap.BytesAvail, tags) - acc.Add("pg_data_total", pgMap.BytesTotal, tags) - acc.Add("pg_data_used", pgMap.BytesUsed, tags) + acc.Add("pg_data_available_storage", pgMap.BytesAvail, tags) + acc.Add("pg_data_total_storage", pgMap.BytesTotal, tags) + acc.Add("pg_data_used_storage", pgMap.BytesUsed, tags) var pgDump PgDump if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil { @@ -251,25 +251,25 @@ func (ceph *CephMetrics) getPg(acc plugins.Accumulator) { } poolOsdPgMap := make(PoolOsdPgMap, len(pgDump.PoolStats)) - totalOsdPgs := make(map[int]int, len(pgDump.OsdStats)) + totalOsdPgs := make(map[int64]int64, len(pgDump.OsdStats)) for _, pgStat := range pgDump.PgStats { - poolId, _ := strconv.Atoi(strings.Split(pgStat.PgId, ".")[0]) + poolId, _ := strconv.ParseInt(strings.Split(pgStat.PgId, ".")[0], 10, 64) osdPgMap := poolOsdPgMap[poolId] if osdPgMap == nil { - osdPgMap = make(map[int]int, len(pgDump.OsdStats)) + osdPgMap = make(map[int64]int64, len(pgDump.OsdStats)) poolOsdPgMap[poolId] = osdPgMap } for _, osd := range pgStat.Up { - osdPgMap[osd] = osdPgMap[osd] + 1 - totalOsdPgs[osd] = totalOsdPgs[osd] + 1 + osdPgMap[osd] = int64(osdPgMap[osd] + 1) + totalOsdPgs[osd] = int64(totalOsdPgs[osd] + 1) } } for poolId, osdPgMap := range poolOsdPgMap { - poolPg := 0 + poolPg := int64(0) for osdId, pgs := range osdPgMap { tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId), "osd": fmt.Sprintf("%d", osdId)} poolPg += pgs @@ -381,7 +381,6 @@ func (ceph *CephMetrics) getOSDPerf(acc plugins.Accumulator) { args := []string{"--admin-daemon", location, "perf", "dump"} if err := ceph.cephCommand(&osdPerf, args...); err != nil { - fmt.Println("error ", err) return } diff --git a/plugins/ceph/ceph_data.go b/plugins/ceph/ceph_data.go index 
f89b2d924..c89575bb4 100644 --- a/plugins/ceph/ceph_data.go +++ b/plugins/ceph/ceph_data.go @@ -4,7 +4,7 @@ type QuorumStat struct { LeaderName string `json:"quorum_leader_name"` QuorumName []string `json:"quorum_names"` MonitorMap struct { - Epoch int `json:"election_epoch"` + Epoch int64 `json:"election_epoch"` Mons []struct { Name string `json:"name"` Address string `json:"addr"` @@ -16,10 +16,10 @@ type CephHealth struct { OverallStatus string `json:"overall_status"` } type CephStatus struct { - Quorum []int `json:"quorum"` + Quorum []int64 `json:"quorum"` OSDMap struct { OSDMap struct { - Epoch int `json:"epoch"` + Epoch int64 `json:"epoch"` } `json:"osdmap"` } `json:"osdmap"` Health struct { @@ -28,9 +28,9 @@ type CephStatus struct { PgMap struct { PgByState []struct { Name string `json:"state_name"` - Count int `json:"count"` + Count int64 `json:"count"` } `json:"pgs_by_state"` - PgCount int `json:"num_pgs"` + PgCount int64 `json:"num_pgs"` DataBytes int64 `json:"data_bytes"` BytesUsed int64 `json:"bytes_used"` BytesAvail int64 `json:"bytes_avail"` @@ -46,7 +46,7 @@ type CephDF struct { } `json:"stats"` Pools []struct { Name string `json:"name"` - Id int `json:"id"` + Id int64 `json:"id"` Stats struct { UsedKb int64 `json:"kb_used"` UsedBytes int64 `json:"bytes_used"` @@ -58,36 +58,36 @@ type CephDF struct { type PoolStats struct { PoolName string `json:"pool_name"` - PoolId int `json:"pool_id"` + PoolId int64 `json:"pool_id"` ClientIoRate struct { - WriteBytesPerSecond int `json:"write_bytes_sec"` - OpsPerSec int `json:"op_per_sec"` + WriteBytesPerSecond int64 `json:"write_bytes_sec"` + OpsPerSec int64 `json:"op_per_sec"` } `json:"client_io_rate"` } type PoolQuota struct { PoolName string `json:"pool_name"` - PoolId int `json:"pool_id"` + PoolId int64 `json:"pool_id"` MaxObjects int64 `json:"quota_max_objects"` MaxBytes int64 `json:"quota_max_bytes"` } type OsdDump struct { Osds []struct { - OsdNum int `json:"osd"` + OsdNum int64 `json:"osd"` Uuid string `json:"uuid"` - Up int `json:"up"` - In int `json:"in"` + Up int64 `json:"up"` + In int64 `json:"in"` OsdState []string `json:"state"` } `json:"osds"` } type OsdPerf struct { PerfInfo []struct { - Id int `json:"id"` + Id int64 `json:"id"` Stats struct { - CommitLatency int `json:"commit_latency_ms"` - ApplyLatency int `json:"apply_latency_ms"` + CommitLatency int64 `json:"commit_latency_ms"` + ApplyLatency int64 `json:"apply_latency_ms"` } `json:"perf_stats"` } `json:"osd_perf_infos"` } @@ -97,18 +97,18 @@ type PgDump struct { StatSum map[string]int64 `json:"stat_sum"` } `json:"pg_stats_sum"` PoolStats []struct { - PoolId int `json:"poolid"` + PoolId int64 `json:"poolid"` StatSum map[string]interface{} `json:"stat_sum"` } `json:"pool_stats"` PgStats []struct { - PgId string `json:"pgid"` - Up []int `json:"up"` - Acting []int `json:"acting"` - UpPrimary int `json:"up_primary"` - ActingPrimary int `json:"acting_primary"` + PgId string `json:"pgid"` + Up []int64 `json:"up"` + Acting []int64 `json:"acting"` + UpPrimary int64 `json:"up_primary"` + ActingPrimary int64 `json:"acting_primary"` } `json:"pg_stats"` OsdStats []struct { - Osd int `json:"osd"` + Osd int64 `json:"osd"` TotalKb int64 `json:"kb"` UsedKb int64 `json:"kb_used"` AvailableKb int64 `json:"kb_avail"` @@ -117,18 +117,18 @@ type PgDump struct { type OsdPerfDump struct { Osd struct { - RecoveryOps int - OpWip int `json:"op_wip"` - Op int `json:"op"` - OpInBytes int `json:"op_in_bytes"` - OpOutBytes int `json:"op_out_bytes"` - OpRead int `json:"op_r"` - OpReadOutBytes int 
`json:"op_r_out_bytes"` - OpWrite int `json:"op_w"` - OpWriteInBytes int `json:"op_w_in_bytes"` - OpReadWrite int `json:"op_rw"` - OpReadWriteInBytes int `json:"op_rw_in_btyes"` - OpReadWriteOutBytes int `json:"op_rw_out_bytes"` + RecoveryOps int64 + OpWip int64 `json:"op_wip"` + Op int64 `json:"op"` + OpInBytes int64 `json:"op_in_bytes"` + OpOutBytes int64 `json:"op_out_bytes"` + OpRead int64 `json:"op_r"` + OpReadOutBytes int64 `json:"op_r_out_bytes"` + OpWrite int64 `json:"op_w"` + OpWriteInBytes int64 `json:"op_w_in_bytes"` + OpReadWrite int64 `json:"op_rw"` + OpReadWriteInBytes int64 `json:"op_rw_in_btyes"` + OpReadWriteOutBytes int64 `json:"op_rw_out_bytes"` OpLatency struct { OSDLatencyCalc OSDLatency @@ -164,8 +164,8 @@ type OsdPerfDump struct { } type OSDLatency struct { - AvgCount int `json:"avgcount"` + AvgCount int64 `json:"avgcount"` Sum float64 `json:"sum"` } -type PoolOsdPgMap map[int]map[int]int +type PoolOsdPgMap map[int64]map[int64]int64 diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go index 0f27f0965..c0762cd4f 100644 --- a/plugins/ceph/ceph_test.go +++ b/plugins/ceph/ceph_test.go @@ -23,44 +23,42 @@ func TestCephGenerateMetrics(t *testing.T) { assert.NotNil(t, sample) assert.Equal(t, p.Cluster, "ceph", "Same Cluster") - intMetrics := []string{"pg_map_count"} - // "pg_data_avail", - // "osd_count", - // "osd_utilization", - // "total_storage", - // "used", - // "available", - // "client_io_kbs", - // "client_io_ops", - // "pool_used", - // "pool_usedKb", - // "pool_maxbytes", - // "pool_utilization", - // "osd_used", - // "osd_total", - // "osd_epoch", - // "osd_latency_commit", - // "osd_latency_apply", - // "op", - // "op_in_bytes", - // "op_out_bytes", - // "op_r", - // "op_r_out_byes", - // "op_w", - // "op_w_in_bytes", - // "op_rw", - // "op_rw_in_bytes", - // "op_rw_out_bytes", - // "pool_objects", - // "pg_map_count", - // "pg_data_bytes", - // "pg_data_avail", - // "pg_data_total", - // "pg_data_used", - // "pg_distribution", - // "pg_distribution_pool", - // "pg_distribution_osd", - // } + intMetrics := []string{ + "pg_data_bytes", + "pg_data_avail", + // "osd_count", + // "osd_utilization", + // "total_storage", + // "used", + // "available", + // "client_io_kbs", + // "client_io_ops", + // "pool_used", + // "pool_usedKb", + // "pool_maxbytes", + // "pool_utilization", + // "osd_used", + // "osd_total", + // "osd_epoch", + // "osd_latency_commit", + // "osd_latency_apply", + // "op", + // "op_in_bytes", + // "op_out_bytes", + // "op_r", + // "op_r_out_byes", + // "op_w", + // "op_w_in_bytes", + // "op_rw", + // "op_rw_in_bytes", + // "op_rw_out_bytes", + // "pool_objects", + // "pg_map_count", + "pg_data_bytes", + "pg_data_avail", + "pg_data_total", + "pg_data_used", + } for _, metric := range intMetrics { assert.True(t, acc.HasIntValue(metric)) @@ -77,8 +75,8 @@ func TestCephGenerateMetricsDefault(t *testing.T) { require.NoError(t, err) assert.True(t, len(acc.Points) > 0) - // point, ok := acc.Get("ceph_op_wip") - // require.True(t, ok) - // assert.Equal(t, "ceph", point.Tags["cluster"]) + point, ok := acc.Get("op_wip") + require.True(t, ok) + assert.Equal(t, "ceph", point.Tags["cluster"]) } From cfb92e0e37405ad86f04fef2652d72cd4b2f4385 Mon Sep 17 00:00:00 2001 From: madz Date: Fri, 3 Jul 2015 16:17:49 +0800 Subject: [PATCH 5/6] modified as commented on code review --- plugins/ceph/ceph.go | 14 ++++---- plugins/ceph/ceph_test.go | 72 +++++++++++++++++++++------------------ 2 files changed, 46 insertions(+), 40 deletions(-) diff --git 
a/plugins/ceph/ceph.go b/plugins/ceph/ceph.go index 197756a8f..5e567a589 100644 --- a/plugins/ceph/ceph.go +++ b/plugins/ceph/ceph.go @@ -136,11 +136,11 @@ func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) { quorumValueMap["members"] = strings.Join(quorum_name, ",") //clientIOs - sumOps := int64(0) - sumWrs := int64(0) + var sumOps int64 = 0 + var sumWrs int64 = 0 for _, stat := range poolStatsList { - sumOps += int64(stat.ClientIoRate.OpsPerSec) - sumWrs += int64(stat.ClientIoRate.WriteBytesPerSecond) / 1024 + sumOps += stat.ClientIoRate.OpsPerSec + sumWrs += stat.ClientIoRate.WriteBytesPerSecond / 1024 } // OSD Epoch @@ -269,7 +269,7 @@ func (ceph *CephMetrics) getPg(acc plugins.Accumulator) { } for poolId, osdPgMap := range poolOsdPgMap { - poolPg := int64(0) + var poolPg int64 = 0 for osdId, pgs := range osdPgMap { tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId), "osd": fmt.Sprintf("%d", osdId)} poolPg += pgs @@ -346,8 +346,8 @@ func (ceph *CephMetrics) getOSDDaemon(acc plugins.Accumulator) { tag := map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)} acc.Add("osd_utilization", utilized, tag) - acc.Add("osd_used", utilized, tag) - acc.Add("osd_total", total, tag) + acc.Add("osd_used_storage", used, tag) + acc.Add("osd_total_storage", total, tag) } //OSD Commit and Apply Latency diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go index c0762cd4f..39f2855f6 100644 --- a/plugins/ceph/ceph_test.go +++ b/plugins/ceph/ceph_test.go @@ -24,46 +24,52 @@ func TestCephGenerateMetrics(t *testing.T) { assert.Equal(t, p.Cluster, "ceph", "Same Cluster") intMetrics := []string{ + "total_storage", + "used_storage", + "available_storage", + "client_io_kbs", + "client_io_ops", + + "pool_used", + "pool_usedKb", + "pool_maxbytes", + "pool_objects", + + "osd_epoch", + "op_in_bytes", + "op_out_bytes", + "op_r", + "op_w", + "op_w_in_bytes", + "op_rw", + "op_rw_in_bytes", + "op_rw_out_bytes", + + "pg_map_count", "pg_data_bytes", - "pg_data_avail", - // "osd_count", - // "osd_utilization", - // "total_storage", - // "used", - // "available", - // "client_io_kbs", - // "client_io_ops", - // "pool_used", - // "pool_usedKb", - // "pool_maxbytes", - // "pool_utilization", - // "osd_used", - // "osd_total", - // "osd_epoch", - // "osd_latency_commit", - // "osd_latency_apply", - // "op", - // "op_in_bytes", - // "op_out_bytes", - // "op_r", - // "op_r_out_byes", - // "op_w", - // "op_w_in_bytes", - // "op_rw", - // "op_rw_in_bytes", - // "op_rw_out_bytes", - // "pool_objects", - // "pg_map_count", - "pg_data_bytes", - "pg_data_avail", - "pg_data_total", - "pg_data_used", + "pg_data_total_storage", + "pg_data_used_storage", + "pg_distribution", + "pg_distribution_pool", + "pg_distribution_osd", + } + + floatMetrics := []string{ + "osd_utilization", + "pool_utilization", + "osd_used_storage", + "osd_total_storage", } for _, metric := range intMetrics { + assert.True(t, acc.HasIntValue(metric)) } + for _, metric := range floatMetrics { + assert.True(t, acc.HasFloatValue(metric)) + } + } func TestCephGenerateMetricsDefault(t *testing.T) { From a59ea7313eeb7ee47f3d7d24b41c0ad0f68ede1c Mon Sep 17 00:00:00 2001 From: madz Date: Fri, 3 Jul 2015 16:19:27 +0800 Subject: [PATCH 6/6] remove space --- plugins/ceph/ceph_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go index 39f2855f6..08f3d040b 100644 --- a/plugins/ceph/ceph_test.go +++ b/plugins/ceph/ceph_test.go @@ -62,7 +62,6 @@ 
func TestCephGenerateMetrics(t *testing.T) { } for _, metric := range intMetrics { - assert.True(t, acc.HasIntValue(metric)) }
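
Note on the collection pattern used throughout this series (not part of the patches themselves): every gatherer in ceph.go goes through the cephCommand helper, which shells out to the ceph binary with "-f json" appended and unmarshals the output into one of the typed structs from ceph_data.go. The standalone sketch below illustrates that pattern outside Telegraf; it assumes a ceph binary at /usr/bin/ceph and uses a trimmed, hypothetical copy of the CephDF struct (cluster-wide totals only), so it is an illustration of the approach rather than code taken from the patch.

package main

import (
	"encoding/json"
	"fmt"
	"os/exec"
)

// cephDF is a cut-down stand-in for the CephDF struct in ceph_data.go,
// keeping only the cluster-wide storage totals from `ceph df`.
type cephDF struct {
	Stats struct {
		TotalBytes          int64 `json:"total_bytes"`
		TotalUsedBytes      int64 `json:"total_used_bytes"`
		TotalAvailableBytes int64 `json:"total_avail_bytes"`
	} `json:"stats"`
}

func main() {
	// Same shape of invocation cephCommand builds: binary, subcommand,
	// and "-f json" so the output is machine-readable.
	out, err := exec.Command("/usr/bin/ceph", "df", "-f", "json").Output()
	if err != nil {
		fmt.Println("ceph df failed:", err)
		return
	}

	var df cephDF
	if err := json.Unmarshal(out, &df); err != nil {
		fmt.Println("decoding ceph df output failed:", err)
		return
	}

	fmt.Printf("total=%d used=%d avail=%d\n",
		df.Stats.TotalBytes, df.Stats.TotalUsedBytes, df.Stats.TotalAvailableBytes)
}

The later revision in this series additionally appends --cluster=<name> to the same argument list inside cephCommand, so a non-default cluster name only needs to be set once in the plugin configuration.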