Skip to content

Instantly share code, notes, and snippets.

@Krytos
Created July 20, 2023 14:27
Show Gist options
  • Select an option

  • Save Krytos/9f338b3ac6a6bdf22a66ce05cd045ae9 to your computer and use it in GitHub Desktop.

Select an option

Save Krytos/9f338b3ac6a6bdf22a66ce05cd045ae9 to your computer and use it in GitHub Desktop.
Chapter Scraper for TBATE
import asyncio
import smtplib
import os
from zipfile import ZipFile
from fastapi import FastAPI
from bs4 import BeautifulSoup as bs
from lxml import etree
from httpx import AsyncClient
from fuzzysearch import find_near_matches
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
from dotenv import load_dotenv
from logger_local import setup_applevel_logger
# Load environment variables before any os.getenv() call below reads them
# (presumably from a local .env file, per python-dotenv's load_dotenv).
load_dotenv()
# Application-wide logger; writes to debug.log per the file_name argument.
log = setup_applevel_logger(file_name='debug.log')
# Time in minutes to wait before checking for new chapter
WAIT_TIME = 10
# Index page of the novel; the scraper reads metadata and the newest chapter link from it.
TBATE_URL = "https://www.readlightnovel.me/the-beginning-after-the-end-121422"
# Strings intended to be removed from scraped chapter text (site watermark).
STR_TO_DEL = [
"Read first at lnreader.org!",
]
# E-mail settings, all read from the environment; each is None when unset.
SENDER_EMAIL = os.getenv("SENDER_EMAIL")
RECEIVER_EMAIL = os.getenv("RECEIVER_EMAIL")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")
SMTP_SERVER = os.getenv("SMTP_SERVER")
# Kept as a string here; send_chapter converts it with int().
SMTP_PORT = os.getenv("SMTP_PORT")
# Telegram Bot API credentials used by telegram_push.
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
# FastAPI application object the route decorators further down attach to.
app = FastAPI()
async def telegram_push(message):
    """Send ``message`` to the configured Telegram chat via the Bot API.

    The HTTP response is not inspected, so a rejected request is not reported.
    """
    endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
    }
    async with AsyncClient() as http:
        await http.post(endpoint, json=payload)
def _strip_watermark(text):
    """Return ``text`` cut off at the first fuzzy match of any STR_TO_DEL marker."""
    cut = len(text)
    for marker in STR_TO_DEL:
        hits = find_near_matches(marker, text, max_l_dist=10)
        if hits:
            cut = min(cut, hits[0].start)
    return text[:cut]


def _render_chapter_xhtml(chapter_title, paragraphs):
    """Build the chapter XHTML document; the first paragraph is rendered bold."""
    from html import escape  # local import: the file header does not import html

    parts = []
    for index, text in enumerate(paragraphs):
        # Escape scraped text so a stray "&" or "<" cannot break the XHTML.
        safe = escape(text, quote=False)
        parts.append(f"<p><b>{safe}</b></p>" if index == 0 else f"<p>{safe}</p>")
    body = "".join(parts)
    return f"""<?xml version='1.0' encoding='utf-8'?>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<title>{escape(chapter_title, quote=False)}</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
</head>
<body>{body}</body></html>"""


