Created
July 20, 2023 14:27
-
-
Save Krytos/9f338b3ac6a6bdf22a66ce05cd045ae9 to your computer and use it in GitHub Desktop.
Chapter Scraper for TBATE
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import smtplib | |
| import os | |
| from zipfile import ZipFile | |
| from fastapi import FastAPI | |
| from bs4 import BeautifulSoup as bs | |
| from lxml import etree | |
| from httpx import AsyncClient | |
| from fuzzysearch import find_near_matches | |
| from datetime import datetime, timedelta | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.base import MIMEBase | |
| from email.mime.text import MIMEText | |
| from email import encoders | |
| from dotenv import load_dotenv | |
| from logger_local import setup_applevel_logger | |
# Load variables from a .env file into the process environment before any
# of the os.environ lookups below run.
load_dotenv()
log = setup_applevel_logger(file_name='debug.log')

# Time in minutes to wait before checking for new chapter
WAIT_TIME = 10
TBATE_URL = "https://www.readlightnovel.me/the-beginning-after-the-end-121422"
# Strings to delete from scraped text (presumably site watermarks).
STR_TO_DEL = ["Read first at lnreader.org!"]

# Mail settings; each is None when the variable is not set.
SENDER_EMAIL = os.environ.get("SENDER_EMAIL")
RECEIVER_EMAIL = os.environ.get("RECEIVER_EMAIL")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")
SMTP_SERVER = os.environ.get("SMTP_SERVER")
SMTP_PORT = os.environ.get("SMTP_PORT")  # kept as a string; converted where used

# Telegram Bot API credentials.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")

app = FastAPI()
async def telegram_push(message):
    """Post ``message`` to the configured Telegram chat.

    Issues a ``sendMessage`` request to the Telegram Bot API using
    ``TELEGRAM_TOKEN`` and ``TELEGRAM_CHAT_ID``. The HTTP response is not
    inspected.
    """
    endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
    async with AsyncClient() as http:
        await http.post(endpoint, json=payload)
def _strip_watermark(text):
    """Return ``text`` cut off at the earliest fuzzy match of a ``STR_TO_DEL`` entry."""
    cut = len(text)
    for marker in STR_TO_DEL:
        # Up to 10 edits (Levenshtein distance) between marker and text count
        # as a match; only the first reported match is considered.
        hits = find_near_matches(marker, text, max_l_dist=10)
        if hits:
            cut = min(cut, hits[0].start)
    return text[:cut]


def _render_chapter_xhtml(chapter_title, paragraphs):
    """Build the chapter XHTML document; the first paragraph is rendered bold."""
    from xml.sax.saxutils import escape

    # Escape &, < and > so arbitrary scraped text cannot break the markup,
    # and emit exactly one <p> per paragraph (no nesting).
    first, *rest = paragraphs
    body = f"<p><b>{escape(first)}</b></p>"
    body += "".join(f"<p>{escape(text)}</p>" for text in rest)
    return (
        "<?xml version='1.0' encoding='utf-8'?>\n"
        '<html xmlns="http://www.w3.org/1999/xhtml">\n'
        "<head>\n"
        f"<title>{escape(chapter_title)}</title>\n"
        '<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\n'
        "</head>\n"
        f"<body>{body}</body></html>"
    )


async def grab_chapter() -> tuple:
    """Scrape the novel's metadata and its most recent chapter.

    Side effects:
        * writes the cover image to ``cover_image.jpg``;
        * writes the latest chapter URL to ``recent_chapter.txt`` whenever it
          differs from the stored one (including the first run, when the file
          does not exist yet) and prints a notice.

    Returns:
        A ``(chapter_xhtml, novel_data)`` tuple. ``novel_data`` always has
        ``"chapter_title"`` and, when the info rows are present on the page,
        ``"author"``, ``"genre"`` and ``"type"``.

    Raises:
        IndexError: the cover image or latest-chapter link is not found at
            the expected XPath.
        ValueError: the chapter page has no ``chapterhidden`` container or
            the container holds no paragraphs.
    """
    # Common prefix of every positional XPath used on the novel page.
    page_root = "/html/body/div[2]/div/div/div[1]/div/div[2]/div"
    novel_data = {}

    async with AsyncClient() as client:
        response = await client.get(TBATE_URL)
        dom = etree.HTML(str(bs(response.text, "lxml")))

        for row in dom.xpath(f"{page_root}/div[1]/div[4]/div"):
            title = row.xpath("./div[1]/h3/text()")
            body = row.xpath("./div[2]/ul/li/a/text()")
            if not title or not body:
                # Row without a heading or without a linked value: skip it
                # instead of failing on title[0] / body[0].
                continue
            key = title[0].lower()
            if key in ("author", "genre", "type"):
                novel_data[key] = body[0]

        cover_image = dom.xpath(f"{page_root}/div[1]/div[1]/a/img/@src")[0]
        response = await client.get(cover_image)
        with open("cover_image.jpg", "wb") as f:
            f.write(response.content)

        recent_chapter = dom.xpath(
            f"{page_root}/div[2]/div/div[5]/div[2]/ul/li[1]/a/@href"
        )[0]

        try:
            with open("recent_chapter.txt", "r") as f:
                old_chapter = f.read()
        except FileNotFoundError:
            # No stored URL yet: treat the current chapter as new so the
            # state file actually gets created.
            old_chapter = None
        if old_chapter != recent_chapter:
            with open("recent_chapter.txt", "w") as f:
                f.write(recent_chapter)
            print("New chapter available")

        response = await client.get(recent_chapter)

    # Same two-pass parse as before: html.parser + prettify, then lxml.
    soup = bs(bs(response.text, "html.parser").prettify(), "lxml")
    chapter = soup.find("div", id="chapterhidden")
    if chapter is None:
        raise ValueError(f"No 'chapterhidden' container found at {recent_chapter}")
    chapter_content = chapter.find_all("p")
    if not chapter_content:
        raise ValueError(f"No chapter paragraphs found at {recent_chapter}")

    # Prefer an <h1> heading; otherwise fall back to the first paragraph.
    heading = chapter.find("h1")
    chapter_name = heading.text if heading is not None else chapter_content[0].text

    chapter_title = f"TBATE - Chapter {recent_chapter.split('-')[-1]} - {chapter_name.strip()}"
    chapter_title = " ".join(chapter_title.split())  # collapse whitespace runs
    novel_data["chapter_title"] = chapter_title

    paragraphs = [_strip_watermark(p.text) for p in chapter_content]
    return _render_chapter_xhtml(chapter_title, paragraphs), novel_data
async def send_chapter(chapter_title, attachment):
    """Email the file at ``attachment`` from ``SENDER_EMAIL`` to ``RECEIVER_EMAIL``.

    Args:
        chapter_title: used as the message subject.
        attachment: path of the file to attach; only its base name is
            advertised to the recipient.

    Raises:
        TypeError / ValueError: ``SMTP_PORT`` is unset or not an integer.
        OSError: the attachment cannot be read or the server is unreachable.
        smtplib.SMTPException: the SMTP dialogue fails.

    NOTE: ``smtplib`` is synchronous, so the event loop is blocked while the
    message is being sent.
    """
    msg = MIMEMultipart()
    msg['From'] = SENDER_EMAIL
    msg['To'] = RECEIVER_EMAIL
    msg['Subject'] = chapter_title
    # Empty plain-text body; the payload is the attachment.
    msg.attach(MIMEText('', 'plain'))

    with open(attachment, 'rb') as f:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(f.read())
    encoders.encode_base64(part)
    # Passing the name as a parameter lets the email package quote it and
    # RFC 2231-encode non-ASCII characters, which raw string formatting
    # does not do.
    part.add_header(
        'Content-Disposition',
        'attachment',
        filename=os.path.basename(attachment),
    )
    msg.attach(part)

    # The context manager closes the connection even if STARTTLS, login or
    # sending raises.
    with smtplib.SMTP(SMTP_SERVER, int(SMTP_PORT)) as smtp_connection:
        smtp_connection.starttls()
        smtp_connection.login(SENDER_EMAIL, EMAIL_PASSWORD)
        smtp_connection.sendmail(SENDER_EMAIL, RECEIVER_EMAIL, msg.as_string())
async def save_chapter(chapter: str, novel_data: dict):
    """Package one chapter as ``<chapter_title>.epub`` in the working directory.

    Args:
        chapter: complete XHTML document for the chapter.
        novel_data: metadata; ``"chapter_title"`` names the output file and
            the book, ``"author"`` fills ``dc:creator``.

    The archive contains the chapter, an OPF package document, a cover title
    page, ``cover_image.jpg`` (which must already exist in the working
    directory) and the ``mimetype`` / ``META-INF/container.xml`` entries of
    the EPUB container format. ``cover_image.jpg`` is deleted after it has
    been packaged. No other intermediate files are written to disk.

    Raises:
        FileNotFoundError: ``cover_image.jpg`` is missing.
    """
    from xml.sax.saxutils import escape, quoteattr
    from zipfile import ZIP_STORED

    title = str(novel_data.get("chapter_title"))
    author = str(novel_data.get("author"))
    html_name = f"{title}.html"

    # The XML declaration must be the very first characters of the document,
    # so each template starts on the opening line of its literal.
    # NOTE(review): the spine references toc="ncx" but no NCX item is in the
    # manifest; left as in the original.
    content_opf = f"""<?xml version='1.0' encoding='utf-8'?>
<package xmlns="http://www.idpf.org/2007/opf" unique-identifier="uuid_id" version="2.0">
  <metadata xmlns:calibre="http://calibre.kovidgoyal.net/2009/metadata" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:dcterms="http://purl.org/dc/terms/" xmlns:opf="http://www.idpf.org/2007/opf" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <meta name="calibre:timestamp" content="{datetime.now().isoformat()}"/>
    <dc:creator>{escape(author)}</dc:creator>
    <meta name="cover" content="cover"/>
    <dc:title>{escape(title)}</dc:title>
    <dc:language>en</dc:language>
    <dc:identifier id="uuid_id" opf:scheme="uuid">45f94238-3216-42d2-8b31-01f4a4009523</dc:identifier>
  </metadata>
  <manifest>
    <item href="cover_image.jpg" id="cover" media-type="image/jpeg"/>
    <item href="titlepage.xhtml" id="titlepage" media-type="application/xhtml+xml"/>
    <item href={quoteattr(html_name)} id="html" media-type="application/xhtml+xml"/>
  </manifest>
  <spine toc="ncx">
    <itemref idref="titlepage"/>
    <itemref idref="html"/>
  </spine>
  <guide>
    <reference href="titlepage.xhtml" title="Title Page" type="cover"/>
  </guide>
</package>
"""
    title_page = f"""<?xml version='1.0' encoding='utf-8'?>
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
    <meta name="calibre:cover" content="true"/>
    <title>{escape(title)}</title>
    <style type="text/css" title="override_css">
      @page {{padding: 0pt; margin: 0pt}}
      body {{text-align: center; padding: 0pt; margin: 0pt;}}
    </style>
  </head>
  <body>
    <div>
      <svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="100%" height="100%" viewBox="0 0 1200 1600" preserveAspectRatio="none">
        <image width="1200" height="1600" xlink:href="cover_image.jpg"/>
      </svg>
    </div>
  </body>
</html>
"""
    # Tells a reading system where the package document lives.
    container_xml = """<?xml version="1.0" encoding="UTF-8"?>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
  <rootfiles>
    <rootfile full-path="content.opf" media-type="application/oebps-package+xml"/>
  </rootfiles>
</container>
"""

    with ZipFile(f"{title}.epub", "w") as epub:
        # The container format requires "mimetype" to be the first entry and
        # to be stored uncompressed.
        epub.writestr("mimetype", "application/epub+zip", compress_type=ZIP_STORED)
        epub.writestr("META-INF/container.xml", container_xml)
        # str payloads are encoded as UTF-8 by writestr.
        epub.writestr(html_name, chapter)
        epub.writestr("content.opf", content_opf)
        epub.writestr("titlepage.xhtml", title_page)
        epub.write("cover_image.jpg")

    os.remove("cover_image.jpg")
async def main():
    """Poll for new chapters forever; email and announce each new one.

    Each cycle scrapes the latest chapter, builds its EPUB and compares the
    chapter title with the one stored in ``latest.txt``:

    * same title: the freshly built EPUB is discarded and the loop sleeps
      ``WAIT_TIME`` minutes;
    * new title: the EPUB is emailed, ``latest.txt`` is updated, the EPUB is
      deleted and a Telegram message is pushed. Scraping then pauses until
      7 days have passed, re-checking the clock every 5 minutes.

    Never returns; any exception from the helpers propagates to the caller.
    """
    last_sent = None
    while True:
        # Despite the log wording below, no weekday is checked: scraping is
        # simply suspended for 7 days after the last successful send.
        if last_sent is not None and datetime.now() - last_sent < timedelta(days=7):
            log.info('Not friday yet')
            log.info(f'Date: {last_sent}')
            log.info(f'Now: {datetime.now()}')
            await asyncio.sleep(60 * 5)
            continue

        try:
            with open('latest.txt', 'r', encoding='utf-8') as f:
                history = f.read()
        except FileNotFoundError:
            log.warning('No history file, creating one')
            history = None

        chapter, data = await grab_chapter()
        chapter_title = data.get("chapter_title")
        epub_path = f'{chapter_title}.epub'
        await save_chapter(chapter, data)

        if history == chapter_title:
            # Already delivered: drop the EPUB that was just built so it does
            # not linger on disk between polls.
            os.remove(epub_path)
            log.info(f'No new chapter yet, waiting {WAIT_TIME} minutes')
            await asyncio.sleep(60 * WAIT_TIME)
            continue

        log.info(f'New chapter found: {chapter_title}. Sending to kindle')
        await send_chapter(chapter_title, epub_path)
        # Record the title only after the email went out, so a failed send
        # is retried on the next run.
        with open('latest.txt', 'w', encoding='utf-8') as f:
            f.write(chapter_title)
        os.remove(epub_path)
        last_sent = datetime.now()
        await telegram_push(f'New chapter: \n{chapter_title}')
@app.get("/")
async def root():
    """Serve a fixed greeting at the index route."""
    greeting = "Hello World"
    return {"message": greeting}
@app.get("/hello/{name}")
async def say_hello(name: str):
    """Serve a greeting addressed to the ``name`` path parameter."""
    greeting = f"Hello {name}"
    return {"message": greeting}
if __name__ == "__main__":
    # Start the polling loop only when executed as a script. Unguarded, the
    # never-returning loop would run on import, so any process importing this
    # module (e.g. an ASGI server loading ``app``) would hang at import time.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment