Major Logging Overhaul

in this commit:

- centralize logging output handler.
- set global Info/Debug/Error log levels based on config file or flags.
- remove per-plugin debug arg handling.
- add a I!, D!, or E! to every log message.
- add configuration option to specify where to send logs.

closes #1786
This commit is contained in:
Cameron Sparr
2016-09-30 22:37:56 +01:00
parent 78ced6bc30
commit c7834209d2
52 changed files with 363 additions and 269 deletions

View File

@@ -73,7 +73,7 @@ func (a *Amon) Write(metrics []telegraf.Metric) error {
metricCounter++
}
} else {
log.Printf("unable to build Metric for %s, skipping\n", m.Name())
log.Printf("I! unable to build Metric for %s, skipping\n", m.Name())
}
}

View File

@@ -153,10 +153,10 @@ func (q *AMQP) Connect() error {
}
q.channel = channel
go func() {
log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("Trying to reconnect")
log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("I! Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() {
log.Println(err)
log.Println("E! ", err.Error())
time.Sleep(10 * time.Second)
}

View File

@@ -80,7 +80,7 @@ func (c *CloudWatch) Connect() error {
_, err := svc.ListMetrics(params) // Try a read-only call to test connection.
if err != nil {
log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
log.Printf("E! cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
}
c.svc = svc
@@ -131,7 +131,7 @@ func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error {
_, err := c.svc.PutMetricData(params)
if err != nil {
log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
log.Printf("E! CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
}
return err

View File

@@ -92,7 +92,7 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
metricCounter++
}
} else {
log.Printf("unable to build Metric for %s, skipping\n", m.Name())
log.Printf("I! unable to build Metric for %s, skipping\n", m.Name())
}
}

View File

@@ -85,7 +85,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
gMetrics, err := s.Serialize(metric)
if err != nil {
log.Printf("Error serializing some metrics to graphite: %s", err.Error())
log.Printf("E! Error serializing some metrics to graphite: %s", err.Error())
}
bp = append(bp, gMetrics...)
}
@@ -102,7 +102,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
}
if _, e := g.conns[n].Write([]byte(graphitePoints)); e != nil {
// Error
log.Println("ERROR: " + e.Error())
log.Println("E! Graphite Error: " + e.Error())
// Let's try the next one
} else {
// Success

View File

@@ -130,7 +130,7 @@ func (i *InfluxDB) Connect() error {
err = createDatabase(c, i.Database)
if err != nil {
log.Println("Database creation failed: " + err.Error())
log.Println("E! Database creation failed: " + err.Error())
continue
}
@@ -201,11 +201,11 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
for _, n := range p {
if e := i.conns[n].Write(bp); e != nil {
// Log write failure
log.Printf("ERROR: %s", e)
log.Printf("E! InfluxDB Output Error: %s", e)
// If the database was not found, try to recreate it
if strings.Contains(e.Error(), "database not found") {
if errc := createDatabase(i.conns[n], i.Database); errc != nil {
log.Printf("ERROR: Database %s not found and failed to recreate\n",
log.Printf("E! Error: Database %s not found and failed to recreate\n",
i.Database)
}
}

View File

@@ -119,7 +119,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
stats, err := s.Serialize(toSerialize)
if err != nil {
log.Printf("Error serializing a metric to Instrumental: %s", err)
log.Printf("E! Error serializing a metric to Instrumental: %s", err)
}
switch metricType {
@@ -144,7 +144,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
if !ValueIncludesBadChar.MatchString(value) {
points = append(points, fmt.Sprintf("%s %s %s %s", metricType, clean_metric, value, time))
} else if i.Debug {
log.Printf("Unable to send bad stat: %s", stat)
log.Printf("E! Instrumental unable to send bad stat: %s", stat)
}
}
}
@@ -152,9 +152,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
allPoints := strings.Join(points, "\n") + "\n"
_, err = fmt.Fprintf(i.conn, allPoints)
if i.Debug {
log.Println(allPoints)
}
log.Println("D! Instrumental: " + allPoints)
if err != nil {
if err == io.EOF {

View File

@@ -83,7 +83,7 @@ func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region)
log.Printf("E! kinesis: Establishing a connection to Kinesis in %+v", k.Region)
}
credentialConfig := &internalaws.CredentialConfig{
@@ -105,17 +105,17 @@ func (k *KinesisOutput) Connect() error {
resp, err := svc.ListStreams(KinesisParams)
if err != nil {
log.Printf("kinesis: Error in ListSteams API call : %+v \n", err)
log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err)
}
if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("kinesis: Stream Exists")
log.Printf("E! kinesis: Stream Exists")
}
k.svc = svc
return nil
} else {
log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
return err
@@ -147,14 +147,14 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
log.Printf("%+v \n", resp)
log.Printf("E! %+v \n", resp)
} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
log.Printf("E! kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
}
return time.Since(start)
@@ -182,7 +182,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
atomic.StoreUint32(&sz, 0)
r = nil
}

View File

@@ -103,15 +103,13 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
if gauges, err := l.buildGauges(m); err == nil {
for _, gauge := range gauges {
tempGauges = append(tempGauges, gauge)
if l.Debug {
log.Printf("[DEBUG] Got a gauge: %v\n", gauge)
}
log.Printf("D! Got a gauge: %v\n", gauge)
}
} else {
log.Printf("unable to build Gauge for %s, skipping\n", m.Name())
if l.Debug {
log.Printf("[DEBUG] Couldn't build gauge: %v\n", err)
}
log.Printf("I! unable to build Gauge for %s, skipping\n", m.Name())
log.Printf("D! Couldn't build gauge: %v\n", err)
}
}
@@ -132,9 +130,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error())
}
if l.Debug {
log.Printf("[DEBUG] Librato request: %v\n", string(metricsBytes))
}
log.Printf("D! Librato request: %v\n", string(metricsBytes))
req, err := http.NewRequest(
"POST",
@@ -150,9 +146,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
resp, err := l.client.Do(req)
if err != nil {
if l.Debug {
log.Printf("[DEBUG] Error POSTing metrics: %v\n", err.Error())
}
log.Printf("D! Error POSTing metrics: %v\n", err.Error())
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
@@ -160,7 +154,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
if resp.StatusCode != 200 || l.Debug {
htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("[DEBUG] Couldn't get response! (%v)\n", err)
log.Printf("D! Couldn't get response! (%v)\n", err)
}
if resp.StatusCode != 200 {
return fmt.Errorf(
@@ -168,9 +162,7 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
resp.StatusCode,
string(htmlData))
}
if l.Debug {
log.Printf("[DEBUG] Librato response: %v\n", string(htmlData))
}
log.Printf("D! Librato response: %v\n", string(htmlData))
}
}
@@ -226,9 +218,8 @@ func (l *Librato) buildGauges(m telegraf.Metric) ([]*Gauge, error) {
}
gauges = append(gauges, gauge)
}
if l.Debug {
fmt.Printf("[DEBUG] Built gauges: %v\n", gauges)
}
log.Printf("D! Built gauges: %v\n", gauges)
return gauges, nil
}

View File

@@ -164,9 +164,11 @@ func (o *openTSDBHttp) flush() error {
if resp.StatusCode/100 != 2 {
if resp.StatusCode/100 == 4 {
log.Printf("WARNING: Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode)
log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.",
resp.StatusCode)
} else {
return fmt.Errorf("Error when sending metrics.Received status %d", resp.StatusCode)
return fmt.Errorf("Error when sending metrics. Received status %d",
resp.StatusCode)
}
}

View File

@@ -167,7 +167,7 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
continue
}
if err != nil {
log.Printf("ERROR creating prometheus metric, "+
log.Printf("E! Error creating prometheus metric, "+
"key: %s, labels: %v,\nerr: %s\n",
mname, l, err.Error())
}