Skip to content

Instantly share code, notes, and snippets.

@feifei9606
Created March 4, 2022 06:25
Show Gist options
  • Select an option

  • Save feifei9606/353affc994c9e77fb950111808d9e252 to your computer and use it in GitHub Desktop.

Select an option

Save feifei9606/353affc994c9e77fb950111808d9e252 to your computer and use it in GitHub Desktop.
[批量从sci-hub下载文件]
# -*- coding: utf-8 -*-
"""
Sci-API Unofficial API
[Search|Download] research papers from [scholar.google.com|sci-hub.io].
@author zaytoun
"""
import re
import argparse
import hashlib
import logging
import os
import multiprocessing as mp
import requests
import urllib3
from bs4 import BeautifulSoup
from retrying import retry
import redis
from random import choice
# Logging setup: install the default root handler, then create this module's
# 'Sci-Hub' logger and let it emit everything from DEBUG upwards.
logging.basicConfig()
logger = logging.getLogger('Sci-Hub')
logger.setLevel(logging.DEBUG)
# Silence urllib3 warnings; requests to sci-hub in this file are sent with
# verify=False, which would otherwise emit a warning on every call.
urllib3.disable_warnings()
# Endpoint that Google Scholar search queries are sent to.
SCHOLARS_BASE_URL = 'https://scholar.google.com/scholar'
class SciHub(object):
    """
    SciHub class can search for papers on Google Scholar
    and fetch/download papers from a sci-hub mirror.

    HTTP traffic goes through ``self.sess``. Proxies are drawn from the
    Redis hash ``use_proxy`` (the hash keys are ``host:port`` strings) and
    rotated whenever a request fails with a proxy error.
    """

    def __init__(self, redis_host="172.17.61.169", redis_port=6666):
        """
        Discover sci-hub mirrors and connect to the Redis proxy pool.

        :param redis_host: host of the Redis server holding the proxy pool
        :param redis_port: port of that Redis server
        :raises Exception: if no sci-hub mirror url could be discovered
        :raises ConnectionError: if the Redis server cannot be reached
        """
        self.sess = requests.Session()
        self.available_base_url_list = self._get_available_scihub_urls()
        if not self.available_base_url_list:
            # Fail with a readable message instead of a bare IndexError.
            raise Exception('No available sci-hub urls found')
        self.base_url = self.available_base_url_list[0] + '/'
        self.r = redis.StrictRedis(host=redis_host, port=redis_port)
        # Pool of User-Agent strings; one is picked at random per request.
        self.UALIST = [
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36 OPR/26.0.1656.60",
            "Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 UBrowser/4.0.3214.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:49.0) Gecko/20100101 Firefox/49.0",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.100 Safari/537.36"
        ]
        try:
            self.r.ping()
        except (redis.exceptions.ConnectionError, ConnectionError) as exc:
            # redis-py raises its own ConnectionError, which is not a subclass
            # of the builtin one; catch both and re-raise the builtin type.
            raise ConnectionError("cannot connect to redis") from exc

    def _get_available_scihub_urls(self):
        '''
        Finds available scihub urls via https://sci-hub.now.sh/

        :return: list of hrefs on that page that contain ``sci-hub.``
        '''
        res = requests.get('https://sci-hub.now.sh/', timeout=3)
        s = self._get_soup(res.content)
        return [a['href'] for a in s.find_all('a', href=True)
                if 'sci-hub.' in a['href']]

    def set_proxy(self):
        """
        Pick a random proxy from the Redis hash ``use_proxy`` and install it
        on the session for both http and https traffic.

        :raises IndexError: if the proxy pool is empty
        :return:
        """
        proxies = self.r.hkeys("use_proxy")
        if not proxies:
            # random.choice would raise the same type, but without context.
            raise IndexError('no proxies available in redis hash "use_proxy"')
        proxy = choice(proxies).decode('utf8')
        self.sess.proxies = {
            "http": f"http://{proxy}",
            "https": f"http://{proxy}", }
        return

    def delete_proxy(self, proxy):
        """
        Remove a proxy from the Redis hash ``use_proxy``.

        :param proxy: a requests-style proxies mapping; its ``'http'`` entry
            (``http://host:port``) identifies the proxy to drop. Nothing
            happens when the mapping is empty or has no ``'http'`` entry.
        :return:
        """
        address = (proxy or {}).get('http')
        if not address:
            # No proxy is configured on the session, so there is nothing to drop.
            return
        self.r.hdel("use_proxy", address.replace("http://", ""))
        return

    def _change_base_url(self):
        """
        Drop the current sci-hub mirror and switch to the next one.

        :raises Exception: when no mirror is left after dropping the current one
        """
        if self.available_base_url_list:
            del self.available_base_url_list[0]
        # Check AFTER deleting: dropping the last mirror must produce the
        # explicit error below rather than an IndexError on the next line.
        if not self.available_base_url_list:
            raise Exception('Ran out of valid sci-hub urls')
        self.base_url = self.available_base_url_list[0] + '/'
        logger.info("I'm changing to {}".format(self.available_base_url_list[0]))

    def search(self, query, limit=10, download=False):
        """
        Performs a query on scholar.google.com, and returns a dictionary
        of results in the form {'papers': ...}. Unfortunately, as of now,
        captchas can potentially prevent searches after a certain limit.

        :param query: search string sent to Google Scholar
        :param limit: stop once this many papers have been collected
        :param download: unused; kept so existing callers keep working
        :return: ``{'papers': [{'name': ..., 'url': ...}, ...]}`` plus an
            ``'err'`` message when the request failed or a captcha was hit
        """
        start = 0
        results = {'papers': []}
        while True:
            try:
                res = self.sess.get(SCHOLARS_BASE_URL, params={'q': query, 'start': start}, timeout=10,
                                    headers={'User-Agent': choice(self.UALIST)})
            except requests.exceptions.RequestException:
                results['err'] = 'Failed to complete search with query %s (connection error)' % query
                return results
            s = self._get_soup(res.content)
            papers = s.find_all('div', class_="gs_r")
            if not papers:
                # An empty page means either a captcha wall or the end of the
                # results; in both cases there is nothing more to page through.
                if 'CAPTCHA' in str(res.content):
                    results['err'] = 'Failed to complete search with query %s (captcha)' % query
                return results
            for paper in papers:
                if paper.find('table'):
                    continue
                pdf = paper.find('div', class_='gs_ggs gs_fl')
                link = paper.find('h3', class_='gs_rt')
                if link is None:
                    # Entry without a title heading; nothing to name it by.
                    continue
                # Prefer the direct pdf link; fall back to the title link.
                anchor = (pdf.find('a') if pdf else None) or link.find('a')
                if anchor is None or not anchor.get('href'):
                    continue
                results['papers'].append({
                    'name': link.text,
                    'url': anchor['href']
                })
                if len(results['papers']) >= limit:
                    return results
            start += 10

    @retry(wait_random_min=1000, wait_random_max=3000, stop_max_attempt_number=10)
    def download(self, identifier, destination='', path=None):
        """
        Downloads a paper from sci-hub given an identifier (DOI, PMID, URL).
        Currently, this can potentially be blocked by a captcha if a certain
        limit has been reached.

        The call is retried (up to 10 attempts, with a random 1-3 s wait)
        whenever an exception escapes, e.g. after a proxy or mirror switch.

        :param identifier: DOI, PMID or URL of the paper
        :param destination: directory the pdf is written to
        :param path: file name to use instead of the generated one
        :return: the dict produced by :meth:`fetch`
        :raises RuntimeError: if :meth:`fetch` gave no result (retried)
        """
        data = self.fetch(identifier)
        if data is None:
            # fetch() switched proxy/mirror and produced nothing; raise so the
            # retry decorator runs another attempt with the new settings.
            raise RuntimeError('No result for identifier %s, retrying' % identifier)
        if 'err' not in data:
            self._save(data['pdf'],
                       os.path.join(destination, path if path else data['name']))
        return data

    def fetch(self, identifier):
        """
        Fetches the paper by first retrieving the direct link to the pdf.
        If the identifier is a DOI, PMID, or URL pay-wall, then use Sci-Hub
        to access and download paper. Otherwise, just download paper directly.

        :param identifier: DOI, PMID or URL of the paper
        :return: ``{'pdf', 'url', 'name', 'identifier'}`` on success,
            ``{'err': ...}`` on a generic request failure, or ``None`` after
            a proxy or mirror switch (the caller is expected to retry)
        :raises CaptchaNeedException: if the response is not a pdf
        """
        # Bound up front so the error handlers below can always format it,
        # even when resolving the direct url is what failed.
        url = None
        try:
            url = self._get_direct_url(identifier)
            # verify=False is dangerous but sci-hub.io
            # requires intermediate certificates to verify
            # and requests doesn't know how to download them.
            # as a hacky fix, you can add them to your store
            # and verifying would work. will fix this later.
            res = self.sess.get(url, verify=False, timeout=10, headers={'User-Agent': choice(self.UALIST)})
            # The header may be absent or carry parameters such as a charset.
            content_type = res.headers.get('Content-Type', '')
            if not content_type.lower().startswith('application/pdf'):
                message = ('Failed to fetch pdf with identifier %s '
                           '(resolved url %s) due to captcha' % (identifier, url))
                logger.info(message)
                self._change_base_url()
                raise CaptchaNeedException(message)
            return {
                'pdf': res.content,
                'url': url,
                'name': f'{identifier.replace("/", "_")}.pdf',
                'identifier': identifier
            }
        except requests.exceptions.ProxyError:
            # Must precede ConnectionError: ProxyError is a subclass of it.
            self.delete_proxy(self.sess.proxies)
            self.set_proxy()
            logger.info(f'change proxy to {self.sess.proxies["http"]}')
        except requests.exceptions.ConnectionError:
            logger.info('Cannot access {}, changing url'.format(self.base_url))
            self._change_base_url()
        except requests.exceptions.RequestException:
            message = ('Failed to fetch pdf with identifier %s (resolved url %s) due to request exception.'
                       % (identifier, url))
            logger.info(message)
            return {'err': message}
        return None

    def _get_direct_url(self, identifier):
        """
        Finds the direct source url for a given identifier.
        """
        id_type = self._classify(identifier)
        return identifier if id_type == 'url-direct' \
            else self._search_direct_url(identifier)

    def _search_direct_url(self, identifier):
        """
        Sci-Hub embeds papers in an iframe. This function finds the actual
        source url which looks something like https://moscow.sci-hub.io/.../....pdf.

        :raises requests.exceptions.ProxyError: when the page has no iframe
            with a ``src``; ``fetch`` reacts to this by rotating the proxy
        """
        res = self.sess.get(self.base_url + identifier, verify=False, timeout=10,
                            headers={'User-Agent': choice(self.UALIST)})
        s = self._get_soup(res.content)
        iframe = s.find('iframe')
        src = iframe.get('src') if iframe else None
        if not src:
            raise requests.exceptions.ProxyError()
        # Protocol-relative urls need an explicit scheme.
        return 'http:' + src if src.startswith('//') else src

    def _classify(self, identifier):
        """
        Classify the type of identifier:
        url-direct - openly accessible paper
        url-non-direct - pay-walled paper
        pmid - PubMed ID
        doi - digital object identifier
        """
        # 'https...' also starts with 'http', so one prefix test covers both.
        if identifier.startswith('http'):
            if identifier.endswith('pdf'):
                return 'url-direct'
            return 'url-non-direct'
        if identifier.isdigit():
            return 'pmid'
        return 'doi'

    def _save(self, data, path):
        """
        Save a file given data (bytes) and a path.
        """
        with open(path, 'wb') as f:
            f.write(data)

    def _get_soup(self, html):
        """
        Return html soup.
        """
        return BeautifulSoup(html, 'html.parser')

    def _generate_name(self, res):
        """
        Generate unique filename for paper. Returns a name by calculating
        md5 hash of file contents, then appending the last 20 characters
        of the url which typically provides a good paper identifier.
        """
        name = res.url.split('/')[-1]
        name = re.sub(r'#view=(.+)', '', name)
        pdf_hash = hashlib.md5(res.content).hexdigest()
        return '%s-%s' % (pdf_hash, name[-20:])

    def logger(self, result):
        """
        Log the outcome of a download: the error message when the result
        has an ``'err'`` key, otherwise a success line with its identifier.
        """
        # Inside a method body, ``logger`` resolves to the module-level
        # logger, not to this method.
        if 'err' in result:
            logger.debug('%s', result['err'])
        else:
            logger.debug('Successfully downloaded file with identifier %s', result['identifier'])
        return
class CaptchaNeedException(Exception):
    """Raised when a fetched response is not a pdf (treated as a captcha page)."""
def _new_client():
    """Create a SciHub client with a proxy installed on its session."""
    client = SciHub()
    client.set_proxy()
    return client


def _pending_identifiers(identifiers, output):
    """Return the non-blank identifiers whose pdf is not yet in ``output``."""
    pending = []
    for raw in identifiers:
        identifier = raw.strip()
        if not identifier:
            continue
        target = os.path.join(output, f'{identifier.replace("/", "_")}.pdf')
        if not os.path.exists(target):
            pending.append(identifier)
    return pending


def _download_identifier(identifier, output):
    """Pool worker: download one identifier with a client built in-process."""
    # The client is created here, inside the worker, so that no Redis
    # connection or HTTP session has to be pickled to reach the worker.
    result = _new_client().download(identifier, output)
    # The pdf is already on disk; do not ship its bytes back to the parent.
    return {key: value for key, value in result.items() if key != 'pdf'}


def _log_download_result(result):
    """Pool callback: log the outcome of one finished download."""
    if 'err' in result:
        logger.debug('%s', result['err'])
    else:
        logger.debug('Successfully downloaded file with identifier %s', result['identifier'])


def _log_download_failure(exc):
    """Pool error callback: report a task that ended with an exception."""
    logger.error('Download task failed: %r', exc)


def main():
    """
    Command-line entry point.

    Exactly one action is run, chosen in this order: ``--download``,
    ``--search``, ``--search_download``, ``--file``. ``--proxy`` is parsed
    but currently unused; proxies are taken from the Redis pool instead.
    """
    parser = argparse.ArgumentParser(description='SciHub - To remove all barriers in the way of science.')
    parser.add_argument('-d', '--download', metavar='(DOI|PMID|URL)', help='tries to find and download the paper',
                        type=str)
    parser.add_argument('-f', '--file', metavar='path', help='pass file with list of identifiers and download each',
                        type=str)
    parser.add_argument('-s', '--search', metavar='query', help='search Google Scholars', type=str)
    parser.add_argument('-sd', '--search_download', metavar='query',
                        help='search Google Scholars and download if possible', type=str)
    parser.add_argument('-l', '--limit', metavar='N', help='the number of search results to limit to', default=10,
                        type=int)
    parser.add_argument('-o', '--output', metavar='path', help='directory to store papers', default='', type=str)
    parser.add_argument('-v', '--verbose', help='increase output verbosity', action='store_true')
    parser.add_argument('-p', '--proxy', help='via proxy format like socks5://user:pass@host:port', action='store',
                        type=str)
    parser.add_argument('-t', '--thread', help='num of threads', type=int, default=4)
    args = parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    if args.download:
        sh = _new_client()
        result = sh.download(args.download, args.output)
        if 'err' in result:
            logger.debug('%s', result['err'])
        else:
            logger.debug('Successfully downloaded file with identifier %s', args.download)
    elif args.search:
        sh = _new_client()
        results = sh.search(args.search, args.limit)
        if 'err' in results:
            logger.debug('%s', results['err'])
        else:
            logger.debug('Successfully completed search with query %s', args.search)
        print(results)
    elif args.search_download:
        sh = _new_client()
        results = sh.search(args.search_download, args.limit)
        if 'err' in results:
            logger.debug('%s', results['err'])
        else:
            logger.debug('Successfully completed search with query %s', args.search_download)
        for paper in results['papers']:
            result = sh.download(paper['url'], args.output)
            if 'err' in result:
                logger.debug('%s', result['err'])
            else:
                logger.debug('Successfully downloaded file with identifier %s', paper['url'])
    elif args.file:
        # Read the work list first so a bad path fails before a pool exists.
        with open(args.file, 'r') as f:
            identifiers = f.read().splitlines()
        pending = _pending_identifiers(identifiers, args.output)
        if not pending:
            return
        # At least one worker, and never more than there are CPUs.
        worker_count = max(1, min(args.thread, mp.cpu_count()))
        pool = mp.Pool(worker_count)
        try:
            for identifier in pending:
                pool.apply_async(_download_identifier, (identifier, args.output),
                                 callback=_log_download_result,
                                 error_callback=_log_download_failure)
            pool.close()
            pool.join()
        finally:
            # Harmless after a clean join; reaps the workers if we are
            # leaving because of an exception.
            pool.terminate()
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment