sql >> Database >  >> RDS >> PostgreSQL

Primaire sleutels met Apache Spark

Scala :

Als je alleen maar unieke nummers nodig hebt, kun je zipWithUniqueId . gebruiken en maak DataFrame opnieuw. Eerst wat import- en dummygegevens:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Schema uitpakken voor verder gebruik:

val schema = df.schema

ID-veld toevoegen:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

DataFrame maken:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Hetzelfde in Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Als u de voorkeur geeft aan een opeenvolgend nummer, kunt u zipWithUniqueId . vervangen met zipWithIndex maar het is wel wat duurder.

Direct met DataFrame API :

(universele Scala, Python, Java, R met vrijwel dezelfde syntaxis)

Eerder heb ik monotonicallyIncreasingId gemist functie die prima zou moeten werken zolang u geen opeenvolgende nummers nodig heeft:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Hoewel nuttig monotonicallyIncreasingId is niet-deterministisch. Niet alleen id's kunnen van uitvoering tot uitvoering verschillen, maar kunnen zonder extra trucs niet worden gebruikt om rijen te identificeren wanneer daaropvolgende bewerkingen filters bevatten.

Opmerking :

Het is ook mogelijk om rowNumber . te gebruiken vensterfunctie:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Helaas:

WAARSCHUW Venster:geen partitie gedefinieerd voor gebruik met venster! Als u alle gegevens naar één partitie verplaatst, kan dit ernstige prestatievermindering veroorzaken.

Dus tenzij je een natuurlijke manier hebt om je gegevens te partitioneren en ervoor te zorgen dat uniekheid op dit moment niet erg handig is.



  1. Hoe Access 2019 ScreenTips te gebruiken

  2. PostgreSQL-anonimisering op aanvraag

  3. Aandacht besteden aan schattingen

  4. SQLite-queryresultaten uitvoeren als een INSERT-instructie