Skip to content

Instantly share code, notes, and snippets.

@fredgido
Last active September 16, 2021 17:07
Show Gist options
  • Select an option

  • Save fredgido/6d10b20f55ceb316f5c359cc0f012a90 to your computer and use it in GitHub Desktop.

Select an option

Save fredgido/6d10b20f55ceb316f5c359cc0f012a90 to your computer and use it in GitHub Desktop.
import uuid
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)


@app.route('/')
def hello_world():  # put application's code here
    """Serve the root URL with a static greeting.

    NOTE(review): both ``app`` and ``hello_world`` are rebound further down
    in this file, so this first definition looks like leftover template
    code -- confirm before relying on it.

    Returns:
        The literal string ``'Hello World!'``.
    """
    return 'Hello World!'
# set configuration values
class Config:
    """Settings object passed to ``app.config.from_object``.

    Evaluating the class body has side effects: it configures logging,
    builds the SQLAlchemy job store and registers a "connect" listener on
    the job store's engine.
    """

    SCHEDULER_API_ENABLED = True

    import logging

    # Raise the 'sqlalchemy.engine' logger to INFO so engine activity is
    # visible on the root handler installed by basicConfig().
    logging.basicConfig()
    logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    SCHEDULER_JOBSTORES = {
        "default": SQLAlchemyJobStore(
            url="sqlite:///jobs.db?check_same_thread=true",
            engine_options={'isolation_level': "AUTOCOMMIT"},
        )
    }

    from sqlalchemy import event

    @event.listens_for(SCHEDULER_JOBSTORES['default'].engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        """Switch each new SQLite connection to a WAL-style journal mode.

        Tries ``wal2`` first, then ``wal``, re-reading the mode after each
        attempt (presumably because ``wal2`` is not accepted by every
        SQLite build -- TODO confirm).

        Args:
            dbapi_connection: Raw DB-API connection handed over by the
                "connect" event.
            connection_record: Unused; part of the listener signature.
        """
        cursor = dbapi_connection.cursor()

        def journal_mode():
            # Rows reported by SQLite for the current journal mode.
            cursor.execute("PRAGMA journal_mode")
            return cursor.fetchall()

        try:
            if journal_mode() == [('delete',)]:
                cursor.execute("PRAGMA journal_mode = wal2;")
            # Still 'delete' means the wal2 request did not take effect.
            if journal_mode() == [('delete',)]:
                cursor.execute("PRAGMA journal_mode = wal;")
            # Last attempt for any mode that is neither wal nor wal2.
            if journal_mode() not in ([('wal',)], [('wal2',)]):
                cursor.execute("PRAGMA journal_mode = wal;")
        finally:
            # Release the cursor even when one of the PRAGMAs raises.
            cursor.close()

    SCHEDULER_EXECUTORS = {"default": {"type": "processpool", "max_workers": 3}}
    SCHEDULER_JOB_DEFAULTS = {"coalesce": False, "max_instances": 1, 'misfire_grace_time': 30}
    SCHEDULER_MISFIRE_GRACE_TIME = 30
# create app
# NOTE: this rebinds ``app``; the Flask instance created near the top of the
# file is no longer reachable through this name.
app = Flask(__name__)
# Load settings from a Config instance into the Flask configuration.
app.config.from_object(Config())
# initialize scheduler
scheduler = APScheduler()
# if you don't want to use a config, you can set options here:
scheduler.init_app(app)
# NOTE(review): the scheduler is started at import time, before the listener
# and jobs below are registered and outside the ``__main__`` guard -- confirm
# that starting it on every import of this module is intended.
scheduler.start()
def my_listener(event):
    """Report on stdout whether a finished job succeeded or raised.

    Args:
        event: Scheduler event object; only its ``exception`` attribute is
            read. A truthy value is reported as a crashed job.
    """
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
# Have my_listener called for events matching EVENT_JOB_EXECUTED or EVENT_JOB_ERROR.
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
def job():
    """Simulate a short task: print the PID, wait two seconds, print again.

    Each line printed carries the current process id and a wall-clock
    timestamp so overlapping runs can be told apart in the output.
    """
    import os
    import time

    print(os.getpid(), 'job wait start', time.time())
    time.sleep(2)
    print(os.getpid(), 'job end', time.time())
@scheduler.task("interval", id="do_job", seconds=10, misfire_grace_time=900)
def periodic_job():
    """Interval task (every 10 seconds) that enqueues one extra ``job``.

    The new job is added under a freshly generated UUID4 string id, with no
    trigger arguments. The PID and a timestamp are printed before and after
    the enqueue call.
    """
    import os
    import time

    print(os.getpid(), ' peridic job wait start', time.time())
    scheduler.add_job(str(uuid.uuid4()), job)
    print(os.getpid(), 'job end', time.time())
@app.route('/')
def hello_world():  # put application's code here
    """Queue nine ``job`` runs and answer with a static greeting.

    Each job is added under its own UUID4 string id. The values returned by
    ``scheduler.add_job`` are collected and printed once all nine have been
    submitted.

    Returns:
        The literal string ``'Hello World!'``.
    """
    jobs = [scheduler.add_job(str(uuid.uuid4()), job) for _ in range(9)]
    print(jobs)
    print('submited jobs')
    return 'Hello World!'
if __name__ == '__main__':
    # Only run the Flask server when this file is executed as a script.
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment