Scala :
Als je alleen maar unieke nummers nodig hebt, kun je zipWithUniqueId
. gebruiken en maak DataFrame opnieuw. Eerst wat import- en dummygegevens:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Schema uitpakken voor verder gebruik:
val schema = df.schema
ID-veld toevoegen:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
DataFrame maken:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Hetzelfde in Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Als u de voorkeur geeft aan een opeenvolgend nummer, kunt u zipWithUniqueId
. vervangen met zipWithIndex
maar het is wel wat duurder.
Direct met DataFrame
API :
(universele Scala, Python, Java, R met vrijwel dezelfde syntaxis)
Eerder heb ik monotonicallyIncreasingId
gemist functie die prima zou moeten werken zolang u geen opeenvolgende nummers nodig heeft:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Hoewel nuttig monotonicallyIncreasingId
is niet-deterministisch. Niet alleen id's kunnen van uitvoering tot uitvoering verschillen, maar kunnen zonder extra trucs niet worden gebruikt om rijen te identificeren wanneer daaropvolgende bewerkingen filters bevatten.
Opmerking :
Het is ook mogelijk om rowNumber
. te gebruiken vensterfunctie:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Helaas:
WAARSCHUW Venster:geen partitie gedefinieerd voor gebruik met venster! Als u alle gegevens naar één partitie verplaatst, kan dit ernstige prestatievermindering veroorzaken.
Dus tenzij je een natuurlijke manier hebt om je gegevens te partitioneren en ervoor te zorgen dat uniekheid op dit moment niet erg handig is.