CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that use change data capture (CDC) to ingest changes from different databases. The CDC connectors integrate Debezium as the engine that captures the data changes, so they can take full advantage of Debezium's capabilities. In other words, Flink CDC is a wrapper around a mature CDC implementation (Debezium).
Normally we would not embed a regular Flink job in a Spring Boot project at all; we would write it as a standalone Maven project and submit it through the Flink runtime at deployment time.
Here, though, we want to use Flink CDC for change data capture (which is effectively a Flink job) while still fitting into our Spring Boot project and taking advantage of Spring's features. So we need to flip our thinking: instead of packaging the Flink CDC program as a job and submitting it through the Flink runtime, we treat it the way we do during development, as a local program started inside the application itself.
2.1 Add the dependencies
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.18</version>
    </dependency>
</dependencies>
2.2 Integrate with the Spring Boot project
We cannot simply start the CDC job from a plain main method, because then it would not integrate cleanly with Spring. Instead, we can use a Spring Boot feature: implement ApplicationRunner, so that the Flink CDC pipeline runs as a side task launched when the application starts.
A: Create the listener class and implement ApplicationRunner
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class MysqlEventListener implements ApplicationRunner {

    private final DataChangeSink dataChangeSink;

    public MysqlEventListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "mysql-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("mysql-stream-cdc");
    }

    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        return MySqlSource.<DataChangeInfo>builder()
                .hostname("10.50.40.145")
                .port(3306)
                .databaseList("paas_common_db")
                .tableList("paas_common_db.base_business_driver_score_*")
                .username("root")
                .password("cdwk-3g-145")
                /*
                 * initial:   take a full snapshot first, then continue with incremental reads
                 * latest:    incremental reads only (no historical data)
                 * timestamp: read changes whose timestamp is >= the given timestamp
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new MysqlDeserialization())
                .serverTimeZone("GMT+8")
                .build();
    }
}
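One caveat: env.execute() blocks the calling thread, so run() never returns and anything after SpringApplication.run() in your main method will not execute. If you prefer a non-blocking submission, a minimal sketch (an assumption on my part, not in the original code) is to use executeAsync(), which submits the job and returns a JobClient handle immediately:

import org.apache.flink.core.execution.JobClient;

// Inside run(ApplicationArguments args), replacing env.execute("mysql-stream-cdc"):
JobClient jobClient = env.executeAsync("mysql-stream-cdc");
// The job now runs in the background and run() can return,
// so the rest of the startup flow continues normally.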
B: Create a custom deserializer for the captured data
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);
        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());

        // Determine the operation type: CREATE, UPDATE or DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);
        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE))
                .map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS))
                .map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS))
                .map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));

        // Emit the change event downstream
        collector.collect(dataChangeInfo);
    }

    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
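For context on the array indexes used above: the Debezium MySQL connector names each record's topic as <server-name>.<database>.<table>, which is why fields[1] and fields[2] yield the database and table name. A quick illustration (the topic value below is hypothetical):

String topic = "mysql_binlog_source.paas_common_db.base_business_driver_score_01";
String[] fields = topic.split("\\.");
// fields[0] = "mysql_binlog_source"            (logical server name)
// fields[1] = "paas_common_db"                 (database)
// fields[2] = "base_business_driver_score_01"  (table)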
C: The change-event object
import lombok.Data;

@Data
public class DataChangeInfo {
    /** Row data before the change */
    private String beforeData;
    /** Row data after the change */
    private String afterData;
    /** Change type: 1 = insert, 2 = update, 3 = delete */
    private Integer eventType;
    /** Binlog file name */
    private String fileName;
    /** Current binlog read position */
    private Integer filePos;
    /** Database name */
    private String database;
    /** Table name */
    private String tableName;
    /** Change timestamp */
    private Long changeTime;
}
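To get a feel for what the sink will receive, here is a purely hypothetical UPDATE event (all values invented for illustration, not captured output):

// eventType  = 2 (update)
// database   = "paas_common_db"
// tableName  = "base_business_driver_score_01"
// beforeData = {"id":1,"score":80}
// afterData  = {"id":1,"score":90}
// fileName   = "mysql-bin.000003"
// filePos    = 154
// changeTime = 1660000000000 (epoch milliseconds)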
D: A custom sink, managed by Spring
import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    @Override
    public void invoke(DataChangeInfo value, Context context) {
        log.info("Received raw change data: {}", value);
        // TODO: process the data. Since this sink is also managed by Spring,
        // wiring in any downstream handling is straightforward.
    }
}
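As a sketch of what that todo could look like (the handling below is illustrative, not part of the original post), the sink can parse the before/after JSON back with fastjson and dispatch on the event type:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Illustrative only: replace the log statements with your own business logic.
@Override
public void invoke(DataChangeInfo value, Context context) {
    JSONObject after = JSON.parseObject(value.getAfterData());
    JSONObject before = JSON.parseObject(value.getBeforeData());
    switch (value.getEventType()) {
        case 1: // insert: only afterData carries row data
            log.info("inserted into {}: {}", value.getTableName(), after);
            break;
        case 2: // update: both beforeData and afterData carry row data
            log.info("updated {}: {} -> {}", value.getTableName(), before, after);
            break;
        case 3: // delete: only beforeData carries row data
            log.info("deleted from {}: {}", value.getTableName(), before);
            break;
        default:
            break;
    }
}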