Compare commits

...

18 Commits
1.5.1 ... 1.5.2

Author SHA1 Message Date
Daniel Nelson
67440c95bb Set release date for 1.5.2
(cherry picked from commit 3a85e7b1f0)
2018-01-30 14:02:50 -08:00
Daniel Nelson
39de63d03c Update changelog
(cherry picked from commit 5d87ad85a1)
2018-01-30 14:01:17 -08:00
Daniel Nelson
56edd339e7 Exclude master_replid fields from redis input (#3725)
(cherry picked from commit c28d0e1b16)
2018-01-30 14:01:13 -08:00
Daniel Nelson
df768f83af Update changelog
(cherry picked from commit f9c0aa1e23)
2018-01-25 13:47:39 -08:00
Pierre Tessier
8733d3826a Add timeout to wavefront output write (#3711)
(cherry picked from commit 3e4c91880a)
2018-01-25 13:47:39 -08:00
Daniel Nelson
2bb97154db Update changelog
(cherry picked from commit 899c3a2ae1)
2018-01-22 12:06:30 -08:00
Daniel Nelson
a8d9e458ab Remove graphite serializer replacement of dot with underscore in field key (#3705)
(cherry picked from commit 4558aeddeb)
2018-01-22 12:06:26 -08:00
Daniel Nelson
b464adb08c Update changelog
(cherry picked from commit 36c9113917)
2018-01-22 12:01:31 -08:00
Daniel Nelson
4bd67824ae Avoid loop creation in second processor pass (#3656)
(cherry picked from commit 5270aa451c)
2018-01-22 12:01:24 -08:00
Daniel Nelson
f5894a6a2f Limit wait time for writes in mqtt output (#3699)
(cherry picked from commit 91fc2765b1)
2018-01-22 12:01:20 -08:00
Daniel Nelson
1790b26651 Update changelog
(cherry picked from commit 5bac08662e)
2018-01-18 17:39:22 -08:00
Piotr Popieluch
bb3ee1fd39 Align aggregator period with internal ticker to avoid skipping metrics (#3693)
By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics.

(cherry picked from commit 601dc99606)
2018-01-18 17:39:17 -08:00
Daniel Nelson
82df5bf2d8 Update changelog
(cherry picked from commit 0f55d9eba2)
2018-01-17 15:28:52 -08:00
Piotr Popieluch
8b566b2b9f Reconnect before sending graphite metrics if disconnected (#3680)
(cherry picked from commit f374a295d9)
2018-01-17 15:28:52 -08:00
Daniel Nelson
059a751a71 Update changelog
(cherry picked from commit ad921a3840)
2018-01-17 14:39:10 -08:00
Michael Boudreau
dcaa0ca8db Fix index out of bounds error in solr input plugin (#3683)
(cherry picked from commit 9d559292a5)
2018-01-17 14:39:05 -08:00
Daniel Nelson
8777e32d9f Update changelog
(cherry picked from commit 6e24056757)
2018-01-16 13:47:23 -08:00
Noah Crowley
667940afac Ignore empty lines in Graphite plaintext (#3684)
(cherry picked from commit 87830a1c38)
2018-01-16 13:46:58 -08:00
15 changed files with 130 additions and 26 deletions

View File

@@ -1,4 +1,18 @@
## v1.5.1 [2017-01-10]
## v1.5.2 [2018-01-30]
### Bugfixes
- [#3684](https://github.com/influxdata/telegraf/pull/3684): Ignore empty lines in Graphite plaintext.
- [#3604](https://github.com/influxdata/telegraf/issues/3604): Fix index out of bounds error in solr input plugin.
- [#3680](https://github.com/influxdata/telegraf/pull/3680): Reconnect before sending graphite metrics if disconnected.
- [#3693](https://github.com/influxdata/telegraf/pull/3693): Align aggregator period with internal ticker to avoid skipping metrics.
- [#3629](https://github.com/influxdata/telegraf/issues/3629): Fix a potential deadlock when using aggregators.
- [#3697](https://github.com/influxdata/telegraf/issues/3697): Limit wait time for writes in mqtt output.
- [#3698](https://github.com/influxdata/telegraf/issues/3698): Revert change in graphite output where dot in field key was replaced by underscore.
- [#3710](https://github.com/influxdata/telegraf/issues/3710): Add timeout to wavefront output write.
- [#3725](https://github.com/influxdata/telegraf/issues/3725): Exclude master_replid fields from redis input.
## v1.5.1 [2018-01-10]
### Bugfixes

View File

@@ -308,7 +308,13 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
metrics = processor.Apply(metrics...)
}
for _, m := range metrics {
outMetricC <- m
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(m.Copy())
}
}
}
}
}
@@ -364,8 +370,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100)
now := time.Now()
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
@@ -406,7 +410,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
acc := NewAccumulator(agg, aggC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, now, shutdown)
agg.Run(acc, shutdown)
}(aggregator)
}

View File

@@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
// for period ticks to tell it when to push and reset the aggregator.
func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
now time.Time,
shutdown chan struct{},
) {
// The start of the period is truncated to the nearest second.
@@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
// 2nd interval: 00:10 - 00:20.5
// etc.
//
now := time.Now()
r.periodStart = now.Truncate(time.Second)
truncation := now.Sub(r.periodStart)
r.periodEnd = r.periodStart.Add(r.Config.Period)

View File

@@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{}))
go ra.Run(&acc, make(chan struct{}))
m := ra.MakeMetric(
"RITest",
@@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{}))
go ra.Run(&acc, make(chan struct{}))
// metric before current period
m := ra.MakeMetric(
@@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
ra.Run(&acc, time.Now(), shutdown)
ra.Run(&acc, shutdown)
}()
m := ra.MakeMetric(

View File

@@ -88,6 +88,7 @@ Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) a
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- second_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)

View File

@@ -189,6 +189,10 @@ func gatherInfoOutput(
}
}
if strings.HasPrefix(name, "master_replid") {
continue
}
if name == "mem_allocator" {
continue
}

View File

@@ -86,6 +86,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"repl_backlog_size": int64(1048576),
"repl_backlog_first_byte_offset": int64(0),
"repl_backlog_histlen": int64(0),
"second_repl_offset": int64(-1),
"used_cpu_sys": float64(0.14),
"used_cpu_user": float64(0.05),
"used_cpu_sys_children": float64(0.00),
@@ -189,7 +190,10 @@ latest_fork_usec:0
# Replication
role:master
connected_slaves:0
master_replid:8c4d7b768b26826825ceb20ff4a2c7c54616350b
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0

View File

@@ -246,6 +246,9 @@ func addAdminCoresStatusToAcc(acc telegraf.Accumulator, adminCoreStatus *AdminCo
// Add core metrics section to accumulator
func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var coreMetrics map[string]Core
if len(mBeansData.SolrMbeans) < 2 {
return fmt.Errorf("no core metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[1], &coreMetrics); err != nil {
return err
}
@@ -274,9 +277,14 @@ func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBea
func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var queryMetrics map[string]QueryHandler
if len(mBeansData.SolrMbeans) < 4 {
return fmt.Errorf("no query handler metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[3], &queryMetrics); err != nil {
return err
}
for name, metrics := range queryMetrics {
coreFields := map[string]interface{}{
"15min_rate_reqs_per_second": metrics.Stats.One5minRateReqsPerSecond,
@@ -310,6 +318,9 @@ func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansDa
func addUpdateHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var updateMetrics map[string]UpdateHandler
if len(mBeansData.SolrMbeans) < 6 {
return fmt.Errorf("no update handler metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[5], &updateMetrics); err != nil {
return err
}
@@ -364,6 +375,9 @@ func getFloat(unk interface{}) float64 {
// Add cache metrics section to accumulator
func addCacheMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
if len(mBeansData.SolrMbeans) < 8 {
return fmt.Errorf("no cache metric data to unmarshall")
}
var cacheMetrics map[string]Cache
if err := json.Unmarshal(mBeansData.SolrMbeans[7], &cacheMetrics); err != nil {
return err

View File

@@ -60,3 +60,44 @@ func createMockServer() *httptest.Server {
}
}))
}
func TestNoCoreDataHandling(t *testing.T) {
ts := createMockNoCoreDataServer()
solr := NewSolr()
solr.Servers = []string{ts.URL}
var acc testutil.Accumulator
require.NoError(t, solr.Gather(&acc))
acc.AssertContainsTaggedFields(t, "solr_admin",
solrAdminMainCoreStatusExpected,
map[string]string{"core": "main"})
acc.AssertContainsTaggedFields(t, "solr_admin",
solrAdminCore1StatusExpected,
map[string]string{"core": "core1"})
acc.AssertDoesNotContainMeasurement(t, "solr_core")
acc.AssertDoesNotContainMeasurement(t, "solr_queryhandler")
acc.AssertDoesNotContainMeasurement(t, "solr_updatehandler")
acc.AssertDoesNotContainMeasurement(t, "solr_handler")
}
func createMockNoCoreDataServer() *httptest.Server {
var nodata string
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "/solr/admin/cores") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, statusResponse)
} else if strings.Contains(r.URL.Path, "solr/main/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, nodata)
} else if strings.Contains(r.URL.Path, "solr/core1/admin") {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, nodata)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprintln(w, "nope")
}
}))
}

View File

@@ -155,8 +155,22 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
batch = append(batch, buf...)
}
err = g.send(batch)
// try to reconnect and retry to send
if err != nil {
log.Println("E! Graphite: Reconnecting and retrying: ")
g.Connect()
err = g.send(batch)
}
return err
}
func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any Graphite server in cluster\n")
err := errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
@@ -167,6 +181,8 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
if _, e := g.conns[n].Write(batch); e != nil {
// Error
log.Println("E! Graphite Error: " + e.Error())
// Close explicitely
g.conns[n].Close()
// Let's try the next one
} else {
// Success
@@ -174,11 +190,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
break
}
}
// try to reconnect
if err != nil {
log.Println("E! Reconnecting: ")
g.Connect()
}
return err
}

View File

@@ -81,7 +81,7 @@ func TestGraphiteOK(t *testing.T) {
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
@@ -89,10 +89,8 @@ func TestGraphiteOK(t *testing.T) {
wg2.Add(1)
TCPServer2(t, &wg2)
//Write but expect an error, but reconnect
g.Write(metrics2)
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have failed")
//Actually write the new metrics
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -25,6 +26,9 @@ var sampleConfig = `
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Timeout for write operations. default: 5s
# timeout = "5s"
## client ID, if not set a random ID is generated
# client_id = ""
@@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), false, body)
token.Wait()
token.WaitTimeout(m.Timeout.Duration)
if token.Error() != nil {
return token.Error()
}
@@ -159,6 +163,11 @@ func (m *MQTT) publish(topic string, body []byte) error {
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
if m.Timeout.Duration < time.Second {
m.Timeout.Duration = 5 * time.Second
}
opts.WriteTimeout = m.Timeout.Duration
if m.ClientID != "" {
opts.SetClientID(m.ClientID)
} else {

View File

@@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"time"
)
type Wavefront struct {
@@ -101,13 +102,11 @@ func (w *Wavefront) Connect() error {
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
_, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
log.Printf("Wavefront: TCP address cannot be resolved %s", err.Error())
return nil
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error())
}
connection, err := net.Dial("tcp", uri)
if err != nil {
log.Printf("Wavefront: TCP connect fail %s", err.Error())
return nil
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
return nil
@@ -122,6 +121,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
connection.SetWriteDeadline(time.Now().Add(5 * time.Second))
for _, m := range metrics {
for _, metricPoint := range buildMetrics(m, w) {

View File

@@ -138,8 +138,11 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
metric, err := p.ParseLine(line)
if line == "" {
continue
}
metric, err := p.ParseLine(line)
if err == nil {
metrics = append(metrics, metric)
} else {

View File

@@ -133,7 +133,7 @@ func InsertField(bucket, fieldName string) string {
if fieldName == "value" {
return fieldDeleter.Replace(bucket)
}
return strings.Replace(bucket, "FIELDNAME", strings.Replace(fieldName, ".", "_", -1), 1)
return strings.Replace(bucket, "FIELDNAME", fieldName, 1)
}
func buildTags(tags map[string]string) string {