scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt3.配置目标端数据初始化进程
配置目标端初始化进程
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun GGSCI (172-16-101-242) 6> edit params initkfk添加
SPECIALRUN end runtime setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8") targetdb libfile libggJAVA.so set property=./dirprm/kafka.props SOURCEDEFS ./dirdef/define_kfk.txt EXTFILE ./dirdat/ekfk000000 reportcount every 1 minutes, rate grouptransops 10000 map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4.配置kafka相关参数
vi ./dirprm/kafka.props
添加
gg.handlerlist=kafkahandlergg.handler.kafkahandler.type=kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.propertiesgg.handler.kafkahandler.topicMappingTemplate=test_ogggg.handler.kafkahandler.format=jsongg.handler.kafkahandler.mode=opgg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/* --*/vi custom_kafka_producer.properties
添加
bootstrap.servers=172.16.101.242:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.Apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000
5.源端开启全量数据抽取
源端
GGSCI (dtproxy) 20> start mgr GGSCI (dtproxy) 21> start initkfk6.目标端全量数据应用
GGSCI (172-16-101-242) 13> start mgr ./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD7.kafka数据验证
使用kafka客户端工具查看topic的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}} {"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}全量数据已经同步到目标kafka topic
增量数据同步
1.源端抽取进程配置
GGSCI (dtproxy) 9> edit param extkfk添加
dynamicresolution SETENV (ORACLE_SID = "dtstack") SETENV (NLS_LANG = "american_america.AL32UTF8") userid ggsadmin,password oracle exttrail ./dirdat/to table baiyang.ora_to_kfk;添加extract进程
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now添加trail文件的定义与extract进程绑定
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk2.源端数据推送进程配置
配置源端推送进程
GGSCI (dtproxy) 12> edit param pupkfk添加
extract pupkfk passthru dynamicresolution userid ggsadmin,password oracle rmthost 172.16.101.242 mgrport 7810 rmttrail ./dirdat/to table baiyang.ora_to_kfk;添加extract进程
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to添加trail文件的定义与extract进程绑定
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk3.配置目标端恢复进程
配置目标端恢复进程
edit param repkfk添加
REPLICAT repkfk SOURCEDEFS ./dirdef/define_kfk.txt targetdb libfile libggjava.so set property=./dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;添加trail文件到replicate进程
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint4.源端开启实时数据抓取
./ggsci GGSCI (dtproxy) 5> start extkfk Sending START request to MANAGER ... EXTRACT EXTKFK startingGGSCI (dtproxy) 6> start pupkfk Sending START request to MANAGER ... EXTRACT PUPKFK startingGGSCI (dtproxy) 7> status all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTKFK 00:00:00 00:00:10 EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
推荐阅读
- Nginx 整合 FastDFS 实现文件服务器
- 熔断原理与实现Golang版
- 不拆分网线,一根网线实现IPTV和上网单线复用,手把手超详细
- ThinkPHP框架——实现定时任务,定时更新、清理数据
- SpringBoot如何用Session共享实现分布式部署?
- sync-player使用websocket实现异地同步播放
- 物联网网关搭建VPN客户端,来实现PLC远程下载
- H5 实现二维码 / 条形码的识别与解析
- 常做这运动让身体器官更年轻 助你实现长寿梦想
- 带你实现一个静态服务器,超详细
