УрокОбработка большого количества данных с помощью Queue API

В жизни каждого программиста попадаются заказчики, которые просят сделать для их сайта обработку большого количества данных, будь то постоянное обновление товаров для магазина, парсинг xml файлов, отправка десятков тесяч емейлов и так далее. Смысл в том, что решить задачу "в лоб" просто невозможно - сервер может не выдержать; или закончится время выполнения скрипта и обработка данных прервётся; или же (в случае рассылки емейлов) ляжет почтовый сервер, или того хуже - почтовый сервер решит, что вы спамер, и ваши письма будут доходить до адресатов с большим опозданием (если вообще дойдут).

Так же заказчик, естественно, не захочет каждый день заходить на сайт и нажимать какие-то кнопки, чтобы сайт крутился. Поэтому основной задачей в данной ситуации является невмешательно человека в жизненный цикл сайта.

В Drupal'e панацеей от этой болезни является Queue API. В седьмом Друпале оно уже находится в ядре, а для шестой версии был сделан бэкпорт в виде модуля drupal_queue.

Смысл Queue API:

  • При запуске крона Queue API создаёт очередь из данных, которые надо обработать
  • К сайту добавляется ещё один крон, который запускается раз в несколько минут, и обрабатывает небольшую порцию из очереди

Теперь к реализации.

Первым делом положите в корень сайта файл drupal_queue_cron.php с вот таким содержимым (его также можно взять из модуля drupal_queue):

<?php
 
/**
 * @file
 * Entry point for worker calls.
 */
include_once './includes/bootstrap.inc';
drupal_bootstrap(DRUPAL_BOOTSTRAP_FULL);
if (function_exists('drupal_queue_cron_run')) {
  drupal_queue_cron_run();
}

Настройте запуск этого файла на сервере раз в несколько минут.

Далее переходим к коду. В своем модуле надо имплементировать хук hook_cron_queue_info(), который является отправной точкой для создания очереди:

/**
 * Implements hook_cron_queue_info().
 */
function ИМЯМОДУЛЯ_cron_queue_info() {
  $queue['mymodule_queue'] = array(
    'worker callback' => 'mymodule_item_process',
    'time' => 60,
  );
  return $queue;
}

Небольшое пояснение:

  • mymodule_queue - имя очереди. Просто следите за тем, чтобы оно не перекликалось с другими очередями на сайте.
  • worker callback - функция, которая будет обрабатывать каждый элемент очереди.
  • time - максимальное время выполнения крона очереди (дополнительного крона).

Далее имплементируем hook_cron(), который будет вызван при запуске выполнения регулярных процедур:

/**
 * Implements hook_cron().
 */
function ИМЯМОДУЛЯ_cron() {
 
  // Загружаем массив, каждый элемент которого 
  // будет являться элементом очереди
  $items = mymodule_load_data();
 
  if ($items) {
    // Создаём новую очередь
    $queue = drupal_queue_get('mymodule_queue');   
    $queue->createQueue();
 
    // Каждый элемент ставим в очередь, т.е.
    // по факту - идёт запись в базу данных
    foreach ($items as $item) {
      $queue->createItem($item);
    }
  }
}

Например, загружу емейлы всех пользователей сайта:

function mymodule_load_data() {
  // Сюда я буду ложить все данные
  $data = array();
  // Вытягиваю из бд имя и емейл каждого пользователя
  $users = db_query('SELECT name, mail FROM {users}');
  // Ложу всех 
  foreach ($users as $user) {
    $data[] = array(
      'mail' => $user->mail,
      'name' => $user->name,
    ); 
  }
  return $data;
}

Собственно, сама функция, которая будет обрабатывать элемент (worker callback):

