DataFrameLoadedFromLeftDatabase = 使用DataFrameReader第一个数据库(例如LeftDB)加载的数据。

我需要

  • 迭代此数据框中的每一行,
  • 连接到第二个数据库,例如RightDB
  • 从RightDB中找到一些匹配的记录,
  • 并做一些业务逻辑

这是一个迭代操作,因此它不能简单地通过LeftDBRightDB之间的 JOIN来查找一些新字段、创建新的 Dataframe targetDF并使用 ThirdDB 写入第三个数据库DataframeWriter

我知道我可以使用

val targetDF = DataFrameLoadedFromLeftDatabase.mapPartitions(
  partition => {
    val rightDBconnection = new DbConnection // establish a connection to RightDB 
    val result = partition.map(record => {
    readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList(record, rightDBconnection)
  }).toList
    rightDBconnection.close()
    result.iterator
  }
).toDF()
targetDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "table3")
  .option("user", "username")
  .option("password", "password")
  .save()
  1. 我想知道apache Spark是否适合这些类型的聊天数据处理应用程序
  2. 我想知道在这种方法中通过 RightDB 中的每条记录进行交互是否会太麻烦
  3. 我期待收到一些改进此设计以利用 SPARK 功能的建议。我还想确保处理不会因性能原因导致过多的洗牌操作

参考:相关 SO 帖子


在这种情况下我们总是更喜欢spark.sql基本上定义两个不同的 DF 并根据查询连接它们,然后您可以应用您的业务逻辑。

例如;

import org.apache.spark.sql.{DataFrame, SparkSession}

// Add your columns here
case class MyResult(ID: String, NAME: String)

// Create a SparkSession
val spark = SparkSession.builder()
  .appName("Join Tables and Add Prefix to ID Column")
  .config("spark.master", "local[*]")
  .getOrCreate()

// Read the first table from DB1
val firstTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB1")
  .option("dbtable", "FIRST_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()

firstTable.createOrReplaceTempView("firstTable")

// Read the second table from DB2
val secondTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB2")
  .option("dbtable", "SECOND_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()

secondTable.createOrReplaceTempView("secondTable")

// Apply you filtering here
val result: DataFrame = spark.sql("SELECT f.*, s.* FROM firstTable as f left join secondTable as s on f.ID = s.ID")

val finalData = result.as[MyResult]
  .map{record=>
    // Do your business logic
    businessLogic(record)
  }

// Write the result to the third table in DB3
finalData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB3")
  .option("dbtable", "THIRD_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .save()

如果您的表很大,您可以执行查询并直接读取其结果。如果这样做,您可以通过按日期等过滤来减少输入大小:

val myQuery = """
  (select * from table
    where // do your filetering here
  ) foo
"""

val df = sqlContext.format("jdbc").
  option("url", "jdbc:postgresql://localhost/DB").
  .option("user", "your_username")
  .option("password", "your_password")
  .option("dbtable", myQuery)
  .load()

除此之外,直接通过spark来记录具体操作是比较困难的。您必须将客户端连接等维护为自定义逻辑。Spark 旨在读取/写入大量数据。它为此目的创建管道。简单的操作将是它的开销。始终在函数中执行 API 调用(或单个数据库调用)map如果您在那里使用缓存层,那么在性能方面可能会节省很多时间。始终尝试connection pool在自定义数据库调用中使用 a,否则 Spark 将尝试使用不同的连接执行所有映射操作,这可能会对数据库造成压力并导致连接失败。


可以想到很多改进,但总的来说,所有这些改进都将取决于将数据预先分布在 HDFS、HBase、Hive 数据库、MongoDB 等中。

我的意思是:您正在考虑“具有分布式处理思维的关系数据”...我认为我们已经超越了 XD


可以readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList在RightDB中批量查找吗?类似的东西select * from rightdb where key IN ()。除此之外,Spark 看起来会相当满意。

“迭代操作”仍然可以用于连接和其他操作。您能详细说明一下吗?

@vdep 通过“迭代”,例如对于 leftDF 中的每个 recordid,我需要循环遍历 rightDF 中的匹配记录并查找 /aggregate/consolidate 一些值(可以认为类似于我们在经典数据库存储过程中编写的值),我认为不是可以轻松表达为 DF 声明式 api 或 sql 语句

使用交叉连接可以轻松实现上述内容。如果您能描述您的确切需求,您可能会得到更好的答案。

@vdep 交叉连接可能会导致性能问题。请注意,在我的问题中,我将左侧数据加载到 DF 中,但右侧数据未完全加载到 rightDF 中。相反,在左侧 DF 中进行迭代并直接从右侧数据库中查找匹配并进行处理