async def grab_chapter() -> tuple:
    """Scrape the novel index page and download the most recent chapter.

    Side effects (all relative to the current working directory):
      * writes the cover image to ``cover_image.jpg``;
      * stores the newest chapter URL in ``recent_chapter.txt`` whenever it
        differs from the stored one (or when the file does not exist yet).

    Returns:
        A ``(chapter_xhtml, novel_data)`` tuple. ``novel_data`` always holds
        ``"chapter_title"`` and, when present on the page, ``"author"``,
        ``"genre"`` and ``"type"``.

    Raises:
        IndexError: the cover image or latest-chapter link was not found at
            the expected XPath (the site layout probably changed).
        ValueError: the chapter page has no ``div#chapterhidden`` container.
    """
    async with AsyncClient() as client:
        response = await client.get(TBATE_URL)
        soup = bs(response.text, "lxml")
        dom = etree.HTML(str(soup))

        novel_data = {}
        novel_data_list = dom.xpath(
            '/html/body/div[2]/div/div/div[1]/div/div[2]/div/div[1]/div[4]/div'
        )
        for entry in novel_data_list:
            # Query relative to each entry instead of rebuilding an absolute,
            # index-based XPath for every row.
            title = entry.xpath('./div[1]/h3/text()')
            body = entry.xpath('./div[2]/ul/li/a/text()')
            if not title or not body:
                continue  # row without a heading or a value: nothing to record
            key = title[0].lower()
            if key in ("author", "genre", "type"):
                novel_data[key] = body[0]

        cover_image = dom.xpath(
            '/html/body/div[2]/div/div/div[1]/div/div[2]/div/div[1]/div[1]/a/img/@src'
        )[0]
        response = await client.get(cover_image)
        with open("cover_image.jpg", "wb") as f:
            f.write(response.content)

        recent_chapter = dom.xpath(
            '/html/body/div[2]/div/div/div[1]/div/div[2]/div/div[2]/div/div[5]/div[2]/ul/li[1]/a/@href'
        )[0]
        try:
            with open("recent_chapter.txt", "r") as f:
                old_chapter = f.read()
        except FileNotFoundError:
            # No record yet: treat it as "changed" so the file gets created.
            old_chapter = None
        if old_chapter != recent_chapter:
            with open("recent_chapter.txt", "w") as f:
                f.write(recent_chapter)
            print("New chapter available")

        response = await client.get(recent_chapter)
        # Normalise the markup with html.parser first, then re-parse with lxml.
        soup = bs(response.text, "html.parser").prettify()
        soup = bs(soup, "lxml")
        chapter = soup.find("div", id="chapterhidden")
        if chapter is None:
            raise ValueError(
                f"No div#chapterhidden found on chapter page {recent_chapter}"
            )
        chapter_content = chapter.find_all("p")

        # Prefer an explicit <h1>; otherwise fall back to the first paragraph.
        heading = chapter.find("h1")
        if heading is not None:
            chapter_name = heading.text
        elif chapter_content:
            chapter_name = chapter_content[0].text
        else:
            chapter_name = ""

        chapter_title = f"TBATE - Chapter {recent_chapter.split('-')[-1]} - {chapter_name.strip()}"
        # prettify() introduces newlines/indentation; collapse all whitespace runs.
        chapter_title = " ".join(chapter_title.split())
        novel_data["chapter_title"] = chapter_title

        paragraphs = [_strip_watermark(p.text) for p in chapter_content]
        chapter_cleaned = _render_chapter_xhtml(chapter_title, paragraphs)
        return chapter_cleaned, novel_data
async def send_chapter(chapter_title, attachment):
    """Email the file at ``attachment`` to RECEIVER_EMAIL with ``chapter_title`` as subject.

    Args:
        chapter_title: Subject line of the message.
        attachment: Path of the file to attach; its base name is used as the
            attachment filename.

    Raises:
        OSError: the attachment cannot be read or the SMTP server is unreachable.
        smtplib.SMTPException: TLS negotiation, login or delivery failed.

    Note:
        smtplib is synchronous, so this coroutine blocks the event loop while
        the message is being sent.
    """
    # Create message container
    msg = MIMEMultipart()
    msg['From'] = SENDER_EMAIL
    msg['To'] = RECEIVER_EMAIL
    msg['Subject'] = chapter_title
    # Add (empty) body to message
    msg.attach(MIMEText('', 'plain'))

    # Add attachment to message
    with open(attachment, 'rb') as f:
        payload = f.read()
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(payload)
    encoders.encode_base64(part)
    # Passing filename as a parameter lets the email package quote it and
    # RFC 2231-encode non-ASCII characters, which a hand-built header cannot.
    part.add_header(
        'Content-Disposition', 'attachment', filename=os.path.basename(attachment)
    )
    msg.attach(part)

    # Send the message via SMTP server; the context manager issues QUIT and
    # closes the socket even when starttls/login/sendmail raises.
    with smtplib.SMTP(SMTP_SERVER, int(SMTP_PORT)) as smtp_connection:
        smtp_connection.starttls()
        smtp_connection.login(SENDER_EMAIL, EMAIL_PASSWORD)
        smtp_connection.sendmail(SENDER_EMAIL, RECEIVER_EMAIL, msg.as_string())
async def save_chapter(chapter: str, novel_data: dict):
    """Package a chapter as ``<chapter_title>.epub`` in the current directory.

    Args:
        chapter: Complete XHTML document of the chapter.
        novel_data: Metadata dict; ``"chapter_title"`` names the output file
            and the book, ``"author"`` fills ``dc:creator``.

    Side effects:
        Creates ``<chapter_title>.epub`` and deletes ``cover_image.jpg`` after
        it has been added to the archive.

    Raises:
        FileNotFoundError: ``cover_image.jpg`` does not exist.
    """
    from xml.sax.saxutils import escape  # local import: not in the file's import header

    chapter_title = str(novel_data.get("chapter_title"))
    author = str(novel_data.get("author"))
    # Escaped variants so "&", "<" or quotes in scraped metadata keep the XML well-formed.
    title_xml = escape(chapter_title)
    title_attr = escape(chapter_title, {'"': "&quot;"})
    author_xml = escape(author)

    # The XML declaration must be the very first bytes of the document,
    # so the literal starts directly after the opening quotes.
    content_opf = f"""<?xml version='1.0' encoding='utf-8'?>
<package xmlns="http://www.idpf.org/2007/opf" unique-identifier="uuid_id" version="2.0">
<metadata xmlns:calibre="http://calibre.kovidgoyal.net/2009/metadata" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:dcterms="http://purl.org/dc/terms/" xmlns:opf="http://www.idpf.org/2007/opf" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<meta name="calibre:timestamp" content="{datetime.now().isoformat()}"/>
<dc:creator>{author_xml}</dc:creator>
<meta name="cover" content="cover"/>
<dc:title>{title_xml}</dc:title>
<dc:language>en</dc:language>
<dc:identifier id="uuid_id" opf:scheme="uuid">45f94238-3216-42d2-8b31-01f4a4009523</dc:identifier>
</metadata>
<manifest>
<item href="cover_image.jpg" id="cover" media-type="image/jpeg"/>
<item href="titlepage.xhtml" id="titlepage" media-type="application/xhtml+xml"/>
<item href="{title_attr}.html" id="html" media-type="application/xhtml+xml"/>
</manifest>
<spine toc="ncx">
<itemref idref="titlepage"/>
<itemref idref="html"/>
</spine>
<guide>
<reference href="titlepage.xhtml" title="Title Page" type="cover"/>
</guide>
</package>
"""
    title_page = f"""<?xml version='1.0' encoding='utf-8'?>
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="calibre:cover" content="true"/>
<title>{title_xml}</title>
<style type="text/css" title="override_css">
@page {{padding: 0pt; margin:0pt}}
body {{text-align: center; padding:0pt; margin: 0pt; }}
</style>
</head>
<body>
<div>
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="100%" height="100%" viewBox="0 0 1200 1600" preserveAspectRatio="none">
<image width="1200" height="1600" xlink:href="cover_image.jpg"/>
</svg>
</div>
</body>
</html>
"""
    # Tells reading systems where the package document lives inside the archive.
    container_xml = """<?xml version="1.0" encoding="UTF-8"?>
<container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container">
<rootfiles>
<rootfile full-path="content.opf" media-type="application/oebps-package+xml"/>
</rootfiles>
</container>
"""
    # Write the generated documents straight into the archive: no fixed-name
    # temporary files are created, so nothing is left behind on failure.
    with ZipFile(f'{chapter_title}.epub', 'w') as zipObj:
        # EPUB requires "mimetype" as the first, uncompressed entry
        # (ZipFile stores entries uncompressed by default).
        zipObj.writestr("mimetype", "application/epub+zip")
        zipObj.writestr("META-INF/container.xml", container_xml)
        zipObj.writestr(f'{chapter_title}.html', chapter)
        zipObj.writestr("content.opf", content_opf)
        zipObj.writestr("titlepage.xhtml", title_page)
        zipObj.write("cover_image.jpg")
    os.remove("cover_image.jpg")
async def main():
    """Poll for new chapters forever and deliver each new one once.

    A check runs when nothing has been sent yet in this process or when at
    least seven days have passed since the last delivery. The title of the
    last delivered chapter is persisted in ``latest.txt``. When a chapter with
    a different title is found it is emailed, recorded, and announced on
    Telegram. If the title is unchanged the check is retried after
    ``WAIT_TIME`` minutes. The generated ``.epub`` is always removed again,
    whether or not it was sent.
    """
    date = None
    while True:
        # Less than a week since the last delivery: idle for five minutes.
        if date is not None and datetime.now() - date < timedelta(days=7):
            log.info('Not friday yet')
            log.info(f'Date: {date}')
            log.info(f'Now: {datetime.now()}')
            await asyncio.sleep(60 * 5)
            continue

        history = None
        try:
            with open('latest.txt', 'r', encoding='utf-8') as f:
                history = f.read()
        except FileNotFoundError:
            log.warning('No history file, creating one')

        chapter, data = await grab_chapter()
        await save_chapter(chapter, data)
        chapter_title = data.get("chapter_title")
        epub_path = f'{chapter_title}.epub'

        if history == chapter_title:
            # Nothing new: drop the archive that was just built so it does not
            # linger on disk between polls.
            os.remove(epub_path)
            log.info(f'No new chapter yet, waiting {WAIT_TIME} minutes')
            await asyncio.sleep(60 * WAIT_TIME)
            continue

        log.info(f'New chapter found: {chapter_title}. Sending to kindle')
        try:
            await send_chapter(chapter_title, epub_path)
        finally:
            # Remove the archive even when sending fails.
            os.remove(epub_path)
        with open('latest.txt', 'w', encoding='utf-8') as f:
            f.write(chapter_title)
        date = datetime.now()
        await telegram_push(f'New chapter: \n{chapter_title}')
@app.get("/")
async def root():
    """Return a fixed greeting payload for the index route."""
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/hello/{name}")
async def say_hello(name: str):
    """Return a greeting payload addressed to the ``name`` path parameter."""
    text = f"Hello {name}"
    return {"message": text}
# Start the polling loop as soon as this module is executed.
# NOTE(review): this call is not behind an `if __name__ == "__main__":` guard,
# and main() loops forever, so importing this module (e.g. to serve `app`)
# would block here — confirm how the script is meant to be launched.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment