Python script for importing a ComicRack ComicDB.xml into Kapowarr
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Intro:
Original Author: github.com/Nadiar
February 28th 2024
Description: Import all comics from a ComicRack ComicDB.xml into Kapowarr.
Usage:
- Set the values for KAPOWARR_BASE_URL, KAPOWARR_API_TOKEN, XML_FILE, KAPOWARR_ROOT_FOLDER, WINDOWS_ROOT_FOLDER, SKIP_FOLDER, LOGLEVEL, FRESH, and MONITORED below.
- Run the script.
XML File Example (ComicDB.xml):
<?xml version="1.0"?>
<ComicDatabase>
  <Books>
    <Book File="Y:\Comics\Indie Comics\Queen of Clubs 001 (2024).cbz">
      ...
      <CustomValuesStore>comicvine_issue=12345,comicvine_volume=123456,DataManager_Processed=Yes</CustomValuesStore>
      ...
    </Book>
    <Book File="Y:\Comics\Comics\King of Spades 005 (2023).cbz">
      ...
      <CustomValuesStore>comicvine_issue=1234568,comicvine_volume=567890,DataManager_Processed=Yes</CustomValuesStore>
      ...
    </Book>
    ...
  </Books>
  ...
</ComicDatabase>
"""
import csv
import logging
import os
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from os.path import isfile

import requests
from requests import Session
# Constants
MAX_CACHE_AGE = timedelta(hours=6)
KAPOWARR_BASE_URL = ""
KAPOWARR_API_TOKEN = ""
XML_FILE = "" # /mnt/comics/ComicDB.xml
KAPOWARR_ROOT_FOLDER = "" # "/comics"
WINDOWS_ROOT_FOLDER = r"" # "c:\Comics"
SKIP_FOLDER = r"" # "c:\Comics\unsorted"
LOGLEVEL = "INFO"
FRESH = False # setting this to True will cause it to never use the file cache, and query the API every time
MONITORED = True # Sets whether added volumes are monitored or not
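# Example configuration (illustrative placeholders only; substitute your own Kapowarr
# URL, API key, and paths, which echo the examples in the docstring and comments above):
#   KAPOWARR_BASE_URL = "http://localhost:5656"
#   KAPOWARR_API_TOKEN = "<your Kapowarr API key>"
#   XML_FILE = "/mnt/comics/ComicDB.xml"
#   KAPOWARR_ROOT_FOLDER = "/comics"
#   WINDOWS_ROOT_FOLDER = r"Y:\Comics"
#   SKIP_FOLDER = r"Y:\Comics\unsorted"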
# String Definition
never_expire = "never expire"
def get_root_folder_id(ssn):
    try:
        response = ssn.get(f"{KAPOWARR_BASE_URL}/api/rootfolder")
        response.raise_for_status()
        response_data = response.json()
        if "result" not in response_data or not isinstance(
            response_data["result"], list
        ):
            logging.error("Unexpected response format for root folders.")
            return None
        root_folders = response_data["result"]
        for rf in root_folders:
            if "folder" in rf and rf["folder"] == kapowarr_root_folder:
                return rf["id"]
        logging.critical(f'Root folder "{kapowarr_root_folder}" not found')
        return None
    except Exception as e:
        logging.critical(f"Error while retrieving root folder. Error: {e}")
        return None
def get_fresh_comicvine_ids(ssn):
    logging.info("Fetching comicvine IDs from API...")
    try:
        response = ssn.get(f"{KAPOWARR_BASE_URL}/api/volumes")
        response.raise_for_status()
        # Check if the request was successful
        if response.status_code == 200:
            data = response.json()["result"]
            new_data = [
                {
                    k: v
                    for k, v in d.items()
                    # we just want to clean up some of the bulk response to make this easier to read if you are debugging
                    if k not in ("issues", "description", "cover")
                }
                for d in data
            ]
            logging.debug("Fetched data from API %s", new_data[:2])
            # Check if the response contains the expected data
            if all("comicvine_id" in volume for volume in new_data):
                comicvine_ids = [volume["comicvine_id"] for volume in data]
                logging.debug(
                    "Fetched comicvine IDs from API, limit 10: %s", comicvine_ids[:10]
                )
                return set(comicvine_ids)
            else:
                logging.error("Unexpected API response: %s", data)
        else:
            logging.error(
                "API request failed with status code %s: %s",
                response.status_code,
                response.text,
            )
    except requests.RequestException as e:
        logging.critical("Failed to fetch data from API: %s", e)
    except ValueError as e:
        logging.critical("Failed to parse API response: %s", e)
    logging.error("Failed to fetch comicvine IDs from API")
    # Return an empty set if the function fails for any reason
    return set()
def write_cache_line(entry, timestamp=None):
    # Default to the current time; computing it here (rather than in the signature)
    # avoids freezing the timestamp at import time and shadowing the time module.
    if timestamp is None:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("id_cache.csv", "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([entry, timestamp])
    return
def process_fresh_comicvine_ids_cache(existing_ids, ssn):
    logging.info("Fetching fresh cache...")
    if not existing_ids:
        existing_ids = get_fresh_comicvine_ids(ssn)
    if not existing_ids:
        logging.error("Failed to fetch fresh data. Exiting...")
        exit()
    try:
        with open("id_cache.csv", "w", newline="") as f:
            writer = csv.writer(f)
            for entry in existing_ids:
                writer.writerow([entry, datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
    except Exception as e:
        logging.error("Error writing cache file. Error: %s", e)
    logging.debug(f"Fresh IDs: (limit 10): {list(existing_ids)[:10]}...")
    return existing_ids
def get_comicvine_ids(ssn, fresh=FRESH):
    existing_ids = set()
    if fresh:
        existing_ids = get_fresh_comicvine_ids(ssn)
        process_fresh_comicvine_ids_cache(existing_ids, ssn)
        logging.debug(f"Fresh IDs: (limit 10): {list(existing_ids)[:10]}...")
        return existing_ids
    # Check if the cache file exists
    elif os.path.exists("id_cache.csv"):
        # If it does, load the IDs from the cache file
        try:
            with open("id_cache.csv", "r") as f:
                reader = csv.reader(f)
                for row in reader:
                    comicvine_id, timestamp = row
                    if timestamp == never_expire:
                        # Entries marked "never expire" always count as existing
                        existing_ids.add(
                            int(comicvine_id) if comicvine_id.isdigit() else comicvine_id
                        )
                        continue
                    timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
                    # If the timestamp is not older than the maximum age, add the ID to the set
                    if datetime.now() - timestamp <= MAX_CACHE_AGE:
                        existing_ids.add(
                            int(comicvine_id) if comicvine_id.isdigit() else comicvine_id
                        )
        except FileNotFoundError:
            logging.info("Cache file not found. Fetching fresh data.")
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
        except ValueError as e:
            logging.error("Error parsing cache file. Fetching fresh data. Error: %s", e)
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
            return existing_ids
        except Exception as e:
            logging.error("Error reading cache file. Fetching fresh data. Error: %s", e)
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
            return existing_ids
    else:
        # If the cache file doesn't exist, fetch the IDs and write them to the cache file
        logging.info("Cache file not found. Fetching fresh data.")
        existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
    if not existing_ids:
        existing_ids = get_fresh_comicvine_ids(ssn)
    if not existing_ids:  # If the functions fail for any reason, exit
        logging.error("Failed to fetch comicvine IDs. Exiting...")
        exit()
    try:
        with open("id_cache.csv", "w", newline="") as f:
            writer = csv.writer(f)
            for comicvine_id in existing_ids:
                writer.writerow(
                    [comicvine_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
                )
    except Exception as e:
        logging.error("Error writing cache file. Error: %s", e)
    logging.debug(f"Cached IDs: (limit 10): {list(existing_ids)[:10]}...")
    return existing_ids
def process_and_add_volume(
    ssn, comicvine_volume, series, rf_id, monitored, relative_path
):
    response = None  # Initialize response as None
    logging.debug(
        f"CV ID: {comicvine_volume}\nKapowarr Root: {rf_id}\nRelative Path: {relative_path}\nSeries: {series}\nFolder ID: {rf_id}\nMonitor: {monitored}\n---"
    )
    # Add volumes to kapowarr api
    sleep_time = 30
    while response is None or response.json().get("error") == "CVRateLimitReached":
        response = ssn.post(
            f"{KAPOWARR_BASE_URL}/api/volumes",
            json={
                "comicvine_id": comicvine_volume,
                "volume_folder": relative_path,
                "root_folder_id": rf_id,
                "monitor": monitored,
            },
        )
        # This line exists in case we need to add additional error handling later
        logging.debug("Response: %s", response.json().get("error"))
        if response.json().get("error") == "CVRateLimitReached":
            if sleep_time > 3600:
                logging.error(
                    f"Failed adding due to total comicvine timeout {comicvine_volume} : {relative_path}"
                )
                break
            time.sleep(sleep_time)
            logging.debug("CVRateLimitReached error")
            sleep_time *= 2
        else:
            logging.info(
                f"Added {comicvine_volume} : {relative_path}\nResponse: {response.text}\n"
            )
            time.sleep(20)  # Sleep for 20 seconds to avoid rate limiting
            return True
    return True
def process_xml_volume(
    relative_path, custom_values, unique_entries, series, ssn, rf_id, monitored
):
    if custom_values is not None and custom_values.text is not None:
        comicvine_volume = next(
            (
                x.split("=")[1]
                for x in custom_values.text.split(",")
                if "comicvine_volume" in x
            ),
            None,
        )
        if comicvine_volume and comicvine_volume.isdigit():
            # Check if the entry is already processed
            if int(comicvine_volume) in unique_entries:
                logging.info("Already processed %s - %s", comicvine_volume, series)
                return True
            else:
                logging.info("Processing %s - %s", comicvine_volume, series)
                # Add the entry to the set
                unique_entries.add(int(comicvine_volume))
                try:
                    process_and_add_volume(
                        ssn,
                        comicvine_volume,
                        series,
                        rf_id,
                        monitored,
                        relative_path,
                    )
                except Exception as e:
                    logging.error("process_and_add_volume exited: %s", e)
                    return False
                # We can write this out to the cache file since it's a valid entry
                write_cache_line(comicvine_volume, never_expire)
        else:
            logging.debug("No comicvine_volume found in %s", custom_values)
            unique_entries.add(comicvine_volume)
            write_cache_line(
                comicvine_volume
            )  # we can let this one expire since it has a problem
    else:
        logging.debug("Custom Values was None in %s", series)
    return False
def process_xml(ssn, xml_file: str, monitored: bool, rf_id: str) -> bool:
    wrf_path = WINDOWS_ROOT_FOLDER.replace("\\", "/").rstrip("/") + "/"
    skip_path = SKIP_FOLDER.replace("\\", "/").rstrip("/") + "/"
    # Use a set to track unique entries, seeded with the comicvine IDs already known to Kapowarr
    unique_entries = set(get_comicvine_ids(ssn, fresh=FRESH))
    if not isfile(xml_file):
        logging.critical("XML file not found")
        return False
    tree = ET.parse(xml_file)
    root = tree.getroot()
    logging.debug("unique_entries: %s", len(unique_entries))
    if len(unique_entries) > 0:
        volume_paths = set()
        for book_element in root.findall(".//Book"):
            logging.debug("Processing book element: %s", book_element.attrib)
            custom_values = book_element.find(".//CustomValuesStore")
            if custom_values is not None:
                logging.debug("Processing custom value: %s", custom_values.text)
            else:
                logging.debug("No custom values found in %s", book_element.attrib)
                continue
            # save series variable here, as we'll be using it for the human readable prompt
            if (
                book_element.find(".//Series") is not None
                and book_element.find(".//Volume") is not None
            ):
                series = (
                    book_element.find(".//Series").text
                    + " "
                    + book_element.find(".//Volume").text
                )
            else:
                logging.debug("Unable to obtain series information")
                continue
            if book_element.get("File"):
                volume_path = book_element.get("File")
                logging.debug("Volume path found series: %s", volume_path)
            else:
                logging.debug("Volume path not found series: %s", series)
                continue
            volume_path = volume_path.replace("\\", "/")
            if not volume_path:
                logging.warning("Volume path not found for %s", series)
                continue
            if volume_path.lower().startswith(wrf_path.lower()):
                logging.debug("Relative path contained Windows path: %s", volume_path)
                relative_path = os.path.dirname(volume_path[len(wrf_path):])
            else:
                relative_path = os.path.dirname(volume_path)
                logging.debug(
                    "Relative path did not contain Windows path: %s", relative_path
                )
            # Skip files under the configured SKIP_FOLDER
            if volume_path.startswith(skip_path):
                logging.debug("Skipping %s - %s", skip_path, volume_path)
                volume_paths.add(relative_path)
                continue
            if relative_path in volume_paths:
                logging.debug("relative path already processed: %s", relative_path)
                continue
            volume_paths.add(relative_path)
            logging.debug(
                "Relative path found: %s\n series: %s\n custom_values: %s",
                relative_path,
                series,
                custom_values.text,
            )
            process_xml_volume(
                relative_path,
                custom_values,
                unique_entries,
                series,
                ssn,
                rf_id,
                monitored,
            )
        return True
    else:
        logging.critical("unable to find unique entries")
        return False
if __name__ == "__main__":
    logging.basicConfig(level=logging.getLevelName(LOGLEVEL))  # Set the logging level
    kapowarr_root_folder = KAPOWARR_ROOT_FOLDER.rstrip("/") + "/"
    ssn = Session()
    ssn.params.update({"api_key": KAPOWARR_API_TOKEN})
    ssn.headers.update({"Content-Type": "application/json"})
    rf_id = get_root_folder_id(ssn)
    if rf_id is not None:
        logging.debug("Root folder ID: %s", rf_id)
        process_xml(ssn, XML_FILE, MONITORED, rf_id)