• [高清组图]环广西赛:参赛车队赛前适应训练 2018-08-31
  • [高清组图]特谢拉复出吴曦失单刀 苏宁0-0平斯威 2018-08-31
  • [高清组图]潜水偶遇座头鲸 亲密互动玩起“水中击掌” 2018-08-31
  • [高清组图]法拉利拍定妆照 维特尔KIMI准备好了 2018-08-31
  • [高清组图]毛剑卿伤退莫雷诺捅射 申花1-0一方 2018-08-31
  • [高清组图]比埃拉双响巴坎布建功 国安5-1富力 2018-08-31
  • [高清组图]武磊世界波胡尔克点射 上港2-0胜申花 2018-08-31
  • [高清组图]武磊2球吕文君建功 上港3-1富力 2018-08-31
  • [高清组图]欧超杯-科斯塔2球 皇马加时赛2-4马竞 2018-08-31
  • [高清组图]格里芬赤膊骑行 休赛期享受二人世界 2018-08-31
  • [视频]【深化改革 重在实效】精准扶贫 四川彝区要拔掉“穷根” 2018-08-31
  • [视频]【深化改革 重在实效】破藩篱促合力 体制创新粘合“两张皮” 2018-08-31
  • [视频]【深化改革 重在实效】激发活力 实现市场准入全程便利化 2018-08-31
  • [视频]【深化改革 重在实效】打通简政放权的“最后一公里” 2018-08-31
  • [视频]【深化改革 重在实效】广东:户籍改革为外来工打开一扇门 2018-08-31
  • 手机版
    你好,游客 登录 注册 搜索
    背景:
    阅读新闻

    OGG实现Oracle数据同步到Kafka

    [日期:2018-03-22] 来源:Linux社区  作者:lyzbg [字体: ]

    奥门新萄京官方正版 www.arianalance.com 环境:
    源端:Oracle12.2 ogg for Oracle 12.3
    目标端:Kafka ogg for bigdata 12.3
    将Oracle中的数据通过OGG同步到Kafka

    源端配置:
    1、为要同步的表添加附加日志
    dblogin USERID ogg@orclpdb, PASSWORD ogg
    add trandata scott.tab1
    add trandata scott.tab2

    2、 添加抽取进程
    GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
    GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

    编辑抽取进程参数:
    GGSCI> edit params EXT_KAF1

    extract EXT_KAF1
    userid c##ggadmin,PASSWORD ggadmin
    LOGALLSUPCOLS
    UPDATERECORDFORMAT COMPACT
    exttrail ./dirdat/k1,FORMAT RELEASE 12.3
    SOURCECATALOG orclpdb --(指定pdb)
    table scott.tab1;
    table scott.tab2;

    注册进程
    GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
    GGSCI> register extract EXT_KAF1 database container (orclpdb)

    3、添加投递进程:
    GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
    GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

    编辑投递进程参数:
    GGSCI>edit param PMP_KAF1

    EXTRACT PMP_KAF1
    USERID c##ggadmin,PASSWORD ggadmin
    PASSTHRU
    RMTHOST 10.1.1.247, MGRPORT 9178
    RMTTRAIL ./dirdat/f1,format release 12.3
    SOURCECATALOG orclpdb
    TABLE scott.tab1;
    table scott.tab2;

    4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化
    GGSCI> add extract ek_01, sourceistable

    编辑参数:
    GGSCI> EDIT PARAMS ek_01

    EXTRACT ek_01
    USERID c##ggadmin,PASSWORD ggadmin
    RMTHOST 10.1.1.247, MGRPORT 9178
    RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
    SOURCECATALOG orclpdb
    table scott.tab1;

    GGSCI> add extract ek_02, sourceistable

    EDIT PARAMS ek_02
    EXTRACT ek_02
    USERID c##ggadmin,PASSWORD ggadmin
    RMTHOST 10.1.1.247, MGRPORT 9178
    RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
    SOURCECATALOG orclpdb
    table scott.tab2;

    5、生成def文件:
    GGSCI> edit param defgen1

    USERID c##ggadmin,PASSWORD ggadmin
    defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
    SOURCECATALOG orclpdb
    table scott.tab1;
    table scott.tab2;

    在OGG_HOME下执行如下命令生成def文件
    defgen paramfile dirprm/defgen1.prm

    将生成的def文件传到目标端$OGG_HOME/dirdef下

    目标端配置:
    1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下
    cd $OGG_HOME/AdapterExamples/big-data/kafka
    cp * $OGG_HOME/dirprm

    2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下
    cd $ORACLE_HOME/AdapterExamples/trail
    cp tr000000000 $OGG_HOME/dirdat

    3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)
    GGSCI> ADD replicat rp_01, specialrun

    GGSCI> EDIT PARAMS rp_01
    SPECIALRUN
    end runtime
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    targetdb libfile libggjava.so set property=./dirprm/kafka1.props
    SOURCEDEFS ./dirdef/defgen1.def
    EXTFILE ./dirdat/ka
    reportcount every 1 minutes, rate
    grouptransops 10000
    MAP orclpdb.scott.tab1, TARGET scott.tab1;

    GGSCI> ADD replicat rp_02, specialrun
    GGSCI> EDIT PARAMS rp_02

    SPECIALRUN
    end runtime
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    targetdb libfile libggjava.so set property=./dirprm/kafka2.props
    SOURCEDEFS ./dirdef/defgen1.def
    EXTFILE ./dirdat/kb
    reportcount every 1 minutes, rate
    grouptransops 10000
    MAP orclpdb.scott.tab2, TARGET scott.tab2;

    4、添加恢复进程:
    GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
    GGSCI>edit params r_kaf1

    REPLICAT r_kaf1
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    HANDLECOLLISIONS
    targetdb libfile libggjava.so set property=./dirprm/kafka1.props
    SOURCEDEFS ./dirdef/defgen1.def
    reportcount every 1 minutes, rate
    grouptransops 10000
    MAP orclpdb.scott.tab1, TARGET scott.tab1;

    GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
    GGSCI> edit params r_kaf2

    REPLICAT r_kaf2
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    HANDLECOLLISIONS
    targetdb libfile libggjava.so set property=./dirprm/kafka2.props
    SOURCEDEFS ./dirdef/defgen1.def
    reportcount every 1 minutes, rate
    grouptransops 10000
    MAP orclpdb.scott.tab2, TARGET scott.tab2;

    5、参数配置:
    custom_kafka_producer.properties文件内容如下:

    bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号
    acks=1
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=16384
    linger.ms=10000

    kafka1.props文件内容如下:
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    #The following resolves the topic name using the short table name
    gg.handler.kafkahandler.topicMappingTemplate= topic1
    #gg.handler.kafkahandler.format=avro_op
    gg.handler.kafkahandler.format =json --这里做了改动,指定格式为json格式
    gg.handler.kafkahandler.format.insertOpKey=I
    gg.handler.kafkahandler.format.updateOpKey=U
    gg.handler.kafkahandler.format.deleteOpKey=D
    gg.handler.kafkahandler.format.truncateOpKey=T
    gg.handler.kafkahandler.format.prettyPrint=false
    gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
    gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键
    gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    gg.handler.kafkahandler.mode=op
    goldengate.userexit.timestamp=utc
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    gg.log=log4j
    gg.log.level=INFO
    gg.report.time=30sec
    #Sample gg.classpath for Apache Kafka
    gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。
    #Sample gg.classpath for HDP
    #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    启动进程进程恢复:
    1、启动源端抓取进程
    GGSCI> start EXT_KAF1
    2、启动源端投递进程
    GGSCI> start PMP_KAF1
    3、启动源端初始化进程
    GGSCI> start ek_01
    4、启动目标端初始化进程
    在$OGG_HOME下执行如下命令:
    ./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
    5、启动目标端恢复进程
    GGSCI> start R_KAF1

    遇到的错误:
    1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)

    原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的)
    解决:重启MGR进程

    2、ERROR OG-15051 Java or JNI exception

    原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。
    解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。

    更多Oracle相关信息见Oracle 专题页面 http://www.arianalance.com/topicnews.aspx?tid=12

    本文永久更新链接地址http://www.arianalance.com/Linux/2018-03/151513.htm

    linux
    相关资讯       OGG  Oracle数据同步到Kafka 
    本文评论   查看全部评论 (0)
    表情: 表情 姓名: 字数

           

    评论声明
    • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
    • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
    • 本站管理人员有权保留或删除其管辖留言中的任意内容
    • 本站有权在网站内转载或引用您的评论
    • 参与本评论即表明您已经阅读并接受上述条款