From 6baa06121e47acdef15e7b8da3d2ab4890cf7c4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Vizcaino?= Date: Wed, 1 Feb 2017 15:47:23 +0100 Subject: [PATCH] Ceph: represent pgmap states using tags (#2229) * ceph: maps are already refs, no need to use a pointer * ceph: pgmap_states are represented in a single metric "count", differenciated by tag * Update CHANGELOG --- CHANGELOG.md | 19 ++++ plugins/inputs/ceph/README.md | 7 +- plugins/inputs/ceph/ceph.go | 88 +++++++++-------- plugins/inputs/ceph/ceph_test.go | 161 +++++++++++++++++++++++++++++-- 4 files changed, 228 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7c7cde15..aa0ae6056 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ ### Release Notes +- Ceph: the `ceph_pgmap_state` metric content has been modified to use a unique field `count`, with each state expressed as a `state` tag. + +Telegraf < 1.3: + +``` +# field_name value +active+clean 123 +active+clean+scrubbing 3 +``` + +Telegraf >= 1.3: + +``` +# field_name value tag +count 123 state=active+clean +count 3 state=active+clean+scrubbing +``` + - The [Riemann output plugin](./plugins/outputs/riemann) has been rewritten and the previous riemann plugin is _incompatible_ with the new one. The reasons for this are outlined in issue [#1878](https://github.com/influxdata/telegraf/issues/1878). @@ -14,6 +32,7 @@ It is highly recommended that all users migrate to the new riemann output plugin - [#2204](https://github.com/influxdata/telegraf/pull/2204): Extend http_response to support searching for a substring in response. Return 1 if found, else 0. - [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin. - [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin. +- [#2229](https://github.com/influxdata/telegraf/pull/2229): `ceph_pgmap_state` metric now uses a single field `count`, with PG state published as `state` tag. - [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations. - [#2330](https://github.com/influxdata/telegraf/pull/2330): Keep -config-directory when running as Windows service. - [#1900](https://github.com/influxdata/telegraf/pull/1900): Riemann plugin rewrite. diff --git a/plugins/inputs/ceph/README.md b/plugins/inputs/ceph/README.md index b3bba1e50..771ec665b 100644 --- a/plugins/inputs/ceph/README.md +++ b/plugins/inputs/ceph/README.md @@ -117,7 +117,7 @@ All fields are collected under the **ceph** measurement and stored as float64s. * recovering\_objects\_per\_sec (float) * ceph\_pgmap\_state - * state name e.g. active+clean (float) + * count (float) * ceph\_usage * bytes\_used (float) @@ -186,7 +186,7 @@ All measurements will have the following tags: *Cluster Stats* -* ceph\_pg\_state has the following tags: +* ceph\_pgmap\_state has the following tags: * state (state for which the value applies e.g. active+clean, active+remapped+backfill) * ceph\_pool\_usage has the following tags: * id @@ -213,7 +213,8 @@ telegraf -test -config /etc/telegraf/telegraf.conf -config-directory /etc/telegr
 > ceph_osdmap,host=ceph-mon-0 epoch=170772,full=false,nearfull=false,num_in_osds=340,num_osds=340,num_remapped_pgs=0,num_up_osds=340 1468841037000000000
 > ceph_pgmap,host=ceph-mon-0 bytes_avail=634895531270144,bytes_total=812117151809536,bytes_used=177221620539392,data_bytes=56979991615058,num_pgs=22952,op_per_sec=15869,read_bytes_sec=43956026,version=39387592,write_bytes_sec=165344818 1468841037000000000
-> ceph_pgmap_state,host=ceph-mon-0 active+clean=22952 1468928660000000000
+> ceph_pgmap_state,host=ceph-mon-0,state=active+clean count=22952 1468928660000000000
+> ceph_pgmap_state,host=ceph-mon-0,state=active+degraded count=16 1468928660000000000
 > ceph_usage,host=ceph-mon-0 total_avail_bytes=634895514791936,total_bytes=812117151809536,total_used_bytes=177221637017600 1468841037000000000
 > ceph_pool_usage,host=ceph-mon-0,id=150,name=cinder.volumes bytes_used=12648553794802,kb_used=12352103316,max_avail=154342562489244,objects=3026295 1468841037000000000
 > ceph_pool_usage,host=ceph-mon-0,id=182,name=cinder.volumes.flash bytes_used=8541308223964,kb_used=8341121313,max_avail=39388593563936,objects=2075066 1468841037000000000
diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go
index e43c3d7d3..7c03b6262 100644
--- a/plugins/inputs/ceph/ceph.go
+++ b/plugins/inputs/ceph/ceph.go
@@ -4,13 +4,14 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/inputs"
 	"io/ioutil"
 	"log"
 	"os/exec"
 	"path/filepath"
 	"strings"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 const (
@@ -108,7 +109,7 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
 			log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err)
 			continue
 		}
-		for tag, metrics := range *data {
+		for tag, metrics := range data {
 			acc.AddFields(measurement,
 				map[string]interface{}(metrics),
 				map[string]string{"type": s.sockType, "id": s.sockId, "collection": tag})
@@ -244,25 +245,19 @@ type taggedMetricMap map[string]metricMap
 
 // Parses a raw JSON string into a taggedMetricMap
 // Delegates the actual parsing to newTaggedMetricMap(..)
-func parseDump(dump string) (*taggedMetricMap, error) {
+func parseDump(dump string) (taggedMetricMap, error) {
 	data := make(map[string]interface{})
 	err := json.Unmarshal([]byte(dump), &data)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse json: '%s': %v", dump, err)
 	}
 
-	tmm := newTaggedMetricMap(data)
-
-	if err != nil {
-		return nil, fmt.Errorf("failed to tag dataset: '%v': %v", tmm, err)
-	}
-
-	return tmm, nil
+	return newTaggedMetricMap(data), nil
 }
 
 // Builds a TaggedMetricMap out of a generic string map.
 // The top-level key is used as a tag and all sub-keys are flattened into metrics
-func newTaggedMetricMap(data map[string]interface{}) *taggedMetricMap {
+func newTaggedMetricMap(data map[string]interface{}) taggedMetricMap {
 	tmm := make(taggedMetricMap)
 	for tag, datapoints := range data {
 		mm := make(metricMap)
@@ -271,7 +266,7 @@ func newTaggedMetricMap(data map[string]interface{}) *taggedMetricMap {
 		}
 		tmm[tag] = mm
 	}
-	return &tmm
+	return tmm
 }
 
 // Recursively flattens any k-v hierarchy present in data.
@@ -376,36 +371,53 @@ func decodeStatusPgmap(acc telegraf.Accumulator, data map[string]interface{}) er
 	return nil
 }
 
-func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
+func extractPgmapStates(data map[string]interface{}) ([]interface{}, error) {
+	const key = "pgs_by_state"
+
 	pgmap, ok := data["pgmap"].(map[string]interface{})
 	if !ok {
-		return fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
+		return nil, fmt.Errorf("WARNING %s - unable to decode pgmap", measurement)
 	}
-	fields := make(map[string]interface{})
-	for key, value := range pgmap {
-		switch value.(type) {
-		case []interface{}:
-			if key != "pgs_by_state" {
-				continue
-			}
-			for _, state := range value.([]interface{}) {
-				state_map, ok := state.(map[string]interface{})
-				if !ok {
-					return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
-				}
-				state_name, ok := state_map["state_name"].(string)
-				if !ok {
-					return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
-				}
-				state_count, ok := state_map["count"].(float64)
-				if !ok {
-					return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
-				}
-				fields[state_name] = state_count
-			}
+
+	s, ok := pgmap[key]
+	if !ok {
+		return nil, fmt.Errorf("WARNING %s - pgmap is missing the %s field", measurement, key)
+	}
+
+	states, ok := s.([]interface{})
+	if !ok {
+		return nil, fmt.Errorf("WARNING %s - pgmap[%s] is not a list", measurement, key)
+	}
+	return states, nil
+}
+
+func decodeStatusPgmapState(acc telegraf.Accumulator, data map[string]interface{}) error {
+	states, err := extractPgmapStates(data)
+	if err != nil {
+		return err
+	}
+	for _, state := range states {
+		stateMap, ok := state.(map[string]interface{})
+		if !ok {
+			return fmt.Errorf("WARNING %s - unable to decode pg state", measurement)
 		}
+		stateName, ok := stateMap["state_name"].(string)
+		if !ok {
+			return fmt.Errorf("WARNING %s - unable to decode pg state name", measurement)
+		}
+		stateCount, ok := stateMap["count"].(float64)
+		if !ok {
+			return fmt.Errorf("WARNING %s - unable to decode pg state count", measurement)
+		}
+
+		tags := map[string]string{
+			"state": stateName,
+		}
+		fields := map[string]interface{}{
+			"count": stateCount,
+		}
+		acc.AddFields("ceph_pgmap_state", fields, tags)
 	}
-	acc.AddFields("ceph_pgmap_state", fields, map[string]string{})
 	return nil
 }
 
diff --git a/plugins/inputs/ceph/ceph_test.go b/plugins/inputs/ceph/ceph_test.go
index f7b17ece3..4a75acd15 100644
--- a/plugins/inputs/ceph/ceph_test.go
+++ b/plugins/inputs/ceph/ceph_test.go
@@ -1,15 +1,17 @@
 package ceph
 
 import (
+	"encoding/json"
 	"fmt"
-	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/assert"
 	"io/ioutil"
 	"os"
 	"path"
 	"strconv"
 	"strings"
 	"testing"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
 )
 
 const (
@@ -24,15 +26,38 @@ func TestParseSockId(t *testing.T) {
 func TestParseMonDump(t *testing.T) {
 	dump, err := parseDump(monPerfDump)
 	assert.NoError(t, err)
-	assert.InEpsilon(t, 5678670180, (*dump)["cluster"]["osd_kb_used"], epsilon)
-	assert.InEpsilon(t, 6866.540527000, (*dump)["paxos"]["store_state_latency.sum"], epsilon)
+	assert.InEpsilon(t, 5678670180, dump["cluster"]["osd_kb_used"], epsilon)
+	assert.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
 }
 
 func TestParseOsdDump(t *testing.T) {
 	dump, err := parseDump(osdPerfDump)
 	assert.NoError(t, err)
-	assert.InEpsilon(t, 552132.109360000, (*dump)["filestore"]["commitcycle_interval.sum"], epsilon)
-	assert.Equal(t, float64(0), (*dump)["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
+	assert.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon)
+	assert.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
+}
+
+func TestDecodeStatusPgmapState(t *testing.T) {
+	data := make(map[string]interface{})
+	err := json.Unmarshal([]byte(clusterStatusDump), &data)
+	assert.NoError(t, err)
+
+	acc := &testutil.Accumulator{}
+	err = decodeStatusPgmapState(acc, data)
+	assert.NoError(t, err)
+
+	var results = []struct {
+		fields map[string]interface{}
+		tags   map[string]string
+	}{
+		{map[string]interface{}{"count": float64(2560)}, map[string]string{"state": "active+clean"}},
+		{map[string]interface{}{"count": float64(10)}, map[string]string{"state": "active+scrubbing"}},
+		{map[string]interface{}{"count": float64(5)}, map[string]string{"state": "active+backfilling"}},
+	}
+
+	for _, r := range results {
+		acc.AssertContainsTaggedFields(t, "ceph_pgmap_state", r.fields, r.tags)
+	}
 }
 
 func TestGather(t *testing.T) {
@@ -685,3 +710,127 @@ var osdPerfDump = `
       "wait": { "avgcount": 0,
           "sum": 0.000000000}}}
 `
+var clusterStatusDump = `
+{
+  "health": {
+    "health": {
+      "health_services": [
+        {
+          "mons": [
+            {
+              "name": "a",
+              "kb_total": 114289256,
+              "kb_used": 26995516,
+              "kb_avail": 81465132,
+              "avail_percent": 71,
+              "last_updated": "2017-01-03 17:20:57.595004",
+              "store_stats": {
+                "bytes_total": 942117141,
+                "bytes_sst": 0,
+                "bytes_log": 4345406,
+                "bytes_misc": 937771735,
+                "last_updated": "0.000000"
+              },
+              "health": "HEALTH_OK"
+            },
+            {
+              "name": "b",
+              "kb_total": 114289256,
+              "kb_used": 27871624,
+              "kb_avail": 80589024,
+              "avail_percent": 70,
+              "last_updated": "2017-01-03 17:20:47.784331",
+              "store_stats": {
+                "bytes_total": 454853104,
+                "bytes_sst": 0,
+                "bytes_log": 5788320,
+                "bytes_misc": 449064784,
+                "last_updated": "0.000000"
+              },
+              "health": "HEALTH_OK"
+            },
+            {
+              "name": "c",
+              "kb_total": 130258508,
+              "kb_used": 38076996,
+              "kb_avail": 85541692,
+              "avail_percent": 65,
+              "last_updated": "2017-01-03 17:21:03.311123",
+              "store_stats": {
+                "bytes_total": 455555199,
+                "bytes_sst": 0,
+                "bytes_log": 6950876,
+                "bytes_misc": 448604323,
+                "last_updated": "0.000000"
+              },
+              "health": "HEALTH_OK"
+            }
+          ]
+        }
+      ]
+    },
+    "timechecks": {
+      "epoch": 504,
+      "round": 34642,
+      "round_status": "finished",
+      "mons": [
+        { "name": "a", "skew": 0, "latency": 0, "health": "HEALTH_OK" },
+        { "name": "b", "skew": -0, "latency": 0.000951, "health": "HEALTH_OK" },
+        { "name": "c", "skew": -0, "latency": 0.000946, "health": "HEALTH_OK" }
+      ]
+    },
+    "summary": [],
+    "overall_status": "HEALTH_OK",
+    "detail": []
+  },
+  "fsid": "01234567-abcd-9876-0123-ffeeddccbbaa",
+  "election_epoch": 504,
+  "quorum": [ 0, 1, 2 ],
+  "quorum_names": [ "a", "b", "c" ],
+  "monmap": {
+    "epoch": 17,
+    "fsid": "01234567-abcd-9876-0123-ffeeddccbbaa",
+    "modified": "2016-04-11 14:01:52.600198",
+    "created": "0.000000",
+    "mons": [
+      { "rank": 0, "name": "a", "addr": "192.168.0.1:6789/0" },
+      { "rank": 1, "name": "b", "addr": "192.168.0.2:6789/0" },
+      { "rank": 2, "name": "c", "addr": "192.168.0.3:6789/0" }
+    ]
+  },
+  "osdmap": {
+    "osdmap": {
+      "epoch": 21734,
+      "num_osds": 24,
+      "num_up_osds": 24,
+      "num_in_osds": 24,
+      "full": false,
+      "nearfull": false,
+      "num_remapped_pgs": 0
+    }
+  },
+  "pgmap": {
+    "pgs_by_state": [
+      { "state_name": "active+clean", "count": 2560 },
+      { "state_name": "active+scrubbing", "count": 10 },
+      { "state_name": "active+backfilling", "count": 5 }
+    ],
+    "version": 52314277,
+    "num_pgs": 2560,
+    "data_bytes": 2700031960713,
+    "bytes_used": 7478347665408,
+    "bytes_avail": 9857462382592,
+    "bytes_total": 17335810048000,
+    "read_bytes_sec": 0,
+    "write_bytes_sec": 367217,
+    "op_per_sec": 98
+  },
+  "mdsmap": {
+    "epoch": 1,
+    "up": 0,
+    "in": 0,
+    "max": 0,
+    "by_rank": []
+  }
+}
+`