Add option to unbound module to use threads as tags (#3969)
This commit is contained in:
		
							parent
							
								
									8826cdc423
								
							
						
					
					
						commit
						83b03ecb18
					
				|  | @ -22,6 +22,11 @@ This plugin gathers stats from [Unbound - a validating, recursive, and caching D | ||||||
|    ## IP of server to connect to, read from unbound conf default, optionally ':port' |    ## IP of server to connect to, read from unbound conf default, optionally ':port' | ||||||
|    ## Will lookup IP if given a hostname |    ## Will lookup IP if given a hostname | ||||||
|    server = "127.0.0.1:8953" |    server = "127.0.0.1:8953" | ||||||
|  | 
 | ||||||
|  |    ## Output thread related values in a separate measurement "unbound_threads", with additional tag | ||||||
|  |    ## "thread" identifying the thread number (0 ... the number of configured threads) | ||||||
|  |    ## By default, thread related metrics are output as additional fields in measurement "unbound"  | ||||||
|  |    thread_as_tag = false | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
| ### Measurements & Fields: | ### Measurements & Fields: | ||||||
|  | @ -129,7 +134,7 @@ telegraf ALL=(ALL) NOPASSWD: /usr/sbin/unbound-control | ||||||
| 
 | 
 | ||||||
| Please use the solution you see as most appropriate. | Please use the solution you see as most appropriate. | ||||||
| 
 | 
 | ||||||
| ### Example Output: | ### Example Output (default): | ||||||
| 
 | 
 | ||||||
| ``` | ``` | ||||||
|  telegraf --config etc/telegraf.conf --input-filter unbound --test |  telegraf --config etc/telegraf.conf --input-filter unbound --test | ||||||
|  | @ -137,3 +142,14 @@ Please use the solution you see as most appropriate. | ||||||
| > unbound,host=localhost total_num_cachehits=0,total_num_prefetch=0,total_requestlist_avg=0,total_requestlist_max=0,total_recursion_time_median=0,total_num_queries=0,total_requestlist_overwritten=0,total_requestlist_current_all=0,time_up=159185.583967,total_num_recursivereplies=0,total_requestlist_exceeded=0,total_requestlist_current_user=0,total_recursion_time_avg=0,total_tcpusage=0,total_num_cachemiss=0 1510130793000000000 | > unbound,host=localhost total_num_cachehits=0,total_num_prefetch=0,total_requestlist_avg=0,total_requestlist_max=0,total_recursion_time_median=0,total_num_queries=0,total_requestlist_overwritten=0,total_requestlist_current_all=0,time_up=159185.583967,total_num_recursivereplies=0,total_requestlist_exceeded=0,total_requestlist_current_user=0,total_recursion_time_avg=0,total_tcpusage=0,total_num_cachemiss=0 1510130793000000000 | ||||||
| 
 | 
 | ||||||
| ``` | ``` | ||||||
|  | 
 | ||||||
|  | ### Example Output (with thread_as_tag = true, unbound configured with num_threads: 2) | ||||||
|  | 
 | ||||||
|  | ``` | ||||||
|  |  telegraf --config etc/telegraf.conf --input-filter unbound --test | ||||||
|  | * Plugin: inputs.unbound, Collection 1 | ||||||
|  | > unbound,host=localhost total_requestlist_avg=0,total_requestlist_exceeded=0,total_requestlist_overwritten=0,total_requestlist_current_user=0,total_recursion_time_avg=0.029186,total_tcpusage=0,total_num_queries=51,total_num_queries_ip_ratelimited=0,total_num_recursivereplies=6,total_requestlist_max=0,time_now=1522804978.784814,time_elapsed=310.435217,total_num_cachemiss=6,total_num_zero_ttl=0,time_up=310.435217,total_num_cachehits=45,total_num_prefetch=0,total_requestlist_current_all=0,total_recursion_time_median=0.016384 1522804979000000000 | ||||||
|  | > unbound_threads,host=localhost,thread=0 num_queries_ip_ratelimited=0,requestlist_current_user=0,recursion_time_avg=0.029186,num_prefetch=0,requestlist_overwritten=0,requestlist_exceeded=0,requestlist_current_all=0,tcpusage=0,num_cachehits=37,num_cachemiss=6,num_recursivereplies=6,requestlist_avg=0,num_queries=43,num_zero_ttl=0,requestlist_max=0,recursion_time_median=0.032768 1522804979000000000 | ||||||
|  | > unbound_threads,host=localhost,thread=1 num_zero_ttl=0,recursion_time_avg=0,num_queries_ip_ratelimited=0,num_cachehits=8,num_prefetch=0,requestlist_exceeded=0,recursion_time_median=0,tcpusage=0,num_cachemiss=0,num_recursivereplies=0,requestlist_max=0,requestlist_overwritten=0,requestlist_current_user=0,num_queries=8,requestlist_avg=0,requestlist_current_all=0 1522804979000000000 | ||||||
|  | 
 | ||||||
|  | ``` | ||||||
|  | @ -17,14 +17,15 @@ import ( | ||||||
| 	"github.com/influxdata/telegraf/plugins/inputs" | 	"github.com/influxdata/telegraf/plugins/inputs" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error) | type runner func(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error) | ||||||
| 
 | 
 | ||||||
| // Unbound is used to store configuration values
 | // Unbound is used to store configuration values
 | ||||||
| type Unbound struct { | type Unbound struct { | ||||||
| 	Binary  string | 	Binary      string | ||||||
| 	Timeout internal.Duration | 	Timeout     internal.Duration | ||||||
| 	UseSudo bool | 	UseSudo     bool | ||||||
| 	Server  string | 	Server      string | ||||||
|  | 	ThreadAsTag bool | ||||||
| 
 | 
 | ||||||
| 	filter filter.Filter | 	filter filter.Filter | ||||||
| 	run    runner | 	run    runner | ||||||
|  | @ -49,8 +50,14 @@ var sampleConfig = ` | ||||||
|   ## IP of server to connect to, read from unbound conf default, optionally ':port' |   ## IP of server to connect to, read from unbound conf default, optionally ':port' | ||||||
|   ## Will lookup IP if given a hostname |   ## Will lookup IP if given a hostname | ||||||
|   server = "127.0.0.1:8953" |   server = "127.0.0.1:8953" | ||||||
|  | 
 | ||||||
|  |   ## Output thread related values in a separate measurement "unbound_threads", with additional tag | ||||||
|  |   ## "thread" identifying the thread number (0 ... the number of configured threads) | ||||||
|  |   ## By default, thread related metrics are output as additional fields in a single metric point | ||||||
|  |   thread_as_tag = false | ||||||
| ` | ` | ||||||
| 
 | 
 | ||||||
|  | // Description displays what this plugin is about
 | ||||||
| func (s *Unbound) Description() string { | func (s *Unbound) Description() string { | ||||||
| 	return "A plugin to collect stats from Unbound - a validating, recursive, and caching DNS resolver" | 	return "A plugin to collect stats from Unbound - a validating, recursive, and caching DNS resolver" | ||||||
| } | } | ||||||
|  | @ -61,7 +68,7 @@ func (s *Unbound) SampleConfig() string { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Shell out to unbound_stat and return the output
 | // Shell out to unbound_stat and return the output
 | ||||||
| func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string) (*bytes.Buffer, error) { | func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Server string, ThreadAsTag bool) (*bytes.Buffer, error) { | ||||||
| 	cmdArgs := []string{"stats_noreset"} | 	cmdArgs := []string{"stats_noreset"} | ||||||
| 
 | 
 | ||||||
| 	if Server != "" { | 	if Server != "" { | ||||||
|  | @ -113,19 +120,21 @@ func unboundRunner(cmdName string, Timeout internal.Duration, UseSudo bool, Serv | ||||||
| func (s *Unbound) Gather(acc telegraf.Accumulator) error { | func (s *Unbound) Gather(acc telegraf.Accumulator) error { | ||||||
| 
 | 
 | ||||||
| 	// Always exclude histrogram statistics
 | 	// Always exclude histrogram statistics
 | ||||||
| 	stat_excluded := []string{"histogram.*"} | 	statExcluded := []string{"histogram.*"} | ||||||
| 	filter_excluded, err := filter.Compile(stat_excluded) | 	filterExcluded, err := filter.Compile(statExcluded) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server) | 	out, err := s.run(s.Binary, s.Timeout, s.UseSudo, s.Server, s.ThreadAsTag) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("error gathering metrics: %s", err) | 		return fmt.Errorf("error gathering metrics: %s", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Process values
 | 	// Process values
 | ||||||
| 	fields := make(map[string]interface{}) | 	fields := make(map[string]interface{}) | ||||||
|  | 	fieldsThreads := make(map[string]map[string]interface{}) | ||||||
|  | 
 | ||||||
| 	scanner := bufio.NewScanner(out) | 	scanner := bufio.NewScanner(out) | ||||||
| 	for scanner.Scan() { | 	for scanner.Scan() { | ||||||
| 
 | 
 | ||||||
|  | @ -140,32 +149,65 @@ func (s *Unbound) Gather(acc telegraf.Accumulator) error { | ||||||
| 		value := cols[1] | 		value := cols[1] | ||||||
| 
 | 
 | ||||||
| 		// Filter value
 | 		// Filter value
 | ||||||
| 		if filter_excluded.Match(stat) { | 		if filterExcluded.Match(stat) { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		field := strings.Replace(stat, ".", "_", -1) | 		fieldValue, err := strconv.ParseFloat(value, 64) | ||||||
| 
 |  | ||||||
| 		fields[field], err = strconv.ParseFloat(value, 64) |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v\n", | 			acc.AddError(fmt.Errorf("Expected a numerical value for %s = %v", | ||||||
| 				stat, value)) | 				stat, value)) | ||||||
|  | 			continue | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 		// is this a thread related value?
 | ||||||
|  | 		if s.ThreadAsTag && strings.HasPrefix(stat, "thread") { | ||||||
|  | 			// split the stat
 | ||||||
|  | 			statTokens := strings.Split(stat, ".") | ||||||
|  | 			// make sure we split something
 | ||||||
|  | 			if len(statTokens) > 1 { | ||||||
|  | 				// set the thread identifier
 | ||||||
|  | 				threadID := strings.TrimPrefix(statTokens[0], "thread") | ||||||
|  | 				// make sure we have a proper thread ID
 | ||||||
|  | 				if _, err = strconv.Atoi(threadID); err == nil { | ||||||
|  | 					// create new slice without the thread identifier (skip first token)
 | ||||||
|  | 					threadTokens := statTokens[1:] | ||||||
|  | 					// re-define stat
 | ||||||
|  | 					field := strings.Join(threadTokens[:], "_") | ||||||
|  | 					if fieldsThreads[threadID] == nil { | ||||||
|  | 						fieldsThreads[threadID] = make(map[string]interface{}) | ||||||
|  | 					} | ||||||
|  | 					fieldsThreads[threadID][field] = fieldValue | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} else { | ||||||
|  | 			field := strings.Replace(stat, ".", "_", -1) | ||||||
|  | 			fields[field] = fieldValue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	acc.AddFields("unbound", fields, nil) | 	acc.AddFields("unbound", fields, nil) | ||||||
| 
 | 
 | ||||||
|  | 	if s.ThreadAsTag && len(fieldsThreads) > 0 { | ||||||
|  | 		for thisThreadID, thisThreadFields := range fieldsThreads { | ||||||
|  | 			thisThreadTag := map[string]string{"thread": thisThreadID} | ||||||
|  | 			acc.AddFields("unbound_threads", thisThreadFields, thisThreadTag) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func init() { | func init() { | ||||||
| 	inputs.Add("unbound", func() telegraf.Input { | 	inputs.Add("unbound", func() telegraf.Input { | ||||||
| 		return &Unbound{ | 		return &Unbound{ | ||||||
| 			run:     unboundRunner, | 			run:         unboundRunner, | ||||||
| 			Binary:  defaultBinary, | 			Binary:      defaultBinary, | ||||||
| 			Timeout: defaultTimeout, | 			Timeout:     defaultTimeout, | ||||||
| 			UseSudo: false, | 			UseSudo:     false, | ||||||
| 			Server:  "", | 			Server:      "", | ||||||
|  | 			ThreadAsTag: false, | ||||||
| 		} | 		} | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -12,8 +12,8 @@ import ( | ||||||
| 
 | 
 | ||||||
| var TestTimeout = internal.Duration{Duration: time.Second} | var TestTimeout = internal.Duration{Duration: time.Second} | ||||||
| 
 | 
 | ||||||
| func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string) func(string, internal.Duration, bool, string) (*bytes.Buffer, error) { | func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Server string, ThreadAsTag bool) func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) { | ||||||
| 	return func(string, internal.Duration, bool, string) (*bytes.Buffer, error) { | 	return func(string, internal.Duration, bool, string, bool) (*bytes.Buffer, error) { | ||||||
| 		return bytes.NewBuffer([]byte(output)), nil | 		return bytes.NewBuffer([]byte(output)), nil | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | @ -21,7 +21,7 @@ func UnboundControl(output string, Timeout internal.Duration, useSudo bool, Serv | ||||||
| func TestParseFullOutput(t *testing.T) { | func TestParseFullOutput(t *testing.T) { | ||||||
| 	acc := &testutil.Accumulator{} | 	acc := &testutil.Accumulator{} | ||||||
| 	v := &Unbound{ | 	v := &Unbound{ | ||||||
| 		run: UnboundControl(fullOutput, TestTimeout, true, ""), | 		run: UnboundControl(fullOutput, TestTimeout, true, "", false), | ||||||
| 	} | 	} | ||||||
| 	err := v.Gather(acc) | 	err := v.Gather(acc) | ||||||
| 
 | 
 | ||||||
|  | @ -35,6 +35,26 @@ func TestParseFullOutput(t *testing.T) { | ||||||
| 	acc.AssertContainsFields(t, "unbound", parsedFullOutput) | 	acc.AssertContainsFields(t, "unbound", parsedFullOutput) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func TestParseFullOutputThreadAsTag(t *testing.T) { | ||||||
|  | 	acc := &testutil.Accumulator{} | ||||||
|  | 	v := &Unbound{ | ||||||
|  | 		run:         UnboundControl(fullOutput, TestTimeout, true, "", true), | ||||||
|  | 		ThreadAsTag: true, | ||||||
|  | 	} | ||||||
|  | 	err := v.Gather(acc) | ||||||
|  | 
 | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 
 | ||||||
|  | 	assert.True(t, acc.HasMeasurement("unbound")) | ||||||
|  | 	assert.True(t, acc.HasMeasurement("unbound_threads")) | ||||||
|  | 
 | ||||||
|  | 	assert.Len(t, acc.Metrics, 2) | ||||||
|  | 	assert.Equal(t, acc.NFields(), 63) | ||||||
|  | 
 | ||||||
|  | 	acc.AssertContainsFields(t, "unbound", parsedFullOutputThreadAsTagMeasurementUnbound) | ||||||
|  | 	acc.AssertContainsFields(t, "unbound_threads", parsedFullOutputThreadAsTagMeasurementUnboundThreads) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| var parsedFullOutput = map[string]interface{}{ | var parsedFullOutput = map[string]interface{}{ | ||||||
| 	"thread0_num_queries":              float64(11907596), | 	"thread0_num_queries":              float64(11907596), | ||||||
| 	"thread0_num_cachehits":            float64(11489288), | 	"thread0_num_cachehits":            float64(11489288), | ||||||
|  | @ -101,6 +121,74 @@ var parsedFullOutput = map[string]interface{}{ | ||||||
| 	"unwanted_replies":                 float64(0), | 	"unwanted_replies":                 float64(0), | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | var parsedFullOutputThreadAsTagMeasurementUnboundThreads = map[string]interface{}{ | ||||||
|  | 	"num_queries":              float64(11907596), | ||||||
|  | 	"num_cachehits":            float64(11489288), | ||||||
|  | 	"num_cachemiss":            float64(418308), | ||||||
|  | 	"num_prefetch":             float64(0), | ||||||
|  | 	"num_recursivereplies":     float64(418308), | ||||||
|  | 	"requestlist_avg":          float64(0.400229), | ||||||
|  | 	"requestlist_max":          float64(11), | ||||||
|  | 	"requestlist_overwritten":  float64(0), | ||||||
|  | 	"requestlist_exceeded":     float64(0), | ||||||
|  | 	"requestlist_current_all":  float64(0), | ||||||
|  | 	"requestlist_current_user": float64(0), | ||||||
|  | 	"recursion_time_avg":       float64(0.015020), | ||||||
|  | 	"recursion_time_median":    float64(0.00292343), | ||||||
|  | } | ||||||
|  | var parsedFullOutputThreadAsTagMeasurementUnbound = map[string]interface{}{ | ||||||
|  | 	"total_num_queries":              float64(11907596), | ||||||
|  | 	"total_num_cachehits":            float64(11489288), | ||||||
|  | 	"total_num_cachemiss":            float64(418308), | ||||||
|  | 	"total_num_prefetch":             float64(0), | ||||||
|  | 	"total_num_recursivereplies":     float64(418308), | ||||||
|  | 	"total_requestlist_avg":          float64(0.400229), | ||||||
|  | 	"total_requestlist_max":          float64(11), | ||||||
|  | 	"total_requestlist_overwritten":  float64(0), | ||||||
|  | 	"total_requestlist_exceeded":     float64(0), | ||||||
|  | 	"total_requestlist_current_all":  float64(0), | ||||||
|  | 	"total_requestlist_current_user": float64(0), | ||||||
|  | 	"total_recursion_time_avg":       float64(0.015020), | ||||||
|  | 	"total_recursion_time_median":    float64(0.00292343), | ||||||
|  | 	"time_now":                       float64(1509968734.735180), | ||||||
|  | 	"time_up":                        float64(1472897.672099), | ||||||
|  | 	"time_elapsed":                   float64(1472897.672099), | ||||||
|  | 	"mem_total_sbrk":                 float64(7462912), | ||||||
|  | 	"mem_cache_rrset":                float64(285056), | ||||||
|  | 	"mem_cache_message":              float64(320000), | ||||||
|  | 	"mem_mod_iterator":               float64(16532), | ||||||
|  | 	"mem_mod_validator":              float64(112097), | ||||||
|  | 	"num_query_type_A":               float64(7062688), | ||||||
|  | 	"num_query_type_PTR":             float64(43097), | ||||||
|  | 	"num_query_type_TXT":             float64(2998), | ||||||
|  | 	"num_query_type_AAAA":            float64(4499711), | ||||||
|  | 	"num_query_type_SRV":             float64(5691), | ||||||
|  | 	"num_query_type_ANY":             float64(293411), | ||||||
|  | 	"num_query_class_IN":             float64(11907596), | ||||||
|  | 	"num_query_opcode_QUERY":         float64(11907596), | ||||||
|  | 	"num_query_tcp":                  float64(293411), | ||||||
|  | 	"num_query_ipv6":                 float64(0), | ||||||
|  | 	"num_query_flags_QR":             float64(0), | ||||||
|  | 	"num_query_flags_AA":             float64(0), | ||||||
|  | 	"num_query_flags_TC":             float64(0), | ||||||
|  | 	"num_query_flags_RD":             float64(11907596), | ||||||
|  | 	"num_query_flags_RA":             float64(0), | ||||||
|  | 	"num_query_flags_Z":              float64(0), | ||||||
|  | 	"num_query_flags_AD":             float64(1), | ||||||
|  | 	"num_query_flags_CD":             float64(0), | ||||||
|  | 	"num_query_edns_present":         float64(6202), | ||||||
|  | 	"num_query_edns_DO":              float64(6201), | ||||||
|  | 	"num_answer_rcode_NOERROR":       float64(11857463), | ||||||
|  | 	"num_answer_rcode_SERVFAIL":      float64(17), | ||||||
|  | 	"num_answer_rcode_NXDOMAIN":      float64(50116), | ||||||
|  | 	"num_answer_rcode_nodata":        float64(3914360), | ||||||
|  | 	"num_answer_secure":              float64(44289), | ||||||
|  | 	"num_answer_bogus":               float64(1), | ||||||
|  | 	"num_rrset_bogus":                float64(0), | ||||||
|  | 	"unwanted_queries":               float64(0), | ||||||
|  | 	"unwanted_replies":               float64(0), | ||||||
|  | } | ||||||
|  | 
 | ||||||
| var fullOutput = `thread0.num.queries=11907596 | var fullOutput = `thread0.num.queries=11907596 | ||||||
| thread0.num.cachehits=11489288 | thread0.num.cachehits=11489288 | ||||||
| thread0.num.cachemiss=418308 | thread0.num.cachemiss=418308 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue