Load external .so plugins

support for the Go 1.8 shared object feature of loading external
plugins.

this support relies on the developer defining a `Plugin` symbol in their
.so file that is a telegraf plugin interface.

So instead of the plugin developer "Adding" their own plugin to the
telegraf registry, telegraf loads the .so, looks up the Plugin symbol,
and then adds it if it finds it.

The name of the plugin is determined by telegraf, and is namespaced
based on the filename and path.

see #1717
This commit is contained in:
Cameron Sparr 2017-02-07 12:10:39 +00:00
parent 13f314a507
commit 92d8a2e5ea
No known key found for this signature in database
GPG Key ID: 19E67263DCB25D0F
3 changed files with 119 additions and 11 deletions

View File

@ -15,8 +15,7 @@ windows: prepare-windows build-windows
# Only run the build (no dependency grabbing) # Only run the build (no dependency grabbing)
build: build:
go install -ldflags \ go install -ldflags "-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.branch=$(BRANCH)" ./...
"-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.branch=$(BRANCH)" ./...
build-windows: build-windows:
GOOS=windows GOARCH=amd64 go build -o telegraf.exe -ldflags \ GOOS=windows GOARCH=amd64 go build -o telegraf.exe -ldflags \

View File

@ -6,19 +6,27 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"path"
"path/filepath"
"plugin"
"runtime" "runtime"
"strings" "strings"
"syscall" "syscall"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config" "github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/logger" "github.com/influxdata/telegraf/logger"
_ "github.com/influxdata/telegraf/plugins/aggregators/all" "github.com/influxdata/telegraf/plugins/aggregators"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/processors"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all" _ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all" _ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service" "github.com/kardianos/service"
) )
@ -50,23 +58,29 @@ var fUsage = flag.String("usage", "",
"print usage for a plugin, ie, 'telegraf -usage mysql'") "print usage for a plugin, ie, 'telegraf -usage mysql'")
var fService = flag.String("service", "", var fService = flag.String("service", "",
"operate on the service") "operate on the service")
var fPlugins = flag.String("external-plugins", "",
"path to directory containing external plugins")
// Telegraf version, populated linker. // Telegraf version, populated linker.
// ie, -ldflags "-X main.version=`git describe --always --tags`" // ie, -ldflags "-X main.version=`git describe --always --tags`"
var ( var (
version string version string
commit string commit string
branch string branch string
goversion string
) )
func init() { func init() {
// If commit or branch are not set, make that clear. if version == "" {
version = "unknown"
}
if commit == "" { if commit == "" {
commit = "unknown" commit = "unknown"
} }
if branch == "" { if branch == "" {
branch = "unknown" branch = "unknown"
} }
goversion = runtime.Version() + " " + runtime.GOOS + "/" + runtime.GOARCH
} }
const usage = `Telegraf, The plugin-driven server agent for collecting and reporting metrics. const usage = `Telegraf, The plugin-driven server agent for collecting and reporting metrics.
@ -83,6 +97,9 @@ The commands & flags are:
--config <file> configuration file to load --config <file> configuration file to load
--test gather metrics once, print them to stdout, and exit --test gather metrics once, print them to stdout, and exit
--config-directory directory containing additional *.conf files --config-directory directory containing additional *.conf files
--external-plugins directory containing *.so files, this directory will be
searched recursively. Any Plugin found will be loaded
and namespaced.
--input-filter filter the input plugins to enable, separator is : --input-filter filter the input plugins to enable, separator is :
--output-filter filter the output plugins to enable, separator is : --output-filter filter the output plugins to enable, separator is :
--usage print usage for a plugin, ie, 'telegraf --usage mysql' --usage print usage for a plugin, ie, 'telegraf --usage mysql'
@ -188,7 +205,8 @@ func reloadLoop(
} }
}() }()
log.Printf("I! Starting Telegraf (version %s)\n", version) log.Printf("I! Starting Telegraf (version %s), Go version: %s\n",
version, goversion)
log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " ")) log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " ")) log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
log.Printf("I! Tags enabled: %s", c.ListTags()) log.Printf("I! Tags enabled: %s", c.ListTags())
@ -246,10 +264,90 @@ func (p *program) Stop(s service.Service) error {
return nil return nil
} }
// loadExternalPlugins loads external plugins from shared libraries (.so, .dll, etc.)
// in the specified directory.
func loadExternalPlugins(rootDir string) error {
return filepath.Walk(rootDir, func(pth string, info os.FileInfo, err error) error {
// Stop if there was an error.
if err != nil {
return err
}
// Ignore directories.
if info.IsDir() {
return nil
}
// Ignore files that aren't shared libraries.
ext := strings.ToLower(path.Ext(pth))
if ext != ".so" && ext != ".dll" {
return nil
}
// name will be the path to the plugin file beginning at the root
// directory, minus the extension.
// ie, if the plugin file is /opt/telegraf-plugins/group1/foo.so, name
// will be "group1/foo"
name := strings.TrimPrefix(strings.TrimPrefix(pth, rootDir), string(os.PathSeparator))
name = strings.TrimSuffix(name, filepath.Ext(pth))
name = "external" + string(os.PathSeparator) + name
// Load plugin.
p, err := plugin.Open(pth)
if err != nil {
return fmt.Errorf("error loading [%s]: %s", pth, err)
}
s, err := p.Lookup("Plugin")
if err != nil {
fmt.Printf("ERROR Could not find 'Plugin' symbol in [%s]\n", pth)
return nil
}
switch tplugin := s.(type) {
case *telegraf.Input:
fmt.Printf("Adding external input plugin: %s\n", name)
inputs.Add(name, func() telegraf.Input { return *tplugin })
case *telegraf.Output:
fmt.Printf("Adding external output plugin: %s\n", name)
outputs.Add(name, func() telegraf.Output { return *tplugin })
case *telegraf.Processor:
fmt.Printf("Adding external processor plugin: %s\n", name)
processors.Add(name, func() telegraf.Processor { return *tplugin })
case *telegraf.Aggregator:
fmt.Printf("Adding external aggregator plugin: %s\n", name)
aggregators.Add(name, func() telegraf.Aggregator { return *tplugin })
default:
fmt.Printf("ERROR: 'Plugin' symbol from [%s] is not a telegraf interface, it has type: %T\n", pth, tplugin)
}
return nil
})
}
func printVersion() {
fmt.Printf(`Telegraf %s
branch: %s
commit: %s
go version: %s
`, version, branch, commit, goversion)
}
func main() { func main() {
flag.Usage = func() { usageExit(0) } flag.Usage = func() { usageExit(0) }
flag.Parse() flag.Parse()
args := flag.Args() args := flag.Args()
// Load external plugins, if requested.
if *fPlugins != "" {
pluginsDir, err := filepath.Abs(*fPlugins)
if err != nil {
log.Fatal(err.Error())
}
fmt.Printf("Loading external plugins from: %s\n", pluginsDir)
if err := loadExternalPlugins(*fPlugins); err != nil {
log.Fatal(err.Error())
}
}
inputFilters, outputFilters := []string{}, []string{} inputFilters, outputFilters := []string{}, []string{}
if *fInputFilters != "" { if *fInputFilters != "" {
@ -270,7 +368,7 @@ func main() {
if len(args) > 0 { if len(args) > 0 {
switch args[0] { switch args[0] {
case "version": case "version":
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit) printVersion()
return return
case "config": case "config":
config.PrintSampleConfig( config.PrintSampleConfig(
@ -298,7 +396,7 @@ func main() {
} }
return return
case *fVersion: case *fVersion:
fmt.Printf("Telegraf v%s (git: %s %s)\n", version, branch, commit) printVersion()
return return
case *fSampleConfig: case *fSampleConfig:
config.PrintSampleConfig( config.PrintSampleConfig(

View File

@ -39,6 +39,14 @@ var (
// envVarRe is a regex to find environment variables in the config file // envVarRe is a regex to find environment variables in the config file
envVarRe = regexp.MustCompile(`\$\w+`) envVarRe = regexp.MustCompile(`\$\w+`)
// addQuoteRe is a regex for finding and adding quotes around / characters
// when they are used for distinguishing external plugins.
// ie, a ReplaceAll() with this pattern will be used to turn this:
// [[inputs.external/test/example]]
// to
// [[inputs."external/test/example"]]
addQuoteRe = regexp.MustCompile(`(\[?\[?inputs|outputs|processors|aggregators)\.(external\/[^.\]]+)`)
) )
// Config specifies the URL/user/password for the database that telegraf // Config specifies the URL/user/password for the database that telegraf
@ -704,6 +712,9 @@ func parseFile(fpath string) (*ast.Table, error) {
} }
} }
// add quotes around external plugin paths.
contents = addQuoteRe.ReplaceAll(contents, []byte(`$1."$2"`))
return toml.Parse(contents) return toml.Parse(contents)
} }