From 9f39d083dbdebb7f2c6d94929e062593c34a8e81 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 14 Mar 2016 12:56:33 +0000 Subject: [PATCH] Input plugin for running ntp queries see #235 --- CHANGELOG.md | 1 + plugins/inputs/EXAMPLE_README.md | 2 - plugins/inputs/all/all.go | 1 + plugins/inputs/ntpq/README.md | 60 ++++ plugins/inputs/ntpq/ntpq.go | 202 +++++++++++++ plugins/inputs/ntpq/ntpq_test.go | 422 ++++++++++++++++++++++++++++ plugins/inputs/ntpq/ntpq_windows.go | 3 + 7 files changed, 689 insertions(+), 2 deletions(-) create mode 100644 plugins/inputs/ntpq/README.md create mode 100644 plugins/inputs/ntpq/ntpq.go create mode 100644 plugins/inputs/ntpq/ntpq_test.go create mode 100644 plugins/inputs/ntpq/ntpq_windows.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3545c35c6..cfba45536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! - [#235](https://github.com/influxdata/telegraf/issues/235): Add number of users to the `system` input plugin. - [#826](https://github.com/influxdata/telegraf/pull/826): "kernel" linux plugin for /proc/stat metrics (context switches, interrupts, etc.) +- [#847](https://github.com/influxdata/telegraf/pull/847): `ntpq`: Input plugin for running ntp query executable and gathering metrics. 
### Bugfixes - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" diff --git a/plugins/inputs/EXAMPLE_README.md b/plugins/inputs/EXAMPLE_README.md index 9207cd2ab..6bebf1e88 100644 --- a/plugins/inputs/EXAMPLE_README.md +++ b/plugins/inputs/EXAMPLE_README.md @@ -30,8 +30,6 @@ The example plugin gathers metrics about example things ### Example Output: -Give an example `-test` output here - ``` $ ./telegraf -config telegraf.conf -input-filter example -test measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455 diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 2808ce2b5..a3300df66 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -29,6 +29,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" + _ "github.com/influxdata/telegraf/plugins/inputs/ntpq" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" diff --git a/plugins/inputs/ntpq/README.md b/plugins/inputs/ntpq/README.md new file mode 100644 index 000000000..80bf80f39 --- /dev/null +++ b/plugins/inputs/ntpq/README.md @@ -0,0 +1,60 @@ +# ntpq Input Plugin + +Get standard NTP query metrics, requires ntpq executable. + +Below is the documentation of the various headers returned from the NTP query +command when running `ntpq -p`. + +- remote – The remote peer or server being synced to. 
“LOCAL” is this local host +(included in case there are no remote peers or servers available); +- refid – Where or what the remote peer or server is itself synchronised to; +- st (stratum) – The remote peer or server Stratum +- t (type) – Type (u: unicast or manycast client, b: broadcast or multicast client, +l: local reference clock, s: symmetric peer, A: manycast server, +B: broadcast server, M: multicast server, see “Automatic Server Discovery”); +- when – When last polled (seconds ago, “m” minutes ago, “h” hours ago, or “d” days ago); +- poll – Polling frequency: rfc5905 suggests this ranges in NTPv4 from 4 (16s) +to 17 (36h) (log2 seconds), however observation suggests the actual displayed +value is seconds for a much smaller range of 64 (2^6) to 1024 (2^10) seconds; +- reach – An 8-bit left-shift shift register value recording polls (bit set = +successful, bit reset = fail) displayed in octal; +- delay – Round trip communication delay to the remote peer or server (milliseconds); +- offset – Mean offset (phase) in the times reported between this local host and +the remote peer or server (RMS, milliseconds); +- jitter – Mean deviation (jitter) in the time reported for that remote peer or +server (RMS of difference of multiple time samples, milliseconds); + +### Configuration: + +```toml +# Get standard NTP query metrics, requires ntpq executable +[[inputs.ntpq]] + ## If false, set the -n ntpq flag. Can reduce metric gather times. 
+ dns_lookup = true +``` + +### Measurements & Fields: + +- ntpq + - delay (float, milliseconds) + - jitter (float, milliseconds) + - offset (float, milliseconds) + - poll (int, seconds) + - reach (int) + - when (int, seconds) + +### Tags: + +- All measurements have the following tags: + - refid + - remote + - type + - stratum + +### Example Output: + +``` +$ telegraf -config ~/ws/telegraf.conf -input-filter ntpq -test +* Plugin: ntpq, Collection 1 +> ntpq,refid=.GPSs.,remote=*time.apple.com,stratum=1,type=u delay=91.797,jitter=3.735,offset=12.841,poll=64i,reach=377i,when=35i 1457960478909556134 +``` diff --git a/plugins/inputs/ntpq/ntpq.go b/plugins/inputs/ntpq/ntpq.go new file mode 100644 index 000000000..5e8ff6536 --- /dev/null +++ b/plugins/inputs/ntpq/ntpq.go @@ -0,0 +1,202 @@ +// +build !windows + +package ntpq + +import ( + "bufio" + "bytes" + "log" + "os/exec" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Mapping of ntpq header names to tag keys +var tagHeaders map[string]string = map[string]string{ + "remote": "remote", + "refid": "refid", + "st": "stratum", + "t": "type", +} + +// Mapping of the ntpq tag key to the index in the command output +var tagI map[string]int = map[string]int{ + "remote": -1, + "refid": -1, + "stratum": -1, + "type": -1, +} + +// Mapping of float metrics to their index in the command output +var floatI map[string]int = map[string]int{ + "delay": -1, + "offset": -1, + "jitter": -1, +} + +// Mapping of int metrics to their index in the command output +var intI map[string]int = map[string]int{ + "when": -1, + "poll": -1, + "reach": -1, +} + +type NTPQ struct { + runQ func() ([]byte, error) + + DNSLookup bool `toml:"dns_lookup"` +} + +func (n *NTPQ) Description() string { + return "Get standard NTP query metrics, requires ntpq executable." +} + +func (n *NTPQ) SampleConfig() string { + return ` + ## If false, set the -n ntpq flag. Can reduce metric gather time. 
+ dns_lookup = true +` +} + +func (n *NTPQ) Gather(acc telegraf.Accumulator) error { + out, err := n.runQ() + if err != nil { + return err + } + + lineCounter := 0 + scanner := bufio.NewScanner(bytes.NewReader(out)) + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + if len(fields) < 2 { + continue + } + + // If lineCounter == 0, then this is the header line + if lineCounter == 0 { + for i, field := range fields { + // Check if field is a tag: + if tagKey, ok := tagHeaders[field]; ok { + tagI[tagKey] = i + continue + } + + // check if field is a float metric: + if _, ok := floatI[field]; ok { + floatI[field] = i + continue + } + + // check if field is an int metric: + if _, ok := intI[field]; ok { + intI[field] = i + continue + } + } + } else { + tags := make(map[string]string) + mFields := make(map[string]interface{}) + + // Get tags from output + for key, index := range tagI { + if index == -1 { + continue + } + tags[key] = fields[index] + } + + // Get integer metrics from output + for key, index := range intI { + if index == -1 { + continue + } + + if key == "when" { + when := fields[index] + switch { + case strings.HasSuffix(when, "h"): + m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "h")) + if err != nil { + log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + continue + } + // seconds in an hour + mFields[key] = int64(m) * 3600 + continue + case strings.HasSuffix(when, "d"): + m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "d")) + if err != nil { + log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + continue + } + // seconds in a day + mFields[key] = int64(m) * 86400 + continue + case strings.HasSuffix(when, "m"): + m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "m")) + if err != nil { + log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + continue + } + // seconds in a minute + mFields[key] = int64(m) * 60 + continue + } + } + + m, err := strconv.Atoi(fields[index]) + if err != nil { + 
log.Printf("ERROR ntpq: parsing int: %s", fields[index]) + continue + } + mFields[key] = int64(m) + } + + // get float metrics from output + for key, index := range floatI { + if index == -1 { + continue + } + + m, err := strconv.ParseFloat(fields[index], 64) + if err != nil { + log.Printf("ERROR ntpq: parsing float: %s", fields[index]) + continue + } + mFields[key] = m + } + + acc.AddFields("ntpq", mFields, tags) + } + + lineCounter++ + } + return nil +} + +func (n *NTPQ) runq() ([]byte, error) { + bin, err := exec.LookPath("ntpq") + if err != nil { + return nil, err + } + + var cmd *exec.Cmd + if n.DNSLookup { + cmd = exec.Command(bin, "-p") + } else { + cmd = exec.Command(bin, "-p", "-n") + } + + return cmd.Output() +} + +func init() { + inputs.Add("ntpq", func() telegraf.Input { + n := &NTPQ{} + n.runQ = n.runq + return n + }) +} diff --git a/plugins/inputs/ntpq/ntpq_test.go b/plugins/inputs/ntpq/ntpq_test.go new file mode 100644 index 000000000..228eddc62 --- /dev/null +++ b/plugins/inputs/ntpq/ntpq_test.go @@ -0,0 +1,422 @@ +// +build !windows + +package ntpq + +import ( + "fmt" + "testing" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" +) + +func TestSingleNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(singleNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(101), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestBadIntNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(badIntParseNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, 
n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(101), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestBadFloatNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(badFloatParseNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(2), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestDaysNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(whenDaysNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(172800), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestHoursNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(whenHoursNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(7200), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + 
"refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestMinutesNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(whenMinutesNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(120), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestBadWhenNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(whenBadNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestMultiNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(multiNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "delay": float64(54.033), + "jitter": float64(449514), + "offset": float64(243.426), + "poll": int64(1024), + "reach": int64(377), + "when": int64(740), + } + tags := map[string]string{ + "refid": "10.177.80.37", + "remote": "83.137.98.96", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) + + fields = map[string]interface{}{ + "delay": float64(60.785), + "jitter": float64(449539), + "offset": float64(232.597), + "poll": 
int64(1024), + "reach": int64(377), + "when": int64(739), + } + tags = map[string]string{ + "refid": "10.177.80.37", + "remote": "81.7.16.52", + "stratum": "2", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestBadHeaderNTPQ(t *testing.T) { + resetVars() + tt := tester{ + ret: []byte(badHeaderNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(101), + "poll": int64(256), + "reach": int64(37), + "delay": float64(51.016), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestMissingDelayColumnNTPQ(t *testing.T) { + resetVars() + tt := tester{ + ret: []byte(missingDelayNTPQ), + err: nil, + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.NoError(t, n.Gather(&acc)) + + fields := map[string]interface{}{ + "when": int64(101), + "poll": int64(256), + "reach": int64(37), + "offset": float64(233.010), + "jitter": float64(17.462), + } + tags := map[string]string{ + "remote": "*uschi5-ntp-002.", + "refid": "10.177.80.46", + "type": "u", + } + acc.AssertContainsTaggedFields(t, "ntpq", fields, tags) +} + +func TestFailedNTPQ(t *testing.T) { + tt := tester{ + ret: []byte(singleNTPQ), + err: fmt.Errorf("Test failure"), + } + n := &NTPQ{ + runQ: tt.runqTest, + } + + acc := testutil.Accumulator{} + assert.Error(t, n.Gather(&acc)) +} + +type tester struct { + ret []byte + err error +} + +func (t *tester) runqTest() ([]byte, error) { + return t.ret, t.err +} + +func resetVars() { + // Mapping of ntpq header names to tag keys + tagHeaders = map[string]string{ + "remote": "remote", + "refid": "refid", + "st": "stratum", + "t": "type", + } + + // Mapping of the ntpq tag key to the index in the 
command output + tagI = map[string]int{ + "remote": -1, + "refid": -1, + "stratum": -1, + "type": -1, + } + + // Mapping of float metrics to their index in the command output + floatI = map[string]int{ + "delay": -1, + "offset": -1, + "jitter": -1, + } + + // Mapping of int metrics to their index in the command output + intI = map[string]int{ + "when": -1, + "poll": -1, + "reach": -1, + } +} + +var singleNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462 +` + +var badHeaderNTPQ = `remote refid foobar t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462 +` + +var missingDelayNTPQ = `remote refid foobar t when poll reach offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 233.010 17.462 +` + +var whenDaysNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 2d 256 37 51.016 233.010 17.462 +` + +var whenHoursNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 2h 256 37 51.016 233.010 17.462 +` + +var whenMinutesNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 2m 256 37 51.016 233.010 17.462 +` + +var whenBadNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 
10.177.80.46 2 u 2q 256 37 51.016 233.010 17.462 +` + +var badFloatParseNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 2 256 37 51.016 foobar 17.462 +` + +var badIntParseNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== +*uschi5-ntp-002. 10.177.80.46 2 u 101 foobar 37 51.016 233.010 17.462 +` + +var multiNTPQ = ` remote refid st t when poll reach delay offset jitter +============================================================================== + 83.137.98.96 10.177.80.37 2 u 740 1024 377 54.033 243.426 449514. + 81.7.16.52 10.177.80.37 2 u 739 1024 377 60.785 232.597 449539. + 131.188.3.221 10.177.80.37 2 u 783 1024 377 111.820 261.921 449528. + 5.9.29.107 10.177.80.37 2 u 703 1024 377 205.704 160.406 449602. + 91.189.94.4 10.177.80.37 2 u 673 1024 377 143.047 274.726 449445. +` diff --git a/plugins/inputs/ntpq/ntpq_windows.go b/plugins/inputs/ntpq/ntpq_windows.go new file mode 100644 index 000000000..a1f1a55fa --- /dev/null +++ b/plugins/inputs/ntpq/ntpq_windows.go @@ -0,0 +1,3 @@ +// +build windows + +package ntpq