Compare commits

...

1 Commits

Author SHA1 Message Date
Cameron Sparr 92d8a2e5ea
Load external .so plugins
support for the Go 1.8 shared object feature of loading external
plugins.

this support relies on the developer defining a `Plugin` symbol in their
.so file that is a telegraf plugin interface.

So instead of the plugin developer "Adding" their own plugin to the
telegraf registry, telegraf loads the .so, looks up the Plugin symbol,
and then adds it if it finds it.

The name of the plugin is determined by telegraf, and is namespaced
based on the filename and path.

see #1717
2017-03-10 16:55:23 +00:00
3 changed files with 119 additions and 11 deletions

View File

@ -15,8 +15,7 @@ windows: prepare-windows build-windows
# Only run the build (no dependency grabbing)
build:
go install -ldflags \
"-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.branch=$(BRANCH)" ./...
go install -ldflags "-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.branch=$(BRANCH)" ./...
build-windows:
GOOS=windows GOARCH=amd64 go build -o telegraf.exe -ldflags \

View File

@ -6,19 +6,27 @@ import (
"log"
"os"
"os/signal"
"path"
"path/filepath"
"plugin"
"runtime"
"strings"
"syscall"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/logger"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
"github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/processors"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service"
)
@ -50,23 +58,29 @@ var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'")
var fService = flag.String("service", "",
"operate on the service")
var fPlugins = flag.String("external-plugins", "",
"path to directory containing external plugins")
// Telegraf version, populated linker.
// ie, -ldflags "-X main.version=`git describe --always --tags`"
var (
version string
commit string
branch string
version string
commit string
branch string
goversion string
)
func init() {
// If commit or branch are not set, make that clear.
if version == "" {
version = "unknown"
}
if commit == "" {
commit = "unknown"
}
if branch == "" {
branch = "unknown"
}
goversion = runtime.Version() + " " + runtime.GOOS + "/" + runtime.GOARCH
}
const usage = `Telegraf, The plugin-driven server agent for collecting and reporting metrics.
@ -83,6 +97,9 @@ The commands & flags are:
--config <file> configuration file to load
--test gather metrics once, print them to stdout, and exit
--config-directory directory containing additional *.conf files
--external-plugins directory containing *.so files, this directory will be
searched recursively. Any Plugin found will be loaded
and namespaced.
--input-filter filter the input plugins to enable, separator is :
--output-filter filter the output plugins to enable, separator is :
--usage print usage for a plugin, ie, 'telegraf --usage mysql'
@ -188,7 +205,8 @@ func reloadLoop(
}
}()
log.Printf("I! Starting Telegraf (version %s)\n", version)
log.Printf("I! Starting Telegraf (version %s), Go version: %s\n",
version, goversion)
log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("I! Tags enabled: %s", c.ListTags())
@ -246,10 +264,90 @@ func (p *program) Stop(s service.Service) error {
return nil
}
// loadExternalPlugins loads external plugins from shared libraries (.so, .dll, etc.)
// in the specified directory.
func loadExternalPlugins(rootDir string) error {
return filepath.Walk(rootDir, func(pth string, info os.FileInfo, err error) error {
// Stop if there was an error.
if err != nil {
return err
}
// Ignore directories.
if info.IsDir() {
return nil
}
// Ignore files that aren't shared libraries.
ext := strings.ToLower(path.Ext(pth))
if ext != ".so" && ext != ".dll" {
return nil
}
// name will be the path to the plugin file beginning at the root
// directory, minus the extension.
// ie, if the plugin file is /opt/telegraf-plugins/group1/foo.so, name
// will be "group1/foo"
name := strings.TrimPrefix(strings.TrimPrefix(pth, rootDir), string(os.PathSeparator))
name = strings.TrimSuffix(name, filepath.Ext(pth))
name = "external" + string(os.PathSeparator) + name
// Load plugin.
p, err := plugin.Open(pth)
if err != nil {
return fmt.Errorf("error loading [%s]: %s", pth, err)
}
s, err := p.Lookup("Plugin")
if err != nil {
fmt.Printf("ERROR Could not find 'Plugin' symbol in [%s]\n", pth)
return nil
}
switch tplugin := s.(type) {
case *telegraf.Input:
fmt.Printf("Adding external input plugin: %s\n", name)
inputs.Add(name, func() telegraf.Input { return *tplugin })
case *telegraf.Output:
fmt.Printf("Adding external output plugin: %s\n", name)
outputs.Add(name, func() telegraf.Output { return *tplugin })
case *telegraf.Processor:
fmt.Printf("Adding external processor plugin: %s\n", name)
processors.Add(name, func() telegraf.Processor { return *tplugin })
case *telegraf.Aggregator:
fmt.Printf("Adding external aggregator plugin: %s\n", name)
aggregators.Add(name, func() telegraf.Aggregator { return *tplugin })
default:
fmt.Printf("ERROR: 'Plugin' symbol from [%s] is not a telegraf interface, it has type: %T\n", pth, tplugin)
}
return nil
})
}
func printVersion() {
fmt.Printf(`Telegraf %s
branch: %s
commit: %s
go version: %s
`, version, branch, commit, goversion)
}
func main() {
flag.Usage = func() { usageExit(0) }
flag.Parse()
args := flag.Args()
// Load external plugins, if requested.
if *fPlugins != "" {
pluginsDir, err := filepath.Abs(*fPlugins)
if err != nil {
log.Fatal(err.Error())
}
fmt.Printf("Loading external plugins from: %s\n", pluginsDir)
if err := loadExternalPlugins(*fPlugins); err != nil {
log.Fatal(err.Error())
}
}
inputFilters, outputFilters := []string{}, []string{}
if *fInputFilters != "" {
@ -270,7 +368,7 @@ func main() {
if len(args) > 0 {
switch args[0] {
case "version":
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
printVersion()
return
case "config":
config.PrintSampleConfig(
@ -298,7 +396,7 @@ func main() {
}
return
case *fVersion:
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit)
printVersion()
return
case *fSampleConfig:
config.PrintSampleConfig(

View File

@ -39,6 +39,14 @@ var (
// envVarRe is a regex to find environment variables in the config file
envVarRe = regexp.MustCompile(`\$\w+`)
// addQuoteRe is a regex for finding and adding quotes around / characters
// when they are used for distinguishing external plugins.
// ie, a ReplaceAll() with this pattern will be used to turn this:
// [[inputs.external/test/example]]
// to
// [[inputs."external/test/example"]]
addQuoteRe = regexp.MustCompile(`(\[?\[?inputs|outputs|processors|aggregators)\.(external\/[^.\]]+)`)
)
// Config specifies the URL/user/password for the database that telegraf
@ -704,6 +712,9 @@ func parseFile(fpath string) (*ast.Table, error) {
}
}
// add quotes around external plugin paths.
contents = addQuoteRe.ReplaceAll(contents, []byte(`$1."$2"`))
return toml.Parse(contents)
}