124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package twitter
|
|
|
|
import (
|
|
"context"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/ChimeraCoder/anaconda"
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/plugins/inputs"
|
|
)
|
|
|
|
type Twitter struct {
|
|
ServiceAddress string
|
|
ConsumerKey string
|
|
ConsumerSecret string
|
|
AccessToken string
|
|
AccessTokenSecret string
|
|
KeywordsToTrack string
|
|
|
|
api *anaconda.TwitterApi
|
|
wg *sync.WaitGroup
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
var sampleConfig = `
|
|
## These values can be optained from apps.twitter.com
|
|
service_address = ""
|
|
consumer_key = ""
|
|
consumer_secret = ""
|
|
access_token = ""
|
|
access_token_secret = ""
|
|
keywords_to_track = ""
|
|
`
|
|
|
|
// SampleConfig returns the default configuration of the Input
|
|
func (s *Twitter) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description returns a one-sentence description on the Input
|
|
func (s *Twitter) Description() string {
|
|
return "Pulls metrics from tweets"
|
|
}
|
|
|
|
// Gather - Called every interval and used for polling inputs
|
|
func (s *Twitter) Gather(acc telegraf.Accumulator) error {
|
|
return nil
|
|
}
|
|
|
|
// Start - Called once when starting the plugin
|
|
func (s *Twitter) Start(acc telegraf.Accumulator) error {
|
|
anaconda.SetConsumerKey(s.ConsumerKey)
|
|
anaconda.SetConsumerSecret(s.ConsumerSecret)
|
|
s.api = anaconda.NewTwitterApi(s.AccessToken, s.AccessTokenSecret)
|
|
|
|
// Store the cancel function so we can call it on Stop
|
|
s.ctx, s.cancel = context.WithCancel(context.Background())
|
|
s.wg = &sync.WaitGroup{}
|
|
s.wg.Add(1)
|
|
go s.fetchTweets(acc)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Twitter) fetchTweets(acc telegraf.Accumulator) {
|
|
defer s.wg.Done()
|
|
// We will use this little later for finding keywords in tweets
|
|
keywordsList := strings.Split(s.KeywordsToTrack, ",")
|
|
// Setting the keywords we want to track
|
|
v := url.Values{}
|
|
v.Set("track", s.KeywordsToTrack)
|
|
stream := s.api.PublicStreamFilter(v)
|
|
for item := range stream.C {
|
|
select {
|
|
// Listen for the call to cancle so we know it's time to stop
|
|
case <-s.ctx.Done():
|
|
stream.Stop()
|
|
return
|
|
default:
|
|
switch tweet := item.(type) {
|
|
case anaconda.Tweet:
|
|
fields := make(map[string]interface{})
|
|
tags := make(map[string]string)
|
|
if tweet.Lang != "" {
|
|
tags["lang"] = tweet.Lang
|
|
}
|
|
fields["retweet_count"] = tweet.RetweetCount
|
|
fields["tweet_id"] = tweet.IdStr
|
|
fields["followers_count"] = tweet.User.FollowersCount
|
|
fields["screen_name"] = tweet.User.ScreenName
|
|
fields["friends_count"] = tweet.User.FriendsCount
|
|
fields["user_verified"] = tweet.User.Verified
|
|
fields["source"] = tweet.Source
|
|
fields["full_text"] = tweet.FullText
|
|
time, err := tweet.CreatedAtTime()
|
|
if err != nil {
|
|
acc.AddError(err)
|
|
continue
|
|
}
|
|
for _, keyword := range keywordsList {
|
|
if strings.Contains(strings.ToLower(tweet.Text), strings.ToLower(keyword)) {
|
|
tags["keyword"] = strings.ToLower(keyword)
|
|
acc.AddFields("tweets", fields, tags, time)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop - Called once when stopping the plugin
|
|
func (s *Twitter) Stop() {
|
|
s.cancel()
|
|
s.wg.Wait()
|
|
return
|
|
}
|
|
|
|
func init() {
|
|
inputs.Add("twitter", func() telegraf.Input { return &Twitter{} })
|
|
}
|