Merge remote-tracking branch 'upstream/master'

Rikaard Hosein 2016-10-03 18:04:42 -04:00
commit 93e3973be5
67 changed files with 1105 additions and 620 deletions


@ -4,6 +4,7 @@
### Features
- [#1782](https://github.com/influxdata/telegraf/pull/1782): Allow numeric and non-string values for tag_keys.
- [#1694](https://github.com/influxdata/telegraf/pull/1694): Adding Gauge and Counter metric types.
- [#1606](https://github.com/influxdata/telegraf/pull/1606): Remove carriage returns from exec plugin output on Windows
- [#1674](https://github.com/influxdata/telegraf/issues/1674): elasticsearch input: configurable timeout.
@ -20,15 +21,28 @@
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.
- [#1699](https://github.com/influxdata/telegraf/pull/1699): Add database blacklist option for Postgresql
- [#1791](https://github.com/influxdata/telegraf/pull/1791): Add Docker container state metrics to Docker input plugin output
- [#1755](https://github.com/influxdata/telegraf/issues/1755): Add support to SNMP for IP & MAC address conversion.
- [#1729](https://github.com/influxdata/telegraf/issues/1729): Add support to SNMP for OID index suffixes.
- [#1813](https://github.com/influxdata/telegraf/pull/1813): Change default arguments for SNMP plugin.
- [#1686](https://github.com/influxdata/telegraf/pull/1686): Mesos input plugin: very high-cardinality mesos-task metrics removed.
- [#1838](https://github.com/influxdata/telegraf/pull/1838): Logging overhaul to centralize the logger & log levels, & provide a logfile config option.
### Bugfixes
- [#1746](https://github.com/influxdata/telegraf/issues/1746): Fix handling of non-string values for JSON keys listed in tag_keys.
- [#1628](https://github.com/influxdata/telegraf/issues/1628): Fix mongodb input panic on version 2.2.
- [#1733](https://github.com/influxdata/telegraf/issues/1733): Fix statsd scientific notation parsing
- [#1716](https://github.com/influxdata/telegraf/issues/1716): Sensors plugin strconv.ParseFloat: parsing "": invalid syntax
- [#1530](https://github.com/influxdata/telegraf/issues/1530): Fix prometheus_client reload panic
- [#1764](https://github.com/influxdata/telegraf/issues/1764): Fix kafka consumer panic when nil error is returned down errs channel.
- [#1768](https://github.com/influxdata/telegraf/pull/1768): Speed up statsd parsing.
- [#1751](https://github.com/influxdata/telegraf/issues/1751): Fix powerdns integer parse error handling.
- [#1752](https://github.com/influxdata/telegraf/issues/1752): Fix varnish plugin defaults not being used.
- [#1517](https://github.com/influxdata/telegraf/issues/1517): Fix windows glob paths.
- [#1137](https://github.com/influxdata/telegraf/issues/1137): Fix issue loading config directory on windows.
- [#1772](https://github.com/influxdata/telegraf/pull/1772): Windows remote management interactive service fix.
- [#1702](https://github.com/influxdata/telegraf/issues/1702): sqlserver: fix issue when case-sensitive collation is activated.
- [#1823](https://github.com/influxdata/telegraf/issues/1823): Fix huge allocations in http_listener when dealing with huge payloads.
## v1.0.1 [unreleased]
@ -37,6 +51,7 @@
- [#1775](https://github.com/influxdata/telegraf/issues/1775): Prometheus output: Fix bug with multi-batch writes.
- [#1738](https://github.com/influxdata/telegraf/issues/1738): Fix unmarshal of influxdb metrics with null tags.
- [#1773](https://github.com/influxdata/telegraf/issues/1773): Add configurable timeout to influxdb input plugin.
- [#1785](https://github.com/influxdata/telegraf/pull/1785): Fix statsd no default value panic.
## v1.0 [2016-09-08]

Godeps

@ -29,6 +29,7 @@ github.com/hpcloud/tail b2940955ab8b26e19d43a43c4da0475dd81bdb56
github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb e094138084855d444195b252314dfee9eae34cab
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a
github.com/klauspost/crc32 19b0b332c9e4516a6370a0456e6182c3b5036720


@ -132,7 +132,7 @@ func (ac *accumulator) makeMetric(
// NaNs are invalid values in influxdb, skip measurement
if math.IsNaN(val) || math.IsInf(val, 0) {
if ac.debug {
log.Printf("Measurement [%s] field [%s] has a NaN or Inf "+
log.Printf("I! Measurement [%s] field [%s] has a NaN or Inf "+
"field, skipping",
measurement, k)
}
@ -163,7 +163,7 @@ func (ac *accumulator) makeMetric(
m, err = telegraf.NewMetric(measurement, tags, fields, timestamp)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
log.Printf("E! Error adding point [%s]: %s\n", measurement, err.Error())
return nil
}
@ -182,7 +182,7 @@ func (ac *accumulator) AddError(err error) {
}
atomic.AddUint64(&ac.errCount, 1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("ERROR in input [%s]: %s", ac.inputConfig.Name, err)
log.Printf("E! Error in input [%s]: %s", ac.inputConfig.Name, err)
}
func (ac *accumulator) Debug() bool {


@ -49,18 +49,16 @@ func (a *Agent) Connect() error {
switch ot := o.Output.(type) {
case telegraf.ServiceOutput:
if err := ot.Start(); err != nil {
log.Printf("Service for output %s failed to start, exiting\n%s\n",
log.Printf("E! Service for output %s failed to start, exiting\n%s\n",
o.Name, err.Error())
return err
}
}
if a.Config.Agent.Debug {
log.Printf("Attempting connection to output: %s\n", o.Name)
}
log.Printf("D! Attempting connection to output: %s\n", o.Name)
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s, "+
log.Printf("E! Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", o.Name, err)
time.Sleep(15 * time.Second)
err = o.Output.Connect()
@ -68,9 +66,7 @@ func (a *Agent) Connect() error {
return err
}
}
if a.Config.Agent.Debug {
log.Printf("Successfully connected to output: %s\n", o.Name)
}
log.Printf("D! Successfully connected to output: %s\n", o.Name)
}
return nil
}
@ -92,9 +88,9 @@ func panicRecover(input *models.RunningInput) {
if err := recover(); err != nil {
trace := make([]byte, 2048)
runtime.Stack(trace, true)
log.Printf("FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
log.Printf("E! FATAL: Input [%s] panicked: %s, Stack:\n%s\n",
input.Name, err, trace)
log.Println("PLEASE REPORT THIS PANIC ON GITHUB with " +
log.Println("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
"stack trace, configuration, and OS information: " +
"https://github.com/influxdata/telegraf/issues/new")
}
@ -117,7 +113,6 @@ func (a *Agent) gatherer(
var outerr error
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)
@ -131,10 +126,8 @@ func (a *Agent) gatherer(
if outerr != nil {
return outerr
}
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}
log.Printf("D! Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
select {
case <-shutdown:
@ -167,11 +160,11 @@ func gatherWithTimeout(
select {
case err := <-done:
if err != nil {
log.Printf("ERROR in input [%s]: %s", input.Name, err)
log.Printf("E! ERROR in input [%s]: %s", input.Name, err)
}
return
case <-ticker.C:
log.Printf("ERROR: input [%s] took longer to collect than "+
log.Printf("E! ERROR: input [%s] took longer to collect than "+
"collection interval (%s)",
input.Name, timeout)
continue
@ -244,7 +237,7 @@ func (a *Agent) flush() {
defer wg.Done()
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
log.Printf("E! Error writing to output [%s]: %s\n",
output.Name, err.Error())
}
}(o)
@ -264,7 +257,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached metrics before shutdown")
log.Println("I! Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
@ -302,9 +295,9 @@ func copyMetric(m telegraf.Metric) telegraf.Metric {
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
log.Printf("Agent Config: Interval:%s, Debug:%#v, Quiet:%#v, Hostname:%#v, "+
log.Printf("I! Agent Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
"Flush Interval:%s \n",
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
a.Config.Agent.Interval.Duration, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
// channel shared between all input threads for accumulating metrics
@ -315,13 +308,12 @@ func (a *Agent) Run(shutdown chan struct{}) error {
switch p := input.Input.(type) {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
// Service input plugins should set the precision of their
// metrics themselves.
acc.DisablePrecision()
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
log.Printf("E! Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
}
@ -339,7 +331,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
go func() {
defer wg.Done()
if err := a.flusher(shutdown, metricC); err != nil {
log.Printf("Flusher routine failed, exiting: %s\n", err.Error())
log.Printf("E! Flusher routine failed, exiting: %s\n", err.Error())
close(shutdown)
}
}()
@ -354,7 +346,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
go func(in *models.RunningInput, interv time.Duration) {
defer wg.Done()
if err := a.gatherer(shutdown, in, interv, metricC); err != nil {
log.Printf(err.Error())
log.Printf("E! " + err.Error())
}
}(input, interval)
}


@ -12,15 +12,17 @@ import (
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/kardianos/service"
)
var fDebug = flag.Bool("debug", false,
"show metrics as they're generated to stdout")
"turn on debug logging")
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
@ -109,12 +111,9 @@ Examples:
telegraf -config telegraf.conf -input-filter cpu:mem -output-filter influxdb
`
var logger service.Logger
var stop chan struct{}
var srvc service.Service
var svcConfig *service.Config
type program struct{}
@ -182,15 +181,6 @@ func reloadLoop(stop chan struct{}, s service.Service) {
}
}
return
case *fService != "" && runtime.GOOS == "windows":
if *fConfig != "" {
(*svcConfig).Arguments = []string{"-config", *fConfig}
}
err := service.Control(s, *fService)
if err != nil {
log.Fatal(err)
}
return
}
// If no other options are specified, load the config file and run.
@ -221,13 +211,12 @@ func reloadLoop(stop chan struct{}, s service.Service) {
log.Fatal(err)
}
if *fDebug {
ag.Config.Agent.Debug = true
}
if *fQuiet {
ag.Config.Agent.Quiet = true
}
// Setup logging
logger.SetupLogging(
ag.Config.Agent.Debug || *fDebug,
ag.Config.Agent.Quiet || *fQuiet,
ag.Config.Agent.Logfile,
)
if *fTest {
err = ag.Test()
@ -252,7 +241,7 @@ func reloadLoop(stop chan struct{}, s service.Service) {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
log.Printf("I! Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
@ -262,10 +251,10 @@ func reloadLoop(stop chan struct{}, s service.Service) {
}
}()
log.Printf("Starting Telegraf (version %s)\n", version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("Tags enabled: %s", c.ListTags())
log.Printf("I! Starting Telegraf (version %s)\n", version)
log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("I! Tags enabled: %s", c.ListTags())
if *fPidfile != "" {
f, err := os.Create(*fPidfile)
@ -302,8 +291,9 @@ func (p *program) Stop(s service.Service) error {
}
func main() {
flag.Parse()
if runtime.GOOS == "windows" {
svcConfig = &service.Config{
svcConfig := &service.Config{
Name: "telegraf",
DisplayName: "Telegraf Data Collector Service",
Description: "Collects data using a series of plugins and publishes it to" +
@ -316,13 +306,21 @@ func main() {
if err != nil {
log.Fatal(err)
}
logger, err = s.Logger(nil)
if err != nil {
log.Fatal(err)
}
err = s.Run()
if err != nil {
logger.Error(err)
// Handle the -service flag here to prevent any issues with tooling that
// may not have an interactive session, e.g. installing from Ansible.
if *fService != "" {
if *fConfig != "" {
(*svcConfig).Arguments = []string{"-config", *fConfig}
}
err := service.Control(s, *fService)
if err != nil {
log.Fatal(err)
}
} else {
err = s.Run()
if err != nil {
log.Println("E! " + err.Error())
}
}
} else {
stop = make(chan struct{})


@ -30,12 +30,15 @@
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
@ -57,10 +60,15 @@
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Run telegraf in debug mode
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stdout.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
@ -1064,8 +1072,6 @@
# # "tasks",
# # "messages",
# # ]
# ## Include mesos tasks statistics, default is false
# # slave_tasks = true
# # Read metrics from one or many MongoDB servers
@ -1442,25 +1448,29 @@
# # Retrieves SNMP values from remote agents
# [[inputs.snmp]]
# agents = [ "127.0.0.1:161" ]
# ## Timeout for each SNMP query.
# timeout = "5s"
# ## Number of retries to attempt within timeout.
# retries = 3
# ## SNMP version, values can be 1, 2, or 3
# version = 2
#
# # SNMPv1 & SNMPv2 parameters
# ## SNMP community string.
# community = "public"
#
# # SNMPv2 & SNMPv3 parameters
# max_repetitions = 50
# ## The GETBULK max-repetitions parameter
# max_repetitions = 10
#
# # SNMPv3 parameters
# ## SNMPv3 auth parameters
# #sec_name = "myuser"
# #auth_protocol = "md5" # Values: "MD5", "SHA", ""
# #auth_password = "password123"
# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
# #auth_protocol = "md5" # Values: "MD5", "SHA", ""
# #auth_password = "pass"
# #sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
# #context_name = ""
# #priv_protocol = "" # Values: "DES", "AES", ""
# #priv_protocol = "" # Values: "DES", "AES", ""
# #priv_password = ""
#
# # measurement name
# ## measurement name
# name = "system"
# [[inputs.snmp.field]]
# name = "hostname"
@ -1475,7 +1485,7 @@
# oid = "HOST-RESOURCES-MIB::hrMemorySize"
#
# [[inputs.snmp.table]]
# # measurement name
# ## measurement name
# name = "remote_servers"
# inherit_tags = [ "hostname" ]
# [[inputs.snmp.table.field]]
@ -1490,7 +1500,7 @@
# oid = ".1.0.0.0.1.2"
#
# [[inputs.snmp.table]]
# # auto populate table's fields using the MIB
# ## auto populate table's fields using the MIB
# oid = "HOST-RESOURCES-MIB::hrNetworkTable"


@ -42,10 +42,14 @@
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Logging configuration:
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
quiet = false
## Specify the log file name. The empty string means to log to stdout.
logfile = "/Program Files/Telegraf/telegraf.log"
## Override default hostname, if empty use os.Hostname()
hostname = ""
@ -85,7 +89,7 @@
# Windows Performance Counters plugin.
# This is the recommended method of monitoring system metrics on Windows,
# as the regular system plugins (inputs.cpu, inputs.mem, etc.) rely on WMI,
# which utilizes a lot of system resources.
# which utilize more system resources.
#
# See more configuration examples at:
# https://github.com/influxdata/telegraf/tree/master/plugins/inputs/win_perf_counters
@ -95,70 +99,104 @@
# Processor usage, alternative to native, reported on a per-core basis.
ObjectName = "Processor"
Instances = ["*"]
Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"]
Counters = [
"% Idle Time",
"% Interrupt Time",
"% Privileged Time",
"% User Time",
"% Processor Time",
]
Measurement = "win_cpu"
#IncludeTotal=false #Set to true to include _Total instance when querying for all (*).
# Set to true to include _Total instance when querying for all (*).
#IncludeTotal=false
[[inputs.win_perf_counters.object]]
# Disk times and queues
ObjectName = "LogicalDisk"
Instances = ["*"]
Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"]
Counters = [
"% Idle Time",
"% Disk Time","% Disk Read Time",
"% Disk Write Time",
"% User Time",
"Current Disk Queue Length",
]
Measurement = "win_disk"
#IncludeTotal=false #Set to true to include _Total instance when querying for all (*).
# Set to true to include _Total instance when querying for all (*).
#IncludeTotal=false
[[inputs.win_perf_counters.object]]
ObjectName = "System"
Counters = ["Context Switches/sec","System Calls/sec"]
Counters = [
"Context Switches/sec",
"System Calls/sec",
]
Instances = ["------"]
Measurement = "win_system"
#IncludeTotal=false #Set to true to include _Total instance when querying for all (*).
# Set to true to include _Total instance when querying for all (*).
#IncludeTotal=false
[[inputs.win_perf_counters.object]]
# Example query where the Instance portion must be removed to get data back, such as from the Memory object.
# Example query where the Instance portion must be removed to get data back,
# such as from the Memory object.
ObjectName = "Memory"
Counters = ["Available Bytes","Cache Faults/sec","Demand Zero Faults/sec","Page Faults/sec","Pages/sec","Transition Faults/sec","Pool Nonpaged Bytes","Pool Paged Bytes"]
Instances = ["------"] # Use 6 x - to remove the Instance bit from the query.
Counters = [
"Available Bytes",
"Cache Faults/sec",
"Demand Zero Faults/sec",
"Page Faults/sec",
"Pages/sec",
"Transition Faults/sec",
"Pool Nonpaged Bytes",
"Pool Paged Bytes",
]
# Use 6 x - to remove the Instance bit from the query.
Instances = ["------"]
Measurement = "win_mem"
#IncludeTotal=false #Set to true to include _Total instance when querying for all (*).
# Set to true to include _Total instance when querying for all (*).
#IncludeTotal=false
# Windows system plugins using WMI (disabled by default, using
# win_perf_counters over WMI is recommended)
# Read metrics about cpu usage
#[[inputs.cpu]]
## Whether to report per-cpu stats or not
#percpu = true
## Whether to report total system cpu stats or not
#totalcpu = true
## Comment this line if you want the raw CPU time metrics
#fielddrop = ["time_*"]
# # Read metrics about cpu usage
# [[inputs.cpu]]
# ## Whether to report per-cpu stats or not
# percpu = true
# ## Whether to report total system cpu stats or not
# totalcpu = true
# ## Comment this line if you want the raw CPU time metrics
# fielddrop = ["time_*"]
# Read metrics about disk usage by mount point
#[[inputs.disk]]
## By default, telegraf gathers stats for all mountpoints.
## Setting mountpoints will restrict the stats to the specified mountpoints.
## mount_points=["/"]
## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually
## present on /run, /var/run, /dev/shm or /dev).
#ignore_fs = ["tmpfs", "devtmpfs"]
# # Read metrics about disk usage by mount point
# [[inputs.disk]]
# ## By default, telegraf gathers stats for all mountpoints.
# ## Setting mountpoints will restrict the stats to the specified mountpoints.
# ## mount_points=["/"]
#
# ## Ignore some mountpoints by filesystem type. For example (dev)tmpfs (usually
# ## present on /run, /var/run, /dev/shm or /dev).
# # ignore_fs = ["tmpfs", "devtmpfs"]
# Read metrics about disk IO by device
#[[inputs.diskio]]
## By default, telegraf will gather stats for all devices including
## disk partitions.
## Setting devices will restrict the stats to the specified devices.
## devices = ["sda", "sdb"]
## Uncomment the following line if you do not need disk serial numbers.
## skip_serial_number = true
# Read metrics about memory usage
#[[inputs.mem]]
# no configuration
# # Read metrics about disk IO by device
# [[inputs.diskio]]
# ## By default, telegraf will gather stats for all devices including
# ## disk partitions.
# ## Setting devices will restrict the stats to the specified devices.
# ## devices = ["sda", "sdb"]
# ## Uncomment the following line if you do not need disk serial numbers.
# ## skip_serial_number = true
# Read metrics about swap memory usage
#[[inputs.swap]]
# no configuration
# # Read metrics about memory usage
# [[inputs.mem]]
# # no configuration
# # Read metrics about swap memory usage
# [[inputs.swap]]
# # no configuration


@ -125,6 +125,9 @@ type AgentConfig struct {
// Debug is the option for running in debug mode
Debug bool
// Logfile specifies the file to send logs to
Logfile string
// Quiet is the option for running in quiet mode
Quiet bool
Hostname string
@ -195,12 +198,15 @@ var header = `# Telegraf Configuration
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
@ -222,10 +228,15 @@ var header = `# Telegraf Configuration
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Run telegraf in debug mode
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stdout.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
@ -404,24 +415,21 @@ func PrintOutputConfig(name string) error {
}
func (c *Config) LoadDirectory(path string) error {
directoryEntries, err := ioutil.ReadDir(path)
if err != nil {
return err
}
for _, entry := range directoryEntries {
if entry.IsDir() {
continue
walkfn := func(thispath string, info os.FileInfo, _ error) error {
if info.IsDir() {
return nil
}
name := entry.Name()
name := info.Name()
if len(name) < 6 || name[len(name)-5:] != ".conf" {
continue
return nil
}
err := c.LoadConfig(filepath.Join(path, name))
err := c.LoadConfig(thispath)
if err != nil {
return err
}
return nil
}
return nil
return filepath.Walk(path, walkfn)
}
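Switching LoadDirectory to filepath.Walk makes the config-directory scan recursive, so `.conf` files in subdirectories are picked up too (compare the Windows config-directory bugfix #1137 in the changelog above). A minimal sketch of the same walk in isolation; the directory path is illustrative:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	// Collect every *.conf file under the directory, recursively,
	// mirroring the walkfn used by Config.LoadDirectory above.
	root := "/etc/telegraf/telegraf.d" // illustrative path
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() || !strings.HasSuffix(info.Name(), ".conf") {
			return nil
		}
		fmt.Println("would load:", path)
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, "walk failed:", err)
	}
}
```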
// Try to find a default config file at these locations (in order):
@ -438,7 +446,7 @@ func getDefaultConfigPath() (string, error) {
}
for _, path := range []string{envfile, homefile, etcfile} {
if _, err := os.Stat(path); err == nil {
log.Printf("Using config file: %s", path)
log.Printf("I! Using config file: %s", path)
return path, nil
}
}
@ -469,7 +477,7 @@ func (c *Config) LoadConfig(path string) error {
return fmt.Errorf("%s: invalid configuration", path)
}
if err = config.UnmarshalTable(subTable, c.Tags); err != nil {
log.Printf("Could not parse [global_tags] config\n")
log.Printf("E! Could not parse [global_tags] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
@ -482,7 +490,7 @@ func (c *Config) LoadConfig(path string) error {
return fmt.Errorf("%s: invalid configuration", path)
}
if err = config.UnmarshalTable(subTable, c.Agent); err != nil {
log.Printf("Could not parse [agent] config\n")
log.Printf("E! Could not parse [agent] config\n")
return fmt.Errorf("Error parsing %s, %s", path, err)
}
}
@ -835,7 +843,7 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
if node, ok := tbl.Fields["tags"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
if err := config.UnmarshalTable(subtbl, cp.Tags); err != nil {
log.Printf("Could not parse tags for input %s\n", name)
log.Printf("E! Could not parse tags for input %s\n", name)
}
}
}


@ -12,21 +12,23 @@ import (
var sepStr = fmt.Sprintf("%v", string(os.PathSeparator))
type GlobPath struct {
path string
hasMeta bool
g glob.Glob
root string
path string
hasMeta bool
hasSuperMeta bool
g glob.Glob
root string
}
func Compile(path string) (*GlobPath, error) {
out := GlobPath{
hasMeta: hasMeta(path),
path: path,
hasMeta: hasMeta(path),
hasSuperMeta: hasSuperMeta(path),
path: path,
}
// if there are no glob meta characters, or no recursive "**" element, in the
// path, don't bother compiling a glob object or finding the root directory.
// (see the short-circuits in Match)
if !out.hasMeta {
if !out.hasMeta || !out.hasSuperMeta {
return &out, nil
}
@ -48,6 +50,17 @@ func (g *GlobPath) Match() map[string]os.FileInfo {
}
return out
}
if !g.hasSuperMeta {
out := make(map[string]os.FileInfo)
files, _ := filepath.Glob(g.path)
for _, file := range files {
info, err := os.Stat(file)
if !os.IsNotExist(err) {
out[file] = info
}
}
return out
}
return walkFilePath(g.root, g.g)
}
@ -96,3 +109,8 @@ func findRootDir(path string) string {
func hasMeta(path string) bool {
return strings.IndexAny(path, "*?[") >= 0
}
// hasSuperMeta reports whether path contains any super magic glob characters (**).
func hasSuperMeta(path string) bool {
return strings.Index(path, "**") >= 0
}
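The new hasSuperMeta flag splits matching into three paths: no meta characters at all (plain os.Stat), ordinary globs (the new filepath.Glob fast path), and recursive `**` patterns (the custom walkFilePath). A small illustration of how the two predicates classify paths; the helper bodies mirror the ones above, and the Windows-style paths are hypothetical examples (compare bugfix #1517):

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the globpath helpers above, for illustration only.
func hasMeta(path string) bool      { return strings.IndexAny(path, "*?[") >= 0 }
func hasSuperMeta(path string) bool { return strings.Contains(path, "**") }

func main() {
	for _, p := range []string{
		`C:\telegraf\telegraf.conf`, // no meta: Match falls back to os.Stat
		`C:\telegraf\*.conf`,        // meta, no "**": handled by filepath.Glob
		`C:\telegraf\**\*.conf`,     // super meta: recursive walkFilePath
	} {
		fmt.Printf("%-30s meta=%-5v super=%v\n", p, hasMeta(p), hasSuperMeta(p))
	}
}
```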


@ -198,7 +198,7 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
return err
case <-timer.C:
if err := c.Process.Kill(); err != nil {
log.Printf("FATAL error killing process: %s", err)
log.Printf("E! FATAL error killing process: %s", err)
return err
}
// wait for the command to return after killing it


@ -85,7 +85,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
if !ro.Quiet {
log.Printf("Output [%s] buffer fullness: %d / %d metrics. "+
log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+
"Total gathered metrics: %d. Total dropped metrics: %d.",
ro.Name,
ro.failMetrics.Len()+ro.metrics.Len(),
@ -142,7 +142,7 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Output [%s] wrote batch of %d metrics in %s\n",
log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n",
ro.Name, len(metrics), elapsed)
}
}

logger/logger.go (new file, 58 lines)

@ -0,0 +1,58 @@
package logger
import (
"io"
"log"
"os"
"github.com/influxdata/wlog"
)
// newTelegrafWriter returns a logging-wrapped writer.
func newTelegrafWriter(w io.Writer) io.Writer {
return &telegrafLog{
writer: wlog.NewWriter(w),
}
}
type telegrafLog struct {
writer io.Writer
}
func (t *telegrafLog) Write(p []byte) (n int, err error) {
return t.writer.Write(p)
}
// SetupLogging configures the logging output.
// debug will set the log level to DEBUG
// quiet will set the log level to ERROR
// logfile will direct the logging output to a file. Empty string is
// interpreted as stdout. If there is an error opening the file the
// logger will fall back to stdout.
func SetupLogging(debug, quiet bool, logfile string) {
if debug {
wlog.SetLevel(wlog.DEBUG)
}
if quiet {
wlog.SetLevel(wlog.ERROR)
}
var oFile *os.File
if logfile != "" {
if _, err := os.Stat(logfile); os.IsNotExist(err) {
if oFile, err = os.Create(logfile); err != nil {
log.Printf("E! Unable to create %s (%s), using stdout", logfile, err)
oFile = os.Stdout
}
} else {
if oFile, err = os.OpenFile(logfile, os.O_APPEND|os.O_WRONLY, os.ModeAppend); err != nil {
log.Printf("E! Unable to append to %s (%s), using stdout", logfile, err)
oFile = os.Stdout
}
}
} else {
oFile = os.Stdout
}
log.SetOutput(newTelegrafWriter(oFile))
}
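All of the `log.Printf("E! ...")`-style calls in this commit rely on this writer: wlog reads the `D!`/`I!`/`E!` prefix of each line and drops anything below the configured level. A minimal usage sketch, assuming that prefix-filtering behavior:

```go
package main

import (
	"log"

	"github.com/influxdata/telegraf/logger"
)

func main() {
	// quiet=true sets the wlog level to ERROR, so only "E!"-prefixed
	// messages should reach the output; an empty logfile means stdout.
	logger.SetupLogging(false, true, "")

	log.Printf("D! suppressed at this level")
	log.Printf("I! suppressed at this level")
	log.Printf("E! only error-level messages get through")
}
```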


@ -88,7 +88,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
if err == nil {
fields[strings.Replace(k, "-", "_", -1)] = val
} else {
log.Printf("skipping aerospike field %v with int64 overflow", k)
log.Printf("I! skipping aerospike field %v with int64 overflow", k)
}
}
acc.AddFields("aerospike_node", fields, tags, time.Now())
@ -121,7 +121,7 @@ func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) erro
if err == nil {
nFields[strings.Replace(parts[0], "-", "_", -1)] = val
} else {
log.Printf("skipping aerospike field %v with int64 overflow", parts[0])
log.Printf("I! skipping aerospike field %v with int64 overflow", parts[0])
}
}
acc.AddFields("aerospike_namespace", nFields, nTags, time.Now())


@ -274,7 +274,7 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error {
m = newCassandraMetric(serverTokens["host"], metric, acc)
} else {
// unsupported metric type
log.Printf("Unsupported Cassandra metric [%s], skipping",
log.Printf("I! Unsupported Cassandra metric [%s], skipping",
metric)
continue
}


@ -100,12 +100,12 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error {
for _, s := range sockets {
dump, err := perfDump(c.CephBinary, s)
if err != nil {
log.Printf("error reading from socket '%s': %v", s.socket, err)
log.Printf("E! error reading from socket '%s': %v", s.socket, err)
continue
}
data, err := parseDump(dump)
if err != nil {
log.Printf("error parsing dump from socket '%s': %v", s.socket, err)
log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err)
continue
}
for tag, metrics := range *data {
@ -293,7 +293,7 @@ func flatten(data interface{}) []*metric {
}
}
default:
log.Printf("Ignoring unexpected type '%T' for value %v", val, val)
log.Printf("I! Ignoring unexpected type '%T' for value %v", val, val)
}
return metrics


@ -93,13 +93,14 @@ func (c *Conntrack) Gather(acc telegraf.Accumulator) error {
contents, err := ioutil.ReadFile(fName)
if err != nil {
log.Printf("failed to read file '%s': %v", fName, err)
log.Printf("E! failed to read file '%s': %v", fName, err)
continue
}
v := strings.TrimSpace(string(contents))
fields[metricKey], err = strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("failed to parse metric, expected number but "+
log.Printf("E! failed to parse metric, expected number but "+
" found '%s': %v", v, err)
}
}


@ -126,7 +126,7 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
defer wg.Done()
err := d.gatherContainer(c, acc)
if err != nil {
log.Printf("Error gathering container %s stats: %s\n",
log.Printf("E! Error gathering container %s stats: %s\n",
c.Names, err.Error())
}
}(container)


@ -1,7 +1,9 @@
package http_listener
import (
"io/ioutil"
"bufio"
"bytes"
"fmt"
"log"
"net"
"net/http"
@ -72,7 +74,7 @@ func (t *HttpListener) Start(acc telegraf.Accumulator) error {
go t.httpListen()
log.Printf("Started HTTP listener service on %s\n", t.ServiceAddress)
log.Printf("I! Started HTTP listener service on %s\n", t.ServiceAddress)
return nil
}
@ -87,7 +89,7 @@ func (t *HttpListener) Stop() {
t.wg.Wait()
log.Println("Stopped HTTP listener service on ", t.ServiceAddress)
log.Println("I! Stopped HTTP listener service on ", t.ServiceAddress)
}
// httpListen listens for HTTP requests.
@ -111,25 +113,34 @@ func (t *HttpListener) httpListen() error {
func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
body, err := ioutil.ReadAll(req.Body)
if err != nil {
log.Printf("Problem reading request: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR reading request", http.StatusInternalServerError)
return
}
switch req.URL.Path {
case "/write":
var metrics []telegraf.Metric
metrics, err = t.parser.Parse(body)
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
var http400msg bytes.Buffer
var partial string
scanner := bufio.NewScanner(req.Body)
scanner.Buffer([]byte(""), 128*1024)
for scanner.Scan() {
metrics, err := t.parser.Parse(scanner.Bytes())
if err == nil {
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
partial = "partial write: "
} else {
http400msg.WriteString(err.Error() + " ")
}
res.WriteHeader(http.StatusNoContent)
}
if err := scanner.Err(); err != nil {
http.Error(res, "Internal server error: "+err.Error(), http.StatusInternalServerError)
} else if http400msg.Len() > 0 {
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusBadRequest)
res.Write([]byte(fmt.Sprintf(`{"error":"%s%s"}`, partial, http400msg.String())))
} else {
log.Printf("Problem parsing body: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR parsing metrics", http.StatusInternalServerError)
res.WriteHeader(http.StatusNoContent)
}
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB

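Instead of reading the whole body with ioutil.ReadAll (the source of the huge allocations in #1823), the handler now parses one line of input at a time with a bufio.Scanner capped at 128 KiB per line, answering 400 with a `partial write:` message when only some lines parse. A standalone sketch of that pattern; `parseLine` is a hypothetical stand-in for the plugin's line-protocol parser:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseLine is a hypothetical stand-in for t.parser.Parse(scanner.Bytes()).
func parseLine(line string) error {
	if !strings.Contains(line, "=") {
		return fmt.Errorf("invalid line %q", line)
	}
	return nil
}

func main() {
	body := strings.NewReader("cpu usage=1\nmem used=2\nnot line protocol\n")

	scanner := bufio.NewScanner(body)
	// Cap the per-line buffer so a huge payload cannot force a huge allocation.
	scanner.Buffer([]byte(""), 128*1024)

	var accepted int
	var errs []string
	for scanner.Scan() {
		if err := parseLine(scanner.Text()); err != nil {
			errs = append(errs, err.Error())
			continue
		}
		accepted++
	}
	if len(errs) > 0 && accepted > 0 {
		// Mirrors the "partial write: ..." 400 response in the handler above.
		fmt.Println("partial write:", strings.Join(errs, " "))
	}
}
```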
File diff suppressed because one or more lines are too long


@ -90,7 +90,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
}
@ -115,7 +115,7 @@ func (k *Kafka) Start(acc telegraf.Accumulator) error {
// Start the kafka message reader
go k.receiver()
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
@ -129,12 +129,12 @@ func (k *Kafka) receiver() {
return
case err := <-k.errs:
if err != nil {
log.Printf("Kafka Consumer Error: %s\n", err)
log.Printf("E! Kafka Consumer Error: %s\n", err)
}
case msg := <-k.in:
metrics, err := k.parser.Parse(msg.Value)
if err != nil {
log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s",
log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
string(msg.Value), err.Error())
}
@ -158,7 +158,7 @@ func (k *Kafka) Stop() {
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
log.Printf("Error closing kafka consumer: %s\n", err.Error())
log.Printf("E! Error closing kafka consumer: %s\n", err.Error())
}
}


@ -202,21 +202,21 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case INT:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("ERROR parsing %s to int: %s", v, err)
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
fields[k] = iv
}
case FLOAT:
fv, err := strconv.ParseFloat(v, 64)
if err != nil {
log.Printf("ERROR parsing %s to float: %s", v, err)
log.Printf("E! Error parsing %s to float: %s", v, err)
} else {
fields[k] = fv
}
case DURATION:
d, err := time.ParseDuration(v)
if err != nil {
log.Printf("ERROR parsing %s to duration: %s", v, err)
log.Printf("E! Error parsing %s to duration: %s", v, err)
} else {
fields[k] = int64(d)
}
@ -227,14 +227,14 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
case EPOCH:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("ERROR parsing %s to int: %s", v, err)
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
timestamp = time.Unix(iv, 0)
}
case EPOCH_NANO:
iv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("ERROR parsing %s to int: %s", v, err)
log.Printf("E! Error parsing %s to int: %s", v, err)
} else {
timestamp = time.Unix(0, iv)
}
@ -265,7 +265,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
// if we still haven't found a timestamp layout, log it and we will
// just use time.Now()
if !foundTs {
log.Printf("ERROR parsing timestamp [%s], could not find any "+
log.Printf("E! Error parsing timestamp [%s], could not find any "+
"suitable time layouts.", v)
}
case DROP:
@ -275,7 +275,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
if err == nil {
timestamp = ts
} else {
log.Printf("ERROR parsing %s to time layout [%s]: %s", v, t, err)
log.Printf("E! Error parsing %s to time layout [%s]: %s", v, t, err)
}
}
}


@ -134,7 +134,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
@ -167,7 +167,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
var line *tail.Line
for line = range tailer.Lines {
if line.Err != nil {
log.Printf("ERROR tailing file %s, Error: %s\n",
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
continue
}
@ -216,7 +216,7 @@ func (l *LogParserPlugin) Stop() {
for _, t := range l.tailers {
err := t.Stop()
if err != nil {
log.Printf("ERROR stopping tail on file %s\n", t.Filename)
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
}
t.Cleanup()
}


@ -134,7 +134,7 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
req.URL.RawQuery = params.String()
req.Header.Set("User-Agent", "Telegraf-MailChimp-Plugin")
if api.Debug {
log.Printf("Request URL: %s", req.URL.String())
log.Printf("D! Request URL: %s", req.URL.String())
}
resp, err := client.Do(req)
@ -148,7 +148,7 @@ func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
return nil, err
}
if api.Debug {
log.Printf("Response Body:%s", string(body))
log.Printf("D! Response Body:%s", string(body))
}
if err = chimpErrorCheck(body); err != nil {


@ -35,13 +35,10 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
```
By default this plugin is not configured to gather metrics from Mesos. Since a Mesos cluster can be deployed in numerous ways it does not provide any default
values. The user needs to specify the master/slave nodes this plugin will gather metrics from. Additionally, enabling `slave_tasks` will allow
gathering metrics from tasks running on the specified slaves (this option is disabled by default).
values. The user needs to specify the master/slave nodes this plugin will gather metrics from.
### Measurements & Fields:
@ -235,31 +232,6 @@ Mesos slave metric groups
- slave/valid_framework_messages
- slave/valid_status_updates
Mesos tasks metric groups
- executor_id
- executor_name
- framework_id
- source
- statistics
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
- mem_anon_bytes
- mem_cache_bytes
- mem_critical_pressure_counter
- mem_file_bytes
- mem_limit_bytes
- mem_low_pressure_counter
- mem_mapped_file_bytes
- mem_medium_pressure_counter
- mem_rss_bytes
- mem_swap_bytes
- mem_total_bytes
- mem_total_memsw_bytes
- mem_unevictable_bytes
- timestamp
### Tags:
- All master/slave measurements have the following tags:
@ -269,16 +241,11 @@ Mesos tasks metric groups
- All master measurements have the extra tags:
- state (leader/follower)
- Tasks measurements have the following tags:
- server
- framework_id
- task_id
### Example Output:
```
$ telegraf -config ~/mesos.conf -input-filter mesos -test
* Plugin: mesos, Collection 1
mesos,role=master,state=leader,host=172.17.8.102,server=172.17.8.101
allocator/event_queue_dispatches=0,master/cpus_percent=0,
master/cpus_revocable_percent=0,master/cpus_revocable_total=0,
master/cpus_revocable_used=0,master/cpus_total=2,
@ -297,15 +264,3 @@ master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ...
```
Mesos tasks metrics (if enabled):
```
mesos-tasks,host=172.17.8.102,server=172.17.8.101,framework_id=e3060235-c4ed-4765-9d36-784e3beca07f-0000,task_id=hello-world.e4b5b497-2ccd-11e6-a659-0242fb222ce2
cpus_limit=0.2,cpus_system_time_secs=142.49,cpus_user_time_secs=388.14,
mem_anon_bytes=359129088,mem_cache_bytes=3964928,
mem_critical_pressure_counter=0,mem_file_bytes=3964928,
mem_limit_bytes=767557632,mem_low_pressure_counter=0,
mem_mapped_file_bytes=114688,mem_medium_pressure_counter=0,
mem_rss_bytes=359129088,mem_swap_bytes=0,mem_total_bytes=363094016,
mem_total_memsw_bytes=363094016,mem_unevictable_bytes=0,
timestamp=1465486052.70525 1465486053052811792...
```


@ -30,7 +30,7 @@ type Mesos struct {
MasterCols []string `toml:"master_collections"`
Slaves []string
SlaveCols []string `toml:"slave_collections"`
SlaveTasks bool
//SlaveTasks bool
}
var allMetrics = map[Role][]string{
@ -66,8 +66,6 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
`
// SampleConfig returns a sample configuration block
@ -90,7 +88,7 @@ func (m *Mesos) SetDefaults() {
}
if m.Timeout == 0 {
log.Println("[mesos] Missing timeout value, setting default value (100ms)")
log.Println("I! [mesos] Missing timeout value, setting default value (100ms)")
m.Timeout = 100
}
}
@ -121,16 +119,16 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return
}(v)
if !m.SlaveTasks {
continue
}
// if !m.SlaveTasks {
// continue
// }
wg.Add(1)
go func(c string) {
errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
wg.Done()
return
}(v)
// wg.Add(1)
// go func(c string) {
// errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
// wg.Done()
// return
// }(v)
}
wg.Wait()
@ -385,7 +383,7 @@ func getMetrics(role Role, group string) []string {
ret, ok := m[group]
if !ok {
log.Printf("[mesos] Unkown %s metrics group: %s\n", role, group)
log.Printf("I! [mesos] Unkown %s metrics group: %s\n", role, group)
return []string{}
}
@ -459,7 +457,6 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
}
for _, task := range metrics {
tags["task_id"] = task.ExecutorID
tags["framework_id"] = task.FrameworkID
jf := jsonparser.JSONFlattener{}
@ -468,7 +465,9 @@ func (m *Mesos) gatherSlaveTaskMetrics(address string, defaultPort string, acc t
if err != nil {
return err
}
timestamp := time.Unix(int64(jf.Fields["timestamp"].(float64)), 0)
jf.Fields["executor_id"] = task.ExecutorID
acc.AddFields("mesos_tasks", jf.Fields, tags, timestamp)
}


@ -9,14 +9,14 @@ import (
"os"
"testing"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"
)
var masterMetrics map[string]interface{}
var masterTestServer *httptest.Server
var slaveMetrics map[string]interface{}
var slaveTaskMetrics map[string]interface{}
// var slaveTaskMetrics map[string]interface{}
var slaveTestServer *httptest.Server
func randUUID() string {
@ -216,31 +216,31 @@ func generateMetrics() {
slaveMetrics[k] = rand.Float64()
}
slaveTaskMetrics = map[string]interface{}{
"executor_id": fmt.Sprintf("task_%s", randUUID()),
"executor_name": "Some task description",
"framework_id": randUUID(),
"source": fmt.Sprintf("task_source_%s", randUUID()),
"statistics": map[string]interface{}{
"cpus_limit": rand.Float64(),
"cpus_system_time_secs": rand.Float64(),
"cpus_user_time_secs": rand.Float64(),
"mem_anon_bytes": float64(rand.Int63()),
"mem_cache_bytes": float64(rand.Int63()),
"mem_critical_pressure_counter": float64(rand.Int63()),
"mem_file_bytes": float64(rand.Int63()),
"mem_limit_bytes": float64(rand.Int63()),
"mem_low_pressure_counter": float64(rand.Int63()),
"mem_mapped_file_bytes": float64(rand.Int63()),
"mem_medium_pressure_counter": float64(rand.Int63()),
"mem_rss_bytes": float64(rand.Int63()),
"mem_swap_bytes": float64(rand.Int63()),
"mem_total_bytes": float64(rand.Int63()),
"mem_total_memsw_bytes": float64(rand.Int63()),
"mem_unevictable_bytes": float64(rand.Int63()),
"timestamp": rand.Float64(),
},
}
// slaveTaskMetrics = map[string]interface{}{
// "executor_id": fmt.Sprintf("task_name.%s", randUUID()),
// "executor_name": "Some task description",
// "framework_id": randUUID(),
// "source": fmt.Sprintf("task_source.%s", randUUID()),
// "statistics": map[string]interface{}{
// "cpus_limit": rand.Float64(),
// "cpus_system_time_secs": rand.Float64(),
// "cpus_user_time_secs": rand.Float64(),
// "mem_anon_bytes": float64(rand.Int63()),
// "mem_cache_bytes": float64(rand.Int63()),
// "mem_critical_pressure_counter": float64(rand.Int63()),
// "mem_file_bytes": float64(rand.Int63()),
// "mem_limit_bytes": float64(rand.Int63()),
// "mem_low_pressure_counter": float64(rand.Int63()),
// "mem_mapped_file_bytes": float64(rand.Int63()),
// "mem_medium_pressure_counter": float64(rand.Int63()),
// "mem_rss_bytes": float64(rand.Int63()),
// "mem_swap_bytes": float64(rand.Int63()),
// "mem_total_bytes": float64(rand.Int63()),
// "mem_total_memsw_bytes": float64(rand.Int63()),
// "mem_unevictable_bytes": float64(rand.Int63()),
// "timestamp": rand.Float64(),
// },
// }
}
func TestMain(m *testing.M) {
@ -260,11 +260,11 @@ func TestMain(m *testing.M) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(slaveMetrics)
})
slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
})
// slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusOK)
// w.Header().Set("Content-Type", "application/json")
// json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
// })
slaveTestServer = httptest.NewServer(slaveRouter)
rc := m.Run()
@ -324,10 +324,10 @@ func TestMesosSlave(t *testing.T) {
var acc testutil.Accumulator
m := Mesos{
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
SlaveTasks: true,
Timeout: 10,
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
// SlaveTasks: true,
Timeout: 10,
}
err := m.Gather(&acc)
@ -338,17 +338,17 @@ func TestMesosSlave(t *testing.T) {
acc.AssertContainsFields(t, "mesos", slaveMetrics)
jf := jsonparser.JSONFlattener{}
err = jf.FlattenJSON("", slaveTaskMetrics)
// expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1)
// for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) {
// expectedFields[k] = v
// }
// expectedFields["executor_id"] = slaveTaskMetrics["executor_id"]
if err != nil {
t.Errorf(err.Error())
}
acc.AssertContainsFields(
t,
"mesos_tasks",
slaveTaskMetrics["statistics"].(map[string]interface{}))
// acc.AssertContainsTaggedFields(
// t,
// "mesos_tasks",
// expectedFields,
// map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)})
}
func TestSlaveFilter(t *testing.T) {


@ -47,7 +47,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
},
}, result_repl)
if err != nil {
log.Println("Not gathering replica set status, member not in replica set (" + err.Error() + ")")
log.Println("E! Not gathering replica set status, member not in replica set (" + err.Error() + ")")
}
jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
@ -62,7 +62,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
names := []string{}
names, err = s.Session.DatabaseNames()
if err != nil {
log.Println("Error getting database names (" + err.Error() + ")")
log.Println("E! Error getting database names (" + err.Error() + ")")
}
for _, db_name := range names {
db_stat_line := &DbStatsData{}
@ -73,7 +73,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool) error
},
}, db_stat_line)
if err != nil {
log.Println("Error getting db stats from " + db_name + "(" + err.Error() + ")")
log.Println("E! Error getting db stats from " + db_name + "(" + err.Error() + ")")
}
db := &Db{
Name: db_name,


@ -133,7 +133,7 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return nil
}
func (m *MQTTConsumer) onConnect(c mqtt.Client) {
log.Printf("MQTT Client Connected")
log.Printf("I! MQTT Client Connected")
if !m.PersistentSession || !m.started {
topics := make(map[string]byte)
for _, topic := range m.Topics {
@ -142,7 +142,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
strings.Join(m.Topics[:], ","), subscribeToken.Error())
}
m.started = true
@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
}
func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
log.Printf("MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())
log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())
return
}
@ -166,7 +166,7 @@ func (m *MQTTConsumer) receiver() {
topic := msg.Topic()
metrics, err := m.parser.Parse(msg.Payload())
if err != nil {
log.Printf("MQTT PARSE ERROR\nmessage: %s\nerror: %s",
log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
string(msg.Payload()), err.Error())
}


@ -313,6 +313,10 @@ var mappings = []*mapping{
onServer: "wsrep_",
inExport: "wsrep_",
},
{
onServer: "Uptime_",
inExport: "uptime_",
},
}
var (


@ -119,7 +119,7 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
// Start the message reader
go n.receiver()
log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
log.Printf("I! Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
return nil
@ -134,11 +134,11 @@ func (n *natsConsumer) receiver() {
case <-n.done:
return
case err := <-n.errs:
log.Printf("error reading from %s\n", err.Error())
log.Printf("E! error reading from %s\n", err.Error())
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
log.Printf("subject: %s, error: %s", msg.Subject, err.Error())
log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error())
}
for _, metric := range metrics {
@ -157,7 +157,7 @@ func (n *natsConsumer) clean() {
for _, sub := range n.Subs {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Error unsubscribing from subject %s in queue %s: %s\n",
log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n",
sub.Subject, sub.Queue, err.Error())
}
}


@ -62,7 +62,7 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
metrics, err := n.parser.Parse(message.Body)
if err != nil {
log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
log.Printf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
return nil
}
for _, metric := range metrics {


@ -132,7 +132,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
case strings.HasSuffix(when, "h"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "h"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
log.Printf("E! Error ntpq: parsing int: %s", fields[index])
continue
}
// seconds in an hour
@ -141,7 +141,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
case strings.HasSuffix(when, "d"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "d"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
log.Printf("E! Error ntpq: parsing int: %s", fields[index])
continue
}
// seconds in a day
@ -150,7 +150,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
case strings.HasSuffix(when, "m"):
m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "m"))
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
log.Printf("E! Error ntpq: parsing int: %s", fields[index])
continue
}
// seconds in a day
@ -161,7 +161,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
m, err := strconv.Atoi(fields[index])
if err != nil {
log.Printf("ERROR ntpq: parsing int: %s", fields[index])
log.Printf("E! Error ntpq: parsing int: %s", fields[index])
continue
}
mFields[key] = int64(m)
@ -178,7 +178,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error {
m, err := strconv.ParseFloat(fields[index], 64)
if err != nil {
log.Printf("ERROR ntpq: parsing float: %s", fields[index])
log.Printf("E! Error ntpq: parsing float: %s", fields[index])
continue
}
mFields[key] = m


@ -269,9 +269,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
fields := make(map[string]interface{})
COLUMN:
for col, val := range columnMap {
if acc.Debug() {
log.Printf("postgresql_extensible: column: %s = %T: %s\n", col, *val, *val)
}
log.Printf("D! postgresql_extensible: column: %s = %T: %s\n", col, *val, *val)
_, ignore := ignoredColumns[col]
if ignore || *val == nil {
continue


@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
@ -86,10 +87,7 @@ func (p *Powerdns) gatherServer(address string, acc telegraf.Accumulator) error
metrics := string(buf)
// Process data
fields, err := parseResponse(metrics)
if err != nil {
return err
}
fields := parseResponse(metrics)
// Add server socket as a tag
tags := map[string]string{"server": address}
@ -99,22 +97,27 @@ func (p *Powerdns) gatherServer(address string, acc telegraf.Accumulator) error
return nil
}
func parseResponse(metrics string) (map[string]interface{}, error) {
func parseResponse(metrics string) map[string]interface{} {
values := make(map[string]interface{})
s := strings.Split(metrics, ",")
for _, metric := range s[:len(s)-1] {
m := strings.Split(metric, "=")
if len(m) < 2 {
continue
}
i, err := strconv.ParseInt(m[1], 10, 64)
if err != nil {
return values, err
log.Printf("E! powerdns: Error parsing integer for metric [%s]: %s",
metric, err)
continue
}
values[m[0]] = i
}
return values, nil
return values
}
func init() {


@ -25,6 +25,30 @@ var metrics = "corrupt-packets=0,deferred-cache-inserts=0,deferred-cache-lookup=
"key-cache-size=0,latency=26,meta-cache-size=0,qsize-q=0," +
"signature-cache-size=0,sys-msec=2889,uptime=86317,user-msec=2167,"
// first metric has no "="
var corruptMetrics = "corrupt-packets--0,deferred-cache-inserts=0,deferred-cache-lookup=0," +
"dnsupdate-answers=0,dnsupdate-changes=0,dnsupdate-queries=0," +
"dnsupdate-refused=0,packetcache-hit=0,packetcache-miss=1,packetcache-size=0," +
"query-cache-hit=0,query-cache-miss=6,rd-queries=1,recursing-answers=0," +
"recursing-questions=0,recursion-unanswered=0,security-status=3," +
"servfail-packets=0,signatures=0,tcp-answers=0,tcp-queries=0," +
"timedout-packets=0,udp-answers=1,udp-answers-bytes=50,udp-do-queries=0," +
"udp-queries=0,udp4-answers=1,udp4-queries=1,udp6-answers=0,udp6-queries=0," +
"key-cache-size=0,latency=26,meta-cache-size=0,qsize-q=0," +
"signature-cache-size=0,sys-msec=2889,uptime=86317,user-msec=2167,"
// integer overflow
var intOverflowMetrics = "corrupt-packets=18446744073709550195,deferred-cache-inserts=0,deferred-cache-lookup=0," +
"dnsupdate-answers=0,dnsupdate-changes=0,dnsupdate-queries=0," +
"dnsupdate-refused=0,packetcache-hit=0,packetcache-miss=1,packetcache-size=0," +
"query-cache-hit=0,query-cache-miss=6,rd-queries=1,recursing-answers=0," +
"recursing-questions=0,recursion-unanswered=0,security-status=3," +
"servfail-packets=0,signatures=0,tcp-answers=0,tcp-queries=0," +
"timedout-packets=0,udp-answers=1,udp-answers-bytes=50,udp-do-queries=0," +
"udp-queries=0,udp4-answers=1,udp4-queries=1,udp6-answers=0,udp6-queries=0," +
"key-cache-size=0,latency=26,meta-cache-size=0,qsize-q=0," +
"signature-cache-size=0,sys-msec=2889,uptime=86317,user-msec=2167,"
func (s statServer) serverSocket(l net.Listener) {
for {
@ -86,8 +110,7 @@ func TestMemcachedGeneratesMetrics(t *testing.T) {
}
func TestPowerdnsParseMetrics(t *testing.T) {
values, err := parseResponse(metrics)
require.NoError(t, err, "Error parsing memcached response")
values := parseResponse(metrics)
tests := []struct {
key string
@ -145,3 +168,121 @@ func TestPowerdnsParseMetrics(t *testing.T) {
}
}
}
func TestPowerdnsParseCorruptMetrics(t *testing.T) {
values := parseResponse(corruptMetrics)
tests := []struct {
key string
value int64
}{
{"deferred-cache-inserts", 0},
{"deferred-cache-lookup", 0},
{"dnsupdate-answers", 0},
{"dnsupdate-changes", 0},
{"dnsupdate-queries", 0},
{"dnsupdate-refused", 0},
{"packetcache-hit", 0},
{"packetcache-miss", 1},
{"packetcache-size", 0},
{"query-cache-hit", 0},
{"query-cache-miss", 6},
{"rd-queries", 1},
{"recursing-answers", 0},
{"recursing-questions", 0},
{"recursion-unanswered", 0},
{"security-status", 3},
{"servfail-packets", 0},
{"signatures", 0},
{"tcp-answers", 0},
{"tcp-queries", 0},
{"timedout-packets", 0},
{"udp-answers", 1},
{"udp-answers-bytes", 50},
{"udp-do-queries", 0},
{"udp-queries", 0},
{"udp4-answers", 1},
{"udp4-queries", 1},
{"udp6-answers", 0},
{"udp6-queries", 0},
{"key-cache-size", 0},
{"latency", 26},
{"meta-cache-size", 0},
{"qsize-q", 0},
{"signature-cache-size", 0},
{"sys-msec", 2889},
{"uptime", 86317},
{"user-msec", 2167},
}
for _, test := range tests {
value, ok := values[test.key]
if !ok {
t.Errorf("Did not find key for metric %s in values", test.key)
continue
}
if value != test.value {
t.Errorf("Metric: %s, Expected: %d, actual: %d",
test.key, test.value, value)
}
}
}
func TestPowerdnsParseIntOverflowMetrics(t *testing.T) {
values := parseResponse(intOverflowMetrics)
tests := []struct {
key string
value int64
}{
{"deferred-cache-inserts", 0},
{"deferred-cache-lookup", 0},
{"dnsupdate-answers", 0},
{"dnsupdate-changes", 0},
{"dnsupdate-queries", 0},
{"dnsupdate-refused", 0},
{"packetcache-hit", 0},
{"packetcache-miss", 1},
{"packetcache-size", 0},
{"query-cache-hit", 0},
{"query-cache-miss", 6},
{"rd-queries", 1},
{"recursing-answers", 0},
{"recursing-questions", 0},
{"recursion-unanswered", 0},
{"security-status", 3},
{"servfail-packets", 0},
{"signatures", 0},
{"tcp-answers", 0},
{"tcp-queries", 0},
{"timedout-packets", 0},
{"udp-answers", 1},
{"udp-answers-bytes", 50},
{"udp-do-queries", 0},
{"udp-queries", 0},
{"udp4-answers", 1},
{"udp4-queries", 1},
{"udp6-answers", 0},
{"udp6-queries", 0},
{"key-cache-size", 0},
{"latency", 26},
{"meta-cache-size", 0},
{"qsize-q", 0},
{"signature-cache-size", 0},
{"sys-msec", 2889},
{"uptime", 86317},
{"user-msec", 2167},
}
for _, test := range tests {
value, ok := values[test.key]
if !ok {
t.Errorf("Did not find key for metric %s in values", test.key)
continue
}
if value != test.value {
t.Errorf("Metric: %s, Expected: %d, actual: %d",
test.key, test.value, value)
}
}
}

View File

@ -66,7 +66,7 @@ func (_ *Procstat) Description() string {
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
err := p.createProcesses()
if err != nil {
log.Printf("Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
log.Printf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
p.Exe, p.PidFile, p.Pattern, p.User, err.Error())
} else {
for pid, proc := range p.pidmap {

View File

@ -136,6 +136,9 @@ Output measurement name.
* `oid`:
OID to get. May be a numeric or textual OID.
* `oid_index_suffix`:
The OID sub-identifier to strip off so that the index can be matched against other fields in the table (see the sketch after the conversion list below).
* `name`:
Output field/tag name.
If not specified, it defaults to the value of `oid`. If `oid` is numeric, an attempt to translate the numeric OID into a textual OID will be made.
@ -149,6 +152,8 @@ Converts the value according to the given specification.
- `float(X)`: Converts the input value into a float and divides by the Xth power of 10. Effectively this just moves the decimal point X places to the left. For example, a value of `123` with `float(2)` will result in `1.23`.
- `float`: Converts the value into a float with no adjustment. Same as `float(0)`.
- `int`: Converts the value into an integer.
- `hwaddr`: Converts the value to a MAC address.
- `ipaddr`: Converts the value to an IP address.
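To make the two new field options concrete, here is a minimal, self-contained Go sketch of how `oid_index_suffix` stripping and the `hwaddr`/`ipaddr` conversions behave. It mirrors the logic added to `Table.Build` and `fieldConvert` later in this diff; the helper names are illustrative only, not part of the plugin's API.

package main

import (
	"fmt"
	"net"
	"strings"
)

// stripIndexSuffix mirrors the table-walk logic: an entry is kept only if its
// index ends with the configured suffix, which is then stripped so the rest
// lines up with the row indexes of the other fields.
func stripIndexSuffix(idx, suffix string) (string, bool) {
	if suffix == "" {
		return idx, true
	}
	if !strings.HasSuffix(idx, suffix) {
		return "", false // entry does not belong to this field
	}
	return idx[:len(idx)-len(suffix)], true
}

// convertAddr mirrors the new "hwaddr" and "ipaddr" conversions: SNMP hands
// back raw octets, and these render them as human-readable address strings.
func convertAddr(conv string, raw []byte) (string, error) {
	switch conv {
	case "hwaddr":
		return net.HardwareAddr(raw).String(), nil // 6 octets -> MAC
	case "ipaddr":
		switch len(raw) {
		case 4, 16: // IPv4 or IPv6
			return net.IP(raw).String(), nil
		default:
			return "", fmt.Errorf("invalid length (%d) for ipaddr conversion", len(raw))
		}
	}
	return "", fmt.Errorf("unknown conversion %q", conv)
}

func main() {
	// Walking .1.0.0.2.1.5 yields an entry at .1.0.0.2.1.5.0.9.9; with
	// oid_index_suffix = ".9.9" the row index becomes ".0".
	idx, ok := stripIndexSuffix(".0.9.9", ".9.9")
	fmt.Println(idx, ok) // .0 true

	mac, _ := convertAddr("hwaddr", []byte{0x00, 0x1a, 0x2b, 0x3c, 0x4d, 0x5e})
	ip, _ := convertAddr("ipaddr", []byte{192, 168, 0, 1})
	fmt.Println(mac) // 00:1a:2b:3c:4d:5e
	fmt.Println(ip)  // 192.168.0.1
}

The new TestTableBuild_walk and TestFieldConvert cases later in this diff exercise exactly these behaviors.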
#### Table parameters:
* `oid`:

View File

@ -20,25 +20,29 @@ import (
const description = `Retrieves SNMP values from remote agents`
const sampleConfig = `
agents = [ "127.0.0.1:161" ]
## Timeout for each SNMP query.
timeout = "5s"
## Number of retries to attempt within timeout.
retries = 3
## SNMP version, values can be 1, 2, or 3
version = 2
# SNMPv1 & SNMPv2 parameters
## SNMP community string.
community = "public"
# SNMPv2 & SNMPv3 parameters
max_repetitions = 50
## The GETBULK max-repetitions parameter
max_repetitions = 10
# SNMPv3 parameters
## SNMPv3 auth parameters
#sec_name = "myuser"
#auth_protocol = "md5" # Values: "MD5", "SHA", ""
#auth_password = "password123"
#sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
#auth_protocol = "md5" # Values: "MD5", "SHA", ""
#auth_password = "pass"
#sec_level = "authNoPriv" # Values: "noAuthNoPriv", "authNoPriv", "authPriv"
#context_name = ""
#priv_protocol = "" # Values: "DES", "AES", ""
#priv_protocol = "" # Values: "DES", "AES", ""
#priv_password = ""
# measurement name
## measurement name
name = "system"
[[inputs.snmp.field]]
name = "hostname"
@ -53,7 +57,7 @@ const sampleConfig = `
oid = "HOST-RESOURCES-MIB::hrMemorySize"
[[inputs.snmp.table]]
# measurement name
## measurement name
name = "remote_servers"
inherit_tags = [ "hostname" ]
[[inputs.snmp.table.field]]
@ -68,7 +72,7 @@ const sampleConfig = `
oid = ".1.0.0.0.1.2"
[[inputs.snmp.table]]
# auto populate table's fields using the MIB
## auto populate table's fields using the MIB
oid = "HOST-RESOURCES-MIB::hrNetworkTable"
`
@ -105,7 +109,7 @@ type Snmp struct {
Community string
// Parameters for Version 2 & 3
MaxRepetitions uint
MaxRepetitions int
// Parameters for Version 3
ContextName string
@ -184,23 +188,21 @@ func (t *Table) init() error {
return nil
}
mibPrefix := ""
if err := snmpTranslate(&mibPrefix, &t.Oid, &t.Name); err != nil {
return err
mibName, _, oidText, _, err := snmpTranslate(t.Oid)
if err != nil {
return Errorf(err, "translating %s", t.Oid)
}
if t.Name == "" {
t.Name = oidText
}
mibPrefix := mibName + "::"
oidFullName := mibPrefix + oidText
// first attempt to get the table's tags
tagOids := map[string]struct{}{}
// We have to guess that the "entry" oid is `t.Oid+".1"`. snmptable and snmptranslate don't seem to have a way to provide the info.
if out, err := execCmd("snmptranslate", "-m", "all", "-Td", t.Oid+".1"); err == nil {
if out, err := execCmd("snmptranslate", "-Td", oidFullName+".1"); err == nil {
lines := bytes.Split(out, []byte{'\n'})
// get the MIB name if we didn't get it above
if mibPrefix == "" {
if i := bytes.Index(lines[0], []byte("::")); i != -1 {
mibPrefix = string(lines[0][:i+2])
}
}
for _, line := range lines {
if !bytes.HasPrefix(line, []byte(" INDEX")) {
continue
@ -223,7 +225,7 @@ func (t *Table) init() error {
}
// this won't actually try to run a query. The `-Ch` will just cause it to dump headers.
out, err := execCmd("snmptable", "-m", "all", "-Ch", "-Cl", "-c", "public", "127.0.0.1", t.Oid)
out, err := execCmd("snmptable", "-Ch", "-Cl", "-c", "public", "127.0.0.1", oidFullName)
if err != nil {
return Errorf(err, "getting table columns for %s", t.Oid)
}
@ -260,12 +262,16 @@ type Field struct {
// off the OID prefix, and use the remainder as the index. For multiple fields
// to show up in the same row, they must share the same index.
Oid string
// OidIndexSuffix is the trailing sub-identifier on a table record OID that will be stripped off to get the record's index.
OidIndexSuffix string
// IsTag controls whether this OID is output as a tag or a value.
IsTag bool
// Conversion controls any type conversion that is done on the value.
// "float"/"float(0)" will convert the value into a float.
// "float(X)" will convert the value into a float, and then move the decimal before Xth right-most digit.
// "int" will conver the value into an integer.
// "hwaddr" will convert a 6-byte string to a MAC address.
// "ipaddr" will convert the value to an IPv4 or IPv6 address.
Conversion string
initialized bool
@ -277,8 +283,16 @@ func (f *Field) init() error {
return nil
}
if err := snmpTranslate(nil, &f.Oid, &f.Name); err != nil {
return err
_, oidNum, oidText, conversion, err := snmpTranslate(f.Oid)
if err != nil {
return Errorf(err, "translating %s", f.Oid)
}
f.Oid = oidNum
if f.Name == "" {
f.Name = oidText
}
if f.Conversion == "" {
f.Conversion = conversion
}
//TODO use textual convention conversion from the MIB
@ -330,8 +344,8 @@ func Errorf(err error, msg string, format ...interface{}) error {
func init() {
inputs.Add("snmp", func() telegraf.Input {
return &Snmp{
Retries: 5,
MaxRepetitions: 50,
Retries: 3,
MaxRepetitions: 10,
Timeout: internal.Duration{Duration: 5 * time.Second},
Version: 2,
Community: "public",
@ -448,14 +462,32 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
return nil, Errorf(err, "performing get")
} else if pkt != nil && len(pkt.Variables) > 0 && pkt.Variables[0].Type != gosnmp.NoSuchObject {
ent := pkt.Variables[0]
ifv[ent.Name[len(oid):]] = fieldConvert(f.Conversion, ent.Value)
fv, err := fieldConvert(f.Conversion, ent.Value)
if err != nil {
return nil, Errorf(err, "converting %q", ent.Value)
}
ifv[""] = fv
}
} else {
err := gs.Walk(oid, func(ent gosnmp.SnmpPDU) error {
if len(ent.Name) <= len(oid) || ent.Name[:len(oid)+1] != oid+"." {
return NestedError{} // break the walk
}
ifv[ent.Name[len(oid):]] = fieldConvert(f.Conversion, ent.Value)
idx := ent.Name[len(oid):]
if f.OidIndexSuffix != "" {
if !strings.HasSuffix(idx, f.OidIndexSuffix) {
// this entry doesn't match our OidIndexSuffix. skip it
return nil
}
idx = idx[:len(idx)-len(f.OidIndexSuffix)]
}
fv, err := fieldConvert(f.Conversion, ent.Value)
if err != nil {
return Errorf(err, "converting %q", ent.Value)
}
ifv[idx] = fv
return nil
})
if err != nil {
@ -610,7 +642,7 @@ func (s *Snmp) getConnection(agent string) (snmpConnection, error) {
}
}
gs.MaxRepetitions = int(s.MaxRepetitions)
gs.MaxRepetitions = s.MaxRepetitions
if s.Version == 3 {
gs.ContextName = s.ContextName
@ -677,14 +709,16 @@ func (s *Snmp) getConnection(agent string) (snmpConnection, error) {
// "float"/"float(0)" will convert the value into a float.
// "float(X)" will convert the value into a float, and then move the decimal before Xth right-most digit.
// "int" will convert the value into an integer.
// "hwaddr" will convert the value into a MAC address.
// "ipaddr" will convert the value into into an IP address.
// "" will convert a byte slice into a string.
// Any other conv will return the input value unchanged.
func fieldConvert(conv string, v interface{}) interface{} {
func fieldConvert(conv string, v interface{}) (interface{}, error) {
if conv == "" {
if bs, ok := v.([]byte); ok {
return string(bs)
return string(bs), nil
}
return v
return v, nil
}
var d int
@ -721,7 +755,9 @@ func fieldConvert(conv string, v interface{}) interface{} {
vf, _ := strconv.ParseFloat(vt, 64)
v = vf / math.Pow10(d)
}
return v, nil
}
if conv == "int" {
switch vt := v.(type) {
case float32:
@ -753,39 +789,106 @@ func fieldConvert(conv string, v interface{}) interface{} {
case string:
v, _ = strconv.Atoi(vt)
}
return v, nil
}
return v
if conv == "hwaddr" {
switch vt := v.(type) {
case string:
v = net.HardwareAddr(vt).String()
case []byte:
v = net.HardwareAddr(vt).String()
default:
return nil, fmt.Errorf("invalid type (%T) for hwaddr conversion", v)
}
}
if conv == "ipaddr" {
var ipbs []byte
switch vt := v.(type) {
case string:
ipbs = []byte(vt)
case []byte:
ipbs = vt
default:
return nil, fmt.Errorf("invalid type (%T) for ipaddr conversion", v)
}
switch len(ipbs) {
case 4, 16:
v = net.IP(ipbs).String()
default:
return nil, fmt.Errorf("invalid length (%d) for ipaddr conversion", len(ipbs))
}
return v, nil
}
return v, nil
}
// snmpTranslate resolves the given OID.
// The contents of the oid parameter will be replaced with the numeric oid value.
// If name is empty, the textual OID value is stored in it. If the textual OID cannot be translated, the numeric OID is stored instead.
// If mibPrefix is non-nil, the MIB in which the OID was found is stored, with a suffix of "::".
func snmpTranslate(mibPrefix *string, oid *string, name *string) error {
if strings.ContainsAny(*oid, ":abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") {
out, err := execCmd("snmptranslate", "-m", "all", "-On", *oid)
if err != nil {
return Errorf(err, "translating %s", *oid)
}
*oid = string(bytes.TrimSuffix(out, []byte{'\n'}))
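// snmpTranslate resolves the given OID using the snmptranslate utility.
// It returns the MIB name, the numeric OID, the textual OID, and any value
// conversion implied by the OID's TEXTUAL CONVENTION (MacAddress/PhysAddress
// map to "hwaddr"; InetAddress/InetAddressIPv4/InetAddressIPv6 map to "ipaddr").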
func snmpTranslate(oid string) (mibName string, oidNum string, oidText string, conversion string, err error) {
var out []byte
if strings.ContainsAny(oid, ":abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") {
out, err = execCmd("snmptranslate", "-Td", "-Ob", oid)
} else {
out, err = execCmd("snmptranslate", "-Td", "-Ob", "-m", "all", oid)
}
if err != nil {
return "", "", "", "", err
}
if *name == "" {
out, err := execCmd("snmptranslate", "-m", "all", *oid)
bb := bytes.NewBuffer(out)
oidText, err = bb.ReadString('\n')
if err != nil {
return "", "", "", "", Errorf(err, "getting OID text")
}
oidText = oidText[:len(oidText)-1]
i := strings.Index(oidText, "::")
if i == -1 {
// was not found in MIB. Value is numeric
return "", oidText, oidText, "", nil
}
mibName = oidText[:i]
oidText = oidText[i+2:]
if i := bytes.Index(bb.Bytes(), []byte(" -- TEXTUAL CONVENTION ")); i != -1 {
bb.Next(i + len(" -- TEXTUAL CONVENTION "))
tc, err := bb.ReadString('\n')
if err != nil {
//TODO debug message
*name = *oid
return "", "", "", "", Errorf(err, "getting textual convention")
}
tc = tc[:len(tc)-1]
switch tc {
case "MacAddress", "PhysAddress":
conversion = "hwaddr"
case "InetAddressIPv4", "InetAddressIPv6", "InetAddress":
conversion = "ipaddr"
}
}
i = bytes.Index(bb.Bytes(), []byte("::= { "))
bb.Next(i + len("::= { "))
objs, err := bb.ReadString('}')
if err != nil {
return "", "", "", "", Errorf(err, "getting numeric oid")
}
objs = objs[:len(objs)-1]
for _, obj := range strings.Split(objs, " ") {
if len(obj) == 0 {
continue
}
if i := strings.Index(obj, "("); i != -1 {
obj = obj[i+1:]
oidNum += "." + obj[:strings.Index(obj, ")")]
} else {
if i := bytes.Index(out, []byte("::")); i != -1 {
if mibPrefix != nil {
*mibPrefix = string(out[:i+2])
}
out = out[i+2:]
}
*name = string(bytes.TrimSuffix(out, []byte{'\n'}))
oidNum += "." + obj
}
}
return nil
return mibName, oidNum, oidText, conversion, nil
}

View File

@ -0,0 +1,97 @@
// +build generate
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"os/exec"
"strings"
)
// This file is a generator that produces the mocks for the commands used by the tests.
// These are the commands to be mocked.
var mockedCommands = [][]string{
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".1.0.0.0"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".1.0.0.1.1"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".1.0.0.1.2"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", "1.0.0.1.1"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".1.0.0.0.1.1"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".1.0.0.0.1.1.0"},
{"snmptranslate", "-Td", "-Ob", "-m", "all", ".999"},
{"snmptranslate", "-Td", "-Ob", "TEST::server"},
{"snmptranslate", "-Td", "-Ob", "TEST::server.0"},
{"snmptranslate", "-Td", "-Ob", "TEST::testTable"},
{"snmptranslate", "-Td", "-Ob", "TEST::connections"},
{"snmptranslate", "-Td", "-Ob", "TEST::latency"},
{"snmptranslate", "-Td", "-Ob", "TEST::hostname"},
{"snmptranslate", "-Td", "-Ob", "IF-MIB::ifPhysAddress.1"},
{"snmptranslate", "-Td", "-Ob", "BRIDGE-MIB::dot1dTpFdbAddress.1"},
{"snmptranslate", "-Td", "-Ob", "TCP-MIB::tcpConnectionLocalAddress.1"},
{"snmptranslate", "-Td", "TEST::testTable.1"},
{"snmptable", "-Ch", "-Cl", "-c", "public", "127.0.0.1", "TEST::testTable"},
}
type mockedCommandResult struct {
stdout string
stderr string
exitError bool
}
func main() {
if err := generate(); err != nil {
fmt.Fprintf(os.Stderr, "error: %s\n", err)
os.Exit(1)
}
}
func generate() error {
f, err := os.OpenFile("snmp_mocks_test.go", os.O_RDWR, 0644)
if err != nil {
return err
}
br := bufio.NewReader(f)
var i int64
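// Scan to just past the generated-content marker, tracking the byte offset so
// that everything after the marker can be truncated and regenerated in place.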
for l, err := br.ReadString('\n'); err == nil; l, err = br.ReadString('\n') {
i += int64(len(l))
if l == "// BEGIN GO GENERATE CONTENT\n" {
break
}
}
f.Truncate(i)
f.Seek(i, 0)
fmt.Fprintf(f, "var mockedCommandResults = map[string]mockedCommandResult{\n")
for _, cmd := range mockedCommands {
ec := exec.Command(cmd[0], cmd[1:]...)
out := bytes.NewBuffer(nil)
serr := bytes.NewBuffer(nil)
ec.Stdout = out
ec.Stderr = serr
ec.Env = []string{
"MIBDIRS=+./testdata",
}
var mcr mockedCommandResult
if err := ec.Run(); err != nil {
if _, ok := err.(*exec.ExitError); ok {
// the command ran but exited non-zero: record that so the mock reproduces it
mcr.exitError = true
} else {
// the command could not be run at all (e.g. not installed): abort generation
return fmt.Errorf("executing %v: %s", cmd, err)
}
}
mcr.stdout = out.String()
mcr.stderr = serr.String()
cmd0 := strings.Join(cmd, "\000")
mcrv := fmt.Sprintf("%#v", mcr)[5:] // trim `main.` prefix
fmt.Fprintf(f, "%#v: %s,\n", cmd0, mcrv)
}
f.Write([]byte("}\n"))
f.Close()
return exec.Command("gofmt", "-w", "snmp_mocks_test.go").Run()
}

View File

@ -0,0 +1,82 @@
package snmp
import (
"fmt"
"os"
"os/exec"
"strings"
"testing"
)
type mockedCommandResult struct {
stdout string
stderr string
exitError bool
}
func mockExecCommand(arg0 string, args ...string) *exec.Cmd {
args = append([]string{"-test.run=TestMockExecCommand", "--", arg0}, args...)
cmd := exec.Command(os.Args[0], args...)
cmd.Stderr = os.Stderr // so the test output shows errors
return cmd
}
// This is not a real test. This is just a way of mocking out commands.
//
// Idea based on https://github.com/golang/go/blob/7c31043/src/os/exec/exec_test.go#L568
func TestMockExecCommand(t *testing.T) {
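// Re-assemble the command line being mocked: everything after the "--"
// separator that mockExecCommand appended.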
var cmd []string
for _, arg := range os.Args {
if string(arg) == "--" {
cmd = []string{}
continue
}
if cmd == nil {
continue
}
cmd = append(cmd, string(arg))
}
if cmd == nil {
return
}
cmd0 := strings.Join(cmd, "\000")
mcr, ok := mockedCommandResults[cmd0]
if !ok {
cv := fmt.Sprintf("%#v", cmd)[8:] // trim `[]string` prefix
fmt.Fprintf(os.Stderr, "Unmocked command. Please add the following to `mockedCommands` in snmp_mocks_generate.go, and then run `go generate`:\n\t%s,\n", cv)
os.Exit(1)
}
fmt.Printf("%s", mcr.stdout)
fmt.Fprintf(os.Stderr, "%s", mcr.stderr)
if mcr.exitError {
os.Exit(1)
}
os.Exit(0)
}
func init() {
execCommand = mockExecCommand
}
// BEGIN GO GENERATE CONTENT
var mockedCommandResults = map[string]mockedCommandResult{
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.1.0.0.0": mockedCommandResult{stdout: "TEST::testTable\ntestTable OBJECT-TYPE\n -- FROM\tTEST\n MAX-ACCESS\tnot-accessible\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) 0 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.1.0.0.1.1": mockedCommandResult{stdout: "TEST::hostname\nhostname OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) 1 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.1.0.0.1.2": mockedCommandResult{stdout: "TEST::1.2\nanonymous#1 OBJECT-TYPE\n -- FROM\tTEST\n::= { iso(1) 0 testOID(0) 1 2 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x001.0.0.1.1": mockedCommandResult{stdout: "TEST::hostname\nhostname OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) 1 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.1.0.0.0.1.1": mockedCommandResult{stdout: "TEST::server\nserver OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.1.0.0.0.1.1.0": mockedCommandResult{stdout: "TEST::server.0\nserver OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) server(1) 0 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00-m\x00all\x00.999": mockedCommandResult{stdout: ".999\n [TRUNCATED]\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::server": mockedCommandResult{stdout: "TEST::server\nserver OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::server.0": mockedCommandResult{stdout: "TEST::server.0\nserver OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) server(1) 0 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::testTable": mockedCommandResult{stdout: "TEST::testTable\ntestTable OBJECT-TYPE\n -- FROM\tTEST\n MAX-ACCESS\tnot-accessible\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) 0 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::connections": mockedCommandResult{stdout: "TEST::connections\nconnections OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tINTEGER\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) 2 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::latency": mockedCommandResult{stdout: "TEST::latency\nlatency OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) testTable(0) testTableEntry(1) 3 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TEST::hostname": mockedCommandResult{stdout: "TEST::hostname\nhostname OBJECT-TYPE\n -- FROM\tTEST\n SYNTAX\tOCTET STRING\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n::= { iso(1) 0 testOID(0) 1 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00IF-MIB::ifPhysAddress.1": mockedCommandResult{stdout: "IF-MIB::ifPhysAddress.1\nifPhysAddress OBJECT-TYPE\n -- FROM\tIF-MIB\n -- TEXTUAL CONVENTION PhysAddress\n SYNTAX\tOCTET STRING\n DISPLAY-HINT\t\"1x:\"\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n DESCRIPTION\t\"The interface's address at its protocol sub-layer. For\n example, for an 802.x interface, this object normally\n contains a MAC address. The interface's media-specific MIB\n must define the bit and byte ordering and the format of the\n value of this object. For interfaces which do not have such\n an address (e.g., a serial line), this object should contain\n an octet string of zero length.\"\n::= { iso(1) org(3) dod(6) internet(1) mgmt(2) mib-2(1) interfaces(2) ifTable(2) ifEntry(1) ifPhysAddress(6) 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00BRIDGE-MIB::dot1dTpFdbAddress.1": mockedCommandResult{stdout: "BRIDGE-MIB::dot1dTpFdbAddress.1\ndot1dTpFdbAddress OBJECT-TYPE\n -- FROM\tBRIDGE-MIB\n -- TEXTUAL CONVENTION MacAddress\n SYNTAX\tOCTET STRING (6) \n DISPLAY-HINT\t\"1x:\"\n MAX-ACCESS\tread-only\n STATUS\tcurrent\n DESCRIPTION\t\"A unicast MAC address for which the bridge has\n forwarding and/or filtering information.\"\n::= { iso(1) org(3) dod(6) internet(1) mgmt(2) mib-2(1) dot1dBridge(17) dot1dTp(4) dot1dTpFdbTable(3) dot1dTpFdbEntry(1) dot1dTpFdbAddress(1) 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00-Ob\x00TCP-MIB::tcpConnectionLocalAddress.1": mockedCommandResult{stdout: "TCP-MIB::tcpConnectionLocalAddress.1\ntcpConnectionLocalAddress OBJECT-TYPE\n -- FROM\tTCP-MIB\n -- TEXTUAL CONVENTION InetAddress\n SYNTAX\tOCTET STRING (0..255) \n MAX-ACCESS\tnot-accessible\n STATUS\tcurrent\n DESCRIPTION\t\"The local IP address for this TCP connection. The type\n of this address is determined by the value of\n tcpConnectionLocalAddressType.\n\n As this object is used in the index for the\n tcpConnectionTable, implementors should be\n careful not to create entries that would result in OIDs\n with more than 128 subidentifiers; otherwise the information\n cannot be accessed by using SNMPv1, SNMPv2c, or SNMPv3.\"\n::= { iso(1) org(3) dod(6) internet(1) mgmt(2) mib-2(1) tcp(6) tcpConnectionTable(19) tcpConnectionEntry(1) tcpConnectionLocalAddress(2) 1 }\n", stderr: "", exitError: false},
"snmptranslate\x00-Td\x00TEST::testTable.1": mockedCommandResult{stdout: "TEST::testTableEntry\ntestTableEntry OBJECT-TYPE\n -- FROM\tTEST\n MAX-ACCESS\tnot-accessible\n STATUS\tcurrent\n INDEX\t\t{ server }\n::= { iso(1) 0 testOID(0) testTable(0) 1 }\n", stderr: "", exitError: false},
"snmptable\x00-Ch\x00-Cl\x00-c\x00public\x00127.0.0.1\x00TEST::testTable": mockedCommandResult{stdout: "server connections latency \nTEST::testTable: No entries\n", stderr: "", exitError: false},
}

View File

@ -1,11 +1,9 @@
//go:generate go run -tags generate snmp_mocks_generate.go
package snmp
import (
"fmt"
"net"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"
@ -18,77 +16,6 @@ import (
"github.com/stretchr/testify/require"
)
func mockExecCommand(arg0 string, args ...string) *exec.Cmd {
args = append([]string{"-test.run=TestMockExecCommand", "--", arg0}, args...)
cmd := exec.Command(os.Args[0], args...)
cmd.Stderr = os.Stderr // so the test output shows errors
return cmd
}
func TestMockExecCommand(t *testing.T) {
var cmd []string
for _, arg := range os.Args {
if string(arg) == "--" {
cmd = []string{}
continue
}
if cmd == nil {
continue
}
cmd = append(cmd, string(arg))
}
if cmd == nil {
return
}
// will not properly handle args with spaces, but it's good enough
cmdStr := strings.Join(cmd, " ")
switch cmdStr {
case "snmptranslate -m all .1.0.0.0":
fmt.Printf("TEST::testTable\n")
case "snmptranslate -m all .1.0.0.0.1.1":
fmt.Printf("server\n")
case "snmptranslate -m all .1.0.0.0.1.1.0":
fmt.Printf("server.0\n")
case "snmptranslate -m all .1.0.0.1.1":
fmt.Printf("hostname\n")
case "snmptranslate -m all .999":
fmt.Printf(".999\n")
case "snmptranslate -m all -On TEST::testTable":
fmt.Printf(".1.0.0.0\n")
case "snmptranslate -m all -On TEST::hostname":
fmt.Printf(".1.0.0.1.1\n")
case "snmptranslate -m all -On TEST::server":
fmt.Printf(".1.0.0.0.1.1\n")
case "snmptranslate -m all -On TEST::connections":
fmt.Printf(".1.0.0.0.1.2\n")
case "snmptranslate -m all -On TEST::latency":
fmt.Printf(".1.0.0.0.1.3\n")
case "snmptranslate -m all -On TEST::server.0":
fmt.Printf(".1.0.0.0.1.1.0\n")
case "snmptranslate -m all -Td .1.0.0.0.1":
fmt.Printf(`TEST::testTableEntry
testTableEntry OBJECT-TYPE
-- FROM TEST
MAX-ACCESS not-accessible
STATUS current
INDEX { server }
::= { iso(1) 2 testOID(3) testTable(0) 1 }
`)
case "snmptable -m all -Ch -Cl -c public 127.0.0.1 .1.0.0.0":
fmt.Printf(`server connections latency
TEST::testTable: No entries
`)
default:
fmt.Fprintf(os.Stderr, "Command not mocked: `%s`\n", cmdStr)
// you get the expected output by running the missing command with `-M testdata` in the plugin directory.
os.Exit(1)
}
os.Exit(0)
}
func init() {
execCommand = mockExecCommand
}
type testSNMPConnection struct {
host string
values map[string]interface{}
@ -133,18 +60,20 @@ func (tsc *testSNMPConnection) Walk(oid string, wf gosnmp.WalkFunc) error {
var tsc = &testSNMPConnection{
host: "tsc",
values: map[string]interface{}{
".1.0.0.0.1.1.0": "foo",
".1.0.0.0.1.1.1": []byte("bar"),
".1.0.0.0.1.102": "bad",
".1.0.0.0.1.2.0": 1,
".1.0.0.0.1.2.1": 2,
".1.0.0.0.1.3.0": "0.123",
".1.0.0.0.1.3.1": "0.456",
".1.0.0.0.1.3.2": "9.999",
".1.0.0.0.1.4.0": 123456,
".1.0.0.1.1": "baz",
".1.0.0.1.2": 234,
".1.0.0.1.3": []byte("byte slice"),
".1.0.0.0.1.1.0": "foo",
".1.0.0.0.1.1.1": []byte("bar"),
".1.0.0.0.1.102": "bad",
".1.0.0.0.1.2.0": 1,
".1.0.0.0.1.2.1": 2,
".1.0.0.0.1.3.0": "0.123",
".1.0.0.0.1.3.1": "0.456",
".1.0.0.0.1.3.2": "9.999",
".1.0.0.0.1.4.0": 123456,
".1.0.0.1.1": "baz",
".1.0.0.1.2": 234,
".1.0.0.1.3": []byte("byte slice"),
".1.0.0.2.1.5.0.9.9": 11,
".1.0.0.2.1.5.1.9.9": 22,
},
}
@ -162,7 +91,8 @@ func TestSampleConfig(t *testing.T) {
Timeout: internal.Duration{Duration: 5 * time.Second},
Version: 2,
Community: "public",
MaxRepetitions: 50,
MaxRepetitions: 10,
Retries: 3,
Name: "system",
Fields: []Field{
@ -191,27 +121,33 @@ func TestSampleConfig(t *testing.T) {
func TestFieldInit(t *testing.T) {
translations := []struct {
inputOid string
inputName string
expectedOid string
expectedName string
inputOid string
inputName string
inputConversion string
expectedOid string
expectedName string
expectedConversion string
}{
{".1.0.0.0.1.1", "", ".1.0.0.0.1.1", "server"},
{".1.0.0.0.1.1.0", "", ".1.0.0.0.1.1.0", "server.0"},
{".999", "", ".999", ".999"},
{"TEST::server", "", ".1.0.0.0.1.1", "server"},
{"TEST::server.0", "", ".1.0.0.0.1.1.0", "server.0"},
{"TEST::server", "foo", ".1.0.0.0.1.1", "foo"},
{".1.0.0.0.1.1", "", "", ".1.0.0.0.1.1", "server", ""},
{".1.0.0.0.1.1.0", "", "", ".1.0.0.0.1.1.0", "server.0", ""},
{".999", "", "", ".999", ".999", ""},
{"TEST::server", "", "", ".1.0.0.0.1.1", "server", ""},
{"TEST::server.0", "", "", ".1.0.0.0.1.1.0", "server.0", ""},
{"TEST::server", "foo", "", ".1.0.0.0.1.1", "foo", ""},
{"IF-MIB::ifPhysAddress.1", "", "", ".1.3.6.1.2.1.2.2.1.6.1", "ifPhysAddress.1", "hwaddr"},
{"IF-MIB::ifPhysAddress.1", "", "none", ".1.3.6.1.2.1.2.2.1.6.1", "ifPhysAddress.1", "none"},
{"BRIDGE-MIB::dot1dTpFdbAddress.1", "", "", ".1.3.6.1.2.1.17.4.3.1.1.1", "dot1dTpFdbAddress.1", "hwaddr"},
{"TCP-MIB::tcpConnectionLocalAddress.1", "", "", ".1.3.6.1.2.1.6.19.1.2.1", "tcpConnectionLocalAddress.1", "ipaddr"},
}
for _, txl := range translations {
f := Field{Oid: txl.inputOid, Name: txl.inputName}
f := Field{Oid: txl.inputOid, Name: txl.inputName, Conversion: txl.inputConversion}
err := f.init()
if !assert.NoError(t, err, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName) {
continue
}
assert.Equal(t, txl.expectedOid, f.Oid, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName)
assert.Equal(t, txl.expectedName, f.Name, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName)
assert.Equal(t, txl.expectedOid, f.Oid, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion)
assert.Equal(t, txl.expectedName, f.Name, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion)
}
}
@ -302,7 +238,7 @@ func TestGetSNMPConnection_v3(t *testing.T) {
assert.Equal(t, gs.Version, gosnmp.Version3)
sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters)
assert.Equal(t, "1.2.3.4", gsc.Host())
assert.Equal(t, 20, gs.MaxRepetitions)
assert.EqualValues(t, 20, gs.MaxRepetitions)
assert.Equal(t, "mycontext", gs.ContextName)
assert.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv)
assert.Equal(t, "myuser", sp.UserName)
@ -437,6 +373,11 @@ func TestTableBuild_walk(t *testing.T) {
Oid: ".1.0.0.0.1.3",
Conversion: "float",
},
{
Name: "myfield4",
Oid: ".1.0.0.2.1.5",
OidIndexSuffix: ".9.9",
},
},
}
@ -445,12 +386,20 @@ func TestTableBuild_walk(t *testing.T) {
assert.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{
Tags: map[string]string{"myfield1": "foo"},
Fields: map[string]interface{}{"myfield2": 1, "myfield3": float64(0.123)},
Tags: map[string]string{"myfield1": "foo"},
Fields: map[string]interface{}{
"myfield2": 1,
"myfield3": float64(0.123),
"myfield4": 11,
},
}
rtr2 := RTableRow{
Tags: map[string]string{"myfield1": "bar"},
Fields: map[string]interface{}{"myfield2": 2, "myfield3": float64(0.456)},
Tags: map[string]string{"myfield1": "bar"},
Fields: map[string]interface{}{
"myfield2": 2,
"myfield3": float64(0.456),
"myfield4": 22,
},
}
assert.Len(t, tb.Rows, 2)
assert.Contains(t, tb.Rows, rtr1)
@ -619,10 +568,18 @@ func TestFieldConvert(t *testing.T) {
{uint16(123), "int", int64(123)},
{uint32(123), "int", int64(123)},
{uint64(123), "int", int64(123)},
{[]byte("abcdef"), "hwaddr", "61:62:63:64:65:66"},
{"abcdef", "hwaddr", "61:62:63:64:65:66"},
{[]byte("abcd"), "ipaddr", "97.98.99.100"},
{"abcd", "ipaddr", "97.98.99.100"},
{[]byte("abcdefghijklmnop"), "ipaddr", "6162:6364:6566:6768:696a:6b6c:6d6e:6f70"},
}
for _, tc := range testTable {
act := fieldConvert(tc.conv, tc.input)
act, err := fieldConvert(tc.conv, tc.input)
if !assert.NoError(t, err, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected) {
continue
}
assert.EqualValues(t, tc.expected, act, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected)
}
}

View File

@ -296,7 +296,7 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
data, err := ioutil.ReadFile(s.SnmptranslateFile)
if err != nil {
log.Printf("Reading SNMPtranslate file error: %s", err)
log.Printf("E! Reading SNMPtranslate file error: %s", err)
return err
} else {
for _, line := range strings.Split(string(data), "\n") {
@ -394,16 +394,16 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error {
// only if len(s.OidInstanceMapping) == 0
if len(host.OidInstanceMapping) == 0 {
if err := host.SNMPMap(acc, s.nameToOid, s.subTableMap); err != nil {
log.Printf("SNMP Mapping error for host '%s': %s", host.Address, err)
log.Printf("E! SNMP Mapping error for host '%s': %s", host.Address, err)
continue
}
}
// Launch Get requests
if err := host.SNMPGet(acc, s.initNode); err != nil {
log.Printf("SNMP Error for host '%s': %s", host.Address, err)
log.Printf("E! SNMP Error for host '%s': %s", host.Address, err)
}
if err := host.SNMPBulk(acc, s.initNode); err != nil {
log.Printf("SNMP Error for host '%s': %s", host.Address, err)
log.Printf("E! SNMP Error for host '%s': %s", host.Address, err)
}
}
return nil
@ -800,7 +800,7 @@ func (h *Host) HandleResponse(
acc.AddFields(field_name, fields, tags)
case gosnmp.NoSuchObject, gosnmp.NoSuchInstance:
// Oid not found
log.Printf("[snmp input] Oid not found: %s", oid_key)
log.Printf("E! [snmp input] Oid not found: %s", oid_key)
default:
// delete other data
}

View File

@ -626,26 +626,22 @@ const sqlDatabaseIO string = `SET NOCOUNT ON;
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
DECLARE @secondsBetween tinyint = 5;
DECLARE @delayInterval char(8) = CONVERT(Char(8), DATEADD(SECOND, @secondsBetween, '00:00:00'), 108);
IF OBJECT_ID('tempdb..#baseline') IS NOT NULL
DROP TABLE #baseline;
IF OBJECT_ID('tempdb..#baselinewritten') IS NOT NULL
DROP TABLE #baselinewritten;
SELECT DB_NAME(mf.database_id) AS databaseName ,
mf.physical_name,
divfs.num_of_bytes_read,
divfs.num_of_bytes_written,
divfs.num_of_reads,
divfs.num_of_writes,
GETDATE() AS baselineDate
GETDATE() AS baselinedate
INTO #baseline
FROM sys.dm_io_virtual_file_stats(NULL, NULL) AS divfs
INNER JOIN sys.master_files AS mf ON mf.database_id = divfs.database_id
AND mf.file_id = divfs.file_id
WAITFOR DELAY @delayInterval;
;WITH currentLine AS
(
SELECT DB_NAME(mf.database_id) AS databaseName ,
@ -655,12 +651,11 @@ WAITFOR DELAY @delayInterval;
divfs.num_of_bytes_written,
divfs.num_of_reads,
divfs.num_of_writes,
GETDATE() AS currentlineDate
GETDATE() AS currentlinedate
FROM sys.dm_io_virtual_file_stats(NULL, NULL) AS divfs
INNER JOIN sys.master_files AS mf ON mf.database_id = divfs.database_id
AND mf.file_id = divfs.file_id
)
SELECT database_name
, datafile_type
, num_of_bytes_read_persec = SUM(num_of_bytes_read_persec)
@ -673,23 +668,21 @@ FROM
SELECT
database_name = currentLine.databaseName
, datafile_type = type_desc
, num_of_bytes_read_persec = (currentLine.num_of_bytes_read - T1.num_of_bytes_read) / (DATEDIFF(SECOND,baseLineDate,currentLineDate))
, num_of_bytes_written_persec = (currentLine.num_of_bytes_written - T1.num_of_bytes_written) / (DATEDIFF(SECOND,baseLineDate,currentLineDate))
, num_of_reads_persec = (currentLine.num_of_reads - T1.num_of_reads) / (DATEDIFF(SECOND,baseLineDate,currentLineDate))
, num_of_writes_persec = (currentLine.num_of_writes - T1.num_of_writes) / (DATEDIFF(SECOND,baseLineDate,currentLineDate))
, num_of_bytes_read_persec = (currentLine.num_of_bytes_read - T1.num_of_bytes_read) / (DATEDIFF(SECOND,baselinedate,currentlinedate))
, num_of_bytes_written_persec = (currentLine.num_of_bytes_written - T1.num_of_bytes_written) / (DATEDIFF(SECOND,baselinedate,currentlinedate))
, num_of_reads_persec = (currentLine.num_of_reads - T1.num_of_reads) / (DATEDIFF(SECOND,baselinedate,currentlinedate))
, num_of_writes_persec = (currentLine.num_of_writes - T1.num_of_writes) / (DATEDIFF(SECOND,baselinedate,currentlinedate))
FROM currentLine
INNER JOIN #baseline T1 ON T1.databaseName = currentLine.databaseName
AND T1.physical_name = currentLine.physical_name
) as T
GROUP BY database_name, datafile_type
DECLARE @DynamicPivotQuery AS NVARCHAR(MAX)
DECLARE @ColumnName AS NVARCHAR(MAX), @ColumnName2 AS NVARCHAR(MAX)
SELECT @ColumnName = ISNULL(@ColumnName + ',','') + QUOTENAME(database_name)
FROM (SELECT DISTINCT database_name FROM #baselinewritten) AS bl
SELECT @ColumnName2 = ISNULL(@ColumnName2 + '+','') + QUOTENAME(database_name)
FROM (SELECT DISTINCT database_name FROM #baselinewritten) AS bl
SET @DynamicPivotQuery = N'
SELECT measurement = ''Log writes (bytes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
@ -699,9 +692,7 @@ FROM #baselinewritten
WHERE datafile_type = ''LOG''
) as V
PIVOT(SUM(num_of_bytes_written_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Rows writes (bytes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -710,9 +701,7 @@ FROM #baselinewritten
WHERE datafile_type = ''ROWS''
) as V
PIVOT(SUM(num_of_bytes_written_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Log reads (bytes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -721,9 +710,7 @@ FROM #baselinewritten
WHERE datafile_type = ''LOG''
) as V
PIVOT(SUM(num_of_bytes_read_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Rows reads (bytes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -732,9 +719,7 @@ FROM #baselinewritten
WHERE datafile_type = ''ROWS''
) as V
PIVOT(SUM(num_of_bytes_read_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Log (writes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -743,9 +728,7 @@ FROM #baselinewritten
WHERE datafile_type = ''LOG''
) as V
PIVOT(SUM(num_of_writes_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Rows (writes/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -754,9 +737,7 @@ FROM #baselinewritten
WHERE datafile_type = ''ROWS''
) as V
PIVOT(SUM(num_of_writes_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Log (reads/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -765,9 +746,7 @@ FROM #baselinewritten
WHERE datafile_type = ''LOG''
) as V
PIVOT(SUM(num_of_reads_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
UNION ALL
SELECT measurement = ''Rows (reads/sec)'', servername = REPLACE(@@SERVERNAME, ''\'', '':''), type = ''Database IO''
, ' + @ColumnName + ', Total = ' + @ColumnName2 + ' FROM
(
@ -777,7 +756,6 @@ WHERE datafile_type = ''ROWS''
) as V
PIVOT(SUM(num_of_reads_persec) FOR database_name IN (' + @ColumnName + ')) AS PVTTable
'
EXEC sp_executesql @DynamicPivotQuery;
`

View File

@ -24,10 +24,11 @@ const (
defaultFieldName = "value"
defaultSeparator = "_"
defaultSeparator = "_"
defaultAllowPendingMessage = 10000
)
var dropwarn = "ERROR: statsd message queue full. " +
var dropwarn = "E! Error: statsd message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
@ -250,7 +251,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
}
if s.ConvertNames {
log.Printf("WARNING statsd: convert_names config option is deprecated," +
log.Printf("I! WARNING statsd: convert_names config option is deprecated," +
" please use metric_separator instead")
}
@ -263,7 +264,7 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
go s.udpListen()
// Start the line parser
go s.parser()
log.Printf("Started the statsd service on %s\n", s.ServiceAddress)
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
prevInstance = s
return nil
}
@ -277,7 +278,7 @@ func (s *Statsd) udpListen() error {
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String())
buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
@ -287,7 +288,7 @@ func (s *Statsd) udpListen() error {
default:
n, _, err := s.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error())
log.Printf("E! Error READ: %s\n", err.Error())
continue
}
bufCopy := make([]byte, n)
@ -297,7 +298,7 @@ func (s *Statsd) udpListen() error {
case s.in <- bufCopy:
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
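// guard against a modulo-by-zero panic when allowed_pending_messages is configured as 0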
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
log.Printf(dropwarn, s.drops)
}
}
@ -373,7 +374,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Validate splitting the line on ":"
bits := strings.Split(line, ":")
if len(bits) < 2 {
log.Printf("Error: splitting ':', Unable to parse metric: %s\n", line)
log.Printf("E! Error: splitting ':', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
}
@ -389,11 +390,11 @@ func (s *Statsd) parseStatsdLine(line string) error {
// Validate splitting the bit on "|"
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
log.Printf("Error: splitting '|', Unable to parse metric: %s\n", line)
log.Printf("E! Error: splitting '|', Unable to parse metric: %s\n", line)
return errors.New("Error Parsing statsd line")
} else if len(pipesplit) > 2 {
sr := pipesplit[2]
errmsg := "Error: parsing sample rate, %s, it must be in format like: " +
errmsg := "E! Error: parsing sample rate, %s, it must be in format like: " +
"@0.1, @0.5, etc. Ignoring sample rate for line: %s\n"
if strings.Contains(sr, "@") && len(sr) > 1 {
samplerate, err := strconv.ParseFloat(sr[1:], 64)
@ -413,14 +414,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
case "g", "c", "s", "ms", "h":
m.mtype = pipesplit[1]
default:
log.Printf("Error: Statsd Metric type %s unsupported", pipesplit[1])
log.Printf("E! Error: Statsd Metric type %s unsupported", pipesplit[1])
return errors.New("Error Parsing statsd line")
}
// Parse the value
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
if m.mtype != "g" {
log.Printf("Error: +- values are only supported for gauges: %s\n", line)
log.Printf("E! Error: +- values are only supported for gauges: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.additive = true
@ -430,7 +431,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
case "g", "ms", "h":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
log.Printf("Error: parsing value to float64: %s\n", line)
log.Printf("E! Error: parsing value to float64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
m.floatvalue = v
@ -440,7 +441,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
if err != nil {
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
if err2 != nil {
log.Printf("Error: parsing value to int64: %s\n", line)
log.Printf("E! Error: parsing value to int64: %s\n", line)
return errors.New("Error Parsing statsd line")
}
v = int64(v2)
@ -640,7 +641,7 @@ func (s *Statsd) aggregate(m metric) {
func (s *Statsd) Stop() {
s.Lock()
defer s.Unlock()
log.Println("Stopping the statsd service")
log.Println("I! Stopping the statsd service")
close(s.done)
s.listener.Close()
s.wg.Wait()
@ -650,7 +651,8 @@ func (s *Statsd) Stop() {
func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
MetricSeparator: "_",
MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage,
}
})
}

View File

@ -203,7 +203,7 @@ func (s *Sysstat) collect() error {
out, err := internal.CombinedOutputTimeout(cmd, time.Second*time.Duration(collectInterval+parseInterval))
if err != nil {
if err := os.Remove(s.tmpFile); err != nil {
log.Printf("failed to remove tmp file after %s command: %s", strings.Join(cmd.Args, " "), err)
log.Printf("E! failed to remove tmp file after %s command: %s", strings.Join(cmd.Args, " "), err)
}
return fmt.Errorf("failed to run command %s: %s - %s", strings.Join(cmd.Args, " "), err, string(out))
}

View File

@ -118,7 +118,7 @@ func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
case '?':
fields["unknown"] = fields["unknown"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] from ps",
log.Printf("I! processes: Unknown state [ %s ] from ps",
string(status[0]))
}
fields["total"] = fields["total"].(int64) + int64(1)
@ -169,14 +169,14 @@ func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
case 'W':
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
log.Printf("I! processes: Unknown state [ %s ] in file %s",
string(stats[0][0]), filename)
}
fields["total"] = fields["total"].(int64) + int64(1)
threads, err := strconv.Atoi(string(stats[17]))
if err != nil {
log.Printf("processes: Error parsing thread count: %s", err)
log.Printf("I! processes: Error parsing thread count: %s", err)
continue
}
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)

View File

@ -81,7 +81,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for _, filepath := range t.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
}
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
@ -118,7 +118,7 @@ func (t *Tail) receiver(tailer *tail.Tail) {
var line *tail.Line
for line = range tailer.Lines {
if line.Err != nil {
log.Printf("ERROR tailing file %s, Error: %s\n",
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
continue
}
@ -126,7 +126,7 @@ func (t *Tail) receiver(tailer *tail.Tail) {
if err == nil {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
} else {
log.Printf("Malformed log line in %s: [%s], Error: %s\n",
log.Printf("E! Malformed log line in %s: [%s], Error: %s\n",
tailer.Filename, line.Text, err)
}
}
@ -139,7 +139,7 @@ func (t *Tail) Stop() {
for _, t := range t.tailers {
err := t.Stop()
if err != nil {
log.Printf("ERROR stopping tail on file %s\n", t.Filename)
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
}
t.Cleanup()
}

View File

@ -43,11 +43,11 @@ type TcpListener struct {
acc telegraf.Accumulator
}
var dropwarn = "ERROR: tcp_listener message queue full. " +
var dropwarn = "E! Error: tcp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: tcp_listener has received %d malformed packets" +
var malformedwarn = "E! tcp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = `
@ -108,13 +108,13 @@ func (t *TcpListener) Start(acc telegraf.Accumulator) error {
log.Fatalf("ERROR: ListenUDP - %s", err)
return err
}
log.Println("TCP server listening on: ", t.listener.Addr().String())
log.Println("I! TCP server listening on: ", t.listener.Addr().String())
t.wg.Add(2)
go t.tcpListen()
go t.tcpParser()
log.Printf("Started TCP listener service on %s\n", t.ServiceAddress)
log.Printf("I! Started TCP listener service on %s\n", t.ServiceAddress)
return nil
}
@ -141,7 +141,7 @@ func (t *TcpListener) Stop() {
t.wg.Wait()
close(t.in)
log.Println("Stopped TCP listener service on ", t.ServiceAddress)
log.Println("I! Stopped TCP listener service on ", t.ServiceAddress)
}
// tcpListen listens for incoming TCP connections.
@ -182,8 +182,8 @@ func (t *TcpListener) refuser(conn *net.TCPConn) {
" reached, closing.\nYou may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n", t.MaxTCPConnections)
conn.Close()
log.Printf("Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("WARNING: Maximum TCP Connections reached, you may want to" +
log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}

View File

@ -42,11 +42,11 @@ type UdpListener struct {
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_MAX_PACKET_SIZE int = 64 * 1024
var dropwarn = "ERROR: udp_listener message queue full. " +
var dropwarn = "E! Error: udp_listener message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"
var malformedwarn = "WARNING: udp_listener has received %d malformed packets" +
var malformedwarn = "E! udp_listener has received %d malformed packets" +
" thus far."
const sampleConfig = `
@ -94,7 +94,7 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
go u.udpListen()
go u.udpParser()
log.Printf("Started UDP listener service on %s\n", u.ServiceAddress)
log.Printf("I! Started UDP listener service on %s\n", u.ServiceAddress)
return nil
}
@ -105,7 +105,7 @@ func (u *UdpListener) Stop() {
u.wg.Wait()
u.listener.Close()
close(u.in)
log.Println("Stopped UDP listener service on ", u.ServiceAddress)
log.Println("I! Stopped UDP listener service on ", u.ServiceAddress)
}
func (u *UdpListener) udpListen() error {
@ -116,7 +116,7 @@ func (u *UdpListener) udpListen() error {
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
@ -129,7 +129,7 @@ func (u *UdpListener) udpListen() error {
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
} else {
log.Printf("ERROR: %s\n", err.Error())
log.Printf("E! Error: %s\n", err.Error())
}
continue
}

View File

@ -146,7 +146,9 @@ func (s *Varnish) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("varnish", func() telegraf.Input {
return &Varnish{
run: varnishRunner,
run: varnishRunner,
Stats: defaultStats,
Binary: defaultBinary,
}
})
}

View File

@ -19,7 +19,7 @@ type FilestackWebhook struct {
func (fs *FilestackWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(fs.Path, fs.eventHandler).Methods("POST")
log.Printf("Started the webhooks_filestack on %s\n", fs.Path)
log.Printf("I! Started the webhooks_filestack on %s\n", fs.Path)
fs.acc = acc
}

View File

@ -17,7 +17,7 @@ type GithubWebhook struct {
func (gh *GithubWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST")
log.Printf("Started the webhooks_github on %s\n", gh.Path)
log.Printf("I! Started the webhooks_github on %s\n", gh.Path)
gh.acc = acc
}
@ -58,7 +58,7 @@ func (e *newEventError) Error() string {
}
func NewEvent(data []byte, name string) (Event, error) {
log.Printf("New %v event received", name)
log.Printf("D! New %v event received", name)
switch name {
case "commit_comment":
return generateEvent(data, &CommitCommentEvent{})

View File

@ -21,7 +21,7 @@ func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator
router.HandleFunc(md.Path, md.returnOK).Methods("HEAD")
router.HandleFunc(md.Path, md.eventHandler).Methods("POST")
log.Printf("Started the webhooks_mandrill on %s\n", md.Path)
log.Printf("I! Started the webhooks_mandrill on %s\n", md.Path)
md.acc = acc
}

View File

@ -19,7 +19,7 @@ type RollbarWebhook struct {
func (rb *RollbarWebhook) Register(router *mux.Router, acc telegraf.Accumulator) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
log.Printf("Started the webhooks_rollbar on %s\n", rb.Path)
log.Printf("I! Started the webhooks_rollbar on %s\n", rb.Path)
rb.acc = acc
}

View File

@ -73,7 +73,7 @@ func (wb *Webhooks) Listen(acc telegraf.Accumulator) {
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r)
if err != nil {
log.Printf("Error starting server: %v", err)
log.Printf("E! Error starting server: %v", err)
}
}
@ -100,10 +100,10 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook {
func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
go wb.Listen(acc)
log.Printf("Started the webhooks service on %s\n", wb.ServiceAddress)
log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress)
return nil
}
func (rb *Webhooks) Stop() {
log.Println("Stopping the Webhooks service")
log.Println("I! Stopping the Webhooks service")
}

View File

@ -73,7 +73,7 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
metricCounter++
}
} else {
log.Printf("unable to build Metric for %s, skipping\n", m.Name())
log.Printf("I! unable to build Metric for %s, skipping\n", m.Name())
}
}

View File

@ -153,10 +153,10 @@ func (q *AMQP) Connect() error {
}
q.channel = channel
go func() {
log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("Trying to reconnect")
log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("I! Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() {
log.Println(err)
log.Println("E! ", err.Error())
time.Sleep(10 * time.Second)
}

View File

@ -80,7 +80,7 @@ func (c *CloudWatch) Connect() error {
_, err := svc.ListMetrics(params) // Try a read-only call to test connection.
if err != nil {
log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
log.Printf("E! cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
}
c.svc = svc
@ -131,7 +131,7 @@ func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error {
_, err := c.svc.PutMetricData(params)
if err != nil {
log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
log.Printf("E! CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
}
return err

View File

@ -92,7 +92,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
metricCounter++
}
} else {
log.Printf("unable to build Metric for %s, skipping\n", m.Name())
log.Printf("I! unable to build Metric for %s, skipping\n", m.Name())
}
}

View File

@ -85,7 +85,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
gMetrics, err := s.Serialize(metric)
if err != nil {
log.Printf("Error serializing some metrics to graphite: %s", err.Error())
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
}
bp = append(bp, gMetrics...)
}
@ -102,7 +102,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error
log.Println("ERROR: " + e.Error())
log.Println("E! Graphite Error: " + e.Error())
// Let's try the next one
} else {
// Success

View File

@ -130,7 +130,7 @@ func (i *InfluxDB) Connect() error {
err = createDatabase(c, i.Database)
if err != nil {
log.Println("Database creation failed: " + err.Error())
log.Println("E! Database creation failed: " + err.Error())
continue
}
@ -201,11 +201,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
// Log write failure
log.Printf("ERROR: %s", e)
log.Printf("E! InfluxDB Output Error: %s", e)
// If the database was not found, try to recreate it
if strings.Contains(e.Error(), "database not found") {
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
log.Printf("ERROR: Database %s not found and failed to recreate\n",
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
}
}

View File

@ -119,7 +119,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
stats, err := s.Serialize(toSerialize)
if err != nil {
log.Printf("Error serializing a metric to Instrumental: %s", err)
log.Printf("E! Error serializing a metric to Instrumental: %s", err)
}
switch metricType {
@ -144,7 +144,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
if !ValueIncludesBadChar.MatchString(value) {
points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time))
} else if i.Debug {
log.Printf("Unable to send bad stat: %s", stat)
log.Printf("E! Instrumental unable to send bad stat: %s", stat)
}
}
}
@ -152,9 +152,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
allPoints := strings.Join(points, "\n") + "\n"
_, err = fmt.Fprintf(i.conn, allPoints)
if i.Debug {
log.Println(allPoints)
}
log.Println("D! Instrumental: " + allPoints)
if err != nil {
if err == io.EOF {

View File

@ -83,7 +83,7 @@ func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region)
log.Printf("E! kinesis: Establishing a connection to Kinesis in %+v", k.Region)
}
credentialConfig := &internalaws.CredentialConfig{
@ -105,17 +105,17 @@ func (k *KinesisOutput) Connect() error {
resp, err := svc.ListStreams(KinesisParams)
if err != nil {
log.Printf("kinesis: Error in ListSteams API call : %+v \n", err)
log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err)
}
if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("kinesis: Stream Exists")
log.Printf("E! kinesis: Stream Exists")
}
k.svc = svc
return nil
} else {
log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
return err
@ -147,14 +147,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
log.Printf("%+v \n", resp)
log.Printf("E! %+v \n", resp)
} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
}
return time.Since(start)
@ -182,7 +182,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
atomic.StoreUint32(&sz, 0)
r = nil
}
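
As the comment in the hunk notes, a single PutRecords request carries at most 500 entries, so the writer flushes and resets its slice each time the batch fills. A standalone sketch of that chunking rule; splitBatches is a hypothetical helper, not part of the plugin.

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/service/kinesis"
)

// splitBatches cuts entries into slices of at most size records, the same
// ceiling the plugin enforces before each PutRecords call.
func splitBatches(entries []*kinesis.PutRecordsRequestEntry, size int) [][]*kinesis.PutRecordsRequestEntry {
    var batches [][]*kinesis.PutRecordsRequestEntry
    for len(entries) > size {
        batches = append(batches, entries[:size])
        entries = entries[size:]
    }
    if len(entries) > 0 {
        batches = append(batches, entries)
    }
    return batches
}

func main() {
    entries := make([]*kinesis.PutRecordsRequestEntry, 1203)
    for i, batch := range splitBatches(entries, 500) {
        fmt.Printf("batch %d: %d records\n", i, len(batch))
    }
    // Prints batches of 500, 500, and 203 records.
}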

View File

@ -103,15 +103,13 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
}
log.Printf("D! Got a gauge: %v\n", gauge)
}
} else {
log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
if l.Debug {
log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
}
log.Printf("I! unable to build Gauge for %s, skipping\n", m.Name())
log.Printf("D! Couldn't build gauge: %v\n", err)
}
}
@ -132,9 +130,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
}
log.Printf("D! Librato request: %v\n", string(metricsBytes))
req, err := http.NewRequest(
"POST",
@ -150,9 +146,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
log.Printf("D! Error POSTing metrics: %v\n", err.Error())
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
@ -160,7 +154,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
if resp.StatusCode != 200 || l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
log.Printf("D! Couldn't get response! (%v)\n", err)
}
if resp.StatusCode != 200 {
return fmt.Errorf(
@ -168,9 +162,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
resp.StatusCode,
string(htmlData))
}
if l.Debug {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
}
log.Printf("D! Librato response: %v\n", string(htmlData))
}
}
@ -226,9 +218,8 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
}
gauges = append(gauges, gauge)
}
if l.Debug {
fmt.Printf("[DEBUG] Built gauges: %v\n", gauges)
}
log.Printf("D! Built gauges: %v\n", gauges)
return gauges, nil
}

View File

@ -164,9 +164,11 @@ func (o *openTSDBHttp) flush() error {
if resp.StatusCode/100 != 2 {
if resp.StatusCode/100 == 4 {
log.Printf("WARNING: Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode)
log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.",
resp.StatusCode)
} else {
return fmt.Errorf("Error when sending metrics.Received status %d", resp.StatusCode)
return fmt.Errorf("Error when sending metrics. Received status %d",
resp.StatusCode)
}
}
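
The status handling above encodes a retry policy: a 4xx response means the payload itself was rejected, so retrying would only refill the output buffer until it overflows, and the batch is logged and dropped; any other non-2xx status is returned as an error so the caller keeps the metrics and retries later. A standalone sketch of that policy follows.

package main

import (
    "fmt"
    "log"
)

// checkStatus mirrors the drop-on-4xx, retry-on-other-failures policy.
func checkStatus(code int) error {
    switch {
    case code/100 == 2:
        return nil
    case code/100 == 4:
        log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.", code)
        return nil // swallow the batch: retrying a rejected payload never helps
    default:
        return fmt.Errorf("Error when sending metrics. Received status %d", code)
    }
}

func main() {
    for _, code := range []int{204, 400, 500} {
        fmt.Printf("%d -> %v\n", code, checkStatus(code))
    }
}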

View File

@ -167,7 +167,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
continue
}
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}

View File

@ -35,6 +35,10 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
switch v := jsonOut[tag].(type) {
case string:
tags[tag] = v
case bool:
tags[tag] = strconv.FormatBool(v)
case float64:
tags[tag] = strconv.FormatFloat(v, 'f', -1, 64)
}
delete(jsonOut, tag)
}
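
This parser change lets boolean and numeric JSON values listed in tag_keys become string tags instead of being silently ignored; encoding/json decodes every JSON number as float64, hence the single FormatFloat case. A standalone sketch of the conversion with a usage example; tagValue is a hypothetical helper, not the parser's API.

package main

import (
    "encoding/json"
    "fmt"
    "strconv"
)

// tagValue converts a decoded JSON value into a tag string; the second
// return value reports whether the type is taggable at all.
func tagValue(v interface{}) (string, bool) {
    switch v := v.(type) {
    case string:
        return v, true
    case bool:
        return strconv.FormatBool(v), true
    case float64: // encoding/json decodes every JSON number as float64
        return strconv.FormatFloat(v, 'f', -1, 64), true
    }
    return "", false
}

func main() {
    var out map[string]interface{}
    if err := json.Unmarshal([]byte(`{"host":"a","up":true,"id":42}`), &out); err != nil {
        panic(err)
    }
    for key, value := range out {
        if s, ok := tagValue(value); ok {
            fmt.Printf("%s=%s\n", key, s) // host=a, up=true, id=42 (any order)
        }
    }
}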