Spark轻松读取数据库:优秀的方法分享 (spark 读取数据库方法)

随着大数据时代的到来,数据处理与分析变得越来越重要。而其中,读取数据库中的数据也是一项非常重要的工作。在过去,读取数据库中的数据需要经过繁琐的准备工作,比较耗时。而现在,我们可以通过使用Spark轻松读取数据库,节省时间和精力。

本文将会为大家介绍什么是Spark,如何使用Spark读取数据库以及优秀的方法分享。

一、什么是Spark?

Spark是一个快速、通用的大数据处理引擎,可以支持包括Java、Scala、Python在内的多种编程语言。Spark具有高速内存计算和优化引擎,能够加速数据处理的速度。Spark是一种大数据处理框架,也是Hadoop生态系统中的一个重要组件。

二、如何使用Spark读取数据库?

使用Spark读取数据库时,需要先进行一些配置工作。需要在pom.xml中添加以下依赖:

“`

org.apache.spark

spark-sql_2.11

2.4.5

com.microsoft.sqlserver

mssql-jdbc

6.4.0.jre8

“`

这些依赖会将Spark SQL与Microsoft SQL Server JDBC驱动程序添加到项目中。

接下来,需要创建一个Java SparkSession对象。在这个对象中,将设置连接到数据库的参数。需要设置数据库的URL、用户名和密码。然后,在创建SparkSession对象时,需要将这些参数传递给SparkConf。

示例代码如下:

“`

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class SparkSQLJDBCDemo {

public static void loadSQLServerTable() {

//creating SparkSession

SparkSession sparkSession = SparkSession

.builder()

.appName(“Spark SQL JDBC example”)

.master(“local[*]”)

.config(“spark.sql.warehouse.dir”, “file:///C:/temp”)

.getOrCreate();

//creating properties object

Properties properties = new Properties();

properties.setProperty(“user”, “username”);

properties.setProperty(“password”, “password”);

//reading table

Dataset jdbcDF = sparkSession.read()

.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties);

jdbcDF.show();

}

public static void mn(String[] args) {

loadSQLServerTable();

}

}

“`

在上述代码中,我们首先创建了一个SparkSession对象,并设置参数。注意到我们已经设置了用户名、密码,以及连接数据库的URL。然后,我们通过SparkSession的read方法,具体地读取了目标数据库中的表中的数据。在这里,我们读取了员工表中的所有数据。我们通过show方法,将读取的数据进行展示。

三、优秀的方法分享

1.使用Repartition

在读取数据库中的数据时,一步到位将所有数据读取出来往往会导致生成数据倾斜的问题。在这种情况下,我们可以使用Repartition方法,将数据按照某种规则重新划分,避免数据倾斜的问题。示例代码如下:

“`

Dataset jdbcDF = sparkSession.read()

.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)

.repartition(10); //设置Repartition的数量为10

“`

2.使用Cache

如果需要经常使用已读取到的数据,而且又希望查询时速度更快,我们可以使用Cache方法将数据缓存起来。这样,在后续的查询过程中,就可以快速地读取这些数据。示例代码如下:

“`

Dataset jdbcDF = sparkSession.read()

.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)

.cache(); //将读取的数据进行缓存

“`

3.使用PartitionBy和BucketBy

如果我们可以根据某种规则将数据进行划分,那么就可以使用PartitionBy或BucketBy方法。这两种方法可以使查询速度更快。PartitionBy方法可以将数据按照某个字段进行分区,而BucketBy方法可以将数据按照某个字段进行分桶。示例代码如下:

“`

Dataset jdbcDF = sparkSession.read()

.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)

.repartition(10)

.partitionBy(“departmentid”); //按照部门ID进行分区

“`

四、结论

在本文中,我们介绍了Spark的基本概念,以及如何使用Spark轻松读取数据库。我们还分享了一些优秀的方法,包括Repartition、Cache、PartitionBy和BucketBy。

相关问题拓展阅读:

spark编程 mysql得不到数据

这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,1.3及以前的版本 –jars 分薯姿发的jar在executor端是通过 Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。

  不过Spark 1.4应该已经fix了这个问题,即数高绝念汪 –jars 分发的 jar 也会纳入 YARN 的 classloader 范畴。

  今天在使用Spark中DataFrame往Mysql中插入RDD,但是一直报出以下的异常次信息:

$  bin/spark-submit –master local

    –jars lib/mysql-connector-java-5.1.35.jar

    –class  spark.sparkToJDBC ./spark-test_2.10-1.0.jar

 

spark assembly has been built with Hive, including Datanucleus jars on classpath

Exception in thread “main” java.sql.SQLException: No suitable driver found for

jdbc:

true&characterEncoding=utf8&autoReconnect=true

    at java.sql.DriverManager.getConnection(DriverManager.java:602)

    at java.sql.DriverManager.getConnection(DriverManager.java:207)

    at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1189)

    at spark.SparkToJDBC$.toMysqlFromJavaBean(SparkToJDBC.scala:20)

    at spark.SparkToJDBC$.main(SparkToJDBC.scala:47)

    at spark.SparkToJDBC.main(SparkToJDBC.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

    at java.lang.reflect.Method.invoke(Method.java:597)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$

$runMain(SparkSubmit.scala:569)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  感觉很奇怪,我在启动作业的时候加了Mysql驱动啊在,怎么会出现这种异常呢??经过查找,发现在–jars参数里面加入Mysql是没有用的。通过查找,发现提交的作业可以通过加入–driver-class-path参数来设置driver的classpath,试了一下果然没有出现错误!

$  bin/spark-submit –master local

    –driver-class-path lib/mysql-connector-java-5.1.35.jar

    –class  spark.SparkToJDBC ./spark-test_2.10-1.0.jar

  其实,我们还可以在spark安装包的conf/spark-env.sh通过配置SPARK_CLASSPATH来设置driver的环境变量,如下:

(这里需要注意的是,在Spark1.3版本中,在Spark配置中按如下进行配置时,运行程序时会提示该配置方法在Spark1.0之后的版本已经过时,建议使用另外两个方法;其中一个就是上面讲到的方法。另外一个就是在配置文件中配置spark.executor.extraClassPath,具体配置格式会在试验之后进行补充)

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/iteblog/com/mysql-connector-java-5.1.35.jar

  这样也可以解决上面出现的异常。但是,我们不能同时在conf/spark-env.sh里面配置SPARK_CLASSPATH和提交作业加上–driver-class-path参数,否则会出现以下异常:

查看源代码打印帮助

$  bin/spark-submit –master local

    –driver-class-path lib/mysql-connector-java-5.1.35.jar

    –class  spark.SparkToJDBC ./spark-test_2.10-1.0.jar

 

Spark assembly has been built with Hive, including Datanucleus jars on classpath

Exception in thread “main”org.apache.spark.SparkException:

    Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply

$7.apply(SparkConf.scala:339)

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply

$7.apply(SparkConf.scala:337)

    at scala.collection.immutable.List.foreach(List.scala:318)

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:337)

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:325)

    at scala.Option.foreach(Option.scala:236)

    at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:325)

    at org.apache.spark.SparkContext.(SparkContext.scala:197)

    at spark.SparkToJDBC$.main(SparkToJDBC.scala:41)

    at spark.SparkToJDBC.main(SparkToJDBC.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

    at java.lang.reflect.Method.invoke(Method.java:597)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$

deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  “这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。

  

  编程是编定程序的中文简称,就是让计算机代码解决某个问题,对某个计算体系规定一定的运算方式,使计算体系按照该计算方式运行,并最终得到相应结果的过程。

  为了使计算机能够理解人的意图,人类就必庆扮须将需解决的问题的思路、方法和手段通过计算机能够理解的形式告诉计算机,使得计算机能够根据人的指令一步一步去工作,完成某种特定的任务。这种人和计算体系之间交流的过程就是编程。

  在计算机系统中,一条机器指令规定了计算机系统的一个特定动作。

  一个系列的计算机在硬件设计制造时就用了若干指令规定了该系列计算樱裂机能够进行的基本操作,这些指令一起构成了该系列计算机的指令系统。在计算机应用的初期,程序员使用机器的指令系统来编写计算机应用程序,这种程序称为机脊差闭器语言程序。

  以上内容参考:

百度百科-编程

这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是纯镇针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,1.3及悄悔以前的版本 –jars 分发的jar在executor端是通过 Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。

spark读mysql数据只出来了字段没数据

文件庆亩丢失。spark读mysql数据只出来了字段没数据是文件丢失导致,需要重新卸载仿拍该软件,并誉大森重新下载安装即可。

关于spark 读取数据库方法的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。


数据运维技术 » Spark轻松读取数据库:优秀的方法分享 (spark 读取数据库方法)