PYSpark没有打印Kafka流中的任何数据,也没有失败也没、数据、PYSpark、Kafka

2023-09-03 09:42:33 作者:跌入无尽深渊

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。

代码:

from pyspark.sql.functions import col

kafkaServer="<server>"

editsDF=(spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",kafkaServer)
        .option("sasl.username","<username>")
        .option("sasl.password","<password>")
        .option("group.id", "%s-consumer" % "<username>")
        .option("session.timeout.ms", 6000)
        .option("default.topic.config", {"auto.offset.reset": "smallest"})
        .option('security.protocol', 'SASL_SSL')
        .option('sasl.mechanisms', 'SCRAM-SHA-256')
        .option("subscribe","<topic>")
        .option("startingOffsets","latest")
        .option("maxOffsetsPerTrigger",1000)
        .load()
        .select(col("value").cast("STRING"))
        )


query = editsDF 
    .writeStream 
    .outputMode("append") 
    .format("console") 
    .start()
kafka maven 依赖 pyspark kafka 环境搭建

The status in databricks while running the code:

如果我遗漏了什么,请告诉我。提前谢谢。

注意:我已经确保了Kafka服务器能够生成消息,并且能够在一个python程序中使用它。但不是在火花源里工作。此外,数据大小非常小,因此不会出现性能问题。

编辑:这个建议的函数Display()仍然不会为这个有问题的Kafka服务器打印任何数据,但是当我尝试完全使用另一个Kafka服务器时,它工作得很好。我认为这是因为这台Kafka服务器(有问题)使用的是SASL-SCRAM身份验证,所以可能需要进行一些不同的配置。请提供任何详细信息/链接/样本,如果您有从派斯帕克连接SASL Kafka。谢谢!

推荐答案

当您使用console接收器时,它会将数据打印到标准输出(请参阅Spark docs),因此您需要检查群集用户界面中的驱动程序日志以获取生成的数据。

要查看Databricks笔记本本身中的数据,您需要使用display函数,该函数支持显示结构化流中的数据(请参阅Databricks docs)。因此,不是

query = editsDF 
    .writeStream 
    .outputMode("append") 
    .format("console") 
    .start()

您只需写:

display(editsDF)

您还可以将其他选项传递给此函数,如checkpointLocationtrigger等。-检查我上面链接的文档。