Oracle AQ队列 管理应用程序流程的关键工具(oracle aq 队列)

Oracle AQ队列: 管理应用程序流程的关键工具

Oracle Advanced Queueing(AQ)是Oracle数据库提供的一种基于消息传递机制的队列服务,它支持异步消息处理,可以帮助应用程序处理大量的事务性数据和并发请求,并提供数据传递的可靠性和可恢复性。本文将介绍Oracle AQ队列的基本概念和使用方法,并通过示例代码演示如何在应用程序中使用Oracle AQ队列。

一、Oracle AQ队列的基本概念

Oracle AQ队列是一个在Oracle数据库中创建的对象,它由3部分组成:队列表、队列、和消息。队列表是支持AQ队列的Oracle表,它包含队列的定义和属性以及队列中消息的存储。队列是在队列表上创建的一个或多个逻辑队列,每个队列有唯一的名称和属性。消息是放置在队列中的传递对象,它由消息体和属性组成。队列中的消息可以由生产者发送并由消费者接收和处理。

Oracle AQ队列提供了以下基本功能:

(1) 消息的持久性:可以将消息保存到数据库中以实现数据的可靠性和持久性。

(2) 异步消息传递:可以在不等待响应的情况下发送和接收消息。

(3) 事务处理:可以将消息的发送和接收与数据库事务进行绑定,以确保数据的一致性和可靠性。

(4) 消息优先级:可以为消息设置不同的优先级,以确保高优先级消息被及时处理。

二、Oracle AQ队列的使用方法

创建队列表

在Oracle数据库中创建队列表的方法如下:

CREATE TABLE queue_table_name

(queue_column_name queue_type, …)

LOB(queue_column_name) STORE AS

(queue_type LOB_type)

TABLESPACE tablespace_name;

其中,queue_table_name是队列表的名称,queue_column_name是队列表中的列名,queue_type是Oracle AQ预定义的数据类型(如sys.aq$_jms_text_message、sys.aq$_jms_map_message等),LOB_type是大对象的类型(如CLOB、BLOB等),tablespace_name是队列表所在的表空间名称。

创建队列

在Oracle数据库中创建队列的方法如下:

BEGIN

DBMS_AQADM.CREATE_QUEUE(queue_name => ‘queue_name’,

queue_table => ‘queue_table_name’,

queue_type => sys.aq$_jms_text_message);

END;

其中,queue_name是队列的名称,queue_table_name是队列表的名称,sys.aq$_jms_text_message是队列中消息的类型。

发送消息

在Oracle数据库中发送消息的方法如下:

DECLARE

msg sys.aq$_jms_text_message;

propid NUMBER;

qn sys.aq$_agent;

DEQUEUE_OPTIONS sys.aq$_dequeue_options;

BEGIN

broker_options.msgproperties :=

sys.aq$_jms_header_property(‘PRIORITY’, ‘1’);

msg := sys.aq$_jms_text_message.init(‘Hello AQ!’);

propid := broker_options.msgproperties.add_property(‘id’, ‘1’);

qn := sys.aq$_agent.init(‘queue_owner.queue_name’,

NULL,

‘queue_owner’);

DBMS_AQ.ENQUEUE(queue_name => ‘queue_owner.queue_name’,

enqueue_options => NULL,

message_properties => broker_options.msgproperties,

payload => msg,

msgid => propid,

msg_correlation => NULL,

transform => NULL,

queue_agent => qn);

END;

其中,queue_owner是队列的所有者,queue_name是队列的名称,broker_options.msgproperties是消息的属性,msg是消息的内容。

接收消息

在Oracle数据库中接收消息的方法如下:

DECLARE

msg sys.aq$_jms_text_message;

propid NUMBER;

qn sys.aq$_agent;

DEQUEUE_OPTIONS sys.aq$_dequeue_options;

BEGIN

qn := sys.aq$_agent.init(NULL,

‘queue_owner.queue_name’,

‘queue_owner’);

DEQUEUE_OPTIONS.CONSUMER_NAME := ‘consumer_name’;

DEQUEUE_OPTIONS.NAV_MODE := DBMS_AQ.FIRST_MESSAGE;

DBMS_AQ.DEQUEUE(queue_name => ‘queue_owner.queue_name’,

dequeue_options => DEQUEUE_OPTIONS,

message_properties => NULL,

payload => msg,

msgid => propid,

msg_correlation => NULL,

transform => NULL,

queue_agent => qn);

END;

其中,queue_owner是队列的所有者,queue_name是队列的名称,consumer_name是消费者的名称,msg是接收到的消息的内容。

三、示例代码

下面是一个简单的示例,在这个示例中,我们将创建一个队列和2个消费者,每个消费者都会向队列中发送5条消息,每条消息包含一个递增的整数值。第三个消费者会从队列中接收并打印所有的消息。

1. 创建队列

BEGIN

DBMS_AQADM.CREATE_QUEUE(queue_name => ‘test_queue’,

queue_table => ‘test_queue_table’,

queue_type => sys.aq$_jms_text_message);

DBMS_AQADM.START_QUEUE(queue_name => ‘test_queue’);

END;

2. 创建消费者

CREATE OR REPLACE PROCEDURE consumer1_proc AS

queue_handle SYS.AQ$_QUEUE;

message_handle SYS.AQ$_AGENT;

enq_options DBMS_AQ.ENQUEUE_OPTIONS_T;

propid NUMBER;

msg sys.aq$_jms_text_message;

BEGIN

message_handle := SYS.AQ$_AGENT(‘test_queue_owner’, ‘test_queue_name’, NULL);

queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);

FOR i IN 1..5 LOOP

msg := sys.aq$_jms_text_message.init(‘Message ‘ || TO_CHAR(i) || ‘ from consumer 1!’);

propid := enq_options.msgproperties.add_property(‘id’, i);

DBMS_AQ.ENQUEUE(

queue_name => queue_handle,

enqueue_options => enq_options,

message_properties => enq_options.msgproperties,

payload => msg,

msgid => propid);

END LOOP;

END;

CREATE OR REPLACE PROCEDURE consumer2_proc AS

queue_handle SYS.AQ$_QUEUE;

message_handle SYS.AQ$_AGENT;

enq_options DBMS_AQ.ENQUEUE_OPTIONS_T;

propid NUMBER;

msg sys.aq$_jms_text_message;

BEGIN

message_handle := SYS.AQ$_AGENT(‘test_queue_owner’, ‘test_queue_name’, NULL);

queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);

FOR i IN 1..5 LOOP

msg := sys.aq$_jms_text_message.init(‘Message ‘ || TO_CHAR(i) || ‘ from consumer 2!’);

propid := enq_options.msgproperties.add_property(‘id’, i);

DBMS_AQ.ENQUEUE(

queue_name => queue_handle,

enqueue_options => enq_options,

message_properties => enq_options.msgproperties,

payload => msg,

msgid => propid);

END LOOP;

END;

3. 接收消息

CREATE OR REPLACE PROCEDURE consumer3_proc AS

queue_handle SYS.AQ$_QUEUE;

message_handle SYS.AQ$_AGENT;

deq_options DBMS_AQ.DEQUEUE_OPTIONS_T;

buf sys.DBMS_SQL.VARCHAR2A;

n INTEGER;

msg sys.aq$_jms_text_message;

BEGIN

message_handle := SYS.AQ$_AGENT(NULL, ‘test_queue_owner’, ‘test_queue_name’);

queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);

deq_options.consumer_name := ‘consumer3’;

deq_options.navigation_mode := DBMS_AQ.FIRST_MESSAGE;

deq_options.visibility := DBMS_AQ.IMMEDIATE;

n := 0;

LOOP

DBMS_AQ.DEQUEUE(

queue_name => queue_handle,

dequeue_options => deq_options,

message_properties => NULL,

payload => msg,

msgid => buf(n),

msg_correlation => NULL);

n := n+1;

DBMS_OUTPUT.PUT_LINE(msg.text || ‘ received by consumer 3’);

DEQ_OPTIONS.MSGID := buf(n);

EXCEPTION

WHEN NO_DATA_FOUND THEN

IF n = 0 THEN

DBMS_OUTPUT.PUT_LINE(‘Queue is empty…’);

ELSE

DBMS_OUTPUT.PUT_LINE(‘All messages received!’);

END IF;

EXIT;

END;

END LOOP;

END;

执行consumer1_proc、consumer2_proc、consumer3_proc后,可以看到如下输出:

Message 1 from consumer 1! received by consumer 3

Message 1 from consumer 2! received by consumer 3

Message 2 from consumer 1! received by consumer 3

Message 2 from consumer 2! received by consumer 3

Message 3 from consumer 1! received by consumer 3

Message 3 from consumer 2! received by consumer 3

Message 4 from consumer 1! received by consumer 3

Message 4 from consumer 2! received by consumer 3

Message 5 from consumer 1! received by consumer 3

Message 5 from consumer


数据运维技术 » Oracle AQ队列 管理应用程序流程的关键工具(oracle aq 队列)