add initial version of rss plugin
This commit is contained in:
parent
ca10d4205b
commit
bcaf0e910c
|
@ -0,0 +1,24 @@
|
||||||
|
# Telegraf Input Plugin: rss
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# Pulls items from rss feeds
|
||||||
|
[[inputs.rss]]
|
||||||
|
## One or more URLs from which to read rss feed
|
||||||
|
urls = [
|
||||||
|
"http://localhost/rss/feed"
|
||||||
|
]
|
||||||
|
|
||||||
|
## Reduce traffic. Do not run too frequently.
|
||||||
|
interval = '10m'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Measurements & Fields:
|
||||||
|
|
||||||
|
|
||||||
|
### Tags:
|
||||||
|
|
||||||
|
|
||||||
|
### Example Output:
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
package rss
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
"github.com/mmcdole/gofeed"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ts = time.Now()
|
||||||
|
|
||||||
|
type RSS struct {
|
||||||
|
URLs []string `toml:"urls"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RSS) Description() string {
|
||||||
|
return "Reads RSS Feeds Description"
|
||||||
|
}
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## One or more URLs from which to read rss feed
|
||||||
|
urls = [
|
||||||
|
"http://localhost/rss/feed"
|
||||||
|
]
|
||||||
|
|
||||||
|
## Reduce traffic. Do not run too frequently.
|
||||||
|
interval = '10m'
|
||||||
|
`
|
||||||
|
|
||||||
|
func (r *RSS) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RSS) Gather(acc telegraf.Accumulator) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, u := range r.URLs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(url string) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := r.gatherFeed(acc, url, ts); err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err))
|
||||||
|
}
|
||||||
|
}(u)
|
||||||
|
}
|
||||||
|
|
||||||
|
// remember last time started
|
||||||
|
ts = time.Now()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RSS) gatherFeed(acc telegraf.Accumulator, url string, ts time.Time) error {
|
||||||
|
fp := gofeed.NewParser()
|
||||||
|
feed, err := fp.ParseURL(url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
tags := make(map[string]string)
|
||||||
|
|
||||||
|
tags["url"] = url
|
||||||
|
tags["feed_type"] = feed.FeedType
|
||||||
|
tags["feed_version"] = feed.FeedVersion
|
||||||
|
|
||||||
|
for _, item := range feed.Items {
|
||||||
|
time := item.PublishedParsed
|
||||||
|
if time.Before(ts) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fields["title"] = item.Title
|
||||||
|
fields["description"] = item.Description
|
||||||
|
|
||||||
|
acc.AddFields("rss", fields, tags, *time)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inputs.Add("rss", func() telegraf.Input { return &RSS{} })
|
||||||
|
}
|
Loading…
Reference in New Issue