function mymodule_item_process($data) {
  // Строю параметры отправки письма
  $site_name = variable_get('site_name', '');
  $params = array();
  $params['subject'] = t('Notification from !sitename', array('!sitename' => $site_name));
  $params['body'] = t('Hi, !username', array('!username' => $data['name'));
  // Отправляю письмо
  drupal_mail('mymodule', 'send_notifications', $data['mail'], language_default(), $params);
}

Ну и для порядка уже допишу имплементацию hook_mail(), которая должна отправлять мои письма:

/**
 * Implementats of hook_mail().
 */
function mymodule_mail($key, &$message, $params) {
  if ($key == 'send_notifications') {
    $message['subject'] = $params['subject'];
    $message['body'][] = $params['body'];
  }
}

Кстати, у меня при работе с Queue API у меня сразу возник вопрос - а сколько элементов очереди функция обрабатывает за раз? Ответ - столько, сколько успеет выполнить скрипт, пока не закончится время выполнения скрипта 'time' в хуке hook_cron_queue_info().

Комментарии

Аватар пользователя Anton L. Safin
Anton L. Safin написал:

Да, использовал quque для модуля импорта базы из Domino (аналог 1С), очень удобная штука. Странно, что во многих достаточно "тяжелых" модулях ее не используют.

21.11.2011 13:32
Аватар пользователя SplasH
SplasH написал:

Согласен, некоторым модулям очень не помешало бы.

21.11.2011 13:59
Аватар пользователя Гость
Гость написал:

Также можно использовать batch API, хотя это в некотором роде просто надстройка над queue API

22.11.2011 04:45
Аватар пользователя SplasH
SplasH написал:

Нет, совсем другая задача. Батч позволяет запускать визуальный пакетный обработчик, вызывать который надо вручную. А я написал, что основной задачей здесь является полное невмешательноство человека в процесс. Плюс Queue API работает быстрее, т.к. поддерживает атомарность операций.

22.11.2011 13:56
Аватар пользователя Виталий
Виталий написал:

Спасибо за пост! А в 7м Друпале нужно 2 крона настраивать?

23.11.2011 16:04
Аватар пользователя SplasH
SplasH написал:

По умолчанию там один крон, как и в 6м. Но можно сделать и второй :)

23.11.2011 19:07
Аватар пользователя kruss
kruss написал:

Здравствуйте!
Огромное спасибо за статью - очень познавательно. Вопрос: у меня в проекте на Drupal 7 есть необходимость по запуску крона отправить с момощью циклического вызова drupal_mail около сотни писем. если их 2-3 то улетают, а больше хостер рассматривает как спам. Как можно с помошью Queue API сотню писем?

14.03.2012 09:36
Аватар пользователя SplasH
SplasH написал:

Ну так поставьте ограничение на выполнение одной операции секунд 20. Вряд ли за 20 секунд у вас успеет уйти более 100 писем. И поставьте выполнение этой операции каждые минут 5, например. И всё у вас потихонечку из очереди уйдёт =)

17.03.2012 14:02
Аватар пользователя Shot
Shot написал:

Одно небольшое замечание от новичка:
в друпале 7 надо заменить drupal_queue_get на DrupalQueue::get а так отличный пример.

27.03.2012 13:19
Аватар пользователя SplasH
SplasH написал:

Да, просто статья под D6 писалась :)

12.04.2012 23:33

Комментировать

                                                                        
88888888ba ad88888ba ad88888ba ,ad8888ba, 88888888ba
88 "8b d8" "8b d8" "8b d8"' `"8b 88 "8b
88 ,8P Y8, Y8a a8P d8' 88 ,8P
88aaaaaa8P' `Y8aaaaa, "Y8aaa8P" 88 88aaaaaa8P'
88""""""8b, `"""""8b, ,d8"""8b, 88 88""""""'
88 `8b `8b d8" "8b Y8, 88
88 a8P Y8a a8P Y8a a8P Y8a. .a8P 88
88888888P" "Y88888P" "Y88888P" `"Y8888Y"' 88

Enter the code depicted in ASCII art style.