package twitter import ( "context" "net/url" "strings" "sync" "github.com/ChimeraCoder/anaconda" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" ) type Twitter struct { ServiceAddress string ConsumerKey string ConsumerSecret string AccessToken string AccessTokenSecret string KeywordsToTrack string api *anaconda.TwitterApi wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc } var sampleConfig = ` ## These values can be optained from apps.twitter.com service_address = "" consumer_key = "" consumer_secret = "" access_token = "" access_token_secret = "" keywords_to_track = "" ` // SampleConfig returns the default configuration of the Input func (s *Twitter) SampleConfig() string { return sampleConfig } // Description returns a one-sentence description on the Input func (s *Twitter) Description() string { return "Pulls metrics from tweets" } // Gather - Called every interval and used for polling inputs func (s *Twitter) Gather(acc telegraf.Accumulator) error { return nil } // Start - Called once when starting the plugin func (s *Twitter) Start(acc telegraf.Accumulator) error { anaconda.SetConsumerKey(s.ConsumerKey) anaconda.SetConsumerSecret(s.ConsumerSecret) s.api = anaconda.NewTwitterApi(s.AccessToken, s.AccessTokenSecret) // Store the cancel function so we can call it on Stop s.ctx, s.cancel = context.WithCancel(context.Background()) s.wg = &sync.WaitGroup{} s.wg.Add(1) go s.fetchTweets(acc) return nil } func (s *Twitter) fetchTweets(acc telegraf.Accumulator) { defer s.wg.Done() // We will use this little later for finding keywords in tweets keywordsList := strings.Split(s.KeywordsToTrack, ",") // Setting the keywords we want to track v := url.Values{} v.Set("track", s.KeywordsToTrack) stream := s.api.PublicStreamFilter(v) for item := range stream.C { select { // Listen for the call to cancle so we know it's time to stop case <-s.ctx.Done(): stream.Stop() return default: switch tweet := item.(type) { case anaconda.Tweet: fields := make(map[string]interface{}) tags := make(map[string]string) if tweet.Lang != "" { tags["lang"] = tweet.Lang } fields["retweet_count"] = tweet.RetweetCount fields["tweet_id"] = tweet.IdStr fields["followers_count"] = tweet.User.FollowersCount fields["screen_name"] = tweet.User.ScreenName fields["friends_count"] = tweet.User.FriendsCount fields["user_verified"] = tweet.User.Verified fields["source"] = tweet.Source fields["full_text"] = tweet.FullText time, err := tweet.CreatedAtTime() if err != nil { acc.AddError(err) continue } for _, keyword := range keywordsList { if strings.Contains(strings.ToLower(tweet.Text), strings.ToLower(keyword)) { tags["keyword"] = strings.ToLower(keyword) acc.AddFields("tweets", fields, tags, time) } } } } } } // Stop - Called once when stopping the plugin func (s *Twitter) Stop() { s.cancel() s.wg.Wait() return } func init() { inputs.Add("twitter", func() telegraf.Input { return &Twitter{} }) }