百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 优雅编程 > 正文

基于Spark的数据库增量准实时同步

sinye56 2024-10-01 20:02 15 浏览 0 评论

王浩,葛昂,赵晴

(华北计算机系统工程研究所,北京 100083)

为了实现将传统关系型数据库中的增量数据快速导入同构或者异构目的库,在使用已有的增量提取方法的基础上,提出了通过增加并行度和流式计算的方法加快同步速度。此方法不仅支持插入、更新和删除的增量数据同步,而且可以抽取出数据库表结构信息动态支持表结构变更。与传统单点抽取方式相比,大大提高了目的库数据的新鲜度。

随着大数据技术的发展,越来越多的企业开始构建大数据平台进行数据处理。然而如何将保存在关系型数据库中的数据快速同步到大数据平台组件(例如HBase、HDFS)中,正成为很多企业面临的问题。Sqoop是常用的数据同步工具,其实质是MapReduce任务,延时较高,而且需要通过定时任务来达到自动化流程效果。本文在触发器记录数据变化的基础上,提出了一种使用Spark Streaming将增量数据抽取出来,然后根据需要写入到不同的目的库的方法。由于只提取增量数据,所以较Sqoop减少了数据量。另外由于是流式处理方式,降低了延时。

1增量提取

增量提取是针对上一次提取而言,将上一次提取时间点到现在数据库中插入、更新、删除的数据提取出来[1]。

1.2常用的增量提取方法

1.2.1基于业务系统日志

在业务中将数据库DML(Data Manipulation Language)语句输出以日志的方式存储,然后通过解析日志将DML语句在目的库中重放以达到目的。此方法需要侵入业务系统,对于已经成型的业务系统不适用。

1.2.2基于数据库日志

解析数据库日志也能达到增量提取的目的,但是各大数据库厂商不对外开放数据库系统的日志格式,这就使得解析日志变成了问题。而且各数据库的日志格式还不尽相同,难以达到通用性。

1.2.3基于触发器

基于触发器的方式,目前被广泛运用于数据库增量提取。它通过在源表上建立插入、更新、删除触发器来记录对数据的操作。每当有数据变化时,就会触发相应的触发器,然后运行触发器定义的逻辑,将变化记录到增量表。

1.3基于触发器方法的具体实现

由于触发器方法具有实现逻辑简单,对业务无入侵,数据库通用等优点,所以本文采用了基于触发器方式的增量提取方法。具体实现方法如下:

(1)创建名为dml_log的数据库表,字段为id、table_name、record_id、execute_date、dml_type。其中id为自增id,table_name存储要同步的源表表名称,record_id是源表中发生变化的记录的唯一标识,execute_date为触发器执行时的时间戳,dml_type为I、U、D分别代表insert、update、delete操作。

(2)在源表上创建插入、更新、删除类型的触发器。创建语句在此省略。

2构建Spark Streaming程序

2.1Spark Streaming

Spark是目前大数据处理领域比较常用的计算框架。它将中间计算结果维护在内存中,这样不仅可以做到中间结果的重用,而且减少了磁盘IO,大大加快了计算速度。Spark Streaming是构建于Spark core之上的流式处理模块。其原理是将流式数据切分成一个个小的片段,以mini batch的形式来处理这一小部分数据,从而模拟流式计算达到准实时的效果。

2.2JdbcRDD

弹性分布式数据集(Resilient Distributed Datasets,RDD),它是Spark数据抽象的基石。RDD是一个只读的分区记录集合,分区分散在各个计算节点[2]。RDD提供了transformation和action两类操作,其中transformation是lazy级别的,主要对数据处理流程进行标记,而不立即进行运算。action操作会触发作业的提交,然后进行回溯导致transformation操作进行运算。

JdbcRDD扩展自RDD,是RDD的子类。内部通过JDBC(Java Data Base Connectivity)操作以数据库为源头构建RDD。其构造函数签名为:

class JdbcRDD[T: ClassTag](

sc: SparkContext,

getConnection:=> Connection,

sql: String,

lowerBound: Long,

upperBound: Long,

numPartitions: Int,

mapRow:(ResultSet) => T =

JdbcRDD.resultSetToObjectArray _)

