Last active
September 27, 2024 17:49
-
-
Save GGontijo/1527adb4dd80736181fb329c337fb92b to your computer and use it in GitHub Desktop.
FastAPI Background Task Queue with Asyncio and Semaphore for Shared Resource Management
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import time | |
| import logging | |
| import asyncio | |
| from fastapi import FastAPI, Response | |
| from fastapi.concurrency import asynccontextmanager | |
| from pydantic import BaseModel | |
| from pygen import anexar_arquivo, baixar_planilha, executar_atividade | |
| from guias import Imposto, emitir_guia_difal, emitir_guia_fethab, emitir_guia_iagro | |
| from apuracao import processar_apuracao_difal, processar_apuracao_fethab, processar_apuracao_iagro | |
| import urllib3 | |
| import os | |
| from selenium import webdriver | |
| from urllib3.exceptions import InsecureRequestWarning | |
| import undetected_chromedriver as uc | |
| from concurrent.futures import ThreadPoolExecutor | |
# Single-worker thread pool used to run blocking calls off the event loop.
executor = ThreadPoolExecutor(max_workers=1)

# Binary semaphore: only one task may drive the Chrome instance at a time.
semaphore = asyncio.Semaphore(value=1)

# Bounded queue of pending requests (at most 10 waiting items).
task_queue = asyncio.Queue(maxsize=10)
async def process_tasks():
    """Consume queued requests forever, processing them one at a time.

    Each item taken from ``task_queue`` is handed to ``executar_emissao``.
    ``task_queue.task_done()`` is called exactly once per dequeued item,
    including when processing is cancelled or raises, so the queue's
    unfinished-item accounting stays consistent.
    """
    while True:
        emissao = await task_queue.get()

        if emissao is None:
            # Nothing to process: acknowledge the placeholder, back off, and
            # wait for the next item instead of trying to process ``None``.
            task_queue.task_done()
            await asyncio.sleep(10)
            continue

        logging.info(f"Iniciando processamento da emissão: {emissao}")
        try:
            await executar_emissao(emissao)
        finally:
            # Always balance the get() above, even on cancellation/errors.
            task_queue.task_done()
        logging.info(f"Processamento concluído para a emissão: {emissao}")
@asynccontextmanager
async def driver_lifespan(app: FastAPI):
    """Application lifespan: own the Chrome driver and the background worker.

    Startup creates the Chrome driver, stores it on ``app.state.driver`` and
    starts ``process_tasks`` as a background task. Shutdown cancels that task
    and then quits the driver; both steps run even if startup or the
    application body raises after the driver was created.
    """
    urllib3.disable_warnings(InsecureRequestWarning)
    os.environ['WDM_SSL_VERIFY'] = '0'

    options = webdriver.ChromeOptions()
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, Gecko) Chrome/120.0.0.0 Safari/537.36')
    options.add_argument('--ignore-ssl-errors')
    options.add_argument('--ignore-certificate-errors')
    options.add_argument('--disable-popup-blocking')
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)

    driver = uc.Chrome(desired_capabilities=options.to_capabilities())
    try:
        driver.maximize_window()
        driver.set_page_load_timeout(60)
        logging.info("Dependência driver instanciada!")
        app.state.driver = driver

        bg_task = asyncio.create_task(process_tasks())
        logging.info(f"Criado tarefa em Background {bg_task}")
        try:
            yield
        finally:
            # Stop the queue consumer before tearing down the driver it uses.
            bg_task.cancel()
            # Wait for the cancellation to complete without re-raising the
            # worker's CancelledError (or any error it died with).
            await asyncio.gather(bg_task, return_exceptions=True)
    finally:
        # quit() ends the whole WebDriver session and browser process;
        # close() would only close the current window.
        driver.quit()
        logging.info("Dependência driver encerrada!")


# The lifespan handler above manages the driver for the app's whole lifetime.
app = FastAPI(lifespan=driver_lifespan)
class EmissaoGuias(BaseModel):
    # Request payload for POST /gerar; instances are queued and later
    # consumed by executar_emissao. Documented with comments on purpose:
    # a class docstring would be published in the generated schema.
    instancia: str  # passed to baixar_planilha and to the guide emitters
    impostos: list[Imposto]  # which taxes to process (tested via `in`)
    periodo: str  # forwarded to the guide emitters; format not validated here
    vencimento: str  # forwarded to emitir_guia_difal; format not validated here
async def executar_emissao(emissao: EmissaoGuias):
    """Process one emission request using the shared Chrome driver.

    Downloads the spreadsheet for ``emissao.instancia`` and, for each tax
    listed in ``emissao.impostos``, runs the matching processing/emission
    helpers. All blocking helpers run in the shared thread-pool executor so
    the event loop stays responsive.

    Errors are logged (with traceback) and swallowed so that a failing
    request does not stop the queue consumer; nothing is returned.
    """
    loop = asyncio.get_running_loop()

    def run_blocking(func, *args):
        # Schedule a blocking call on the shared executor; returns an awaitable.
        return loop.run_in_executor(executor, func, *args)

    async with semaphore:  # only one task may use Chrome at a time
        try:
            driver = app.state.driver
            planilha = await run_blocking(baixar_planilha, emissao.instancia)

            # NOTE(review): this list is filled below but never consumed in
            # this function — confirm whether a follow-up step is missing.
            lista_chaves_zimp = []

            if Imposto.DIFAL in emissao.impostos:
                lista_apuracao_difal = await run_blocking(processar_apuracao_difal, planilha)
                for apuracao in lista_apuracao_difal:
                    chaves_zimp = await run_blocking(
                        emitir_guia_difal,
                        driver,
                        emissao.instancia,
                        apuracao['filial'],
                        apuracao['lista_chaves'],
                        emissao.periodo,
                        apuracao['valor_total'],
                        apuracao['ie'],
                        emissao.vencimento,
                    )
                    lista_chaves_zimp.append({
                        'imposto': Imposto.DIFAL,
                        'filial': apuracao['filial'],
                        'chaves_zimp': chaves_zimp,
                        'valor_total': apuracao['valor_total'],
                    })

            if Imposto.FETHAB in emissao.impostos:
                # NOTE(review): the result is discarded and no FETHAB guide
                # is emitted here — confirm whether that step is pending.
                await run_blocking(processar_apuracao_fethab, planilha)

            if Imposto.IAGRO in emissao.impostos:
                # NOTE(review): the processing result is not passed to the
                # emitter below — confirm this is intended.
                await run_blocking(processar_apuracao_iagro, planilha)
                await run_blocking(
                    emitir_guia_iagro, driver, emissao.impostos, emissao.periodo, emissao.instancia
                )
        except Exception as err:
            # logging.exception keeps the traceback, which the bare message lost.
            logging.exception(f"Erro ao executar emissão: {err}")
@app.post("/gerar")
async def gerar(emissao: EmissaoGuias) -> Response:
    # Enqueue the request for background processing and answer 202 right away.
    # Comments are used instead of a docstring so the published OpenAPI
    # description of this route is unchanged.

    # Waits here while the bounded queue is full.
    await task_queue.put(emissao)

    # NOTE(review): _unfinished_tasks is a private asyncio.Queue attribute
    # (items put but not yet marked done); it is used here as the position.
    queue_position = task_queue._unfinished_tasks
    logging.info(f"Emissão adicionada à fila. Posição {queue_position}.")

    body = f"Emissão recebida e sendo processada. Posição {queue_position}."
    return Response(status_code=202, content=body)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment