搜索 | 会员  
Java实时获取oracle变更
来源: http://www.javaeye.com/topic/267893   作者:不详  日期:2009/5/31  类别:编程语言  主题:PYTHON  编辑:德仔
在一个基于数据库的实时系统&里面,实时&获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实

 

在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。
背景:
       要做一个车辆GPS监控系统,主要分两块:
    1.采集。由GPS厂商提供实时数据,通过UDP包接收
    2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
备选方案:
     1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
     2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。
方案评估:
    方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。
    方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。

最终方案:
 
    最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。
    关于捕获进程,请参考《Streams概述》,《Streams捕获进程》

实现:

Sql代码
Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited;  
 
--修改目标表(要捕获变更的表)追加日志  
ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS;  
 
 
create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs;  
 
 
grant connect, resource, select_catalog_role to strmadmin;
授予相应权限
Sql代码
grant execute on dbms_aqadm to strmadmin;  
 
grant execute on dbms_capture_adm to strmadmin;  
 
grant execute on dbms_propagation_adm to strmadmin;  
 
grant execute on dbms_streams_adm to strmadmin;  
 
grant execute on dbms_apply_adm to strmadmin;  
 
grant execute on dbms_flashback to strmadmin;  
 
grant execute on dbms_aq to strmadmin;  
 
grant execute on dbms_aqjms to strmadmin;  
 
grant execute on dbms_aqin to strmadmin;  
 
grant execute on dbms_aqjms_internal to strmadmin;  
  
grant execute on dbms_aqadm to strmadmin;
grant execute on dbms_capture_adm to strmadmin;
grant execute on dbms_propagation_adm to strmadmin;
grant execute on dbms_streams_adm to strmadmin;
grant execute on dbms_apply_adm to strmadmin;
grant execute on dbms_flashback to strmadmin;
grant execute on dbms_aq to strmadmin;
grant execute on dbms_aqjms to strmadmin;
grant execute on dbms_aqin to strmadmin;
grant execute on dbms_aqjms_internal to strmadmin;
 

执行系统存储过程分配权限

   
Sql代码
BEGIN 
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(  
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,  
grantee => 'strmadmin',  
grant_option => FALSE);  
END;  
/  
 
 
 
BEGIN 
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(  
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,  
grantee => 'strmadmin',  
grant_option => FALSE);  
END;  
/  
  
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
 
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
 
 

以strmadmin帐户登录oracle
 

创建AQ,类型为JMS消息
 
Sql代码
BEGIN 
   DBMS_AQADM.CREATE_QUEUE_TABLE(  
        Queue_table            => 'gpsstatus_queue_table',  
        Queue_payload_type     => 'SYS.AQ$_JMS_MESSAGE',  
        multiple_consumers  => false,  
        compatible             => '8.1.5');  
   DBMS_AQADM.CREATE_QUEUE(  
      Queue_name          => 'gpsstatus_queue',  
      Queue_table         => 'gpsstatus_queue_table');  
   DBMS_AQADM.START_QUEUE(  
      queue_name         => 'gpsstatus_queue');  
END;  
/  
BEGIN 
DBMS_STREAMS_ADM.SET_UP_QUEUE(  
    queue_table  => 'gps_temp_queue_table',  
    queue_name   => 'gps_temp_queue');  
END;  
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table            => 'gpsstatus_queue_table',
        Queue_payload_type     => 'SYS.AQ$_JMS_MESSAGE',
        multiple_consumers  => false,
        compatible             => '8.1.5');
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name          => 'gpsstatus_queue',
      Queue_table         => 'gpsstatus_queue_table');
   DBMS_AQADM.START_QUEUE(
      queue_name         => 'gpsstatus_queue');
END;
/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'gps_temp_queue_table',
    queue_name   => 'gps_temp_queue');
END;
/
 
为目标表创建捕获进程
 
Sql代码
BEGIN 
DBMS_STREAMS_ADM.ADD_TABLE_RULES(  
table_name => 'myoracle.TEST_GPS_STATUS',  
streams_type => 'capture',  
streams_name => 'capture_gps',  
queue_name => 'gps_temp_queue',  
include_dml => true,  
include_ddl => false);  
END;  
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'capture',
streams_name => 'capture_gps',
queue_name => 'gps_temp_queue',
include_dml => true,
include_ddl => false);
END;
/
 
