Oracle Advanced Queuing Notification

Oracle Advanced Queuing Notification представляют собой механизм уведомлений, который срабатывает при получении сообщений. В качестве получателя уведомления может использоваться E-Mail, HTTP POST, OCI callback function, Java Messaging Service (JMS) или вызов PL/SQL процедуры. Одним из основных применений уведомлений является немедленная обработка сообщения при его поступлении в заданную очередь. В этом случае выполняется извлечение (Dequeuing) сообщения из очереди при помощи PL/SQL процедуры или JMS.

Другим применением может быть отправка сообщения на заданный электронный адрес.

В этой статье будет показан вызов PL/SQL функции при получении сообщения.

PL/SQL Notification

Мы напишем PL/SQL  процедуру, которая будет вызываться при приеме сообщения, извлекать сообщение из очереди и делать запись в отладочной таблице.  Для этого используем таблицу и функцию для формирования отладочной записи из вот этой статьи. Callback PL/SQL процедура для извлечения сообщения и регистрации факта приема сообщения будет следующей:

CREATE OR REPLACE PROCEDURE Queue1Callback(CONTEXT RAW,
                                           reginfo sys.aq$_reg_info,
                                           descr sys.aq$_descriptor,
                                           payload RAW,
                                           payloadl NUMBER)
IS
  deq_options     DBMS_AQ.Dequeue_options_t;
  msg_properties  DBMS_AQ.message_properties_t;
  msg_handle      RAW(16);
  msg             msg_type;
BEGIN
   deq_options.msgid         := descr.msg_id;
   deq_options.consumer_name := descr.consumer_name;
   DBMS_AQ.DEQUEUE(
      queue_name          =>     descr.queue_name,
      dequeue_options     =>     deq_options,
      message_properties  =>     msg_properties,
      payload             =>     msg,
      msgid               =>     msg_handle);
   COMMIT;
   DBG('ID: '|| msg.id||', Subj: '||msg.subj||', Text: '||msg.text);
END;

Параметры этой PL/SQL процедуры являются предопределенными и не могут быть изменены. Как видно из кода, идентификатор сообщения, название очереди и имя подписчика (subscriber или по другому consumer) передаются в структуре записи descr с типом sys.aq$_descriptor. После извлечения  сообщения из очереди выполняется запись атрибутов сообщения в отладочную таблицу CORE_DEBUG.Для того, чтобы эта процедура вызывалась автоматически при поступлении сообщения в очередь, необходимо зарегистрировать соответствующее уведомление при помощи dbms_aq.register

BEGIN
  dbms_aq.register(sys.aq$_reg_info_list(
        sys.aq$_reg_info('aq_test.Queue1:Subscriber_1',
                          DBMS_AQ.NAMESPACE_AQ,
                         'plsql://aq_test.Queue1Callback',
                          HEXTORAW('FF')) ) ,
      1);
END;

В dbms_aq.register передается список параметров регистрации, следовательно, можно за один раз зарегистрировать несколько уведомлений. Регистрационная информация включает в себя: Название подписчика, задается в виде: 

schema_name.queue_name:subscriber_name

Пространство имен подписки. Для приема уведомлений из Oracle Streams должно быть DBMS_AQ.NAMESPACE_AQ. Для приема уведомлений из сторонних приложений через DBMS_AQ.POST или OCISubscriptionPost()должно быть DBMS_AQ.NAMESPACE_ANONYMOUS.Callback. Здесь задается действие, которое будет выполняться при обработке уведомления. Возможные варианты:Для E-Mail:

mailto://abc@site.ru

HTTP Post:

http://www.site.ru:8080

PL/SQL Callback:

plsql://schema_name.procedure_name

После успешной регистрации уведомления отправим тестовое сообщение в очередь:

DECLARE
   enqueue_options DBMS_AQ.enqueue_options_t;
   msg_properties  DBMS_AQ.message_properties_t;
   msg_handle      RAW(16);
   msg             msg_type;
BEGIN
   msg := msg_type(2, 'Subject 2', 'Message to Queue1');
   DBMS_AQ.ENQUEUE(
      queue_name              => 'queue1',
      enqueue_options         => enqueue_options,
      message_properties      => msg_properties,
      payload                 => msg,
      msgid                   => msg_handle);
   COMMIT;
END;
/

Затем проверим, действительно ли произошел автоматический прием сообщения и его обработка.

SELECT *
  FROM core_debug

Да, как мы видим, сообщение действительно принято и обработано.

Удаление уведомлений

Для удаления уведомлений требуется сделать вызов процедуры unregister с теми же параметрами, что и register. Например:

BEGIN
  dbms_aq.unregister(sys.aq$_reg_info_list(
        sys.aq$_reg_info('aq_test.Queue1:Subscriber_1',
                          DBMS_AQ.NAMESPACE_AQ,
                         'plsql://aq_test.Queue1Callback',
                          HEXTORAW('FF')) ) ,
      1);
END;
Ссылка на основную публикацию