Real-time incremental data synchronization from Oracle to Kafka with OGG (Part 3)


scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt
3. Configure the target-side initial-load process
Configure the initial-load Replicat on the target:
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
GGSCI (172-16-101-242) 6> edit params initkfk
Add:
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
EXTFILE ./dirdat/ekfk000000
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4. Configure the Kafka parameters
vi ./dirprm/kafka.props
Add:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*
Next, create the Kafka producer configuration file named by KafkaProducerConfigFile. The handler looks this file up on gg.classpath, which includes dirprm/:
vi custom_kafka_producer.properties
Add:
bootstrap.servers=172.16.101.242:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
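Both files are plain Java properties files. A typo only shows up when the handler starts, for example a wrong-case class name (Java class names are case-sensitive, so "org.Apache..." would not load). The following is a minimal, hypothetical pre-flight check written in Python. It is not part of OGG, and the function names are illustrative. It uses a simplified properties reader that handles `key=value` lines and `#`/`!` comments, but not line continuations or escapes. It then checks two things: that every handler listed in gg.handlerlist has a `gg.handler.<name>.type`, and that both serializers name the ByteArraySerializer class used in the sample above exactly.

```python
EXPECTED_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # or ! comments.

    Simplified reader: unlike java.util.Properties it ignores line
    continuations, escapes, and ':' or whitespace separators.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:  # lines without '=' are ignored by this sketch
            props[key.strip()] = value.strip()
    return props


def check_handler_props(props: dict[str, str]) -> list[str]:
    """Check that every handler listed in gg.handlerlist has a type."""
    problems = []
    names = [n.strip() for n in props.get("gg.handlerlist", "").split(",") if n.strip()]
    if not names:
        problems.append("gg.handlerlist is missing or empty")
    for name in names:
        # Handler settings are keyed by the gg.handler.<name>. prefix.
        if not props.get(f"gg.handler.{name}.type"):
            problems.append(f"gg.handler.{name}.type is not set")
    return problems


def check_producer_props(props: dict[str, str]) -> list[str]:
    """Check bootstrap.servers and the exact (case-sensitive) serializer classes."""
    problems = []
    if not props.get("bootstrap.servers"):
        problems.append("bootstrap.servers is not set")
    for key in ("key.serializer", "value.serializer"):
        if props.get(key) != EXPECTED_SERIALIZER:
            problems.append(f"{key} is {props.get(key)!r}, expected {EXPECTED_SERIALIZER}")
    return problems
```

For example, `check_producer_props(parse_properties(open(path).read()))` returns an empty list for a file matching the producer sample above. A non-empty list names each offending key.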
5. Start the full (initial-load) extraction on the source
On the source:
GGSCI (dtproxy) 20>  start mgr
GGSCI (dtproxy) 21>  start initkfk
6. Apply the full data set on the target
Start the manager in GGSCI on the target, then run the initial-load Replicat directly from the OS shell in the OGG home directory:
GGSCI (172-16-101-242) 13> start mgr
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
7. Verify the data in Kafka
Inspect the topic with the Kafka console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
The full data set has now been synchronized to the target Kafka topic.
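With format=json and mode=op, each message shown above describes one row operation. It carries the source table, op_type ("I" for the inserts produced by the initial load), op_ts, and an "after" object holding the row's column values. A row-count check against the source can be scripted. Here is a minimal Python sketch that tallies messages per table and operation type. It assumes the consumer output has been saved one JSON document per line, for example by redirecting kafka-console-consumer.sh to a file. The function names are illustrative.

```python
import json
from collections import Counter


def summarize_ogg_messages(lines):
    """Count OGG JSON change records by (table, op_type).

    Assumes one JSON object per line, as printed by kafka-console-consumer.sh
    for the op-mode JSON format shown above. Blank lines are skipped.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        counts[(record["table"], record["op_type"])] += 1
    return counts


def after_image(line):
    """Return the column values in the "after" object of one record."""
    return json.loads(line).get("after", {})
```

For example, `summarize_ogg_messages(open("test_ogg.jsonl"))` (file name hypothetical) returns a Counter whose `("BAIYANG.ORA_TO_KFK", "I")` entry can be compared with `SELECT COUNT(*)` on the source table after the initial load.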
Incremental data synchronization
1. Configure the source Extract process
GGSCI (dtproxy) 9> edit param extkfk
Add:
extract extkfk
dynamicresolution
SETENV (ORACLE_SID = "dtstack")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
Add the Extract group:
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
Define the trail and bind it to the Extract group:
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
2. Configure the source data pump
Configure the pump process on the source:
GGSCI (dtproxy) 12> edit param pupkfk
Add:
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
Add the pump as an Extract group that reads the local trail:
GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to
Define the remote trail and bind it to the pump:
GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk
3. Configure the target Replicat process
Configure the Replicat on the target:
edit param repkfk
Add:
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
Add the Replicat group and bind it to the trail:
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
4. Start real-time capture on the source
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00
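For scripted health checks, the status all table can be parsed rather than read by eye. Below is a minimal Python sketch. It assumes the whitespace-separated column layout shown above: Program, Status, Group, Lag at Chkpt, Time Since Chkpt, with the MANAGER row carrying only the first two. It also assumes that only the status all output is passed in. The layout may differ between OGG versions, and the function and class names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

PROGRAMS = ("MANAGER", "EXTRACT", "REPLICAT")


@dataclass
class GroupStatus:
    program: str
    status: str
    group: Optional[str] = None
    lag_seconds: Optional[int] = None
    since_chkpt_seconds: Optional[int] = None


def hms_to_seconds(value: str) -> int:
    """Convert an HH:MM:SS field such as '00:00:10' to seconds."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def parse_status_all(output: str) -> list[GroupStatus]:
    """Parse data rows of GGSCI 'status all' output; the header row is skipped."""
    rows = []
    for line in output.splitlines():
        fields = line.split()
        # Data rows start with a program type; the header starts with 'Program'.
        if len(fields) < 2 or fields[0] not in PROGRAMS:
            continue
        row = GroupStatus(program=fields[0], status=fields[1])
        if len(fields) >= 5:
            row.group = fields[2]
            row.lag_seconds = hms_to_seconds(fields[3])
            row.since_chkpt_seconds = hms_to_seconds(fields[4])
        rows.append(row)
    return rows


def not_running(rows: list[GroupStatus]) -> list[GroupStatus]:
    """Return the rows whose status is anything other than RUNNING."""
    return [row for row in rows if row.status != "RUNNING"]
```

An alerting script could then flag any group returned by `not_running`, or any row whose `lag_seconds` exceeds a chosen threshold.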

