diff --git a/CHANGELOG.md b/CHANGELOG.md index ea29700dc..6d23e6e27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [#1674](https://github.com/influxdata/telegraf/issues/1674): elasticsearch input: configurable timeout. - [#1607](https://github.com/influxdata/telegraf/pull/1607): Massage metric names in Instrumental output plugin - [#1572](https://github.com/influxdata/telegraf/pull/1572): mesos improvements. +- [#1513](https://github.com/influxdata/telegraf/issues/1513): Add Ceph Cluster Performance Statistics ### Bugfixes diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 902c7f7fb..2601ac6c0 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -511,6 +511,10 @@ # # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. # [[inputs.ceph]] +# ## This is the recommended interval to poll. Too frequent and you will lose +# ## data points due to timeouts during rebalancing and recovery +# interval = '1m' +# # ## All configuration values are optional, defaults are shown below # # ## location of ceph binary @@ -525,6 +529,26 @@ # # ## suffix used to identify socket files # socket_suffix = "asok" +# +# ## Ceph user to authenticate as, ceph will search for the corresponding keyring +# ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the +# ## client section of ceph.conf for example: +# ## +# ## [client.telegraf] +# ## keyring = /etc/ceph/client.telegraf.keyring +# ## +# ## Consult the ceph documentation for more detail on keyring generation. +# ceph_user = "client.admin" +# +# ## Ceph configuration to use to locate the cluster +# ceph_config = "/etc/ceph/ceph.conf" +# +# ## Whether to gather statistics via the admin socket +# gather_admin_socket_stats = true +# +# ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config +# ## to be specified +# gather_cluster_stats = true # # Read specific statistics per cgroup diff --git a/plugins/inputs/ceph/README.md b/plugins/inputs/ceph/README.md index ab358daaa..49ae09e73 100644 --- a/plugins/inputs/ceph/README.md +++ b/plugins/inputs/ceph/README.md @@ -2,7 +2,9 @@ Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. -The plugin works by scanning the configured SocketDir for OSD and MON socket files. When it finds +*Admin Socket Stats* + +This gatherer works by scanning the configured SocketDir for OSD and MON socket files. When it finds a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump** The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are @@ -27,11 +29,26 @@ Would be parsed into the following metrics, all of which would be tagged with co - refresh_latency.sum: 5378.794002000 +*Cluster Stats* + +This gatherer works by invoking ceph commands against the cluster thus only requires the ceph client, valid +ceph configuration and an access key to function (the ceph_config and ceph_user configuration variables work +in conjunction to specify these prerequisites). It may be run on any server you wish which has access to +the cluster. The currently supported commands are: + +* ceph status +* ceph df +* ceph osd pool stats + ### Configuration: ``` # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster. [[inputs.ceph]] + ## This is the recommended interval to poll. Too frequent and you will lose + ## data points due to timeouts during rebalancing and recovery + interval = '1m' + ## All configuration values are optional, defaults are shown below ## location of ceph binary @@ -46,15 +63,86 @@ Would be parsed into the following metrics, all of which would be tagged with co ## suffix used to identify socket files socket_suffix = "asok" + + ## Ceph user to authenticate as, ceph will search for the corresponding keyring + ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the + ## client section of ceph.conf for example: + ## + ## [client.telegraf] + ## keyring = /etc/ceph/client.telegraf.keyring + ## + ## Consult the ceph documentation for more detail on keyring generation. + ceph_user = "client.admin" + + ## Ceph configuration to use to locate the cluster + ceph_config = "/etc/ceph/ceph.conf" + + ## Whether to gather statistics via the admin socket + gather_admin_socket_stats = true + + ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config + ## to be specified + gather_cluster_stats = true ``` ### Measurements & Fields: +*Admin Socket Stats* + All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go. +*Cluster Stats* + +* ceph\_osdmap + * epoch (float) + * full (boolean) + * nearfull (boolean) + * num\_in\_osds (float) + * num\_osds (float) + * num\_remremapped\_pgs (float) + * num\_up\_osds (float) + +* ceph\_pgmap + * bytes\_avail (float) + * bytes\_total (float) + * bytes\_used (float) + * data\_bytes (float) + * num\_pgs (float) + * op\_per\_sec (float) + * read\_bytes\_sec (float) + * version (float) + * write\_bytes\_sec (float) + * recovering\_bytes\_per\_sec (float) + * recovering\_keys\_per\_sec (float) + * recovering\_objects\_per\_sec (float) + +* ceph\_pgmap\_state + * state name e.g. active+clean (float) + +* ceph\_usage + * bytes\_used (float) + * kb\_used (float) + * max\_avail (float) + * objects (float) + +* ceph\_pool\_usage + * bytes\_used (float) + * kb\_used (float) + * max\_avail (float) + * objects (float) + +* ceph\_pool\_stats + * op\_per\_sec (float) + * read\_bytes\_sec (float) + * write\_bytes\_sec (float) + * recovering\_object\_per\_sec (float) + * recovering\_bytes\_per\_sec (float) + * recovering\_keys\_per\_sec (float) ### Tags: +*Admin Socket Stats* + All measurements will have the following tags: - type: either 'osd' or 'mon' to indicate which type of node was queried @@ -96,9 +184,21 @@ All measurements will have the following tags: - throttle-osd_client_bytes - throttle-osd_client_messages +*Cluster Stats* + +* ceph\_pg\_state has the following tags: + * state (state for which the value applies e.g. active+clean, active+remapped+backfill) +* ceph\_pool\_usage has the following tags: + * id + * name +* ceph\_pool\_stats has the following tags: + * id + * name ### Example Output: +*Admin Socket Stats* +
 telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d  -input-filter ceph
 * Plugin: ceph, Collection 1
@@ -107,3 +207,16 @@ telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegr
 > ceph,collection=throttle-mon_daemon_bytes,id=node-2,type=mon get=4058121,get_or_fail_fail=0,get_or_fail_success=0,get_sum=6027348117,max=419430400,put=4058121,put_sum=6027348117,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814815661
 > ceph,collection=throttle-msgr_dispatch_throttler-mon,id=node-2,type=mon get=54276277,get_or_fail_fail=0,get_or_fail_success=0,get_sum=370232877040,max=104857600,put=54276277,put_sum=370232877040,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814872064
 
+ +*Cluster Stats* + +
+> ceph_osdmap,host=ceph-mon-0 epoch=170772,full=false,nearfull=false,num_in_osds=340,num_osds=340,num_remapped_pgs=0,num_up_osds=340 1468841037000000000
+> ceph_pgmap,host=ceph-mon-0 bytes_avail=634895531270144,bytes_total=812117151809536,bytes_used=177221620539392,data_bytes=56979991615058,num_pgs=22952,op_per_sec=15869,read_bytes_sec=43956026,version=39387592,write_bytes_sec=165344818 1468841037000000000
+> ceph_pgmap_state,host=ceph-mon-0 active+clean=22952 1468928660000000000
+> ceph_usage,host=ceph-mon-0 total_avail_bytes=634895514791936,total_bytes=812117151809536,total_used_bytes=177221637017600 1468841037000000000
+> ceph_pool_usage,host=ceph-mon-0,id=150,name=cinder.volumes bytes_used=12648553794802,kb_used=12352103316,max_avail=154342562489244,objects=3026295 1468841037000000000
+> ceph_pool_usage,host=ceph-mon-0,id=182,name=cinder.volumes.flash bytes_used=8541308223964,kb_used=8341121313,max_avail=39388593563936,objects=2075066 1468841037000000000
+> ceph_pool_stats,host=ceph-mon-0,id=150,name=cinder.volumes op_per_sec=1706,read_bytes_sec=28671674,write_bytes_sec=29994541 1468841037000000000
+> ceph_pool_stats,host=ceph-mon-0,id=182,name=cinder.volumes.flash op_per_sec=9748,read_bytes_sec=9605524,write_bytes_sec=45593310 1468841037000000000
+
diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index d8ebf5017..d5ed464fa 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -23,33 +23,15 @@ const ( ) type Ceph struct { - CephBinary string - OsdPrefix string - MonPrefix string - SocketDir string - SocketSuffix string -} - -func (c *Ceph) setDefaults() { - if c.CephBinary == "" { - c.CephBinary = "/usr/bin/ceph" - } - - if c.OsdPrefix == "" { - c.OsdPrefix = osdPrefix - } - - if c.MonPrefix == "" { - c.MonPrefix = monPrefix - } - - if c.SocketDir == "" { - c.SocketDir = "/var/run/ceph" - } - - if c.SocketSuffix == "" { - c.SocketSuffix = sockSuffix - } + CephBinary string + OsdPrefix string + MonPrefix string + SocketDir string + SocketSuffix string + CephUser string + CephConfig string + GatherAdminSocketStats bool + GatherClusterStats bool } func (c *Ceph) Description() string { @@ -57,6 +39,10 @@ func (c *Ceph) Description() string { } var sampleConfig = ` + ## This is the recommended interval to poll. Too frequent and you will lose + ## data points due to timeouts during rebalancing and recovery + interval = '1m' + ## All configuration values are optional, defaults are shown below ## location of ceph binary @@ -71,6 +57,18 @@ var sampleConfig = ` ## suffix used to identify socket files socket_suffix = "asok" + + ## Ceph user to authenticate as + ceph_user = "client.admin" + + ## Ceph configuration to use to locate the cluster + ceph_config = "/etc/ceph/ceph.conf" + + ## Whether to gather statistics via the admin socket + gather_admin_socket_stats = true + + ## Whether to gather statistics via ceph commands + gather_cluster_stats = true ` func (c *Ceph) SampleConfig() string { @@ -78,7 +76,22 @@ func (c *Ceph) SampleConfig() string { } func (c *Ceph) Gather(acc telegraf.Accumulator) error { - c.setDefaults() + if c.GatherAdminSocketStats { + if err := c.gatherAdminSocketStats(acc); err != nil { + return err + } + } + + if c.GatherClusterStats { + if err := c.gatherClusterStats(acc); err != nil { + return err + } + } + + return nil +} + +func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { sockets, err := findSockets(c) if err != nil { return fmt.Errorf("failed to find sockets at path '%s': %v", c.SocketDir, err) @@ -104,8 +117,46 @@ func (c *Ceph) Gather(acc telegraf.Accumulator) error { return nil } +func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error { + jobs := []struct { + command string + parser func(telegraf.Accumulator, string) error + }{ + {"status", decodeStatus}, + {"df", decodeDf}, + {"osd pool stats", decodeOsdPoolStats}, + } + + // For each job, execute against the cluster, parse and accumulate the data points + for _, job := range jobs { + output, err := c.exec(job.command) + if err != nil { + return fmt.Errorf("error executing command: %v", err) + } + err = job.parser(acc, output) + if err != nil { + return fmt.Errorf("error parsing output: %v", err) + } + } + + return nil +} + func init() { - inputs.Add(measurement, func() telegraf.Input { return &Ceph{} }) + c := Ceph{ + CephBinary: "/usr/bin/ceph", + OsdPrefix: osdPrefix, + MonPrefix: monPrefix, + SocketDir: "/var/run/ceph", + SocketSuffix: sockSuffix, + CephUser: "client.admin", + CephConfig: "/etc/ceph/ceph.conf", + GatherAdminSocketStats: true, + GatherClusterStats: false, + } + + inputs.Add(measurement, func() telegraf.Input { return &c }) + } var perfDump = func(binary string, socket *socket) (string, error) { @@ -247,3 +298,192 @@ func flatten(data interface{}) []*metric { return metrics } + +func (c *Ceph) exec(command string) (string, error) { + cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"} + cmdArgs = append(cmdArgs, strings.Split(command, " ")...) + + cmd := exec.Command(c.CephBinary, cmdArgs...) + + var out bytes.Buffer + cmd.Stdout = &out + err := cmd.Run() + if err != nil { + return "", fmt.Errorf("error running ceph %v: %s", command, err) + } + + output := out.String() + + // Ceph doesn't sanitize its output, and may return invalid JSON. Patch this + // up for them, as having some inaccurate data is better than none. + output = strings.Replace(output, "-inf", "0", -1) + output = strings.Replace(output, "inf", "0", -1) + + return output, nil +} + +func decodeStatus(acc telegraf.Accumulator, input string) error { + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + err = decodeStatusOsdmap(acc, data) + if err != nil { + return err + } + + err = decodeStatusPgmap(acc, data) + if err != nil { + return err + } + + err = decodeStatusPgmapState(acc, data) + if err != nil { + return err + } + + return nil +} + +func decodeStatusOsdmap(acc telegraf.Accumulator, data map[string]interface{}) error { + osdmap, ok := data["osdmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement) + } + fields, ok := osdmap["osdmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement) + } + acc.AddFields("ceph_osdmap", fields, map[string]string{}) + return nil +} + +func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) error { + pgmap, ok := data["pgmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement) + } + fields := make(map[string]interface{}) + for key, value := range pgmap { + switch value.(type) { + case float64: + fields[key] = value + } + } + acc.AddFields("ceph_pgmap", fields, map[string]string{}) + return nil +} + +func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error { + pgmap, ok := data["pgmap"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement) + } + fields := make(map[string]interface{}) + for key, value := range pgmap { + switch value.(type) { + case []interface{}: + if key != "pgs_by_state" { + continue + } + for _, state := range value.([]interface{}) { + state_map, ok := state.(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state", measurement) + } + state_name, ok := state_map["state_name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement) + } + state_count, ok := state_map["count"].(float64) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement) + } + fields[state_name] = state_count + } + } + } + acc.AddFields("ceph_pgmap_state", fields, map[string]string{}) + return nil +} + +func decodeDf(acc telegraf.Accumulator, input string) error { + data := make(map[string]interface{}) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + // ceph.usage: records global utilization and number of objects + stats_fields, ok := data["stats"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df stats", measurement) + } + acc.AddFields("ceph_usage", stats_fields, map[string]string{}) + + // ceph.pool.usage: records per pool utilization and number of objects + pools, ok := data["pools"].([]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pools", measurement) + } + + for _, pool := range pools { + pool_map, ok := pool.(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool", measurement) + } + pool_name, ok := pool_map["name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool name", measurement) + } + fields, ok := pool_map["stats"].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode df pool stats", measurement) + } + tags := map[string]string{ + "name": pool_name, + } + acc.AddFields("ceph_pool_usage", fields, tags) + } + + return nil +} + +func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error { + data := make([]map[string]interface{}, 0) + err := json.Unmarshal([]byte(input), &data) + if err != nil { + return fmt.Errorf("failed to parse json: '%s': %v", input, err) + } + + // ceph.pool.stats: records pre pool IO and recovery throughput + for _, pool := range data { + pool_name, ok := pool["pool_name"].(string) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osd pool stats name", measurement) + } + // Note: the 'recovery' object looks broken (in hammer), so it's omitted + objects := []string{ + "client_io_rate", + "recovery_rate", + } + fields := make(map[string]interface{}) + for _, object := range objects { + perfdata, ok := pool[object].(map[string]interface{}) + if !ok { + return fmt.Errorf("WARNING %s - unable to decode osd pool stats", measurement) + } + for key, value := range perfdata { + fields[key] = value + } + } + tags := map[string]string{ + "name": pool_name, + } + acc.AddFields("ceph_pool_stats", fields, tags) + } + + return nil +} diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go index ce96943be..f7b17ece3 100644 --- a/plugins/inputs/ceph/ceph_test.go +++ b/plugins/inputs/ceph/ceph_test.go @@ -65,12 +65,17 @@ func TestFindSockets(t *testing.T) { assert.NoError(t, err) }() c := &Ceph{ - CephBinary: "foo", - SocketDir: tmpdir, + CephBinary: "foo", + OsdPrefix: "ceph-osd", + MonPrefix: "ceph-mon", + SocketDir: tmpdir, + SocketSuffix: "asok", + CephUser: "client.admin", + CephConfig: "/etc/ceph/ceph.conf", + GatherAdminSocketStats: true, + GatherClusterStats: false, } - c.setDefaults() - for _, st := range sockTestParams { createTestFiles(tmpdir, st)