extends RDD[T](sc, Nil) with Logging {…}

2.3具体实现

Spark官方提供用于构建Spark Streaming的数据源没有对数据库进行支持,所以本文自己实现对数据库的支持。编写继承自InputDStream类的DirectJdbcInputDStream类,其签名为:

class DirectJdbcInputDStream[T: ClassTag](

@transient ssc_ : StreamingContext,

param: JdbcParam) extends

InputDStream[Row] (ssc_) with Logging {…}

对start、compute和stop方法进行重写。

(1)在start函数中注册JDBC驱动,用于JDBC获取初始化信息(构造JdbcRDD时的参数);

(2)compute函数会被框架间隔指定的时间反复调用,其实质是如何返回一个JdbcRDD。首先通过JDBC获取本次需要拉取的trigger记录的id的上下界以及表的Schema信息;然后以这些信息为参数生成提取真实数据的SQL,其逻辑为用选中的trigger表中的记录和原表在record_id上进行左连接;最后使用该SQL当做参数构建JdbcRDD。值得说明的是,构建JdbcRDD时是可以指定并行度的,每个worker节点都会建立到数据库的JDBC连接,由多个节点并行去数据库拉取属于自己的那一部分数据,这就大大增加了提取和处理速度。

(3)在stop函数中关闭JDBC连接。总体来看,就是在driver程序中执行的JDBC程序获取初始化参数,在executor中执行的JDBC程序拉取真实的数据。

(4)编写driver程序:

val sc = new SparkContext(new SparkConf)

val ssc = new StreamingContext(sc, Seconds(30))

val directStream = new DirectJdbcInputDStream[Row](ssc, jdbcParam)

directStream.foreachRDD(rdd => {

…//对数据进行处理

})

2.4限流

假设当前时间点到上次提取的时间点之间新增数据量太大,就会导致在新一次作业提交时,上一次的作业仍然没有完成,可能会因此造成作业积压使得系统不稳定。本文使用了基于规则的限流方法,综合考虑集群处理能力以及间隔时间,可以配置化设置每次最大提取条数。如果当前需要提取的数据条数大于最大提取条数,则本次就只提取最大条数,剩下的延时到下次再进行提取。这样做的好处是削减了峰流对系统造成的影响。

3测试分析

测试环境:VMware虚拟机,处理器设置为4核心,2 GB内存, 64位CentOS 6.5操作系统,Spark 1.5.1,Oracle 11g。使用4台虚拟机搭建成Spark集群,1台为Master,3台为Worker。数据库表分别设置为20、40个字段,每次最大抽取记录数分别设置为10 000、50 000、500 000。将抽取出来的数据写成parquet格式的文件存储到hdfs上。测试结果如表1所示。

4结束语

本文在基于数据库触发器记录数据变化的基础上,通过自己构造DirectJdbcStream类提供Spark Streaming对数据库的支持,达到准实时从数据库中抽取出增量数据的目的。并且可以对抽取出来的数据进行过滤、清洗等操作,根据需求灵活地写入到不同的目的库。

参考文献

[1] 郭亮. 基于MD5与HASH的数据库增量提取算法及其应用[D]. 长沙:湖南大学,2013.

[2] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault tolerant abstraction for in memory cluster computing[C]. Usenix Conference on Networked Systems Design & Implementation, 2012, 70(2):141146.

[3] DEAN J, GHEMAWAT S. MapReduce: simplified dataprocessing on large clusters[C]. USENIX Association OSDI′04: 6th Symposium on Operating Systems Design and Implementation, 2004:137149.

[4] MARTIN O. Programming in scala[M]. California: Artima Press,2010.

[5] YADAV R. Spark cookbook[M]. UK: Packt Publishing Ltd, 2015.

[6] KARAU H. Learning spark[M]. America: O’Reilly Media, Inc. 2015.

[7] 梁刚. 企业大数据管理解决方案[J]. 微型机与应用,2013,32(24):7 10,13.

AET会员年终大福利!

相关推荐

程序员:JDK的安装与配置(完整版)_jdk的安装方法

对于Java程序员来说,jdk是必不陌生的一个词。但怎么安装配置jdk,对新手来说确实头疼的一件事情。我这里以jdk10为例,详细的说明讲解了jdk的安装和配置,如果有不明白的小伙伴可以评论区留言哦下...

Linux中安装jdk并配置环境变量_linux jdk安装教程及环境变量配置

一、通过连接工具登录到Linux(我这里使用的Centos7.6版本)服务器连接工具有很多我就不一一介绍了今天使用比较常用的XShell工具登录成功如下:二、上传jdk安装包到Linux服务器jdk...

麒麟系统安装JAVA JDK教程_麒麟系统配置jdk

检查检查系统是否自带java在麒麟系统桌面空白处,右键“在终端打开”,打开shell对话框输入:java–version查看是否自带java及版本如图所示,系统自带OpenJDK,要先卸载自带JDK...

学习笔记-Linux JDK - 安装&配置

前提条件#检查是否存在JDKrpm-qa|grepjava#删除现存JDKyum-yremovejava*安装OracleJDK不分系统#进入安装文件目...

Linux新手入门系列:Linux下jdk安装配置

本系列文章是把作者刚接触和学习Linux时候的实操记录分享出来,内容主要包括Linux入门的一些理论概念知识、Web程序、mysql数据库的简单安装部署,希望能够帮到一些初学者,少走一些弯路。注意:L...

测试员必备:Linux下安装JDK 1.8你必须知道的那些事

1.简介在Oracle收购Sun后,Java的一系列产品就被整合到Oracle官网中,打开官网乍眼一看也不知道去哪里下载,还得一个一个的摸索尝试,而且网上大多数都是一些Oracle收购Sun前,或者就...

Linux 下安装JDK17_linux 安装jdk1.8 yum

一、安装环境操作系统:JDK版本:17二、安装步骤第一步:下载安装包下载Linux环境下的jdk1.8,请去官网(https://www.oracle.com/java/technologies/do...

在Ubuntu系统中安装JDK 17并配置环境变量教程

在Ubuntu系统上安装JDK17并配置环境变量是Java开发环境搭建的重要步骤。JDK17是Oracle提供的长期支持版本,广泛用于开发Java应用程序。以下是详细的步骤,帮助你在Ubuntu系...

如何在 Linux 上安装 Java_linux安装java的步骤

在桌面上拥抱Java应用程序,然后在所有桌面上运行它们。--SethKenlon(作者)无论你运行的是哪种操作系统,通常都有几种安装应用程序的方法。有时你可能会在应用程序商店中找到一个应用程序...

Windows和Linux环境下的JDK安装教程

JavaDevelopmentKit(简称JDK),是Java开发的核心工具包,提供了Java应用程序的编译、运行和开发所需的各类工具和类库。它包括了JRE(JavaRuntimeEnviro...

linux安装jdk_linux安装jdk软连接

JDK是啥就不用多介绍了哈,外行的人也不会进来看我的博文。依然记得读大学那会,第一次实验课就是在机房安装jdk,编写HelloWorld程序。时光飞逝啊,一下过了十多年了,挣了不少钱,买了跑车,娶了富...

linux安装jdk,全局配置,不同用户不同jdk

jdk1.8安装包链接:https://pan.baidu.com/s/14qBrh6ZpLK04QS8ogCepwg提取码:09zs上传文件解压tar-zxvfjdk-8u152-linux-...

运维大神教你在linux下安装jdk8_linux安装jdk1.7

1.到官网下载适合自己机器的版本。楼主下载的是jdk-8u66-linux-i586.tar.gzhttp://www.oracle.com/technetwork/java/javase/downl...

window和linux安装JDK1.8_linux 安装jdk1.8.tar

Windows安装JDK1.8的步骤:步骤1:下载JDK打开浏览器,找到JDK下载页面https://d.injdk.cn/download/oraclejdk/8在页面中找到并点击“下载...

最全的linux下安装JavaJDK的教程(图文详解)不会安装你来打我?

默认已经有了linux服务器,且有root账号首先检查一下是否已经安装过java的jdk任意位置输入命令:whichjava像我这个已经安装过了,就会提示在哪个位置,你的肯定是找不到。一般我们在...

取消回复欢迎 发表评论: