Использование Celery во Flask.Часть 2

Это продолжение статьи об использовании Celery с Flask. В этой части речь пойдет о получении статуса задачи.


В этой статье мы расширим предыдущий пример, чтобы показать фиктивную длительное задание. Пользователь может запускать одну или более из этих длительных заданий нажав на кнопку, и веб страница запущенная в вашем браузере посредством Ajax будет опрашивать сервер для обновления статуса на всех этих задач. Для каждой задачи страница покажет графический строку состояния, процент завершения, сообщение о состоянии и когда задание будет завершено, значение результата будет так же показано.


Фоновые задачи с Обновления статуса

Начнем с описания фоновой задачи, которую я использую для этого второго примера:

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

Для решения этой задачи я добавил аргумент bind=True  в декоратор Celery. Это указывает celery отправить self аргумент моей функции, которые я могу затем использовать, для записи обновления статуса.

Так как это задание на самом деле не делает ничего полезного, я решил использовать юмористические сообщения о состоянии, которые собраны из случайных глаголов, прилагательных и существительных.
Выше вы можете видеть список из бессмысленных элементов, которые я использовал для генерации этих сообщений . Нет ничего плохого в том, чтобы немного повеселиться, не так ли?

Функция
случайного выбирает из чисел от 10 до 50, так что при каждом запуске задания будут иметь различную продолжительность. Случайные статус-сообщения генерируется при первой итерации, а затем могут быть изменены при следующих итерациях с 25% шансом.

Celery получает обновления этих задач посредство вызова self.update_state(). Существует ряд встроенных в состояний, таких как STARTED, SUCCESS и так далее, но Celery позволяет настроить так же и пользовательские состояния. Здесь я использую кастомное состояние, которое я назвал PROGRESS. Имеются дополнительные метаданные в виде Python словаря, прикрепленный к этому состоянию, которые включают текущее и общее количество итераций, а так же случайно сгенерированное сообщение о статусе. Клиент может использовать эти элементы для отображения полоски прогресса. Каждая итерация ждет одну секунду, чтобы имитировать выполнение какой то работы.

 По выходу из цикла, словарь возвращается как результат функции. Этот словарь включает в себя обновленные счетчики итераций, финальные статус-сообщение и юмористический результат.

Функция
long_task() выполняется в воркером Celery. Ниже вы можете увидеть роут flask-приложения, который запускает это фоновое задание:

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}

Как вы можете видеть, клиент должен выдать POST запрос к /longtask чтобы запустить одно из этих заданий. Сервер запускает задание, и сохраняет возвращаемое значение. Для ответа я использую код состояния 202, который обычно используется в REST API, для указания на то, что запрос находится в процессе. Я также добавил заголовок Location, с URL, которые клиент может использовать для получения информации о статусе. Этот URL указывает на другой роут Flask называемый taskstatus и task.id в качестве динамической составляющей.


Доступ к статусу задания из flask-приложения

Маршрут taskstatus, указаный выше, ответственен за обеспечение отчетности о статусе обновления, которую предоставляют фоновые задания. Вот реализация этого маршрута:


@app.route('/status/')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        // job did not start yet
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

Этот маршрут генерирует ответ в формате JSON, который включает в себя состояние задания и все значения, которые я задал при вызове update_state() в качестве meta аргумента, который клиент сможет использовать, чтобы построить полоску прогресса. К сожалению, эта функция так же должна проверить несколько краевых условий, так что она немного длительна. Для доступа к данным задания я воссоздать объект задания, который является экземпляром класса AsyncResult, используя id задания переданный в URL.

Первый if блок, отрабатывает когда задание еще не запущено (PENDING состояние). В этом случае нет информации о состоянии, так что я создаю некоторые данные. Следующий блок elif возвращает информацию о состоянии от фоновой задачи. Здесь информация, которую передает задание доступна как task.info. Если данные содержат ключ result- это означает, что это конечный результат и задача завершена, так что я также добавил этот результат в ответ. Блок else охватывает возможные ошибки, о которых Celery сообщит, установив состояние задания "FAILURE", и в этом случае task.info будет содержать само исключение. В качестве обработчика ошибок я установил текст, исключения как сообщение статуса.

Верьте или нет, но это все, что нужно от сервера. Остальные должно быть реализовано клиентом, которым в этом примере является веб-страница с Javascript сценарием.


Клиентский JavaScript

Хоть написание JS скриптов не находится в центре внимания этой статьи, все же стоит сказать об этом пару слов.
Для визуализации полоски процесса я использую nanobar.js, которые я добавил из CDN. Я также включил JQuery, что
значительно упрощает Ajax вызовы:

 src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js">
 src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js">

Кнопка, которая запускает фоновое задание, подключена к следующему обработчику:


    function start_long_task() {
        // add task status elements 
        div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>');
        $('#progress').append(div);

        // create a progress bar
        var nanobar = new Nanobar({
            bg: '#44f',
            target: div[0].childNodes[0]
        });

        // send ajax POST request to start background job
        $.ajax({
            type: 'POST',
            url: '/longtask',
            success: function(data, status, request) {
                status_url = request.getResponseHeader('Location');
                update_progress(status_url, nanobar, div[0]);
            },
            error: function() {
                alert('Unexpected error');
            }
        });
    }

Эта функция начинается с добавления нескольких HTML элементов, которые будут использоваться для отображения полоски прогресса новой фоновой задачи и ее статуса. Это делается динамически, так как пользователь может добавить любое количество заданий, и каждое из этих заданий должно получить свой собственный HTML код.

Чтобы помочь вам лучше в этом разобраться, представляю структуру добавляемых элементов для задания, с комментариями об использовании каждой 
div:

<div class="progress">
    <div></div>         <-- Progress bar
    <div>0%</div>       <-- Percentage
    <div>...</div>      <-- Status message
    <div>&nbsp;</div>   <-- Result
</div>
<hr>

Далее функция start_long_task()получает экземпляр полоски прогресса в соответствии с документацией nanobar, и, наконец, посылает AJAX POST запрос к /longtask для инициализации на сервере фонового задания Celery.

Когда
AJAX POST вызов возвращается, callback функция получает значение заголовка Location, которой, как вы видели в предыдущем разделе, используется в клиенте для обновления статуса. Затем он вызывает другую функцию update_progress() с URL этого статуса, объекта полоски прогресса и корневого div элемента, созданного для этого задания. Ниже вы можете увидеть эту функцию, которая посылает статус-запрос, а затем обновляет UI элементы, посредствам информации, возвращаемой ею:

    function update_progress(status_url, nanobar, status_div) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            // update UI
            percent = parseInt(data['current'] * 100 / data['total']);
            nanobar.go(percent);
            $(status_div.childNodes[1]).text(percent + '%');
            $(status_div.childNodes[2]).text(data['status']);
            if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
                if ('result' in data) {
                    // show result
                    $(status_div.childNodes[3]).text('Result: ' + data['result']);
                }
                else {
                    // something unexpected happened
                    $(status_div.childNodes[3]).text('Result: ' + data['state']);
                }
            }
            else {
                // rerun in 2 seconds
                setTimeout(function() {
                    update_progress(status_url, nanobar, status_div);
                }, 2000);
            }
        });
    }

Эта функция посылает GET запрос к URL статуса, и когда получает ответ, обновляет различные HTML элементы этого задания. Если фоновое задание завершено, и доступен его результат - он так же добаляется на странице. Если результата не был получен - это значит, что задание завершилось с ошибкой, поэтому состояние задания, которым будет FAILURE, будет показано в качестве результата.

Пока сервер выполняет задание, мне нужно продолжать изменять состояние задачи и обновление
UI. Для этого я поставил таймер на вызов этой функции еще раз через 2 секунды. Это будет продолжаться до тех пор, пока задание не будет выполнено.

По дефолту celery воркер запускает одновременные задачи с соответствии с количеством процессоров на сервере, так что, когда вы повторите этот пример, убедитесь, что вы запускаете большое количество задач. Это делается для того, чтобы увидеть, как  Celery держит задания в статусе PENDING, пока воркер не сможет приняться за него.


Запуск примеров

Для вашего удобства автор статьи поделился этим и предидущем примером в своем Github репозитории. Чтобы попробовать эти примеры клонируйте репозиторий и создайте виртуальную среду:

$ git clone https://github.com/miguelgrinberg/flask-celery-example.git
$ cd flask-celery-example
$ virtualenv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt

Обратите внимание, что файл requirements.txt включеный в этот репозиторий содержит Flask, Flask-Mail, Celery и клиент Redis, вместе со всеми их зависимостями.

Теперь вам нужно запустить три процесса, необходимых для этого приложения, так что лучше всего открыть три окна терминала. На первом терминале запустить Redis. Вы можете просто установить Redis в соответствии с
download instructions вашей операционной системы. Если же у вас машина с Linux или OS X, я включил небольшой скрипт, который загружает, и компилирует Redis в качестве частного сервера:

$ ./run-redis.sh

Обратите внимание, что для выполнения этого скрипта, вы должны иметь установленный gcc.

На втором терминале запустите
Celery воркер. Это делается с помощью команды celery, которая уже установлена в вашей виртуальной среде. Поскольку в репозиторий включен так же пример из первой части ститьи, используемый вами Gmail аккаунт должен быть подготовлен. (прим. преводчика: о тонкостях настройки Gmail для работы с непроверенными приложениями можите прочитать в здесь)

Наконец, в третьем окне терминала из виртуальной среды запустите flask-приложение:

$ source venv/bin/activate
(venv) $ python app.py

Теперь вы можете перейти по  http://localhost:5000/ в вашем браузере и опробовать примеры!


Вывод

К сожалению, при работе с Celery, вы должны сделать немного больше чем просто отправить задание в фоновой поток, но преимущества в гибкости и масштабируемости трудно игнорировать. В этой статье я попытался выйти за рамки примера "Давайте запустим фоновое задание" и дать вам более полную и реалистичную картину, о том что использовние Celery может повлечь за собой. Я искренне надеюсь, я не напугал вас слишком большим количеством информации!


Данный пост базируется на статье за авторством Miguel Grinberg. Первую часть статьи можно найти тут.