Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67440c95bb | ||
|
|
39de63d03c | ||
|
|
56edd339e7 | ||
|
|
df768f83af | ||
|
|
8733d3826a | ||
|
|
2bb97154db | ||
|
|
a8d9e458ab | ||
|
|
b464adb08c | ||
|
|
4bd67824ae | ||
|
|
f5894a6a2f | ||
|
|
1790b26651 | ||
|
|
bb3ee1fd39 | ||
|
|
82df5bf2d8 | ||
|
|
8b566b2b9f | ||
|
|
059a751a71 | ||
|
|
dcaa0ca8db | ||
|
|
8777e32d9f | ||
|
|
667940afac |
16
CHANGELOG.md
16
CHANGELOG.md
@@ -1,4 +1,18 @@
|
||||
## v1.5.1 [2017-01-10]
|
||||
## v1.5.2 [2018-01-30]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#3684](https://github.com/influxdata/telegraf/pull/3684): Ignore empty lines in Graphite plaintext.
|
||||
- [#3604](https://github.com/influxdata/telegraf/issues/3604): Fix index out of bounds error in solr input plugin.
|
||||
- [#3680](https://github.com/influxdata/telegraf/pull/3680): Reconnect before sending graphite metrics if disconnected.
|
||||
- [#3693](https://github.com/influxdata/telegraf/pull/3693): Align aggregator period with internal ticker to avoid skipping metrics.
|
||||
- [#3629](https://github.com/influxdata/telegraf/issues/3629): Fix a potential deadlock when using aggregators.
|
||||
- [#3697](https://github.com/influxdata/telegraf/issues/3697): Limit wait time for writes in mqtt output.
|
||||
- [#3698](https://github.com/influxdata/telegraf/issues/3698): Revert change in graphite output where dot in field key was replaced by underscore.
|
||||
- [#3710](https://github.com/influxdata/telegraf/issues/3710): Add timeout to wavefront output write.
|
||||
- [#3725](https://github.com/influxdata/telegraf/issues/3725): Exclude master_replid fields from redis input.
|
||||
|
||||
## v1.5.1 [2018-01-10]
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
||||
@@ -308,7 +308,13 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
|
||||
metrics = processor.Apply(metrics...)
|
||||
}
|
||||
for _, m := range metrics {
|
||||
outMetricC <- m
|
||||
for i, o := range a.Config.Outputs {
|
||||
if i == len(a.Config.Outputs)-1 {
|
||||
o.AddMetric(m)
|
||||
} else {
|
||||
o.AddMetric(m.Copy())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -364,8 +370,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||
metricC := make(chan telegraf.Metric, 100)
|
||||
aggC := make(chan telegraf.Metric, 100)
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// Start all ServicePlugins
|
||||
for _, input := range a.Config.Inputs {
|
||||
input.SetDefaultTags(a.Config.Tags)
|
||||
@@ -406,7 +410,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
|
||||
acc := NewAccumulator(agg, aggC)
|
||||
acc.SetPrecision(a.Config.Agent.Precision.Duration,
|
||||
a.Config.Agent.Interval.Duration)
|
||||
agg.Run(acc, now, shutdown)
|
||||
agg.Run(acc, shutdown)
|
||||
}(aggregator)
|
||||
}
|
||||
|
||||
|
||||
@@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
|
||||
// for period ticks to tell it when to push and reset the aggregator.
|
||||
func (r *RunningAggregator) Run(
|
||||
acc telegraf.Accumulator,
|
||||
now time.Time,
|
||||
shutdown chan struct{},
|
||||
) {
|
||||
// The start of the period is truncated to the nearest second.
|
||||
@@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
|
||||
// 2nd interval: 00:10 - 00:20.5
|
||||
// etc.
|
||||
//
|
||||
now := time.Now()
|
||||
r.periodStart = now.Truncate(time.Second)
|
||||
truncation := now.Sub(r.periodStart)
|
||||
r.periodEnd = r.periodStart.Add(r.Config.Period)
|
||||
|
||||
@@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
|
||||
})
|
||||
assert.NoError(t, ra.Config.Filter.Compile())
|
||||
acc := testutil.Accumulator{}
|
||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
||||
go ra.Run(&acc, make(chan struct{}))
|
||||
|
||||
m := ra.MakeMetric(
|
||||
"RITest",
|
||||
@@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
|
||||
})
|
||||
assert.NoError(t, ra.Config.Filter.Compile())
|
||||
acc := testutil.Accumulator{}
|
||||
go ra.Run(&acc, time.Now(), make(chan struct{}))
|
||||
go ra.Run(&acc, make(chan struct{}))
|
||||
|
||||
// metric before current period
|
||||
m := ra.MakeMetric(
|
||||
@@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ra.Run(&acc, time.Now(), shutdown)
|
||||
ra.Run(&acc, shutdown)
|
||||
}()
|
||||
|
||||
m := ra.MakeMetric(
|
||||
|
||||
@@ -88,6 +88,7 @@ Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) a
|
||||
**Replication**
|
||||
- connected_slaves(int, number)
|
||||
- master_repl_offset(int, number)
|
||||
- second_repl_offset(int, number)
|
||||
- repl_backlog_active(int, number)
|
||||
- repl_backlog_size(int, bytes)
|
||||
- repl_backlog_first_byte_offset(int, number)
|
||||
|
||||
@@ -189,6 +189,10 @@ func gatherInfoOutput(
|
||||
}
|
||||
}
|
||||
|
||||
if strings.HasPrefix(name, "master_replid") {
|
||||
continue
|
||||
}
|
||||
|
||||
if name == "mem_allocator" {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -86,6 +86,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
|
||||
"repl_backlog_size": int64(1048576),
|
||||
"repl_backlog_first_byte_offset": int64(0),
|
||||
"repl_backlog_histlen": int64(0),
|
||||
"second_repl_offset": int64(-1),
|
||||
"used_cpu_sys": float64(0.14),
|
||||
"used_cpu_user": float64(0.05),
|
||||
"used_cpu_sys_children": float64(0.00),
|
||||
@@ -189,7 +190,10 @@ latest_fork_usec:0
|
||||
# Replication
|
||||
role:master
|
||||
connected_slaves:0
|
||||
master_replid:8c4d7b768b26826825ceb20ff4a2c7c54616350b
|
||||
master_replid2:0000000000000000000000000000000000000000
|
||||
master_repl_offset:0
|
||||
second_repl_offset:-1
|
||||
repl_backlog_active:0
|
||||
repl_backlog_size:1048576
|
||||
repl_backlog_first_byte_offset:0
|
||||
|
||||
@@ -246,6 +246,9 @@ func addAdminCoresStatusToAcc(acc telegraf.Accumulator, adminCoreStatus *AdminCo
|
||||
// Add core metrics section to accumulator
|
||||
func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
|
||||
var coreMetrics map[string]Core
|
||||
if len(mBeansData.SolrMbeans) < 2 {
|
||||
return fmt.Errorf("no core metric data to unmarshall")
|
||||
}
|
||||
if err := json.Unmarshal(mBeansData.SolrMbeans[1], &coreMetrics); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -274,9 +277,14 @@ func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBea
|
||||
func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
|
||||
var queryMetrics map[string]QueryHandler
|
||||
|
||||
if len(mBeansData.SolrMbeans) < 4 {
|
||||
return fmt.Errorf("no query handler metric data to unmarshall")
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(mBeansData.SolrMbeans[3], &queryMetrics); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for name, metrics := range queryMetrics {
|
||||
coreFields := map[string]interface{}{
|
||||
"15min_rate_reqs_per_second": metrics.Stats.One5minRateReqsPerSecond,
|
||||
@@ -310,6 +318,9 @@ func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansDa
|
||||
func addUpdateHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
|
||||
var updateMetrics map[string]UpdateHandler
|
||||
|
||||
if len(mBeansData.SolrMbeans) < 6 {
|
||||
return fmt.Errorf("no update handler metric data to unmarshall")
|
||||
}
|
||||
if err := json.Unmarshal(mBeansData.SolrMbeans[5], &updateMetrics); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -364,6 +375,9 @@ func getFloat(unk interface{}) float64 {
|
||||
|
||||
// Add cache metrics section to accumulator
|
||||
func addCacheMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
|
||||
if len(mBeansData.SolrMbeans) < 8 {
|
||||
return fmt.Errorf("no cache metric data to unmarshall")
|
||||
}
|
||||
var cacheMetrics map[string]Cache
|
||||
if err := json.Unmarshal(mBeansData.SolrMbeans[7], &cacheMetrics); err != nil {
|
||||
return err
|
||||
|
||||
@@ -60,3 +60,44 @@ func createMockServer() *httptest.Server {
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func TestNoCoreDataHandling(t *testing.T) {
|
||||
ts := createMockNoCoreDataServer()
|
||||
solr := NewSolr()
|
||||
solr.Servers = []string{ts.URL}
|
||||
var acc testutil.Accumulator
|
||||
require.NoError(t, solr.Gather(&acc))
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "solr_admin",
|
||||
solrAdminMainCoreStatusExpected,
|
||||
map[string]string{"core": "main"})
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "solr_admin",
|
||||
solrAdminCore1StatusExpected,
|
||||
map[string]string{"core": "core1"})
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "solr_core")
|
||||
acc.AssertDoesNotContainMeasurement(t, "solr_queryhandler")
|
||||
acc.AssertDoesNotContainMeasurement(t, "solr_updatehandler")
|
||||
acc.AssertDoesNotContainMeasurement(t, "solr_handler")
|
||||
|
||||
}
|
||||
|
||||
func createMockNoCoreDataServer() *httptest.Server {
|
||||
var nodata string
|
||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "/solr/admin/cores") {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, statusResponse)
|
||||
} else if strings.Contains(r.URL.Path, "solr/main/admin") {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, nodata)
|
||||
} else if strings.Contains(r.URL.Path, "solr/core1/admin") {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, nodata)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
fmt.Fprintln(w, "nope")
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -155,8 +155,22 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||
batch = append(batch, buf...)
|
||||
}
|
||||
|
||||
err = g.send(batch)
|
||||
|
||||
// try to reconnect and retry to send
|
||||
if err != nil {
|
||||
log.Println("E! Graphite: Reconnecting and retrying: ")
|
||||
g.Connect()
|
||||
err = g.send(batch)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *Graphite) send(batch []byte) error {
|
||||
// This will get set to nil if a successful write occurs
|
||||
err = errors.New("Could not write to any Graphite server in cluster\n")
|
||||
err := errors.New("Could not write to any Graphite server in cluster\n")
|
||||
|
||||
// Send data to a random server
|
||||
p := rand.Perm(len(g.conns))
|
||||
for _, n := range p {
|
||||
@@ -167,6 +181,8 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||
if _, e := g.conns[n].Write(batch); e != nil {
|
||||
// Error
|
||||
log.Println("E! Graphite Error: " + e.Error())
|
||||
// Close explicitly
|
||||
g.conns[n].Close()
|
||||
// Let's try the next one
|
||||
} else {
|
||||
// Success
|
||||
@@ -174,11 +190,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
|
||||
break
|
||||
}
|
||||
}
|
||||
// try to reconnect
|
||||
if err != nil {
|
||||
log.Println("E! Reconnecting: ")
|
||||
g.Connect()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -81,7 +81,7 @@ func TestGraphiteOK(t *testing.T) {
|
||||
err2 := g.Write(metrics)
|
||||
require.NoError(t, err2)
|
||||
|
||||
// Waiting TCPserver
|
||||
// Waiting TCPserver, should reconnect and resend
|
||||
wg.Wait()
|
||||
t.Log("Finished Waiting for first data")
|
||||
var wg2 sync.WaitGroup
|
||||
@@ -89,10 +89,8 @@ func TestGraphiteOK(t *testing.T) {
|
||||
wg2.Add(1)
|
||||
TCPServer2(t, &wg2)
|
||||
//Write but expect an error, but reconnect
|
||||
g.Write(metrics2)
|
||||
err3 := g.Write(metrics2)
|
||||
t.Log("Finished writing second data, it should have failed")
|
||||
//Actually write the new metrics
|
||||
t.Log("Finished writing second data, it should have reconnected automatically")
|
||||
|
||||
require.NoError(t, err3)
|
||||
t.Log("Finished writing third data")
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
@@ -25,6 +26,9 @@ var sampleConfig = `
|
||||
# username = "telegraf"
|
||||
# password = "metricsmetricsmetricsmetrics"
|
||||
|
||||
## Timeout for write operations. default: 5s
|
||||
# timeout = "5s"
|
||||
|
||||
## client ID, if not set a random ID is generated
|
||||
# client_id = ""
|
||||
|
||||
@@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
|
||||
|
||||
func (m *MQTT) publish(topic string, body []byte) error {
|
||||
token := m.client.Publish(topic, byte(m.QoS), false, body)
|
||||
token.Wait()
|
||||
token.WaitTimeout(m.Timeout.Duration)
|
||||
if token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
@@ -159,6 +163,11 @@ func (m *MQTT) publish(topic string, body []byte) error {
|
||||
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
||||
opts := paho.NewClientOptions()
|
||||
|
||||
if m.Timeout.Duration < time.Second {
|
||||
m.Timeout.Duration = 5 * time.Second
|
||||
}
|
||||
opts.WriteTimeout = m.Timeout.Duration
|
||||
|
||||
if m.ClientID != "" {
|
||||
opts.SetClientID(m.ClientID)
|
||||
} else {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Wavefront struct {
|
||||
@@ -101,13 +102,11 @@ func (w *Wavefront) Connect() error {
|
||||
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
|
||||
_, err := net.ResolveTCPAddr("tcp", uri)
|
||||
if err != nil {
|
||||
log.Printf("Wavefront: TCP address cannot be resolved %s", err.Error())
|
||||
return nil
|
||||
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error())
|
||||
}
|
||||
connection, err := net.Dial("tcp", uri)
|
||||
if err != nil {
|
||||
log.Printf("Wavefront: TCP connect fail %s", err.Error())
|
||||
return nil
|
||||
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
|
||||
}
|
||||
defer connection.Close()
|
||||
return nil
|
||||
@@ -122,6 +121,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
|
||||
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
|
||||
}
|
||||
defer connection.Close()
|
||||
connection.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||
|
||||
for _, m := range metrics {
|
||||
for _, metricPoint := range buildMetrics(m, w) {
|
||||
|
||||
@@ -138,8 +138,11 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
|
||||
// Trim the buffer, even though there should be no padding
|
||||
line := strings.TrimSpace(string(buf))
|
||||
metric, err := p.ParseLine(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
metric, err := p.ParseLine(line)
|
||||
if err == nil {
|
||||
metrics = append(metrics, metric)
|
||||
} else {
|
||||
|
||||
@@ -133,7 +133,7 @@ func InsertField(bucket, fieldName string) string {
|
||||
if fieldName == "value" {
|
||||
return fieldDeleter.Replace(bucket)
|
||||
}
|
||||
return strings.Replace(bucket, "FIELDNAME", strings.Replace(fieldName, ".", "_", -1), 1)
|
||||
return strings.Replace(bucket, "FIELDNAME", fieldName, 1)
|
||||
}
|
||||
|
||||
func buildTags(tags map[string]string) string {
|
||||
|
||||
Reference in New Issue
Block a user