Bij Apache HBase draait alles om het geven van willekeurige, realtime lees-/schrijftoegang tot uw Big Data, maar hoe krijgt u die gegevens in de eerste plaats efficiënt in HBase? Intuïtief zal een nieuwe gebruiker proberen dat te doen via de client-API's of door een MapReduce-taak te gebruiken met TableOutputFormat, maar die benaderingen zijn problematisch, zoals u hieronder zult leren. In plaats daarvan is de HBase-functie voor bulksgewijs laden veel gemakkelijker te gebruiken en kan dezelfde hoeveelheid gegevens sneller worden ingevoegd.
Deze blogpost introduceert de basisconcepten van de functie voor bulkladen, presenteert twee gebruiksscenario's en stelt twee voorbeelden voor.
Overzicht van bulkladen
Als u een van deze symptomen heeft, is bulklading waarschijnlijk de juiste keuze voor u:
- Je moest je MemStores aanpassen om het meeste geheugen te gebruiken.
- Je moest ofwel grotere WAL's gebruiken of ze volledig omzeilen.
- Uw wachtrijen voor verdichting en spoeling lopen in de honderden.
- Uw GC loopt uit de hand omdat uw tussenvoegsels in de MB's lopen.
- Je latentie gaat uit je SLA wanneer je gegevens importeert.
De meeste van die symptomen worden gewoonlijk 'groeipijnen' genoemd. Het gebruik van bulklading kan u helpen ze te vermijden.
In HBase-spreken is bulkladen het proces van het voorbereiden en laden van HFiles (het eigen bestandsformaat van HBase) rechtstreeks in de RegionServers, waardoor het schrijfpad wordt omzeild en deze problemen volledig worden vermeden. Dit proces is vergelijkbaar met ETL en ziet er als volgt uit:
- Inspecteert de tafel om een partitie voor de totale bestelling te configureren
- Upload het partitiebestand naar het cluster en voegt het toe aan de DistributedCache
- Stelt het aantal verminderingstaken in om overeen te komen met het huidige aantal regio's
- Stelt de uitvoersleutel/waardeklasse in om overeen te komen met de vereisten van HFileOutputFormat
- Stelt de reducer in om de juiste sortering uit te voeren (ofwel KeyValueSortReducer of PutSortReducer)
In dit stadium wordt er één HFile gemaakt per regio in de uitvoermap. Houd er rekening mee dat de invoergegevens bijna volledig opnieuw worden geschreven, dus u hebt minimaal twee keer zoveel schijfruimte nodig als de oorspronkelijke gegevensset. Voor een mysqldump van 100 GB moet u bijvoorbeeld ten minste 200 GB beschikbare schijfruimte in HDFS hebben. U kunt het dumpbestand aan het einde van het proces verwijderen.
Hier is een illustratie van dit proces. De gegevensstroom gaat van de oorspronkelijke bron naar HDFS, waar de RegionServers de bestanden eenvoudigweg naar de mappen van hun regio's verplaatsen.
Gebruiksvoorbeelden
Oorspronkelijke dataset geladen: Alle gebruikers die migreren vanuit een andere datastore zouden deze use case moeten overwegen. Eerst moet u de oefening doorlopen om het tabelschema te ontwerpen en vervolgens de tabel zelf maken, vooraf gesplitst. De gesplitste punten moeten rekening houden met de verdeling van de rijsleutels en het aantal RegionServers. Ik raad aan om de presentatie van mijn collega Lars George over geavanceerd schema-ontwerp te lezen voor elk serieus gebruik.
Het voordeel hiervan is dat het veel sneller is om de bestanden rechtstreeks te schrijven dan door het schrijfpad van de RegionServer te gaan (schrijven naar zowel de MemStore als de WAL) en dan uiteindelijk te spoelen, comprimeren, enzovoort. Het betekent ook dat u uw cluster niet hoeft af te stemmen op een schrijfzware werkbelasting en het vervolgens opnieuw af te stemmen op uw normale werkbelasting.
Incrementele belasting: Laten we zeggen dat je een dataset hebt die momenteel wordt bediend door HBase, maar nu moet je meer data in batch importeren van een derde partij of je hebt een nachtelijke taak die een paar gigabytes genereert die je moet invoegen. Het is waarschijnlijk niet zo groot als de dataset die HBase al aanbiedt, maar het kan van invloed zijn op het 95e percentiel van uw latentie. Als u het normale schrijfpad gebruikt, heeft dit het nadelige effect dat er tijdens het importeren meer flushes en verdichtingen optreden dan normaal. Deze extra IO-stress zal concurreren met uw latentiegevoelige vragen.
Voorbeelden
U kunt de volgende voorbeelden gebruiken in uw eigen Hadoop-cluster, maar de instructies zijn bedoeld voor de Cloudera QuickStart VM, een cluster met één knooppunt, een gast-besturingssysteem, en voorbeeldgegevens en voorbeelden die zijn ingebakken in een virtuele machine-appliance voor uw desktop.
Zodra u de VM start, vertelt u hem, via de webinterface die automatisch wordt geopend, om CDH te implementeren en zorgt u er vervolgens voor dat de HBase-service ook wordt gestart.
Ingebouwde TSV-bulklader
HBase wordt geleverd met een MR-taak die een bestand met door scheidingstekens gescheiden waarden kan lezen en rechtstreeks naar een HBase-tabel kan uitvoeren of HFiles kan maken voor bulklading. Hier gaan we naar toe:
- Verkrijg de voorbeeldgegevens en upload deze naar HDFS.
- Voer de ImportTsv-taak uit om het bestand om te zetten in meerdere HFiles volgens een vooraf geconfigureerde tabel.
- Bereid de bestanden voor en laad ze in HBase.
De eerste stap is om een console te openen en de volgende opdracht te gebruiken om voorbeeldgegevens op te halen:
curl -O https://people.apache.org/~jdcryans/word_count.csv
Ik heb dit bestand gemaakt door een woordentelling uit te voeren op het originele manuscript van deze blogpost en het resultaat vervolgens in csv-indeling uit te voeren, zonder kolomtitels. Upload nu het bestand naar HDFS:
hdfs dfs -put word_count.csv
Nu het extractiegedeelte van de bulklading is voltooid, moet u het bestand transformeren. Eerst moet je de tafel ontwerpen. Om het simpel te houden, noem het "wordcount" - de rijtoetsen zullen de woorden zelf zijn en de enige kolom zal de telling bevatten, in een familie die we "f" zullen noemen. De beste praktijk bij het maken van een tabel is om deze te splitsen volgens de rijsleuteldistributie, maar voor dit voorbeeld maken we gewoon vijf regio's met splitspunten gelijkmatig verdeeld over de sleutelruimte. Open de hbase-shell:
hbase shell
En voer de volgende opdracht uit om de tabel te maken:
create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}
De vier splitpunten genereren vijf regio's, waarbij de eerste regio begint met een lege rijsleutel. Om betere splitpunten te krijgen, zou je ook een snelle analyse kunnen doen om te zien hoe de woorden echt zijn verdeeld, maar dat laat ik aan jou over.
Als u de browser van uw VM naar http://localhost:60010/ verwijst, ziet u onze nieuw gemaakte tabel en de vijf regio's die allemaal zijn toegewezen aan de RegionServer.
Nu is het tijd om het zware werk te doen. Als u de HBase-jar op de opdrachtregel oproept met het "hadoop" -script, wordt een lijst met beschikbare tools weergegeven. Degene die we willen heet importtsv en heeft het volgende gebruik:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv ERROR: Wrong number of arguments: 0 Usage: importtsv -Dimporttsv.columns=a,b,c
De opdrachtregel die we gaan gebruiken is de volgende:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0- security.jar importtsv -Dimporttsv.separator=, -Dimporttsv.bulk.output=output -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv
Hier is een overzicht van de verschillende configuratie-elementen:
- -Dimporttsv.separator=, geeft aan dat het scheidingsteken een komma is.
- -Dimporttsv.bulk.output=output is een relatief pad naar waar de HFiles worden geschreven. Aangezien uw gebruiker op de VM standaard "cloudera" is, betekent dit dat de bestanden in /user/cloudera/output staan. Als u deze optie overslaat, wordt de taak rechtstreeks naar HBase geschreven.
- -Dimporttsv.columns=HBASE_ROW_KEY,f:count is een lijst van alle kolommen in dit bestand. De rijsleutel moet worden geïdentificeerd met behulp van de HBASE_ROW_KEY-reeks in hoofdletters; anders start het de taak niet. (Ik besloot de kwalificatie "count" te gebruiken, maar het kan van alles zijn.)
De taak zou binnen een minuut moeten zijn voltooid, gezien de kleine invoergrootte. Merk op dat er vijf Reducers actief zijn, één per regio. Hier is het resultaat op HDFS:
-rw-r--r-- 3 cloudera cloudera 4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b -rw-r--r-- 3 cloudera cloudera 3163 2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36 -rw-r--r-- 3 cloudera cloudera 2487 2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2 -rw-r--r-- 3 cloudera cloudera 2961 2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914 -rw-r--r-- 3 cloudera cloudera 1336 2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f
Zoals u kunt zien, behoren de bestanden momenteel toe aan de gebruiker "cloudera". Om ze te laden, moeten we de eigenaar wijzigen in "hbase" of HBase heeft geen toestemming om de bestanden te verplaatsen. Voer de volgende opdracht uit:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output
Voor de laatste stap moeten we de tool completebulkload gebruiken om te verwijzen naar waar de bestanden zijn en naar welke tabellen we laden:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount
Als u teruggaat naar de HBase-shell, kunt u de opdracht count uitvoeren die u laat zien hoeveel rijen er zijn geladen. Als je bent vergeten te chownen, blijft het commando hangen.
Aangepaste MR-taak
De TSV-bulklader is goed voor het maken van prototypes, maar omdat het alles als strings interpreteert en geen ondersteuning biedt voor het manipuleren van de velden tijdens transformatietijd, zul je uiteindelijk je eigen MR-taak moeten schrijven. Mijn collega James Kinley, die werkt als solution architect in Europa, schreef zo'n opdracht die we gaan gebruiken voor ons volgende voorbeeld. De gegevens voor de baan bevatten openbare Facebook- en Twitter-berichten met betrekking tot de 2010 NBA Finals (game 1) tussen de Lakers en de Celtics. Je vindt de code hier. (De Quick Start VM wordt geleverd met git en maven geïnstalleerd, zodat je de repository erop kunt klonen.)
Als we naar de Driver-klasse kijken, zijn de belangrijkste onderdelen de volgende:
job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); … // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable);
Eerst moet uw Mapper een ImmutableBytesWritable uitvoeren die de rijsleutel bevat, en de uitvoerwaarde kan een KeyValue, een Put of een Delete zijn. Het tweede fragment laat zien hoe de Reducer moet worden geconfigureerd; het wordt in feite volledig afgehandeld door HFileOutputFormat. configureIncrementalLoad() zoals eerder beschreven in de sectie "Transformeren".
De klasse HBaseKVMapper bevat alleen de mapper die de geconfigureerde uitvoersleutel en waarden respecteert:
public class HBaseKVMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {uit
Om het uit te voeren, moet je het project compileren met maven en de gegevensbestanden pakken via de links in de README. (Het bevat ook het shellscript om de tabel te maken.) Vergeet niet om de bestanden naar HDFS te uploaden en uw klassenpad in te stellen om op de hoogte te zijn van HBase, omdat u de jar deze keer niet gaat gebruiken voordat u de taak start. :
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*
U kunt de taak starten met een opdrachtregel die lijkt op deze:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar com.cloudera.examples.hbase.bulkimport.Driver -libjars /home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar, /home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010
Zoals u kunt zien, moeten de afhankelijkheden van de taak afzonderlijk worden toegevoegd. Ten slotte kunt u de bestanden laden door eerst hun eigenaar te wijzigen en vervolgens de tool completebulkload uit te voeren:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010
Potentiële problemen
Onlangs verwijderde gegevens verschijnen weer. Dit probleem doet zich voor wanneer een Delete via een bulklading wordt ingevoegd en sterk wordt gecomprimeerd terwijl de corresponderende Put zich nog in een MemStore bevindt. De gegevens worden als verwijderd beschouwd wanneer de Delete zich in een HFile bevindt, maar zodra deze tijdens de verdichting is verwijderd, wordt de Put weer zichtbaar. Als u een dergelijk gebruiksscenario heeft, kunt u overwegen uw kolomfamilies zo te configureren dat de verwijderde cellen behouden blijven met KEEP_DELETED_CELLS in de shell of HColumnDescriptor.setKeepDeletedCells().
In bulk geladen gegevens kunnen niet worden overschreven door een andere bulklading. Dit probleem treedt op wanneer twee in bulk geladen HFiles die op verschillende tijdstippen zijn geladen, proberen een andere waarde in dezelfde cel te schrijven, wat betekent dat ze dezelfde rijsleutel, familie, kwalificatie en tijdstempel hebben. Het resultaat is dat de eerste ingevoegde waarde wordt geretourneerd in plaats van de tweede. Deze bug wordt opgelost in HBase 0.96.0 en CDH 5 (de volgende hoofdversie van CDH) en er wordt gewerkt aan HBASE-8521 voor de 0.94-tak en CDH 4.
Bulklading veroorzaakt grote verdichtingen. Dit probleem doet zich voor wanneer u incrementele bulkladingen uitvoert en er voldoende in bulk geladen bestanden zijn om een kleine verdichting te activeren (de standaarddrempel is 3). De HFiles worden geladen met een volgnummer dat is ingesteld op 0, zodat ze als eerste worden opgepikt wanneer de RegionServer bestanden selecteert voor een verdichting, en als gevolg van een bug zal het ook alle resterende bestanden selecteren. Dit probleem zal ernstige gevolgen hebben voor degenen die al grote regio's hebben (meerdere GB's) of die vaak bulksgewijs laden (om de paar uur en minder), omdat veel gegevens worden gecomprimeerd. HBase 0.96.0 heeft de juiste oplossing en dat geldt ook voor CDH 5; HBASE-8521 lost het probleem op in 0.94, aangezien de in bulk geladen HFiles nu een correct volgnummer krijgen. HBASE-8283 kan worden ingeschakeld met hbase.hstore.useExploringCompation na 0.94.9 en CDH 4.4.0 om dit probleem te verhelpen door een slimmer algoritme voor verdichtingselectie te zijn.
In bulk geladen gegevens worden niet gerepliceerd . Omdat bulksgewijs laden het schrijfpad omzeilt, wordt er niet naar de WAL geschreven als onderdeel van het proces. Replicatie werkt door de WAL-bestanden te lezen, zodat het de in bulk geladen gegevens niet ziet - en hetzelfde geldt voor de bewerkingen die Put.setWriteToWAL(true) gebruiken. Een manier om dat aan te pakken is om de onbewerkte bestanden of de HFiles naar het andere cluster te verzenden en daar de andere verwerking uit te voeren.
Conclusie
Het doel van deze blogpost was om u kennis te laten maken met de basisconcepten van Apache HBase bulklading. We hebben uitgelegd hoe het proces is zoals het doen van ETL, en dat het veel beter is voor grote datasets dan het gebruik van de normale API, omdat het het schrijfpad omzeilt. De twee voorbeelden zijn opgenomen om te laten zien hoe eenvoudige TSV-bestanden in bulk in HBase kunnen worden geladen en hoe u uw eigen Mapper voor andere gegevensindelingen kunt schrijven.
Nu kun je hetzelfde proberen met een grafische gebruikersinterface via Hue.