Skip to content

Instantly share code, notes, and snippets.

@GGontijo
Last active September 27, 2024 17:49
Show Gist options
  • Select an option

  • Save GGontijo/1527adb4dd80736181fb329c337fb92b to your computer and use it in GitHub Desktop.

Select an option

Save GGontijo/1527adb4dd80736181fb329c337fb92b to your computer and use it in GitHub Desktop.
FastAPI Background Task Queue with Asyncio and Semaphore for Shared Resource Management
from datetime import time
import logging
import asyncio
from fastapi import FastAPI, Response
from fastapi.concurrency import asynccontextmanager
from pydantic import BaseModel
from pygen import anexar_arquivo, baixar_planilha, executar_atividade
from guias import Imposto, emitir_guia_difal, emitir_guia_fethab, emitir_guia_iagro
from apuracao import processar_apuracao_difal, processar_apuracao_fethab, processar_apuracao_iagro
import urllib3
import os
from selenium import webdriver
from urllib3.exceptions import InsecureRequestWarning
import undetected_chromedriver as uc
from concurrent.futures import ThreadPoolExecutor
# Fila de processamento
task_queue = asyncio.Queue(maxsize=10)
# Apenas uma tarefa pode acessar o Chrome por vez
semaphore = asyncio.Semaphore(1)
# Executor para rodar tarefas em threads
executor = ThreadPoolExecutor(max_workers=1)
async def process_tasks():
while True:
emissao = await task_queue.get()
if emissao is None:
await asyncio.sleep(10)
logging.info(f"Iniciando processamento da emissão: {emissao}")
await executar_emissao(emissao)
task_queue.task_done()
logging.info(f"Processamento concluído para a emissão: {emissao}")
@asynccontextmanager
async def driver_lifespan(app: FastAPI):
# Inicia o driver do Chrome
urllib3.disable_warnings(InsecureRequestWarning)
os.environ['WDM_SSL_VERIFY'] = '0'
options = webdriver.ChromeOptions()
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, Gecko) Chrome/120.0.0.0 Safari/537.36')
options.add_argument('--ignore-ssl-errors')
options.add_argument('--ignore-certificate-errors')
options.add_argument('--disable-popup-blocking')
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
driver = uc.Chrome(desired_capabilities=options.to_capabilities())
driver.maximize_window()
driver.set_page_load_timeout(60)
logging.info("Dependência driver instanciada!")
app.state.driver = driver
bg_task = asyncio.create_task(process_tasks())
logging.info(f"Criado tarefa em Background {bg_task}")
yield
# Finaliza o driver ao encerrar o aplicativo
driver.close()
logging.info("Dependência driver encerrada!")
app = FastAPI(lifespan=driver_lifespan)
class EmissaoGuias(BaseModel):
instancia: str
impostos: list[Imposto]
periodo: str
vencimento: str
async def executar_emissao(emissao: EmissaoGuias):
async with semaphore: # Garantir que apenas uma tarefa acesse o Chrome por vez
try:
driver = app.state.driver
planilha = await asyncio.get_running_loop().run_in_executor(executor, baixar_planilha, emissao.instancia)
lista_chaves_zimp = []
if Imposto.DIFAL in emissao.impostos:
lista_apuracao_difal = await asyncio.get_running_loop().run_in_executor(executor, processar_apuracao_difal, planilha)
for apuracao in lista_apuracao_difal:
chaves_zimp = await asyncio.get_running_loop().run_in_executor(
executor,
emitir_guia_difal,
driver,
emissao.instancia,
apuracao['filial'],
apuracao['lista_chaves'],
emissao.periodo,
apuracao['valor_total'],
apuracao['ie'],
emissao.vencimento
)
lista_chaves_zimp.append({
'imposto': Imposto.DIFAL,
'filial': apuracao['filial'],
'chaves_zimp': chaves_zimp,
'valor_total': apuracao['valor_total']
})
if Imposto.FETHAB in emissao.impostos:
lista_apuracao_fethab = await asyncio.get_running_loop().run_in_executor(executor, processar_apuracao_fethab, planilha)
if Imposto.IAGRO in emissao.impostos:
lista_apuracao_iagro = await asyncio.get_running_loop().run_in_executor(executor, processar_apuracao_iagro, planilha)
await asyncio.get_running_loop().run_in_executor(
executor, emitir_guia_iagro, driver, emissao.impostos, emissao.periodo, emissao.instancia
)
except Exception as err:
logging.error(f"Erro ao executar emissão: {err}")
@app.post("/gerar")
async def gerar(emissao: EmissaoGuias) -> Response:
await task_queue.put(emissao)
position = task_queue._unfinished_tasks
logging.info(f"Emissão adicionada à fila. Posição {position}.")
return Response(content=f"Emissão recebida e sendo processada. Posição {position}.", status_code=202)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment