Compare commits

...

17 Commits

Author SHA1 Message Date
Cameron Sparr
6260dd1018 Makefile rule for building all linux binaries, and upload all ARCHs 2015-09-04 14:12:50 -06:00
Cameron Sparr
e47801074e package.sh script fixes for uploading binaries 2015-09-04 13:19:13 -06:00
Cameron Sparr
6d42973d7c Update package script and readme for 0.1.8 2015-09-04 12:53:29 -06:00
Cameron Sparr
68e41f130c Ping plugin
Closes #167
2015-09-04 11:20:49 -06:00
Cameron Sparr
65b33a848e Fix default installed config for consistency 2015-09-02 14:25:40 -06:00
Cameron Sparr
5bfb6df0e0 Write data in UTC by default and use 's' precision
Closes #159
Closes #162
2015-09-02 14:19:36 -06:00
Cameron Sparr
13061d1ec7 package.sh: upload raw binaries to S3
Closes #166
2015-09-02 12:05:29 -06:00
nickscript0
0143a4227e add additional metrics to mysql plugin tests
Closes #165
2015-09-02 11:49:16 -06:00
nickscript0
3f63bcde12 add additional MySQL metrics 2015-09-02 11:48:38 -06:00
Michael Wood
b86c6bba4e README: Say when tagpass/tagdrop are valid from.
closes #163
2015-09-02 09:33:05 -06:00
Cameron Sparr
4d19fc0860 Fixup for g->r change, io.reader was already using 'r' 2015-08-31 16:15:38 -06:00
Cameron Sparr
9c57c30e57 Redis plugin internal names consistency fix, g -> r 2015-08-31 15:57:52 -06:00
Cameron Sparr
9969c4e810 Add system uptime metric, string formatted AND in float64
closes #150
2015-08-31 14:43:34 -06:00
Alexander Oleinik
e2bc5d80c9 Apache Plugin
Closes #158
Fixes #132
2015-08-31 10:17:18 -06:00
Michael Desa
ab191e2b58 Rename DEPENDENCY_LICENSES LICENSE_OF_DEPENDENCIES
Closes #155
Closes #154
2015-08-28 19:37:23 -06:00
Michael Desa
d418a6e872 Add list of dependency licenses 2015-08-28 16:17:46 -07:00
Cameron Sparr
bdfd1aef62 Update README with 0.1.7 and make separate CONTRIBUTING file 2015-08-28 10:21:22 -06:00
53 changed files with 4129 additions and 227 deletions


@@ -1,4 +1,21 @@
## v0.1.7 [unreleased]
## v0.1.8 [unreleased]
### Release Notes
- Telegraf will now write data in UTC at second precision by default
- Now using Go 1.5 to build telegraf
### Features
- [#150](https://github.com/influxdb/telegraf/pull/150): Add Host Uptime metric to system plugin
- [#158](https://github.com/influxdb/telegraf/pull/158): Apache Plugin. Thanks @KPACHbIuLLIAnO4
- [#159](https://github.com/influxdb/telegraf/pull/159): Use second precision for InfluxDB writes
- [#165](https://github.com/influxdb/telegraf/pull/165): Add additional metrics to mysql plugin. Thanks @nickscript0
- [#162](https://github.com/influxdb/telegraf/pull/162): Write UTC by default, provide option
- [#166](https://github.com/influxdb/telegraf/pull/166): Upload binaries to S3
- [#169](https://github.com/influxdb/telegraf/pull/169): Ping plugin
### Bugfixes
## v0.1.7 [2015-08-28]
### Features
- [#38](https://github.com/influxdb/telegraf/pull/38): Kafka output producer.

CONTRIBUTING.md Normal file

@@ -0,0 +1,130 @@
## Sign the CLA
Before we can merge a pull request, you will need to sign the CLA,
which can be found [on our website](http://influxdb.com/community/cla.html)
## Plugins
This section is for developers who want to create new collection plugins.
Telegraf is entirely plugin driven. This interface allows operators to
pick and choose what is gathered, and makes it easy for developers
to create new ways of generating metrics.
Plugin authorship is kept as simple as possible to encourage people to develop
and submit new plugins.
### Plugin Guidelines
* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdb/telegraf/plugins/all/all.go` file.
* The `SampleConfig` function should return valid TOML that describes how the
plugin can be configured. This is included in `telegraf -sample-config`.
* The `Description` function should say in one line what this plugin does.
### Plugin interface
```go
type Plugin interface {
	SampleConfig() string
	Description() string
	Gather(Accumulator) error
}

type Accumulator interface {
	Add(measurement string, value interface{}, tags map[string]string)
	AddValuesWithTime(measurement string,
		values map[string]interface{},
		tags map[string]string,
		timestamp time.Time)
}
```
### Accumulator
A plugin emits metrics by interacting with the Accumulator.
The `Add` function takes three arguments:
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
* **value**: A value for the metric. This accepts 5 different types of value:
* **int**: The most common type. All int types are accepted, but favor `int64`.
Useful for counters, etc.
* **float**: Favor `float64`, useful for gauges, percentages, etc.
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
* **string**: Typically used to indicate a message, or some kind of freeform information.
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
* **tags**: This is a map of strings to strings that describes the where and who
of the metric. For instance, the `net` plugin adds a tag named `"interface"`
set to the name of the network interface, like `"eth0"`.
The `AddValuesWithTime` function allows multiple values for a point to be passed.
The values accept the same types as **value** above. The **timestamp** argument
allows a point to be registered as having occurred at an arbitrary time.
Let's say you've written a plugin that emits metrics about processes on the current host.
```go
type Process struct {
	CPUTime     float64
	MemoryBytes int64
	PID         int
}

func Gather(acc plugins.Accumulator) error {
	for _, process := range system.Processes() {
		tags := map[string]string{
			"pid": fmt.Sprintf("%d", process.Pid),
		}
		acc.Add("cpu", process.CPUTime, tags)
		acc.Add("memory", process.MemoryBytes, tags)
	}
	return nil
}
```
### Example
```go
package simple

// simple.go

import "github.com/influxdb/telegraf/plugins"

type Simple struct {
	Ok bool
}

func (s *Simple) Description() string {
	return "a demo plugin"
}

func (s *Simple) SampleConfig() string {
	return "ok = true # indicate if everything is fine"
}

func (s *Simple) Gather(acc plugins.Accumulator) error {
	if s.Ok {
		acc.Add("state", "pretty good", nil)
	} else {
		acc.Add("state", "not great", nil)
	}
	return nil
}

func init() {
	plugins.Add("simple", func() plugins.Plugin { return &Simple{} })
}
```
## Outputs
TODO: this section will describe requirements for contributing an output

Godeps/Godeps.json generated

@@ -1,6 +1,6 @@
{
"ImportPath": "github.com/influxdb/telegraf",
"GoVersion": "go1.4.2",
"GoVersion": "go1.5",
"Packages": [
"./..."
],
@@ -28,6 +28,11 @@
"ImportPath": "github.com/cenkalti/backoff",
"Rev": "4dc77674aceaabba2c7e3da25d4c823edfb73f99"
},
{
"ImportPath": "github.com/cloudfoundry/gosigar",
"Comment": "scotty_09012012-27-g3ed7c74",
"Rev": "3ed7c74352dae6dc00bdc8c74045375352e3ec05"
},
{
"ImportPath": "github.com/dancannon/gorethink/encoding",
"Comment": "v1.x.x-1-g786f12a",


@@ -0,0 +1 @@
.vagrant


@@ -0,0 +1,8 @@
language: go
go:
- 1.2
install:
- 'go install github.com/onsi/ginkgo/ginkgo'
script: 'ginkgo -r'


@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@@ -0,0 +1,9 @@
Copyright (c) [2009-2011] VMware, Inc. All Rights Reserved.
This product is licensed to you under the Apache License, Version 2.0 (the "License").
You may not use this product except in compliance with the License.
This product includes a number of subcomponents with
separate copyright notices and license terms. Your use of these
subcomponents is subject to the terms and conditions of the
subcomponent's license, as noted in the LICENSE file.


@@ -0,0 +1,22 @@
# Go sigar
## Overview
Go sigar is a golang implementation of the
[sigar API](https://github.com/hyperic/sigar). The Go version of
sigar has a very similar interface, but is being written from scratch
in pure go/cgo, rather than cgo bindings for libsigar.
## Test drive
$ go get github.com/cloudfoundry/gosigar
$ cd $GOPATH/src/github.com/cloudfoundry/gosigar/examples
$ go run uptime.go
## Supported platforms
Currently targeting modern flavors of darwin and linux.
## License
Apache 2.0


@@ -0,0 +1,25 @@
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "hashicorp/precise64"
config.vm.provision "shell", inline: "mkdir -p /home/vagrant/go"
config.vm.synced_folder ".", "/home/vagrant/go/src/github.com/cloudfoundry/gosigar"
config.vm.provision "shell", inline: "chown -R vagrant:vagrant /home/vagrant/go"
install_go = <<-BASH
set -e
if [ ! -d "/usr/local/go" ]; then
cd /tmp && wget https://storage.googleapis.com/golang/go1.3.3.linux-amd64.tar.gz
cd /usr/local
tar xvzf /tmp/go1.3.3.linux-amd64.tar.gz
echo 'export GOPATH=/home/vagrant/go; export PATH=/usr/local/go/bin:$PATH:$GOPATH/bin' >> /home/vagrant/.bashrc
fi
export GOPATH=/home/vagrant/go
export PATH=/usr/local/go/bin:$PATH:$GOPATH/bin
/usr/local/go/bin/go get -u github.com/onsi/ginkgo/ginkgo
/usr/local/go/bin/go get -u github.com/onsi/gomega;
BASH
config.vm.provision "shell", inline: 'apt-get install -y git-core'
config.vm.provision "shell", inline: install_go
end


@@ -0,0 +1,69 @@
package sigar
import (
"time"
)
type ConcreteSigar struct{}
func (c *ConcreteSigar) CollectCpuStats(collectionInterval time.Duration) (<-chan Cpu, chan<- struct{}) {
// samplesCh is buffered to 1 value to immediately return first CPU sample
samplesCh := make(chan Cpu, 1)
stopCh := make(chan struct{})
go func() {
var cpuUsage Cpu
// Immediately provide non-delta value.
// samplesCh is buffered to 1 value, so it will not block.
cpuUsage.Get()
samplesCh <- cpuUsage
ticker := time.NewTicker(collectionInterval)
for {
select {
case <-ticker.C:
previousCpuUsage := cpuUsage
cpuUsage.Get()
select {
case samplesCh <- cpuUsage.Delta(previousCpuUsage):
default:
// Include default to avoid channel blocking
}
case <-stopCh:
return
}
}
}()
return samplesCh, stopCh
}
func (c *ConcreteSigar) GetLoadAverage() (LoadAverage, error) {
l := LoadAverage{}
err := l.Get()
return l, err
}
func (c *ConcreteSigar) GetMem() (Mem, error) {
m := Mem{}
err := m.Get()
return m, err
}
func (c *ConcreteSigar) GetSwap() (Swap, error) {
s := Swap{}
err := s.Get()
return s, err
}
func (c *ConcreteSigar) GetFileSystemUsage(path string) (FileSystemUsage, error) {
f := FileSystemUsage{}
err := f.Get(path)
return f, err
}


@@ -0,0 +1,85 @@
package sigar_test
import (
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
sigar "github.com/cloudfoundry/gosigar"
)
var _ = Describe("ConcreteSigar", func() {
var concreteSigar *sigar.ConcreteSigar
BeforeEach(func() {
concreteSigar = &sigar.ConcreteSigar{}
})
Describe("CollectCpuStats", func() {
It("immediately makes first CPU usage available even though it's not very accurate", func() {
samplesCh, stop := concreteSigar.CollectCpuStats(500 * time.Millisecond)
firstValue := <-samplesCh
Expect(firstValue.User).To(BeNumerically(">", 0))
stop <- struct{}{}
})
It("makes CPU usage delta values available", func() {
samplesCh, stop := concreteSigar.CollectCpuStats(500 * time.Millisecond)
firstValue := <-samplesCh
secondValue := <-samplesCh
Expect(secondValue.User).To(BeNumerically("<", firstValue.User))
stop <- struct{}{}
})
It("does not block", func() {
_, stop := concreteSigar.CollectCpuStats(10 * time.Millisecond)
// Sleep long enough for samplesCh to fill at least 2 values
time.Sleep(20 * time.Millisecond)
stop <- struct{}{}
// If CollectCpuStats blocks it will never get here
Expect(true).To(BeTrue())
})
})
It("GetLoadAverage", func() {
avg, err := concreteSigar.GetLoadAverage()
Expect(avg.One).ToNot(BeNil())
Expect(avg.Five).ToNot(BeNil())
Expect(avg.Fifteen).ToNot(BeNil())
Expect(err).ToNot(HaveOccurred())
})
It("GetMem", func() {
mem, err := concreteSigar.GetMem()
Expect(err).ToNot(HaveOccurred())
Expect(mem.Total).To(BeNumerically(">", 0))
Expect(mem.Used + mem.Free).To(BeNumerically("<=", mem.Total))
})
It("GetSwap", func() {
swap, err := concreteSigar.GetSwap()
Expect(err).ToNot(HaveOccurred())
Expect(swap.Used + swap.Free).To(BeNumerically("<=", swap.Total))
})
It("GetFileSystemUsage", func() {
fsusage, err := concreteSigar.GetFileSystemUsage("/")
Expect(err).ToNot(HaveOccurred())
Expect(fsusage.Total).ToNot(BeNil())
fsusage, err = concreteSigar.GetFileSystemUsage("T O T A L L Y B O G U S")
Expect(err).To(HaveOccurred())
Expect(fsusage.Total).To(Equal(uint64(0)))
})
})


@@ -0,0 +1,52 @@
package main
import (
"fmt"
"time"
"github.com/cloudfoundry/gosigar"
)
func main() {
cpus := sigar.CpuList{}
cpus.Get()
tcpu := getOverallCpu(cpus)
for i, cpu := range cpus.List {
fmt.Printf("CPU%d Ticks: %d\n", i, cpu.Total())
}
fmt.Printf("Total CPU Ticks: %d\n", tcpu.Total())
fmt.Printf("Total CPU Time: %d\n", tcpu.Total()/128)
fmt.Printf("User CPU Time: %d\n", tcpu.User/128)
time.Sleep(1 * time.Second)
tcpu2 := sigar.Cpu{}
tcpu2.Get()
dcpu := tcpu2.Delta(tcpu)
tcpuDelta := tcpu2.Total() - tcpu.Total()
iPercentage := 100.0 * float64(dcpu.Idle) / float64(tcpuDelta)
fmt.Printf("Idle percentage: %f\n", iPercentage)
bPercentage := 100.0 * float64(busy(tcpu2)-busy(tcpu)) / float64(tcpuDelta)
fmt.Printf("Busy percentage: %f\n", bPercentage)
}
func busy(c sigar.Cpu) uint64 {
return c.Total() - c.Idle
}
func getOverallCpu(cl sigar.CpuList) sigar.Cpu {
var overallCpu sigar.Cpu
for _, c := range cl.List {
overallCpu.User += c.User
overallCpu.Nice += c.Nice
overallCpu.Sys += c.Sys
overallCpu.Idle += c.Idle
overallCpu.Wait += c.Wait
overallCpu.Irq += c.Irq
overallCpu.SoftIrq += c.SoftIrq
overallCpu.Stolen += c.Stolen
}
return overallCpu
}


@@ -0,0 +1,39 @@
// Copyright (c) 2012 VMware, Inc.
package main
import (
"fmt"
"github.com/cloudfoundry/gosigar"
"os"
)
const output_format = "%-15s %4s %4s %5s %4s %-15s\n"
func formatSize(size uint64) string {
return sigar.FormatSize(size * 1024)
}
func main() {
fslist := sigar.FileSystemList{}
fslist.Get()
fmt.Fprintf(os.Stdout, output_format,
"Filesystem", "Size", "Used", "Avail", "Use%", "Mounted on")
for _, fs := range fslist.List {
dir_name := fs.DirName
usage := sigar.FileSystemUsage{}
usage.Get(dir_name)
fmt.Fprintf(os.Stdout, output_format,
fs.DevName,
formatSize(usage.Total),
formatSize(usage.Used),
formatSize(usage.Avail),
sigar.FormatPercent(usage.UsePercent()),
dir_name)
}
}


@@ -0,0 +1,33 @@
// Copyright (c) 2012 VMware, Inc.
package main
import (
"fmt"
"github.com/cloudfoundry/gosigar"
"os"
)
func format(val uint64) uint64 {
return val / 1024
}
func main() {
mem := sigar.Mem{}
swap := sigar.Swap{}
mem.Get()
swap.Get()
fmt.Fprintf(os.Stdout, "%18s %10s %10s\n",
"total", "used", "free")
fmt.Fprintf(os.Stdout, "Mem: %10d %10d %10d\n",
format(mem.Total), format(mem.Used), format(mem.Free))
fmt.Fprintf(os.Stdout, "-/+ buffers/cache: %10d %10d\n",
format(mem.ActualUsed), format(mem.ActualFree))
fmt.Fprintf(os.Stdout, "Swap: %10d %10d %10d\n",
format(swap.Total), format(swap.Used), format(swap.Free))
}


@@ -0,0 +1,37 @@
// Copyright (c) 2012 VMware, Inc.
package main
import (
"fmt"
"github.com/cloudfoundry/gosigar"
)
func main() {
pids := sigar.ProcList{}
pids.Get()
// ps -eo pid,ppid,stime,time,rss,state,comm
fmt.Print(" PID PPID STIME TIME RSS S COMMAND\n")
for _, pid := range pids.List {
state := sigar.ProcState{}
mem := sigar.ProcMem{}
time := sigar.ProcTime{}
if err := state.Get(pid); err != nil {
continue
}
if err := mem.Get(pid); err != nil {
continue
}
if err := time.Get(pid); err != nil {
continue
}
fmt.Printf("%5d %5d %s %s %6d %c %s\n",
pid, state.Ppid,
time.FormatStartTime(), time.FormatTotal(),
mem.Resident/1024, state.State, state.Name)
}
}


@@ -0,0 +1,27 @@
// Copyright (c) 2012 VMware, Inc.
package main
import (
"fmt"
"github.com/cloudfoundry/gosigar"
"os"
"time"
)
func main() {
concreteSigar := sigar.ConcreteSigar{}
uptime := sigar.Uptime{}
uptime.Get()
avg, err := concreteSigar.GetLoadAverage()
if err != nil {
fmt.Printf("Failed to get load average")
return
}
fmt.Fprintf(os.Stdout, " %s up %s load average: %.2f, %.2f, %.2f\n",
time.Now().Format("15:04:05"),
uptime.Format(),
avg.One, avg.Five, avg.Fifteen)
}


@@ -0,0 +1,72 @@
package fakes
import (
"time"
sigar "github.com/cloudfoundry/gosigar"
)
type FakeSigar struct {
LoadAverage sigar.LoadAverage
LoadAverageErr error
Mem sigar.Mem
MemErr error
Swap sigar.Swap
SwapErr error
FileSystemUsage sigar.FileSystemUsage
FileSystemUsageErr error
FileSystemUsagePath string
CollectCpuStatsCpuCh chan sigar.Cpu
CollectCpuStatsStopCh chan struct{}
}
func NewFakeSigar() *FakeSigar {
return &FakeSigar{
CollectCpuStatsCpuCh: make(chan sigar.Cpu, 1),
CollectCpuStatsStopCh: make(chan struct{}),
}
}
func (f *FakeSigar) CollectCpuStats(collectionInterval time.Duration) (<-chan sigar.Cpu, chan<- struct{}) {
samplesCh := make(chan sigar.Cpu, 1)
stopCh := make(chan struct{})
go func() {
for {
select {
case cpuStat := <-f.CollectCpuStatsCpuCh:
select {
case samplesCh <- cpuStat:
default:
// Include default to avoid channel blocking
}
case <-f.CollectCpuStatsStopCh:
return
}
}
}()
return samplesCh, stopCh
}
func (f *FakeSigar) GetLoadAverage() (sigar.LoadAverage, error) {
return f.LoadAverage, f.LoadAverageErr
}
func (f *FakeSigar) GetMem() (sigar.Mem, error) {
return f.Mem, f.MemErr
}
func (f *FakeSigar) GetSwap() (sigar.Swap, error) {
return f.Swap, f.SwapErr
}
func (f *FakeSigar) GetFileSystemUsage(path string) (sigar.FileSystemUsage, error) {
f.FileSystemUsagePath = path
return f.FileSystemUsage, f.FileSystemUsageErr
}


@@ -0,0 +1,50 @@
# Process notifications for Go
## Overview
The psnotify package captures process events from the kernel via
kqueue on Darwin/BSD and the netlink connector on Linux.
The psnotify API is similar to the
[fsnotify](https://github.com/howeyc/fsnotify) package.
Example:
```go
watcher, err := psnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
// Process events
go func() {
for {
select {
case ev := <-watcher.Fork:
log.Println("fork event:", ev)
case ev := <-watcher.Exec:
log.Println("exec event:", ev)
case ev := <-watcher.Exit:
log.Println("exit event:", ev)
case err := <-watcher.Error:
log.Println("error:", err)
}
}
}()
err = watcher.Watch(os.Getpid(), psnotify.PROC_EVENT_ALL)
if err != nil {
log.Fatal(err)
}
/* ... do stuff ... */
watcher.Close()
```
## Supported platforms
Currently targeting modern flavors of Darwin and Linux.
Should work on BSD, but untested.
## License
Apache 2.0


@@ -0,0 +1,136 @@
// Copyright (c) 2012 VMware, Inc.
package psnotify
import (
"errors"
"fmt"
)
type ProcEventFork struct {
ParentPid int // Pid of the process that called fork()
ChildPid int // Child process pid created by fork()
}
type ProcEventExec struct {
Pid int // Pid of the process that called exec()
}
type ProcEventExit struct {
Pid int // Pid of the process that called exit()
}
type watch struct {
flags uint32 // Saved value of Watch() flags param
}
type eventListener interface {
close() error // Watch.Close() closes the OS specific listener
}
type Watcher struct {
listener eventListener // OS specifics (kqueue or netlink)
watches map[int]*watch // Map of watched process ids
Error chan error // Errors are sent on this channel
Fork chan *ProcEventFork // Fork events are sent on this channel
Exec chan *ProcEventExec // Exec events are sent on this channel
Exit chan *ProcEventExit // Exit events are sent on this channel
done chan bool // Used to stop the readEvents() goroutine
isClosed bool // Set to true when Close() is first called
}
// Initialize event listener and channels
func NewWatcher() (*Watcher, error) {
listener, err := createListener()
if err != nil {
return nil, err
}
w := &Watcher{
listener: listener,
watches: make(map[int]*watch),
Fork: make(chan *ProcEventFork),
Exec: make(chan *ProcEventExec),
Exit: make(chan *ProcEventExit),
Error: make(chan error),
done: make(chan bool, 1),
}
go w.readEvents()
return w, nil
}
// Close event channels when done message is received
func (w *Watcher) finish() {
close(w.Fork)
close(w.Exec)
close(w.Exit)
close(w.Error)
}
// Closes the OS specific event listener,
// removes all watches and closes all event channels.
func (w *Watcher) Close() error {
if w.isClosed {
return nil
}
w.isClosed = true
for pid := range w.watches {
w.RemoveWatch(pid)
}
w.done <- true
w.listener.close()
return nil
}
// Add pid to the watched process set.
// The flags param is a bitmask of process events to capture,
// must be one or more of: PROC_EVENT_FORK, PROC_EVENT_EXEC, PROC_EVENT_EXIT
func (w *Watcher) Watch(pid int, flags uint32) error {
if w.isClosed {
return errors.New("psnotify watcher is closed")
}
watchEntry, found := w.watches[pid]
if found {
watchEntry.flags |= flags
} else {
if err := w.register(pid, flags); err != nil {
return err
}
w.watches[pid] = &watch{flags: flags}
}
return nil
}
// Remove pid from the watched process set.
func (w *Watcher) RemoveWatch(pid int) error {
_, ok := w.watches[pid]
if !ok {
msg := fmt.Sprintf("watch for pid=%d does not exist", pid)
return errors.New(msg)
}
delete(w.watches, pid)
return w.unregister(pid)
}
// Internal helper to check if there is a message on the "done" channel.
// The "done" message is sent by the Close() method; when received here,
// the Watcher.finish method is called to close all channels and return
// true - in which case the caller should break from the readEvents loop.
func (w *Watcher) isDone() bool {
var done bool
select {
case done = <-w.done:
w.finish()
default:
}
return done
}
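The non-blocking receive that isDone() performs is a standard select/default pattern over a buffered channel; a self-contained sketch of just that pattern:

```go
package main

import "fmt"

// isDone mirrors the Watcher.isDone pattern above: a non-blocking
// receive on a buffered "done" channel via select with a default case.
func isDone(done chan bool) bool {
	select {
	case d := <-done:
		return d
	default:
		return false
	}
}

func main() {
	done := make(chan bool, 1)
	fmt.Println(isDone(done)) // nothing queued yet
	done <- true              // what Close() does
	fmt.Println(isDone(done)) // the signal is now picked up
}
```

Because the channel has capacity 1, the sender (Close) never blocks even if the read loop is busy in a syscall at that moment.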


@@ -0,0 +1,93 @@
// Copyright (c) 2012 VMware, Inc.
// +build darwin freebsd netbsd openbsd
// Go interface to BSD kqueue process events.
package psnotify
import (
"syscall"
)
const (
// Flags (from <sys/event.h>)
PROC_EVENT_FORK = syscall.NOTE_FORK // fork() events
PROC_EVENT_EXEC = syscall.NOTE_EXEC // exec() events
PROC_EVENT_EXIT = syscall.NOTE_EXIT // exit() events
// Watch for all process events
PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
)
type kqueueListener struct {
kq int // The syscall.Kqueue() file descriptor
buf [1]syscall.Kevent_t // An event buffer for Add/Remove watch
}
// Initialize bsd implementation of the eventListener interface
func createListener() (eventListener, error) {
listener := &kqueueListener{}
kq, err := syscall.Kqueue()
listener.kq = kq
return listener, err
}
// Initialize Kevent_t fields and propagate changelist for the given pid
func (w *Watcher) kevent(pid int, fflags uint32, flags int) error {
listener, _ := w.listener.(*kqueueListener)
event := &listener.buf[0]
syscall.SetKevent(event, pid, syscall.EVFILT_PROC, flags)
event.Fflags = fflags
_, err := syscall.Kevent(listener.kq, listener.buf[:], nil, nil)
return err
}
// Delete filter for given pid from the queue
func (w *Watcher) unregister(pid int) error {
return w.kevent(pid, 0, syscall.EV_DELETE)
}
// Add and enable filter for given pid in the queue
func (w *Watcher) register(pid int, flags uint32) error {
return w.kevent(pid, flags, syscall.EV_ADD|syscall.EV_ENABLE)
}
// Poll the kqueue file descriptor and dispatch to the Event channels
func (w *Watcher) readEvents() {
listener, _ := w.listener.(*kqueueListener)
events := make([]syscall.Kevent_t, 10)
for {
if w.isDone() {
return
}
n, err := syscall.Kevent(listener.kq, nil, events, nil)
if err != nil {
w.Error <- err
continue
}
for _, ev := range events[:n] {
pid := int(ev.Ident)
switch ev.Fflags {
case syscall.NOTE_FORK:
w.Fork <- &ProcEventFork{ParentPid: pid}
case syscall.NOTE_EXEC:
w.Exec <- &ProcEventExec{Pid: pid}
case syscall.NOTE_EXIT:
w.RemoveWatch(pid)
w.Exit <- &ProcEventExit{Pid: pid}
}
}
}
}
// Close our kqueue file descriptor; deletes any remaining filters
func (listener *kqueueListener) close() error {
return syscall.Close(listener.kq)
}


@@ -0,0 +1,253 @@
// Copyright (c) 2012 VMware, Inc.
// Go interface to the Linux netlink process connector.
// See Documentation/connector/connector.txt in the linux kernel source tree.
package psnotify
import (
"bytes"
"encoding/binary"
"os"
"syscall"
)
const (
// internal flags (from <linux/connector.h>)
_CN_IDX_PROC = 0x1
_CN_VAL_PROC = 0x1
// internal flags (from <linux/cn_proc.h>)
_PROC_CN_MCAST_LISTEN = 1
_PROC_CN_MCAST_IGNORE = 2
// Flags (from <linux/cn_proc.h>)
PROC_EVENT_FORK = 0x00000001 // fork() events
PROC_EVENT_EXEC = 0x00000002 // exec() events
PROC_EVENT_EXIT = 0x80000000 // exit() events
// Watch for all process events
PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
)
var (
byteOrder = binary.LittleEndian
)
// linux/connector.h: struct cb_id
type cbId struct {
Idx uint32
Val uint32
}
// linux/connector.h: struct cb_msg
type cnMsg struct {
Id cbId
Seq uint32
Ack uint32
Len uint16
Flags uint16
}
// linux/cn_proc.h: struct proc_event.{what,cpu,timestamp_ns}
type procEventHeader struct {
What uint32
Cpu uint32
Timestamp uint64
}
// linux/cn_proc.h: struct proc_event.fork
type forkProcEvent struct {
ParentPid uint32
ParentTgid uint32
ChildPid uint32
ChildTgid uint32
}
// linux/cn_proc.h: struct proc_event.exec
type execProcEvent struct {
ProcessPid uint32
ProcessTgid uint32
}
// linux/cn_proc.h: struct proc_event.exit
type exitProcEvent struct {
ProcessPid uint32
ProcessTgid uint32
ExitCode uint32
ExitSignal uint32
}
// standard netlink header + connector header
type netlinkProcMessage struct {
Header syscall.NlMsghdr
Data cnMsg
}
type netlinkListener struct {
addr *syscall.SockaddrNetlink // Netlink socket address
sock int // The syscall.Socket() file descriptor
seq uint32 // struct cn_msg.seq
}
// Initialize linux implementation of the eventListener interface
func createListener() (eventListener, error) {
listener := &netlinkListener{}
err := listener.bind()
return listener, err
}
// noop on linux
func (w *Watcher) unregister(pid int) error {
return nil
}
// noop on linux
func (w *Watcher) register(pid int, flags uint32) error {
return nil
}
// Read events from the netlink socket
func (w *Watcher) readEvents() {
buf := make([]byte, syscall.Getpagesize())
listener, _ := w.listener.(*netlinkListener)
for {
if w.isDone() {
return
}
nr, _, err := syscall.Recvfrom(listener.sock, buf, 0)
if err != nil {
w.Error <- err
continue
}
if nr < syscall.NLMSG_HDRLEN {
w.Error <- syscall.EINVAL
continue
}
msgs, _ := syscall.ParseNetlinkMessage(buf[:nr])
for _, m := range msgs {
if m.Header.Type == syscall.NLMSG_DONE {
w.handleEvent(m.Data)
}
}
}
}
// Internal helper to check if pid && event is being watched
func (w *Watcher) isWatching(pid int, event uint32) bool {
if watch, ok := w.watches[pid]; ok {
return (watch.flags & event) == event
}
return false
}
// Dispatch events from the netlink socket to the Event channels.
// Unlike bsd kqueue, netlink receives events for all pids,
// so we apply filtering based on the watch table via isWatching()
func (w *Watcher) handleEvent(data []byte) {
buf := bytes.NewBuffer(data)
msg := &cnMsg{}
hdr := &procEventHeader{}
binary.Read(buf, byteOrder, msg)
binary.Read(buf, byteOrder, hdr)
switch hdr.What {
case PROC_EVENT_FORK:
event := &forkProcEvent{}
binary.Read(buf, byteOrder, event)
ppid := int(event.ParentTgid)
pid := int(event.ChildTgid)
if w.isWatching(ppid, PROC_EVENT_EXEC) {
// follow forks
watch, _ := w.watches[ppid]
w.Watch(pid, watch.flags)
}
if w.isWatching(ppid, PROC_EVENT_FORK) {
w.Fork <- &ProcEventFork{ParentPid: ppid, ChildPid: pid}
}
case PROC_EVENT_EXEC:
event := &execProcEvent{}
binary.Read(buf, byteOrder, event)
pid := int(event.ProcessTgid)
if w.isWatching(pid, PROC_EVENT_EXEC) {
w.Exec <- &ProcEventExec{Pid: pid}
}
case PROC_EVENT_EXIT:
event := &exitProcEvent{}
binary.Read(buf, byteOrder, event)
pid := int(event.ProcessTgid)
if w.isWatching(pid, PROC_EVENT_EXIT) {
w.RemoveWatch(pid)
w.Exit <- &ProcEventExit{Pid: pid}
}
}
}
// Bind our netlink socket and
// send a listen control message to the connector driver.
func (listener *netlinkListener) bind() error {
sock, err := syscall.Socket(
syscall.AF_NETLINK,
syscall.SOCK_DGRAM,
syscall.NETLINK_CONNECTOR)
if err != nil {
return err
}
listener.sock = sock
listener.addr = &syscall.SockaddrNetlink{
Family: syscall.AF_NETLINK,
Groups: _CN_IDX_PROC,
}
err = syscall.Bind(listener.sock, listener.addr)
if err != nil {
return err
}
return listener.send(_PROC_CN_MCAST_LISTEN)
}
// Send an ignore control message to the connector driver
// and close our netlink socket.
func (listener *netlinkListener) close() error {
err := listener.send(_PROC_CN_MCAST_IGNORE)
syscall.Close(listener.sock)
return err
}
// Generic method for sending control messages to the connector
// driver; where op is one of PROC_CN_MCAST_{LISTEN,IGNORE}
func (listener *netlinkListener) send(op uint32) error {
listener.seq++
pr := &netlinkProcMessage{}
plen := binary.Size(pr.Data) + binary.Size(op)
pr.Header.Len = syscall.NLMSG_HDRLEN + uint32(plen)
pr.Header.Type = uint16(syscall.NLMSG_DONE)
pr.Header.Flags = 0
pr.Header.Seq = listener.seq
pr.Header.Pid = uint32(os.Getpid())
pr.Data.Id.Idx = _CN_IDX_PROC
pr.Data.Id.Val = _CN_VAL_PROC
pr.Data.Len = uint16(binary.Size(op))
buf := bytes.NewBuffer(make([]byte, 0, pr.Header.Len))
binary.Write(buf, byteOrder, pr)
binary.Write(buf, byteOrder, op)
return syscall.Sendto(listener.sock, buf.Bytes(), 0, listener.addr)
}


@@ -0,0 +1,283 @@
// Copyright (c) 2012 VMware, Inc.
package psnotify
import (
"fmt"
"os"
"os/exec"
"runtime"
"syscall"
"testing"
"time"
)
type anyEvent struct {
exits []int
forks []int
execs []int
errors []error
done chan bool
}
type testWatcher struct {
t *testing.T
watcher *Watcher
events *anyEvent
}
// General purpose Watcher wrapper for all tests
func newTestWatcher(t *testing.T) *testWatcher {
watcher, err := NewWatcher()
if err != nil {
t.Fatal(err)
}
events := &anyEvent{
done: make(chan bool, 1),
}
tw := &testWatcher{
t: t,
watcher: watcher,
events: events,
}
go func() {
for {
select {
case <-events.done:
return
case ev := <-watcher.Fork:
events.forks = append(events.forks, ev.ParentPid)
case ev := <-watcher.Exec:
events.execs = append(events.execs, ev.Pid)
case ev := <-watcher.Exit:
events.exits = append(events.exits, ev.Pid)
case err := <-watcher.Error:
events.errors = append(events.errors, err)
}
}
}()
return tw
}
func (tw *testWatcher) close() {
pause := 100 * time.Millisecond
time.Sleep(pause)
tw.events.done <- true
tw.watcher.Close()
time.Sleep(pause)
}
func skipTest(t *testing.T) bool {
if runtime.GOOS == "linux" && os.Getuid() != 0 {
fmt.Println("SKIP: test must be run as root on linux")
return true
}
return false
}
func startSleepCommand(t *testing.T) *exec.Cmd {
cmd := exec.Command("sh", "-c", "sleep 100")
if err := cmd.Start(); err != nil {
t.Error(err)
}
return cmd
}
func runCommand(t *testing.T, name string) *exec.Cmd {
cmd := exec.Command(name)
if err := cmd.Run(); err != nil {
t.Error(err)
}
return cmd
}
func expectEvents(t *testing.T, num int, name string, pids []int) bool {
if len(pids) != num {
t.Errorf("Expected %d %s events, got=%v", num, name, pids)
return false
}
return true
}
func expectEventPid(t *testing.T, name string, expect int, pid int) bool {
if expect != pid {
t.Errorf("Expected %s pid=%d, received=%d", name, expect, pid)
return false
}
return true
}
func TestWatchFork(t *testing.T) {
if skipTest(t) {
return
}
pid := os.Getpid()
tw := newTestWatcher(t)
	// no watches added yet, so this fork event will not be captured
runCommand(t, "date")
// watch fork events for this process
if err := tw.watcher.Watch(pid, PROC_EVENT_FORK); err != nil {
t.Error(err)
}
// this fork event will be captured,
// the exec and exit events will not be captured
runCommand(t, "cal")
tw.close()
if expectEvents(t, 1, "forks", tw.events.forks) {
expectEventPid(t, "fork", pid, tw.events.forks[0])
}
expectEvents(t, 0, "execs", tw.events.execs)
expectEvents(t, 0, "exits", tw.events.exits)
}
func TestWatchExit(t *testing.T) {
if skipTest(t) {
return
}
tw := newTestWatcher(t)
cmd := startSleepCommand(t)
childPid := cmd.Process.Pid
// watch for exit event of our child process
if err := tw.watcher.Watch(childPid, PROC_EVENT_EXIT); err != nil {
t.Error(err)
}
// kill our child process, triggers exit event
syscall.Kill(childPid, syscall.SIGTERM)
cmd.Wait()
tw.close()
expectEvents(t, 0, "forks", tw.events.forks)
expectEvents(t, 0, "execs", tw.events.execs)
if expectEvents(t, 1, "exits", tw.events.exits) {
expectEventPid(t, "exit", childPid, tw.events.exits[0])
}
}
// combined version of TestWatchFork() and TestWatchExit()
func TestWatchForkAndExit(t *testing.T) {
if skipTest(t) {
return
}
pid := os.Getpid()
tw := newTestWatcher(t)
if err := tw.watcher.Watch(pid, PROC_EVENT_FORK); err != nil {
t.Error(err)
}
cmd := startSleepCommand(t)
childPid := cmd.Process.Pid
if err := tw.watcher.Watch(childPid, PROC_EVENT_EXIT); err != nil {
t.Error(err)
}
syscall.Kill(childPid, syscall.SIGTERM)
cmd.Wait()
tw.close()
if expectEvents(t, 1, "forks", tw.events.forks) {
expectEventPid(t, "fork", pid, tw.events.forks[0])
}
expectEvents(t, 0, "execs", tw.events.execs)
if expectEvents(t, 1, "exits", tw.events.exits) {
expectEventPid(t, "exit", childPid, tw.events.exits[0])
}
}
func TestWatchFollowFork(t *testing.T) {
if skipTest(t) {
return
}
// Darwin is not able to follow forks, as the kqueue fork event
// does not provide the child pid.
if runtime.GOOS != "linux" {
fmt.Println("SKIP: test follow forks is linux only")
return
}
pid := os.Getpid()
tw := newTestWatcher(t)
// watch for all process events related to this process
if err := tw.watcher.Watch(pid, PROC_EVENT_ALL); err != nil {
t.Error(err)
}
commands := []string{"date", "cal"}
childPids := make([]int, len(commands))
// triggers fork/exec/exit events for each command
for i, name := range commands {
cmd := runCommand(t, name)
childPids[i] = cmd.Process.Pid
}
// remove watch for this process
tw.watcher.RemoveWatch(pid)
// run commands again to make sure we don't receive any unwanted events
for _, name := range commands {
runCommand(t, name)
}
tw.close()
// run commands again to make sure nothing panics after
// closing the watcher
for _, name := range commands {
runCommand(t, name)
}
num := len(commands)
if expectEvents(t, num, "forks", tw.events.forks) {
for _, epid := range tw.events.forks {
expectEventPid(t, "fork", pid, epid)
}
}
if expectEvents(t, num, "execs", tw.events.execs) {
for i, epid := range tw.events.execs {
expectEventPid(t, "exec", childPids[i], epid)
}
}
if expectEvents(t, num, "exits", tw.events.exits) {
for i, epid := range tw.events.exits {
expectEventPid(t, "exit", childPids[i], epid)
}
}
}


@@ -0,0 +1,467 @@
// Copyright (c) 2012 VMware, Inc.
package sigar
/*
#include <stdlib.h>
#include <sys/sysctl.h>
#include <sys/mount.h>
#include <mach/mach_init.h>
#include <mach/mach_host.h>
#include <mach/host_info.h>
#include <libproc.h>
#include <mach/processor_info.h>
#include <mach/vm_map.h>
*/
import "C"
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"syscall"
"time"
"unsafe"
)
func (self *LoadAverage) Get() error {
avg := []C.double{0, 0, 0}
C.getloadavg(&avg[0], C.int(len(avg)))
self.One = float64(avg[0])
self.Five = float64(avg[1])
self.Fifteen = float64(avg[2])
return nil
}
func (self *Uptime) Get() error {
tv := syscall.Timeval32{}
if err := sysctlbyname("kern.boottime", &tv); err != nil {
return err
}
self.Length = time.Since(time.Unix(int64(tv.Sec), int64(tv.Usec)*1000)).Seconds()
return nil
}
func (self *Mem) Get() error {
var vmstat C.vm_statistics_data_t
if err := sysctlbyname("hw.memsize", &self.Total); err != nil {
return err
}
if err := vm_info(&vmstat); err != nil {
return err
}
kern := uint64(vmstat.inactive_count) << 12
self.Free = uint64(vmstat.free_count) << 12
self.Used = self.Total - self.Free
self.ActualFree = self.Free + kern
self.ActualUsed = self.Used - kern
return nil
}
type xsw_usage struct {
Total, Avail, Used uint64
}
func (self *Swap) Get() error {
sw_usage := xsw_usage{}
if err := sysctlbyname("vm.swapusage", &sw_usage); err != nil {
return err
}
self.Total = sw_usage.Total
self.Used = sw_usage.Used
self.Free = sw_usage.Avail
return nil
}
func (self *Cpu) Get() error {
var count C.mach_msg_type_number_t = C.HOST_CPU_LOAD_INFO_COUNT
var cpuload C.host_cpu_load_info_data_t
status := C.host_statistics(C.host_t(C.mach_host_self()),
C.HOST_CPU_LOAD_INFO,
C.host_info_t(unsafe.Pointer(&cpuload)),
&count)
if status != C.KERN_SUCCESS {
return fmt.Errorf("host_statistics error=%d", status)
}
self.User = uint64(cpuload.cpu_ticks[C.CPU_STATE_USER])
self.Sys = uint64(cpuload.cpu_ticks[C.CPU_STATE_SYSTEM])
self.Idle = uint64(cpuload.cpu_ticks[C.CPU_STATE_IDLE])
self.Nice = uint64(cpuload.cpu_ticks[C.CPU_STATE_NICE])
return nil
}
func (self *CpuList) Get() error {
var count C.mach_msg_type_number_t
var cpuload *C.processor_cpu_load_info_data_t
var ncpu C.natural_t
status := C.host_processor_info(C.host_t(C.mach_host_self()),
C.PROCESSOR_CPU_LOAD_INFO,
&ncpu,
(*C.processor_info_array_t)(unsafe.Pointer(&cpuload)),
&count)
if status != C.KERN_SUCCESS {
return fmt.Errorf("host_processor_info error=%d", status)
}
// jump through some cgo casting hoops and ensure we properly free
// the memory that cpuload points to
target := C.vm_map_t(C.mach_task_self_)
address := C.vm_address_t(uintptr(unsafe.Pointer(cpuload)))
defer C.vm_deallocate(target, address, C.vm_size_t(ncpu))
// the body of struct processor_cpu_load_info
// aka processor_cpu_load_info_data_t
var cpu_ticks [C.CPU_STATE_MAX]uint32
// copy the cpuload array to a []byte buffer
// where we can binary.Read the data
size := int(ncpu) * binary.Size(cpu_ticks)
buf := C.GoBytes(unsafe.Pointer(cpuload), C.int(size))
bbuf := bytes.NewBuffer(buf)
self.List = make([]Cpu, 0, ncpu)
for i := 0; i < int(ncpu); i++ {
cpu := Cpu{}
err := binary.Read(bbuf, binary.LittleEndian, &cpu_ticks)
if err != nil {
return err
}
cpu.User = uint64(cpu_ticks[C.CPU_STATE_USER])
cpu.Sys = uint64(cpu_ticks[C.CPU_STATE_SYSTEM])
cpu.Idle = uint64(cpu_ticks[C.CPU_STATE_IDLE])
cpu.Nice = uint64(cpu_ticks[C.CPU_STATE_NICE])
self.List = append(self.List, cpu)
}
return nil
}
func (self *FileSystemList) Get() error {
num, err := getfsstat(nil, C.MNT_NOWAIT)
if num < 0 {
return err
}
buf := make([]syscall.Statfs_t, num)
num, err = getfsstat(buf, C.MNT_NOWAIT)
if err != nil {
return err
}
fslist := make([]FileSystem, 0, num)
for i := 0; i < num; i++ {
fs := FileSystem{}
fs.DirName = bytePtrToString(&buf[i].Mntonname[0])
fs.DevName = bytePtrToString(&buf[i].Mntfromname[0])
fs.SysTypeName = bytePtrToString(&buf[i].Fstypename[0])
fslist = append(fslist, fs)
}
self.List = fslist
return err
}
func (self *ProcList) Get() error {
n := C.proc_listpids(C.PROC_ALL_PIDS, 0, nil, 0)
if n <= 0 {
return syscall.EINVAL
}
buf := make([]byte, n)
n = C.proc_listpids(C.PROC_ALL_PIDS, 0, unsafe.Pointer(&buf[0]), n)
if n <= 0 {
return syscall.ENOMEM
}
var pid int32
num := int(n) / binary.Size(pid)
list := make([]int, 0, num)
bbuf := bytes.NewBuffer(buf)
for i := 0; i < num; i++ {
if err := binary.Read(bbuf, binary.LittleEndian, &pid); err != nil {
return err
}
if pid == 0 {
continue
}
list = append(list, int(pid))
}
self.List = list
return nil
}
func (self *ProcState) Get(pid int) error {
info := C.struct_proc_taskallinfo{}
if err := task_info(pid, &info); err != nil {
return err
}
self.Name = C.GoString(&info.pbsd.pbi_comm[0])
switch info.pbsd.pbi_status {
case C.SIDL:
self.State = RunStateIdle
case C.SRUN:
self.State = RunStateRun
case C.SSLEEP:
self.State = RunStateSleep
case C.SSTOP:
self.State = RunStateStop
case C.SZOMB:
self.State = RunStateZombie
default:
self.State = RunStateUnknown
}
self.Ppid = int(info.pbsd.pbi_ppid)
self.Tty = int(info.pbsd.e_tdev)
self.Priority = int(info.ptinfo.pti_priority)
self.Nice = int(info.pbsd.pbi_nice)
return nil
}
func (self *ProcMem) Get(pid int) error {
info := C.struct_proc_taskallinfo{}
if err := task_info(pid, &info); err != nil {
return err
}
self.Size = uint64(info.ptinfo.pti_virtual_size)
self.Resident = uint64(info.ptinfo.pti_resident_size)
self.PageFaults = uint64(info.ptinfo.pti_faults)
return nil
}
func (self *ProcTime) Get(pid int) error {
info := C.struct_proc_taskallinfo{}
if err := task_info(pid, &info); err != nil {
return err
}
self.User =
uint64(info.ptinfo.pti_total_user) / uint64(time.Millisecond)
self.Sys =
uint64(info.ptinfo.pti_total_system) / uint64(time.Millisecond)
self.Total = self.User + self.Sys
self.StartTime = (uint64(info.pbsd.pbi_start_tvsec) * 1000) +
(uint64(info.pbsd.pbi_start_tvusec) / 1000)
return nil
}
func (self *ProcArgs) Get(pid int) error {
var args []string
argv := func(arg string) {
args = append(args, arg)
}
err := kern_procargs(pid, nil, argv, nil)
self.List = args
return err
}
func (self *ProcExe) Get(pid int) error {
exe := func(arg string) {
self.Name = arg
}
return kern_procargs(pid, exe, nil, nil)
}
// wrapper around sysctl KERN_PROCARGS2
// callbacks params are optional,
// up to the caller as to which pieces of data they want
func kern_procargs(pid int,
exe func(string),
argv func(string),
env func(string, string)) error {
mib := []C.int{C.CTL_KERN, C.KERN_PROCARGS2, C.int(pid)}
argmax := uintptr(C.ARG_MAX)
buf := make([]byte, argmax)
err := sysctl(mib, &buf[0], &argmax, nil, 0)
if err != nil {
		return err
}
bbuf := bytes.NewBuffer(buf)
bbuf.Truncate(int(argmax))
var argc int32
binary.Read(bbuf, binary.LittleEndian, &argc)
path, err := bbuf.ReadBytes(0)
if exe != nil {
exe(string(chop(path)))
}
// skip trailing \0's
for {
c, _ := bbuf.ReadByte()
if c != 0 {
bbuf.UnreadByte()
break // start of argv[0]
}
}
for i := 0; i < int(argc); i++ {
arg, err := bbuf.ReadBytes(0)
if err == io.EOF {
break
}
if argv != nil {
argv(string(chop(arg)))
}
}
if env == nil {
return nil
}
delim := []byte{61} // "="
for {
line, err := bbuf.ReadBytes(0)
if err == io.EOF || line[0] == 0 {
break
}
pair := bytes.SplitN(chop(line), delim, 2)
env(string(pair[0]), string(pair[1]))
}
return nil
}
// XXX copied from zsyscall_darwin_amd64.go
func sysctl(mib []C.int, old *byte, oldlen *uintptr,
new *byte, newlen uintptr) (err error) {
var p0 unsafe.Pointer
p0 = unsafe.Pointer(&mib[0])
_, _, e1 := syscall.Syscall6(syscall.SYS___SYSCTL, uintptr(p0),
uintptr(len(mib)),
uintptr(unsafe.Pointer(old)), uintptr(unsafe.Pointer(oldlen)),
uintptr(unsafe.Pointer(new)), uintptr(newlen))
if e1 != 0 {
err = e1
}
return
}
func vm_info(vmstat *C.vm_statistics_data_t) error {
var count C.mach_msg_type_number_t = C.HOST_VM_INFO_COUNT
status := C.host_statistics(
C.host_t(C.mach_host_self()),
C.HOST_VM_INFO,
C.host_info_t(unsafe.Pointer(vmstat)),
&count)
if status != C.KERN_SUCCESS {
return fmt.Errorf("host_statistics=%d", status)
}
return nil
}
// generic Sysctl buffer unmarshalling
func sysctlbyname(name string, data interface{}) (err error) {
val, err := syscall.Sysctl(name)
if err != nil {
return err
}
buf := []byte(val)
switch v := data.(type) {
case *uint64:
*v = *(*uint64)(unsafe.Pointer(&buf[0]))
return
}
bbuf := bytes.NewBuffer([]byte(val))
return binary.Read(bbuf, binary.LittleEndian, data)
}
// syscall.Getfsstat() wrapper is broken, roll our own to workaround.
func getfsstat(buf []syscall.Statfs_t, flags int) (n int, err error) {
var ptr uintptr
var size uintptr
if len(buf) > 0 {
ptr = uintptr(unsafe.Pointer(&buf[0]))
size = unsafe.Sizeof(buf[0]) * uintptr(len(buf))
} else {
ptr = uintptr(0)
size = uintptr(0)
}
trap := uintptr(syscall.SYS_GETFSSTAT64)
ret, _, errno := syscall.Syscall(trap, ptr, size, uintptr(flags))
n = int(ret)
if errno != 0 {
err = errno
}
return
}
func task_info(pid int, info *C.struct_proc_taskallinfo) error {
size := C.int(unsafe.Sizeof(*info))
ptr := unsafe.Pointer(info)
n := C.proc_pidinfo(C.int(pid), C.PROC_PIDTASKALLINFO, 0, ptr, size)
if n != size {
return syscall.ENOMEM
}
return nil
}


@@ -0,0 +1,126 @@
// Copyright (c) 2012 VMware, Inc.
package sigar
import (
"bufio"
"bytes"
"fmt"
"strconv"
"time"
)
// Go version of apr_strfsize
func FormatSize(size uint64) string {
ord := []string{"K", "M", "G", "T", "P", "E"}
o := 0
buf := new(bytes.Buffer)
w := bufio.NewWriter(buf)
if size < 973 {
fmt.Fprintf(w, "%3d ", size)
w.Flush()
return buf.String()
}
for {
remain := size & 1023
size >>= 10
if size >= 973 {
o++
continue
}
if size < 9 || (size == 9 && remain < 973) {
remain = ((remain * 5) + 256) / 512
if remain >= 10 {
size++
remain = 0
}
fmt.Fprintf(w, "%d.%d%s", size, remain, ord[o])
break
}
if remain >= 512 {
size++
}
fmt.Fprintf(w, "%3d%s", size, ord[o])
break
}
w.Flush()
return buf.String()
}
func FormatPercent(percent float64) string {
return strconv.FormatFloat(percent, 'f', -1, 64) + "%"
}
func (self *FileSystemUsage) UsePercent() float64 {
b_used := (self.Total - self.Free) / 1024
b_avail := self.Avail / 1024
utotal := b_used + b_avail
used := b_used
if utotal != 0 {
u100 := used * 100
pct := u100 / utotal
if u100%utotal != 0 {
pct += 1
}
return (float64(pct) / float64(100)) * 100.0
}
return 0.0
}
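UsePercent computes an integer percentage of used space against used+available 1K blocks, rounding up on any remainder (the df(1) convention). A standalone version of the same arithmetic, with the receiver fields turned into parameters:

```go
package main

import "fmt"

// usePercent mirrors FileSystemUsage.UsePercent: percent of used
// vs used+avail in 1K blocks, rounded up when there is a remainder.
func usePercent(total, free, avail uint64) float64 {
	bUsed := (total - free) / 1024
	bAvail := avail / 1024
	utotal := bUsed + bAvail
	if utotal == 0 {
		return 0.0
	}
	u100 := bUsed * 100
	pct := u100 / utotal
	if u100%utotal != 0 {
		pct++ // round up, as df(1) does
	}
	return float64(pct)
}

func main() {
	fmt.Println(usePercent(100*1024, 60*1024, 60*1024)) // 40 of 100 blocks used
}
```

Note the denominator is used+avail rather than total, so reserved blocks (Total - Free - Avail) inflate the percentage, matching df output.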
func (self *Uptime) Format() string {
buf := new(bytes.Buffer)
w := bufio.NewWriter(buf)
uptime := uint64(self.Length)
days := uptime / (60 * 60 * 24)
if days != 0 {
s := ""
if days > 1 {
s = "s"
}
fmt.Fprintf(w, "%d day%s, ", days, s)
}
minutes := uptime / 60
hours := minutes / 60
hours %= 24
minutes %= 60
fmt.Fprintf(w, "%2d:%02d", hours, minutes)
w.Flush()
return buf.String()
}
func (self *ProcTime) FormatStartTime() string {
if self.StartTime == 0 {
return "00:00"
}
start := time.Unix(int64(self.StartTime)/1000, 0)
format := "Jan02"
if time.Since(start).Seconds() < (60 * 60 * 24) {
format = "15:04"
}
return start.Format(format)
}
func (self *ProcTime) FormatTotal() string {
t := self.Total / 1000
ss := t % 60
t /= 60
mm := t % 60
t /= 60
hh := t % 24
return fmt.Sprintf("%02d:%02d:%02d", hh, mm, ss)
}
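FormatTotal converts accumulated CPU milliseconds into a clock-style string by successive division. The same conversion as a standalone function:

```go
package main

import "fmt"

// formatTotal mirrors ProcTime.FormatTotal: milliseconds -> "HH:MM:SS",
// with hours wrapped at 24 as in the original.
func formatTotal(totalMillis uint64) string {
	t := totalMillis / 1000
	ss := t % 60
	t /= 60
	mm := t % 60
	t /= 60
	hh := t % 24
	return fmt.Sprintf("%02d:%02d:%02d", hh, mm, ss)
}

func main() {
	fmt.Println(formatTotal(3661000)) // 1h 1m 1s of CPU time
}
```

Because of the `% 24`, a process that has accumulated more than a day of CPU time wraps around rather than showing a day count.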


@@ -0,0 +1,141 @@
package sigar
import (
"time"
)
type Sigar interface {
CollectCpuStats(collectionInterval time.Duration) (<-chan Cpu, chan<- struct{})
GetLoadAverage() (LoadAverage, error)
GetMem() (Mem, error)
GetSwap() (Swap, error)
GetFileSystemUsage(string) (FileSystemUsage, error)
}
type Cpu struct {
User uint64
Nice uint64
Sys uint64
Idle uint64
Wait uint64
Irq uint64
SoftIrq uint64
Stolen uint64
}
func (cpu *Cpu) Total() uint64 {
return cpu.User + cpu.Nice + cpu.Sys + cpu.Idle +
cpu.Wait + cpu.Irq + cpu.SoftIrq + cpu.Stolen
}
func (cpu Cpu) Delta(other Cpu) Cpu {
return Cpu{
User: cpu.User - other.User,
Nice: cpu.Nice - other.Nice,
Sys: cpu.Sys - other.Sys,
Idle: cpu.Idle - other.Idle,
Wait: cpu.Wait - other.Wait,
Irq: cpu.Irq - other.Irq,
SoftIrq: cpu.SoftIrq - other.SoftIrq,
Stolen: cpu.Stolen - other.Stolen,
}
}
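Total and Delta are the building blocks for interval CPU usage: take two samples, subtract, and divide busy ticks by total ticks. A minimal sketch of that computation with the field set trimmed to three counters (a simplified stand-in, not the full Cpu struct):

```go
package main

import "fmt"

// cpuSample is a trimmed stand-in for Cpu with just three counters.
type cpuSample struct{ User, Sys, Idle uint64 }

func (c cpuSample) total() uint64 { return c.User + c.Sys + c.Idle }

func main() {
	prev := cpuSample{User: 100, Sys: 50, Idle: 850}
	cur := cpuSample{User: 160, Sys: 70, Idle: 1770}
	// delta, as Cpu.Delta does field by field
	d := cpuSample{cur.User - prev.User, cur.Sys - prev.Sys, cur.Idle - prev.Idle}
	// percent busy over the interval
	busy := float64(d.User+d.Sys) * 100 / float64(d.total())
	fmt.Printf("%.1f%%\n", busy)
}
```

This is essentially what a CollectCpuStats consumer does with successive values from the returned channel.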
type LoadAverage struct {
One, Five, Fifteen float64
}
type Uptime struct {
Length float64
}
type Mem struct {
Total uint64
Used uint64
Free uint64
ActualFree uint64
ActualUsed uint64
}
type Swap struct {
Total uint64
Used uint64
Free uint64
}
type CpuList struct {
List []Cpu
}
type FileSystem struct {
DirName string
DevName string
TypeName string
SysTypeName string
Options string
Flags uint32
}
type FileSystemList struct {
List []FileSystem
}
type FileSystemUsage struct {
Total uint64
Used uint64
Free uint64
Avail uint64
Files uint64
FreeFiles uint64
}
type ProcList struct {
List []int
}
type RunState byte
const (
RunStateSleep = 'S'
RunStateRun = 'R'
RunStateStop = 'T'
RunStateZombie = 'Z'
RunStateIdle = 'D'
RunStateUnknown = '?'
)
type ProcState struct {
Name string
State RunState
Ppid int
Tty int
Priority int
Nice int
Processor int
}
type ProcMem struct {
Size uint64
Resident uint64
Share uint64
MinorFaults uint64
MajorFaults uint64
PageFaults uint64
}
type ProcTime struct {
StartTime uint64
User uint64
Sys uint64
Total uint64
}
type ProcArgs struct {
List []string
}
type ProcExe struct {
Name string
Cwd string
Root string
}


@@ -0,0 +1,135 @@
package sigar_test
import (
"os"
"path/filepath"
"runtime"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/cloudfoundry/gosigar"
)
var _ = Describe("Sigar", func() {
var invalidPid = 666666
It("cpu", func() {
cpu := Cpu{}
err := cpu.Get()
Expect(err).ToNot(HaveOccurred())
})
It("load average", func() {
avg := LoadAverage{}
err := avg.Get()
Expect(err).ToNot(HaveOccurred())
})
It("uptime", func() {
uptime := Uptime{}
err := uptime.Get()
Expect(err).ToNot(HaveOccurred())
Expect(uptime.Length).To(BeNumerically(">", 0))
})
It("mem", func() {
mem := Mem{}
err := mem.Get()
Expect(err).ToNot(HaveOccurred())
Expect(mem.Total).To(BeNumerically(">", 0))
Expect(mem.Used + mem.Free).To(BeNumerically("<=", mem.Total))
})
It("swap", func() {
swap := Swap{}
err := swap.Get()
Expect(err).ToNot(HaveOccurred())
Expect(swap.Used + swap.Free).To(BeNumerically("<=", swap.Total))
})
It("cpu list", func() {
cpulist := CpuList{}
err := cpulist.Get()
Expect(err).ToNot(HaveOccurred())
nsigar := len(cpulist.List)
numcpu := runtime.NumCPU()
Expect(nsigar).To(Equal(numcpu))
})
It("file system list", func() {
fslist := FileSystemList{}
err := fslist.Get()
Expect(err).ToNot(HaveOccurred())
Expect(len(fslist.List)).To(BeNumerically(">", 0))
})
It("file system usage", func() {
fsusage := FileSystemUsage{}
err := fsusage.Get("/")
Expect(err).ToNot(HaveOccurred())
err = fsusage.Get("T O T A L L Y B O G U S")
Expect(err).To(HaveOccurred())
})
It("proc list", func() {
pids := ProcList{}
err := pids.Get()
Expect(err).ToNot(HaveOccurred())
Expect(len(pids.List)).To(BeNumerically(">", 2))
err = pids.Get()
Expect(err).ToNot(HaveOccurred())
})
It("proc state", func() {
state := ProcState{}
err := state.Get(os.Getppid())
Expect(err).ToNot(HaveOccurred())
Expect([]RunState{RunStateRun, RunStateSleep}).To(ContainElement(state.State))
Expect([]string{"go", "ginkgo"}).To(ContainElement(state.Name))
err = state.Get(invalidPid)
Expect(err).To(HaveOccurred())
})
It("proc mem", func() {
mem := ProcMem{}
err := mem.Get(os.Getppid())
Expect(err).ToNot(HaveOccurred())
err = mem.Get(invalidPid)
Expect(err).To(HaveOccurred())
})
It("proc time", func() {
time := ProcTime{}
err := time.Get(os.Getppid())
Expect(err).ToNot(HaveOccurred())
err = time.Get(invalidPid)
Expect(err).To(HaveOccurred())
})
It("proc args", func() {
args := ProcArgs{}
err := args.Get(os.Getppid())
Expect(err).ToNot(HaveOccurred())
Expect(len(args.List)).To(BeNumerically(">=", 2))
})
It("proc exe", func() {
exe := ProcExe{}
err := exe.Get(os.Getppid())
Expect(err).ToNot(HaveOccurred())
Expect([]string{"go", "ginkgo"}).To(ContainElement(filepath.Base(exe.Name)))
})
})


@@ -0,0 +1,386 @@
// Copyright (c) 2012 VMware, Inc.
package sigar
import (
"bufio"
"bytes"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"syscall"
)
var system struct {
ticks uint64
btime uint64
}
var Procd string
func init() {
system.ticks = 100 // C.sysconf(C._SC_CLK_TCK)
Procd = "/proc"
// grab system boot time
readFile(Procd+"/stat", func(line string) bool {
if strings.HasPrefix(line, "btime") {
system.btime, _ = strtoull(line[6:])
return false // stop reading
}
return true
})
}
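The init callback scans /proc/stat line by line until it hits the "btime" entry and parses the boot timestamp after it. The same scan against an in-memory sample (strconv.ParseUint stands in for the package's strtoull helper):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// scanBtime mirrors the init() callback above: find the "btime" line
// in /proc/stat-style input and parse the boot time that follows it.
func scanBtime(stat string) uint64 {
	for _, line := range strings.Split(stat, "\n") {
		if strings.HasPrefix(line, "btime") {
			v, _ := strconv.ParseUint(strings.TrimSpace(line[6:]), 10, 64)
			return v // stop at the first match, like returning false above
		}
	}
	return 0
}

func main() {
	sample := "cpu  1 2 3 4\nbtime 1441382400\nprocesses 999"
	fmt.Println(scanBtime(sample))
}
```

Returning false from the readFile callback plays the same role as the early return here: no point reading the rest of the file once btime is found.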
func (self *LoadAverage) Get() error {
line, err := ioutil.ReadFile(Procd + "/loadavg")
if err != nil {
		return err
}
fields := strings.Fields(string(line))
self.One, _ = strconv.ParseFloat(fields[0], 64)
self.Five, _ = strconv.ParseFloat(fields[1], 64)
self.Fifteen, _ = strconv.ParseFloat(fields[2], 64)
return nil
}
func (self *Uptime) Get() error {
sysinfo := syscall.Sysinfo_t{}
if err := syscall.Sysinfo(&sysinfo); err != nil {
return err
}
self.Length = float64(sysinfo.Uptime)
return nil
}
func (self *Mem) Get() error {
var buffers, cached uint64
table := map[string]*uint64{
"MemTotal": &self.Total,
"MemFree": &self.Free,
"Buffers": &buffers,
"Cached": &cached,
}
if err := parseMeminfo(table); err != nil {
return err
}
self.Used = self.Total - self.Free
kern := buffers + cached
self.ActualFree = self.Free + kern
self.ActualUsed = self.Used - kern
return nil
}
func (self *Swap) Get() error {
table := map[string]*uint64{
"SwapTotal": &self.Total,
"SwapFree": &self.Free,
}
if err := parseMeminfo(table); err != nil {
return err
}
self.Used = self.Total - self.Free
return nil
}
func (self *Cpu) Get() error {
return readFile(Procd+"/stat", func(line string) bool {
if len(line) > 4 && line[0:4] == "cpu " {
parseCpuStat(self, line)
return false
}
return true
})
}
func (self *CpuList) Get() error {
capacity := len(self.List)
if capacity == 0 {
capacity = 4
}
list := make([]Cpu, 0, capacity)
err := readFile(Procd+"/stat", func(line string) bool {
if len(line) > 3 && line[0:3] == "cpu" && line[3] != ' ' {
cpu := Cpu{}
parseCpuStat(&cpu, line)
list = append(list, cpu)
}
return true
})
self.List = list
return err
}
func (self *FileSystemList) Get() error {
capacity := len(self.List)
if capacity == 0 {
capacity = 10
}
fslist := make([]FileSystem, 0, capacity)
err := readFile("/etc/mtab", func(line string) bool {
fields := strings.Fields(line)
fs := FileSystem{}
fs.DevName = fields[0]
fs.DirName = fields[1]
fs.SysTypeName = fields[2]
fs.Options = fields[3]
fslist = append(fslist, fs)
return true
})
self.List = fslist
return err
}
func (self *ProcList) Get() error {
dir, err := os.Open(Procd)
if err != nil {
return err
}
defer dir.Close()
const readAllDirnames = -1 // see os.File.Readdirnames doc
names, err := dir.Readdirnames(readAllDirnames)
if err != nil {
return err
}
capacity := len(names)
list := make([]int, 0, capacity)
for _, name := range names {
if name[0] < '0' || name[0] > '9' {
continue
}
pid, err := strconv.Atoi(name)
if err == nil {
list = append(list, pid)
}
}
self.List = list
return nil
}
func (self *ProcState) Get(pid int) error {
contents, err := readProcFile(pid, "stat")
if err != nil {
return err
}
fields := strings.Fields(string(contents))
self.Name = fields[1][1 : len(fields[1])-1] // strip ()'s
self.State = RunState(fields[2][0])
self.Ppid, _ = strconv.Atoi(fields[3])
self.Tty, _ = strconv.Atoi(fields[6])
self.Priority, _ = strconv.Atoi(fields[17])
self.Nice, _ = strconv.Atoi(fields[18])
self.Processor, _ = strconv.Atoi(fields[38])
return nil
}
func (self *ProcMem) Get(pid int) error {
contents, err := readProcFile(pid, "statm")
if err != nil {
return err
}
fields := strings.Fields(string(contents))
size, _ := strtoull(fields[0])
self.Size = size << 12
rss, _ := strtoull(fields[1])
self.Resident = rss << 12
share, _ := strtoull(fields[2])
self.Share = share << 12
contents, err = readProcFile(pid, "stat")
if err != nil {
return err
}
fields = strings.Fields(string(contents))
self.MinorFaults, _ = strtoull(fields[10])
self.MajorFaults, _ = strtoull(fields[12])
self.PageFaults = self.MinorFaults + self.MajorFaults
return nil
}
func (self *ProcTime) Get(pid int) error {
contents, err := readProcFile(pid, "stat")
if err != nil {
return err
}
fields := strings.Fields(string(contents))
user, _ := strtoull(fields[13])
sys, _ := strtoull(fields[14])
// convert to millis
self.User = user * (1000 / system.ticks)
self.Sys = sys * (1000 / system.ticks)
self.Total = self.User + self.Sys
// convert start time in jiffies to unix epoch milliseconds
self.StartTime, _ = strtoull(fields[21])
self.StartTime /= system.ticks
self.StartTime += system.btime
self.StartTime *= 1000
return nil
}
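The tick arithmetic in `ProcTime.Get` above can be checked with a tiny standalone sketch; the value 100 for `USER_HZ` (ticks per second) is an assumption here, not something read from the kernel as `system.ticks` is:

```go
package main

import "fmt"

// millisFromJiffies mirrors the conversion in ProcTime.Get: with the
// common USER_HZ of 100 ticks per second, 1000/ticks = 10 ms per tick.
func millisFromJiffies(jiffies, ticks uint64) uint64 {
	return jiffies * (1000 / ticks)
}

func main() {
	// 250 jiffies of user time at 100 Hz is 2.5 seconds.
	fmt.Println(millisFromJiffies(250, 100))
}
```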
func (self *ProcArgs) Get(pid int) error {
contents, err := readProcFile(pid, "cmdline")
if err != nil {
return err
}
bbuf := bytes.NewBuffer(contents)
var args []string
for {
arg, err := bbuf.ReadBytes(0)
if err == io.EOF {
break
}
args = append(args, string(chop(arg)))
}
self.List = args
return nil
}
func (self *ProcExe) Get(pid int) error {
fields := map[string]*string{
"exe": &self.Name,
"cwd": &self.Cwd,
"root": &self.Root,
}
for name, field := range fields {
val, err := os.Readlink(procFileName(pid, name))
if err != nil {
return err
}
*field = val
}
return nil
}
func parseMeminfo(table map[string]*uint64) error {
return readFile(Procd+"/meminfo", func(line string) bool {
fields := strings.Split(line, ":")
if ptr := table[fields[0]]; ptr != nil {
num := strings.TrimLeft(fields[1], " ")
val, err := strtoull(strings.Fields(num)[0])
if err == nil {
*ptr = val * 1024
}
}
return true
})
}
func parseCpuStat(self *Cpu, line string) error {
fields := strings.Fields(line)
self.User, _ = strtoull(fields[1])
self.Nice, _ = strtoull(fields[2])
self.Sys, _ = strtoull(fields[3])
self.Idle, _ = strtoull(fields[4])
self.Wait, _ = strtoull(fields[5])
self.Irq, _ = strtoull(fields[6])
self.SoftIrq, _ = strtoull(fields[7])
self.Stolen, _ = strtoull(fields[8])
return nil
}
func readFile(file string, handler func(string) bool) error {
contents, err := ioutil.ReadFile(file)
if err != nil {
return err
}
reader := bufio.NewReader(bytes.NewBuffer(contents))
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
if !handler(string(line)) {
break
}
}
return nil
}
func strtoull(val string) (uint64, error) {
return strconv.ParseUint(val, 10, 64)
}
func procFileName(pid int, name string) string {
return Procd + "/" + strconv.Itoa(pid) + "/" + name
}
func readProcFile(pid int, name string) ([]byte, error) {
path := procFileName(pid, name)
contents, err := ioutil.ReadFile(path)
if err != nil {
if perr, ok := err.(*os.PathError); ok {
if perr.Err == syscall.ENOENT {
return nil, syscall.ESRCH
}
}
}
return contents, err
}


@@ -0,0 +1,225 @@
package sigar_test
import (
"io/ioutil"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
sigar "github.com/cloudfoundry/gosigar"
)
var _ = Describe("sigarLinux", func() {
var procd string
BeforeEach(func() {
var err error
procd, err = ioutil.TempDir("", "sigarTests")
Expect(err).ToNot(HaveOccurred())
sigar.Procd = procd
})
AfterEach(func() {
sigar.Procd = "/proc"
})
Describe("CPU", func() {
var (
statFile string
cpu sigar.Cpu
)
BeforeEach(func() {
statFile = procd + "/stat"
cpu = sigar.Cpu{}
})
Describe("Get", func() {
It("gets CPU usage", func() {
statContents := []byte("cpu 25 1 2 3 4 5 6 7")
err := ioutil.WriteFile(statFile, statContents, 0644)
Expect(err).ToNot(HaveOccurred())
err = cpu.Get()
Expect(err).ToNot(HaveOccurred())
Expect(cpu.User).To(Equal(uint64(25)))
})
It("ignores empty lines", func() {
statContents := []byte("cpu ")
err := ioutil.WriteFile(statFile, statContents, 0644)
Expect(err).ToNot(HaveOccurred())
err = cpu.Get()
Expect(err).ToNot(HaveOccurred())
Expect(cpu.User).To(Equal(uint64(0)))
})
})
Describe("CollectCpuStats", func() {
It("collects CPU usage over time", func() {
statContents := []byte("cpu 25 1 2 3 4 5 6 7")
err := ioutil.WriteFile(statFile, statContents, 0644)
Expect(err).ToNot(HaveOccurred())
concreteSigar := &sigar.ConcreteSigar{}
cpuUsages, stop := concreteSigar.CollectCpuStats(500 * time.Millisecond)
Expect(<-cpuUsages).To(Equal(sigar.Cpu{
User: uint64(25),
Nice: uint64(1),
Sys: uint64(2),
Idle: uint64(3),
Wait: uint64(4),
Irq: uint64(5),
SoftIrq: uint64(6),
Stolen: uint64(7),
}))
statContents = []byte("cpu 30 3 7 10 25 55 36 65")
err = ioutil.WriteFile(statFile, statContents, 0644)
Expect(err).ToNot(HaveOccurred())
Expect(<-cpuUsages).To(Equal(sigar.Cpu{
User: uint64(5),
Nice: uint64(2),
Sys: uint64(5),
Idle: uint64(7),
Wait: uint64(21),
Irq: uint64(50),
SoftIrq: uint64(30),
Stolen: uint64(58),
}))
stop <- struct{}{}
})
})
})
Describe("Mem", func() {
var meminfoFile string
BeforeEach(func() {
meminfoFile = procd + "/meminfo"
meminfoContents := `
MemTotal: 374256 kB
MemFree: 274460 kB
Buffers: 9764 kB
Cached: 38648 kB
SwapCached: 0 kB
Active: 33772 kB
Inactive: 31184 kB
Active(anon): 16572 kB
Inactive(anon): 552 kB
Active(file): 17200 kB
Inactive(file): 30632 kB
Unevictable: 0 kB
Mlocked: 0 kB
SwapTotal: 786428 kB
SwapFree: 786428 kB
Dirty: 0 kB
Writeback: 0 kB
AnonPages: 16564 kB
Mapped: 6612 kB
Shmem: 584 kB
Slab: 19092 kB
SReclaimable: 9128 kB
SUnreclaim: 9964 kB
KernelStack: 672 kB
PageTables: 1864 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 973556 kB
Committed_AS: 55880 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 21428 kB
VmallocChunk: 34359713596 kB
HardwareCorrupted: 0 kB
AnonHugePages: 0 kB
HugePages_Total: 0
HugePages_Free: 0
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 59328 kB
DirectMap2M: 333824 kB
`
err := ioutil.WriteFile(meminfoFile, []byte(meminfoContents), 0444)
Expect(err).ToNot(HaveOccurred())
})
It("returns correct memory info", func() {
mem := sigar.Mem{}
err := mem.Get()
Expect(err).ToNot(HaveOccurred())
Expect(mem.Total).To(BeNumerically("==", 374256*1024))
Expect(mem.Free).To(BeNumerically("==", 274460*1024))
})
})
Describe("Swap", func() {
var meminfoFile string
BeforeEach(func() {
meminfoFile = procd + "/meminfo"
meminfoContents := `
MemTotal: 374256 kB
MemFree: 274460 kB
Buffers: 9764 kB
Cached: 38648 kB
SwapCached: 0 kB
Active: 33772 kB
Inactive: 31184 kB
Active(anon): 16572 kB
Inactive(anon): 552 kB
Active(file): 17200 kB
Inactive(file): 30632 kB
Unevictable: 0 kB
Mlocked: 0 kB
SwapTotal: 786428 kB
SwapFree: 786428 kB
Dirty: 0 kB
Writeback: 0 kB
AnonPages: 16564 kB
Mapped: 6612 kB
Shmem: 584 kB
Slab: 19092 kB
SReclaimable: 9128 kB
SUnreclaim: 9964 kB
KernelStack: 672 kB
PageTables: 1864 kB
NFS_Unstable: 0 kB
Bounce: 0 kB
WritebackTmp: 0 kB
CommitLimit: 973556 kB
Committed_AS: 55880 kB
VmallocTotal: 34359738367 kB
VmallocUsed: 21428 kB
VmallocChunk: 34359713596 kB
HardwareCorrupted: 0 kB
AnonHugePages: 0 kB
HugePages_Total: 0
HugePages_Free: 0
HugePages_Rsvd: 0
HugePages_Surp: 0
Hugepagesize: 2048 kB
DirectMap4k: 59328 kB
DirectMap2M: 333824 kB
`
err := ioutil.WriteFile(meminfoFile, []byte(meminfoContents), 0444)
Expect(err).ToNot(HaveOccurred())
})
It("returns correct memory info", func() {
swap := sigar.Swap{}
err := swap.Get()
Expect(err).ToNot(HaveOccurred())
Expect(swap.Total).To(BeNumerically("==", 786428*1024))
Expect(swap.Free).To(BeNumerically("==", 786428*1024))
})
})
})


@@ -0,0 +1,13 @@
package sigar_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)
func TestGosigar(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Gosigar Suite")
}


@@ -0,0 +1,26 @@
// Copyright (c) 2012 VMware, Inc.
// +build darwin freebsd linux netbsd openbsd
package sigar
import "syscall"
func (self *FileSystemUsage) Get(path string) error {
stat := syscall.Statfs_t{}
err := syscall.Statfs(path, &stat)
if err != nil {
return err
}
bsize := stat.Bsize / 512
self.Total = (uint64(stat.Blocks) * uint64(bsize)) >> 1
self.Free = (uint64(stat.Bfree) * uint64(bsize)) >> 1
self.Avail = (uint64(stat.Bavail) * uint64(bsize)) >> 1
self.Used = self.Total - self.Free
self.Files = stat.Files
self.FreeFiles = stat.Ffree
return nil
}


@@ -0,0 +1,22 @@
// Copyright (c) 2012 VMware, Inc.
package sigar
import (
"unsafe"
)
func bytePtrToString(ptr *int8) string {
bytes := (*[10000]byte)(unsafe.Pointer(ptr))
n := 0
for bytes[n] != 0 {
n++
}
return string(bytes[0:n])
}
func chop(buf []byte) []byte {
return buf[0 : len(buf)-1]
}


@@ -0,0 +1,100 @@
// Copyright (c) 2012 VMware, Inc.
package sigar
// #include <stdlib.h>
// #include <windows.h>
import "C"
import (
"fmt"
"unsafe"
)
func init() {
}
func (self *LoadAverage) Get() error {
return nil
}
func (self *Uptime) Get() error {
return nil
}
func (self *Mem) Get() error {
var statex C.MEMORYSTATUSEX
statex.dwLength = C.DWORD(unsafe.Sizeof(statex))
succeeded := C.GlobalMemoryStatusEx(&statex)
if succeeded == C.FALSE {
lastError := C.GetLastError()
return fmt.Errorf("GlobalMemoryStatusEx failed with error: %d", int(lastError))
}
self.Total = uint64(statex.ullTotalPhys)
return nil
}
func (self *Swap) Get() error {
return notImplemented()
}
func (self *Cpu) Get() error {
return notImplemented()
}
func (self *CpuList) Get() error {
return notImplemented()
}
func (self *FileSystemList) Get() error {
return notImplemented()
}
func (self *ProcList) Get() error {
return notImplemented()
}
func (self *ProcState) Get(pid int) error {
return notImplemented()
}
func (self *ProcMem) Get(pid int) error {
return notImplemented()
}
func (self *ProcTime) Get(pid int) error {
return notImplemented()
}
func (self *ProcArgs) Get(pid int) error {
return notImplemented()
}
func (self *ProcExe) Get(pid int) error {
return notImplemented()
}
func (self *FileSystemUsage) Get(path string) error {
var availableBytes C.ULARGE_INTEGER
var totalBytes C.ULARGE_INTEGER
var totalFreeBytes C.ULARGE_INTEGER
pathChars := C.CString(path)
defer C.free(unsafe.Pointer(pathChars))
succeeded := C.GetDiskFreeSpaceEx((*C.CHAR)(pathChars), &availableBytes, &totalBytes, &totalFreeBytes)
if succeeded == C.FALSE {
lastError := C.GetLastError()
return fmt.Errorf("GetDiskFreeSpaceEx failed with error: %d", int(lastError))
}
self.Total = *(*uint64)(unsafe.Pointer(&totalBytes))
return nil
}
func notImplemented() error {
panic("Not Implemented")
}


@@ -0,0 +1,32 @@
package sigar_test
import (
"os"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
sigar "github.com/cloudfoundry/gosigar"
)
var _ = Describe("SigarWindows", func() {
Describe("Memory", func() {
It("gets the total memory", func() {
mem := sigar.Mem{}
err := mem.Get()
Ω(err).ShouldNot(HaveOccurred())
Ω(mem.Total).Should(BeNumerically(">", 0))
})
})
Describe("Disk", func() {
It("gets the total disk space", func() {
usage := sigar.FileSystemUsage{}
err := usage.Get(os.TempDir())
Ω(err).ShouldNot(HaveOccurred())
Ω(usage.Total).Should(BeNumerically(">", 0))
})
})
})


@@ -0,0 +1,33 @@
# List
- github.com/Shopify/sarama [MIT LICENSE](https://github.com/Shopify/sarama/blob/master/MIT-LICENSE)
- github.com/Sirupsen/logrus [MIT LICENSE](https://github.com/Sirupsen/logrus/blob/master/LICENSE)
- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
- github.com/cenkalti/backoff [MIT LICENSE](https://github.com/cenkalti/backoff/blob/master/LICENSE)
- github.com/dancannon/gorethink [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/master/LICENSE)
- github.com/eapache/go-resiliency [MIT LICENSE](https://github.com/eapache/go-resiliency/blob/master/LICENSE)
- github.com/eapache/queue [MIT LICENSE](https://github.com/eapache/queue/blob/master/LICENSE)
- github.com/fsouza/go-dockerclient [BSD LICENSE](https://github.com/fsouza/go-dockerclient/blob/master/LICENSE)
- github.com/go-sql-driver/mysql [MPL LICENSE](https://github.com/go-sql-driver/mysql/blob/master/LICENSE)
- github.com/gogo/protobuf [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE)
- github.com/golang/protobuf [BSD LICENSE](https://github.com/golang/protobuf/blob/master/LICENSE)
- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
- github.com/gonuts/go-shellquote (No License, but the project it was forked from https://github.com/kballard/go-shellquote is [MIT](https://github.com/kballard/go-shellquote/blob/master/LICENSE)).
- github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE)
- github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE)
- github.com/hashicorp/raft-boltdb [MPL LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE)
- github.com/lib/pq [MIT LICENSE](https://github.com/lib/pq/blob/master/LICENSE.md)
- github.com/matttproud/golang_protobuf_extensions [APACHE LICENSE](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE)
- github.com/naoina/go-stringutil [MIT LICENSE](https://github.com/naoina/go-stringutil/blob/master/LICENSE)
- github.com/naoina/toml [MIT LICENSE](https://github.com/naoina/toml/blob/master/LICENSE)
- github.com/prometheus/client_golang [APACHE LICENSE](https://github.com/prometheus/client_golang/blob/master/LICENSE)
- github.com/samuel/go-zookeeper [BSD LICENSE](https://github.com/samuel/go-zookeeper/blob/master/LICENSE)
- github.com/stretchr/objx [MIT LICENSE](https://github.com/stretchr/objx)
- github.com/stretchr/testify [MIT LICENSE](https://github.com/stretchr/testify/blob/master/LICENCE.txt)
- github.com/wvanbergen/kafka [MIT LICENSE](https://github.com/wvanbergen/kafka/blob/master/LICENSE)
- github.com/wvanbergen/kazoo-go [MIT LICENSE](https://github.com/wvanbergen/kazoo-go/blob/master/MIT-LICENSE)
- gopkg.in/dancannon/gorethink.v1 [APACHE LICENSE](https://github.com/dancannon/gorethink/blob/v1.1.2/LICENSE)
- gopkg.in/mgo.v2 [BSD LICENSE](https://github.com/go-mgo/mgo/blob/v2/LICENSE)
- golang.org/x/crypto/blowfish
- golang.org/x/crypto/bcrypt


@@ -6,6 +6,17 @@ build: prepare
"-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
build-linux-bins: prepare
GOARCH=amd64 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_amd64 \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=386 GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_386 \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
GOARCH=arm GOOS=linux $(GOPATH)/bin/godep go build -o telegraf_linux_arm \
-ldflags "-X main.Version $(VERSION)" \
./cmd/telegraf/telegraf.go
prepare:
go get github.com/tools/godep
@@ -21,7 +32,7 @@ test: prepare docker-compose
$(GOPATH)/bin/godep go test -v ./...
test-short: prepare
$(GOPATH)/bin/godep go test -v -short ./...
$(GOPATH)/bin/godep go test -short ./...
test-cleanup:
docker-compose kill

README.md

@@ -30,8 +30,11 @@ are some InfluxDB compatibility requirements:
* InfluxDB 0.9.2 and prior requires Telegraf 0.1.4
Latest:
* http://get.influxdb.org/telegraf/telegraf_0.1.6_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.1.6-1.x86_64.rpm
* http://get.influxdb.org/telegraf/telegraf_0.1.8_amd64.deb
* http://get.influxdb.org/telegraf/telegraf-0.1.8-1.x86_64.rpm
Binaries:
* http://get.influxdb.org/telegraf/telegraf_0.1.8_linux_x86_64.tar.gz
0.1.4:
* http://get.influxdb.org/telegraf/telegraf_0.1.4_amd64.deb
@@ -87,10 +90,10 @@ There are 5 configuration options that are configurable per plugin:
current plugin. Each string in the array is tested as a prefix against metric names
and if it matches, the metric is emitted.
* **drop**: The inverse of pass, if a metric name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter metrics by
* **tagpass**: (added in 0.1.5) tag names and arrays of strings that are used to filter metrics by
the current plugin. Each string in the array is tested as an exact match against
the tag name, and if it matches the metric is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted.
* **tagdrop**: (added in 0.1.5) The inverse of tagpass. If a tag matches, the metric is not emitted.
This is tested on metrics that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
@@ -121,7 +124,7 @@ measurements at a 10s interval and will collect totalcpu & percpu data.
totalcpu = true
```
Below is how to configure `tagpass` parameters (added in 0.1.4)
Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
```
# Don't collect CPU data for cpu6 & cpu7
@@ -166,135 +169,31 @@ Telegraf currently has support for collecting metrics from
We'll be adding support for many more over the coming months. Read on if you
want to add support for another service or third-party API.
## Plugins
## Output options
This section is for developers that want to create new collection plugins.
Telegraf is entirely plugin driven. This interface allows for operators to
pick and chose what is gathered as well as makes it easy for developers
to create new ways of generating metrics.
Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`
Plugin authorship is kept as simple as possible to promote people to develop
and submit new plugins.
## Supported Outputs
## Guidelines
* influxdb
* kafka
* datadog
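As a sketch, a configuration with more than one output sink could look like the following; the influxdb and kafka keys are taken from the sample configs elsewhere in this diff, but the exact set of keys for each output should always be confirmed by running `telegraf -sample-config`:

```toml
# Send metrics to both InfluxDB and Kafka (illustrative only;
# verify keys against `telegraf -sample-config`)
[outputs]

[outputs.influxdb]
url = "http://localhost:8086"
database = "telegraf"

[outputs.kafka]
brokers = ["localhost:9092"]
topic = "telegraf"
```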
* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
See below for a quick example.
* To be available within Telegraf itself, plugins must add themselves to the
`github.com/influxdb/telegraf/plugins/all/all.go` file.
* The `SampleConfig` function should return valid toml that describes how the
plugin can be configured. This is include in `telegraf -sample-config`.
* The `Description` function should say in one line what this plugin does.
## Contributing
### Plugin interface
```go
type Plugin interface {
SampleConfig() string
Description() string
Gather(Accumulator) error
}
type Accumulator interface {
Add(measurement string, value interface{}, tags map[string]string)
AddValuesWithTime(measurement string,
values map[string]interface{},
tags map[string]string,
timestamp time.Time)
}
```
### Accumulator
The way that a plugin emits metrics is by interacting with the Accumulator.
The `Add` function takes 3 arguments:
* **measurement**: A string description of the metric. For instance `bytes_read` or `faults`.
* **value**: A value for the metric. This accepts 5 different types of value:
* **int**: The most common type. All int types are accepted but favor using `int64`
Useful for counters, etc.
* **float**: Favor `float64`, useful for gauges, percentages, etc.
* **bool**: `true` or `false`, useful to indicate the presence of a state. `light_on`, etc.
* **string**: Typically used to indicate a message, or some kind of freeform information.
* **time.Time**: Useful for indicating when a state last occurred, for instance `light_on_since`.
* **tags**: This is a map of strings to strings to describe the where or who
about the metric. For instance, the `net` plugin adds a tag named `"interface"`
set to the name of the network interface, like `"eth0"`.
The `AddValuesWithTime` allows multiple values for a point to be passed. The values
used are the same type profile as **value** above. The **timestamp** argument
allows a point to be registered as having occurred at an arbitrary time.
Let's say you've written a plugin that emits metrics about processes on the current host.
```go
type Process struct {
CPUTime float64
MemoryBytes int64
PID int
}
func Gather(acc plugins.Accumulator) error {
for _, process := range system.Processes() {
tags := map[string]string {
"pid": fmt.Sprintf("%d", process.Pid),
}
acc.Add("cpu", process.CPUTime, tags)
acc.Add("memory", process.MemoryBytes, tags)
}
}
```
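The `AddValuesWithTime` call described above can be exercised with a toy accumulator; the `testAcc` and `BatchPoint` types here are stand-ins invented for illustration, not part of Telegraf's API:

```go
package main

import (
	"fmt"
	"time"
)

// BatchPoint mirrors the shape of a multi-value point: one measurement,
// several values, a tag set, and an explicit timestamp.
type BatchPoint struct {
	Measurement string
	Values      map[string]interface{}
	Tags        map[string]string
	Timestamp   time.Time
}

// testAcc is a stand-in accumulator that just records points.
type testAcc struct {
	points []BatchPoint
}

func (a *testAcc) AddValuesWithTime(measurement string,
	values map[string]interface{},
	tags map[string]string,
	timestamp time.Time) {
	a.points = append(a.points, BatchPoint{measurement, values, tags, timestamp})
}

// record emits one multi-value point for a hypothetical process.
func record() BatchPoint {
	acc := &testAcc{}
	when := time.Date(2015, 9, 4, 12, 0, 0, 0, time.UTC)
	acc.AddValuesWithTime("process",
		map[string]interface{}{"cpu_time": 1.5, "memory_bytes": int64(4096)},
		map[string]string{"pid": "42"},
		when)
	return acc.points[0]
}

func main() {
	p := record()
	fmt.Println(p.Measurement, p.Values["cpu_time"], p.Tags["pid"], p.Timestamp.Unix())
}
```

Because the timestamp is passed explicitly, the point is registered at `when` rather than at collection time, which is the whole purpose of the `timestamp` argument.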
### Example
```go
package simple
// simple.go
import "github.com/influxdb/telegraf/plugins"
type Simple struct {
Ok bool
}
func (s *Simple) Description() string {
return "a demo plugin"
}
func (s *Simple) SampleConfig() string {
return "ok = true # indicate if everything is fine"
}
func (s *Simple) Gather(acc plugins.Accumulator) error {
if s.Ok {
acc.Add("state", "pretty good", nil)
} else {
acc.Add("state", "not great", nil)
}
return nil
}
func init() {
plugins.Add("simple", func() plugins.Plugin { return &Simple{} })
}
```
Please see the
[contributing guide](https://github.com/influxdb/telegraf/blob/master/CONTRIBUTING.md)
for details on contributing a plugin or output to Telegraf
## Testing
### Execute short tests:
### Execute short tests
execute `make test-short`
### Execute long tests:
### Execute long tests
As Telegraf collects metrics from several third-party services it becomes a
difficult task to mock each service as some of them have complicated protocols
@@ -314,7 +213,7 @@ instructions
and `brew install docker-compose`
- execute `make test`
### Unit test troubleshooting:
### Unit test troubleshooting
Try cleaning up your test environment by executing `make test-cleanup` and
re-running


@@ -31,7 +31,14 @@ type Agent struct {
// Interval at which to gather information
Interval Duration
// Run in debug mode?
// Option for outputting data in UTC
UTC bool `toml:"utc"`
// Precision to write data at
// Valid values for Precision are n, u, ms, s, m, and h
Precision string
// Option for running in debug mode
Debug bool
Hostname string
@@ -43,8 +50,14 @@ type Agent struct {
// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *Config) (*Agent, error) {
agent := &Agent{Config: config, Interval: Duration{10 * time.Second}}
agent := &Agent{
Config: config,
Interval: Duration{10 * time.Second},
UTC: true,
Precision: "s",
}
// Apply the toml table to the agent config, overriding defaults
err := config.ApplyAgent(agent)
if err != nil {
return nil, err
@@ -199,7 +212,11 @@ func (a *Agent) crankParallel() error {
var bp BatchPoints
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Tags = a.Config.Tags
bp.Precision = a.Precision
for sub := range points {
bp.Points = append(bp.Points, sub.Points...)
@@ -223,8 +240,12 @@ func (a *Agent) crank() error {
}
}
bp.Time = time.Now()
bp.Tags = a.Config.Tags
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Precision = a.Precision
return a.flush(&bp)
}
@@ -250,6 +271,10 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
bp.Tags = a.Config.Tags
bp.Time = time.Now()
if a.UTC {
bp.Time = bp.Time.UTC()
}
bp.Precision = a.Precision
if err := a.flush(&bp); err != nil {
outerr = errors.New("Error encountered processing plugins & outputs")


@@ -126,8 +126,9 @@ func main() {
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+
"Precision:%#v, UTC: %#v\n",
ag.Interval, ag.Debug, ag.Hostname, ag.Precision, ag.UTC)
}
log.Printf("Tags enabled: %s", config.ListTags())


@@ -131,10 +131,11 @@ func (c *Config) ApplyOutput(name string, v interface{}) error {
return nil
}
// ApplyAgent loads the toml config into the given interface
func (c *Config) ApplyAgent(v interface{}) error {
// ApplyAgent loads the toml config into the given Agent object, overriding
// defaults (such as collection duration) with the values from the toml config.
func (c *Config) ApplyAgent(a *Agent) error {
if c.agent != nil {
return toml.UnmarshalTable(c.agent, v)
return toml.UnmarshalTable(c.agent, a)
}
return nil
@@ -350,11 +351,23 @@ var header = `# Telegraf configuration
[tags]
# dc = "us-east-1"
# Configuration for telegraf itself
# Configuration for telegraf agent
[agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
# Default data collection interval for all plugins
interval = "10s"
# If utc = false, uses local time (utc is highly recommended)
utc = true
# Precision of writes, valid values are n, u, ms, s, m, and h
# note: using second precision greatly helps InfluxDB compression
precision = "s"
# run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
hostname = ""
###############################################################################
@@ -426,8 +439,8 @@ func PrintSampleConfig() {
func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok {
plugin := creator()
fmt.Printf("# %s\n[%s]\n", plugin.Description(), name)
fmt.Printf(strings.TrimSpace(plugin.SampleConfig()))
fmt.Printf("# %s\n[%s]", plugin.Description(), name)
fmt.Printf(plugin.SampleConfig())
} else {
return errors.New(fmt.Sprintf("Plugin %s not found", name))
}


@@ -22,55 +22,55 @@
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.
[tags]
# dc = "us-east-1"
# Configuration for telegraf itself
[agent]
interval = "10s"
debug = false
hostname = ""
utc = true
precision = "s"
# Configuration for influxdb server to send metrics to
[outputs]
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# Tags can also be specified via a normal map, but only one form at a time:
# [influxdb.tags]
# tags = { "dc" = "us-east-1" }
# Configuration for telegraf itself
# [agent]
# interval = "10s"
# debug = false
# hostname = "prod3241"
# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# PLUGINS
# Read metrics about cpu usage
[cpu]
# Whether to report per-cpu stats or not
percpu = true
# # Whether to report total system cpu stats or not
totalcpu = true
# Whether to report per-cpu stats or not
percpu = true
# Whether to report total system cpu stats or not
totalcpu = true
# Read metrics about disk usage by mount point
[disk]
# no configuration
# no configuration
# Read metrics about disk IO by device
[io]
# no configuration
# no configuration
# Read metrics about memory usage
[mem]
# no configuration
# no configuration
[system]
# no configuration
# no configuration
[swap]
# no configuration
# no configuration


@@ -19,10 +19,10 @@ type Kafka struct {
}
var sampleConfig = `
# URLs of kafka brokers
brokers = ["localhost:9092"]
# Kafka topic for producer messages
topic = "telegraf"
# URLs of kafka brokers
brokers = ["localhost:9092"]
# Kafka topic for producer messages
topic = "telegraf"
`
func (k *Kafka) Connect() error {


@@ -260,7 +260,9 @@ else
debian_package=telegraf_${VERSION}_amd64.deb
fi
COMMON_FPM_ARGS="-C $TMP_WORK_DIR --vendor $VENDOR --url $URL --license $LICENSE --maintainer $MAINTAINER --after-install $POST_INSTALL_PATH --name telegraf --version $VERSION --config-files $CONFIG_ROOT_DIR ."
COMMON_FPM_ARGS="-C $TMP_WORK_DIR --vendor $VENDOR --url $URL --license $LICENSE \
--maintainer $MAINTAINER --after-install $POST_INSTALL_PATH \
--name telegraf --version $VERSION --config-files $CONFIG_ROOT_DIR ."
$rpm_args fpm -s dir -t rpm --description "$DESCRIPTION" $COMMON_FPM_ARGS
if [ $? -ne 0 ]; then
echo "Failed to create RPM package -- aborting."
@@ -289,15 +291,35 @@ if [ "$CIRCLE_BRANCH" == "" ]; then
cleanup_exit 1
fi
# Upload .deb and .rpm packages
for filepath in `ls *.{deb,rpm}`; do
echo "Uploading $filepath to S3"
filename=`basename $filepath`
echo "Uploading $filename to s3://get.influxdb.org/telegraf/$filename"
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath s3://get.influxdb.org/telegraf/$filename --acl public-read --region us-east-1
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $filepath \
s3://get.influxdb.org/telegraf/$filename \
--acl public-read --region us-east-1
if [ $? -ne 0 ]; then
echo "Upload failed -- aborting."
cleanup_exit 1
fi
rm $filepath
done
# Make and upload linux amd64, 386, and arm
make build-linux-bins
for b in `ls telegraf_*`; do
zippedbin=${b}_${VERSION}.tar.gz
# Zip the binary
tar -zcf $TMP_WORK_DIR/$zippedbin ./$b
echo "Uploading binary: $zippedbin to S3"
AWS_CONFIG_FILE=$AWS_FILE aws s3 cp $TMP_WORK_DIR/$zippedbin \
s3://get.influxdb.org/telegraf/$zippedbin \
--acl public-read --region us-east-1
if [ $? -ne 0 ]; then
echo "Binary upload failed -- aborting."
cleanup_exit 1
fi
done
else
echo "Not publishing packages to S3."


@@ -1,6 +1,7 @@
package all
import (
_ "github.com/influxdb/telegraf/plugins/apache"
_ "github.com/influxdb/telegraf/plugins/disque"
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
_ "github.com/influxdb/telegraf/plugins/exec"
@@ -13,6 +14,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/nginx"
_ "github.com/influxdb/telegraf/plugins/ping"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/prometheus"
_ "github.com/influxdb/telegraf/plugins/rabbitmq"

plugins/apache/apache.go Normal file

@@ -0,0 +1,150 @@
package apache
import (
"bufio"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdb/telegraf/plugins"
)
type Apache struct {
Urls []string
}
var sampleConfig = `
# An array of Apache status URI to gather stats.
urls = ["http://localhost/server-status?auto"]
`
func (n *Apache) SampleConfig() string {
return sampleConfig
}
func (n *Apache) Description() string {
return "Read Apache status information (mod_status)"
}
func (n *Apache) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
var outerr error
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
return fmt.Errorf("Unable to parse address '%s': %s", u, err)
}
wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
outerr = n.gatherUrl(addr, acc)
}(addr)
}
wg.Wait()
return outerr
}
var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}
var client = &http.Client{Transport: tr}
func (n *Apache) gatherUrl(addr *url.URL, acc plugins.Accumulator) error {
resp, err := client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
tags := getTags(addr)
sc := bufio.NewScanner(resp.Body)
for sc.Scan() {
line := sc.Text()
if strings.Contains(line, ":") {
parts := strings.SplitN(line, ":", 2)
key, part := strings.Replace(parts[0], " ", "", -1), strings.TrimSpace(parts[1])
switch key {
case "Scoreboard":
n.gatherScores(part, acc, tags)
default:
value, err := strconv.ParseFloat(part, 32)
if err != nil {
continue
}
acc.Add(key, value, tags)
}
}
}
return nil
}
func (n *Apache) gatherScores(data string, acc plugins.Accumulator, tags map[string]string) {
var waiting, open int
var S, R, W, K, D, C, L, G, I int
for _, s := range strings.Split(data, "") {
switch s {
case "_":
waiting++
case "S":
S++
case "R":
R++
case "W":
W++
case "K":
K++
case "D":
D++
case "C":
C++
case "L":
L++
case "G":
G++
case "I":
I++
case ".":
open++
}
}
acc.Add("scboard_waiting", float64(waiting), tags)
acc.Add("scboard_starting", float64(S), tags)
acc.Add("scboard_reading", float64(R), tags)
acc.Add("scboard_sending", float64(W), tags)
acc.Add("scboard_keepalive", float64(K), tags)
acc.Add("scboard_dnslookup", float64(D), tags)
acc.Add("scboard_closing", float64(C), tags)
acc.Add("scboard_logging", float64(L), tags)
acc.Add("scboard_finishing", float64(G), tags)
acc.Add("scboard_idle_cleanup", float64(I), tags)
acc.Add("scboard_open", float64(open), tags)
}
// getTags returns the tag(s) for the Apache plugin
func getTags(addr *url.URL) map[string]string {
h := addr.Host
var htag string
if host, _, err := net.SplitHostPort(h); err == nil {
htag = host
} else {
htag = h
}
return map[string]string{"server": htag}
}
func init() {
plugins.Add("apache", func() plugins.Plugin {
return &Apache{}
})
}
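For reference, the scoreboard-counting loop in gatherScores can be sketched as a standalone program. countScoreboard and its name table below are illustrative helpers, not part of the plugin; they tally the same mod_status characters ("_" waiting, "." open slot, letters for busy-worker states):

```go
package main

import (
	"fmt"
	"strings"
)

// countScoreboard tallies Apache mod_status scoreboard characters into
// named counters, mirroring the per-character switch in gatherScores.
func countScoreboard(data string) map[string]int {
	names := map[string]string{
		"_": "waiting", "S": "starting", "R": "reading", "W": "sending",
		"K": "keepalive", "D": "dnslookup", "C": "closing", "L": "logging",
		"G": "finishing", "I": "idle_cleanup", ".": "open",
	}
	counts := map[string]int{}
	for _, s := range strings.Split(data, "") {
		if name, ok := names[s]; ok {
			counts[name]++
		}
	}
	return counts
}

func main() {
	// Three waiting workers, one sender, one keepalive, four open slots.
	fmt.Println(countScoreboard("_W__K...."))
}
```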


@@ -58,6 +58,10 @@ type mapping struct {
}
var mappings = []*mapping{
{
onServer: "Aborted_",
inExport: "aborted_",
},
{
onServer: "Bytes_",
inExport: "bytes_",
@@ -66,6 +70,10 @@ var mappings = []*mapping{
onServer: "Com_",
inExport: "commands_",
},
{
onServer: "Created_",
inExport: "created_",
},
{
onServer: "Handler_",
inExport: "handler_",
@@ -74,6 +82,26 @@ var mappings = []*mapping{
onServer: "Innodb_",
inExport: "innodb_",
},
{
onServer: "Key_",
inExport: "key_",
},
{
onServer: "Open_",
inExport: "open_",
},
{
onServer: "Opened_",
inExport: "opened_",
},
{
onServer: "Qcache_",
inExport: "qcache_",
},
{
onServer: "Table_",
inExport: "table_",
},
{
onServer: "Tokudb_",
inExport: "tokudb_",


@@ -33,6 +33,13 @@ func TestMysqlGeneratesMetrics(t *testing.T) {
{"bytes", 2},
{"innodb", 51},
{"threads", 4},
{"aborted", 2},
{"created", 3},
{"key", 7},
{"open", 7},
{"opened", 3},
{"qcache", 8},
{"table", 5},
}
intMetrics := []string{

plugins/ping/ping.go Normal file

@@ -0,0 +1,177 @@
package ping
import (
"errors"
"os/exec"
"strconv"
"strings"
"sync"
"github.com/influxdb/telegraf/plugins"
)
// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(args ...string) (string, error)
type Ping struct {
// Interval at which to ping (ping -i <INTERVAL>)
PingInterval float64 `toml:"ping_interval"`
// Number of pings to send (ping -c <COUNT>)
Count int
// Ping timeout, in seconds. 0 means no timeout (ping -t <TIMEOUT>)
Timeout float64
// Interface to send ping from (ping -I <INTERFACE>)
Interface string
// URLs to ping
Urls []string
// host ping function
pingHost HostPinger
}
func (_ *Ping) Description() string {
return "Ping given url(s) and return statistics"
}
var sampleConfig = `
# urls to ping
urls = ["www.google.com"] # required
# number of pings to send (ping -c <COUNT>)
count = 1 # required
# interval, in s, at which to ping. 0 == default (ping -i <PING_INTERVAL>)
ping_interval = 0.0
# ping timeout, in s. 0 == no timeout (ping -t <TIMEOUT>)
timeout = 0.0
# interface to send ping from (ping -I <INTERFACE>)
interface = ""
`
func (_ *Ping) SampleConfig() string {
return sampleConfig
}
func (p *Ping) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup
errorChannel := make(chan error, len(p.Urls)*2)
// Spin off a go routine for each url to ping
for _, url := range p.Urls {
wg.Add(1)
go func(url string, acc plugins.Accumulator) {
defer wg.Done()
args := p.args(url)
out, err := p.pingHost(args...)
if err != nil {
// Combine go err + stderr output
errorChannel <- errors.New(
strings.TrimSpace(out) + ", " + err.Error())
}
tags := map[string]string{"url": url}
trans, rec, avg, err := processPingOutput(out)
if err != nil {
// fatal error
errorChannel <- err
return
}
// Calculate packet loss percentage
loss := float64(trans-rec) / float64(trans) * 100.0
acc.Add("packets_transmitted", trans, tags)
acc.Add("packets_received", rec, tags)
acc.Add("percent_packet_loss", loss, tags)
acc.Add("average_response_ms", avg, tags)
}(url, acc)
}
wg.Wait()
close(errorChannel)
// Get all errors and return them as one giant error
errorStrings := []string{}
for err := range errorChannel {
errorStrings = append(errorStrings, err.Error())
}
if len(errorStrings) == 0 {
return nil
}
return errors.New(strings.Join(errorStrings, "\n"))
}
func hostPinger(args ...string) (string, error) {
c := exec.Command("ping", args...)
out, err := c.CombinedOutput()
return string(out), err
}
// args returns the arguments for the 'ping' executable
func (p *Ping) args(url string) []string {
// Build the ping command args based on toml config
args := []string{"-c", strconv.Itoa(p.Count)}
if p.PingInterval > 0 {
args = append(args, "-i", strconv.FormatFloat(p.PingInterval, 'f', 1, 64))
}
if p.Timeout > 0 {
args = append(args, "-t", strconv.FormatFloat(p.Timeout, 'f', 1, 64))
}
if p.Interface != "" {
args = append(args, "-I", p.Interface)
}
args = append(args, url)
return args
}
// processPingOutput takes in a string output from the ping command, like:
//
// PING www.google.com (173.194.115.84): 56 data bytes
// 64 bytes from 173.194.115.84: icmp_seq=0 ttl=54 time=52.172 ms
// 64 bytes from 173.194.115.84: icmp_seq=1 ttl=54 time=34.843 ms
//
// --- www.google.com ping statistics ---
// 2 packets transmitted, 2 packets received, 0.0% packet loss
// round-trip min/avg/max/stddev = 34.843/43.508/52.172/8.664 ms
//
// It returns (<transmitted packets>, <received packets>, <average response in ms>)
// along with any error encountered while parsing.
func processPingOutput(out string) (int, int, float64, error) {
var trans, recv int
var avg float64
// Set this error to nil if we find a 'transmitted' line
err := errors.New("Fatal error processing ping output")
lines := strings.Split(out, "\n")
for _, line := range lines {
if strings.Contains(line, "transmitted") &&
strings.Contains(line, "received") {
err = nil
stats := strings.Split(line, ", ")
// Transmitted packets
trans, err = strconv.Atoi(strings.Split(stats[0], " ")[0])
if err != nil {
return trans, recv, avg, err
}
// Received packets
recv, err = strconv.Atoi(strings.Split(stats[1], " ")[0])
if err != nil {
return trans, recv, avg, err
}
} else if strings.Contains(line, "min/avg/max") {
stats := strings.Split(line, " = ")[1]
avg, err = strconv.ParseFloat(strings.Split(stats, "/")[1], 64)
if err != nil {
return trans, recv, avg, err
}
}
}
return trans, recv, avg, err
}
func init() {
plugins.Add("ping", func() plugins.Plugin {
return &Ping{pingHost: hostPinger}
})
}
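The summary-line parsing above can be exercised on its own. parsePing below is a trimmed re-implementation of processPingOutput for illustration, with error handling elided:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePing extracts transmitted/received packet counts and the average
// round-trip time from ping's summary lines, mirroring processPingOutput.
func parsePing(out string) (trans, recv int, avg float64) {
	for _, line := range strings.Split(out, "\n") {
		if strings.Contains(line, "transmitted") && strings.Contains(line, "received") {
			stats := strings.Split(line, ", ")
			trans, _ = strconv.Atoi(strings.Split(stats[0], " ")[0])
			recv, _ = strconv.Atoi(strings.Split(stats[1], " ")[0])
		} else if strings.Contains(line, "min/avg/max") {
			// "rtt min/avg/max/mdev = 35.225/43.628/51.806/5.325 ms"
			stats := strings.Split(line, " = ")[1]
			avg, _ = strconv.ParseFloat(strings.Split(stats, "/")[1], 64)
		}
	}
	return trans, recv, avg
}

func main() {
	out := "5 packets transmitted, 5 received, 0% packet loss, time 4010ms\n" +
		"rtt min/avg/max/mdev = 35.225/43.628/51.806/5.325 ms"
	fmt.Println(parsePing(out))
}
```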

plugins/ping/ping_test.go Normal file

@@ -0,0 +1,218 @@
package ping
import (
"errors"
"reflect"
"sort"
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
// BSD/Darwin ping output
var bsdPingOutput = `
PING www.google.com (216.58.217.36): 56 data bytes
64 bytes from 216.58.217.36: icmp_seq=0 ttl=55 time=15.087 ms
64 bytes from 216.58.217.36: icmp_seq=1 ttl=55 time=21.564 ms
64 bytes from 216.58.217.36: icmp_seq=2 ttl=55 time=27.263 ms
64 bytes from 216.58.217.36: icmp_seq=3 ttl=55 time=18.828 ms
64 bytes from 216.58.217.36: icmp_seq=4 ttl=55 time=18.378 ms
--- www.google.com ping statistics ---
5 packets transmitted, 5 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 15.087/20.224/27.263/4.076 ms
`
// Linux ping output
var linuxPingOutput = `
PING www.google.com (216.58.218.164) 56(84) bytes of data.
64 bytes from host.net (216.58.218.164): icmp_seq=1 ttl=63 time=35.2 ms
64 bytes from host.net (216.58.218.164): icmp_seq=2 ttl=63 time=42.3 ms
64 bytes from host.net (216.58.218.164): icmp_seq=3 ttl=63 time=45.1 ms
64 bytes from host.net (216.58.218.164): icmp_seq=4 ttl=63 time=43.5 ms
64 bytes from host.net (216.58.218.164): icmp_seq=5 ttl=63 time=51.8 ms
--- www.google.com ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4010ms
rtt min/avg/max/mdev = 35.225/43.628/51.806/5.325 ms
`
// Fatal ping output (invalid argument)
var fatalPingOutput = `
ping: -i interval too short: Operation not permitted
`
// Test that ping command output is processed properly
func TestProcessPingOutput(t *testing.T) {
trans, rec, avg, err := processPingOutput(bsdPingOutput)
assert.NoError(t, err)
assert.Equal(t, 5, trans, "5 packets were transmitted")
assert.Equal(t, 5, rec, "5 packets were received")
assert.InDelta(t, 20.224, avg, 0.001)
trans, rec, avg, err = processPingOutput(linuxPingOutput)
assert.NoError(t, err)
assert.Equal(t, 5, trans, "5 packets were transmitted")
assert.Equal(t, 5, rec, "5 packets were received")
assert.InDelta(t, 43.628, avg, 0.001)
}
// Test that processPingOutput returns an error when 'ping' fails to run, such
// as when an invalid argument is provided
func TestErrorProcessPingOutput(t *testing.T) {
_, _, _, err := processPingOutput(fatalPingOutput)
assert.Error(t, err, "Error was expected from processPingOutput")
}
// Test that arg lists are created correctly
func TestArgs(t *testing.T) {
p := Ping{
Count: 2,
}
// Actual and Expected arg lists must be sorted for reflect.DeepEqual
actual := p.args("www.google.com")
expected := []string{"-c", "2", "www.google.com"}
sort.Strings(actual)
sort.Strings(expected)
assert.True(t, reflect.DeepEqual(expected, actual),
"Expected: %s Actual: %s", expected, actual)
p.Interface = "eth0"
actual = p.args("www.google.com")
expected = []string{"-c", "2", "-I", "eth0", "www.google.com"}
sort.Strings(actual)
sort.Strings(expected)
assert.True(t, reflect.DeepEqual(expected, actual),
"Expected: %s Actual: %s", expected, actual)
p.Timeout = 12.0
actual = p.args("www.google.com")
expected = []string{"-c", "2", "-I", "eth0", "-t", "12.0", "www.google.com"}
sort.Strings(actual)
sort.Strings(expected)
assert.True(t, reflect.DeepEqual(expected, actual),
"Expected: %s Actual: %s", expected, actual)
p.PingInterval = 1.2
actual = p.args("www.google.com")
expected = []string{"-c", "2", "-I", "eth0", "-t", "12.0", "-i", "1.2",
"www.google.com"}
sort.Strings(actual)
sort.Strings(expected)
assert.True(t, reflect.DeepEqual(expected, actual),
"Expected: %s Actual: %s", expected, actual)
}
func mockHostPinger(args ...string) (string, error) {
return linuxPingOutput, nil
}
// Test that Gather function works on a normal ping
func TestPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com", "www.reddit.com"},
pingHost: mockHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
assert.NoError(t, acc.ValidateTaggedValue("packets_transmitted", 5, tags))
assert.NoError(t, acc.ValidateTaggedValue("packets_received", 5, tags))
assert.NoError(t, acc.ValidateTaggedValue("percent_packet_loss", 0.0, tags))
assert.NoError(t, acc.ValidateTaggedValue("average_response_ms",
43.628, tags))
tags = map[string]string{"url": "www.reddit.com"}
assert.NoError(t, acc.ValidateTaggedValue("packets_transmitted", 5, tags))
assert.NoError(t, acc.ValidateTaggedValue("packets_received", 5, tags))
assert.NoError(t, acc.ValidateTaggedValue("percent_packet_loss", 0.0, tags))
assert.NoError(t, acc.ValidateTaggedValue("average_response_ms",
43.628, tags))
}
var lossyPingOutput = `
PING www.google.com (216.58.218.164) 56(84) bytes of data.
64 bytes from host.net (216.58.218.164): icmp_seq=1 ttl=63 time=35.2 ms
64 bytes from host.net (216.58.218.164): icmp_seq=3 ttl=63 time=45.1 ms
64 bytes from host.net (216.58.218.164): icmp_seq=5 ttl=63 time=51.8 ms
--- www.google.com ping statistics ---
5 packets transmitted, 3 received, 40% packet loss, time 4010ms
rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms
`
func mockLossyHostPinger(args ...string) (string, error) {
return lossyPingOutput, nil
}
// Test that Gather works on a ping with lossy packets
func TestLossyPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.google.com"},
pingHost: mockLossyHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.google.com"}
assert.NoError(t, acc.ValidateTaggedValue("packets_transmitted", 5, tags))
assert.NoError(t, acc.ValidateTaggedValue("packets_received", 3, tags))
assert.NoError(t, acc.ValidateTaggedValue("percent_packet_loss", 40.0, tags))
assert.NoError(t, acc.ValidateTaggedValue("average_response_ms", 44.033, tags))
}
var errorPingOutput = `
PING www.amazon.com (176.32.98.166): 56 data bytes
Request timeout for icmp_seq 0
--- www.amazon.com ping statistics ---
2 packets transmitted, 0 packets received, 100.0% packet loss
`
func mockErrorHostPinger(args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received")
}
// Test that Gather works on a ping with no transmitted packets, even though the
// command returns an error
func TestBadPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockErrorHostPinger,
}
p.Gather(&acc)
tags := map[string]string{"url": "www.amazon.com"}
assert.NoError(t, acc.ValidateTaggedValue("packets_transmitted", 2, tags))
assert.NoError(t, acc.ValidateTaggedValue("packets_received", 0, tags))
assert.NoError(t, acc.ValidateTaggedValue("percent_packet_loss", 100.0, tags))
assert.NoError(t, acc.ValidateTaggedValue("average_response_ms", 0.0, tags))
}
func mockFatalHostPinger(args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad")
}
// Test that a fatal ping command does not gather any statistics.
func TestFatalPingGather(t *testing.T) {
var acc testutil.Accumulator
p := Ping{
Urls: []string{"www.amazon.com"},
pingHost: mockFatalHostPinger,
}
p.Gather(&acc)
assert.False(t, acc.HasMeasurement("packets_transmitted"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("packets_received"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("percent_packet_loss"),
"Fatal ping should not have packet measurements")
assert.False(t, acc.HasMeasurement("average_response_ms"),
"Fatal ping should not have packet measurements")
}


@@ -74,12 +74,12 @@ var ErrProtocolError = errors.New("redis protocol error")
// Reads stats from all configured servers and accumulates them.
// Returns one of the errors encountered while gathering stats (if any).
func (g *Redis) Gather(acc plugins.Accumulator) error {
if len(g.Servers) == 0 {
func (r *Redis) Gather(acc plugins.Accumulator) error {
if len(r.Servers) == 0 {
url := &url.URL{
Host: ":6379",
}
g.gatherServer(url, acc)
r.gatherServer(url, acc)
return nil
}
@@ -87,7 +87,7 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
var outerr error
for _, serv := range g.Servers {
for _, serv := range r.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
@@ -100,7 +100,7 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = g.gatherServer(u, acc)
outerr = r.gatherServer(u, acc)
}(serv)
}
@@ -111,8 +111,8 @@ func (g *Redis) Gather(acc plugins.Accumulator) error {
const defaultPort = "6379"
func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
if g.c == nil {
func (r *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
if r.c == nil {
_, _, err := net.SplitHostPort(addr.Host)
if err != nil {
@@ -129,9 +129,9 @@ func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
r := bufio.NewReader(c)
rdr := bufio.NewReader(c)
line, err := r.ReadString('\n')
line, err := rdr.ReadString('\n')
if err != nil {
return err
}
@@ -141,14 +141,14 @@ func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
}
}
g.c = c
r.c = c
}
g.c.Write([]byte("info\r\n"))
r.c.Write([]byte("info\r\n"))
r := bufio.NewReader(g.c)
rdr := bufio.NewReader(r.c)
line, err := r.ReadString('\n')
line, err := rdr.ReadString('\n')
if err != nil {
return err
}
@@ -169,7 +169,7 @@ func (g *Redis) gatherServer(addr *url.URL, acc plugins.Accumulator) error {
var read int
for read < sz {
line, err := r.ReadString('\n')
line, err := rdr.ReadString('\n')
if err != nil {
return err
}


@@ -10,7 +10,6 @@ import (
"github.com/influxdb/telegraf/plugins/system/ps/cpu"
"github.com/influxdb/telegraf/plugins/system/ps/disk"
"github.com/influxdb/telegraf/plugins/system/ps/docker"
"github.com/influxdb/telegraf/plugins/system/ps/load"
"github.com/influxdb/telegraf/plugins/system/ps/mem"
"github.com/influxdb/telegraf/plugins/system/ps/net"
)
@@ -24,7 +23,6 @@ type DockerContainerStat struct {
}
type PS interface {
LoadAvg() (*load.LoadAvgStat, error)
CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error)
DiskUsage() ([]*disk.DiskUsageStat, error)
NetIO() ([]net.NetIOCountersStat, error)
@@ -45,10 +43,6 @@ type systemPS struct {
dockerClient *dc.Client
}
func (s *systemPS) LoadAvg() (*load.LoadAvgStat, error) {
return load.LoadAvg()
}
func (s *systemPS) CPUTimes(perCPU, totalCPU bool) ([]cpu.CPUTimesStat, error) {
var cpuTimes []cpu.CPUTimesStat
if perCPU {


@@ -1,39 +1,48 @@
package system
import "github.com/influxdb/telegraf/plugins"
import (
"github.com/cloudfoundry/gosigar"
type SystemStats struct {
ps PS
}
"github.com/influxdb/telegraf/plugins"
)
type SystemStats struct{}
func (_ *SystemStats) Description() string {
return "Read metrics about system load"
return "Read metrics about system load & uptime"
}
func (_ *SystemStats) SampleConfig() string { return "" }
func (s *SystemStats) add(acc plugins.Accumulator,
func (_ *SystemStats) add(acc plugins.Accumulator,
name string, val float64, tags map[string]string) {
if val >= 0 {
acc.Add(name, val, tags)
}
}
func (s *SystemStats) Gather(acc plugins.Accumulator) error {
lv, err := s.ps.LoadAvg()
if err != nil {
func (_ *SystemStats) Gather(acc plugins.Accumulator) error {
loadavg := sigar.LoadAverage{}
if err := loadavg.Get(); err != nil {
return err
}
acc.Add("load1", lv.Load1, nil)
acc.Add("load5", lv.Load5, nil)
acc.Add("load15", lv.Load15, nil)
uptime := sigar.Uptime{}
if err := uptime.Get(); err != nil {
return err
}
acc.Add("load1", loadavg.One, nil)
acc.Add("load5", loadavg.Five, nil)
acc.Add("load15", loadavg.Fifteen, nil)
acc.Add("uptime", uptime.Length, nil)
acc.Add("uptime_format", uptime.Format(), nil)
return nil
}
func init() {
plugins.Add("system", func() plugins.Plugin {
return &SystemStats{ps: &systemPS{}}
return &SystemStats{}
})
}


@@ -7,7 +7,6 @@ import (
"github.com/influxdb/telegraf/plugins/system/ps/cpu"
"github.com/influxdb/telegraf/plugins/system/ps/disk"
"github.com/influxdb/telegraf/plugins/system/ps/load"
"github.com/influxdb/telegraf/plugins/system/ps/mem"
"github.com/influxdb/telegraf/plugins/system/ps/net"
"github.com/influxdb/telegraf/testutil"
@@ -22,14 +21,6 @@ func TestSystemStats_GenerateStats(t *testing.T) {
var acc testutil.Accumulator
lv := &load.LoadAvgStat{
Load1: 0.3,
Load5: 1.5,
Load15: 0.8,
}
mps.On("LoadAvg").Return(lv, nil)
cts := cpu.CPUTimesStat{
CPU: "cpu0",
User: 3.1,
@@ -128,15 +119,6 @@ func TestSystemStats_GenerateStats(t *testing.T) {
mps.On("SwapStat").Return(sms, nil)
ss := &SystemStats{ps: &mps}
err := ss.Gather(&acc)
require.NoError(t, err)
assert.True(t, acc.CheckValue("load1", 0.3))
assert.True(t, acc.CheckValue("load5", 1.5))
assert.True(t, acc.CheckValue("load15", 0.8))
cs := NewCPUStats(&mps)
cputags := map[string]string{
@@ -144,7 +126,7 @@ func TestSystemStats_GenerateStats(t *testing.T) {
}
preCPUPoints := len(acc.Points)
err = cs.Gather(&acc)
err := cs.Gather(&acc)
require.NoError(t, err)
numCPUPoints := len(acc.Points) - preCPUPoints


@@ -151,3 +151,14 @@ func (a *Accumulator) HasFloatValue(measurement string) bool {
return false
}
// HasMeasurement returns true if the accumulator has a measurement with the
// given name
func (a *Accumulator) HasMeasurement(measurement string) bool {
for _, p := range a.Points {
if p.Measurement == measurement {
return true
}
}
return false
}