Skip to content

Instantly share code, notes, and snippets.

@itszechs
Created May 1, 2022 15:23
Show Gist options
  • Select an option

  • Save itszechs/9621f32163ac70a22283982f9cb7ea3b to your computer and use it in GitHub Desktop.

Select an option

Save itszechs/9621f32163ac70a22283982f9cb7ea3b to your computer and use it in GitHub Desktop.
A simple Python class to observe RSS feeds
import threading
import time
from datetime import datetime, timezone
from email.utils import formatdate, parsedate_to_datetime
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple

import feedparser
class RssReader:
    """Poll an RSS feed at a fixed interval and report newly published entries.

    Every ``interval`` seconds the feed at ``url`` is fetched with
    ``feedparser``. Each entry whose publication time is at or after the
    previous poll is printed and, when a callback was supplied, passed to it
    as a plain ``dict`` with the keys ``title``, ``id``, ``link`` and
    ``timestamp`` (the publication date string exactly as found in the feed).

    Attributes:
        url: Address of the feed being watched.
        interval: Seconds to sleep between two polls.
        timestamp: RFC 2822 date string (UTC, ``-0000`` zone) marking the
            instant from which entries count as new.
    """

    def __init__(
        self,
        url: str,
        interval: int = 60,
        callback: Optional[Callable] = None,
        threaded: bool = True
    ) -> None:
        """Start watching ``url``.

        Args:
            url: Feed address handed to ``feedparser.parse``.
            interval: Seconds to wait between polls.
            callback: Optional callable invoked with a ``dict`` for every
                new entry.
            threaded: When true, poll from a daemon thread and return
                immediately. When false, poll in the calling thread, which
                means this constructor never returns.
        """
        self.url = url
        self.interval = interval
        self.timestamp = self.__get_time()
        self._callback = callback
        # (id, timestamp) pairs that qualified as new during the previous
        # poll; used to avoid reporting a boundary entry twice. Must exist
        # before the polling thread starts.
        self._recent_keys: Set[Tuple[str, str]] = set()
        if threaded:
            thread = threading.Thread(
                target=self.__subscribe,
                daemon=True
            )
            thread.start()
        else:
            self.__subscribe()

    def __parse(self) -> Dict:
        """Fetch the feed and reduce it to plain dictionaries.

        Returns:
            A dict with the feed ``url``, its ``title`` and a list of
            ``entries`` (each with ``title``, ``id``, ``link``, ``timestamp``).

        Raises:
            Exception: If feedparser flags the feed as malformed (``bozo``).
        """
        parsed = feedparser.parse(self.url)
        if parsed.bozo:
            raise Exception(
                f"Error parsing RSS feed: {parsed.bozo_exception}"
            )
        return {
            'url': parsed.feed.link,
            'title': parsed.feed.title,
            'entries': [
                {
                    "title": entry.title,
                    "id": entry.id,
                    "link": entry.link,
                    "timestamp": entry.published,
                }
                for entry in parsed.entries
            ],
        }

    @staticmethod
    def _to_epoch(stamp: str) -> Optional[float]:
        """Convert a feed date string to seconds since the Unix epoch.

        RFC 2822 dates (the RSS format) are tried first, then ISO 8601
        (as used by Atom feeds). A date without zone information, which
        includes the ``-0000`` zone, is interpreted as UTC.

        Returns:
            The instant as a float, or ``None`` when ``stamp`` is not a
            string or cannot be parsed.
        """
        if not isinstance(stamp, str):
            return None
        try:
            moment = parsedate_to_datetime(stamp)
        except (TypeError, ValueError, IndexError):
            # Not RFC 2822; older Pythons signal this with TypeError or
            # IndexError rather than ValueError.
            iso = stamp.strip()
            if iso.endswith(("Z", "z")):
                # fromisoformat() only accepts a "Z" suffix from 3.11 on.
                iso = iso[:-1] + "+00:00"
            try:
                moment = datetime.fromisoformat(iso)
            except ValueError:
                return None
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.timestamp()

    @classmethod
    def _select_new(
        cls,
        entries: Iterable[Dict],
        since: float,
        already_reported: Optional[Set[Tuple[str, str]]] = None
    ) -> Tuple[List[Dict], Set[Tuple[str, str]]]:
        """Pick the entries published at or after ``since``.

        Args:
            entries: Entry dicts as produced by the feed parser step.
            since: Reference instant in seconds since the epoch.
            already_reported: ``(id, timestamp)`` keys that qualified during
                the previous poll and must not be reported again.

        Returns:
            ``(new_entries, qualifying_keys)``. ``new_entries`` are the
            entries to report now, in feed order. ``qualifying_keys`` holds
            the key of every entry at or after ``since``, including those
            suppressed as already reported. Entries whose date cannot be
            parsed are skipped because they cannot be ordered.
        """
        reported = already_reported if already_reported is not None else set()
        fresh: List[Dict] = []
        qualifying: Set[Tuple[str, str]] = set()
        for entry in entries:
            published = cls._to_epoch(entry['timestamp'])
            if published is None or published < since:
                continue
            key = (entry['id'], entry['timestamp'])
            qualifying.add(key)
            if key not in reported:
                fresh.append(entry)
        return fresh, qualifying

    def _poll_once(self) -> int:
        """Fetch the feed once, report new entries and advance the clock.

        Returns:
            The number of entries reported.

        Raises:
            ValueError: If ``self.timestamp`` is not a parseable date.
            Exception: Propagated from the feed parsing step.
        """
        since = self._to_epoch(self.timestamp)
        if since is None:
            raise ValueError(
                f"Unparseable reference timestamp: {self.timestamp!r}"
            )
        # Taken before the fetch so entries published while the request or
        # the callbacks run are still picked up by the next poll.
        poll_started = self.__get_time()
        rss = self.__parse()
        print(f"Current time: {self.timestamp}")
        fresh, qualifying = self._select_new(
            rss['entries'], since, self._recent_keys
        )
        for entry in fresh:
            print(f"New post: {entry}")
            if self._callback is not None:
                self._callback(dict(entry))
        self._recent_keys = qualifying
        self.timestamp = poll_started
        return len(fresh)

    def __subscribe(self) -> None:
        """Poll forever, sleeping ``self.interval`` seconds between polls."""
        while True:
            count = self._poll_once()
            if count > 0:
                print(f'{count} new entries found')
            else:
                print("No new entries")
            print()
            time.sleep(self.interval)

    def __get_time(self) -> str:
        """Return the current UTC time as an RFC 2822 string.

        The result looks like ``Sun, 01 May 2022 15:23:00 -0000``.
        ``formatdate`` is used instead of ``time.strftime`` because its
        day and month names do not depend on the process locale.
        """
        return formatdate()
def callback(post: dict) -> None:
    """Demo handler: echo the received post dictionary to stdout."""
    message = f"Callback : {post}"
    print(message)
if __name__ == "__main__":
    # Demo run against a public dummy feed that emits an item every 30 seconds.
    # threaded=False keeps polling in the main thread, so this call blocks
    # until the process is interrupted.
    demo_feed = "https://lorem-rss.herokuapp.com/feed?unit=second&interval=30"
    RssReader(
        demo_feed,
        interval=30,
        callback=callback,
        threaded=False,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment