Ik heb een oplossing gevonden. Omdat ik de juiste Mongo-driver voor Structured Streaming niet kon vinden, heb ik aan een andere oplossing gewerkt. Nu gebruik ik de directe verbinding met mongoDb en gebruik ik "foreach(...)" in plaats van foreachbatch(. ..). Mijn code ziet er als volgt uit in het bestand testSpark.py:
....
import pymongo
from pymongo import MongoClient
local_url = "mongodb://localhost:27017"
def write_machine_df_mongo(target_df):
cluster = MongoClient(local_url)
db = cluster["test_db"]
collection = db.test1
post = {
"machine_id": target_df.machine_id,
"proc_type": target_df.proc_type,
"sensor1_id": target_df.sensor1_id,
"sensor2_id": target_df.sensor2_id,
"time": target_df.time,
"sensor1_val": target_df.sensor1_val,
"sensor2_val": target_df.sensor2_val,
}
collection.insert_one(post)
machine_df.writeStream\
.outputMode("append")\
.foreach(write_machine_df_mongo)\
.start()