fix concurrency panic in ntpq input. Closes #7101 (#7143)

This commit is contained in:
Steven Soroka 2020-03-10 16:48:30 -04:00 committed by GitHub
parent c31ba94bb8
commit 3bc53558a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 107 deletions

View File

@ -21,30 +21,11 @@ var tagHeaders map[string]string = map[string]string{
"t": "type",
}
// Mapping of the ntpq tag key to the index in the command output
var tagI map[string]int = map[string]int{
"remote": -1,
"refid": -1,
"stratum": -1,
"type": -1,
}
// Mapping of float metrics to their index in the command output
var floatI map[string]int = map[string]int{
"delay": -1,
"offset": -1,
"jitter": -1,
}
// Mapping of int metrics to their index in the command output
var intI map[string]int = map[string]int{
"when": -1,
"poll": -1,
"reach": -1,
}
type NTPQ struct {
runQ func() ([]byte, error)
runQ func() ([]byte, error)
tagI map[string]int
floatI map[string]int
intI map[string]int
DNSLookup bool `toml:"dns_lookup"`
}
@ -101,19 +82,19 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
for i, field := range fields {
// Check if field is a tag:
if tagKey, ok := tagHeaders[field]; ok {
tagI[tagKey] = i
n.tagI[tagKey] = i
continue
}
// check if field is a float metric:
if _, ok := floatI[field]; ok {
floatI[field] = i
if _, ok := n.floatI[field]; ok {
n.floatI[field] = i
continue
}
// check if field is an int metric:
if _, ok := intI[field]; ok {
intI[field] = i
if _, ok := n.intI[field]; ok {
n.intI[field] = i
continue
}
}
@ -125,7 +106,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
mFields := make(map[string]interface{})
// Get tags from output
for key, index := range tagI {
for key, index := range n.tagI {
if index == -1 {
continue
}
@ -133,7 +114,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
}
// Get integer metrics from output
for key, index := range intI {
for key, index := range n.intI {
if index == -1 || index >= len(fields) {
continue
}
@ -183,7 +164,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
}
// get float metrics from output
for key, index := range floatI {
for key, index := range n.floatI {
if index == -1 || index >= len(fields) {
continue
}
@ -223,10 +204,40 @@ func (n *NTPQ) runq() ([]byte, error) {
return cmd.Output()
}
func newNTPQ() *NTPQ {
// Mapping of the ntpq tag key to the index in the command output
tagI := map[string]int{
"remote": -1,
"refid": -1,
"stratum": -1,
"type": -1,
}
// Mapping of float metrics to their index in the command output
floatI := map[string]int{
"delay": -1,
"offset": -1,
"jitter": -1,
}
// Mapping of int metrics to their index in the command output
intI := map[string]int{
"when": -1,
"poll": -1,
"reach": -1,
}
n := &NTPQ{
tagI: tagI,
floatI: floatI,
intI: intI,
}
n.runQ = n.runq
return n
}
func init() {
inputs.Add("ntpq", func() telegraf.Input {
n := &NTPQ{}
n.runQ = n.runq
return n
return newNTPQ()
})
}

View File

@ -16,9 +16,8 @@ func TestSingleNTPQ(t *testing.T) {
ret: []byte(singleNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -46,9 +45,8 @@ func TestBadIntNTPQ(t *testing.T) {
ret: []byte(badIntParseNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.Error(t, acc.GatherError(n.Gather))
@ -75,9 +73,8 @@ func TestBadFloatNTPQ(t *testing.T) {
ret: []byte(badFloatParseNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.Error(t, acc.GatherError(n.Gather))
@ -104,9 +101,8 @@ func TestDaysNTPQ(t *testing.T) {
ret: []byte(whenDaysNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -134,9 +130,8 @@ func TestHoursNTPQ(t *testing.T) {
ret: []byte(whenHoursNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -164,9 +159,8 @@ func TestMinutesNTPQ(t *testing.T) {
ret: []byte(whenMinutesNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -194,9 +188,8 @@ func TestBadWhenNTPQ(t *testing.T) {
ret: []byte(whenBadNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.Error(t, acc.GatherError(n.Gather))
@ -226,9 +219,8 @@ func TestParserNTPQ(t *testing.T) {
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -289,9 +281,8 @@ func TestMultiNTPQ(t *testing.T) {
ret: []byte(multiNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -330,14 +321,12 @@ func TestMultiNTPQ(t *testing.T) {
}
func TestBadHeaderNTPQ(t *testing.T) {
resetVars()
tt := tester{
ret: []byte(badHeaderNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -360,14 +349,12 @@ func TestBadHeaderNTPQ(t *testing.T) {
}
func TestMissingDelayColumnNTPQ(t *testing.T) {
resetVars()
tt := tester{
ret: []byte(missingDelayNTPQ),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.NoError(t, acc.GatherError(n.Gather))
@ -393,9 +380,8 @@ func TestFailedNTPQ(t *testing.T) {
ret: []byte(singleNTPQ),
err: fmt.Errorf("Test failure"),
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{}
assert.Error(t, acc.GatherError(n.Gather))
@ -445,9 +431,8 @@ func TestNoRefID(t *testing.T) {
ret: []byte(noRefID),
err: nil,
}
n := &NTPQ{
runQ: tt.runqTest,
}
n := newNTPQ()
n.runQ = tt.runqTest
acc := testutil.Accumulator{
TimeFunc: func() time.Time { return now },
@ -466,38 +451,6 @@ func (t *tester) runqTest() ([]byte, error) {
return t.ret, t.err
}
func resetVars() {
// Mapping of ntpq header names to tag keys
tagHeaders = map[string]string{
"remote": "remote",
"refid": "refid",
"st": "stratum",
"t": "type",
}
// Mapping of the ntpq tag key to the index in the command output
tagI = map[string]int{
"remote": -1,
"refid": -1,
"stratum": -1,
"type": -1,
}
// Mapping of float metrics to their index in the command output
floatI = map[string]int{
"delay": -1,
"offset": -1,
"jitter": -1,
}
// Mapping of int metrics to their index in the command output
intI = map[string]int{
"when": -1,
"poll": -1,
"reach": -1,
}
}
var singleNTPQ = ` remote refid st t when poll reach delay offset jitter
==============================================================================
*uschi5-ntp-002. 10.177.80.46 2 u 101 256 37 51.016 233.010 17.462