diff --git a/plugins/all/all.go b/plugins/all/all.go
index 8acedb33f..0b0f6236d 100644
--- a/plugins/all/all.go
+++ b/plugins/all/all.go
@@ -1,6 +1,7 @@
 package all
 
 import (
+	_ "github.com/influxdb/telegraf/plugins/ceph"
 	_ "github.com/influxdb/telegraf/plugins/mysql"
 	_ "github.com/influxdb/telegraf/plugins/postgresql"
 	_ "github.com/influxdb/telegraf/plugins/redis"
diff --git a/plugins/ceph/ceph.go b/plugins/ceph/ceph.go
new file mode 100644
index 000000000..8eb816cbe
--- /dev/null
+++ b/plugins/ceph/ceph.go
@@ -0,0 +1,435 @@
+package ceph
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/influxdb/telegraf/plugins"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+)
+
+var sampleConfig = `
+# Gather metrics for a Ceph cluster
+# Specify the cluster name
+#cluster="ceph"
+# Specify the location of the ceph binary
+#binLocation="/usr/bin/ceph"
+# Specify the directory containing the Ceph admin sockets
+#socketDir="/var/run/ceph"
+`
+
+type CephMetrics struct {
+	Cluster     string
+	BinLocation string
+	SocketDir   string
+}
+
+func (_ *CephMetrics) SampleConfig() string {
+	return sampleConfig
+}
+
+func (_ *CephMetrics) Description() string {
+	return "Read metrics from a Ceph storage cluster"
+}
+
+func (ceph *CephMetrics) Gather(acc plugins.Accumulator) error {
+	if ceph.Cluster == "" {
+		ceph.Cluster = "ceph"
+	}
+
+	if ceph.BinLocation == "" {
+		ceph.BinLocation = "/usr/bin/ceph"
+	}
+
+	if ceph.SocketDir == "" {
+		ceph.SocketDir = "/var/run/ceph"
+	}
+
+	ceph.gatherMetrics(acc)
+	return nil
+}
+
+func init() {
+	plugins.Add("ceph", func() plugins.Plugin {
+		return &CephMetrics{}
+	})
+}
+
+func (ceph *CephMetrics) gatherMetrics(acc plugins.Accumulator) {
+	var quorum QuorumStat
+
+	out, err := exec.Command("/bin/hostname").Output()
+	if err != nil {
+		return
+	}
+
+	hostname := string(out)
+
+	if err := ceph.cephCommand(&quorum, "quorum_status"); err != nil {
+		return
+	}
+
+	// Every node reports its local OSD perf counters; only the quorum
+	// leader reports the cluster-wide metrics.
+	ceph.getOSDPerf(acc)
+
+	if strings.TrimSpace(hostname) != strings.TrimSpace(quorum.LeaderName) {
+		fmt.Printf("Not a leader: quorum leader %s, host %s\n", quorum.LeaderName, hostname)
+		return
+	}
+
+	ceph.getCommon(acc)
+	ceph.getPool(acc)
+	ceph.getPg(acc)
+	ceph.getOSDDaemon(acc)
+}
+
+func (ceph *CephMetrics) getCommon(acc plugins.Accumulator) {
+	var health CephHealth
+	var quorum QuorumStat
+	var poolStatsList []PoolStats
+	var cephDf CephDF
+	var cephStatus CephStatus
+
+	if err := ceph.cephCommand(&cephStatus, "status"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&health, "health"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&quorum, "quorum_status"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&poolStatsList, "osd", "pool", "stats"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&cephDf, "df"); err != nil {
+		return
+	}
+
+	tags := map[string]string{"cluster": ceph.Cluster}
+
+	// Monitors
+	var monitorStr string
+	monitors := quorum.MonitorMap.Mons
+	monitorValueMap := make(map[string]interface{})
+	monitorValueMap["value"] = len(monitors)
+
+	for _, value := range monitors {
+		monitorStr = fmt.Sprint(monitorStr, ",", value.Name)
+	}
+	monitorValueMap["name"] = monitorStr
+
+	// Quorum members
+	var memberStr string
+	quorumNames := quorum.QuorumName
+	quorumValueMap := make(map[string]interface{})
+	quorumValueMap["value"] = len(quorumNames)
+
+	for _, value := range quorumNames {
+		memberStr = fmt.Sprint(memberStr, ",", value)
+	}
+	quorumValueMap["members"] = memberStr
+
+	// Client IO, summed across all pools
+	sumOps := 0
+	sumWrs := 0
+	for _, stat := range poolStatsList {
+		sumOps += stat.ClientIoRate.OpsPerSec
+		sumWrs += stat.ClientIoRate.WriteBytesPerSecond / 1024
+	}
+
+	// OSD Epoch
+	epoch := cephStatus.OSDMap.OSDMap.Epoch
+	acc.Add("osd_epoch", fmt.Sprintf("%d", epoch), map[string]string{"cluster": ceph.Cluster})
+	acc.Add("health", health.OverallStatus, tags)
+	acc.Add("total_storage", cephDf.Stats.TotalBytes, tags)
+	acc.Add("used", cephDf.Stats.TotalUsedBytes, tags)
+	acc.Add("available", cephDf.Stats.TotalAvailableBytes, tags)
+	acc.Add("client_io_kbs", fmt.Sprintf("%d", sumWrs), tags)
+	acc.Add("client_io_ops", fmt.Sprintf("%d", sumOps), tags)
+	acc.AddValuesWithTime("monitor", monitorValueMap, tags, time.Now())
+	acc.AddValuesWithTime("quorum", quorumValueMap, tags, time.Now())
+}
+
+func (ceph *CephMetrics) getPool(acc plugins.Accumulator) {
+	var cephDf CephDF
+	var poolStatsList []PoolStats
+	var pgDump PgDump
+
+	if err := ceph.cephCommand(&poolStatsList, "osd", "pool", "stats"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&cephDf, "df"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil {
+		return
+	}
+
+	for _, pool := range cephDf.Pools {
+		poolId := pool.Id
+		numObjects := pool.Stats.Objects
+		numBytes := pool.Stats.UsedBytes
+		maxAvail := pool.Stats.Available
+		numKb := pool.Stats.UsedKb
+
+		utilized := 0.0
+		if maxAvail != 0 {
+			utilized = (float64(numBytes) / float64(maxAvail)) * 100.0
+		}
+
+		var quota PoolQuota
+		err := ceph.cephCommand(&quota, "osd", "pool", "get-quota", pool.Name)
+		if err != nil {
+			continue
+		}
+		maxObjects := quota.MaxObjects
+		maxBytes := quota.MaxBytes
+
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)}
+		acc.Add("pool_objects", numObjects, tags)
+		acc.Add("pool_used", numBytes, tags)
+		acc.Add("pool_usedKb", numKb, tags)
+		acc.Add("pool_max_objects", maxObjects, tags)
+		acc.Add("pool_maxbytes", maxBytes, tags)
+		acc.Add("pool_utilization", utilized, tags)
+	}
+
+	acc.Add("pool", fmt.Sprintf("%d", len(cephDf.Pools)), map[string]string{"cluster": ceph.Cluster})
+
+	for _, stat := range poolStatsList {
+		poolId := stat.PoolId
+		kbs := stat.ClientIoRate.WriteBytesPerSecond / 1024
+		ops := stat.ClientIoRate.OpsPerSec
+
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)}
+		acc.Add("pool_io_kbs", kbs, tags)
+		acc.Add("pool_io_ops", ops, tags)
+	}
+
+	for _, pgPoolStat := range pgDump.PoolStats {
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", pgPoolStat.PoolId)}
+		for k, v := range pgPoolStat.StatSum {
+			acc.Add(fmt.Sprintf("pool_%s", k), fmt.Sprint(v), tags)
+		}
+	}
+}
+
+func (ceph *CephMetrics) getPg(acc plugins.Accumulator) {
+	var cephStatus CephStatus
+	if err := ceph.cephCommand(&cephStatus, "status"); err != nil {
+		return
+	}
+
+	pgMap := cephStatus.PgMap
+
+	for _, value := range pgMap.PgByState {
+		tags := map[string]string{"cluster": ceph.Cluster, "state": value.Name}
+		acc.Add("pg_count", value.Count, tags)
+	}
+
+	tags := map[string]string{"cluster": ceph.Cluster}
+	acc.Add("pg_map_count", pgMap.PgCount, tags)
+	acc.Add("pg_data_bytes", pgMap.DataBytes, tags)
+	acc.Add("pg_data_avail", pgMap.BytesAvail, tags)
+	acc.Add("pg_data_total", pgMap.BytesTotal, tags)
+	acc.Add("pg_data_used", pgMap.BytesUsed, tags)
+
+	var pgDump PgDump
+	if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil {
+		return
+	}
+
+	// Count placement groups per pool and per OSD from each PG's up set.
+	poolOsdPgMap := make(PoolOsdPgMap, len(pgDump.PoolStats))
+	totalOsdPgs := make(map[int]int, len(pgDump.OsdStats))
+
+	for _, pgStat := range pgDump.PgStats {
+		poolId, _ := strconv.Atoi(strings.Split(pgStat.PgId, ".")[0])
+
+		osdPgMap := poolOsdPgMap[poolId]
+		if osdPgMap == nil {
+			osdPgMap = make(map[int]int, len(pgDump.OsdStats))
+			poolOsdPgMap[poolId] = osdPgMap
+		}
+
+		for _, osd := range pgStat.Up {
+			osdPgMap[osd] = osdPgMap[osd] + 1
+			totalOsdPgs[osd] = totalOsdPgs[osd] + 1
+		}
+	}
+
+	for poolId, osdPgMap := range poolOsdPgMap {
+		poolPg := 0
+		for osdId, pgs := range osdPgMap {
+			tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId), "osd": fmt.Sprintf("%d", osdId)}
+			poolPg += pgs
+			acc.Add("pg_distribution", pgs, tags)
+		}
+
+		tags := map[string]string{"cluster": ceph.Cluster, "pool": fmt.Sprintf("%d", poolId)}
+		acc.Add("pg_distribution_pool", poolPg, tags)
+	}
+
+	for osd, pg := range totalOsdPgs {
+		tags := map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osd)}
+		acc.Add("pg_distribution_osd", pg, tags)
+	}
+
+	clusterTag := map[string]string{"cluster": ceph.Cluster}
+	for k, v := range pgDump.PgStatSum.StatSum {
+		acc.Add(fmt.Sprintf("pg_stats_%s", k), fmt.Sprintf("%d", v), clusterTag)
+	}
+}
+
+func (ceph *CephMetrics) getOSDDaemon(acc plugins.Accumulator) {
+	var osd OsdDump
+	var osdPerf OsdPerf
+	var pgDump PgDump
+
+	if err := ceph.cephCommand(&pgDump, "pg", "dump"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&osdPerf, "osd", "perf"); err != nil {
+		return
+	}
+
+	if err := ceph.cephCommand(&osd, "osd", "dump"); err != nil {
+		return
+	}
+
+	up := 0
+	in := 0
+	down := 0
+	out := 0
+	osds := osd.Osds
+
+	for _, osdStat := range osds {
+		if osdStat.Up == 1 {
+			up += 1
+		} else {
+			down += 1
+		}
+
+		if osdStat.In == 1 {
+			in += 1
+		} else {
+			out += 1
+		}
+	}
+
+	acc.Add("osd_count", len(osd.Osds), map[string]string{"cluster": ceph.Cluster})
+	acc.Add("osd_count", in, map[string]string{"cluster": ceph.Cluster, "state": "in"})
+	acc.Add("osd_count", out, map[string]string{"cluster": ceph.Cluster, "state": "out"})
+	acc.Add("osd_count", up, map[string]string{"cluster": ceph.Cluster, "state": "up"})
+	acc.Add("osd_count", down, map[string]string{"cluster": ceph.Cluster, "state": "down"})
+
+	// OSD utilization
+	for _, osdStat := range pgDump.OsdStats {
+		osdNum := osdStat.Osd
+		used := float64(osdStat.UsedKb)
+		total := float64(osdStat.TotalKb)
+		utilized := (used / total) * 100.0
+
+		tag := map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)}
+		acc.Add("osd_utilization", utilized, tag)
+		acc.Add("osd_used", used, tag)
+		acc.Add("osd_total", total, tag)
+	}
+
+	// OSD commit and apply latency
+	for _, perf := range osdPerf.PerfInfo {
+		osdNum := perf.Id
+		commit := perf.Stats.CommitLatency
+		apply := perf.Stats.ApplyLatency
+
+		acc.Add("osd_latency_commit", commit, map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)})
+		acc.Add("osd_latency_apply", apply, map[string]string{"cluster": ceph.Cluster, "osd": fmt.Sprintf("%d", osdNum)})
+	}
+}
+
+func (ceph *CephMetrics) getOSDPerf(acc plugins.Accumulator) {
+	var osdPerf OsdPerfDump
+
+	out, err := exec.Command("/bin/ls", ceph.SocketDir).Output()
+	if err != nil {
+		return
+	}
+	fileStr := string(out)
+	osdsArray := strings.Split(fileStr, "\n")
+	osds := make([]string, len(osdsArray))
+	index := 0
+
+	for _, value := range osdsArray {
+		if !strings.Contains(value, "osd") {
+			continue
+		}
+		osds[index] = value
+		index++
+		location := fmt.Sprint(ceph.SocketDir, "/", value)
+		args := []string{"--admin-daemon", location, "perf", "dump"}
+		if err := ceph.cephCommand(&osdPerf, args...); err != nil {
+			return
+		}
+		// Admin sockets are named "<cluster>-osd.<id>.asok"; extract the OSD id.
+		osdId := strings.TrimSuffix(value[strings.Index(value, ".")+1:], ".asok")
+		tags := map[string]string{"cluster": ceph.Cluster, "osd": osdId}
+		osd := osdPerf.Osd
+
+		// osd-.osd.recovery_ops ?
+		acc.Add("op_wip", osd.OpWip, tags)
+		acc.Add("op", osd.Op, tags)
+		acc.Add("op_in_bytes", osd.OpInBytes, tags)
+		acc.Add("op_out_bytes", osd.OpOutBytes, tags)
+		acc.Add("op_r", osd.OpRead, tags)
+		acc.Add("op_r_out_bytes", osd.OpReadOutBytes, tags)
+		acc.Add("op_w", osd.OpWrite, tags)
+		acc.Add("op_w_in_bytes", osd.OpWriteInBytes, tags)
+		acc.Add("op_rw", osd.OpReadWrite, tags)
+		acc.Add("op_rw_in_bytes", osd.OpReadWriteInBytes, tags)
+		acc.Add("op_rw_out_bytes", osd.OpReadWriteOutBytes, tags)
+		acc.AddValuesWithTime("op_latency", getOSDLatencyCalc(&osd.OpLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_process_latency", getOSDLatencyCalc(&osd.OpProcessLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_r_latency", getOSDLatencyCalc(&osd.OpReadLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_r_process_latency", getOSDLatencyCalc(&osd.OpReadProcessLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_w_latency", getOSDLatencyCalc(&osd.OpWriteLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_w_process_latency", getOSDLatencyCalc(&osd.OpWriteProcessLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_rw_latency", getOSDLatencyCalc(&osd.OpReadWriteLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_rw_process_latency", getOSDLatencyCalc(&osd.OpReadWriteProcessLatency.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_rw_rlat", getOSDLatencyCalc(&osd.OpReadWriteRlat.OSDLatencyCalc), tags, time.Now())
+		acc.AddValuesWithTime("op_w_rlat", getOSDLatencyCalc(&osd.OpWriteRlat.OSDLatencyCalc), tags, time.Now())
+	}
+}
+
+func getOSDLatencyCalc(osdLatency *OSDLatency) map[string]interface{} {
+	latencyMap := make(map[string]interface{})
+	latencyMap["avgcount"] = osdLatency.AvgCount
+	latencyMap["sum"] = osdLatency.Sum
+	return latencyMap
+}
+
+// cephCommand runs the ceph CLI with "-f json" appended and unmarshals the
+// JSON output into v.
+func (ceph *CephMetrics) cephCommand(v interface{}, args ...string) error {
+	args = append(args, "-f", "json")
+	out, err := exec.Command(ceph.BinLocation, args...).Output()
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(out, v)
+}
diff --git a/plugins/ceph/ceph_data.go b/plugins/ceph/ceph_data.go
new file mode 100644
index 000000000..f89b2d924
--- /dev/null
+++ b/plugins/ceph/ceph_data.go
@@ -0,0 +1,171 @@
+package ceph
+
+type QuorumStat struct {
+	LeaderName string   `json:"quorum_leader_name"`
+	QuorumName []string `json:"quorum_names"`
+	MonitorMap struct {
+		Epoch int `json:"election_epoch"`
+		Mons  []struct {
+			Name    string `json:"name"`
+			Address string `json:"addr"`
+		} `json:"mons"`
+	} `json:"monmap"`
+}
+
+type CephHealth struct {
+	OverallStatus string `json:"overall_status"`
+}
+
+type CephStatus struct {
+	Quorum []int `json:"quorum"`
+	OSDMap struct {
+		OSDMap struct {
+			Epoch int `json:"epoch"`
+		} `json:"osdmap"`
+	} `json:"osdmap"`
+	Health struct {
+		OverallStatus string `json:"overall_status"`
+	} `json:"health"`
+	PgMap struct {
+		PgByState []struct {
+			Name  string `json:"state_name"`
+			Count int    `json:"count"`
+		} `json:"pgs_by_state"`
+		PgCount    int   `json:"num_pgs"`
+		DataBytes  int64 `json:"data_bytes"`
+		BytesUsed  int64 `json:"bytes_used"`
+		BytesAvail int64 `json:"bytes_avail"`
+		BytesTotal int64 `json:"bytes_total"`
+	} `json:"pgmap"`
+}
+
+type CephDF struct {
+	Stats struct {
+		TotalBytes          int64 `json:"total_bytes"`
+		TotalUsedBytes      int64 `json:"total_used_bytes"`
+		TotalAvailableBytes int64 `json:"total_avail_bytes"`
+	} `json:"stats"`
+	Pools []struct {
+		Name  string `json:"name"`
+		Id    int    `json:"id"`
+		Stats struct {
+			UsedKb    int64 `json:"kb_used"`
+			UsedBytes int64 `json:"bytes_used"`
+			Available int64 `json:"max_avail"`
+			Objects   int64 `json:"objects"`
+		} `json:"stats"`
+	} `json:"pools"`
+}
+
+type PoolStats struct {
+	PoolName     string `json:"pool_name"`
+	PoolId       int    `json:"pool_id"`
+	ClientIoRate struct {
+		WriteBytesPerSecond int `json:"write_bytes_sec"`
+		OpsPerSec           int `json:"op_per_sec"`
+	} `json:"client_io_rate"`
+}
+
+type PoolQuota struct {
+	PoolName   string `json:"pool_name"`
+	PoolId     int    `json:"pool_id"`
+	MaxObjects int64  `json:"quota_max_objects"`
+	MaxBytes   int64  `json:"quota_max_bytes"`
+}
+
+type OsdDump struct {
+	Osds []struct {
+		OsdNum   int      `json:"osd"`
+		Uuid     string   `json:"uuid"`
+		Up       int      `json:"up"`
+		In       int      `json:"in"`
+		OsdState []string `json:"state"`
+	} `json:"osds"`
+}
+
+type OsdPerf struct {
+	PerfInfo []struct {
+		Id    int `json:"id"`
+		Stats struct {
+			CommitLatency int `json:"commit_latency_ms"`
+			ApplyLatency  int `json:"apply_latency_ms"`
+		} `json:"perf_stats"`
+	} `json:"osd_perf_infos"`
+}
+
+type PgDump struct {
+	PgStatSum struct {
+		StatSum map[string]int64 `json:"stat_sum"`
+	} `json:"pg_stats_sum"`
+	PoolStats []struct {
+		PoolId  int                    `json:"poolid"`
+		StatSum map[string]interface{} `json:"stat_sum"`
+	} `json:"pool_stats"`
+	PgStats []struct {
+		PgId          string `json:"pgid"`
+		Up            []int  `json:"up"`
+		Acting        []int  `json:"acting"`
+		UpPrimary     int    `json:"up_primary"`
+		ActingPrimary int    `json:"acting_primary"`
+	} `json:"pg_stats"`
+	OsdStats []struct {
+		Osd         int   `json:"osd"`
+		TotalKb     int64 `json:"kb"`
+		UsedKb      int64 `json:"kb_used"`
+		AvailableKb int64 `json:"kb_avail"`
+	} `json:"osd_stats"`
+}
+
+type OsdPerfDump struct {
+	Osd struct {
+		RecoveryOps         int
+		OpWip               int `json:"op_wip"`
+		Op                  int `json:"op"`
+		OpInBytes           int `json:"op_in_bytes"`
+		OpOutBytes          int `json:"op_out_bytes"`
+		OpRead              int `json:"op_r"`
+		OpReadOutBytes      int `json:"op_r_out_bytes"`
+		OpWrite             int `json:"op_w"`
+		OpWriteInBytes      int `json:"op_w_in_bytes"`
+		OpReadWrite         int `json:"op_rw"`
+		OpReadWriteInBytes  int `json:"op_rw_in_bytes"`
+		OpReadWriteOutBytes int `json:"op_rw_out_bytes"`
+
+		OpLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_latency"`
+		OpProcessLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_process_latency"`
+		OpReadLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_r_latency"`
+		OpReadProcessLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_r_process_latency"`
+		OpWriteRlat struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_w_rlat"`
+		OpWriteLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_w_latency"`
+		OpWriteProcessLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_w_process_latency"`
+		OpReadWriteRlat struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_rw_rlat"`
+		OpReadWriteLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_rw_latency"`
+		OpReadWriteProcessLatency struct {
+			OSDLatencyCalc OSDLatency
+		} `json:"op_rw_process_latency"`
+	} `json:"osd"`
+}
+
+type OSDLatency struct {
+	AvgCount int     `json:"avgcount"`
+	Sum      float64 `json:"sum"`
+}
+
+type PoolOsdPgMap map[int]map[int]int
diff --git a/plugins/ceph/ceph_test.go b/plugins/ceph/ceph_test.go
new file mode 100644
index 000000000..569529288
--- /dev/null
+++ b/plugins/ceph/ceph_test.go
@@ -0,0 +1,106 @@
+package ceph
+
+import (
+	"github.com/influxdb/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+func TestCephGenerateMetrics(t *testing.T) {
+	p := &CephMetrics{
+		Cluster:     "ceph",
+		BinLocation: "/usr/bin/ceph",
+		SocketDir:   "/var/run/ceph",
+	}
+
+	var acc testutil.Accumulator
+
+	err := p.Gather(&acc)
+	require.NoError(t, err)
+
+	intMetrics := []string{
+		"ceph_pool",
+		// "osd_count",
+		// "osd_utilization",
+		// "total_storage",
+		// "used",
+		// "available",
+		// "client_io_kbs",
+		// "client_io_ops",
+		// "pool_used",
+		// "pool_usedKb",
+		// "pool_maxbytes",
+		// "pool_utilization",
+
+		// "osd_used",
+		// "osd_total",
+		// "osd_epoch",
+
+		// "osd_latency_commit",
+		// "osd_latency_apply",
+
+		// "op",
+		// "op_in_bytes",
+		// "op_out_bytes",
+		// "op_r",
+		// "op_r_out_bytes",
+		// "op_w",
+		// "op_w_in_bytes",
+		// "op_rw",
+		// "op_rw_in_bytes",
+		// "op_rw_out_bytes",
+
+		// "pool_objects",
+		// "pg_map_count",
+		// "pg_data_bytes",
+		// "pg_data_avail",
+		// "pg_data_total",
+		// "pg_data_used",
+		// "pg_distribution",
+		// "pg_distribution_pool",
+		// "pg_distribution_osd",
+	}
+
+	floatMetrics := []string{
+		"sum",
+	}
+
+	for _, metric := range intMetrics {
+		assert.True(t, acc.HasIntValue(metric))
+	}
+
+	for _, metric := range floatMetrics {
+		assert.True(t, acc.HasFloatValue(metric))
+	}
+}
+
+func TestCephGenerateMetricsDefault(t *testing.T) {
+	p := &CephMetrics{}
+
+	var acc testutil.Accumulator
+
+	err := p.Gather(&acc)
+	require.NoError(t, err)
+
+	point, ok := acc.Get("ceph_osd_count")
+	require.True(t, ok)
+	assert.Equal(t, "ceph", point.Tags["cluster"])
+}
+
+func TestCephTagsMetricsWithDifferentCluster(t *testing.T) {
+	p := &CephMetrics{
+		Cluster: "cephTest",
+	}
+
+	var acc testutil.Accumulator
+
+	err := p.Gather(&acc)
+	require.NoError(t, err)
+
+	point, ok := acc.Get("cephTest_osd_count")
+	require.True(t, ok)
+
+	assert.Equal(t, "cephTest", point.Tags["cluster"])
+}