If you need to run a task every few seconds or create a report every week, you can use Celery and its support for periodic tasks – at least if you use Linux or WSL on Windows.
This post is part of my journey to learn Python. You can find the other parts of this series here. You can find the code for this post in my PythonFriday repository on GitHub.
Run every X seconds
Celery beat is the scheduler in Celery that we can use for our periodic tasks. We can create a task definition with the scheduling information to run every 10 seconds with this code:
```python
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery(broker='amqp://guest@localhost//')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls ping('https://improveandrepeat.com') every 10 seconds.
    sender.add_periodic_task(10.0, ping.s('https://improveandrepeat.com'))


@app.task
def ping(url):
    logger.info(f"ping {url}")
```
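If you prefer configuration over the signal handler, Celery can also read the schedule from the `beat_schedule` setting. A minimal sketch of the same 10-second schedule, assuming the tasks live in a module called `periodic_task.py`:

```python
# Equivalent schedule declared as configuration instead of connecting
# to the on_after_configure signal. The key 'ping-every-10-seconds' is
# just a label you choose; the 'task' entry must match the full task
# name (module name plus function name).
app.conf.beat_schedule = {
    'ping-every-10-seconds': {
        'task': 'periodic_task.ping',
        'schedule': 10.0,
        'args': ('https://improveandrepeat.com',),
    },
}
```

One advantage of this form is that the schedule lives next to the rest of the Celery configuration and does not need the tasks to be imported when beat starts.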
We can start Celery beat with this command (on Linux or in WSL):
```shell
celery -A periodic_task beat --loglevel=INFO
```
Celery beat now sends a task to the queue every 10 seconds. To process these tasks, we need to start the worker process with this command on Linux or Windows:
```shell
celery -A periodic_task worker --loglevel=INFO -P threads
```
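For local experiments you can also run beat embedded inside the worker with the `-B` option, so you only need one process. This is convenient for development, but not recommended for production, because you must make sure that only a single scheduler is running:

```shell
# Starts the worker and an embedded beat scheduler in one process:
celery -A periodic_task worker -B --loglevel=INFO -P threads
```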
We should now see in the log of Celery beat that it creates a task every 10 seconds:
[…] beat: Starting…
[…] Scheduler: Sending due task run every 10 (periodic_task.ping)
[…] Scheduler: Sending due task run every 10 (periodic_task.ping)
[…] Scheduler: Sending due task run every 10 (periodic_task.ping)
In the log of the worker, we should see that the task gets processed:
[…] Task periodic_task.ping[90bcf2d2-*] received
[…] periodic_task.ping[90bcf2d2-*]: ping https://improveandrepeat.com
[…] Task periodic_task.ping[90bcf2d2-*] succeeded in 0.0310s: None
[…] Task periodic_task.ping[4f68153d-*] received
[…] periodic_task.ping[4f68153d-*]: ping https://improveandrepeat.com
[…] Task periodic_task.ping[4f68153d-*] succeeded in 0.0s: None
[…] Task periodic_task.ping[a9fe514e-*] received
[…] periodic_task.ping[a9fe514e-*]: ping https://improveandrepeat.com
[…] Task periodic_task.ping[a9fe514e-*] succeeded in 0.0s: None
Run at a predefined interval
For tasks that should run every Monday morning, we can use the crontab format to specify the start time:
```python
from celery import Celery
from celery.schedules import crontab
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery(broker='amqp://guest@localhost//')
app.conf.timezone = 'Europe/Zurich'


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Runs every Monday morning at 6:00
    sender.add_periodic_task(
        crontab(hour=6, minute=0, day_of_week=1),
        make_report.s('WEEKLY_SALES'),
    )


@app.task
def make_report(report_type):
    logger.info(f"report created: {report_type}")
```
Make sure that you set your time zone. Otherwise, Celery uses UTC, which may not be what you want.
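The `crontab` class accepts the fields `minute`, `hour`, `day_of_week`, `day_of_month` and `month_of_year`, so you can express far more schedules than the weekly report above. A few illustrative examples (the schedules here are made up for demonstration):

```python
from celery.schedules import crontab

# Every 15 minutes:
crontab(minute='*/15')

# Every weekday (Monday to Friday) at 7:30:
crontab(hour=7, minute=30, day_of_week='1-5')

# On the first day of every month at midnight:
crontab(hour=0, minute=0, day_of_month=1)
```

You pass any of these as the first argument of `add_periodic_task()`, exactly like the Monday-morning schedule in the example above.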
Start the Celery beat service with this command (on Linux or in WSL):
```shell
celery -A weekly_report beat --loglevel=INFO
```
Start the worker with this command:
```shell
celery -A weekly_report worker --loglevel=INFO -P threads
```
On the next Monday at 6:00 you should see this message in the beat log:
[2022-06-06 6:00:00,024: INFO/MainProcess] Scheduler: Sending due task weekly_report.make_report('WEEKLY_SALES') (weekly_report.make_report)
The worker log should contain something like this:
[…] Task weekly_report.make_report[bc68ce63-*] received
[…] weekly_report.make_report[bc68ce63-*]: report created: WEEKLY_SALES
[…] Task weekly_report.make_report[bc68ce63-*] succeeded in 0.034s: None
I hope this sample gives you an idea of how to work with recurring tasks. The crontab syntax offers great flexibility to put a task into the worker queue at a specific time. But as with all asynchronous tasks, there is no guarantee that the task will be processed immediately. If you stop your worker process, the task will wait in the queue.
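If a periodic task loses its value when it is processed too late (a ping from yesterday is rarely useful), you can give it an expiry with `.set()` on the task signature. A sketch based on the first example – the 30-second expiry is an assumption you need to adapt to your task:

```python
# If the worker only receives the task after it expired, the task is
# marked as revoked and not executed, so stale pings do not pile up
# while the worker is down. The 30-second value is an example.
sender.add_periodic_task(
    10.0,
    ping.s('https://improveandrepeat.com').set(expires=30),
)
```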
Conclusion
Over the last few weeks we explored Celery and found a tool that offers us a lot of flexibility to handle our asynchronous tasks. As so often, that flexibility comes with a long list of things we need to learn. I recommend that you take the time to experiment a bit before you run it in production. That will prevent nasty surprises and help you find the right operation mode.