Compare commits
8 Commits
0.13.0-rc1
...
igloo-rc1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37ae3956c1 | ||
|
|
4c28f15b35 | ||
|
|
095ef04c04 | ||
|
|
7d49979658 | ||
|
|
7a36695a21 | ||
|
|
5865587bd0 | ||
|
|
219bf93566 | ||
|
|
8371546a66 |
@@ -1,4 +1,4 @@
|
|||||||
## v0.13 [2016-05-09]
|
## v0.13 [unreleased]
|
||||||
|
|
||||||
### Release Notes
|
### Release Notes
|
||||||
|
|
||||||
@@ -71,6 +71,7 @@ It is not included on the report path. This is necessary for reporting host disk
|
|||||||
- [#1107](https://github.com/influxdata/telegraf/issues/1107): Support lustre2 job stats. Thanks @hanleyja!
|
- [#1107](https://github.com/influxdata/telegraf/issues/1107): Support lustre2 job stats. Thanks @hanleyja!
|
||||||
- [#1122](https://github.com/influxdata/telegraf/pull/1122): Support setting config path through env variable and default paths.
|
- [#1122](https://github.com/influxdata/telegraf/pull/1122): Support setting config path through env variable and default paths.
|
||||||
- [#1128](https://github.com/influxdata/telegraf/pull/1128): MongoDB jumbo chunks metric for MongoDB input plugin
|
- [#1128](https://github.com/influxdata/telegraf/pull/1128): MongoDB jumbo chunks metric for MongoDB input plugin
|
||||||
|
- [#1146](https://github.com/influxdata/telegraf/pull/1146): HAProxy socket support. Thanks weshmashian!
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
|
|
||||||
|
|||||||
24
README.md
24
README.md
@@ -20,12 +20,12 @@ new plugins.
|
|||||||
### Linux deb and rpm Packages:
|
### Linux deb and rpm Packages:
|
||||||
|
|
||||||
Latest:
|
Latest:
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.0-1_amd64.deb
|
* http://get.influxdb.org/telegraf/telegraf_0.12.1-1_amd64.deb
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1.x86_64.rpm
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1.x86_64.rpm
|
||||||
|
|
||||||
Latest (arm):
|
Latest (arm):
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf_0.13.0-1_armhf.deb
|
* http://get.influxdb.org/telegraf/telegraf_0.12.1-1_armhf.deb
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1.armhf.rpm
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1.armhf.rpm
|
||||||
|
|
||||||
##### Package Instructions:
|
##### Package Instructions:
|
||||||
|
|
||||||
@@ -46,28 +46,28 @@ to use this repo to install & update telegraf.
|
|||||||
### Linux tarballs:
|
### Linux tarballs:
|
||||||
|
|
||||||
Latest:
|
Latest:
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_linux_amd64.tar.gz
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_linux_amd64.tar.gz
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_linux_i386.tar.gz
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_linux_i386.tar.gz
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_linux_armhf.tar.gz
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_linux_armhf.tar.gz
|
||||||
|
|
||||||
##### tarball Instructions:
|
##### tarball Instructions:
|
||||||
|
|
||||||
To install the full directory structure with config file, run:
|
To install the full directory structure with config file, run:
|
||||||
|
|
||||||
```
|
```
|
||||||
sudo tar -C / -zxvf ./telegraf-0.13.0-1_linux_amd64.tar.gz
|
sudo tar -C / -zxvf ./telegraf-0.12.1-1_linux_amd64.tar.gz
|
||||||
```
|
```
|
||||||
|
|
||||||
To extract only the binary, run:
|
To extract only the binary, run:
|
||||||
|
|
||||||
```
|
```
|
||||||
tar -zxvf telegraf-0.13.0-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf
|
tar -zxvf telegraf-0.12.1-1_linux_amd64.tar.gz --strip-components=3 ./usr/bin/telegraf
|
||||||
```
|
```
|
||||||
|
|
||||||
### FreeBSD tarball:
|
### FreeBSD tarball:
|
||||||
|
|
||||||
Latest:
|
Latest:
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_freebsd_amd64.tar.gz
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_freebsd_amd64.tar.gz
|
||||||
|
|
||||||
##### tarball Instructions:
|
##### tarball Instructions:
|
||||||
|
|
||||||
@@ -87,8 +87,8 @@ brew install telegraf
|
|||||||
### Windows Binaries (EXPERIMENTAL)
|
### Windows Binaries (EXPERIMENTAL)
|
||||||
|
|
||||||
Latest:
|
Latest:
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_windows_amd64.zip
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_windows_amd64.zip
|
||||||
* https://dl.influxdata.com/telegraf/releases/telegraf-0.13.0-1_windows_i386.zip
|
* http://get.influxdb.org/telegraf/telegraf-0.12.1-1_windows_i386.zip
|
||||||
|
|
||||||
### From Source:
|
### From Source:
|
||||||
|
|
||||||
|
|||||||
@@ -879,9 +879,15 @@
|
|||||||
# ## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
# ## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
||||||
# gather_table_io_waits = false
|
# gather_table_io_waits = false
|
||||||
# #
|
# #
|
||||||
|
# ## gather metrics from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS
|
||||||
|
# gather_table_lock_waits = false
|
||||||
|
# #
|
||||||
# ## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
# ## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
||||||
# gather_index_io_waits = false
|
# gather_index_io_waits = false
|
||||||
# #
|
# #
|
||||||
|
# ## gather metrics from PERFORMANCE_SCHEMA.EVENT_WAITS
|
||||||
|
# gather_event_waits = false
|
||||||
|
# #
|
||||||
# ## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
# ## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
||||||
# gather_file_events_stats = false
|
# gather_file_events_stats = false
|
||||||
# #
|
# #
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
|
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
|
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
|
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/inputs/igloo"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
|
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
|
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
|
||||||
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
|
_ "github.com/influxdata/telegraf/plugins/inputs/jolokia"
|
||||||
|
|||||||
@@ -98,6 +98,7 @@ based on the availability of per-cpu stats on your system.
|
|||||||
- io_serviced_recursive_sync
|
- io_serviced_recursive_sync
|
||||||
- io_serviced_recursive_total
|
- io_serviced_recursive_total
|
||||||
- io_serviced_recursive_write
|
- io_serviced_recursive_write
|
||||||
|
- container_id
|
||||||
- docker_
|
- docker_
|
||||||
- n_used_file_descriptors
|
- n_used_file_descriptors
|
||||||
- n_cpus
|
- n_cpus
|
||||||
|
|||||||
@@ -328,7 +328,7 @@ func gatherContainerStats(
|
|||||||
acc.AddFields("docker_container_net", netfields, nettags, now)
|
acc.AddFields("docker_container_net", netfields, nettags, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
gatherBlockIOMetrics(stat, acc, tags, now)
|
gatherBlockIOMetrics(stat, acc, tags, now, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func calculateMemPercent(stat *types.StatsJSON) float64 {
|
func calculateMemPercent(stat *types.StatsJSON) float64 {
|
||||||
@@ -356,6 +356,7 @@ func gatherBlockIOMetrics(
|
|||||||
acc telegraf.Accumulator,
|
acc telegraf.Accumulator,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
now time.Time,
|
now time.Time,
|
||||||
|
id string,
|
||||||
) {
|
) {
|
||||||
blkioStats := stat.BlkioStats
|
blkioStats := stat.BlkioStats
|
||||||
// Make a map of devices to their block io stats
|
// Make a map of devices to their block io stats
|
||||||
@@ -420,6 +421,7 @@ func gatherBlockIOMetrics(
|
|||||||
for device, fields := range deviceStatMap {
|
for device, fields := range deviceStatMap {
|
||||||
iotags := copyTags(tags)
|
iotags := copyTags(tags)
|
||||||
iotags["device"] = device
|
iotags["device"] = device
|
||||||
|
fields["container_id"] = id
|
||||||
acc.AddFields("docker_container_blkio", fields, iotags, now)
|
acc.AddFields("docker_container_blkio", fields, iotags, now)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ func TestDockerGatherContainerStats(t *testing.T) {
|
|||||||
blkiofields := map[string]interface{}{
|
blkiofields := map[string]interface{}{
|
||||||
"io_service_bytes_recursive_read": uint64(100),
|
"io_service_bytes_recursive_read": uint64(100),
|
||||||
"io_serviced_recursive_write": uint64(101),
|
"io_serviced_recursive_write": uint64(101),
|
||||||
|
"container_id": "123456789",
|
||||||
}
|
}
|
||||||
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
|
acc.AssertContainsTaggedFields(t, "docker_container_blkio", blkiofields, blkiotags)
|
||||||
|
|
||||||
|
|||||||
@@ -6,9 +6,11 @@ import (
|
|||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -47,7 +49,7 @@ const (
|
|||||||
HF_THROTTLE = 29 //29. throttle [...S]: current throttle percentage for the server, when slowstart is active, or no value if not in slowstart.
|
HF_THROTTLE = 29 //29. throttle [...S]: current throttle percentage for the server, when slowstart is active, or no value if not in slowstart.
|
||||||
HF_LBTOT = 30 //30. lbtot [..BS]: total number of times a server was selected, either for new sessions, or when re-dispatching. The server counter is the number of times that server was selected.
|
HF_LBTOT = 30 //30. lbtot [..BS]: total number of times a server was selected, either for new sessions, or when re-dispatching. The server counter is the number of times that server was selected.
|
||||||
HF_TRACKED = 31 //31. tracked [...S]: id of proxy/server if tracking is enabled.
|
HF_TRACKED = 31 //31. tracked [...S]: id of proxy/server if tracking is enabled.
|
||||||
HF_TYPE = 32 //32. type [LFBS]: (0 = frontend, 1 = backend, 2 = server, 3 = socket/listener)
|
HF_TYPE = 32 //32. type [LFBS]: (0 = frontend, 1 = backend, 2 = server, 3 = socket/listener)
|
||||||
HF_RATE = 33 //33. rate [.FBS]: number of sessions per second over last elapsed second
|
HF_RATE = 33 //33. rate [.FBS]: number of sessions per second over last elapsed second
|
||||||
HF_RATE_LIM = 34 //34. rate_lim [.F..]: configured limit on new sessions per second
|
HF_RATE_LIM = 34 //34. rate_lim [.F..]: configured limit on new sessions per second
|
||||||
HF_RATE_MAX = 35 //35. rate_max [.FBS]: max number of new sessions per second
|
HF_RATE_MAX = 35 //35. rate_max [.FBS]: max number of new sessions per second
|
||||||
@@ -91,8 +93,8 @@ var sampleConfig = `
|
|||||||
|
|
||||||
## If no servers are specified, then default to 127.0.0.1:1936
|
## If no servers are specified, then default to 127.0.0.1:1936
|
||||||
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
|
servers = ["http://myhaproxy.com:1936", "http://anotherhaproxy.com:1936"]
|
||||||
## Or you can also use local socket(not work yet)
|
## Or you can also use local socket
|
||||||
## servers = ["socket://run/haproxy/admin.sock"]
|
## servers = ["socket:/run/haproxy/admin.sock"]
|
||||||
`
|
`
|
||||||
|
|
||||||
func (r *haproxy) SampleConfig() string {
|
func (r *haproxy) SampleConfig() string {
|
||||||
@@ -127,7 +129,36 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
|
|||||||
return outerr
|
return outerr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
|
||||||
|
var socketPath string
|
||||||
|
socketAddr := strings.Split(addr, ":")
|
||||||
|
|
||||||
|
if len(socketAddr) >= 2 {
|
||||||
|
socketPath = socketAddr[1]
|
||||||
|
} else {
|
||||||
|
socketPath = socketAddr[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := net.Dial("unix", socketPath)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Could not connect to socket '%s': %s", addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, errw := c.Write([]byte("show stat\n"))
|
||||||
|
|
||||||
|
if errw != nil {
|
||||||
|
return fmt.Errorf("Could not write to socket '%s': %s", addr, errw)
|
||||||
|
}
|
||||||
|
|
||||||
|
return importCsvResult(c, acc, socketPath)
|
||||||
|
}
|
||||||
|
|
||||||
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
|
if !strings.HasPrefix(addr, "http") {
|
||||||
|
return g.gatherServerSocket(addr, acc)
|
||||||
|
}
|
||||||
|
|
||||||
if g.client == nil {
|
if g.client == nil {
|
||||||
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
|
||||||
client := &http.Client{
|
client := &http.Client{
|
||||||
|
|||||||
@@ -1,17 +1,42 @@
|
|||||||
package haproxy
|
package haproxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type statServer struct{}
|
||||||
|
|
||||||
|
func (s statServer) serverSocket(l net.Listener) {
|
||||||
|
for {
|
||||||
|
conn, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(c net.Conn) {
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
n, _ := c.Read(buf)
|
||||||
|
|
||||||
|
data := buf[:n]
|
||||||
|
if string(data) == "show stat\n" {
|
||||||
|
c.Write([]byte(csvOutputSample))
|
||||||
|
c.Close()
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
|
func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
|
||||||
//We create a fake server to return test data
|
//We create a fake server to return test data
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -146,6 +171,69 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
|
|||||||
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHaproxyGeneratesMetricsUsingSocket(t *testing.T) {
|
||||||
|
var randomNumber int64
|
||||||
|
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
||||||
|
sock, err := net.Listen("unix", fmt.Sprintf("/tmp/test-haproxy%d.sock", randomNumber))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Cannot initialize socket ")
|
||||||
|
}
|
||||||
|
|
||||||
|
defer sock.Close()
|
||||||
|
|
||||||
|
s := statServer{}
|
||||||
|
go s.serverSocket(sock)
|
||||||
|
|
||||||
|
r := &haproxy{
|
||||||
|
Servers: []string{sock.Addr().String()},
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err = r.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tags := map[string]string{
|
||||||
|
"proxy": "be_app",
|
||||||
|
"server": sock.Addr().String(),
|
||||||
|
"sv": "host0",
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := map[string]interface{}{
|
||||||
|
"active_servers": uint64(1),
|
||||||
|
"backup_servers": uint64(0),
|
||||||
|
"bin": uint64(510913516),
|
||||||
|
"bout": uint64(2193856571),
|
||||||
|
"check_duration": uint64(10),
|
||||||
|
"cli_abort": uint64(73),
|
||||||
|
"ctime": uint64(2),
|
||||||
|
"downtime": uint64(0),
|
||||||
|
"dresp": uint64(0),
|
||||||
|
"econ": uint64(0),
|
||||||
|
"eresp": uint64(1),
|
||||||
|
"http_response.1xx": uint64(0),
|
||||||
|
"http_response.2xx": uint64(119534),
|
||||||
|
"http_response.3xx": uint64(48051),
|
||||||
|
"http_response.4xx": uint64(2345),
|
||||||
|
"http_response.5xx": uint64(1056),
|
||||||
|
"lbtot": uint64(171013),
|
||||||
|
"qcur": uint64(0),
|
||||||
|
"qmax": uint64(0),
|
||||||
|
"qtime": uint64(0),
|
||||||
|
"rate": uint64(3),
|
||||||
|
"rate_max": uint64(12),
|
||||||
|
"rtime": uint64(312),
|
||||||
|
"scur": uint64(1),
|
||||||
|
"smax": uint64(32),
|
||||||
|
"srv_abort": uint64(1),
|
||||||
|
"stot": uint64(171014),
|
||||||
|
"ttime": uint64(2341),
|
||||||
|
"wredis": uint64(0),
|
||||||
|
"wretr": uint64(1),
|
||||||
|
}
|
||||||
|
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
//When not passing server config, we default to localhost
|
//When not passing server config, we default to localhost
|
||||||
//We just want to make sure we did request stat from localhost
|
//We just want to make sure we did request stat from localhost
|
||||||
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
||||||
|
|||||||
23
plugins/inputs/igloo/README.md
Normal file
23
plugins/inputs/igloo/README.md
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
# igloo Input Plugin
|
||||||
|
|
||||||
|
The igloo plugin "tails" a logfile and parses each log message.
|
||||||
|
|
||||||
|
By default, the igloo plugin acts like the following unix tail command:
|
||||||
|
|
||||||
|
```
|
||||||
|
tail -F --lines=0 myfile.log
|
||||||
|
```
|
||||||
|
|
||||||
|
- `-F` means that it will follow the _name_ of the given file, so
|
||||||
|
that it will be compatible with log-rotated files, and that it will retry on
|
||||||
|
inaccessible files.
|
||||||
|
- `--lines=0` means that it will start at the end of the file (unless
|
||||||
|
the `from_beginning` option is set).
|
||||||
|
|
||||||
|
see http://man7.org/linux/man-pages/man1/tail.1.html for more details.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
```
|
||||||
|
|
||||||
331
plugins/inputs/igloo/igloo.go
Normal file
331
plugins/inputs/igloo/igloo.go
Normal file
@@ -0,0 +1,331 @@
|
|||||||
|
package igloo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"regexp"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hpcloud/tail"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal/globpath"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// format of timestamps
|
||||||
|
const (
|
||||||
|
rfcFormat string = "%s-%s-%sT%s:%s:%s.%sZ"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// regex for finding timestamps
|
||||||
|
tRe = regexp.MustCompile(`Timestamp=((\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2}),(\d+))`)
|
||||||
|
)
|
||||||
|
|
||||||
|
type Tail struct {
|
||||||
|
Files []string
|
||||||
|
FromBeginning bool
|
||||||
|
TagKeys []string
|
||||||
|
Counters []string
|
||||||
|
NumFields []string
|
||||||
|
StrFields []string
|
||||||
|
|
||||||
|
numfieldsRe map[string]*regexp.Regexp
|
||||||
|
strfieldsRe map[string]*regexp.Regexp
|
||||||
|
countersRe map[string]*regexp.Regexp
|
||||||
|
tagsRe map[string]*regexp.Regexp
|
||||||
|
|
||||||
|
counters map[string]map[string]int64
|
||||||
|
|
||||||
|
tailers []*tail.Tail
|
||||||
|
wg sync.WaitGroup
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTail() *Tail {
|
||||||
|
return &Tail{
|
||||||
|
FromBeginning: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const sampleConfig = `
|
||||||
|
## logfiles to parse.
|
||||||
|
##
|
||||||
|
## These accept standard unix glob matching rules, but with the addition of
|
||||||
|
## ** as a "super asterisk". ie:
|
||||||
|
## "/var/log/**.log" -> recursively find all .log files in /var/log
|
||||||
|
## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log
|
||||||
|
## "/var/log/apache.log" -> just tail the apache log file
|
||||||
|
##
|
||||||
|
## See https://github.com/gobwas/glob for more examples
|
||||||
|
##
|
||||||
|
files = ["$HOME/sample.log"]
|
||||||
|
## Read file from beginning.
|
||||||
|
from_beginning = false
|
||||||
|
|
||||||
|
## Each log message is searched for these tag keys in TagKey=Value format.
|
||||||
|
## Any that are found will be tagged on the resulting influx measurements.
|
||||||
|
tag_keys = [
|
||||||
|
"HostLocal",
|
||||||
|
"ProductName",
|
||||||
|
"OperationName",
|
||||||
|
]
|
||||||
|
|
||||||
|
## counters are keys which are treated as counters.
|
||||||
|
## so if counters = ["Result"], then this means that the following ocurrence
|
||||||
|
## on a log line:
|
||||||
|
## Result=Success
|
||||||
|
## would be treated as a counter: Result_Success, and it will be incremented
|
||||||
|
## for every occurrence, until Telegraf is restarted.
|
||||||
|
counters = ["Result"]
|
||||||
|
## num_fields are log line occurrences that are translated into numerical
|
||||||
|
## fields. ie:
|
||||||
|
## Duration=1
|
||||||
|
num_fields = ["Duration", "Attempt"]
|
||||||
|
## str_fields are log line occurences that are translated into string fields,
|
||||||
|
## ie:
|
||||||
|
## ActivityGUID=0bb03bf4-ae1d-4487-bb6f-311653b35760
|
||||||
|
str_fields = ["ActivityGUID"]
|
||||||
|
`
|
||||||
|
|
||||||
|
func (t *Tail) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) Description() string {
|
||||||
|
return "Stream an igloo file, like the tail -f command"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) Gather(acc telegraf.Accumulator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) buildRegexes() error {
|
||||||
|
t.numfieldsRe = make(map[string]*regexp.Regexp)
|
||||||
|
t.strfieldsRe = make(map[string]*regexp.Regexp)
|
||||||
|
t.tagsRe = make(map[string]*regexp.Regexp)
|
||||||
|
t.countersRe = make(map[string]*regexp.Regexp)
|
||||||
|
t.counters = make(map[string]map[string]int64)
|
||||||
|
|
||||||
|
for _, field := range t.NumFields {
|
||||||
|
re, err := regexp.Compile(field + `=([0-9\.]+)`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.numfieldsRe[field] = re
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range t.StrFields {
|
||||||
|
re, err := regexp.Compile(field + `=([0-9a-zA-Z\.\-]+)`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.strfieldsRe[field] = re
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range t.TagKeys {
|
||||||
|
re, err := regexp.Compile(field + `=([0-9a-zA-Z\.\-]+)`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.tagsRe[field] = re
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, field := range t.Counters {
|
||||||
|
re, err := regexp.Compile("(" + field + ")" + `=([0-9a-zA-Z\.\-]+)`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.countersRe[field] = re
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) Start(acc telegraf.Accumulator) error {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
|
||||||
|
t.acc = acc
|
||||||
|
if err := t.buildRegexes(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var seek tail.SeekInfo
|
||||||
|
if !t.FromBeginning {
|
||||||
|
seek.Whence = 2
|
||||||
|
seek.Offset = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var errS string
|
||||||
|
// Create a "tailer" for each file
|
||||||
|
for _, filepath := range t.Files {
|
||||||
|
g, err := globpath.Compile(filepath)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
|
||||||
|
}
|
||||||
|
for file, _ := range g.Match() {
|
||||||
|
tailer, err := tail.TailFile(file,
|
||||||
|
tail.Config{
|
||||||
|
ReOpen: true,
|
||||||
|
Follow: true,
|
||||||
|
Location: &seek,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
errS += err.Error() + " "
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// create a goroutine for each "tailer"
|
||||||
|
go t.receiver(tailer)
|
||||||
|
t.tailers = append(t.tailers, tailer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if errS != "" {
|
||||||
|
return fmt.Errorf(errS)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is launched as a goroutine to continuously watch a tailed logfile
|
||||||
|
// for changes, parse any incoming msgs, and add to the accumulator.
|
||||||
|
func (t *Tail) receiver(tailer *tail.Tail) {
|
||||||
|
t.wg.Add(1)
|
||||||
|
defer t.wg.Done()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var line *tail.Line
|
||||||
|
for line = range tailer.Lines {
|
||||||
|
if line.Err != nil {
|
||||||
|
log.Printf("ERROR tailing file %s, Error: %s\n",
|
||||||
|
tailer.Filename, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = t.Parse(line.Text)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) Parse(line string) error {
|
||||||
|
// find the timestamp:
|
||||||
|
match := tRe.FindAllStringSubmatch(line, -1)
|
||||||
|
if len(match) < 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(match[0]) < 9 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// make an rfc3339 timestamp and parse it:
|
||||||
|
ts, err := time.Parse(time.RFC3339Nano,
|
||||||
|
fmt.Sprintf(rfcFormat, match[0][2], match[0][3], match[0][4], match[0][5], match[0][6], match[0][7], match[0][8]))
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
// parse numerical fields:
|
||||||
|
for name, re := range t.numfieldsRe {
|
||||||
|
match := re.FindAllStringSubmatch(line, -1)
|
||||||
|
if len(match) < 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(match[0]) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
num, err := strconv.ParseFloat(match[0][1], 64)
|
||||||
|
if err == nil {
|
||||||
|
fields[name] = num
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse string fields:
|
||||||
|
for name, re := range t.strfieldsRe {
|
||||||
|
match := re.FindAllStringSubmatch(line, -1)
|
||||||
|
if len(match) < 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(match[0]) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fields[name] = match[0][1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse tags:
|
||||||
|
for name, re := range t.tagsRe {
|
||||||
|
match := re.FindAllStringSubmatch(line, -1)
|
||||||
|
if len(match) < 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(match[0]) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tags[name] = match[0][1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.countersRe) > 0 {
|
||||||
|
// Make a unique key for the measurement name/tags
|
||||||
|
var tg []string
|
||||||
|
for k, v := range tags {
|
||||||
|
tg = append(tg, fmt.Sprintf("%s=%s", k, v))
|
||||||
|
}
|
||||||
|
sort.Strings(tg)
|
||||||
|
hash := fmt.Sprintf("%s%s", strings.Join(tg, ""), "igloo")
|
||||||
|
|
||||||
|
// check if this hash already has a counter map
|
||||||
|
_, ok := t.counters[hash]
|
||||||
|
if !ok {
|
||||||
|
// doesnt have counter map, so make one
|
||||||
|
t.counters[hash] = make(map[string]int64)
|
||||||
|
}
|
||||||
|
|
||||||
|
// search for counter matches:
|
||||||
|
for _, re := range t.countersRe {
|
||||||
|
match := re.FindAllStringSubmatch(line, -1)
|
||||||
|
if len(match) < 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(match[0]) < 3 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
counterName := match[0][1] + "_" + match[0][2]
|
||||||
|
// increment this counter
|
||||||
|
t.counters[hash][counterName] += 1
|
||||||
|
// add this counter to the output fields
|
||||||
|
fields[counterName] = t.counters[hash][counterName]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.acc.AddFields("igloo", fields, tags, ts)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tail) Stop() {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
|
||||||
|
for _, t := range t.tailers {
|
||||||
|
err := t.Stop()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("ERROR stopping tail on file %s\n", t.Filename)
|
||||||
|
}
|
||||||
|
t.Cleanup()
|
||||||
|
}
|
||||||
|
t.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("igloo", func() telegraf.Input {
|
||||||
|
return NewTail()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -56,9 +56,15 @@ This plugin gathers the statistic data from MySQL server
|
|||||||
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
||||||
gather_table_io_waits = false
|
gather_table_io_waits = false
|
||||||
#
|
#
|
||||||
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS
|
||||||
|
gather_table_lock_waits = false
|
||||||
|
#
|
||||||
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
||||||
gather_index_io_waits = false
|
gather_index_io_waits = false
|
||||||
#
|
#
|
||||||
|
## gather metrics from PERFORMANCE_SCHEMA.EVENT_WAITS
|
||||||
|
gather_event_waits = false
|
||||||
|
#
|
||||||
## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
||||||
gather_file_events_stats = false
|
gather_file_events_stats = false
|
||||||
#
|
#
|
||||||
|
|||||||
@@ -25,7 +25,9 @@ type Mysql struct {
|
|||||||
GatherSlaveStatus bool `toml:"gather_slave_status"`
|
GatherSlaveStatus bool `toml:"gather_slave_status"`
|
||||||
GatherBinaryLogs bool `toml:"gather_binary_logs"`
|
GatherBinaryLogs bool `toml:"gather_binary_logs"`
|
||||||
GatherTableIOWaits bool `toml:"gather_table_io_waits"`
|
GatherTableIOWaits bool `toml:"gather_table_io_waits"`
|
||||||
|
GatherTableLockWaits bool `toml:"gather_table_lock_waits"`
|
||||||
GatherIndexIOWaits bool `toml:"gather_index_io_waits"`
|
GatherIndexIOWaits bool `toml:"gather_index_io_waits"`
|
||||||
|
GatherEventWaits bool `toml:"gather_event_waits"`
|
||||||
GatherTableSchema bool `toml:"gather_table_schema"`
|
GatherTableSchema bool `toml:"gather_table_schema"`
|
||||||
GatherFileEventsStats bool `toml:"gather_file_events_stats"`
|
GatherFileEventsStats bool `toml:"gather_file_events_stats"`
|
||||||
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
|
GatherPerfEventsStatements bool `toml:"gather_perf_events_statements"`
|
||||||
@@ -68,9 +70,15 @@ var sampleConfig = `
|
|||||||
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_TABLE
|
||||||
gather_table_io_waits = false
|
gather_table_io_waits = false
|
||||||
#
|
#
|
||||||
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS
|
||||||
|
gather_table_lock_waits = false
|
||||||
|
#
|
||||||
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
## gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMART_BY_INDEX_USAGE
|
||||||
gather_index_io_waits = false
|
gather_index_io_waits = false
|
||||||
#
|
#
|
||||||
|
## gather metrics from PERFORMANCE_SCHEMA.EVENT_WAITS
|
||||||
|
gather_event_waits = false
|
||||||
|
#
|
||||||
## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
## gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME
|
||||||
gather_file_events_stats = false
|
gather_file_events_stats = false
|
||||||
#
|
#
|
||||||
@@ -612,14 +620,18 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.gatherPerfTableLockWaits(db, serv, acc)
|
if m.GatherTableLockWaits {
|
||||||
if err != nil {
|
err = m.gatherPerfTableLockWaits(db, serv, acc)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.gatherPerfEventWaits(db, serv, acc)
|
if m.GatherEventWaits {
|
||||||
if err != nil {
|
err = m.gatherPerfEventWaits(db, serv, acc)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.GatherFileEventsStats {
|
if m.GatherFileEventsStats {
|
||||||
|
|||||||
@@ -568,7 +568,7 @@ def package(build_output, version, nightly=False, rc=None, iteration=1, static=F
|
|||||||
# For windows and static builds, just copy
|
# For windows and static builds, just copy
|
||||||
# binaries to root of package (no other scripts or
|
# binaries to root of package (no other scripts or
|
||||||
# directories)
|
# directories)
|
||||||
package_scripts(build_root, windows=True)
|
package_scripts(build_root, config_only=True)
|
||||||
else:
|
else:
|
||||||
create_package_fs(build_root)
|
create_package_fs(build_root)
|
||||||
package_scripts(build_root)
|
package_scripts(build_root)
|
||||||
|
|||||||
@@ -77,7 +77,4 @@ if [ $? -eq 0 ]; then
|
|||||||
echo $tag
|
echo $tag
|
||||||
exit_if_fail ./scripts/build.py --release --package --version=$tag --platform=all --arch=all --upload --bucket=dl.influxdata.com/telegraf/releases
|
exit_if_fail ./scripts/build.py --release --package --version=$tag --platform=all --arch=all --upload --bucket=dl.influxdata.com/telegraf/releases
|
||||||
mv build $CIRCLE_ARTIFACTS
|
mv build $CIRCLE_ARTIFACTS
|
||||||
else
|
|
||||||
# Upload Linux build artifact to S3
|
|
||||||
./scripts/build.py --package --upload
|
|
||||||
fi
|
fi
|
||||||
|
|||||||
Reference in New Issue
Block a user