Использование Celery во Flask

Вопрос запуска фоновых задач является довольно комплексным, и поэтому существует много путаницы вокруг него. Отличным инструментом, по мнению многих специалистов является Celery. Существует немало статей по настройке Celery в взаимодействии с Django, поэтому в моем сегодняшнем посте я расскажу вам как использовать Celery в фреймворке Flask.


Что такое Celery?

Celery является распределенной очередью заданий. Он предназначен для выполнения задач вне контекста приложения. В общем идея заключается в том что любая задача требующая ресурсов которую вашему приложению необходимо выполнить может быть переложена на очередь заданий, освобождая ваше приложение непосредственно для ответов на запросы пользователей.

Запуск фоновых задач через celery не так тривиально, как это делается через потоки. Но вместе с этим появляется и много преимуществ, т.к . Celery имеет распределенную архитектуру, которая даст вашему приложению хороший уровень масштабируемости.


Установка Celery заключается в трех основных этапах:

  1. Клиент Celery. Он используется для выдачи фоновых заданий. При работе с Flask клиент запускается одновременно с приложением.
  2. Воркеры Celery. Это процессы, которые выполняют фоновые задания. Celery поддерживает локальные и удаленные воркеры, так что вы можете начать с одного воркера, работающей на той же машине, что и сервер flask, а затем вы можете добавлять другие воркеры, по мере того как ваше приложение будет расти.
  3. Брокер сообщений. Клиент общается с воркерами через очереди сообщений и celery поддерживает несколько способов реализации этих очередей. Чаще всего используются брокеры RabbitMQ и Redis.


Работа с Flask и Celery

Интеграция celery с flask давольно проста и не требует никаких дополнительных расширений. Flask-приложение, который использует celery требует инициализации клиента celery следующим образом:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

Как вы можете видеть, celery инициализируется путем создания объекта класса Celery и передавая этому объекту имя приложения и URL соединения для брокера сообщений, который я поместил в app.config как CELERY_BROKER_URL. Этот URL сообщает celery, где работает служба  брокера. Если вы запустите какой то другой брокер, либо запускаете брокер на другой машине - вам будет нужно соответственно изменить и URL.

Любые дополнительные параметры конфигурации для Celery может быть переданы непосредственно из конфигурации flask через вызов celery.conf.update(). Опция CELERY_RESULT_BACKEND необходима в случае если вам нужно получать Celery store status  и результаты из задач. В примере, который я покажу, такая функциональность не требуется, но в следующем посте о celery и flask она будет необходима.


Любые функции, которые вы хотите запустить как фоновых задач должны быть оформлены с помощью декоратора celery.task. Например:

@celery.task
def my_background_task(arg1, arg2):
    # какое то задание 
    return result

Затем  flask-приложение может запросить выполнение этой задачи в фоне следующим образом:


task = my_background_task.delay(10, 20)

Метод delay() ярлык для вызова более мощной apply_async(). Вот эквивалент вызова с помощью apply_async():

task = my_background_task.apply_async(args=[10, 20])

При использовании apply_async(), вы можете дать celery более подробные инструкции о том, как должна быть выполнена фоновая задача . Это полезная опция, для выполнения задач в какой-то другой момент в будущем. Например, это обращение будет планировать выполнение задачи примерно через минуту:

task = my_background_task.apply_async(args=[10, 20], countdown=60)

Возвращаемое значение delay() и apply_async() это объект, который представляет собой задачу, и этот объект может быть использован для получения статуса. Я покажу вам, как это делается в следующей статье, а сейчас давайте начнем с простого и не будем беспокоится о результатах задач.


Чтобы узнать о многих других доступных опций вы можете изучить документацию celery.



Пример. Асинхронное отправление писем

В этом примере я рассмотрю так часто необходимую для приложения возможность отправки писем без блокирования основного приложения.

Для этого примера я буду использовать расширение Flask-Mail о котором я писал в прошлых статьях . Так что будем считать, что вы уже знакомы с этим расширением

Приложение, которое я собираюсь использовать, чтобы проиллюстрировать тему, представляет собой простую веб-форму с одним текстовым полем. Пользователь должен ввести в это поле адрес электронной почты, и по подтверждению, сервер пришлет
на этот адрес тестовое письмо. Форма включает в себя две кнопки подтверждения, первая отправляет письмо сразу же, а вторая отправляет письмо через одну минуту.

Шаблон HTML, этого примера:

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body>
</html>

Ничего необычного, просто очередная HTML форма, плюс возможность отображение flashed_messages.

Расширение Flask-Mail требует некоторых настроек, в частности, подробности о сервере электронной почты, которая будет использоваться при отправке писем. Для простоты, я использую мою учетную запись Gmail, как сервер электронной почты:


# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = 'username'
app.config['MAIL_PASSWORD'] = 'password'
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'


Вот route, для данного примера:

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # отправка почты
    msg = Message('Hello from Flask',
                  recipients=[request.form['email']])
    msg.body = 'This is a test email sent from a background Celery task.'
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(msg)
        flash('Sending email to {0}'.format(email))
    else:
        # отправка почты через минуту
        send_async_email.apply_async(args=[msg], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))

Опять же, все довольно стандартно для flask-приложения. Так как это очень простая форма, я решил не прибегать к использованию расширений, поэтому я использую request.method и request.form. Я сохраняю значение, которое пользователь вводит в текстовом поле в session, таким образом я могу запомнить его после перезагрузки страницы.

Вот что интересно в этом route. Отправка электронной почты обрабатывается с помощью Celery task, называемой send_async_email, которая вызывается либо через delay() или apply_async().

Последней частью этого приложения является асинхронная задача, которая выполняет работу:

@celery.task
def send_async_email(msg):
    """Фоновое задание по отправки email с помощью Flask-Mail."""
    with app.app_context():
        mail.send(msg)

Эта задача декорерированна celery.task, для того чтобы сделать её фоновым задание. Единственным о чем стоит упомянуть в этой функции это то, что Flask-Mail требует для запуска контекст приложения, так что должна быть создана до того, как метод send() может быть вызван.

Важно отметить, что в этом примере возвращаемое значение из асинхронного вызова не сохраняется, так что приложение никогда не узнает, был ли вызов успешен или нет.

Приведенный выше пример давольно простой. Запускается фоновое задание, а затем приложение забывает об этом. Большинство Celery-учебников по веб-разработки оканчивается прямо здесь, но для многих приложений необходимо контролировать свои фоновые задачи и получать результаты от них. В продолжении этой статьи, я слегка изменю это приложение, чтобы показать фиктивную длительную задачу


Данный пост базируется на статье за авторством Miguel Grinberg. Вторую часть статьи можно найти тут.