sql >> Database >  >> NoSQL >> MongoDB

Zink Kafka Stream naar MongoDB met behulp van PySpark Structured Streaming

Ik heb een oplossing gevonden. Omdat ik de juiste Mongo-driver voor Structured Streaming niet kon vinden, heb ik aan een andere oplossing gewerkt. Nu gebruik ik de directe verbinding met mongoDb en gebruik ik "foreach(...)" in plaats van foreachbatch(. ..). Mijn code ziet er als volgt uit in het bestand testSpark.py:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. mapping in index maken in elasticsearch door mongodb-rivier heeft geen effect

  2. Mongodb-aggregatie op subdocument in array

  3. Hoe sessie delen tussen NodeJs en PHP met Redis?

  4. het MongoDB-cachesysteem begrijpen