Skip to content

Instantly share code, notes, and snippets.

@ashavijit
Created April 4, 2023 15:49
Show Gist options
  • Select an option

  • Save ashavijit/4776e42bbfc59e38c731f5f00be3fba8 to your computer and use it in GitHub Desktop.

Select an option

Save ashavijit/4776e42bbfc59e38c731f5f00be3fba8 to your computer and use it in GitHub Desktop.
Updated B0Bot
import os
import re

import pymongo
import tweepy
import yaml
from flask import Flask, request
from flask import jsonify
from flask import make_response
from werkzeug.exceptions import NotFound
# Flask application object; the route decorators further down attach to it.
app = Flask(__name__)

# Twitter API credentials, read from the environment (each one is None when
# its variable is unset).
consumer_key = os.environ.get('CONSUMER_KEY')
consumer_secret = os.environ.get('CONSUMER_SECRET')
access_token = os.environ.get('ACCESS_TOKEN')
access_token_secret = os.environ.get('ACCESS_TOKEN_SECRET')

# Tweepy client built from the consumer pair plus the access token pair.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# MongoDB handles: client -> database -> collection of stored tweets.
client = pymongo.MongoClient(os.environ.get('MONGO_URI'))
db = client['bug_zero_db']
collection = db['news_collection']
# Load the OpenAPI specification that the /api route returns as JSON.
# NOTE: this previously used yaml.load(f, Loader=yaml.Loader). The full Loader
# can construct arbitrary Python objects from tagged YAML, which a plain-data
# API description never needs, so the safe loader is used instead.
# The encoding is explicit so parsing does not depend on the host locale.
with open('openapi.yaml', 'r', encoding='utf-8') as f:
    spec = yaml.safe_load(f)
class B0Bot(tweepy.StreamListener):
    """Stream listener that stores ``bug_zero`` tweets and answers keyword queries.

    Reads and writes the module-level ``collection`` and posts replies through
    the module-level ``api`` client.
    """

    def on_status(self, status):
        """Store an incoming tweet in MongoDB when it was posted by ``bug_zero``.

        Args:
            status: Tweet object delivered by the stream. Its
                ``user.screen_name``, ``id``, ``text`` and ``created_at``
                attributes are read.
        """
        if status.user.screen_name != 'bug_zero':
            return
        # Collect news data from the tweet and store it in MongoDB.
        collection.insert_one({
            'id': status.id,
            'text': status.text,
            'created_at': status.created_at,
        })

    def on_error(self, status_code):
        """Handle an error status code reported by the stream.

        Args:
            status_code: HTTP status code received from the streaming endpoint.

        Returns:
            ``False`` for status 420, which per the Tweepy streaming docs
            disconnects the stream instead of retrying while rate limited;
            ``None`` for every other code.
        """
        if status_code == 420:
            return False

    def reply_to_user(self, keyword, mention):
        """Reply to ``mention`` with the newest stored tweet matching ``keyword``.

        Args:
            keyword: Text to look for, case-insensitively, in stored tweets.
                It is matched literally.
            mention: Tweet being answered; only its ``id`` is read.
        """
        # Escape the keyword so text such as "c++" or "(" is searched for
        # literally instead of being interpreted (or rejected) as a regular
        # expression by MongoDB.
        pattern = f'.*{re.escape(str(keyword))}.*'
        latest_news = collection.find_one(
            {'text': {'$regex': pattern, '$options': 'i'}},
            sort=[('created_at', pymongo.DESCENDING)],
        )
        if latest_news:
            reply_text = f"Here's the latest news on {keyword}: {latest_news['text']}"
        else:
            reply_text = f"Sorry, there's no news on {keyword} right now."
        # Twitter ignores in_reply_to_status_id unless the author of that tweet
        # is mentioned; auto_populate_reply_metadata adds the mention so the
        # status is actually posted as a reply.
        api.update_status(
            status=reply_text,
            in_reply_to_status_id=mention.id,
            auto_populate_reply_metadata=True,
        )
# Start B0Bot stream
bot = B0Bot()
stream = tweepy.Stream(auth=api.auth, listener=bot)
# Stream.filter() blocks the calling thread by default. Run at module level,
# that kept the route definitions and app.run() below from ever executing.
# is_async=True runs the stream on a background thread instead.
stream.filter(track=['cybersecurity', 'hacker', 'data breach'], is_async=True)
# HTTP endpoints described by the OpenAPI specification
@app.route('/api', methods=['GET'])
def get_api_spec():
    """Expose the loaded OpenAPI document as the JSON body of GET /api."""
    spec_response = jsonify(spec)
    return spec_response
@app.route('/api/news/latest', methods=['GET'])
def get_latest_news():
    """Return the newest stored tweet whose text contains the ``keyword`` parameter.

    The keyword is read from the query string and matched literally and
    case-insensitively against the ``text`` field.

    Returns:
        A JSON response ``{'news': <text>}`` on a match, or a 400 JSON error
        response when the ``keyword`` parameter is absent.

    Raises:
        NotFound: No stored tweet matches the keyword.
    """
    keyword = request.args.get('keyword')
    if keyword is None:
        # Without this guard a missing parameter was formatted as the string
        # "None" and searched for literally.
        return make_response(
            jsonify({'error': "The 'keyword' query parameter is required."}),
            400,
        )
    # Escape the caller-supplied keyword so regex metacharacters are matched
    # literally rather than altering (or breaking) the MongoDB query.
    pattern = f'.*{re.escape(keyword)}.*'
    latest_news = collection.find_one(
        {'text': {'$regex': pattern, '$options': 'i'}},
        sort=[('created_at', pymongo.DESCENDING)],
    )
    if latest_news:
        return jsonify({'news': latest_news['text']})
    raise NotFound(f"Sorry, there's no news on {keyword} right now.")
@app.errorhandler(404)
def not_found_error(error):
    """Turn a 404 error into a JSON body carrying the error's description."""
    payload = jsonify({'error': error.description})
    return make_response(payload, 404)
# Start Flask's built-in server only when this file is executed as a script.
if __name__ == '__main__':
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment