Merge pull request #1 from influxdb/master

merge changes
This commit is contained in:
KPACHbIuLLIAnO4 2015-09-13 17:59:00 +03:00
commit dfb48bc5a1
9 changed files with 161 additions and 88 deletions

View File

@ -1,9 +1,19 @@
## v0.1.9 [unreleased]
### Release Notes
- InfluxDB output config change: `url` is now `urls`, and is a list. Config files
will still be backwards compatible if only `url` is specified.
### Features
- [#143](https://github.com/influxdb/telegraf/issues/143): InfluxDB clustering support
- [#181](https://github.com/influxdb/telegraf/issues/181): Makefile GOBIN support. Thanks @Vye!
### Bugfixes
- [#170](https://github.com/influxdb/telegraf/issues/170): Systemd support
- [#175](https://github.com/influxdb/telegraf/issues/175): Set write precision before gathering metrics
- [#178](https://github.com/influxdb/telegraf/issues/178): redis plugin, multiple server thread hang bug
- Fix net plugin on darwin
- [#84](https://github.com/influxdb/telegraf/issues/84): Fix docker plugin on CentOS. Thanks @neezgee!
## v0.1.8 [2015-09-04]

View File

@ -1,19 +1,22 @@
UNAME := $(shell sh -c 'uname')
VERSION := $(shell sh -c 'git describe --always --tags')
ifndef GOBIN
GOBIN = $(GOPATH)/bin
endif
build: prepare
$(GOPATH)/bin/godep go build -o telegraf -ldflags \
$(GOBIN)/godep go build -o telegraf -ldflags \
"-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
build-linux-bins: prepare
GOARCH=amd64 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_amd64 \
GOARCH=amd64 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_amd64 \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=386 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_386 \
GOARCH=386 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_386 \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=arm GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_arm \
GOARCH=arm GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_arm \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
@ -30,10 +33,10 @@ ifeq ($(UNAME), Linux)
endif
test: prepare docker-compose
$(GOPATH)/bin/godep go test -v ./...
$(GOBIN)/godep go test -v ./...
test-short: prepare
$(GOPATH)/bin/godep go test -short ./...
$(GOBIN)/godep go test -short ./...
test-cleanup:
docker-compose --file scripts/docker-compose.yml kill

View File

@ -25,7 +25,11 @@ type BatchPoints struct {
}
// Add adds a measurement
func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]string) {
func (bp *BatchPoints) Add(
measurement string,
val interface{},
tags map[string]string,
) {
bp.mu.Lock()
defer bp.mu.Unlock()

View File

@ -84,6 +84,9 @@ func NewAgent(config *Config) (*Agent, error) {
// Connect connects to all configured outputs
func (a *Agent) Connect() error {
for _, o := range a.outputs {
if a.Debug {
log.Printf("Attempting connection to output: %s\n", o.name)
}
err := o.output.Connect()
if err != nil {
return err
@ -193,16 +196,17 @@ func (a *Agent) crankParallel() error {
go func(plugin *runningPlugin) {
defer wg.Done()
var acc BatchPoints
acc.Debug = a.Debug
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
var bp BatchPoints
bp.Debug = a.Debug
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
bp.Precision = a.Precision
if err := plugin.plugin.Gather(&acc); err != nil {
if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
}
points <- &acc
points <- &bp
}(plugin)
}
@ -230,6 +234,7 @@ func (a *Agent) crank() error {
var bp BatchPoints
bp.Debug = a.Debug
bp.Precision = a.Precision
for _, plugin := range a.plugins {
bp.Prefix = plugin.name + "_"
@ -245,7 +250,6 @@ func (a *Agent) crank() error {
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Precision = a.Precision
return a.flush(&bp)
}
@ -263,6 +267,7 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
bp.Precision = a.Precision
if err := plugin.plugin.Gather(&bp); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.name, err)
@ -274,7 +279,6 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Precision = a.Precision
if err := a.flush(&bp); err != nil {
outerr = errors.New("Error encountered processing plugins & outputs")

View File

@ -37,7 +37,7 @@
[outputs]
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
urls = ["http://localhost:8086"] # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.

View File

@ -1,8 +1,10 @@
package influxdb
import (
"errors"
"fmt"
"log"
"math/rand"
"net/url"
"strings"
@ -12,19 +14,23 @@ import (
)
type InfluxDB struct {
// URL is only for backwards compatibility
URL string
URLs []string `toml:"urls"`
Username string
Password string
Database string
UserAgent string
Timeout t.Duration
conn *client.Client
conns []*client.Client
}
var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
# Multiple urls can be specified for InfluxDB cluster support. Server to
# write to will be randomly chosen each interval.
urls = ["http://localhost:8086"] # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
@ -42,33 +48,58 @@ var sampleConfig = `
`
func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL)
if err != nil {
return err
var urls []*url.URL
for _, URL := range i.URLs {
u, err := url.Parse(URL)
if err != nil {
return err
}
urls = append(urls, u)
}
c, err := client.NewClient(client.Config{
URL: *u,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})
if err != nil {
return err
// Backward-compatibility with single Influx URL config files
// This could eventually be removed in favor of specifying the urls as a list
if i.URL != "" {
u, err := url.Parse(i.URL)
if err != nil {
return err
}
urls = append(urls, u)
}
_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
})
if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
var conns []*client.Client
for _, parsed_url := range urls {
c, err := client.NewClient(client.Config{
URL: *parsed_url,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})
if err != nil {
return err
}
conns = append(conns, c)
}
i.conn = c
return nil
// This will get set to nil if a successful connection is made
err := errors.New("Could not create database on any server")
for _, conn := range conns {
_, e := conn.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE %s", i.Database),
})
if e != nil && !strings.Contains(e.Error(), "database already exists") {
log.Println("ERROR: " + e.Error())
} else {
err = nil
break
}
}
i.conns = conns
return err
}
func (i *InfluxDB) Close() error {
@ -84,12 +115,24 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to"
}
// Choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful attempt. If all servers fail, return error.
func (i *InfluxDB) Write(bp client.BatchPoints) error {
bp.Database = i.Database
if _, err := i.conn.Write(bp); err != nil {
return err
// This will get set to nil if a successful write occurs
err := errors.New("Could not write to any InfluxDB server in cluster")
p := rand.Perm(len(i.conns))
for _, n := range p {
if _, e := i.conns[n].Write(bp); e != nil {
log.Println("ERROR: " + e.Error())
} else {
err = nil
break
}
}
return nil
return err
}
func init() {

View File

@ -15,9 +15,6 @@ import (
type Redis struct {
Servers []string
c net.Conn
buf []byte
}
var sampleConfig = `
@ -112,41 +109,37 @@ func (r *Redis) Gather(acc plugins.Accumulator) error {
const defaultPort = "6379"
func (r *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
if r.c == nil {
_, _, err := net.SplitHostPort(addr.Host)
if err != nil {
addr.Host = addr.Host + ":" + defaultPort
}
c, err := net.Dial("tcp", addr.Host)
if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
}
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
rdr := bufio.NewReader(c)
line, err := rdr.ReadString('\n')
if err != nil {
return err
}
if line[0] != '+' {
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
}
}
}
r.c = c
_, _, err := net.SplitHostPort(addr.Host)
if err != nil {
addr.Host = addr.Host + ":" + defaultPort
}
r.c.Write([]byte("info\r\n"))
c, err := net.Dial("tcp", addr.Host)
if err != nil {
return fmt.Errorf("Unable to connect to redis server '%s': %s", addr.Host, err)
}
defer c.Close()
rdr := bufio.NewReader(r.c)
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
rdr := bufio.NewReader(c)
line, err := rdr.ReadString('\n')
if err != nil {
return err
}
if line[0] != '+' {
return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
}
}
}
c.Write([]byte("info\r\n"))
rdr := bufio.NewReader(c)
line, err := rdr.ReadString('\n')
if err != nil {

View File

@ -4,6 +4,7 @@ package docker
import (
"encoding/json"
"os"
"os/exec"
"path"
"strconv"
@ -48,9 +49,13 @@ func CgroupCPU(containerid string, base string) (*cpu.CPUTimesStat, error) {
if len(base) == 0 {
base = "/sys/fs/cgroup/cpuacct/docker"
}
path := path.Join(base, containerid, "cpuacct.stat")
statfile := path.Join(base, containerid, "cpuacct.stat")
lines, err := common.ReadLines(path)
if _, err := os.Stat(statfile); os.IsNotExist(err) {
statfile = path.Join("/sys/fs/cgroup/cpuacct/system.slice", "docker-" + containerid + ".scope", "cpuacct.stat")
}
lines, err := common.ReadLines(statfile)
if err != nil {
return nil, err
}
@ -86,12 +91,17 @@ func CgroupMem(containerid string, base string) (*CgroupMemStat, error) {
if len(base) == 0 {
base = "/sys/fs/cgroup/memory/docker"
}
path := path.Join(base, containerid, "memory.stat")
statfile := path.Join(base, containerid, "memory.stat")
if _, err := os.Stat(statfile); os.IsNotExist(err) {
statfile = path.Join("/sys/fs/cgroup/memory/system.slice", "docker-" + containerid + ".scope", "memory.stat")
}
// empty containerid means all cgroup
if len(containerid) == 0 {
containerid = "all"
}
lines, err := common.ReadLines(path)
lines, err := common.ReadLines(statfile)
if err != nil {
return nil, err
}

View File

@ -7,7 +7,7 @@ import (
"strconv"
"strings"
"github.com/influxdb/telegraf/plugins/system/ps/common"
"github.com/shirou/gopsutil/common"
)
func NetIOCounters(pernic bool) ([]NetIOCountersStat, error) {
@ -26,7 +26,7 @@ func NetIOCounters(pernic bool) ([]NetIOCountersStat, error) {
// skip first line
continue
}
if common.StringContains(exists, values[0]) {
if common.StringsHas(exists, values[0]) {
// skip if already get
continue
}
@ -38,11 +38,14 @@ func NetIOCounters(pernic bool) ([]NetIOCountersStat, error) {
base = 0
}
parsed := make([]uint64, 0, 3)
parsed := make([]uint64, 0, 6)
vv := []string{
values[base+3], // PacketsRecv
values[base+4], // Errin
values[base+5], // Dropin
values[base+3], // Ipkts == PacketsRecv
values[base+4], // Ierrs == Errin
values[base+5], // Ibytes == BytesRecv
values[base+6], // Opkts == PacketsSent
values[base+7], // Oerrs == Errout
values[base+8], // Obytes == BytesSent
}
for _, target := range vv {
if target == "-" {
@ -61,7 +64,10 @@ func NetIOCounters(pernic bool) ([]NetIOCountersStat, error) {
Name: values[0],
PacketsRecv: parsed[0],
Errin: parsed[1],
Dropin: parsed[2],
BytesRecv: parsed[2],
PacketsSent: parsed[3],
Errout: parsed[4],
BytesSent: parsed[5],
}
ret = append(ret, n)
}