Input plugin for running ntp queries

see #235
This commit is contained in:
Cameron Sparr 2016-03-14 12:56:33 +00:00 committed by Michele Fadda
parent 667127b189
commit 9f39d083db
7 changed files with 689 additions and 2 deletions

View File

@ -17,6 +17,7 @@
- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert! - [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert!
- [#235](https://github.com/influxdata/telegraf/issues/235): Add number of users to the `system` input plugin. - [#235](https://github.com/influxdata/telegraf/issues/235): Add number of users to the `system` input plugin.
- [#826](https://github.com/influxdata/telegraf/pull/826): "kernel" linux plugin for /proc/stat metrics (context switches, interrupts, etc.) - [#826](https://github.com/influxdata/telegraf/pull/826): "kernel" linux plugin for /proc/stat metrics (context switches, interrupts, etc.)
- [#847](https://github.com/influxdata/telegraf/pull/847): `ntpq`: Input plugin for running ntp query executable and gathering metrics.
### Bugfixes ### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":" - [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"

View File

@ -30,8 +30,6 @@ The example plugin gathers metrics about example things
### Example Output: ### Example Output:
Give an example `-test` output here
``` ```
$ ./telegraf -config telegraf.conf -input-filter example -test $ ./telegraf -config telegraf.conf -input-filter example -test
measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455 measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455

View File

@ -29,6 +29,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/net_response" _ "github.com/influxdata/telegraf/plugins/inputs/net_response"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq" _ "github.com/influxdata/telegraf/plugins/inputs/nsq"
_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger" _ "github.com/influxdata/telegraf/plugins/inputs/passenger"
_ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/ping"

View File

@ -0,0 +1,60 @@
# ntpq Input Plugin
Get standard NTP query metrics, requires ntpq executable.
Below is the documentation of the various headers returned from the NTP query
command when running `ntpq -p`.
- remote The remote peer or server being synced to. “LOCAL” is this local host
(included in case there are no remote peers or servers available);
- refid Where or what the remote peer or server is itself synchronised to;
- st (stratum) The remote peer or server Stratum
- t (type) Type (u: unicast or manycast client, b: broadcast or multicast client,
l: local reference clock, s: symmetric peer, A: manycast server,
B: broadcast server, M: multicast server, see “Automatic Server Discovery“);
- when When last polled (seconds ago, “h” hours ago, or “d” days ago);
- poll Polling frequency: rfc5905 suggests this ranges in NTPv4 from 4 (16s)
to 17 (36h) (log2 seconds), however observation suggests the actual displayed
value is seconds for a much smaller range of 64 (26) to 1024 (210) seconds;
- reach An 8-bit left-shift shift register value recording polls (bit set =
successful, bit reset = fail) displayed in octal;
- delay Round trip communication delay to the remote peer or server (milliseconds);
- offset Mean offset (phase) in the times reported between this local host and
the remote peer or server (RMS, milliseconds);
- jitter Mean deviation (jitter) in the time reported for that remote peer or
server (RMS of difference of multiple time samples, milliseconds);
### Configuration:
```toml
# Get standard NTP query metrics, requires ntpq executable
[[inputs.ntpq]]
## If false, set the -n ntpq flag. Can reduce metric gather times.
dns_lookup = true
```
### Measurements & Fields:
- ntpq
- delay (float, milliseconds)
- jitter (float, milliseconds)
- offset (float, milliseconds)
- poll (int, seconds)
- reach (int)
- when (int, seconds)
### Tags:
- All measurements have the following tags:
- refid
- remote
- type
- stratum
### Example Output:
```
$ telegraf -config ~/ws/telegraf.conf -input-filter ntpq -test
* Plugin: ntpq, Collection 1
> ntpq,refid=.GPSs.,remote=*time.apple.com,stratum=1,type=u delay=91.797,jitter=3.735,offset=12.841,poll=64i,reach=377i,when=35i 1457960478909556134
```

202
plugins/inputs/ntpq/ntpq.go Normal file
View File

@ -0,0 +1,202 @@
// +build !windows
package ntpq
import (
"bufio"
"bytes"
"log"
"os/exec"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// Mapping of ntpq header names to tag keys
var tagHeaders map[string]string = map[string]string{
"remote": "remote",
"refid": "refid",
"st": "stratum",
"t": "type",
}
// Mapping of the ntpq tag key to the index in the command output
var tagI map[string]int = map[string]int{
"remote": -1,
"refid": -1,
"stratum": -1,
"type": -1,
}
// Mapping of float metrics to their index in the command output
var floatI map[string]int = map[string]int{
"delay": -1,
"offset": -1,
"jitter": -1,
}
// Mapping of int metrics to their index in the command output
var intI map[string]int = map[string]int{
"when": -1,
"poll": -1,
"reach": -1,
}
type NTPQ struct {
runQ func() ([]byte, error)
DNSLookup bool `toml:"dns_lookup"`
}
func (n *NTPQ) Description() string {
return "Get standard NTP query metrics, requires ntpq executable."
}
func (n *NTPQ) SampleConfig() string {
return `
## If false, set the -n ntpq flag. Can reduce metric gather time.
dns_lookup = true
`
}
func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
out, err := n.runQ()
if err != nil {
return err
}
lineCounter := 0
scanner := bufio.NewScanner(bytes.NewReader(out))
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) < 2 {
continue
}
// If lineCounter == 0, then this is the header line
if lineCounter == 0 {
for i, field := range fields {
// Check if field is a tag:
if tagKey, ok := tagHeaders[field]; ok {
tagI[tagKey] = i
continue
}
// check if field is a float metric:
if _, ok := floatI[field]; ok {
floatI[field] = i
continue
}
// check if field is an int metric:
if _, ok := intI[field]; ok {
intI[field] = i
continue
}
}
} else {
tags := make(map[string]string)
mFields := make(map[string]interface{})
// Get tags from output
for key, index := range tagI {
if index == -1 {
continue
}
tags[key] = fields[index]
}
// Get integer metrics from output
for key, index := range intI {
if index == -1 {
continue
}
if key == "when" {
when := fields[index]
switch {
case strings.HasSuffix(when, "h"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "h"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
continue
}
// seconds in an hour
mFields[key] = int64(m) * 360
continue
case strings.HasSuffix(when, "d"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "d"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
continue
}
// seconds in a day
mFields[key] = int64(m) * 86400
continue
case strings.HasSuffix(when, "m"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "m"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
continue
}
// seconds in a day
mFields[key] = int64(m) * 60
continue
}
}
m, err := strconv.Atoi(fields[index])
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
continue
}
mFields[key] = int64(m)
}
// get float metrics from output
for key, index := range floatI {
if index == -1 {
continue
}
m, err := strconv.ParseFloat(fields[index], 64)
if err != nil {
log.Printf("ERROR ntpq: parsing float: %s", fields[index])
continue
}
mFields[key] = m
}
acc.AddFields("ntpq", mFields, tags)
}
lineCounter++
}
return nil
}
func (n *NTPQ) runq() ([]byte, error) {
bin, err := exec.LookPath("ntpq")
if err != nil {
return nil, err
}
var cmd *exec.Cmd
if n.DNSLookup {
cmd = exec.Command(bin, "-p")
} else {
cmd = exec.Command(bin, "-p", "-n")
}
return cmd.Output()
}
func init() {
inputs.Add("ntpq", func() telegraf.Input {
n := &NTPQ{}
n.runQ = n.runq
return n
})
}

View File

@ -0,0 +1,422 @@
// +build !windows
package ntpq
import (
"fmt"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestSingleNTPQ(t *testing.T) {
tt := tester{
ret: []byte(singleNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(101),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestBadIntNTPQ(t *testing.T) {
tt := tester{
ret: []byte(badIntParseNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(101),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestBadFloatNTPQ(t *testing.T) {
tt := tester{
ret: []byte(badFloatParseNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(2),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestDaysNTPQ(t *testing.T) {
tt := tester{
ret: []byte(whenDaysNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(172800),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestHoursNTPQ(t *testing.T) {
tt := tester{
ret: []byte(whenHoursNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(720),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestMinutesNTPQ(t *testing.T) {
tt := tester{
ret: []byte(whenMinutesNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(120),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestBadWhenNTPQ(t *testing.T) {
tt := tester{
ret: []byte(whenBadNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestMultiNTPQ(t *testing.T) {
tt := tester{
ret: []byte(multiNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"delay": float64(54.033),
"jitter": float64(449514),
"offset": float64(243.426),
"poll": int64(1024),
"reach": int64(377),
"when": int64(740),
}
tags := map[string]string{
"refid": "10.177.80.37",
"remote": "83.137.98.96",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
fields = map[string]interface{}{
"delay": float64(60.785),
"jitter": float64(449539),
"offset": float64(232.597),
"poll": int64(1024),
"reach": int64(377),
"when": int64(739),
}
tags = map[string]string{
"refid": "10.177.80.37",
"remote": "81.7.16.52",
"stratum": "2",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestBadHeaderNTPQ(t *testing.T) {
resetVars()
tt := tester{
ret: []byte(badHeaderNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(101),
"poll": int64(256),
"reach": int64(37),
"delay": float64(51.016),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestMissingDelayColumnNTPQ(t *testing.T) {
resetVars()
tt := tester{
ret: []byte(missingDelayNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.NoError(t, n.Gather(&acc))
fields := map[string]interface{}{
"when": int64(101),
"poll": int64(256),
"reach": int64(37),
"offset": float64(233.010),
"jitter": float64(17.462),
}
tags := map[string]string{
"remote": "*uschi5-ntp-002.",
"refid": "10.177.80.46",
"type": "u",
}
acc.AssertContainsTaggedFields(t, "ntpq", fields, tags)
}
func TestFailedNTPQ(t *testing.T) {
tt := tester{
ret: []byte(singleNTPQ),
err: fmt.Errorf("Test failure"),
}
n := &NTPQ{
runQ: tt.runqTest,
}
acc := testutil.Accumulator{}
assert.Error(t, n.Gather(&acc))
}
type tester struct {
ret []byte
err error
}
func (t *tester) runqTest() ([]byte, error) {
return t.ret, t.err
}
func resetVars() {
// Mapping of ntpq header names to tag keys
tagHeaders = map[string]string{
"remote": "remote",
"refid": "refid",
"st": "stratum",
"t": "type",
}
// Mapping of the ntpq tag key to the index in the command output
tagI = map[string]int{
"remote": -1,
"refid": -1,
"stratum": -1,
"type": -1,
}
// Mapping of float metrics to their index in the command output
floatI = map[string]int{
"delay": -1,
"offset": -1,
"jitter": -1,
}
// Mapping of int metrics to their index in the command output
intI = map[string]int{
"when": -1,
"poll": -1,
"reach": -1,
}
}
var singleNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462
`
var badHeaderNTPQ = `remote refid foobar t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462
`
var missingDelayNTPQ = `remote refid foobar t when poll reach offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 233.010 17.462
`
var whenDaysNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 2d 256 37 51.016 233.010 17.462
`
var whenHoursNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 2h 256 37 51.016 233.010 17.462
`
var whenMinutesNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 2m 256 37 51.016 233.010 17.462
`
var whenBadNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 2q 256 37 51.016 233.010 17.462
`
var badFloatParseNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 2 256 37 51.016 foobar 17.462
`
var badIntParseNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 101 foobar 37 51.016 233.010 17.462
`
var multiNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
83.137.98.96 10.177.80.37 2 u 740 1024 377 54.033 243.426 449514.
81.7.16.52 10.177.80.37 2 u 739 1024 377 60.785 232.597 449539.
131.188.3.221 10.177.80.37 2 u 783 1024 377 111.820 261.921 449528.
5.9.29.107 10.177.80.37 2 u 703 1024 377 205.704 160.406 449602.
91.189.94.4 10.177.80.37 2 u 673 1024 377 143.047 274.726 449445.
`

View File

@ -0,0 +1,3 @@
// +build windows
package ntpq