diff --git a/plugins/inputs/twitter/README.md b/plugins/inputs/twitter/README.md new file mode 100644 index 000000000..01f7d0051 --- /dev/null +++ b/plugins/inputs/twitter/README.md @@ -0,0 +1,25 @@ +# Telegraf Service Plugin: twitter + +Originally developed by https://dzone.com/articles/monitoring-cryptocurrency-with-influxdb-amp-telegr + +### Configuration: + +```toml +# Pulls metrics from tweets +[[inputs.twitter]] + ## These values can be optained from apps.twitter.com + consumer_key = "ZrJV7ouTEQPW1gvtFti0Ou2qb" + consumer_secret = "X9qKXO8AkgVWEuS0TPbrxykzDpCtr9FJogUm1YEUbky1aW6hsk" + access_token = "835781864833052672-sBie6YhrhUB4MAYtyYZSE1fDlNv2XGM" + access_token_secret = "GGnHT2M8s5tQ7e4iKHIoDPNAsVSKUMmh6dBGNaluat5Yf" + keywords_to_track = "bitcoin,$btc,ethereum,$eth,monero,$xmr,dax,$dax,eurusd" +``` + +### Measurements & Fields: + + +### Tags: + + +### Example Output: + diff --git a/plugins/inputs/twitter/twitter.go b/plugins/inputs/twitter/twitter.go new file mode 100644 index 000000000..234e7eb6b --- /dev/null +++ b/plugins/inputs/twitter/twitter.go @@ -0,0 +1,123 @@ +package twitter + +import ( + "context" + "net/url" + "strings" + "sync" + + "github.com/ChimeraCoder/anaconda" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Twitter struct { + ServiceAddress string + ConsumerKey string + ConsumerSecret string + AccessToken string + AccessTokenSecret string + KeywordsToTrack string + + api *anaconda.TwitterApi + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +var sampleConfig = ` + ## These values can be optained from apps.twitter.com + service_address = "" + consumer_key = "" + consumer_secret = "" + access_token = "" + access_token_secret = "" + keywords_to_track = "" +` + +// SampleConfig returns the default configuration of the Input +func (s *Twitter) SampleConfig() string { + return sampleConfig +} + +// Description returns a one-sentence description on the Input +func (s *Twitter) Description() string { + return "Pulls metrics from tweets" +} + +// Gather - Called every interval and used for polling inputs +func (s *Twitter) Gather(acc telegraf.Accumulator) error { + return nil +} + +// Start - Called once when starting the plugin +func (s *Twitter) Start(acc telegraf.Accumulator) error { + anaconda.SetConsumerKey(s.ConsumerKey) + anaconda.SetConsumerSecret(s.ConsumerSecret) + s.api = anaconda.NewTwitterApi(s.AccessToken, s.AccessTokenSecret) + + // Store the cancel function so we can call it on Stop + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.wg = &sync.WaitGroup{} + s.wg.Add(1) + go s.fetchTweets(acc) + + return nil +} + +func (s *Twitter) fetchTweets(acc telegraf.Accumulator) { + defer s.wg.Done() + // We will use this little later for finding keywords in tweets + keywordsList := strings.Split(s.KeywordsToTrack, ",") + // Setting the keywords we want to track + v := url.Values{} + v.Set("track", s.KeywordsToTrack) + stream := s.api.PublicStreamFilter(v) + for item := range stream.C { + select { + // Listen for the call to cancle so we know it's time to stop + case <-s.ctx.Done(): + stream.Stop() + return + default: + switch tweet := item.(type) { + case anaconda.Tweet: + fields := make(map[string]interface{}) + tags := make(map[string]string) + if tweet.Lang != "" { + tags["lang"] = tweet.Lang + } + fields["retweet_count"] = tweet.RetweetCount + fields["tweet_id"] = tweet.IdStr + fields["followers_count"] = tweet.User.FollowersCount + fields["screen_name"] = tweet.User.ScreenName + fields["friends_count"] = tweet.User.FriendsCount + fields["user_verified"] = tweet.User.Verified + fields["source"] = tweet.Source + fields["full_text"] = tweet.FullText + time, err := tweet.CreatedAtTime() + if err != nil { + acc.AddError(err) + continue + } + for _, keyword := range keywordsList { + if strings.Contains(strings.ToLower(tweet.Text), strings.ToLower(keyword)) { + tags["keyword"] = strings.ToLower(keyword) + acc.AddFields("tweets", fields, tags, time) + } + } + } + } + } +} + +// Stop - Called once when stopping the plugin +func (s *Twitter) Stop() { + s.cancel() + s.wg.Wait() + return +} + +func init() { + inputs.Add("twitter", func() telegraf.Input { return &Twitter{} }) +}