Created
July 17, 2015 14:09
-
-
Save Frisch12/55e80fa7423f762fb06c to your computer and use it in GitHub Desktop.
Google Music All Access Downloader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import argparse | |
| import time | |
| import urllib2 | |
| import os | |
| from PIL import Image | |
| import shutil | |
| from gmusicapi import Mobileclient, Webclient, CallFailure | |
| from mutagen.id3._frames import * | |
| from mutagen.mp3 import MP3 | |
| from mutagen.id3 import ID3 | |
| import mutagen.id3 | |
# Root directory below which "<artist>/<album>/" folders are created.
# main() replaces it when --path is given.
base_path = "F:\\Musik\\"

# Account credentials and Android device id; main() assigns them from the
# command-line arguments.
google_username = ""
google_password = ""
android_device_id = ""

# Characters cleanup() keeps as-is; any other character is turned into "_".
allowed_chars = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "1234567890"
    "_.\\"
)
def download(url, file_name):
    """Stream the resource at *url* into the local file *file_name*.

    The response is copied in 8 KiB chunks. The HTTP response and the output
    file are closed even when the transfer fails, and a partially written
    file is removed on failure so that a later ``os.path.exists`` check does
    not mistake it for a finished download.

    :param url: address to fetch (opened with a 30 second timeout).
    :param file_name: path of the file to create or overwrite.
    :raises: whatever ``urllib2.urlopen``, reading or writing raises.
    """
    block_sz = 8192
    response = urllib2.urlopen(url, timeout=30)
    try:
        # Content-Length is only used for the progress message, so a
        # response without that header must not abort the download.
        length_headers = response.info().getheaders("Content-Length")
        file_size = int(length_headers[0]) if length_headers else "unknown"
        print("Downloading: %s Bytes: %s" % (file_name, file_size))

        completed = False
        try:
            with open(file_name, 'wb') as out_file:
                while True:
                    byte_buffer = response.read(block_sz)
                    if not byte_buffer:
                        break
                    out_file.write(byte_buffer)
            completed = True
        finally:
            if not completed:
                # Drop the truncated file; the pending exception (if any)
                # is re-raised automatically once this clause finishes.
                try:
                    os.remove(file_name)
                except OSError:
                    pass
    finally:
        response.close()
def cleanup(value, allow_space=False):
    """Return *value* reduced to characters that are safe in a file name.

    German umlauts and sharp s are first written out as ASCII digraphs
    (for example "ü" becomes "ue"). After that every character that is not
    listed in ``allowed_chars`` is replaced by an underscore.

    :param value: text to sanitise.
    :param allow_space: when true, blanks are kept instead of replaced.
    :returns: the sanitised text.
    """
    transliterations = (
        ("ü", "ue"),
        ("ä", "ae"),
        ("ö", "oe"),
        ("Ü", "Ue"),
        ("Ö", "Oe"),
        ("Ä", "Ae"),
        ("ß", "ss"),
    )
    cleaned = value
    for special, ascii_form in transliterations:
        cleaned = cleaned.replace(special.decode("utf-8"), ascii_form)

    permitted = allowed_chars + " " if allow_space else allowed_chars
    # One replace per distinct offending character is enough, because
    # replace() substitutes every occurrence at once.
    for character in set(cleaned):
        if character not in permitted:
            cleaned = cleaned.replace(character, "_")
    return cleaned
def resize_image(filename, width, height):
    """Resize the image file *filename* in place to *width* x *height*.

    The file is opened, resized with the ANTIALIAS filter and written back
    under the same name.
    """
    target_size = (width, height)
    resized = Image.open(filename).resize(target_size, Image.ANTIALIAS)
    resized.save(filename)
def download_album_art(item):
    """Fetch the cover of *item*'s album into its album directory.

    Nothing happens when "ALBUM_ART.jpg" already exists or when the song
    carries no ``albumArtRef`` entry. Otherwise the first referenced image
    is downloaded as "ALBUM_ART.jpg" and two resized copies are written:
    "AlbumArtSmall.jpg" (75x75) and "Folder.jpg" (200x200).

    The whole sequence is attempted up to three times; each failure is
    printed, and the third failure is re-raised with its original traceback.

    :param item: song dictionary; ``albumArtRef`` is optional.
    """
    album_path = get_directory(item)
    filename = os.path.join(album_path, "ALBUM_ART.jpg")
    if os.path.exists(filename):
        return
    if not item.get('albumArtRef'):
        return

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            download(item['albumArtRef'][0]['url'], filename)
            album_art_small = os.path.join(album_path, "AlbumArtSmall.jpg")
            shutil.copyfile(filename, album_art_small)
            resize_image(album_art_small, 75, 75)
            folder_image = os.path.join(album_path, "Folder.jpg")
            shutil.copyfile(filename, folder_image)
            resize_image(folder_image, 200, 200)
        except Exception as ex:
            print(ex)
            print("retry %s of 3" % attempt)
            if attempt == max_attempts:
                # Bare raise keeps the traceback of the real failure;
                # "raise ex" would replace it in Python 2.
                raise
        else:
            break
def update_id3(filename, item):
    """Write ID3 tags for the song *item* into the MP3 file *filename*.

    Album, title and artist are always written, the genre only when the
    song has one. The track tag stores ``trackNumber + 100 * discNumber``
    (disc 2, track 5 becomes "205"). The album cover is downloaded if
    necessary and embedded as the front-cover picture when
    "ALBUM_ART.jpg" exists. Tags are saved as ID3 v2.3.

    :param filename: path of the MP3 file to tag.
    :param item: song dictionary with at least ``album``, ``title``,
        ``artist``, ``trackNumber`` and ``discNumber``.
    """
    mp3file = MP3(filename, ID3=ID3)
    try:
        mp3file.add_tags(ID3=ID3)
    except mutagen.id3.error:
        # The file already has a tag header; reuse it.
        pass

    mp3file["TALB"] = TALB(encoding=3, text=item["album"])
    mp3file["TIT2"] = TIT2(encoding=3, text=item["title"])
    mp3file["TPE1"] = TPE1(encoding=3, text=item["artist"])
    try:
        mp3file["TCON"] = TCON(encoding=3, text=item["genre"])
    except KeyError:
        # Genre is optional in the song metadata.
        pass
    mp3file["TRCK"] = TRCK(encoding=3, text=str(item["trackNumber"] + 100 * item["discNumber"]))

    download_album_art(item)
    directory = get_directory(item)
    album_art = os.path.join(directory, "ALBUM_ART.jpg")
    if os.path.exists(album_art):
        # The JPEG must be read in binary mode: text mode on Windows
        # translates line endings and stops at the first 0x1A byte, which
        # corrupts the embedded picture. "with" also closes the handle.
        with open(album_art, 'rb') as cover_file:
            cover_data = cover_file.read()
        mp3file["APIC"] = APIC(encoding=3, mime="image/jpeg", type=3, desc=u"Cover", data=cover_data)

    mp3file.tags.update_to_v23()
    mp3file.tags.save(filename=filename, v2_version=3)
def get_directory(item):
    """Return the album directory for *item*, creating it when missing.

    The path is ``base_path/<artist>/<album>`` where both names are
    stripped of surrounding whitespace and passed through ``cleanup``.
    """
    def ensure_dir(path):
        # Create the directory on first use and hand the path back.
        if not os.path.exists(path):
            os.makedirs(path)
        return path

    artist_dir = ensure_dir(os.path.join(base_path, cleanup(item["artist"].strip())))
    return ensure_dir(os.path.join(artist_dir, cleanup(item["album"].strip())))
def get_file_name(item):
    """Return the full path of the MP3 file that belongs to *item*.

    The file name is ``<trackNumber>-<title>.mp3`` passed through
    ``cleanup``, with a leading "0" for track numbers below 10. When the
    full path is longer than 200 characters the file name is cut to its
    first 40 characters plus ".mp3".
    """
    directory = get_directory(item)
    track_number = item["trackNumber"]
    name = cleanup(str(track_number) + "-" + item["title"] + ".mp3")
    padding = "0" if track_number < 10 else ""
    name = padding + name

    full_path = os.path.join(directory, name)
    if len(full_path) <= 200:
        return full_path
    return os.path.join(directory, name[0:40] + ".mp3")
def update_songs(song_list):
    """Rewrite the ID3 tags of every song in *song_list* that exists on disk.

    Progress ("<n> of <total>", then album and title) is printed for each
    song; songs whose file is missing are skipped.
    """
    total = len(song_list)
    for position, song in enumerate(song_list, 1):
        print("%s of %s" % (position, total))
        album = cleanup(song["album"], allow_space=True)
        title = cleanup(song["title"], allow_space=True)
        print("%s %s" % (album, title))
        path = get_file_name(song)
        if not os.path.exists(path):
            continue
        update_id3(path, song)
def download_list(song_list, api):
    """Download every song of *song_list* that is not yet on disk.

    For each song the progress is printed, a stream URL is requested from
    *api* for the configured ``android_device_id``, the stream is
    downloaded (up to three attempts) and the file is tagged. After each
    finished song the function sleeps for one second.

    A ``CallFailure`` for a song is printed and the loop continues with
    the next song. A download that fails three times is re-raised with
    its original traceback.

    :param song_list: sequence of song dictionaries.
    :param api: logged-in client offering ``get_stream_url``.
    """
    max_attempts = 3
    total = len(song_list)
    for position, item in enumerate(song_list, 1):
        print("%s of %s" % (position, total))
        print("%s %s" % (cleanup(item["album"], allow_space=True), cleanup(item["title"], allow_space=True)))
        filename = get_file_name(item)
        if os.path.exists(filename):
            continue
        try:
            stream_url = api.get_stream_url(item["id"], android_device_id)
            for attempt in range(1, max_attempts + 1):
                try:
                    download(stream_url, filename)
                except Exception as ex:
                    print(ex)
                    print("retry %s of 3" % attempt)
                    if attempt == max_attempts:
                        # Bare raise keeps the traceback of the real
                        # failure; "raise ex" would replace it in Python 2.
                        raise
                else:
                    break
            update_id3(filename, item)
            print("Finished! Waiting 1 seconds")
            time.sleep(1)
        except CallFailure as error:
            print(error)
def get_track_id(item):
    """Return the ``trackId`` of a playlist entry without surrounding whitespace."""
    raw_id = item['trackId']
    return raw_id.strip()
def download_playlist(song_list, api):
    """Download the library songs that are referenced by a playlist.

    The track ids of the playlist entries are collected, then every song
    returned by ``api.get_all_songs()`` whose ``id``, ``storeId`` or
    ``nid`` is among them is passed to ``download_list``.

    :param song_list: playlist entries, each with a ``trackId``.
    :param api: logged-in client offering ``get_all_songs``.
    """
    # A set keeps each membership test constant-time instead of scanning
    # the whole playlist for every library song.
    wanted_ids = set(get_track_id(entry) for entry in song_list)
    matching_songs = [
        song for song in api.get_all_songs()
        if song['id'] in wanted_ids
        or song.get('storeId') in wanted_ids
        or song.get('nid') in wanted_ids
    ]
    download_list(matching_songs, api)
def list_devices(client):
    """Print the registered phones and tablets of the account.

    For every device of type PHONE or TABLET the manufacturer, model, name
    and id are printed; the first two characters of the id are left out.
    Missing ``model`` or ``manufacturer`` entries are shown as empty text.

    :param client: logged-in client offering ``get_registered_devices``.
    """
    mobile_types = ('PHONE', 'TABLET')
    for device in client.get_registered_devices():
        name = device['name']
        model = device.get('model', "")
        device_type = device['type']
        manufacturer = device.get('manufacturer', "")
        device_id = device['id']
        if device_type in mobile_types:
            print("%s %s\n\tName: %s\n\tID: %s" % (manufacturer, model, name, device_id[2:]))
def list_playlist(client):
    """Print name, type and id of every playlist returned by *client*.

    :param client: logged-in client offering ``get_all_playlists``.
    """
    template = "%s\n\tType: %s\n\tID: %s"
    for entry in client.get_all_playlists():
        details = (entry['name'], entry['type'], entry['id'])
        print(template % details)
def get_playlist_content(client, playlist_id):
    """Return the tracks of the playlist whose id equals *playlist_id*.

    :param client: logged-in client offering
        ``get_all_user_playlist_contents``.
    :param playlist_id: id to look for.
    :returns: the ``tracks`` value of the first matching playlist, or
        ``None`` when no playlist has that id.
    """
    matching_tracks = (
        entry['tracks']
        for entry in client.get_all_user_playlist_contents()
        if entry['id'] == playlist_id
    )
    return next(matching_tracks, None)
def get_mobile_client():
    """Create a ``Mobileclient`` and log it in with the module credentials.

    :returns: the logged-in client.
    :raises BaseException: with the message "login failed" when the login
        is rejected.
    """
    # NOTE(review): verify_ssl=False turns off certificate verification for
    # a connection that carries the account password - confirm this is
    # intended.
    client = Mobileclient(debug_logging=True, validate=True, verify_ssl=False)
    if client.login(google_username, google_password, Mobileclient.FROM_MAC_ADDRESS):
        return client
    # NOTE(review): BaseException is not caught by "except Exception"
    # handlers; kept because callers may rely on the type.
    raise BaseException("login failed")
def main():
    """Parse the command line and run the selected operation.

    Modes, in order of precedence:

    * ``--update``: rewrite the ID3 tags of all library songs on disk.
    * ``--showdevices``: print the registered phones and tablets.
    * ``--showplaylist``: print all playlists.
    * otherwise: download the whole library, or only the playlist given
      with ``--playlist``. This mode needs ``--device``.

    Username, password, target path and device id are stored in the
    module-level settings read by the other functions.

    :raises BaseException: when download mode is used without ``--device``.
    """
    global google_username, google_password, base_path, android_device_id

    parser = argparse.ArgumentParser(description="download songs from google music")
    parser.add_argument('-u', '--username', required=True, help="username for google music (username@gmail.com)")
    parser.add_argument('-p', '--password', required=True,
                        help="password for google music (on 2-factor-auth you have to use an app-specific password)")
    parser.add_argument('-d', '--device', help="android device id (required for some operations)")
    parser.add_argument('--path', help="path to save in")
    parser.add_argument('--update', action='store_true', help="only update id3 tags")
    parser.add_argument('--showdevices', action='store_true', help="shows registered android devices")
    parser.add_argument('--showplaylist', action='store_true', help="shows all available playlist")
    parser.add_argument('--playlist', help="load songs only from this playlist")
    args = parser.parse_args()

    if args.username:
        google_username = args.username
    if args.password:
        google_password = args.password
    if args.path:
        base_path = args.path

    if args.update:
        api = get_mobile_client()
        update_songs(api.get_all_songs())
    elif args.showdevices:
        client = Webclient()
        if client.login(google_username, google_password):
            list_devices(client)
        else:
            # Tell the user why nothing is listed instead of exiting silently.
            print("login failed")
    elif args.showplaylist:
        list_playlist(get_mobile_client())
    else:
        if not args.device:
            raise BaseException("android device id missing")
        android_device_id = args.device
        api = get_mobile_client()
        if not args.playlist:
            download_list(api.get_all_songs(), api)
        else:
            song_list = get_playlist_content(api, args.playlist)
            if song_list is None:
                # An unknown id would otherwise surface as a TypeError
                # deep inside download_playlist.
                parser.error("playlist not found: %s" % args.playlist)
            download_playlist(song_list, api)
# Run the command-line interface only when executed as a script, so that
# importing this module does not start parsing arguments or downloading.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment