sql >> Database >  >> NoSQL >> HBase

Spark-on-HBase:op DataFrame gebaseerde HBase-connector

Deze blogpost is gepubliceerd op Hortonworks.com vóór de fusie met Cloudera. Sommige links, bronnen of verwijzingen zijn mogelijk niet langer nauwkeurig.

Met trots kondigen we de technische preview aan van Spark-HBase Connector, ontwikkeld door Hortonworks in samenwerking met Bloomberg.

De Spark-HBase-connector maakt gebruik van Data Source API (SPARK-3247) die is geïntroduceerd in Spark-1.2.0. Het overbrugt de kloof tussen de eenvoudige HBase Key Value-opslag en complexe relationele SQL-query's en stelt gebruikers in staat om complexe gegevensanalyses uit te voeren bovenop HBase met behulp van Spark. Een HBase DataFrame is een standaard Spark DataFrame en kan communiceren met andere gegevensbronnen zoals Hive, ORC, Parquet, JSON, enz.

Achtergrond

Er zijn verschillende open source Spark HBase-connectoren beschikbaar als Spark-pakketten, als onafhankelijke projecten of in HBase-trunk.

Spark is verhuisd naar de Dataset/DataFrame-API's, die ingebouwde optimalisatie van queryplannen bieden. Nu geven eindgebruikers er de voorkeur aan een op DataFrames/Datasets gebaseerde interface te gebruiken.

De HBase-connector in de HBase-trunk heeft een rijke ondersteuning op RDD-niveau, b.v. BulkPut, enz., maar de DataFrame-ondersteuning is niet zo uitgebreid. De HBase-trunkconnector is afhankelijk van de standaard HadoopRDD met HBase ingebouwd TableInputFormat heeft enkele prestatiebeperkingen. Bovendien kan BulkGet uitgevoerd in de driver een single point of failure zijn.

Er zijn enkele andere alternatieve implementaties. Neem Spark-SQL-on-HBase als voorbeeld. Het past zeer geavanceerde aangepaste optimalisatietechnieken toe door zijn eigen query-optimalisatieplan in te bedden in de standaard Spark Catalyst-engine, verzendt de RDD naar HBase en voert gecompliceerde taken uit, zoals gedeeltelijke aggregatie, in de HBase-coprocessor. Deze aanpak is in staat om hoge prestaties te bereiken, maar het is moeilijk te handhaven vanwege de complexiteit en de snelle evolutie van Spark. Ook het toestaan ​​dat willekeurige code in een coprocessor wordt uitgevoerd, kan beveiligingsrisico's met zich meebrengen.

De Spark-on-HBase Connector (SHC) is ontwikkeld om deze potentiële knelpunten en zwakke punten te verhelpen. Het implementeert de standaard Spark Datasource-API en maakt gebruik van de Spark Catalyst-engine voor query-optimalisatie. Tegelijkertijd wordt de RDD helemaal opnieuw opgebouwd in plaats van TableInputFormat . te gebruiken om hoge prestaties te bereiken. Met deze op maat gemaakte RDD kunnen alle kritische technieken worden toegepast en volledig geïmplementeerd, zoals partitie pruning, column pruning, predicaat pushdown en data locality. Het ontwerp maakt het onderhoud zeer eenvoudig, terwijl een goede balans tussen prestatie en eenvoud wordt bereikt.

Architectuur

We gaan ervan uit dat Spark en HBase in hetzelfde cluster worden geïmplementeerd en dat Spark-uitvoerders zich op dezelfde locatie bevinden als regioservers, zoals geïllustreerd in de onderstaande afbeelding.

Afbeelding 1. Spark-on-HBase-connectorarchitectuur

Op een hoog niveau behandelt de connector zowel Scannen als ophalen op een vergelijkbare manier, en beide acties worden uitgevoerd in de uitvoerders. De driver verwerkt de query, aggregeert scans/gets op basis van de metadata van de regio en genereert taken per regio. De taken worden verzonden naar de voorkeursuitvoerders die zich samen met de regioserver bevinden, en worden parallel uitgevoerd in de uitvoerders om een ​​betere gegevenslokaliteit en gelijktijdigheid te bereiken. Als een regio niet over de vereiste gegevens beschikt, krijgt die regioserver geen taak toegewezen. Een taak kan bestaan ​​uit meerdere Scans en BulkGets, en de gegevensverzoeken door een taak worden opgehaald van slechts één regioserver, en deze regioserver zal ook de locatievoorkeur voor de taak zijn. Merk op dat de bestuurder niet betrokken is bij de echte taakuitvoering, behalve bij het plannen van taken. Dit voorkomt dat de bestuurder de bottleneck wordt.

