telegraf/plugins/inputs/twitter/twitter.go

124 lines
3.1 KiB
Go

package twitter
import (
"context"
"net/url"
"strings"
"sync"
"github.com/ChimeraCoder/anaconda"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Twitter struct {
ServiceAddress string
ConsumerKey string
ConsumerSecret string
AccessToken string
AccessTokenSecret string
KeywordsToTrack string
api *anaconda.TwitterApi
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
var sampleConfig = `
## These values can be optained from apps.twitter.com
service_address = ""
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
keywords_to_track = ""
`
// SampleConfig returns the default configuration of the Input
func (s *Twitter) SampleConfig() string {
return sampleConfig
}
// Description returns a one-sentence description on the Input
func (s *Twitter) Description() string {
return "Pulls metrics from tweets"
}
// Gather - Called every interval and used for polling inputs
func (s *Twitter) Gather(acc telegraf.Accumulator) error {
return nil
}
// Start - Called once when starting the plugin
func (s *Twitter) Start(acc telegraf.Accumulator) error {
anaconda.SetConsumerKey(s.ConsumerKey)
anaconda.SetConsumerSecret(s.ConsumerSecret)
s.api = anaconda.NewTwitterApi(s.AccessToken, s.AccessTokenSecret)
// Store the cancel function so we can call it on Stop
s.ctx, s.cancel = context.WithCancel(context.Background())
s.wg = &sync.WaitGroup{}
s.wg.Add(1)
go s.fetchTweets(acc)
return nil
}
func (s *Twitter) fetchTweets(acc telegraf.Accumulator) {
defer s.wg.Done()
// We will use this little later for finding keywords in tweets
keywordsList := strings.Split(s.KeywordsToTrack, ",")
// Setting the keywords we want to track
v := url.Values{}
v.Set("track", s.KeywordsToTrack)
stream := s.api.PublicStreamFilter(v)
for item := range stream.C {
select {
// Listen for the call to cancle so we know it's time to stop
case <-s.ctx.Done():
stream.Stop()
return
default:
switch tweet := item.(type) {
case anaconda.Tweet:
fields := make(map[string]interface{})
tags := make(map[string]string)
if tweet.Lang != "" {
tags["lang"] = tweet.Lang
}
fields["retweet_count"] = tweet.RetweetCount
fields["tweet_id"] = tweet.IdStr
fields["followers_count"] = tweet.User.FollowersCount
fields["screen_name"] = tweet.User.ScreenName
fields["friends_count"] = tweet.User.FriendsCount
fields["user_verified"] = tweet.User.Verified
fields["source"] = tweet.Source
fields["full_text"] = tweet.FullText
time, err := tweet.CreatedAtTime()
if err != nil {
acc.AddError(err)
continue
}
for _, keyword := range keywordsList {
if strings.Contains(strings.ToLower(tweet.Text), strings.ToLower(keyword)) {
tags["keyword"] = strings.ToLower(keyword)
acc.AddFields("tweets", fields, tags, time)
}
}
}
}
}
}
// Stop - Called once when stopping the plugin
func (s *Twitter) Stop() {
s.cancel()
s.wg.Wait()
return
}
func init() {
inputs.Add("twitter", func() telegraf.Input { return &Twitter{} })
}