初始化scn

Sql代码
DECLARE 
iscn NUMBER; -- Variable to hold instantiation SCN value  
BEGIN 
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();  
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(  
source_object_name => 'myoracle.TEST_GPS_STATUS',  
source_database_name => 'TESTdb',  
instantiation_scn => iscn);  
END;  
/  
  
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'myoracle.TEST_GPS_STATUS',
source_database_name => 'TESTdb',
instantiation_scn => iscn);
END;
/
 

为消息队列创建代理
 
Sql代码
BEGIN 
DBMS_AQADM.CREATE_AQ_AGENT(  
agent_name => 'gpsstatus_agent');  
DBMS_AQADM.ENABLE_DB_ACCESS(  
agent_name => 'gpsstatus_agent',  
db_username => 'strmadmin');  
END;  
/  
DECLARE 
subscriber SYS.AQ$_AGENT;  
BEGIN 
subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);  
SYS.DBMS_AQADM.ADD_SUBSCRIBER(  
queue_name => 'strmadmin.gpsstatus_queue',  
subscriber => subscriber,  
rule => NULL,  
transformation => NULL);  
END;  
BEGIN
DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'gpsstatus_agent');
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'gpsstatus_agent',
db_username => 'strmadmin');
END;
/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.gpsstatus_queue',
subscriber => subscriber,
rule => NULL,
transformation => NULL);
END;
/
 
创建存储过程以决定将哪些信息放到消息队列里面
 

Sql代码
CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS 
--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);  
message sys.aq$_jms_message;  
enqueue_options dbms_aq.enqueue_options_t;  
message_properties dbms_aq.message_properties_t;  
msgid raw(16);  
lcr SYS.LCR$_ROW_RECORD;  
rc PLS_INTEGER;  
DEVICEID varchar2(11);  
GATHERDATETIME date;  
LONGITUDETYPE char(1);  
LONGITUDEVALUE number ;  
LATITUDETYPE char(1);  
LATITUDEVALUE number ;  
SPEED number ;  
DIRECTION number ;  
BEGIN 
rc := in_any.GETOBJECT(lcr);  
DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();  
GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();  
LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();  
LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();  
LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();  
LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();  
SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();  
DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();  
message := sys.aq$_jms_message.construct(1);  
--message.set_replyto(agent);  
message.set_type('');  
message.set_userid('strmadmin');  
message.set_appid('');  
message.set_groupid('');  
message.set_groupseq('');  
message.set_string_property('DEVICEID', DEVICEID);  
message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));  
message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);  
message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );  
message.set_string_property('LATITUDETYPE', LATITUDETYPE);  
message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));  
message.set_string_property('SPEED', to_char(SPEED) );  
message.set_string_property('DIRECTION', to_char(DIRECTION) );  
--指定消息生存时间  
message_properties.expiration:=60;  
dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',  
enqueue_options => enqueue_options,  
message_properties => message_properties,  
payload => message,  
msgid => msgid);  
COMMIT;  
END;  
CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS
--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0);
message sys.aq$_jms_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
DEVICEID varchar2(11);
GATHERDATETIME date;
LONGITUDETYPE char(1);
LONGITUDEVALUE number ;
LATITUDETYPE char(1);
LATITUDEVALUE number ;
SPEED number ;
DIRECTION number ;
BEGIN
rc := in_any.GETOBJECT(lcr);
DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2();
GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate();
LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar();
LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber();
LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar();
LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber();
SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber();
DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber();
message := sys.aq$_jms_message.construct(1);
--message.set_replyto(agent);
message.set_type('');
message.set_userid('strmadmin');
message.set_appid('');
message.set_groupid('');
message.set_groupseq('');
message.set_string_property('DEVICEID', DEVICEID);
message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss'));
message.set_string_property('LONGITUDETYPE', LONGITUDETYPE);
message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) );
message.set_string_property('LATITUDETYPE', LATITUDETYPE);
message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE));
message.set_string_property('SPEED', to_char(SPEED) );
message.set_string_property('DIRECTION', to_char(DIRECTION) );
--指定消息生存时间
message_properties.expiration:=60;
dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);
COMMIT;
END;
/
 
为目标表配置处理器
 
Sql代码
 
BEGIN 
DBMS_APPLY_ADM.SET_DML_HANDLER(  
object_name => 'myoracle.TEST_GPS_STATUS',  
object_type => 'TABLE',  
operation_name => 'UPDATE',  --可配置为insert,update,delete等  
error_handler => false,  
user_procedure => 'strmadmin.enq_gps_lcr',  
apply_database_link => NULL);  
END;  

BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'myoracle.TEST_GPS_STATUS',
object_type => 'TABLE',
operation_name => 'UPDATE',  --可配置为insert,update,delete等
error_handler => false,
user_procedure => 'strmadmin.enq_gps_lcr',
apply_database_link => NULL);
END;
/
 
设定参数及启动捕获进程
 
 
Sql代码
BEGIN 
DBMS_STREAMS_ADM.ADD_TABLE_RULES(  
table_name => 'myoracle.TEST_GPS_STATUS',  
streams_type => 'apply',  
streams_name => 'apply_gps',  
queue_name => 'strmadmin.gps_temp_queue',  
include_dml => true,  
include_ddl => false,  
source_database => 'TESTdb');  
END;  
/  
BEGIN 
DBMS_APPLY_ADM.SET_PARAMETER(  
apply_name => 'apply_gps',  
parameter => 'disable_on_error',  
value => 'n');  
END;  
/  
BEGIN 
DBMS_APPLY_ADM.START_APPLY(  
apply_name => 'apply_gps');  
END;  
/  
BEGIN 
DBMS_CAPTURE_ADM.START_CAPTURE(  
capture_name => 'capture_gps');  
END;  
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'myoracle.TEST_GPS_STATUS',
streams_type => 'apply',
streams_name => 'apply_gps',
queue_name => 'strmadmin.gps_temp_queue',
include_dml => true,
include_ddl => false,
source_database => 'TESTdb');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_gps',
parameter => 'disable_on_error',
value => 'n');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_gps');
END;
/
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_gps');
END;
/
 
至此,捕获进程配置完毕
可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。
 
 
下面是java处理代码,可直接使用JMS接口
本例使用oracle提供的API
 
 
Java代码
 
QueueConnectionFactory queueConnectionFactory = null;  
QueueConnection queueConnection = null;  
QueueSession queueSession = null;  
 
Queue queue = null;  
QueueReceiver subscriber = null;  
Message message = null; 

QueueConnectionFactory queueConnectionFactory = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
Queue queue = null;
QueueReceiver subscriber = null;
Message message = null;
 
 
 
 
Java代码
 
log.info("开始连接 ");  
queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");  
queueConnection = queueConnectionFactory.createQueueConnection(userName, password);  
log.info("创建Queue Connection 成功");  
queueConnection.start();  
log.info("connection started");  
queueSession = queueConnection.createQueueSession(false,  
Session.AUTO_ACKNOWLEDGE);  
.info("Queue session created");  
queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);  
log.info("Queue getted");  
subscriber = queueSession.createReceiver(queue);  
log.info("初始化完成"); 

log.info("开始连接 ");
queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin");
queueConnection = queueConnectionFactory.createQueueConnection(userName, password);
log.info("创建Queue Connection 成功");
queueConnection.start();
log.info("connection started");
queueSession = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
.info("Queue session created");
queue = ((AQjmsSession) queueSession).getQueue(userName, queueName);
log.info("Queue getted");
subscriber = queueSession.createReceiver(queue);
log.info("初始化完成");
 
开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。
 
 
   
Java代码
while (true) {  
         message = subscriber.receive();//receive方法使没有新消息时,线程挂起  
      //do something...  
}  
  
while (true) {
         message = subscriber.receive();//receive方法使没有新消息时,线程挂起
      //do something...
}
 
 
 

最后:
 
      本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。

 
德仔网尊重行业规范,每篇文章都注明有明确的作者和来源;德仔网的原创文章,请转载时务必注明文章作者和来源:德仔网;
头条那些事
大家在关注
广告那些事
我们的推荐
也许感兴趣的
干货