sql >> Database >  >> NoSQL >> Redis

Redis op Spark:Taak niet serialiseerbaar

In Spark zijn de functies op RDD s (zoals map hier) worden geserialiseerd en naar de uitvoerders gestuurd voor verwerking. Dit houdt in dat alle elementen in deze bewerkingen serialiseerbaar moeten zijn.

De Redis-verbinding hier is niet serialiseerbaar omdat het TCP-verbindingen opent naar de doel-DB die zijn gebonden aan de machine waarop het is gemaakt.

De oplossing is om die verbindingen op de uitvoerders te maken, in de lokale uitvoeringscontext. Er zijn maar weinig manieren om dat te doen. Twee die me te binnen schieten zijn:

  • rdd.mapPartitions :hiermee kunt u een hele partitie in één keer verwerken en daardoor de kosten voor het maken van verbindingen afschrijven)
  • Singleton-verbindingsmanagers:maak de verbinding één keer per uitvoerder

mapPartitions is gemakkelijker omdat het alleen een kleine wijziging in de programmastructuur vereist:

val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Een singleton-verbindingsmanager kan worden gemodelleerd met een object dat een luie verwijzing naar een verbinding bevat (opmerking:een veranderlijke ref werkt ook).

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Dit object kan vervolgens worden gebruikt om 1 verbinding per worker-JVM te instantiëren en wordt gebruikt als een Serializable object in een operatie-afsluiting.

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

Het voordeel van het gebruik van het singleton-object is minder overhead, aangezien verbindingen slechts één keer worden gemaakt door JVM (in tegenstelling tot 1 per RDD-partitie)

Er zijn ook enkele nadelen:

  • opschonen van verbindingen is lastig (shutdown hook/timers)
  • men moet de thread-veiligheid van gedeelde bronnen garanderen

(*) code verstrekt ter illustratie. Niet gecompileerd of getest.



  1. MongoDB $add

  2. Is het mogelijk om één resultaat in totaal te krijgen?

  3. Hoe Spring te gebruiken om verbinding te maken met MongoDB waarvoor authenticatie vereist is?

  4. Groepeer records per maand en tel ze - Mongoose, nodeJs, mongoDb