Skip to content

Instantly share code, notes, and snippets.

@zacharyarnaise
Last active July 20, 2024 15:57
Show Gist options
  • Select an option

  • Save zacharyarnaise/590f51c62a9dad5b74daa42c80fcd300 to your computer and use it in GitHub Desktop.

Select an option

Save zacharyarnaise/590f51c62a9dad5b74daa42c80fcd300 to your computer and use it in GitHub Desktop.
Django RQ cheatsheet
import django_rq
import rq
import rq.registry
from conf import settings
conn = django_rq.get_connection()
# Get all workers
workers = django_rq.get_worker().all(con)
for worker in workers:
print(worker.hostname)
# Get all queues
queues = []
for i in range(len(settings.RQ_QUEUES)):
queues.append(django_rq.queues.get_queue_by_index(i))
# Access jobs registries
q_default = queues[0]
scheduled_job_registry = rq.registry.ScheduledJobRegistry(q_default.name, conn)
failed_job_registry = rq.registry.FailedJobRegistry(q_default.name, conn)
finished_job_registry = rq.registry.FinishedJobRegistry(q_default.name, conn)
# Get IDs of jobs in a registry
job_ids = failed_job_registry.get_job_ids()
# Fetch a job from its ID
job0 = rq.job.Job.fetch(job_ids[0], conn)
# Interesting infos of a job
job0.created_at
job0.enqueued_at
job0.ended_at
job0.func_name
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment