УрокОбработка большого количества данных с помощью Queue API

В жизни каждого программиста попадаются заказчики, которые просят сделать для их сайта обработку большого количества данных, будь то постоянное обновление товаров для магазина, парсинг xml файлов, отправка десятков тесяч емейлов и так далее. Смысл в том, что решить задачу "в лоб" просто невозможно - сервер может не выдержать; или закончится время выполнения скрипта и обработка данных прервётся; или же (в случае рассылки емейлов) ляжет почтовый сервер, или того хуже - почтовый сервер решит, что вы спамер, и ваши письма будут доходить до адресатов с большим опозданием (если вообще дойдут).

Так же заказчик, естественно, не захочет каждый день заходить на сайт и нажимать какие-то кнопки, чтобы сайт крутился. Поэтому основной задачей в данной ситуации является невмешательно человека в жизненный цикл сайта.

В Drupal'e панацеей от этой болезни является Queue API. В седьмом Друпале оно уже находится в ядре, а для шестой версии был сделан бэкпорт в виде модуля drupal_queue.

Смысл Queue API:

  • При запуске крона Queue API создаёт очередь из данных, которые надо обработать
  • К сайту добавляется ещё один крон, который запускается раз в несколько минут, и обрабатывает небольшую порцию из очереди

Теперь к реализации.

Первым делом положите в корень сайта файл drupal_queue_cron.php с вот таким содержимым (его также можно взять из модуля drupal_queue):

<?php
 
/**
 * @file
 * Entry point for worker calls.
 */
include_once './includes/bootstrap.inc';
drupal_bootstrap(DRUPAL_BOOTSTRAP_FULL);
if (function_exists('drupal_queue_cron_run')) {
  drupal_queue_cron_run();
}

Настройте запуск этого файла на сервере раз в несколько минут.

Далее переходим к коду. В своем модуле надо имплементировать хук hook_cron_queue_info(), который является отправной точкой для создания очереди:

/**
 * Implements hook_cron_queue_info().
 */
function ИМЯМОДУЛЯ_cron_queue_info() {
  $queue['mymodule_queue'] = array(
    'worker callback' => 'mymodule_item_process',
    'time' => 60,
  );
  return $queue;
}

Небольшое пояснение:

  • mymodule_queue - имя очереди. Просто следите за тем, чтобы оно не перекликалось с другими очередями на сайте.
  • worker callback - функция, которая будет обрабатывать каждый элемент очереди.
  • time - максимальное время выполнения крона очереди (дополнительного крона).

Далее имплементируем hook_cron(), который будет вызван при запуске выполнения регулярных процедур:

/**
 * Implements hook_cron().
 */
function ИМЯМОДУЛЯ_cron() {
 
  // Загружаем массив, каждый элемент которого 
  // будет являться элементом очереди
  $items = mymodule_load_data();
 
  if ($items) {
    // Создаём новую очередь
    $queue = drupal_queue_get('mymodule_queue');   
    $queue->createQueue();
 
    // Каждый элемент ставим в очередь, т.е.
    // по факту - идёт запись в базу данных
    foreach ($items as $item) {
      $queue->createItem($item);
    }
  }
}

Например, загружу емейлы всех пользователей сайта:

function mymodule_load_data() {
  // Сюда я буду ложить все данные
  $data = array();
  // Вытягиваю из бд имя и емейл каждого пользователя
  $users = db_query('SELECT name, mail FROM {users}');
  // Ложу всех 
  foreach ($users as $user) {
    $data[] = array(
      'mail' => $user->mail,
      'name' => $user->name,
    ); 
  }
  return $data;
}

Собственно, сама функция, которая будет обрабатывать элемент (worker callback):

function mymodule_item_process($data) {
  // Строю параметры отправки письма
  $site_name = variable_get('site_name', '');
  $params = array();
  $params['subject'] = t('Notification from !sitename', array('!sitename' => $site_name));
  $params['body'] = t('Hi, !username', array('!username' => $data['name'));
  // Отправляю письмо
  drupal_mail('mymodule', 'send_notifications', $data['mail'], language_default(), $params);
}

Ну и для порядка уже допишу имплементацию hook_mail(), которая должна отправлять мои письма:

/**
 * Implementats of hook_mail().
 */
function mymodule_mail($key, &$message, $params) {
  if ($key == 'send_notifications') {
    $message['subject'] = $params['subject'];
    $message['body'][] = $params['body'];
  }
}

Кстати, у меня при работе с Queue API у меня сразу возник вопрос - а сколько элементов очереди функция обрабатывает за раз? Ответ - столько, сколько успеет выполнить скрипт, пока не закончится время выполнения скрипта 'time' в хуке hook_cron_queue_info().

Комментарии

Аватар пользователя Anton L. Safin
Anton L. Safin написал:

Да, использовал quque для модуля импорта базы из Domino (аналог 1С), очень удобная штука. Странно, что во многих достаточно "тяжелых" модулях ее не используют.

21.11.2011 13:32
Аватар пользователя Spleshka
Spleshka написал:

Согласен, некоторым модулям очень не помешало бы.

21.11.2011 13:59
Аватар пользователя Гость
Гость написал:

Также можно использовать batch API, хотя это в некотором роде просто надстройка над queue API

22.11.2011 04:45
Аватар пользователя Spleshka
Spleshka написал:

Нет, совсем другая задача. Батч позволяет запускать визуальный пакетный обработчик, вызывать который надо вручную. А я написал, что основной задачей здесь является полное невмешательноство человека в процесс. Плюс Queue API работает быстрее, т.к. поддерживает атомарность операций.

22.11.2011 13:56
Аватар пользователя Виталий
Виталий написал:

Спасибо за пост! А в 7м Друпале нужно 2 крона настраивать?

23.11.2011 16:04
Аватар пользователя Spleshka
Spleshka написал:

По умолчанию там один крон, как и в 6м. Но можно сделать и второй :)

23.11.2011 19:07
Аватар пользователя xandeadx
xandeadx написал:

каким образом второй сделать, именно для queue? но чтоб drupal-way

28.01.2013 01:27
Аватар пользователя Spleshka
Spleshka написал:

Рядом с файлом cron.php ложишь какой-нибудь cron-queue.php, в который полностью копируешь содержимое файла cron.php. Далее функцию drupal_cron_run() заменяешь на свой обработчик очереди. На сервере настраиваешь дёргать файл cron-queue.php с таким же ключём, как и крон, каждые минут 5.

Это не совсем drupal-way (дефолтные очереди и так обрабатываются по крону), однако так ты делаешь свою очередь полностью управляемой.

28.01.2013 18:30
Аватар пользователя kruss
kruss написал:

Здравствуйте!
Огромное спасибо за статью - очень познавательно. Вопрос: у меня в проекте на Drupal 7 есть необходимость по запуску крона отправить с момощью циклического вызова drupal_mail около сотни писем. если их 2-3 то улетают, а больше хостер рассматривает как спам. Как можно с помошью Queue API сотню писем?

14.03.2012 09:36
Аватар пользователя Spleshka
Spleshka написал:

Ну так поставьте ограничение на выполнение одной операции секунд 20. Вряд ли за 20 секунд у вас успеет уйти более 100 писем. И поставьте выполнение этой операции каждые минут 5, например. И всё у вас потихонечку из очереди уйдёт =)

17.03.2012 14:02
Аватар пользователя Shot
Shot написал:

Одно небольшое замечание от новичка:
в друпале 7 надо заменить drupal_queue_get на DrupalQueue::get а так отличный пример.

27.03.2012 13:19
Аватар пользователя Spleshka
Spleshka написал:

Да, просто статья под D6 писалась :)

12.04.2012 23:33
Аватар пользователя Pacemaker
Pacemaker написал:

Подскажите пожалуйста, как настроить второй крон в 7-ке. Так же как здесь описано или будет отличаться?

31.05.2012 12:23
Аватар пользователя Spleshka
Spleshka написал:

В 7ке всё несколько и сложнее и проще одновременно :) Для 7ки не надо создавать отдельный файл drupal_queue_cron.php - очередь запускается вместе с обычным кроном (это и хорошо и плохо, зависит от задачи).

31.05.2012 17:49
Аватар пользователя Pacemaker
Pacemaker написал:

По-моему, так менее гибко...
Большое спасибо за статью, пишите ещё!

04.06.2012 10:57
Аватар пользователя oOLokiOo
oOLokiOo написал:

Возможно подскажете что-то вот тут?
http://www.drupal.ru/node/87987

Заранее спасибо!

20.09.2012 14:52
Аватар пользователя Ромка
Ромка написал:

Не понимаю зачем нужен второй крон? Все прекрасно работает и с одним:

  • на hook_cron при необходимости заполняем очередь,
  • на hook_cron_queue_info задаем функцию-обработчик очереди и максимальное время ее работы.

Рекомендую посмотреть модуль http://drupal.org/project/piwik_stats для примера.

30.10.2012 10:51
Аватар пользователя Spleshka
Spleshka написал:

Например, модуль рассылок очереди не решат по нескольким причинам:
1. Мне не хватает частоты запуска крона. Запускать крон чаще раз в час - нет смысла. А отправить за раз больше ~100 писем проблематично. Выходит, в сутки я смогу отправить только 24 * 100 = 2400 писем. А у меня 200 тысяч пользователей. Как в этом случае быть?
2. Меня не устраивает задание время выполнения функции. Я хочу четко знать, что каждые 5 минут у меня сервер отсылает 100 писем, а не "сколько успеет". Я хочу планировать рассылку и быть уверенным в её точном выполнении, а не в приблизительном.
3. По сути, это не "второй крон" в понимании Друпала. А просто скрипт, который выполняет заданную мною функцию каждые N минут.

30.10.2012 21:46
Аватар пользователя Евгений Свободный
Евгений Свободный написал:

Можно вопрос?
А как мне один раз очередь прокрутить, чтобы не пошли повторно по крону выполнять процедуры.
К примеру, я хочу к товару просчитать цену со скидкой. Как сделать так, чтобы очередь один раз прошла и больше не выполнялось.

10.09.2015 01:23

Комментировать