Tabelcatalogus

Om de HBase-tabel als relationele tabel in Spark te brengen, definiëren we een toewijzing tussen HBase- en Spark-tabellen, genaamd Table Catalog. Er zijn twee kritieke delen van deze catalogus. Een daarvan is de rowkey-definitie en de andere is de toewijzing tussen tabelkolom in Spark en de kolomfamilie en kolomkwalificatie in HBase. Raadpleeg het gedeelte Gebruik voor details.

Native Avro-ondersteuning

De connector ondersteunt native de Avro-indeling, omdat het heel gebruikelijk is om gestructureerde gegevens als bytearray in HBase te bewaren. De gebruiker kan de Avro-record rechtstreeks in HBase bewaren. Intern wordt het Avro-schema automatisch geconverteerd naar een native Spark Catalyst-gegevenstype. Houd er rekening mee dat beide sleutel/waarde-onderdelen in een HBase-tabel kunnen worden gedefinieerd in Avro-indeling. Raadpleeg de voorbeelden/testcases in de repo voor het exacte gebruik.

Predikaat Pushdown

De connector haalt alleen vereiste kolommen op van de regioserver om de netwerkoverhead te verminderen en redundante verwerking in de Spark Catalyst-engine te voorkomen. Bestaande standaard HBase-filters worden gebruikt om predikaat-push-down uit te voeren zonder gebruik te maken van de coprocessor-capaciteit. Omdat HBase het gegevenstype niet kent, behalve byte-array, en de volgorde-inconsistentie tussen Java-primitieve typen en byte-array, moeten we de filtervoorwaarde vooraf verwerken voordat het filter in de scanbewerking wordt ingesteld om gegevensverlies te voorkomen. Binnen de regioserver worden records die niet overeenkomen met de queryvoorwaarde eruit gefilterd.

Partitie snoeien

Door de rijsleutel uit de predikaten te extraheren, splitsen we de Scan/BulkGet op in meerdere niet-overlappende bereiken, alleen de regioservers met de gevraagde gegevens zullen Scan/BulkGet uitvoeren. Momenteel wordt het snoeien van partities uitgevoerd op de eerste dimensie van de rijsleutels. Als een rijsleutel bijvoorbeeld "key1:key2:key3" is, wordt het opschonen van partities alleen gebaseerd op "key1". Merk op dat de WHERE-voorwaarden zorgvuldig moeten worden gedefinieerd. Anders wordt het snoeien van partities mogelijk niet van kracht. Bijvoorbeeld, WHERE rowkey1> "abc" OR column ="xyz" (waar rowkey1 de eerste dimensie van de rowkey is en column een normale hbase-kolom) resulteert in een volledige scan, omdat we alle bereiken moeten bestrijken omdat van de OF logica.

Gegevenslocatie

Wanneer een Spark-uitvoerder zich op dezelfde locatie bevindt met HBase-regioservers, wordt de gegevenslocatie bereikt door de locatie van de regioserver te identificeren en wordt er alles aan gedaan om de taak samen met de regioserver te plaatsen. Elke uitvoerder voert Scan/BulkGet uit van de kant van de gegevens die zich op dezelfde host bevinden.

Scannen en bulksgewijs ophalen

Deze twee operators zijn zichtbaar voor gebruikers door WHERE CLAUSE op te geven, bijvoorbeeld WHERE column> x en column voor scan en WAAR kolom =x vergeten. De bewerkingen worden uitgevoerd in de uitvoerders en de bestuurder construeert alleen deze bewerkingen. Intern worden ze geconverteerd naar scannen en/of get, en Iterator[Row] wordt teruggestuurd naar de katalysatormotor voor verwerking van de bovenste laag.

Gebruik

Het volgende illustreert de basisprocedure voor het gebruik van de connector. Raadpleeg de voorbeelden in de repository voor meer details en geavanceerde use-cases, zoals ondersteuning voor Avro en samengestelde sleutels.

1) Definieer de catalogus voor de schematoewijzing:

[code language="scala"]def catalog = s"""{
         |"table":{"namespace":"default", "name":"table1"},
         |"rowkey":"key",
         |"columns":{
           |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
           |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
           |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
           |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
           |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
           |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
           |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
           |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
         |}
       |}""".stripMargin
[/code]

2) Bereid de gegevens voor en vul de HBase-tabel:
case class HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

object HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 tot 255).map { i =>  HBaseRecord(i, "extra")}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Laad het DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(catalog)

4) Taalgeïntegreerde zoekopdracht:
val s =df.filter((($”col0″ <=“row050″ &&$”col0”> “row040”) ||
 $”col0″ ===“row005” ||
 $”col0″ ===“row020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“row005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .show

5) SQL-query:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

Spark-pakket configureren

Gebruikers kunnen de Spark-on-HBase-connector gebruiken als een standaard Spark-pakket. Om het pakket in uw Spark-toepassing op te nemen, gebruikt u:

spark-shell, pyspark of spark-submit

> $SPARK_HOME/bin/spark-shell –pakketten zhzhan:shc:0.0.11-1.6.1-s_2.10

Gebruikers kunnen het pakket ook als afhankelijkheid in uw SBT-bestand opnemen. Het formaat is de spark-package-name:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Uitgevoerd in beveiligde cluster

Voor uitvoering in een cluster met Kerberos-ondersteuning moet de gebruiker HBase-gerelateerde jars opnemen in het klassenpad, aangezien het ophalen en vernieuwen van het HBase-token wordt gedaan door Spark en onafhankelijk is van de connector. Met andere woorden, de gebruiker moet de omgeving op de normale manier initiëren, hetzij via kinit, hetzij door principal/keytab op te geven. De volgende voorbeelden laten zien hoe u in een beveiligd cluster kunt werken met zowel de garenclient- als de garenclustermodus. Houd er rekening mee dat SPARK_CLASSPATH voor beide modi moet worden ingesteld, en de voorbeeldkruik is slechts een tijdelijke aanduiding voor Spark.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Stel dat hrt_qa een headless account is, dan kan de gebruiker het volgende commando gebruiken voor kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master garen-client –pakketten zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-geheugen 512m –executor-geheugen 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master garen-cluster –bestanden /etc/hbase/conf/hbase -site.xml –pakketten zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-geheugen 512m –executor-geheugen 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Alles samenvoegen

We hebben zojuist een kort overzicht gegeven van hoe HBase Spark ondersteunt op DataFrame-niveau. Met de DataFrame API kunnen Spark-applicaties net zo gemakkelijk werken met gegevens die zijn opgeslagen in de HBase-tabel als gegevens die zijn opgeslagen in andere gegevensbronnen. Met deze nieuwe functie kunnen gegevens in HBase-tabellen eenvoudig worden gebruikt door Spark-applicaties en andere interactieve tools, b.v. gebruikers kunnen een complexe SQL-query uitvoeren bovenop een HBase-tabel in Spark, een tabel-join uitvoeren op Dataframe of integreren met Spark Streaming om een ​​ingewikkelder systeem te implementeren.

Wat nu?

Momenteel wordt de connector gehost in de opslagplaats van Hortonworks en gepubliceerd als een Spark-pakket. Het wordt momenteel gemigreerd naar Apache HBase-trunk. Tijdens de migratie hebben we enkele kritieke bugs in de HBase-trunk geïdentificeerd en deze zullen samen met de samenvoeging worden opgelost. Het gemeenschapswerk wordt gevolgd door de overkoepelende HBase JIRA HBASE-14789, inclusief HBASE-14795 en HBASE-14796 om de onderliggende computerarchitectuur voor Scan en BulkGet te optimaliseren, HBASE-14801 om een ​​JSON-gebruikersinterface te bieden voor gebruiksgemak, HBASE-15336 voor het DataFrame-schrijfpad, HBASE-15334 voor Avro-ondersteuning, HBASE-15333  voor ondersteuning van Java-primitieve typen, zoals short, int, long, float en double, enz., HBASE-15335 voor ondersteuning van samengestelde rijsleutel en HBASE-15572 om optionele tijdstempelsemantiek toe te voegen. We kijken ernaar uit om een ​​toekomstige versie van de connector te produceren die het nog gemakkelijker maakt om met de connector te werken.

Bevestiging

We willen Hamel Kothari, Sudarshan Kadambi en het Bloomberg-team bedanken voor het begeleiden van dit werk en het helpen valideren van dit werk. We willen ook de HBase-community bedanken voor het geven van feedback en het verbeteren hiervan. Ten slotte heeft dit werk gebruik gemaakt van de lessen uit eerdere Spark HBase-integraties en we willen hun ontwikkelaars bedanken voor het effenen van de weg.

Referentie:

SHC:https://github.com/hortonworks/shc-release

Spark-pakket:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Afbeeldingen opslaan in een MongoDB-database

  2. Fatale fout:Klasse 'MongoDB\Driver\Manager' niet gevonden

  3. Batch invoegen/update met Mongoid?

  4. Afhandeling van tijdelijke netwerkfouten met StackExchange.Redis