Python script for importing a ComicRack ComicDB.xml into Kapowarr
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Intro:
    Original Author: github.com/Nadiar
    February 28th 2024
    Description: Import all comics from a ComicRack ComicDB.xml
Usage:
    - Set the values for KAPOWARR_BASE_URL, KAPOWARR_API_TOKEN, XML_FILE,
      KAPOWARR_ROOT_FOLDER, WINDOWS_ROOT_FOLDER, SKIP_FOLDER, LOGLEVEL,
      FRESH, and MONITORED below.
    - Run the script.
XML File Example (ComicDB.xml):
    <?xml version="1.0"?>
    <ComicDatabase>
        <Books>
            <Book File="Y:\Comics\Indie Comics\Queen of Clubs 001 (2024).cbz">
                ...
                <CustomValuesStore>comicvine_issue=12345,comicvine_volume=123456,DataManager_Processed=Yes</CustomValuesStore>
                ...
            </Book>
            <Book File="Y:\Comics\Comics\King of Spades 005 (2023).cbz">
                ...
                <CustomValuesStore>comicvine_issue=1234568,comicvine_volume=567890,DataManager_Processed=Yes</CustomValuesStore>
                ...
            </Book>
            ...
        </Books>
        ...
    </ComicDatabase>
"""
import csv
import logging
import os
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from os.path import isfile

import requests
from requests import Session
# Constants
MAX_CACHE_AGE = timedelta(hours=6)
KAPOWARR_BASE_URL = ""
KAPOWARR_API_TOKEN = ""
XML_FILE = ""  # e.g. "/mnt/comics/ComicDB.xml"
KAPOWARR_ROOT_FOLDER = ""  # e.g. "/comics"
WINDOWS_ROOT_FOLDER = r""  # e.g. "c:\Comics"
SKIP_FOLDER = r""  # e.g. "c:\Comics\unsorted"
LOGLEVEL = "INFO"
FRESH = False  # setting this to True will cause it to never use the file cache, and query the API every time
MONITORED = True  # Sets whether added volumes are monitored or not

# String Definition
never_expire = "never expire"
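# Example configuration (all values hypothetical; adjust for your setup):
#   KAPOWARR_BASE_URL = "http://localhost:5656"
#   KAPOWARR_API_TOKEN = "abcdef1234567890"
#   XML_FILE = "/mnt/comics/ComicDB.xml"
#   KAPOWARR_ROOT_FOLDER = "/comics"
#   WINDOWS_ROOT_FOLDER = r"Y:\Comics"
#   SKIP_FOLDER = r"Y:\Comics\unsorted"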
def get_root_folder_id(ssn, kapowarr_root_folder):
    # kapowarr_root_folder is passed in explicitly rather than read from a
    # global defined in __main__, so the function also works when imported.
    try:
        response = ssn.get(f"{KAPOWARR_BASE_URL}/api/rootfolder")
        response.raise_for_status()
        response_data = response.json()
        if "result" not in response_data or not isinstance(
            response_data["result"], list
        ):
            logging.error("Unexpected response format for root folders.")
            return None
        root_folders = response_data["result"]
        for rf in root_folders:
            if "folder" in rf and rf["folder"] == kapowarr_root_folder:
                return rf["id"]
        logging.critical(f'Root folder "{kapowarr_root_folder}" not found')
        return None
    except Exception as e:
        logging.critical(f"Error while retrieving root folder. Error: {e}")
        return None
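# Illustrative shape of the /api/rootfolder response parsed above (values
# hypothetical; only the "result", "folder", and "id" keys are relied on):
#   {"result": [{"id": 1, "folder": "/comics"}]}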
def get_fresh_comicvine_ids(ssn):
    logging.info("Fetching comicvine IDs from API...")
    try:
        response = ssn.get(f"{KAPOWARR_BASE_URL}/api/volumes")
        response.raise_for_status()
        # Check if the request was successful
        if response.status_code == 200:
            data = response.json()["result"]
            new_data = [
                {
                    k: v
                    for k, v in d.items()
                    # trim some bulk from the response to make this easier to read if you are debugging
                    if k not in ("issues", "description", "cover")
                }
                for d in data
            ]
            logging.debug("Fetched data from API %s", new_data[:2])
            # Check if the response contains the expected data
            if all("comicvine_id" in volume for volume in new_data):
                comicvine_ids = [volume["comicvine_id"] for volume in new_data]
                logging.debug(
                    "Fetched comicvine IDs from API, limit 10: %s", comicvine_ids[:10]
                )
                return set(comicvine_ids)
            else:
                logging.error("Unexpected API response: %s", data)
        else:
            logging.error(
                "API request failed with status code %s: %s",
                response.status_code,
                response.text,
            )
    except requests.RequestException as e:
        logging.critical("Failed to fetch data from API: %s", e)
    except ValueError as e:
        logging.critical("Failed to parse API response: %s", e)
    logging.error("Failed to fetch comicvine IDs from API")
    # Return an empty set if the function fails for any reason
    return set()
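# Illustrative shape of the /api/volumes response consumed above (trimmed,
# values hypothetical; only "result" and "comicvine_id" are relied on):
#   {"result": [{"comicvine_id": 123456, "issues": [...], "description": "...", ...}]}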
def write_cache_line(entry, timestamp=None):
    # Default the timestamp at call time: a default-argument expression would be
    # evaluated only once, at import. Renamed from `time` to avoid shadowing the module.
    if timestamp is None:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("id_cache.csv", "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([entry, timestamp])
    return
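# Resulting id_cache.csv format (illustrative rows):
#   123456,2024-02-28 10:15:00
#   567890,never expire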
def process_fresh_comicvine_ids_cache(existing_ids, ssn):
    logging.info("Fetching fresh cache...")
    if not existing_ids:
        existing_ids = get_fresh_comicvine_ids(ssn)
        if not existing_ids:
            logging.error("Failed to fetch fresh data. Exiting...")
            exit()
    try:
        with open("id_cache.csv", "w", newline="") as f:
            writer = csv.writer(f)
            for entry in existing_ids:
                writer.writerow([entry, datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
    except Exception as e:
        logging.error("Error writing cache file. Error: %s", e)
    logging.debug(f"Fresh IDs: (limit 10): {list(existing_ids)[:10]}...")
    return existing_ids
def get_comicvine_ids(ssn, fresh=FRESH):
    existing_ids = set()
    if fresh:
        existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
        logging.debug(f"Fresh IDs: (limit 10): {list(existing_ids)[:10]}...")
        return existing_ids
    # Check if the cache file exists
    elif os.path.exists("id_cache.csv"):
        # If it does, load the IDs from the cache file
        try:
            with open("id_cache.csv", "r") as f:
                reader = csv.reader(f)
                for row in reader:
                    comicvine_id, timestamp = row
                    if timestamp == never_expire:
                        # "never expire" entries are always treated as valid
                        if comicvine_id.isdigit():
                            existing_ids.add(int(comicvine_id))
                        continue
                    timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
                    # If the timestamp is not older than the maximum age, add the ID
                    # to the set (as an int, matching the IDs from the API)
                    if datetime.now() - timestamp <= MAX_CACHE_AGE:
                        if comicvine_id.isdigit():
                            existing_ids.add(int(comicvine_id))
        except FileNotFoundError:
            logging.info("Cache file not found. Fetching fresh data.")
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
        except ValueError as e:
            logging.error("Error parsing cache file. Fetching fresh data. Error: %s", e)
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
            return existing_ids
        except Exception as e:
            logging.error("Error reading cache file. Fetching fresh data. Error: %s", e)
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
            return existing_ids
        # If every cached entry had expired, fall back to a fresh fetch
        if not existing_ids:
            existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
    else:
        # If the cache file doesn't exist, fetch the IDs and write them to the cache file
        logging.info("Cache file not found. Fetching fresh data.")
        existing_ids = process_fresh_comicvine_ids_cache(existing_ids, ssn)
    logging.debug(f"Cached IDs: (limit 10): {list(existing_ids)[:10]}...")
    return existing_ids
def process_and_add_volume(
    ssn, comicvine_volume, series, rf_id, monitored, relative_path
):
    response = None  # Initialize response as None
    logging.debug(
        f"CV ID: {comicvine_volume}\nRelative Path: {relative_path}\nSeries: {series}\nRoot Folder ID: {rf_id}\nMonitor: {monitored}\n---"
    )
    # Add volumes to the kapowarr api, backing off exponentially on rate limits
    sleep_time = 30
    while response is None or response.json().get("error") == "CVRateLimitReached":
        response = ssn.post(
            f"{KAPOWARR_BASE_URL}/api/volumes",
            json={
                "comicvine_id": comicvine_volume,
                "volume_folder": relative_path,
                "root_folder_id": rf_id,
                "monitor": monitored,
            },
        )
        # This line exists in case we need to add additional error handling later
        logging.debug("Response: %s", response.json().get("error"))
        if response.json().get("error") == "CVRateLimitReached":
            if sleep_time > 3600:
                logging.error(
                    f"Failed adding due to total comicvine timeout {comicvine_volume} : {relative_path}"
                )
                break
            time.sleep(sleep_time)
            logging.debug("CVRateLimitReached error")
            sleep_time *= 2
        else:
            logging.info(
                f"Added {comicvine_volume} : {relative_path}\nResponse: {response.text}\n"
            )
            time.sleep(20)  # Sleep for 20 seconds to avoid rate limiting
            return True
    # Only reached via the timeout break above
    return False
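# Illustrative request body for POST /api/volumes as built above (values
# hypothetical):
#   {"comicvine_id": "123456", "volume_folder": "Indie Comics",
#    "root_folder_id": 1, "monitor": true}
# On "CVRateLimitReached" the retry waits 30s, 60s, 120s, ... and gives up
# once the next wait would exceed an hour.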
def process_xml_volume(
    relative_path, custom_values, unique_entries, series, ssn, rf_id, monitored
):
    if custom_values is not None and custom_values.text is not None:
        comicvine_volume = next(
            (
                x.split("=")[1]
                for x in custom_values.text.split(",")
                if "comicvine_volume" in x
            ),
            None,
        )
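        # e.g. "comicvine_issue=12345,comicvine_volume=123456,DataManager_Processed=Yes"
        # yields comicvine_volume == "123456"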
        if comicvine_volume and comicvine_volume.isdigit():
            # Check if the entry is already processed
            if int(comicvine_volume) in unique_entries:
                logging.info("Already processed %s - %s", comicvine_volume, series)
                return True
            else:
                logging.info("Processing %s - %s", comicvine_volume, series)
                # Add the entry to the set (as an int, matching the IDs from the API)
                unique_entries.add(int(comicvine_volume))
                try:
                    process_and_add_volume(
                        ssn,
                        comicvine_volume,
                        series,
                        rf_id,
                        monitored,
                        relative_path,
                    )
                except Exception as e:
                    logging.error("process_and_add_volume exited. Error: %s", e)
                    return False
                # We can write this out to the cache file since it's a valid entry
                write_cache_line(comicvine_volume, never_expire)
                return True
        else:
            logging.debug("No comicvine_volume found in %s", custom_values)
            unique_entries.add(comicvine_volume)
            write_cache_line(
                comicvine_volume
            )  # we can let this one expire since it has a problem
            return False
    else:
        logging.debug("Custom Values was None in %s", series)
        return False
def process_xml(ssn, xml_file: str, monitored: bool, rf_id: int) -> bool:
    wrf_path = WINDOWS_ROOT_FOLDER.replace("\\", "/").rstrip("/") + "/"
    skip_path = SKIP_FOLDER.replace("\\", "/").rstrip("/") + "/"
    # Use a set to track unique entries, seeded with the comicvine IDs already in kapowarr
    unique_entries = set(get_comicvine_ids(ssn, fresh=FRESH))
    if not isfile(xml_file):
        logging.critical("XML file not found")
        return False
    tree = ET.parse(xml_file)
    root = tree.getroot()
    logging.debug("unique_entries: %s", len(unique_entries))
    if len(unique_entries) > 0:
        volume_paths = set()
        for book_element in root.findall(".//Book"):
            logging.debug("Processing book element: %s", book_element.attrib)
            custom_values = book_element.find(".//CustomValuesStore")
            if custom_values is not None:
                logging.debug("Processing custom value: %s", custom_values.text)
            else:
                logging.debug("No custom values found in %s", book_element.attrib)
                continue
            # save the series variable here, as we'll be using it for the human-readable prompt
            series_element = book_element.find(".//Series")
            volume_element = book_element.find(".//Volume")
            if (
                series_element is not None
                and series_element.text
                and volume_element is not None
                and volume_element.text
            ):
                series = series_element.text + " " + volume_element.text
            else:
                logging.debug("Unable to obtain series information")
                continue
            volume_path = book_element.get("File")
            if volume_path:
                logging.debug("Volume path found for series: %s", volume_path)
            else:
                logging.debug("Volume path not found for series: %s", series)
                continue
            volume_path = volume_path.replace("\\", "/")
            if volume_path.lower().startswith(wrf_path.lower()):
                logging.debug("Volume path contained Windows root: %s", volume_path)
                relative_path = os.path.dirname(volume_path[len(wrf_path):])
            else:
                relative_path = os.path.dirname(volume_path)
                logging.debug(
                    "Volume path did not contain Windows root: %s", relative_path
                )
            # Skip files under SKIP_FOLDER (e.g. Y:\Comics\unsorted)
            if volume_path.startswith(skip_path):
                logging.debug("Skipping %s - %s", skip_path, volume_path)
                volume_paths.add(relative_path)
                continue
            if relative_path in volume_paths:
                logging.debug("relative path already processed: %s", relative_path)
                continue
            volume_paths.add(relative_path)
            logging.debug(
                "Relative path found: %s\n series: %s\n custom_values: %s",
                relative_path,
                series,
                custom_values.text,
            )
            process_xml_volume(
                relative_path,
                custom_values,
                unique_entries,
                series,
                ssn,
                rf_id,
                monitored,
            )
        return True
    else:
        logging.critical("unable to find unique entries")
        return False
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.getLevelName(LOGLEVEL)
    )  # Set the logging level before anything logs
    kapowarr_root_folder = KAPOWARR_ROOT_FOLDER.rstrip("/") + "/"
    ssn = Session()
    ssn.params.update({"api_key": KAPOWARR_API_TOKEN})
    ssn.headers.update({"Content-Type": "application/json"})
    rf_id = get_root_folder_id(ssn, kapowarr_root_folder)
    if rf_id is not None:
        logging.debug("Root folder ID: %s", rf_id)
        process_xml(ssn, XML_FILE, MONITORED, rf_id)