Compare commits

...

36 Commits
1.5.1 ... 1.5.3

Author SHA1 Message Date
Daniel Nelson
1e51969813 Set 1.5.3 release date
(cherry picked from commit 2160779126)
2018-03-14 16:34:56 -07:00
Daniel Nelson
51b097a7c6 Use previous image on appveyor
(cherry picked from commit f1b681cbdc)
2018-03-14 16:31:18 -07:00
Daniel Nelson
77dfb8c9c5 Update changelog
(cherry picked from commit 6e5e2f713d)
2018-03-14 14:25:53 -07:00
Jonas Hahnfeld
1398f8e678 Add output of stderr in case of error to exec log message (#3862)
If the command failed with a non-zero exit status there might be an error
message on stderr. Append the first line to the error message to ease the
search for its cause.

(cherry picked from commit 8e515688eb)
2018-03-14 14:25:53 -07:00
Daniel Nelson
d96483bffb Use Go 1.9.4 in builds 2018-03-09 14:37:53 -08:00
Daniel Nelson
5e534676a0 Update changelog
(cherry picked from commit f7207f514e)
2018-03-08 10:55:02 -08:00
Dennis Schön
9329200afa Fix uptime metric in passenger input plugin (#3871)
(cherry picked from commit f1c8abd68c)
2018-03-08 10:55:02 -08:00
Daniel Nelson
645b8b905d Update changelog
(cherry picked from commit e4ce057885)
2018-03-07 14:17:36 -08:00
dilshatm
ea7d884c09 Fix collation difference in sqlserver input (#3786)
(cherry picked from commit a6d366fb84)
2018-03-07 14:17:33 -08:00
Daniel Nelson
7f94cb58e4 Update changelog
(cherry picked from commit 5928219454)
2018-02-25 01:07:16 -08:00
Daniel Nelson
d8f2d4af0f Disable keepalive in mqtt output. (#3779)
This functionality currently has race conditions that can result in the
output deadlocking.

(cherry picked from commit 8c932abff6)
2018-02-25 01:07:16 -08:00
Daniel Nelson
d8dae1b1ab Fix memory leak in postgresql_extensible 2018-02-20 11:58:43 -08:00
Daniel Nelson
770cf4e0b6 Update changelog
(cherry picked from commit a00d5b48f8)
2018-02-09 12:13:49 -08:00
efficks
8cb5391f4e Fix ping plugin not reporting zero durations (#3778)
(cherry picked from commit f5ea13a9ab)
2018-02-09 12:13:48 -08:00
Daniel Nelson
c5ddb65ad9 Update changelog
(cherry picked from commit 9a1d69a2ae)
2018-02-05 11:23:03 -08:00
Philipp Weber
d671299e96 Remove userinfo from url tag in prometheus input (#3743)
(cherry picked from commit b7a68eef56)
2018-02-05 11:23:02 -08:00
Daniel Nelson
f59231941f Update changelog
(cherry picked from commit 32732d42f8)
2018-01-30 18:09:24 -08:00
Daniel Nelson
100bdfba6c Set path to / if HOST_MOUNT_PREFIX matches full path (#3736)
(cherry picked from commit 10e51e4b49)
2018-01-30 18:08:50 -08:00
Daniel Nelson
67440c95bb Set release date for 1.5.2
(cherry picked from commit 3a85e7b1f0)
2018-01-30 14:02:50 -08:00
Daniel Nelson
39de63d03c Update changelog
(cherry picked from commit 5d87ad85a1)
2018-01-30 14:01:17 -08:00
Daniel Nelson
56edd339e7 Exclude master_replid fields from redis input (#3725)
(cherry picked from commit c28d0e1b16)
2018-01-30 14:01:13 -08:00
Daniel Nelson
df768f83af Update changelog
(cherry picked from commit f9c0aa1e23)
2018-01-25 13:47:39 -08:00
Pierre Tessier
8733d3826a Add timeout to wavefront output write (#3711)
(cherry picked from commit 3e4c91880a)
2018-01-25 13:47:39 -08:00
Daniel Nelson
2bb97154db Update changelog
(cherry picked from commit 899c3a2ae1)
2018-01-22 12:06:30 -08:00
Daniel Nelson
a8d9e458ab Remove graphite serializer replacement of dot with underscore in field key (#3705)
(cherry picked from commit 4558aeddeb)
2018-01-22 12:06:26 -08:00
Daniel Nelson
b464adb08c Update changelog
(cherry picked from commit 36c9113917)
2018-01-22 12:01:31 -08:00
Daniel Nelson
4bd67824ae Avoid loop creation in second processor pass (#3656)
(cherry picked from commit 5270aa451c)
2018-01-22 12:01:24 -08:00
Daniel Nelson
f5894a6a2f Limit wait time for writes in mqtt output (#3699)
(cherry picked from commit 91fc2765b1)
2018-01-22 12:01:20 -08:00
Daniel Nelson
1790b26651 Update changelog
(cherry picked from commit 5bac08662e)
2018-01-18 17:39:22 -08:00
Piotr Popieluch
bb3ee1fd39 Align aggregator period with internal ticker to avoid skipping metrics (#3693)
By the time aggregator.run() was called, about 600ms had already passed since `now` was set, which skewed the aggregation intervals and caused metrics to be skipped.

(cherry picked from commit 601dc99606)
2018-01-18 17:39:17 -08:00
Daniel Nelson
82df5bf2d8 Update changelog
(cherry picked from commit 0f55d9eba2)
2018-01-17 15:28:52 -08:00
Piotr Popieluch
8b566b2b9f Reconnect before sending graphite metrics if disconnected (#3680)
(cherry picked from commit f374a295d9)
2018-01-17 15:28:52 -08:00
Daniel Nelson
059a751a71 Update changelog
(cherry picked from commit ad921a3840)
2018-01-17 14:39:10 -08:00
Michael Boudreau
dcaa0ca8db Fix index out of bounds error in solr input plugin (#3683)
(cherry picked from commit 9d559292a5)
2018-01-17 14:39:05 -08:00
Daniel Nelson
8777e32d9f Update changelog
(cherry picked from commit 6e24056757)
2018-01-16 13:47:23 -08:00
Noah Crowley
667940afac Ignore empty lines in Graphite plaintext (#3684)
(cherry picked from commit 87830a1c38)
2018-01-16 13:46:58 -08:00
30 changed files with 388 additions and 96 deletions

View File

@@ -1,4 +1,31 @@
## v1.5.1 [2017-01-10]
## v1.5.3 [2018-03-14]
### Bugfixes
- [#3729](https://github.com/influxdata/telegraf/issues/3729): Set path to / if HOST_MOUNT_PREFIX matches full path.
- [#3739](https://github.com/influxdata/telegraf/issues/3739): Remove userinfo from url tag in prometheus input.
- [#3778](https://github.com/influxdata/telegraf/issues/3778): Fix ping plugin not reporting zero durations.
- [#3807](https://github.com/influxdata/telegraf/issues/3807): Fix memory leak in postgresql_extensible.
- [#3697](https://github.com/influxdata/telegraf/issues/3697): Disable keepalive in mqtt output to prevent deadlock.
- [#3786](https://github.com/influxdata/telegraf/pull/3786): Fix collation difference in sqlserver input.
- [#3871](https://github.com/influxdata/telegraf/pull/3871): Fix uptime metric in passenger input plugin.
- [#3851](https://github.com/influxdata/telegraf/issues/3851): Add output of stderr in case of error to exec log message.
## v1.5.2 [2018-01-30]
### Bugfixes
- [#3684](https://github.com/influxdata/telegraf/pull/3684): Ignore empty lines in Graphite plaintext.
- [#3604](https://github.com/influxdata/telegraf/issues/3604): Fix index out of bounds error in solr input plugin.
- [#3680](https://github.com/influxdata/telegraf/pull/3680): Reconnect before sending graphite metrics if disconnected.
- [#3693](https://github.com/influxdata/telegraf/pull/3693): Align aggregator period with internal ticker to avoid skipping metrics.
- [#3629](https://github.com/influxdata/telegraf/issues/3629): Fix a potential deadlock when using aggregators.
- [#3697](https://github.com/influxdata/telegraf/issues/3697): Limit wait time for writes in mqtt output.
- [#3698](https://github.com/influxdata/telegraf/issues/3698): Revert change in graphite output where dot in field key was replaced by underscore.
- [#3710](https://github.com/influxdata/telegraf/issues/3710): Add timeout to wavefront output write.
- [#3725](https://github.com/influxdata/telegraf/issues/3725): Exclude master_replid fields from redis input.
## v1.5.1 [2018-01-10]
### Bugfixes

View File

@@ -308,7 +308,13 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric, ag
metrics = processor.Apply(metrics...)
}
for _, m := range metrics {
outMetricC <- m
for i, o := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
o.AddMetric(m)
} else {
o.AddMetric(m.Copy())
}
}
}
}
}
@@ -364,8 +370,6 @@ func (a *Agent) Run(shutdown chan struct{}) error {
metricC := make(chan telegraf.Metric, 100)
aggC := make(chan telegraf.Metric, 100)
now := time.Now()
// Start all ServicePlugins
for _, input := range a.Config.Inputs {
input.SetDefaultTags(a.Config.Tags)
@@ -406,7 +410,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
acc := NewAccumulator(agg, aggC)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
agg.Run(acc, now, shutdown)
agg.Run(acc, shutdown)
}(aggregator)
}

View File

@@ -1,3 +1,4 @@
image: Previous Visual Studio 2015
version: "{build}"
cache:
@@ -12,11 +13,11 @@ platform: x64
install:
- IF NOT EXIST "C:\Cache" mkdir C:\Cache
- IF NOT EXIST "C:\Cache\go1.9.2.msi" curl -o "C:\Cache\go1.9.2.msi" https://storage.googleapis.com/golang/go1.9.2.windows-amd64.msi
- IF NOT EXIST "C:\Cache\go1.9.4.msi" curl -o "C:\Cache\go1.9.4.msi" https://storage.googleapis.com/golang/go1.9.4.windows-amd64.msi
- IF NOT EXIST "C:\Cache\gnuwin32-bin.zip" curl -o "C:\Cache\gnuwin32-bin.zip" https://dl.influxdata.com/telegraf/ci/make-3.81-bin.zip
- IF NOT EXIST "C:\Cache\gnuwin32-dep.zip" curl -o "C:\Cache\gnuwin32-dep.zip" https://dl.influxdata.com/telegraf/ci/make-3.81-dep.zip
- IF EXIST "C:\Go" rmdir /S /Q C:\Go
- msiexec.exe /i "C:\Cache\go1.9.2.msi" /quiet
- msiexec.exe /i "C:\Cache\go1.9.4.msi" /quiet
- 7z x "C:\Cache\gnuwin32-bin.zip" -oC:\GnuWin32 -y
- 7z x "C:\Cache\gnuwin32-dep.zip" -oC:\GnuWin32 -y
- go version

View File

@@ -6,8 +6,8 @@ machine:
- rabbitmq-server
post:
- sudo rm -rf /usr/local/go
- wget https://storage.googleapis.com/golang/go1.9.2.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.9.2.linux-amd64.tar.gz
- wget https://storage.googleapis.com/golang/go1.9.4.linux-amd64.tar.gz
- sudo tar -C /usr/local -xzf go1.9.4.linux-amd64.tar.gz
- go version
dependencies:

View File

@@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() {
// for period ticks to tell it when to push and reset the aggregator.
func (r *RunningAggregator) Run(
acc telegraf.Accumulator,
now time.Time,
shutdown chan struct{},
) {
// The start of the period is truncated to the nearest second.
@@ -133,6 +132,7 @@ func (r *RunningAggregator) Run(
// 2nd interval: 00:10 - 00:20.5
// etc.
//
now := time.Now()
r.periodStart = now.Truncate(time.Second)
truncation := now.Sub(r.periodStart)
r.periodEnd = r.periodStart.Add(r.Config.Period)

View File

@@ -24,7 +24,7 @@ func TestAdd(t *testing.T) {
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{}))
go ra.Run(&acc, make(chan struct{}))
m := ra.MakeMetric(
"RITest",
@@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) {
})
assert.NoError(t, ra.Config.Filter.Compile())
acc := testutil.Accumulator{}
go ra.Run(&acc, time.Now(), make(chan struct{}))
go ra.Run(&acc, make(chan struct{}))
// metric before current period
m := ra.MakeMetric(
@@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
ra.Run(&acc, time.Now(), shutdown)
ra.Run(&acc, shutdown)
}()
m := ra.MakeMetric(

View File

@@ -41,6 +41,8 @@ const sampleConfig = `
data_format = "influx"
`
const MaxStderrBytes = 512
type Exec struct {
Commands []string
Command string
@@ -96,15 +98,41 @@ func (c CommandRunner) Run(
cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
var out bytes.Buffer
var (
out bytes.Buffer
stderr bytes.Buffer
)
cmd.Stdout = &out
cmd.Stderr = &stderr
if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
var errMessage = ""
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
// Limit the number of bytes.
didTruncate := false
if stderr.Len() > MaxStderrBytes {
stderr.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(stderr.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < stderr.Len()-1 {
didTruncate = true
}
stderr.Truncate(i)
}
if didTruncate {
stderr.WriteString("...")
}
errMessage = fmt.Sprintf(": %s", stderr.String())
}
return nil, fmt.Errorf("exec: %s for command '%s'%s", err, command, errMessage)
}
} else {
switch e.parser.(type) {

View File

@@ -102,7 +102,7 @@ func (p *process) getUptime() int64 {
uptime += value * (24 * 60 * 60)
}
case strings.HasSuffix(v, "h"):
iValue := strings.TrimSuffix(v, "y")
iValue := strings.TrimSuffix(v, "h")
value, err := strconv.ParseInt(iValue, 10, 64)
if err == nil {
uptime += value * (60 * 60)

View File

@@ -126,7 +126,7 @@ func TestPassengerGenerateMetric(t *testing.T) {
"spawn_start_time": int64(1452746844946982),
"spawn_end_time": int64(1452746845013365),
"last_used": int64(1452747071764940),
"uptime": int64(226), // in seconds of 3m 46s
"uptime": int64(191026), // in seconds of 2d 5h 3m 46s
"cpu": int64(58),
"rss": int64(418548),
"pss": int64(319391),
@@ -219,7 +219,7 @@ var sampleStat = `
<spawn_end_time>1452746845013365</spawn_end_time>
<last_used>1452747071764940</last_used>
<last_used_desc>0s ago</last_used_desc>
<uptime>3m 46s</uptime>
<uptime>2d 5h 3m 46s</uptime>
<code_revision>899ac7f</code_revision>
<life_status>ALIVE</life_status>
<enabled>ENABLED</enabled>
@@ -263,7 +263,7 @@ var sampleStat = `
<spawn_end_time>1452746845172460</spawn_end_time>
<last_used>1452747071709179</last_used>
<last_used_desc>0s ago</last_used_desc>
<uptime>3m 46s</uptime>
<uptime>2d 5h 3m 46s</uptime>
<code_revision>899ac7f</code_revision>
<life_status>ALIVE</life_status>
<enabled>ENABLED</enabled>

View File

@@ -128,16 +128,16 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
fields["packets_transmitted"] = trans
fields["packets_received"] = rec
fields["percent_packet_loss"] = loss
if min > 0 {
if min >= 0 {
fields["minimum_response_ms"] = min
}
if avg > 0 {
if avg >= 0 {
fields["average_response_ms"] = avg
}
if max > 0 {
if max >= 0 {
fields["maximum_response_ms"] = max
}
if stddev > 0 {
if stddev >= 0 {
fields["standard_deviation_ms"] = stddev
}
acc.AddFields("ping", fields, tags)
@@ -198,7 +198,7 @@ func (p *Ping) args(url string) []string {
// It returns (<transmitted packets>, <received packets>, <average response>)
func processPingOutput(out string) (int, int, float64, float64, float64, float64, error) {
var trans, recv int
var min, avg, max, stddev float64
var min, avg, max, stddev float64 = -1.0, -1.0, -1.0, -1.0
// Set this error to nil if we find a 'transmitted' line
err := errors.New("Fatal error processing ping output")
lines := strings.Split(out, "\n")

View File

@@ -93,32 +93,32 @@ func processPingOutput(out string) (int, int, int, int, int, int, error) {
// stats data should contain 4 members: entireExpression + ( Send, Receive, Lost )
if len(stats) != 4 {
return 0, 0, 0, 0, 0, 0, err
return 0, 0, 0, -1, -1, -1, err
}
trans, err := strconv.Atoi(stats[1])
if err != nil {
return 0, 0, 0, 0, 0, 0, err
return 0, 0, 0, -1, -1, -1, err
}
receivedPacket, err := strconv.Atoi(stats[2])
if err != nil {
return 0, 0, 0, 0, 0, 0, err
return 0, 0, 0, -1, -1, -1, err
}
// aproxs data should contain 4 members: entireExpression + ( min, max, avg )
if len(aproxs) != 4 {
return trans, receivedReply, receivedPacket, 0, 0, 0, err
return trans, receivedReply, receivedPacket, -1, -1, -1, err
}
min, err := strconv.Atoi(aproxs[1])
if err != nil {
return trans, receivedReply, receivedPacket, 0, 0, 0, err
return trans, receivedReply, receivedPacket, -1, -1, -1, err
}
max, err := strconv.Atoi(aproxs[2])
if err != nil {
return trans, receivedReply, receivedPacket, 0, 0, 0, err
return trans, receivedReply, receivedPacket, -1, -1, -1, err
}
avg, err := strconv.Atoi(aproxs[3])
if err != nil {
return 0, 0, 0, 0, 0, 0, err
return 0, 0, 0, -1, -1, -1, err
}
return trans, receivedReply, receivedPacket, avg, min, max, err
@@ -201,13 +201,13 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
fields["packets_received"] = receivePacket
fields["percent_packet_loss"] = lossPackets
fields["percent_reply_loss"] = lossReply
if avg > 0 {
if avg >= 0 {
fields["average_response_ms"] = float64(avg)
}
if min > 0 {
if min >= 0 {
fields["minimum_response_ms"] = float64(min)
}
if max > 0 {
if max >= 0 {
fields["maximum_response_ms"] = float64(max)
}
acc.AddFields("ping", fields, tags)

View File

@@ -127,6 +127,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
meas_name string
)
p.AllColumns = nil
if p.Address == "" || p.Address == "localhost" {
p.Address = localhost
}

View File

@@ -67,7 +67,7 @@ Measurement names are based on the Metric Family and tags are created for each
label. The value is added to a field named based on the metric type.
All metrics receive the `url` tag indicating the related URL specified in the
Telegraf configuration. If using Kubernetes service discovery the `address`
Telegraf configuration. If using Kubernetes service discovery the `address`
tag is also added indicating the discovered ip address.
### Example Output:

View File

@@ -20,7 +20,7 @@ const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client
type Prometheus struct {
// An array of urls to scrape metrics from.
Urls []string
URLs []string `toml:"urls"`
// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string
@@ -73,12 +73,12 @@ func (p *Prometheus) Description() string {
var ErrProtocolError = errors.New("prometheus protocol error")
func (p *Prometheus) AddressToURL(u *url.URL, address string) string {
func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
host := address
if u.Port() != "" {
host = address + ":" + u.Port()
}
reconstructedUrl := url.URL{
reconstructedURL := &url.URL{
Scheme: u.Scheme,
Opaque: u.Opaque,
User: u.User,
@@ -89,36 +89,42 @@ func (p *Prometheus) AddressToURL(u *url.URL, address string) string {
Fragment: u.Fragment,
Host: host,
}
return reconstructedUrl.String()
return reconstructedURL
}
type UrlAndAddress struct {
OriginalUrl string
Url string
type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
}
func (p *Prometheus) GetAllURLs() ([]UrlAndAddress, error) {
allUrls := make([]UrlAndAddress, 0)
for _, url := range p.Urls {
allUrls = append(allUrls, UrlAndAddress{Url: url, OriginalUrl: url})
func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
allURLs := make([]URLAndAddress, 0)
for _, u := range p.URLs {
URL, err := url.Parse(u)
if err != nil {
log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err)
continue
}
allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL})
}
for _, service := range p.KubernetesServices {
u, err := url.Parse(service)
URL, err := url.Parse(service)
if err != nil {
return nil, err
}
resolvedAddresses, err := net.LookupHost(u.Hostname())
resolvedAddresses, err := net.LookupHost(URL.Hostname())
if err != nil {
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", u.Host, err)
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err)
continue
}
for _, resolved := range resolvedAddresses {
serviceUrl := p.AddressToURL(u, resolved)
allUrls = append(allUrls, UrlAndAddress{Url: serviceUrl, Address: resolved, OriginalUrl: service})
serviceURL := p.AddressToURL(URL, resolved)
allURLs = append(allURLs, URLAndAddress{URL: serviceURL, Address: resolved, OriginalURL: URL})
}
}
return allUrls, nil
return allURLs, nil
}
// Reads stats from all configured servers and accumulates them.
@@ -134,16 +140,16 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
allUrls, err := p.GetAllURLs()
allURLs, err := p.GetAllURLs()
if err != nil {
return err
}
for _, url := range allUrls {
for _, URL := range allURLs {
wg.Add(1)
go func(serviceUrl UrlAndAddress) {
go func(serviceURL URLAndAddress) {
defer wg.Done()
acc.AddError(p.gatherURL(serviceUrl, acc))
}(url)
acc.AddError(p.gatherURL(serviceURL, acc))
}(URL)
}
wg.Wait()
@@ -178,8 +184,8 @@ func (p *Prometheus) createHttpClient() (*http.Client, error) {
return client, nil
}
func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url.Url, nil)
func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", u.URL.String(), nil)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response
@@ -194,11 +200,11 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
resp, err = p.client.Do(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url.Url, err)
return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url.Url, resp.Status)
return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status)
}
body, err := ioutil.ReadAll(resp.Body)
@@ -209,14 +215,16 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
metrics, err := Parse(body, resp.Header)
if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
url.Url, err)
u.URL, err)
}
// Add (or not) collected metrics
for _, metric := range metrics {
tags := metric.Tags()
tags["url"] = url.OriginalUrl
if url.Address != "" {
tags["address"] = url.Address
// strip user and password from URL
u.OriginalURL.User = nil
tags["url"] = u.OriginalURL.String()
if u.Address != "" {
tags["address"] = u.Address
}
switch metric.Type() {

View File

@@ -37,7 +37,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
defer ts.Close()
p := &Prometheus{
Urls: []string{ts.URL},
URLs: []string{ts.URL},
}
var acc testutil.Accumulator
@@ -89,7 +89,7 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
defer ts.Close()
p := &Prometheus{
Urls: []string{ts.URL},
URLs: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
}

View File

@@ -88,6 +88,7 @@ Additionally the plugin also calculates the hit/miss ratio (keyspace\_hitrate) a
**Replication**
- connected_slaves(int, number)
- master_repl_offset(int, number)
- second_repl_offset(int, number)
- repl_backlog_active(int, number)
- repl_backlog_size(int, bytes)
- repl_backlog_first_byte_offset(int, number)

View File

@@ -189,6 +189,10 @@ func gatherInfoOutput(
}
}
if strings.HasPrefix(name, "master_replid") {
continue
}
if name == "mem_allocator" {
continue
}

View File

@@ -86,6 +86,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"repl_backlog_size": int64(1048576),
"repl_backlog_first_byte_offset": int64(0),
"repl_backlog_histlen": int64(0),
"second_repl_offset": int64(-1),
"used_cpu_sys": float64(0.14),
"used_cpu_user": float64(0.05),
"used_cpu_sys_children": float64(0.00),
@@ -189,7 +190,10 @@ latest_fork_usec:0
# Replication
role:master
connected_slaves:0
master_replid:8c4d7b768b26826825ceb20ff4a2c7c54616350b
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0

View File

@@ -246,6 +246,9 @@ func addAdminCoresStatusToAcc(acc telegraf.Accumulator, adminCoreStatus *AdminCo
// Add core metrics section to accumulator
func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var coreMetrics map[string]Core
if len(mBeansData.SolrMbeans) < 2 {
return fmt.Errorf("no core metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[1], &coreMetrics); err != nil {
return err
}
@@ -274,9 +277,14 @@ func addCoreMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBea
func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var queryMetrics map[string]QueryHandler
if len(mBeansData.SolrMbeans) < 4 {
return fmt.Errorf("no query handler metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[3], &queryMetrics); err != nil {
return err
}
for name, metrics := range queryMetrics {
coreFields := map[string]interface{}{
"15min_rate_reqs_per_second": metrics.Stats.One5minRateReqsPerSecond,
@@ -310,6 +318,9 @@ func addQueryHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansDa
func addUpdateHandlerMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
var updateMetrics map[string]UpdateHandler
if len(mBeansData.SolrMbeans) < 6 {
return fmt.Errorf("no update handler metric data to unmarshall")
}
if err := json.Unmarshal(mBeansData.SolrMbeans[5], &updateMetrics); err != nil {
return err
}
@@ -364,6 +375,9 @@ func getFloat(unk interface{}) float64 {
// Add cache metrics section to accumulator
func addCacheMetricsToAcc(acc telegraf.Accumulator, core string, mBeansData *MBeansData, time time.Time) error {
if len(mBeansData.SolrMbeans) < 8 {
return fmt.Errorf("no cache metric data to unmarshall")
}
var cacheMetrics map[string]Cache
if err := json.Unmarshal(mBeansData.SolrMbeans[7], &cacheMetrics); err != nil {
return err

View File

@@ -60,3 +60,44 @@ func createMockServer() *httptest.Server {
}
}))
}
// TestNoCoreDataHandling checks that Gather does not fail when the per-core
// admin endpoints return no metric data: the admin status metrics for both
// cores are still reported, while no core, query handler or update handler
// measurements are produced.
func TestNoCoreDataHandling(t *testing.T) {
	ts := createMockNoCoreDataServer()
	// Shut the mock server down so its listener and goroutines do not
	// outlive the test.
	defer ts.Close()

	solr := NewSolr()
	solr.Servers = []string{ts.URL}

	var acc testutil.Accumulator
	require.NoError(t, solr.Gather(&acc))

	// The cores status endpoint still answers, so admin metrics must exist.
	acc.AssertContainsTaggedFields(t, "solr_admin",
		solrAdminMainCoreStatusExpected,
		map[string]string{"core": "main"})
	acc.AssertContainsTaggedFields(t, "solr_admin",
		solrAdminCore1StatusExpected,
		map[string]string{"core": "core1"})

	// Without MBeans data none of the per-core measurements may be emitted.
	acc.AssertDoesNotContainMeasurement(t, "solr_core")
	acc.AssertDoesNotContainMeasurement(t, "solr_queryhandler")
	acc.AssertDoesNotContainMeasurement(t, "solr_updatehandler")
	// NOTE(review): confirm "solr_handler" is the intended measurement name;
	// if cache metrics are emitted under another name this assertion does
	// not cover them.
	acc.AssertDoesNotContainMeasurement(t, "solr_handler")
}
// createMockNoCoreDataServer starts a test HTTP server that mimics a Solr
// instance whose cores expose no metric data. The cores status endpoint
// answers with statusResponse, the admin endpoints of the "main" and "core1"
// cores answer 200 with an empty payload, and every other path gets a 404.
func createMockNoCoreDataServer() *httptest.Server {
	// Payload served by the per-core admin endpoints: nothing but the
	// newline appended by Fprintln.
	const emptyBody = ""

	handler := func(w http.ResponseWriter, r *http.Request) {
		path := r.URL.Path
		switch {
		case strings.Contains(path, "/solr/admin/cores"):
			w.WriteHeader(http.StatusOK)
			fmt.Fprintln(w, statusResponse)
		case strings.Contains(path, "solr/main/admin"),
			strings.Contains(path, "solr/core1/admin"):
			w.WriteHeader(http.StatusOK)
			fmt.Fprintln(w, emptyBody)
		default:
			w.WriteHeader(http.StatusNotFound)
			fmt.Fprintln(w, "nope")
		}
	}
	return httptest.NewServer(http.HandlerFunc(handler))
}

View File

@@ -1116,30 +1116,30 @@ DECLARE @delayInterval char(8) = CONVERT(Char(8), DATEADD(SECOND, @secondsBetwee
DECLARE @w1 TABLE
(
WaitType nvarchar(64) NOT NULL,
WaitType nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL,
WaitTimeInMs bigint NOT NULL,
WaitTaskCount bigint NOT NULL,
CollectionDate datetime NOT NULL
)
DECLARE @w2 TABLE
(
WaitType nvarchar(64) NOT NULL,
WaitType nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL,
WaitTimeInMs bigint NOT NULL,
WaitTaskCount bigint NOT NULL,
CollectionDate datetime NOT NULL
)
DECLARE @w3 TABLE
(
WaitType nvarchar(64) NOT NULL
WaitType nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL
)
DECLARE @w4 TABLE
(
WaitType nvarchar(64) NOT NULL,
WaitCategory nvarchar(64) NOT NULL
WaitType nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL,
WaitCategory nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL
)
DECLARE @w5 TABLE
(
WaitCategory nvarchar(64) NOT NULL,
WaitCategory nvarchar(64) collate SQL_Latin1_General_CP1_CI_AS NOT NULL,
WaitTimeInMs bigint NOT NULL,
WaitTaskCount bigint NOT NULL
)
@@ -1380,12 +1380,12 @@ INSERT @w4 (WaitType, WaitCategory) VALUES ('ABR', 'OTHER') ,
INSERT @w1 (WaitType, WaitTimeInMs, WaitTaskCount, CollectionDate)
SELECT
WaitType = wait_type
WaitType = wait_type collate SQL_Latin1_General_CP1_CI_AS
, WaitTimeInMs = SUM(wait_time_ms)
, WaitTaskCount = SUM(waiting_tasks_count)
, CollectionDate = GETDATE()
FROM sys.dm_os_wait_stats
WHERE [wait_type] NOT IN
WHERE [wait_type] collate SQL_Latin1_General_CP1_CI_AS NOT IN
(
SELECT WaitType FROM @w3
)
@@ -1396,12 +1396,12 @@ WAITFOR DELAY @delayInterval;
INSERT @w2 (WaitType, WaitTimeInMs, WaitTaskCount, CollectionDate)
SELECT
WaitType = wait_type
WaitType = wait_type collate SQL_Latin1_General_CP1_CI_AS
, WaitTimeInMs = SUM(wait_time_ms)
, WaitTaskCount = SUM(waiting_tasks_count)
, CollectionDate = GETDATE()
FROM sys.dm_os_wait_stats
WHERE [wait_type] NOT IN
WHERE [wait_type] collate SQL_Latin1_General_CP1_CI_AS NOT IN
(
SELECT WaitType FROM @w3
)

View File

@@ -117,6 +117,140 @@ func TestDiskUsage(t *testing.T) {
assert.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields())
}
// TestDiskUsageHostMountPrefix verifies the "path" tag reported by the disk
// input for different HOST_MOUNT_PREFIX settings:
//   - no prefix: the mount point is reported unchanged;
//   - prefix "/hostfs" with mount point "/hostfs/var": reported as "/var";
//   - prefix equal to the whole mount point: reported as "/".
func TestDiskUsageHostMountPrefix(t *testing.T) {
	// Every case uses the same read-only ext4 device; only the mount point
	// (and therefore the usage path) differs between cases.
	partitionsAt := func(mountpoint string) []disk.PartitionStat {
		return []disk.PartitionStat{
			{
				Device:     "/dev/sda",
				Mountpoint: mountpoint,
				Fstype:     "ext4",
				Opts:       "ro",
			},
		}
	}
	usageAt := func(path string) []*disk.UsageStat {
		return []*disk.UsageStat{
			{
				Path:  path,
				Total: 42,
			},
		}
	}
	tagsWithPath := func(path string) map[string]string {
		return map[string]string{
			"path":   path,
			"device": "sda",
			"fstype": "ext4",
			"mode":   "ro",
		}
	}
	// Only Total is set on the usage stats, so all other fields are zero.
	wantFields := func() map[string]interface{} {
		return map[string]interface{}{
			"total":        uint64(42),
			"used":         uint64(0),
			"free":         uint64(0),
			"inodes_total": uint64(0),
			"inodes_free":  uint64(0),
			"inodes_used":  uint64(0),
			"used_percent": float64(0),
		}
	}

	cases := []struct {
		name            string
		partitionStats  []disk.PartitionStat
		usageStats      []*disk.UsageStat
		hostMountPrefix string
		expectedTags    map[string]string
		expectedFields  map[string]interface{}
	}{
		{
			name:           "no host mount prefix",
			partitionStats: partitionsAt("/"),
			usageStats:     usageAt("/"),
			expectedTags:   tagsWithPath("/"),
			expectedFields: wantFields(),
		},
		{
			name:            "host mount prefix",
			partitionStats:  partitionsAt("/hostfs/var"),
			usageStats:      usageAt("/hostfs/var"),
			hostMountPrefix: "/hostfs",
			expectedTags:    tagsWithPath("/var"),
			expectedFields:  wantFields(),
		},
		{
			name:            "host mount prefix exact match",
			partitionStats:  partitionsAt("/hostfs"),
			usageStats:      usageAt("/hostfs"),
			hostMountPrefix: "/hostfs",
			expectedTags:    tagsWithPath("/"),
			expectedFields:  wantFields(),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			m := &mock.Mock{}
			mps := MockPSDisk{&systemPS{&mockDiskUsage{m}}, m}
			defer mps.AssertExpectations(t)

			// Program the mock with this case's partitions, per-path usage
			// and environment value.
			mps.On("Partitions", true).Return(tc.partitionStats, nil)
			for _, usage := range tc.usageStats {
				mps.On("PSDiskUsage", usage.Path).Return(usage, nil)
			}
			mps.On("OSGetenv", "HOST_MOUNT_PREFIX").Return(tc.hostMountPrefix)

			var acc testutil.Accumulator
			require.NoError(t, (&DiskStats{ps: mps}).Gather(&acc))
			acc.AssertContainsTaggedFields(t, "disk", tc.expectedFields, tc.expectedTags)
		})
	}
}
func TestDiskStats(t *testing.T) {
var mps MockPS
defer mps.AssertExpectations(t)

View File

@@ -2,6 +2,7 @@ package system
import (
"os"
"path/filepath"
"strings"
"github.com/influxdata/telegraf"
@@ -129,7 +130,7 @@ func (s *systemPS) DiskUsage(
continue
}
du.Path = strings.TrimPrefix(p.Mountpoint, hostMountPrefix)
du.Path = filepath.Join("/", strings.TrimPrefix(p.Mountpoint, hostMountPrefix))
du.Fstype = p.Fstype
usage = append(usage, du)
partitions = append(partitions, &p)

View File

@@ -155,8 +155,22 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
batch = append(batch, buf...)
}
err = g.send(batch)
// try to reconnect and retry to send
if err != nil {
log.Println("E! Graphite: Reconnecting and retrying: ")
g.Connect()
err = g.send(batch)
}
return err
}
func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs
err = errors.New("Could not write to any Graphite server in cluster\n")
err := errors.New("Could not write to any Graphite server in cluster\n")
// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
@@ -167,6 +181,8 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
if _, e := g.conns[n].Write(batch); e != nil {
// Error
log.Println("E! Graphite Error: " + e.Error())
// Close explicitly
g.conns[n].Close()
// Let's try the next one
} else {
// Success
@@ -174,11 +190,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
break
}
}
// try to reconnect
if err != nil {
log.Println("E! Reconnecting: ")
g.Connect()
}
return err
}

View File

@@ -81,7 +81,7 @@ func TestGraphiteOK(t *testing.T) {
err2 := g.Write(metrics)
require.NoError(t, err2)
// Waiting TCPserver
// Waiting TCPserver, should reconnect and resend
wg.Wait()
t.Log("Finished Waiting for first data")
var wg2 sync.WaitGroup
@@ -89,10 +89,8 @@ func TestGraphiteOK(t *testing.T) {
wg2.Add(1)
TCPServer2(t, &wg2)
//Write but expect an error, but reconnect
g.Write(metrics2)
err3 := g.Write(metrics2)
t.Log("Finished writing second data, it should have failed")
//Actually write the new metrics
t.Log("Finished writing second data, it should have reconnected automatically")
require.NoError(t, err3)
t.Log("Finished writing third data")

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@@ -25,6 +26,9 @@ var sampleConfig = `
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Timeout for write operations. default: 5s
# timeout = "5s"
## client ID, if not set a random ID is generated
# client_id = ""
@@ -149,7 +153,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), false, body)
token.Wait()
token.WaitTimeout(m.Timeout.Duration)
if token.Error() != nil {
return token.Error()
}
@@ -158,6 +162,12 @@ func (m *MQTT) publish(topic string, body []byte) error {
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
opts.KeepAlive = 0 * time.Second
if m.Timeout.Duration < time.Second {
m.Timeout.Duration = 5 * time.Second
}
opts.WriteTimeout = m.Timeout.Duration
if m.ClientID != "" {
opts.SetClientID(m.ClientID)

View File

@@ -631,7 +631,7 @@ func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error)
time.Sleep(time.Millisecond * 200)
p := &prometheus_input.Prometheus{
Urls: []string{"http://localhost:9127/metrics"},
URLs: []string{"http://localhost:9127/metrics"},
}
return pTesting, p, nil

View File

@@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"time"
)
type Wavefront struct {
@@ -101,13 +102,11 @@ func (w *Wavefront) Connect() error {
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
_, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
log.Printf("Wavefront: TCP address cannot be resolved %s", err.Error())
return nil
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error())
}
connection, err := net.Dial("tcp", uri)
if err != nil {
log.Printf("Wavefront: TCP connect fail %s", err.Error())
return nil
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
return nil
@@ -122,6 +121,7 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
connection.SetWriteDeadline(time.Now().Add(5 * time.Second))
for _, m := range metrics {
for _, metricPoint := range buildMetrics(m, w) {

View File

@@ -138,8 +138,11 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) {
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
metric, err := p.ParseLine(line)
if line == "" {
continue
}
metric, err := p.ParseLine(line)
if err == nil {
metrics = append(metrics, metric)
} else {

View File

@@ -133,7 +133,7 @@ func InsertField(bucket, fieldName string) string {
if fieldName == "value" {
return fieldDeleter.Replace(bucket)
}
return strings.Replace(bucket, "FIELDNAME", strings.Replace(fieldName, ".", "_", -1), 1)
return strings.Replace(bucket, "FIELDNAME", fieldName, 1)
}
func buildTags(tags map[string]string) string {