前段时间, 我必须从 mysql 表中读取数据, 对这些数据进行一些操作, 并将结果存储在磁盘上。

显而易见的选择是使用 spark, 因为我已经在其他东西上使用它, 它似乎超级容易实现。

这或多或少是我必须做的事情 (为了简单起见, 我删除了做操作的部分):

spark.read.format("jdbc").
option("url", "jdbc:mysql://dbhost/sbschhema").
option("dbtable", "mytable").
option("user", "myuser").
option("password", "mypassword").
load().write.parquet("/data/out")

看起来不错, 只是没有完全奏效。要么是超级慢, 要么是根据桌子的大小完全崩溃。

调整 spark 和集群属性有一点帮助, 但并没有解决问题。

由于我使用的是 aws emr, 因此尝试使用 sqoop是 emr 支持的应用程序的一部分是有意义的。

sqoop import --verbose --connect jdbc:mysql://dbhost/sbschhema --username myuser --table opportunity --password  mypassword --m 20 --as-parquetfile --target-dir /data/out

sqoop 几乎立刻就表现得更好, 你所需要做的就是根据数据的大小设置映射器的数量, 它的工作非常完美。

由于 spark 和 sqoop 都基于 hadoop 地图减少框架, 因此 spark 显然可以至少和 sqoop 一样好工作, 因此我只需要了解如何做到这一点。我决定仔细看看斯库普做了什么, 看看能不能用 spark 模仿。

通过打开 sqoop 的详细标志, 您可以获得更多详细信息。我发现, sqoop 正在将输入拆分到不同的映射器, 这是有意义的, 这毕竟是减少地图, spark 做同样的事情。但在这样做之前, sqoop 做了一些斯派克没有做的聪明的事情。

它首先获取主键 (除非您为他提供另一个键来拆分数据), 然后检查其最小值和最大值。然后, 它允许其每个映射器查询数据, 但键具有不同的边界, 以便在映射器之间平均拆分行。

例如, 如果键最大值为 100, 并且有5个映射器, 则第一个映射器的查询将如下所示:

SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20;

第二个映射器的查询将如下所示:

SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40;

等等。

这完全有道理。spark 无法正常工作, 因为它不知道如何在映射器之间拆分数据。

因此, 现在是用 spark 实现同样逻辑的时候了。这意味着我必须对我的代码执行这些操作, 才能使 spark 正常工作。

  1. 获取表的主键。

  2. 查找键的最小值和最大值。

  3. 使用这些值执行 spark。

这是我最后得到的代码:

def main(args: Array[String]){

// parsing input parameters ...

val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
    val min = result.getString(1).toInt
    val max = result.getString(2).toInt
    val numPartitions = (max - min) / 5000 + 1

val spark = SparkSession

appname (“spark 读数 jdbc”). getorcreate () var df = spark.read.format (“jdbc”)。
选项 (“url”, s “${url} ${config (” 架构 “)}”)。
选项 (“驱动程序”、”com.mysql.jdbc.Driver”)。
选项 (“小写”, 最小值)。
选项 (“向上绑定”, 最大值)。
选项 (“数字”、数字分区)。
选项 (“分区列”, 主键)。
选项 (“dbtable”、配置 (“表”))。
选项 (“用户”, 用户)。
选项 (“密码”、密码). load ()///某些数据操作… df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)}

而且效果非常好。

言论:

  1. numPartitions 我为 spark 设置的只是一个值, 我发现根据行数给出了很好的结果。这可以更改, 因为数据的大小也会受到当然的列大小和数据类型的影响。

  2. 最后的重新分区操作是为了避免有小文件。

Comments are closed.