Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save tranvansang/e25c9ca1691a472f87271347b5c98588 to your computer and use it in GitHub Desktop.

Select an option

Save tranvansang/e25c9ca1691a472f87271347b5c98588 to your computer and use it in GitHub Desktop.
Copy_Folder_Google_Drive_to_Google_Drive.ipynb
Display the source blob
Display the rendered blob
Raw
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"provenance": [],
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/tranvansang/e25c9ca1691a472f87271347b5c98588/copy_folder_google_drive_to_google_drive.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"source": [
"# Copy Folder Google Drive to Google Drive - 1TouchPro"
],
"metadata": {
"id": "8er7vEN1ApwJ"
}
},
{
"cell_type": "code",
"source": [
"#@title Input\n",
"from ipywidgets import widgets\n",
"\n",
"dest_text = widgets.Text(description=\"Your drive\", placeholder='Nhập đường link folder Google Drive của bạn')\n",
"source_text = widgets.Text(description=\"Shared drive\", placeholder='Nhập đường link folder Google Drive shared')\n",
"from_page_text = widgets.Text(description=\"Từ trang\", value=\"0\")\n",
"to_page_text = widgets.Text(description=\"Đến trang\", value=\"0\")\n",
"max_download_size_text = widgets.Text(description=\"Tổng dung lượng tối đa(GB)\", value=\"700\")\n",
"exclude_str_text = widgets.Text(description=\"Bỏ file, folder có chứa nội dung\", value=\"\")\n",
"\n",
"display(dest_text)\n",
"display(source_text)\n",
"display(from_page_text)\n",
"display(to_page_text)\n",
"display(max_download_size_text)\n",
"display(exclude_str_text)"
],
"metadata": {
"id": "Co6t50QUNhso"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#@title Run\n",
"import os\n",
"import time\n",
"import re\n",
"import sys\n",
"from googleapiclient.discovery import build\n",
"from google.colab import auth\n",
"from google.colab import drive\n",
"\n",
"class DownloadFromDrive:\n",
" def __init__(self):\n",
" self._total_size = 0\n",
" self._limit_size = 0\n",
" self.excluded_strings = []\n",
"\n",
" def get_user_credential(self):\n",
" auth.authenticate_user()\n",
" drive_service = build('drive', 'v3')\n",
" return drive_service\n",
"\n",
" def get_childs_from_folder(self, drive_service, folder_id, from_page, to_page):\n",
" files = []\n",
" page_token = None\n",
" query = f\"'{folder_id}' in parents and trashed = false\"\n",
" if self.excluded_strings and len(self.excluded_strings) > 0:\n",
" not_contains_query = \" and \".join([f\"not name contains '{ext}'\" for ext in self.excluded_strings])\n",
" query += f\" and ({not_contains_query})\"\n",
"\n",
" pages = 0\n",
" while True:\n",
" try:\n",
" pages += 1\n",
" response = drive_service.files().list(q=query,\n",
" orderBy='name, createdTime',\n",
" fields='files(id, name, mimeType, size), nextPageToken',\n",
" pageToken=page_token,\n",
" supportsAllDrives=True,\n",
" includeItemsFromAllDrives=True).execute()\n",
"\n",
" if (from_page < pages <= to_page) or to_page == 0:\n",
" files.extend(response.get('files', []))\n",
"\n",
" page_token = response.get('nextPageToken', None)\n",
" if page_token is None or (pages >= to_page > 0):\n",
" break\n",
" except Exception as e:\n",
" print(f\"An error occurred: {str(e)}\")\n",
" page_token = None\n",
"\n",
" print(f\"Total files: {len(files)}\")\n",
" return files\n",
"\n",
" def copy_file(self, drive_service, dest_folder_id, source_file):\n",
" if source_file['mimeType'] != 'application/vnd.google-apps.folder':\n",
" body_file_inf = {'parents': [dest_folder_id]}\n",
"\n",
" if not self.check_if_exists(drive_service, dest_folder_id, source_file['name']):\n",
" try:\n",
" start_time = time.time()\n",
" request_copy = drive_service.files().copy(body=body_file_inf, fileId=source_file['id'],\n",
" supportsAllDrives=True).execute()\n",
" end_time = time.time()\n",
"\n",
" fileSize = int(source_file.get('size', 0))\n",
" size_mb = fileSize / (1024 * 1024)\n",
" self._total_size += size_mb\n",
" speed_mb = size_mb / (end_time - start_time)\n",
" print(f\"[{source_file['name']}] copied. Size {size_mb:0.2f} MB. Speed {speed_mb:0.2f} MB/s\")\n",
"\n",
"\n",
" if self._total_size >= (self._limit_size * 1024):\n",
" self.on_total_size_exceeded(f\"Total size exceeds {self._limit_size} GB. Ending the program.\")\n",
" except Exception as e:\n",
" print(\"An error occurred: \", e)\n",
" else:\n",
" print(f\"[{source_file['name']}] exists.\")\n",
" else:\n",
"\n",
" source_files = self.get_childs_from_folder(drive_service, source_file['id'], 0, 0)\n",
" if source_files and len(source_files) > 0:\n",
" print(f\"Copy at Folder {source_file['name']} Starting\")\n",
" sub_folder_id = self.create_folder(drive_service, dest_folder_id, source_file['name'])\n",
" self.copy_multiple_files(drive_service, sub_folder_id, source_files)\n",
" print(f\"Copy at Folder {source_file['name']} Ending\")\n",
"\n",
"\n",
" def create_folder(self, drive_service, dest_folder_id, sub_folder_name):\n",
" sub_folder_inf = {'name': sub_folder_name, 'mimeType': 'application/vnd.google-apps.folder', 'parents': [dest_folder_id]}\n",
"\n",
" exist_folder_id = self.check_if_exists(drive_service, dest_folder_id, sub_folder_name)\n",
" if not exist_folder_id:\n",
" try:\n",
" folder = drive_service.files().create(body=sub_folder_inf, fields='id').execute()\n",
" return folder['id']\n",
" except Exception as e:\n",
" print(\"An error occurred: \", e)\n",
" return exist_folder_id\n",
"\n",
"\n",
" def check_if_exists(self, drive_service, dest_folder_id, name):\n",
" try:\n",
" processed_name = name.replace(\"'\", \"\\\\'\")\n",
"\n",
" results = drive_service.files().list(q=f\"'{dest_folder_id}' in parents and name contains '{processed_name}' and trashed=false\",\n",
" fields='files(id)').execute()\n",
"\n",
" if 'files' in results and len(results['files']) > 0:\n",
" return results['files'][0]['id']\n",
" except Exception as e:\n",
" print(\"An error occurred: \", e)\n",
"\n",
" return \"\"\n",
"\n",
"\n",
" def copy_multiple_files(self, drive_service, dest_folder_id, source_files):\n",
" for source_file in source_files:\n",
" self.copy_file(drive_service, dest_folder_id, source_file)\n",
"\n",
" def extract_folder_id_from_url(self, url):\n",
" pattern = r'[-\\w]{25,}'\n",
" match = re.search(pattern, url)\n",
" if match:\n",
" return match.group(0)\n",
" else:\n",
" return None\n",
"\n",
" def on_total_size_exceeded(self, message):\n",
" print(message)\n",
" sys.exit()\n",
"\n",
" def copy_drive_to_drive(self, destDriveLink, sourceDriveLink, from_page, to_page):\n",
" service = self.get_user_credential()\n",
"\n",
" start_time = time.time()\n",
" dest_folder_id = self.extract_folder_id_from_url(destDriveLink)\n",
" source_folder_id = self.extract_folder_id_from_url(sourceDriveLink)\n",
" source_folder = service.files().get(fileId=source_folder_id, supportsAllDrives=True).execute()\n",
" new_dest_folder_id = self.create_folder(service, dest_folder_id, source_folder['name'])\n",
"\n",
" source_files = self.get_childs_from_folder(service, source_folder_id, from_page, to_page)\n",
" self.copy_multiple_files(service, new_dest_folder_id, source_files)\n",
" end_time = time.time()\n",
"\n",
" size_gb = self._total_size / 1024\n",
" speed_mb = self._total_size / (end_time - start_time)\n",
"\n",
" print(f\"Done. Total Size {size_gb:0.2f} GB. Total Time {int(end_time - start_time)} s. SpeedMB {speed_mb:0.2f} MB/s\")\n",
"\n",
"\n",
"# Main\n",
"destDriveLink = dest_text.value\n",
"sourceDriveLink = source_text.value\n",
"fromPage = int(from_page_text.value)\n",
"toPage = int(to_page_text.value)\n",
"\n",
"downloader = DownloadFromDrive()\n",
"downloader._limit_size = float(max_download_size_text.value);\n",
"downloader.excluded_strings = [ext.strip() for ext in exclude_str_text.value.split(\",\") if ext.strip()]\n",
"downloader.copy_drive_to_drive(destDriveLink, sourceDriveLink, fromPage, toPage)"
],
"metadata": {
"id": "dd_MwROX7DxL"
},
"execution_count": null,
"outputs": []
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment