Ceph Cluster Performance Input Plugin
The existing ceph input plugin only has access to the local admin daemon socket on the local host, and as such has access to a limited subset of data. This extends the plugin to use CLI commands to get access to the full spread of Ceph data. This patch collects global OSD map and IO statistics, PG state and per pool IO and utilization statistics.
This commit is contained in:
parent
602a36e241
commit
8dd253b0a5
|
@ -102,6 +102,7 @@ consistent with the behavior of `collection_jitter`.
|
||||||
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
- [#1543](https://github.com/influxdata/telegraf/pull/1543): Official Windows service.
|
||||||
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
- [#1414](https://github.com/influxdata/telegraf/pull/1414): Forking sensors command to remove C package dependency.
|
||||||
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
- [#1389](https://github.com/influxdata/telegraf/pull/1389): Add a new SNMP plugin.
|
||||||
|
- [#1513](https://github.com/influxdata/telegraf/issues/1513): Add Ceph Cluster Performance Statistics
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|
|
@ -511,6 +511,10 @@
|
||||||
|
|
||||||
# # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
# # Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
||||||
# [[inputs.ceph]]
|
# [[inputs.ceph]]
|
||||||
|
# ## This is the recommended interval to poll. Too frequent and you will lose
|
||||||
|
# ## data points due to timeouts during rebalancing and recovery
|
||||||
|
# interval = '1m'
|
||||||
|
#
|
||||||
# ## All configuration values are optional, defaults are shown below
|
# ## All configuration values are optional, defaults are shown below
|
||||||
#
|
#
|
||||||
# ## location of ceph binary
|
# ## location of ceph binary
|
||||||
|
@ -525,6 +529,26 @@
|
||||||
#
|
#
|
||||||
# ## suffix used to identify socket files
|
# ## suffix used to identify socket files
|
||||||
# socket_suffix = "asok"
|
# socket_suffix = "asok"
|
||||||
|
#
|
||||||
|
# ## Ceph user to authenticate as, ceph will search for the corresponding keyring
|
||||||
|
# ## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the
|
||||||
|
# ## client section of ceph.conf for example:
|
||||||
|
# ##
|
||||||
|
# ## [client.telegraf]
|
||||||
|
# ## keyring = /etc/ceph/client.telegraf.keyring
|
||||||
|
# ##
|
||||||
|
# ## Consult the ceph documentation for more detail on keyring generation.
|
||||||
|
# ceph_user = "client.admin"
|
||||||
|
#
|
||||||
|
# ## Ceph configuration to use to locate the cluster
|
||||||
|
# ceph_config = "/etc/ceph/ceph.conf"
|
||||||
|
#
|
||||||
|
# ## Whether to gather statistics via the admin socket
|
||||||
|
# gather_admin_socket_stats = true
|
||||||
|
#
|
||||||
|
# ## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config
|
||||||
|
# ## to be specified
|
||||||
|
# gather_cluster_stats = true
|
||||||
|
|
||||||
|
|
||||||
# # Read specific statistics per cgroup
|
# # Read specific statistics per cgroup
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
|
|
||||||
Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
||||||
|
|
||||||
The plugin works by scanning the configured SocketDir for OSD and MON socket files. When it finds
|
*Admin Socket Stats*
|
||||||
|
|
||||||
|
This gatherer works by scanning the configured SocketDir for OSD and MON socket files. When it finds
|
||||||
a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump**
|
a MON socket, it runs **ceph --admin-daemon $file perfcounters_dump**. For OSDs it runs **ceph --admin-daemon $file perf dump**
|
||||||
|
|
||||||
The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are
|
The resulting JSON is parsed and grouped into collections, based on top-level key. Top-level keys are
|
||||||
|
@ -27,11 +29,26 @@ Would be parsed into the following metrics, all of which would be tagged with co
|
||||||
- refresh_latency.sum: 5378.794002000
|
- refresh_latency.sum: 5378.794002000
|
||||||
|
|
||||||
|
|
||||||
|
*Cluster Stats*
|
||||||
|
|
||||||
|
This gatherer works by invoking ceph commands against the cluster thus only requires the ceph client, valid
|
||||||
|
ceph configuration and an access key to function (the ceph_config and ceph_user configuration variables work
|
||||||
|
in conjunction to specify these prerequisites). It may be run on any server you wish which has access to
|
||||||
|
the cluster. The currently supported commands are:
|
||||||
|
|
||||||
|
* ceph status
|
||||||
|
* ceph df
|
||||||
|
* ceph osd pool stats
|
||||||
|
|
||||||
### Configuration:
|
### Configuration:
|
||||||
|
|
||||||
```
|
```
|
||||||
# Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
# Collects performance metrics from the MON and OSD nodes in a Ceph storage cluster.
|
||||||
[[inputs.ceph]]
|
[[inputs.ceph]]
|
||||||
|
## This is the recommended interval to poll. Too frequent and you will lose
|
||||||
|
## data points due to timeouts during rebalancing and recovery
|
||||||
|
interval = '1m'
|
||||||
|
|
||||||
## All configuration values are optional, defaults are shown below
|
## All configuration values are optional, defaults are shown below
|
||||||
|
|
||||||
## location of ceph binary
|
## location of ceph binary
|
||||||
|
@ -46,15 +63,86 @@ Would be parsed into the following metrics, all of which would be tagged with co
|
||||||
|
|
||||||
## suffix used to identify socket files
|
## suffix used to identify socket files
|
||||||
socket_suffix = "asok"
|
socket_suffix = "asok"
|
||||||
|
|
||||||
|
## Ceph user to authenticate as, ceph will search for the corresponding keyring
|
||||||
|
## e.g. client.admin.keyring in /etc/ceph, or the explicit path defined in the
|
||||||
|
## client section of ceph.conf for example:
|
||||||
|
##
|
||||||
|
## [client.telegraf]
|
||||||
|
## keyring = /etc/ceph/client.telegraf.keyring
|
||||||
|
##
|
||||||
|
## Consult the ceph documentation for more detail on keyring generation.
|
||||||
|
ceph_user = "client.admin"
|
||||||
|
|
||||||
|
## Ceph configuration to use to locate the cluster
|
||||||
|
ceph_config = "/etc/ceph/ceph.conf"
|
||||||
|
|
||||||
|
## Whether to gather statistics via the admin socket
|
||||||
|
gather_admin_socket_stats = true
|
||||||
|
|
||||||
|
## Whether to gather statistics via ceph commands, requires ceph_user and ceph_config
|
||||||
|
## to be specified
|
||||||
|
gather_cluster_stats = true
|
||||||
```
|
```
|
||||||
|
|
||||||
### Measurements & Fields:
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
*Admin Socket Stats*
|
||||||
|
|
||||||
All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go.
|
All fields are collected under the **ceph** measurement and stored as float64s. For a full list of fields, see the sample perf dumps in ceph_test.go.
|
||||||
|
|
||||||
|
*Cluster Stats*
|
||||||
|
|
||||||
|
* ceph\_osdmap
|
||||||
|
* epoch (float)
|
||||||
|
* full (boolean)
|
||||||
|
* nearfull (boolean)
|
||||||
|
* num\_in\_osds (float)
|
||||||
|
* num\_osds (float)
|
||||||
|
* num\_remremapped\_pgs (float)
|
||||||
|
* num\_up\_osds (float)
|
||||||
|
|
||||||
|
* ceph\_pgmap
|
||||||
|
* bytes\_avail (float)
|
||||||
|
* bytes\_total (float)
|
||||||
|
* bytes\_used (float)
|
||||||
|
* data\_bytes (float)
|
||||||
|
* num\_pgs (float)
|
||||||
|
* op\_per\_sec (float)
|
||||||
|
* read\_bytes\_sec (float)
|
||||||
|
* version (float)
|
||||||
|
* write\_bytes\_sec (float)
|
||||||
|
* recovering\_bytes\_per\_sec (float)
|
||||||
|
* recovering\_keys\_per\_sec (float)
|
||||||
|
* recovering\_objects\_per\_sec (float)
|
||||||
|
|
||||||
|
* ceph\_pgmap\_state
|
||||||
|
* state name e.g. active+clean (float)
|
||||||
|
|
||||||
|
* ceph\_usage
|
||||||
|
* bytes\_used (float)
|
||||||
|
* kb\_used (float)
|
||||||
|
* max\_avail (float)
|
||||||
|
* objects (float)
|
||||||
|
|
||||||
|
* ceph\_pool\_usage
|
||||||
|
* bytes\_used (float)
|
||||||
|
* kb\_used (float)
|
||||||
|
* max\_avail (float)
|
||||||
|
* objects (float)
|
||||||
|
|
||||||
|
* ceph\_pool\_stats
|
||||||
|
* op\_per\_sec (float)
|
||||||
|
* read\_bytes\_sec (float)
|
||||||
|
* write\_bytes\_sec (float)
|
||||||
|
* recovering\_object\_per\_sec (float)
|
||||||
|
* recovering\_bytes\_per\_sec (float)
|
||||||
|
* recovering\_keys\_per\_sec (float)
|
||||||
|
|
||||||
### Tags:
|
### Tags:
|
||||||
|
|
||||||
|
*Admin Socket Stats*
|
||||||
|
|
||||||
All measurements will have the following tags:
|
All measurements will have the following tags:
|
||||||
|
|
||||||
- type: either 'osd' or 'mon' to indicate which type of node was queried
|
- type: either 'osd' or 'mon' to indicate which type of node was queried
|
||||||
|
@ -96,9 +184,21 @@ All measurements will have the following tags:
|
||||||
- throttle-osd_client_bytes
|
- throttle-osd_client_bytes
|
||||||
- throttle-osd_client_messages
|
- throttle-osd_client_messages
|
||||||
|
|
||||||
|
*Cluster Stats*
|
||||||
|
|
||||||
|
* ceph\_pg\_state has the following tags:
|
||||||
|
* state (state for which the value applies e.g. active+clean, active+remapped+backfill)
|
||||||
|
* ceph\_pool\_usage has the following tags:
|
||||||
|
* id
|
||||||
|
* name
|
||||||
|
* ceph\_pool\_stats has the following tags:
|
||||||
|
* id
|
||||||
|
* name
|
||||||
|
|
||||||
### Example Output:
|
### Example Output:
|
||||||
|
|
||||||
|
*Admin Socket Stats*
|
||||||
|
|
||||||
<pre>
|
<pre>
|
||||||
telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d -input-filter ceph
|
telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegraf/telegraf.d -input-filter ceph
|
||||||
* Plugin: ceph, Collection 1
|
* Plugin: ceph, Collection 1
|
||||||
|
@ -107,3 +207,16 @@ telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegr
|
||||||
> ceph,collection=throttle-mon_daemon_bytes,id=node-2,type=mon get=4058121,get_or_fail_fail=0,get_or_fail_success=0,get_sum=6027348117,max=419430400,put=4058121,put_sum=6027348117,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814815661
|
> ceph,collection=throttle-mon_daemon_bytes,id=node-2,type=mon get=4058121,get_or_fail_fail=0,get_or_fail_success=0,get_sum=6027348117,max=419430400,put=4058121,put_sum=6027348117,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814815661
|
||||||
> ceph,collection=throttle-msgr_dispatch_throttler-mon,id=node-2,type=mon get=54276277,get_or_fail_fail=0,get_or_fail_success=0,get_sum=370232877040,max=104857600,put=54276277,put_sum=370232877040,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814872064
|
> ceph,collection=throttle-msgr_dispatch_throttler-mon,id=node-2,type=mon get=54276277,get_or_fail_fail=0,get_or_fail_success=0,get_sum=370232877040,max=104857600,put=54276277,put_sum=370232877040,take=0,take_sum=0,val=0,wait.avgcount=0,wait.sum=0 1462821234814872064
|
||||||
</pre>
|
</pre>
|
||||||
|
|
||||||
|
*Cluster Stats*
|
||||||
|
|
||||||
|
<pre>
|
||||||
|
> ceph_osdmap,host=ceph-mon-0 epoch=170772,full=false,nearfull=false,num_in_osds=340,num_osds=340,num_remapped_pgs=0,num_up_osds=340 1468841037000000000
|
||||||
|
> ceph_pgmap,host=ceph-mon-0 bytes_avail=634895531270144,bytes_total=812117151809536,bytes_used=177221620539392,data_bytes=56979991615058,num_pgs=22952,op_per_sec=15869,read_bytes_sec=43956026,version=39387592,write_bytes_sec=165344818 1468841037000000000
|
||||||
|
> ceph_pgmap_state,host=ceph-mon-0 active+clean=22952 1468928660000000000
|
||||||
|
> ceph_usage,host=ceph-mon-0 total_avail_bytes=634895514791936,total_bytes=812117151809536,total_used_bytes=177221637017600 1468841037000000000
|
||||||
|
> ceph_pool_usage,host=ceph-mon-0,id=150,name=cinder.volumes bytes_used=12648553794802,kb_used=12352103316,max_avail=154342562489244,objects=3026295 1468841037000000000
|
||||||
|
> ceph_pool_usage,host=ceph-mon-0,id=182,name=cinder.volumes.flash bytes_used=8541308223964,kb_used=8341121313,max_avail=39388593563936,objects=2075066 1468841037000000000
|
||||||
|
> ceph_pool_stats,host=ceph-mon-0,id=150,name=cinder.volumes op_per_sec=1706,read_bytes_sec=28671674,write_bytes_sec=29994541 1468841037000000000
|
||||||
|
> ceph_pool_stats,host=ceph-mon-0,id=182,name=cinder.volumes.flash op_per_sec=9748,read_bytes_sec=9605524,write_bytes_sec=45593310 1468841037000000000
|
||||||
|
</pre>
|
||||||
|
|
|
@ -28,28 +28,10 @@ type Ceph struct {
|
||||||
MonPrefix string
|
MonPrefix string
|
||||||
SocketDir string
|
SocketDir string
|
||||||
SocketSuffix string
|
SocketSuffix string
|
||||||
}
|
CephUser string
|
||||||
|
CephConfig string
|
||||||
func (c *Ceph) setDefaults() {
|
GatherAdminSocketStats bool
|
||||||
if c.CephBinary == "" {
|
GatherClusterStats bool
|
||||||
c.CephBinary = "/usr/bin/ceph"
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.OsdPrefix == "" {
|
|
||||||
c.OsdPrefix = osdPrefix
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.MonPrefix == "" {
|
|
||||||
c.MonPrefix = monPrefix
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.SocketDir == "" {
|
|
||||||
c.SocketDir = "/var/run/ceph"
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.SocketSuffix == "" {
|
|
||||||
c.SocketSuffix = sockSuffix
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ceph) Description() string {
|
func (c *Ceph) Description() string {
|
||||||
|
@ -57,6 +39,10 @@ func (c *Ceph) Description() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
|
## This is the recommended interval to poll. Too frequent and you will lose
|
||||||
|
## data points due to timeouts during rebalancing and recovery
|
||||||
|
interval = '1m'
|
||||||
|
|
||||||
## All configuration values are optional, defaults are shown below
|
## All configuration values are optional, defaults are shown below
|
||||||
|
|
||||||
## location of ceph binary
|
## location of ceph binary
|
||||||
|
@ -71,6 +57,18 @@ var sampleConfig = `
|
||||||
|
|
||||||
## suffix used to identify socket files
|
## suffix used to identify socket files
|
||||||
socket_suffix = "asok"
|
socket_suffix = "asok"
|
||||||
|
|
||||||
|
## Ceph user to authenticate as
|
||||||
|
ceph_user = "client.admin"
|
||||||
|
|
||||||
|
## Ceph configuration to use to locate the cluster
|
||||||
|
ceph_config = "/etc/ceph/ceph.conf"
|
||||||
|
|
||||||
|
## Whether to gather statistics via the admin socket
|
||||||
|
gather_admin_socket_stats = true
|
||||||
|
|
||||||
|
## Whether to gather statistics via ceph commands
|
||||||
|
gather_cluster_stats = true
|
||||||
`
|
`
|
||||||
|
|
||||||
func (c *Ceph) SampleConfig() string {
|
func (c *Ceph) SampleConfig() string {
|
||||||
|
@ -78,7 +76,22 @@ func (c *Ceph) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Ceph) Gather(acc telegraf.Accumulator) error {
|
func (c *Ceph) Gather(acc telegraf.Accumulator) error {
|
||||||
c.setDefaults()
|
if c.GatherAdminSocketStats {
|
||||||
|
if err := c.gatherAdminSocketStats(acc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.GatherClusterStats {
|
||||||
|
if err := c.gatherClusterStats(acc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
|
||||||
sockets, err := findSockets(c)
|
sockets, err := findSockets(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to find sockets at path '%s': %v", c.SocketDir, err)
|
return fmt.Errorf("failed to find sockets at path '%s': %v", c.SocketDir, err)
|
||||||
|
@ -104,8 +117,46 @@ func (c *Ceph) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error {
|
||||||
|
jobs := []struct {
|
||||||
|
command string
|
||||||
|
parser func(telegraf.Accumulator, string) error
|
||||||
|
}{
|
||||||
|
{"status", decodeStatus},
|
||||||
|
{"df", decodeDf},
|
||||||
|
{"osd pool stats", decodeOsdPoolStats},
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each job, execute against the cluster, parse and accumulate the data points
|
||||||
|
for _, job := range jobs {
|
||||||
|
output, err := c.exec(job.command)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error executing command: %v", err)
|
||||||
|
}
|
||||||
|
err = job.parser(acc, output)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error parsing output: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add(measurement, func() telegraf.Input { return &Ceph{} })
|
c := Ceph{
|
||||||
|
CephBinary: "/usr/bin/ceph",
|
||||||
|
OsdPrefix: osdPrefix,
|
||||||
|
MonPrefix: monPrefix,
|
||||||
|
SocketDir: "/var/run/ceph",
|
||||||
|
SocketSuffix: sockSuffix,
|
||||||
|
CephUser: "client.admin",
|
||||||
|
CephConfig: "/etc/ceph/ceph.conf",
|
||||||
|
GatherAdminSocketStats: true,
|
||||||
|
GatherClusterStats: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
inputs.Add(measurement, func() telegraf.Input { return &c })
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var perfDump = func(binary string, socket *socket) (string, error) {
|
var perfDump = func(binary string, socket *socket) (string, error) {
|
||||||
|
@ -247,3 +298,192 @@ func flatten(data interface{}) []*metric {
|
||||||
|
|
||||||
return metrics
|
return metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Ceph) exec(command string) (string, error) {
|
||||||
|
cmdArgs := []string{"--conf", c.CephConfig, "--name", c.CephUser, "--format", "json"}
|
||||||
|
cmdArgs = append(cmdArgs, strings.Split(command, " ")...)
|
||||||
|
|
||||||
|
cmd := exec.Command(c.CephBinary, cmdArgs...)
|
||||||
|
|
||||||
|
var out bytes.Buffer
|
||||||
|
cmd.Stdout = &out
|
||||||
|
err := cmd.Run()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("error running ceph %v: %s", command, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
output := out.String()
|
||||||
|
|
||||||
|
// Ceph doesn't sanitize its output, and may return invalid JSON. Patch this
|
||||||
|
// up for them, as having some inaccurate data is better than none.
|
||||||
|
output = strings.Replace(output, "-inf", "0", -1)
|
||||||
|
output = strings.Replace(output, "inf", "0", -1)
|
||||||
|
|
||||||
|
return output, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeStatus(acc telegraf.Accumulator, input string) error {
|
||||||
|
data := make(map[string]interface{})
|
||||||
|
err := json.Unmarshal([]byte(input), &data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse json: '%s': %v", input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = decodeStatusOsdmap(acc, data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = decodeStatusPgmap(acc, data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = decodeStatusPgmapState(acc, data)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeStatusOsdmap(acc telegraf.Accumulator, data map[string]interface{}) error {
|
||||||
|
osdmap, ok := data["osdmap"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement)
|
||||||
|
}
|
||||||
|
fields, ok := osdmap["osdmap"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode osdmap", measurement)
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_osdmap", fields, map[string]string{})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) error {
|
||||||
|
pgmap, ok := data["pgmap"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
for key, value := range pgmap {
|
||||||
|
switch value.(type) {
|
||||||
|
case float64:
|
||||||
|
fields[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_pgmap", fields, map[string]string{})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
|
||||||
|
pgmap, ok := data["pgmap"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
for key, value := range pgmap {
|
||||||
|
switch value.(type) {
|
||||||
|
case []interface{}:
|
||||||
|
if key != "pgs_by_state" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, state := range value.([]interface{}) {
|
||||||
|
state_map, ok := state.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
|
||||||
|
}
|
||||||
|
state_name, ok := state_map["state_name"].(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
|
||||||
|
}
|
||||||
|
state_count, ok := state_map["count"].(float64)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
|
||||||
|
}
|
||||||
|
fields[state_name] = state_count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_pgmap_state", fields, map[string]string{})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeDf(acc telegraf.Accumulator, input string) error {
|
||||||
|
data := make(map[string]interface{})
|
||||||
|
err := json.Unmarshal([]byte(input), &data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse json: '%s': %v", input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ceph.usage: records global utilization and number of objects
|
||||||
|
stats_fields, ok := data["stats"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode df stats", measurement)
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_usage", stats_fields, map[string]string{})
|
||||||
|
|
||||||
|
// ceph.pool.usage: records per pool utilization and number of objects
|
||||||
|
pools, ok := data["pools"].([]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode df pools", measurement)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pool := range pools {
|
||||||
|
pool_map, ok := pool.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode df pool", measurement)
|
||||||
|
}
|
||||||
|
pool_name, ok := pool_map["name"].(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode df pool name", measurement)
|
||||||
|
}
|
||||||
|
fields, ok := pool_map["stats"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode df pool stats", measurement)
|
||||||
|
}
|
||||||
|
tags := map[string]string{
|
||||||
|
"name": pool_name,
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_pool_usage", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error {
|
||||||
|
data := make([]map[string]interface{}, 0)
|
||||||
|
err := json.Unmarshal([]byte(input), &data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse json: '%s': %v", input, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ceph.pool.stats: records pre pool IO and recovery throughput
|
||||||
|
for _, pool := range data {
|
||||||
|
pool_name, ok := pool["pool_name"].(string)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode osd pool stats name", measurement)
|
||||||
|
}
|
||||||
|
// Note: the 'recovery' object looks broken (in hammer), so it's omitted
|
||||||
|
objects := []string{
|
||||||
|
"client_io_rate",
|
||||||
|
"recovery_rate",
|
||||||
|
}
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
for _, object := range objects {
|
||||||
|
perfdata, ok := pool[object].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("WARNING %s - unable to decode osd pool stats", measurement)
|
||||||
|
}
|
||||||
|
for key, value := range perfdata {
|
||||||
|
fields[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tags := map[string]string{
|
||||||
|
"name": pool_name,
|
||||||
|
}
|
||||||
|
acc.AddFields("ceph_pool_stats", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -66,11 +66,16 @@ func TestFindSockets(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
c := &Ceph{
|
c := &Ceph{
|
||||||
CephBinary: "foo",
|
CephBinary: "foo",
|
||||||
|
OsdPrefix: "ceph-osd",
|
||||||
|
MonPrefix: "ceph-mon",
|
||||||
SocketDir: tmpdir,
|
SocketDir: tmpdir,
|
||||||
|
SocketSuffix: "asok",
|
||||||
|
CephUser: "client.admin",
|
||||||
|
CephConfig: "/etc/ceph/ceph.conf",
|
||||||
|
GatherAdminSocketStats: true,
|
||||||
|
GatherClusterStats: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.setDefaults()
|
|
||||||
|
|
||||||
for _, st := range sockTestParams {
|
for _, st := range sockTestParams {
|
||||||
createTestFiles(tmpdir, st)
|
createTestFiles(tmpdir, st)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue