Created
March 4, 2022 06:25
-
-
Save feifei9606/353affc994c9e77fb950111808d9e252 to your computer and use it in GitHub Desktop.
[批量从sci-hub下载文件]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """ | |
| Sci-API Unofficial API | |
| [Search|Download] research papers from [scholar.google.com|sci-hub.io]. | |
| @author zaytoun | |
| """ | |
| import re | |
| import argparse | |
| import hashlib | |
| import logging | |
| import os | |
| import multiprocessing as mp | |
| import requests | |
| import urllib3 | |
| from bs4 import BeautifulSoup | |
| from retrying import retry | |
| import redis | |
| from random import choice | |
| # log config | |
| logging.basicConfig() | |
| logger = logging.getLogger('Sci-Hub') | |
| logger.setLevel(logging.DEBUG) | |
| # | |
| urllib3.disable_warnings() | |
| # constants | |
| SCHOLARS_BASE_URL = 'https://scholar.google.com/scholar' | |
| class SciHub(object): | |
| """ | |
| SciHub class can search for papers on Google Scholars | |
| and fetch/download papers from sci-hub.io | |
| """ | |
| def __init__(self): | |
| self.sess = requests.Session() | |
| self.available_base_url_list = self._get_available_scihub_urls() | |
| self.base_url = self.available_base_url_list[0] + '/' | |
| self.r = redis.StrictRedis(host="172.17.61.169", port=6666) | |
| self.UALIST = [ | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A", | |
| "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 OPR/26.0.1656.60", | |
| "Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11", | |
| "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 UBrowser/4.0.3214.0 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:49.0) Gecko/20100101 Firefox/49.0", | |
| "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.100 Safari/537.36" | |
| ] | |
| try: | |
| self.r.ping() | |
| except ConnectionError: | |
| raise ConnectionError("cannot connect to redis") | |
| def _get_available_scihub_urls(self): | |
| ''' | |
| Finds available scihub urls via https://sci-hub.now.sh/ | |
| ''' | |
| urls = [] | |
| res = requests.get('https://sci-hub.now.sh/', timeout=3) | |
| s = self._get_soup(res.content) | |
| for a in s.find_all('a', href=True): | |
| if 'sci-hub.' in a['href']: | |
| urls.append(a['href']) | |
| return urls | |
| def set_proxy(self): | |
| """ | |
| set proxy for session | |
| :return: | |
| """ | |
| proxies = self.r.hkeys("use_proxy") | |
| proxy = choice(proxies).decode('utf8') | |
| self.sess.proxies = { | |
| "http": f"http://{proxy}", | |
| "https": f"http://{proxy}", } | |
| return | |
| def delete_proxy(self, proxy): | |
| proxy = proxy['http'].replace("http://", "") | |
| self.r.hdel("use_proxy", proxy) | |
| return | |
| def _change_base_url(self): | |
| if not self.available_base_url_list: | |
| raise Exception('Ran out of valid sci-hub urls') | |
| del self.available_base_url_list[0] | |
| self.base_url = self.available_base_url_list[0] + '/' | |
| logger.info("I'm changing to {}".format(self.available_base_url_list[0])) | |
| def search(self, query, limit=10, download=False): | |
| """ | |
| Performs a query on scholar.google.com, and returns a dictionary | |
| of results in the form {'papers': ...}. Unfortunately, as of now, | |
| captchas can potentially prevent searches after a certain limit. | |
| """ | |
| start = 0 | |
| results = {'papers': []} | |
| while True: | |
| try: | |
| res = self.sess.get(SCHOLARS_BASE_URL, params={'q': query, 'start': start}, timeout=10, | |
| headers={'User-Agent': choice(self.UALIST)}) | |
| except requests.exceptions.RequestException as e: | |
| results['err'] = 'Failed to complete search with query %s (connection error)' % query | |
| return results | |
| s = self._get_soup(res.content) | |
| papers = s.find_all('div', class_="gs_r") | |
| if not papers: | |
| if 'CAPTCHA' in str(res.content): | |
| results['err'] = 'Failed to complete search with query %s (captcha)' % query | |
| return results | |
| for paper in papers: | |
| if not paper.find('table'): | |
| source = None | |
| pdf = paper.find('div', class_='gs_ggs gs_fl') | |
| link = paper.find('h3', class_='gs_rt') | |
| if pdf: | |
| source = pdf.find('a')['href'] | |
| elif link.find('a'): | |
| source = link.find('a')['href'] | |
| else: | |
| continue | |
| results['papers'].append({ | |
| 'name': link.text, | |
| 'url': source | |
| }) | |
| if len(results['papers']) >= limit: | |
| return results | |
| start += 10 | |
| @retry(wait_random_min=1000, wait_random_max=3000, stop_max_attempt_number=10) | |
| def download(self, identifier, destination='', path=None): | |
| """ | |
| Downloads a paper from sci-hub given an indentifier (DOI, PMID, URL). | |
| Currently, this can potentially be blocked by a captcha if a certain | |
| limit has been reached. | |
| """ | |
| data = self.fetch(identifier) | |
| if not 'err' in data: | |
| self._save(data['pdf'], | |
| os.path.join(destination, path if path else data['name'])) | |
| return data | |
| def fetch(self, identifier): | |
| """ | |
| Fetches the paper by first retrieving the direct link to the pdf. | |
| If the indentifier is a DOI, PMID, or URL pay-wall, then use Sci-Hub | |
| to access and download paper. Otherwise, just download paper directly. | |
| """ | |
| try: | |
| url = self._get_direct_url(identifier) | |
| # verify=False is dangerous but sci-hub.io | |
| # requires intermediate certificates to verify | |
| # and requests doesn't know how to download them. | |
| # as a hacky fix, you can add them to your store | |
| # and verifying would work. will fix this later. | |
| res = self.sess.get(url, verify=False, timeout=10, headers={'User-Agent': choice(self.UALIST)}) | |
| if res.headers['Content-Type'] != 'application/pdf': | |
| self._change_base_url() | |
| logger.info('Failed to fetch pdf with identifier %s ' | |
| '(resolved url %s) due to captcha' % (identifier, url)) | |
| raise CaptchaNeedException('Failed to fetch pdf with identifier %s ' | |
| '(resolved url %s) due to captcha' % (identifier, url)) | |
| # return { | |
| # 'err': 'Failed to fetch pdf with identifier %s (resolved url %s) due to captcha' | |
| # % (identifier, url) | |
| # } | |
| else: | |
| return { | |
| 'pdf': res.content, | |
| 'url': url, | |
| # 'name': self._generate_name(res) | |
| 'name': f'{identifier.replace("/", "_")}.pdf', | |
| 'identifier': identifier | |
| } | |
| except requests.exceptions.ProxyError: | |
| self.delete_proxy(self.sess.proxies) | |
| self.set_proxy() | |
| logger.info(f'change proxy to {self.sess.proxies["http"]}') | |
| except requests.exceptions.ConnectionError: | |
| logger.info('Cannot access {}, changing url'.format(self.available_base_url_list[0])) | |
| self._change_base_url() | |
| except requests.exceptions.RequestException as e: | |
| logger.info('Failed to fetch pdf with identifier %s (resolved url %s) due to request exception.' | |
| % (identifier, url)) | |
| return { | |
| 'err': 'Failed to fetch pdf with identifier %s (resolved url %s) due to request exception.' | |
| % (identifier, url) | |
| } | |
| def _get_direct_url(self, identifier): | |
| """ | |
| Finds the direct source url for a given identifier. | |
| """ | |
| id_type = self._classify(identifier) | |
| return identifier if id_type == 'url-direct' \ | |
| else self._search_direct_url(identifier) | |
| def _search_direct_url(self, identifier): | |
| """ | |
| Sci-Hub embeds papers in an iframe. This function finds the actual | |
| source url which looks something like https://moscow.sci-hub.io/.../....pdf. | |
| """ | |
| res = self.sess.get(self.base_url + identifier, verify=False, timeout=10, | |
| headers={'User-Agent': choice(self.UALIST)}) | |
| s = self._get_soup(res.content) | |
| iframe = s.find('iframe') | |
| if iframe: | |
| return iframe.get('src') if not iframe.get('src').startswith('//') \ | |
| else 'http:' + iframe.get('src') | |
| else: | |
| raise requests.exceptions.ProxyError() | |
| def _classify(self, identifier): | |
| """ | |
| Classify the type of identifier: | |
| url-direct - openly accessible paper | |
| url-non-direct - pay-walled paper | |
| pmid - PubMed ID | |
| doi - digital object identifier | |
| """ | |
| if (identifier.startswith('http') or identifier.startswith('https')): | |
| if identifier.endswith('pdf'): | |
| return 'url-direct' | |
| else: | |
| return 'url-non-direct' | |
| elif identifier.isdigit(): | |
| return 'pmid' | |
| else: | |
| return 'doi' | |
| def _save(self, data, path): | |
| """ | |
| Save a file give data and a path. | |
| """ | |
| with open(path, 'wb') as f: | |
| f.write(data) | |
| def _get_soup(self, html): | |
| """ | |
| Return html soup. | |
| """ | |
| return BeautifulSoup(html, 'html.parser') | |
| def _generate_name(self, res): | |
| """ | |
| Generate unique filename for paper. Returns a name by calcuating | |
| md5 hash of file contents, then appending the last 20 characters | |
| of the url which typically provides a good paper identifier. | |
| """ | |
| name = res.url.split('/')[-1] | |
| name = re.sub('#view=(.+)', '', name) | |
| pdf_hash = hashlib.md5(res.content).hexdigest() | |
| return '%s-%s' % (pdf_hash, name[-20:]) | |
| def logger(self, result): | |
| if 'err' in result: | |
| logger.debug('%s', result['err']) | |
| else: | |
| logger.debug('Successfully downloaded file with identifier %s', result['identifier']) | |
| return | |
| class CaptchaNeedException(Exception): | |
| pass | |
| def main(): | |
| parser = argparse.ArgumentParser(description='SciHub - To remove all barriers in the way of science.') | |
| parser.add_argument('-d', '--download', metavar='(DOI|PMID|URL)', help='tries to find and download the paper', | |
| type=str) | |
| parser.add_argument('-f', '--file', metavar='path', help='pass file with list of identifiers and download each', | |
| type=str) | |
| parser.add_argument('-s', '--search', metavar='query', help='search Google Scholars', type=str) | |
| parser.add_argument('-sd', '--search_download', metavar='query', | |
| help='search Google Scholars and download if possible', type=str) | |
| parser.add_argument('-l', '--limit', metavar='N', help='the number of search results to limit to', default=10, | |
| type=int) | |
| parser.add_argument('-o', '--output', metavar='path', help='directory to store papers', default='', type=str) | |
| parser.add_argument('-v', '--verbose', help='increase output verbosity', action='store_true') | |
| parser.add_argument('-p', '--proxy', help='via proxy format like socks5://user:pass@host:port', action='store', | |
| type=str) | |
| parser.add_argument('-t', '--thread', help='num of threads', type=int, default=4) | |
| args = parser.parse_args() | |
| if args.verbose: | |
| logger.setLevel(logging.DEBUG) | |
| # if args.proxy: | |
| # sh.set_proxy(args.proxy) | |
| if args.download: | |
| result = sh.download(args.download, args.output) | |
| if 'err' in result: | |
| logger.debug('%s', result['err']) | |
| else: | |
| logger.debug('Successfully downloaded file with identifier %s', args.download) | |
| elif args.search: | |
| results = sh.search(args.search, args.limit) | |
| if 'err' in results: | |
| logger.debug('%s', results['err']) | |
| else: | |
| logger.debug('Successfully completed search with query %s', args.search) | |
| print(results) | |
| elif args.search_download: | |
| results = sh.search(args.search_download, args.limit) | |
| if 'err' in results: | |
| logger.debug('%s', results['err']) | |
| else: | |
| logger.debug('Successfully completed search with query %s', args.search_download) | |
| for paper in results['papers']: | |
| result = sh.download(paper['url'], args.output) | |
| if 'err' in result: | |
| logger.debug('%s', result['err']) | |
| else: | |
| logger.debug('Successfully downloaded file with identifier %s', paper['url']) | |
| elif args.file: | |
| cpuNum = min(args.thread, mp.cpu_count()) | |
| pool = mp.Pool(cpuNum) | |
| with open(args.file, 'r') as f: | |
| identifiers = f.read().splitlines() | |
| for identifier in identifiers: | |
| if os.path.exists(os.path.join(args.output, f'{identifier.replace("/", "_")}.pdf')): | |
| continue | |
| sh = SciHub() | |
| sh.set_proxy() | |
| pool.apply_async(sh.download, (identifier, args.output,), callback=sh.logger) | |
| # result = sh.download(identifier, args.output) | |
| pool.close() | |
| pool.join() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment