DataFrameLoadedFromLeftDatabase = 使用DataFrameReader
第一个数据库(例如LeftDB)加载的数据。
我需要
- 迭代此数据框中的每一行,
- 连接到第二个数据库,例如RightDB,
- 从RightDB中找到一些匹配的记录,
- 并做一些业务逻辑
这是一个迭代操作,因此它不能简单地通过LeftDB和RightDB之间的 JOIN来查找一些新字段、创建新的 Dataframe targetDF并使用 ThirdDB 写入第三个数据库DataframeWriter
我知道我可以使用
val targetDF = DataFrameLoadedFromLeftDatabase.mapPartitions(
partition => {
val rightDBconnection = new DbConnection // establish a connection to RightDB
val result = partition.map(record => {
readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList(record, rightDBconnection)
}).toList
rightDBconnection.close()
result.iterator
}
).toDF()
targetDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "table3")
.option("user", "username")
.option("password", "password")
.save()
- 我想知道apache Spark是否适合这些类型的聊天数据处理应用程序
- 我想知道在这种方法中通过 RightDB 中的每条记录进行交互是否会太麻烦
- 我期待收到一些改进此设计以利用 SPARK 功能的建议。我还想确保处理不会因性能原因导致过多的洗牌操作
参考:相关 SO 帖子
在这种情况下我们总是更喜欢spark.sql
。基本上定义两个不同的 DF 并根据查询连接它们,然后您可以应用您的业务逻辑。
例如;
import org.apache.spark.sql.{DataFrame, SparkSession}
// Add your columns here
case class MyResult(ID: String, NAME: String)
// Create a SparkSession
val spark = SparkSession.builder()
.appName("Join Tables and Add Prefix to ID Column")
.config("spark.master", "local[*]")
.getOrCreate()
// Read the first table from DB1
val firstTable: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/DB1")
.option("dbtable", "FIRST_TABLE")
.option("user", "your_username")
.option("password", "your_password")
.load()
firstTable.createOrReplaceTempView("firstTable")
// Read the second table from DB2
val secondTable: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/DB2")
.option("dbtable", "SECOND_TABLE")
.option("user", "your_username")
.option("password", "your_password")
.load()
secondTable.createOrReplaceTempView("secondTable")
// Apply you filtering here
val result: DataFrame = spark.sql("SELECT f.*, s.* FROM firstTable as f left join secondTable as s on f.ID = s.ID")
val finalData = result.as[MyResult]
.map{record=>
// Do your business logic
businessLogic(record)
}
// Write the result to the third table in DB3
finalData.write
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/DB3")
.option("dbtable", "THIRD_TABLE")
.option("user", "your_username")
.option("password", "your_password")
.save()
如果您的表很大,您可以执行查询并直接读取其结果。如果这样做,您可以通过按日期等过滤来减少输入大小:
val myQuery = """
(select * from table
where // do your filetering here
) foo
"""
val df = sqlContext.format("jdbc").
option("url", "jdbc:postgresql://localhost/DB").
.option("user", "your_username")
.option("password", "your_password")
.option("dbtable", myQuery)
.load()
除此之外,直接通过spark来记录具体操作是比较困难的。您必须将客户端连接等维护为自定义逻辑。Spark 旨在读取/写入大量数据。它为此目的创建管道。简单的操作将是它的开销。始终在函数中执行 API 调用(或单个数据库调用)map
。如果您在那里使用缓存层,那么在性能方面可能会节省很多时间。始终尝试connection pool
在自定义数据库调用中使用 a,否则 Spark 将尝试使用不同的连接执行所有映射操作,这可能会对数据库造成压力并导致连接失败。
可以想到很多改进,但总的来说,所有这些改进都将取决于将数据预先分布在 HDFS、HBase、Hive 数据库、MongoDB 等中。
我的意思是:您正在考虑“具有分布式处理思维的关系数据”...我认为我们已经超越了 XD
可以readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList在RightDB中批量查找吗?类似的东西select * from rightdb where key IN ()。除此之外,Spark 看起来会相当满意。
“迭代操作”仍然可以用于连接和其他操作。您能详细说明一下吗?
@vdep 通过“迭代”,例如对于 leftDF 中的每个 recordid,我需要循环遍历 rightDF 中的匹配记录并查找 /aggregate/consolidate 一些值(可以认为类似于我们在经典数据库存储过程中编写的值),我认为不是可以轻松表达为 DF 声明式 api 或 sql 语句
使用交叉连接可以轻松实现上述内容。如果您能描述您的确切需求,您可能会得到更好的答案。
@vdep 交叉连接可能会导致性能问题。请注意,在我的问题中,我将左侧数据加载到 DF 中,但右侧数据未完全加载到 rightDF 中。相反,在左侧 DF 中进行迭代并直接从右侧数据库中查找匹配并进行处理