Inleiding
Python wordt veelvuldig gebruikt door data-ingenieurs en datawetenschappers om allerlei problemen op te lossen, van ETL/ELT-pijplijnen tot het bouwen van machine learning-modellen. Apache HBase is een effectief gegevensopslagsysteem voor veel workflows, maar het kan lastig zijn om deze gegevens specifiek via Python te openen. Voor dataprofessionals die gebruik willen maken van gegevens die zijn opgeslagen in HBase, kan het recente upstream-project 'hbase-connectors' worden gebruikt met PySpark voor basisbewerkingen.
In deze blogserie leggen we uit hoe u PySpark en HBase samen kunt configureren voor basis Spark-gebruik en voor taken die worden onderhouden in CDSW. Voor degenen die niet bekend zijn met CDSW:het is een veilig, selfservice enterprise data science-platform waarmee datawetenschappers hun eigen analysepijplijnen kunnen beheren, waardoor machine learning-projecten van verkenning tot productie worden versneld. Ga voor meer informatie over CDSW naar de productpagina van Cloudera Data Science Workbench.
In dit bericht worden verschillende bewerkingen uitgelegd en gedemonstreerd, samen met voorbeelduitvoer. Voor de context:alle voorbeeldbewerkingen in deze specifieke blogpost worden uitgevoerd met een CDSW-implementatie.
Vereisten:
- Heb een CDP-cluster met HBase en Spark
- Als u voorbeelden via CDSW gaat volgen, moet u deze installeren – Cloudera Data Science Workbench installeren
- Python 3 wordt op elk knooppunt op hetzelfde pad geïnstalleerd
Configuratie:
Eerst moeten HBase en Spark samen worden geconfigureerd om Spark SQL-query's correct te laten werken. Om dit te doen zijn er twee delen:configureer eerst de HBase Region Servers via Cloudera Manager; en ten tweede, zorg ervoor dat de Spark-runtime HBase-bindingen heeft. Een opmerking om in gedachten te houden is echter dat Cloudera Manager al enkele configuratie- en omgevingsvariabelen instelt om Spark automatisch naar HBase voor u te wijzen. Desalniettemin is de eerste stap van het configureren van Spark SQL-query's gebruikelijk bij alle typen implementatie op CDP-clusters, maar de tweede is enigszins anders, afhankelijk van het type implementatie.
HBase-regioservers configureren
- Ga naar Cloudera Manager en selecteer de HBase-service.
- Zoeken naar “regionserver-omgeving”
- Voeg een nieuwe omgevingsvariabele toe met behulp van het RegionServer Environment Advanced Configuration Snippet (Safety Valve):
- Sleutel:HBASE_CLASSPATH
- Waarde:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Zorg ervoor dat u de juiste versienummers gebruikt.
- Herstart regioservers.
Nadat u de bovenstaande stappen heeft gevolgd, volgt u de onderstaande stappen, afhankelijk van of u een CDSW- of een niet-CDSW-implementatie wilt.
HBase-bindingen toevoegen aan Spark-runtime in niet-CDSW-implementaties
Om de shell te implementeren of spark-submit correct te gebruiken, gebruikt u de volgende opdrachten om ervoor te zorgen dat spark de juiste HBase-bindingen heeft.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. pot
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar
HBase-bindingen toevoegen aan Spark-runtime in CDSW-implementaties
Om CDSW te configureren met HBase en PySpark, zijn er een paar stappen die u moet nemen.
1) Zorg ervoor dat Python 3 op elk clusterknooppunt is geïnstalleerd en noteer het pad ernaartoe
2) Maak een nieuw project in CDSW en gebruik een PySpark-sjabloon
3) Open het project, ga naar Instellingen -> Engine -> Omgevingsvariabelen.
4) Instellen PYSPARK3_DRIVER_PYTHON en PYSPARK3_PYTHON naar het pad waar Python is geïnstalleerd op uw clusterknooppunten (pad genoteerd in stap 1).
Hieronder ziet u een voorbeeld van hoe het eruit moet zien.
5) Ga in uw project naar Bestanden -> spark-defaults.conf en open het in de werkbank
6) Kopieer en plak de onderstaande regel in dat bestand en zorg ervoor dat het is opgeslagen voordat u een nieuwe sessie start.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
Op dit moment is CDSW nu geconfigureerd om PySpark-taken op HBase uit te voeren! De rest van deze blogpost verwijst naar enkele voorbeeldbewerkingen op een CDSW-implementatie.
Voorbeeldbewerkingen
Plaatsbewerkingen
Er zijn twee manieren om rijen in HBase in te voegen en bij te werken. De eerste en meest aanbevolen methode is om een catalogus te bouwen. Dit is een schema dat de kolommen van een HBase-tabel toewijst aan een PySpark-dataframe terwijl de tabelnaam en naamruimte worden gespecificeerd. Het bouwen van dit door de gebruiker gedefinieerde JSON-formaat heeft de meeste voorkeur, omdat het ook met andere bewerkingen kan worden gebruikt. Raadpleeg deze documentatie http://hbase.apache.org/book.html#_define_catalog voor meer informatie over catalogi. De tweede methode is het gebruik van een specifieke toewijzingsparameter genaamd "hbase.columns.mapping", waarvoor slechts een reeks sleutel-waardeparen nodig is.
- Catalogi gebruiken
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Controleer of een nieuwe tabel met de naam "tblEmployee" is gemaakt in HBase door simpelweg de HBase-shell te openen en de volgende opdracht uit te voeren:
scan ‘tblEmployee’, {‘LIMIT’ => 2}
Door catalogi te gebruiken, kunt u ook gemakkelijk HBase-tabellen laden. Dit wordt in een volgende aflevering besproken.
- Hbase.columns.mapping gebruiken
Tijdens het schrijven van het PySpark-dataframe kan een optie met de naam "hbase.columns.mapping" worden toegevoegd om een tekenreeks op te nemen die de kolommen correct toewijst. Met deze optie kunt u alleen rijen invoegen in bestaande tabellen.
Laten we in de HBase-shell eerst een tabel maken die 'tblEmployee2', 'personal' maakt
Laten we nu in PySpark 2 rijen invoegen met "hbase.columns.mapping"
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Nogmaals, controleer gewoon of een nieuwe tabel met de naam "tblEmployee2" deze nieuwe rijen heeft.
scan ‘tblEmployee2’, {‘LIMIT’ => 2}
Dat voltooit onze voorbeelden voor het invoegen van rijen via PySpark in HBase-tabellen. In de volgende aflevering bespreek ik Get and Scan Operations, PySpark SQL en enkele probleemoplossing. Tot die tijd moet u een CDP-cluster krijgen en deze voorbeelden doornemen.