Inleiding
Sommige configuratie-eigenschappen in Apache Hadoop hebben een direct effect op clients, zoals Apache HBase. Een van die eigenschappen heet 'dfs.datanode.max.xcievers' en hoort bij het HDFS-subproject. Het definieert het aantal server-side threads en - tot op zekere hoogte - sockets die worden gebruikt voor gegevensverbindingen. Als u dit aantal te laag instelt, kan dit problemen veroorzaken als u groeit of het gebruik van uw cluster verhoogt. Dit bericht helpt je te begrijpen wat er tussen de client en de server gebeurt en hoe je een redelijk aantal voor deze eigenschap kunt bepalen.
Het probleem
Aangezien HBase alles opslaat wat het nodig heeft in HDFS, kan de harde bovengrens die wordt opgelegd door de configuratie-eigenschap "dfs.datanode.max.xcievers" ertoe leiden dat er te weinig bronnen beschikbaar zijn voor HBase, wat zich manifesteert als IOExceptions aan beide zijden van de verbinding. Hier is een voorbeeld van de HBase-mailinglijst [1], waar de volgende berichten aanvankelijk aan de RegionServer-kant werden vastgelegd:
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Uitzondering in createBlockOutputStream java.io.IOException:Kan niet lezen uit stream
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient:Blokkering verlaten blk_-5467014108758633036_595771
2008-11- 11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient:DataStreamer-uitzondering:java.io.IOException:kan geen nieuw blok maken.
2008-11-11 19:55:58,455 WARN org.apache .hadoop.dfs.DFSClient:Foutherstel voor blok blk_-5467014108758633036_595771 slechte datanode[0]
2008-11-11 19:55:58,482 FATAL org.apache.hadoop.hbase.regionserver.Flusher:Opnieuw afspelen van hlog vereist . Afsluiten van de server forceren
Door dit te correleren met de Hadoop DataNode-logboeken bleek het volgende item:
FOUT org.apache.hadoop.dfs.DataNode: DatanodeRegistration(10.10.10.53:50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver:java.io.IOException: xceiverCount 258 overschrijdt de limiet van gelijktijdige xcievers 256
In dit voorbeeld zorgde de lage waarde van "dfs.datanode.max.xcievers" voor de DataNodes ervoor dat de hele RegionServer werd afgesloten. Dit is echt een slechte situatie. Helaas is er geen vaste regel die uitlegt hoe de vereiste limiet moet worden berekend. Het wordt vaak aangeraden om het aantal te verhogen van de standaardwaarde van 256 naar iets als 4096 (zie [1], [2], [3], [4] en [5] ter referentie). Dit wordt gedaan door deze eigenschap toe te voegen aan het bestand hdfs-site.xml van alle DataNodes (merk op dat het verkeerd is gespeld):
Opmerking:u moet uw DataNodes opnieuw opstarten nadat u deze wijziging in het configuratiebestand heeft aangebracht.
Dit zou moeten helpen bij het bovenstaande probleem, maar misschien wilt u toch meer weten over hoe dit allemaal samenwerkt en wat HBase met deze bronnen doet. We zullen dit in de rest van dit bericht bespreken. Maar voordat we dat doen, moeten we duidelijk zijn waarom je dit aantal niet gewoon heel hoog kunt instellen, zeg 64K en klaar ermee.
Er is een reden voor een bovengrens, en die is tweeledig:ten eerste hebben threads hun eigen stapel nodig, wat betekent dat ze geheugen in beslag nemen. Voor huidige servers betekent dit standaard 1 MB per thread[6]. Met andere woorden, als u alle 4096 DataXceiver-threads opgebruikt, heeft u ongeveer 4 GB aan opslagruimte nodig om ze op te vangen. Dit snijdt in de ruimte die u hebt toegewezen voor memstores en blokcaches, evenals alle andere bewegende delen van de JVM. In het ergste geval kunt u een OutOfMemoryException tegenkomen en is het RegionServer-proces toast. U wilt deze eigenschap op een redelijk hoog aantal instellen, maar ook niet te hoog.
Ten tweede, als je deze vele threads actief hebt, zul je ook zien dat je CPU steeds meer wordt belast. Er zullen veel contextwisselingen plaatsvinden om al het gelijktijdige werk af te handelen, wat middelen kost voor het echte werk. Net als bij de zorgen over het geheugen, wil je dat het aantal threads niet grenzeloos groeit, maar een redelijke bovengrens biedt - en dat is waar "dfs.datanode.max.xcievers" voor is.
Hadoop-bestandssysteemdetails
Vanaf de clientzijde biedt de HDFS-bibliotheek de abstractie genaamd Path. Deze klasse vertegenwoordigt een bestand in een bestandssysteem dat wordt ondersteund door Hadoop, vertegenwoordigd door de klasse FileSystem. Er zijn een paar concrete implementaties van de abstracte FileSystem-klasse, waaronder het DistributedFileSytem, dat HDFS vertegenwoordigt. Deze klasse omhult op zijn beurt de eigenlijke DFSClient-klasse die alle interacties met de externe servers afhandelt, d.w.z. de NameNode en de vele DataNodes.
Wanneer een client, zoals HBase, een bestand opent, doet hij dit bijvoorbeeld door de methoden open() of create() van de klasse FileSystem aan te roepen, hier de meest simplistische incarnaties
public DFSInputStream open(String src) genereert IOException
public FSDataOutputStream create(Path f) genereert IOException
De geretourneerde stream-instantie heeft een socket en thread aan de serverzijde nodig, die worden gebruikt om gegevensblokken te lezen en te schrijven. Ze maken deel uit van het contract om gegevens uit te wisselen tussen de client en de server. Merk op dat er andere, op RPC gebaseerde protocollen in gebruik zijn tussen de verschillende machines, maar voor het doel van deze discussie kunnen ze worden genegeerd.
De geretourneerde stream-instantie is een gespecialiseerde DFSOutputStream- of DFSInputStream-klasse, die alle interactie met de NameNode afhandelt om erachter te komen waar de kopieën van de blokken zich bevinden, en de datacommunicatie per blok per DataNode.
Aan de serverkant verpakt de DataNode een instantie van DataXceiverServer, de eigenlijke klasse die de bovenstaande configuratiesleutel leest en ook de bovenstaande uitzondering genereert wanneer de limiet wordt overschreden.
Wanneer de DataNode start, wordt een threadgroep gemaakt en wordt de genoemde DataXceiverServer-instantie als volgt gestart:
this.threadGroup =new ThreadGroup(“dataXceiverServer”);
this.dataXceiverServer =new Daemon( threadGroup,
nieuwe DataXceiverServer(ss, conf, dit));
this.threadGroup.setDaemon(true); // automatisch vernietigen wanneer leeg
Houd er rekening mee dat de DataXceiverServer-thread al één plek in de threadgroep inneemt. De DataNode heeft ook deze interne klasse om het aantal momenteel actieve threads in deze groep op te halen:
/** Aantal gelijktijdige xceivers per node. */
int getXceiverCount() {
return threadGroup ==null ? 0:threadGroup.activeCount();
}
Lees- en schrijfblokkades, zoals geïnitieerd door de client, zorgen ervoor dat er een verbinding tot stand wordt gebracht, die door de DataXceiverServer-thread wordt verpakt in een DataXceiver-instantie. Tijdens deze overdracht wordt een thread gemaakt en geregistreerd in de bovenstaande threadgroep. Dus voor elke actieve lees- en schrijfbewerking wordt een nieuwe thread aan de serverzijde bijgehouden. Als het aantal threads in de groep het geconfigureerde maximum overschrijdt, wordt de genoemde uitzondering gegenereerd en vastgelegd in de logboeken van de DataNode:
if (curXceiverCount> dataXceiverServer.maxXceiverCount) {
throw new IOException(“xceiverCount ” + curXceiverCount
+ ” overschrijdt de limiet van gelijktijdige xcievers ”
+ dataXceiverServer.maxXceiverCount);
}
Implicaties voor klanten
Nu is de vraag, hoe verhoudt het lezen en schrijven van de client zich tot de server-side threads. Voordat we echter ingaan op de details, laten we de foutopsporingsinformatie gebruiken die de DataXceiver-klasse logt wanneer deze wordt gemaakt en gesloten
LOG.debug(“Aantal actieve verbindingen is:” + datanode.getXceiverCount());
…
LOG.debug(datanode.dnRegistration + “:Aantal actieve verbindingen is:” + datanode.getXceiverCount());
en controleer tijdens het opstarten van HBase wat er op de DataNode is vastgelegd. Omwille van de eenvoud wordt dit gedaan op een pseudo-gedistribueerde installatie met een enkele DataNode- en RegionServer-instantie. Het volgende toont de bovenkant van de statuspagina van de RegionServer.
Het belangrijkste deel bevindt zich in het gedeelte "Metrieken", waar "storefiles =22" staat. Dus, ervan uitgaande dat HBase op zijn minst zoveel bestanden heeft om te verwerken, plus enkele extra bestanden voor het write-ahead-logboek, zouden we het bovenstaande logs-bericht moeten zien dat we ten minste 22 "actieve verbindingen" hebben. Laten we HBase starten en de logbestanden van DataNode en RegionServer controleren:
Opdrachtregel:
$ bin/start-hbase.sh
…
DataNode-logboek:
2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Aantal actieve verbindingen is:1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS- 1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:2
12/03/05 13:01:35 INFO regionserver.MemStoreFlusher:globalMemStoreLimit=396.7m, globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
03/12/05 13:01:39 INFO http.HttpServer:Poort geretourneerd door webServer.getConnectors()[0].getLocalPort() voor open() is -1 . De listener openen op 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:1
12/03/05 13:01:40 INFO regionserver.HRegionServer:Verzoek ontvangen om regio te openen:-ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode :Aantal actieve verbindingen is:3
2012-03-05 13:01:40,884 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448 -10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server. datanode.DataNode:Aantal actieve verbindingen is:3
…
12/03/05 13:01:40 INFO regionserver.HRegion:Onlined -ROOT-,,0.70236052; volgende sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:3
2012-03-05 13 :01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:4
…
12/03/05 13:01:41 INFO regionserver.HRegionServer:Verzoek ontvangen om regio te openen:.META.,,1.1028785192
2012-03 -05 13:01:41,026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:3
2012-03-05 13:01:41.027 DEBUG org.apache.hadoop. hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:4
…
03/12/05 13:01:41 INFO regionserver.HRegion:Onlined .META.,,1.1028785192; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:3
2012-03-05 13 :01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:Aantal actieve verbindingen is:5
12/03/05 13:01:41 INFO regionserver.HRegionServer:Verzoek ontvangen om 16 regio('s) te openen
12/03/05 13 :01:41 INFO regionserver.HRegionServer:Ontvangen verzoek om regio te openen:usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
12/03/05 13:01:41 INFO regionserver.HRegionServer:Ontvangen verzoek om regio te openen:usertable,,user11203 1330944810191.90d287473fe223f0ddc137020efda25d.
…
2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Aantal actieve verbindingen is:6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:7
…
2012-03-05 13:01:41.257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772 , infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:10
2012-03-05 13:01:41.257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:9
…
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined gebruikerstabel,gebruiker1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.; volgende sequenceid=62917
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined gebruikerstabel,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; volgende sequenceid=62916
…
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:6
2012-03-05 13 :01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server .datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:8
2012-03 -05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:7
…
12/03/05 13:01:41 INFO regionserver .HRregio:Onlined gebruikerstabel,gebruiker1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:6
2012-03-05 13 :01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:7
…
2012-03-05 13:01:41,829 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID =DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:7
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined gebruikerstabel ,gebruiker515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:6
2012-03-05 13 :01:41.838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:7
12/03/05 13:01:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
…
2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:4
2012 -03-05 22:48:41.945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:4
12/03/05 22:48:41 INFO regionserver.HRegion:Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; volgende sequenceid=62929
2012-03-05 22:48:41,963 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64 -50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:4
Je kunt zien hoe de regio's de een na de ander worden geopend, maar wat je misschien ook opvalt, is dat het aantal actieve verbindingen nooit oploopt tot 22 - het bereikt amper 10. Waarom is dat? Om dit beter te begrijpen, moeten we zien hoe bestanden in HDFS worden toegewezen aan de server-side DataXceiver-instantie - en de daadwerkelijke threads die ze vertegenwoordigen.
Hadoop diepe duik
De eerder genoemde DFSInputStream en DFSOutputStream zijn echt façades rond de gebruikelijke streamconcepten. Ze verpakken de client-servercommunicatie in deze standaard Java-interfaces, terwijl ze het verkeer intern naar een geselecteerde DataNode routeren - die een kopie van het huidige blok bevat. Het heeft de vrijheid om deze verbinding naar behoefte te openen en te sluiten. Terwijl een client een bestand in HDFS leest, schakelen de klassen van de clientbibliotheek transparant van blok naar blok, en dus van DataNode naar DataNode, dus het moet verbindingen openen en sluiten als dat nodig is.
De DFSInputStream heeft een instantie van een DFSClient.BlockReader-klasse, die de verbinding met de DataNode opent. De stream-instantie roept blockSeekTo() aan voor elke aanroep naar read() die zorgt voor het openen van de verbinding, als die er al is. Zodra een blok volledig is gelezen, wordt de verbinding gesloten. Het sluiten van de stream heeft natuurlijk hetzelfde effect.
De DFSOutputStream heeft een vergelijkbare helperklasse, de DataStreamer. Het volgt de verbinding met de server, die wordt geïnitieerd door de methode nextBlockOutputStream(). Het heeft nog meer interne klassen die helpen bij het wegschrijven van de blokgegevens, die we hier kortheidshalve weglaten.
Zowel schrijf- als leesblokken hebben een thread nodig om de socket en tussenliggende gegevens aan de serverzijde vast te houden, verpakt in de DataXceiver-instantie. Afhankelijk van wat uw klant doet, ziet u dat het aantal verbindingen schommelt rond het aantal momenteel geopende bestanden in HDFS.
Terug naar het HBase-raadsel hierboven:de reden dat je tijdens de start geen 22 (en meer) verbindingen ziet, is dat terwijl de regio's open zijn, de enige vereiste gegevens het infoblok van de HFile zijn. Dit blok wordt gelezen om essentiële details over elk bestand te krijgen, maar wordt dan weer gesloten. Dit betekent dat de server-side resource snel achter elkaar wordt vrijgegeven. De overige vier verbindingen zijn moeilijker te bepalen. U kunt JStack gebruiken om alle threads op de DataNode te dumpen, die in dit voorbeeld dit item toont:
"DataXceiver voor client /127.0.0.1:64281 [verzendblok blk_55327412334432227208_4201]" daemon prio=5 tid=7fb96481d000 nid=0x1178b4000 uitvoerbaar [1178b3000]
java.lang.Thread.State:RUNNABLE
…
"DataXceiver voor client /127.0.0.1:64172 [receiving block blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29 ,60020,1330984111693_1330984118810]” daemon prio=5 tid=7fb966109000 nid=0x1169cb000 uitvoerbaar [1169ca000]
java.lang.Thread.State:RUNNABLE
…
Dit zijn de enige DataXceiver-vermeldingen (in dit voorbeeld), dus het aantal in de threadgroep is een beetje misleidend. Bedenk dat de DataXceiverServer daemon-thread al verantwoordelijk is voor één extra item, wat in combinatie met de twee bovenstaande de drie actieve verbindingen oplevert – wat in feite drie actieve threads betekent. De reden dat het logboek in plaats daarvan vier vermeldt, is dat het de telling registreert van een actieve thread die op het punt staat te eindigen. Dus kort nadat het aantal van vier is geregistreerd, is het er eigenlijk één minder, d.w.z. drie en komt dus overeen met ons aantal actieve threads.
Merk ook op dat de interne hulpklassen, zoals de PacketResponder, een andere thread in de groep bezetten terwijl ze actief zijn. De JStack-uitvoer geeft dat feit aan, en vermeldt de thread als zodanig:
'PacketResponder 0 for Block blk_-2005512129579433420_4199' daemon prio=5 tid=7fb96384d000 nid=0x116ace000 in Object.wait () [116acd000]
java.lang.Thread.State:TIMED_WAITING (op objectmonitor)
op java.lang.Object.wait(Native Method)
op org.apache.hadoop. hdfs.server.datanode.BlockReceiver$PacketResponder \
.lastDataNodeRun(BlockReceiver.java:779)
– vergrendeld (een org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
op org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
op java.lang.Thread.run(Thread.java:680)
Deze thread bevindt zich momenteel in de status TIMED_WAITING en wordt niet als actief beschouwd. Dat is de reden waarom de telling die wordt uitgezonden door de DataXceiver-loginstructies, dit soort threads niet omvat. Als ze actief worden doordat de client verzendgegevens verstuurt, gaat het aantal actieve threads weer omhoog. Een ander ding om op te merken is dat deze thread geen aparte verbinding of socket nodig heeft tussen de client en de server. De PacketResponder is slechts een thread aan de serverzijde om blokgegevens te ontvangen en deze naar de volgende DataNode in de schrijfpijplijn te streamen.
Het Hadoop fsck-commando heeft ook een optie om te rapporteren welke bestanden momenteel open staan om te schrijven:
$ hadoop fsck /hbase -openforwrite
FSCK gestart door larsgeorge vanaf /10.0.0.29 voor pad / hbase op ma 05 maart 22:59:47 CET 2012
……/hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 bytes, 1 blok(ken), OPENFORWITE:………………………………..Status:GEZOND
Totale grootte: 2088783626 B
Totaal mappen: 54
Totaal bestanden: 45
…
Dit heeft niet direct betrekking op een bezette thread aan de serverzijde, omdat deze worden toegewezen door blok-ID. Maar je kunt er wel uit opmaken dat er één open blok is om te schrijven. Het Hadoop-commando heeft extra opties om de daadwerkelijke bestanden af te drukken en de ID te blokkeren waaruit ze bestaan:
$ hadoop fsck /hbase -files -blocks
FSCK gestart door larsgeorge vanaf /10.0.0.29 voor pad /hbase op di 06 mrt 10:39:50 CET 2012
…
/hbase/.META./1028785192/.tmp
/hbase/.META./1028785192/info
/hbase/.META./1028785192/info/4027596949915293355 36517 bytes, 1 blok(ken): OK
0. blk_5532741233443227208_4201 len=36517 repl=1
…
Status:GEZOND
Totale grootte: 2088788703 B
Totaal dirs : 54
Totaal aantal bestanden: 45 (Bestanden die momenteel worden geschreven:1)
Totaal aantal blokken (gevalideerd): 64 (gem. blokgrootte 32637323 B) (Totaal aantal geopende bestandsblokken (niet gevalideerd):1)
Minimaal gerepliceerde blokken: 64 (100,0 %)
…
Dit geeft je twee dingen. Ten eerste stelt de samenvatting dat er één open bestandsblok was op het moment dat de opdracht werd uitgevoerd - overeenkomend met de telling gerapporteerd door de optie "-openforwrite" hierboven. Ten tweede kunt u met de lijst met blokken naast elk bestand de threadnaam afstemmen op het bestand dat het blok bevat dat wordt geopend. In dit voorbeeld wordt het blok met de ID “blk_5532741233443227208_4201” van de server naar de client gestuurd, hier een RegionServer. Dit blok behoort tot de HBase .META. tabel, zoals blijkt uit de uitvoer van het Hadoop fsck-commando. De combinatie van JStack en fsck kan dienen als een vervanger voor lsof (een tool op de Linux-opdrachtregel om "open bestanden weer te geven").
De JStack meldt ook dat er een DataXceiver-thread is, met een bijbehorende PacketResponder, voor blok-ID "blk_-2005512129579433420_4199", maar deze ID ontbreekt in de lijst met blokken gerapporteerd door fsck. Dit komt omdat het blok nog niet klaar is en dus niet beschikbaar is voor lezers. Met andere woorden, Hadoop fsck rapporteert alleen over volledige (of gesynchroniseerde[7][8], voor Hadoop-versies die deze functie ondersteunen) blokken.
Terug naar HBase
Het openen van alle regio's vereist niet zoveel bronnen op de server als u had verwacht. Als u echter de hele HBase-tabel scant, dwingt u HBase om alle blokken in alle HFiles te lezen:
HBase Shell:
hbase(main):003:0> scan 'usertable'
…
1000000 rij(en) in 1460.3120 seconden
DataNode-logboek:
2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode. DataNode:Aantal actieve verbindingen is:6
2012-03-05 14:43:23,293 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:7
2012 -03-05 14:43:23.299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:8
…
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0. 0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:11
2012-03-05 14:49:24.332 DEBUG org .apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanod e.DataNode:Aantal actieve verbindingen is:11
2012-03-05 14:51:12.603 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:12
2012-03-05 14:51:12.605 DEBUG org.apache.hadoop.hdfs .server.datanode.DataNode:Aantal actieve verbindingen is:11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:12
…
2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:15
2012-03-05 14:57:31,722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs. server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of act ive verbindingen is:17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:16
…
2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:21
2012-03-05 15:04:17,689 DEBUG org.apache .hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is:22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:DatanodeRegistration (127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Aantal actieve verbindingen is :22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode:Aantal actieve verbindingen is:21
Het aantal actieve verbindingen bereikt nu de ongrijpbare 22. Merk op dat dit aantal al de serverthread omvat, dus we zitten nog steeds een beetje onder wat we zouden kunnen beschouwen als het theoretische maximum - op basis van het aantal bestanden dat HBase moet verwerken.
Wat betekent dat allemaal?
Dus, hoeveel "xcievers (sic)" heb je nodig? Aangezien u alleen HBase gebruikt, kunt u eenvoudig de bovenstaande "storefiles" -metriek controleren (die u ook via Ganglia of JMX krijgt) en een paar procent toevoegen voor tussenliggende en vooruitlopende logbestanden. Dit zou moeten werken voor systemen in beweging. Als u dat aantal echter zou bepalen op een inactief, volledig gecomprimeerd systeem en ervan uitgaat dat dit het maximum is, zou u kunnen merken dat dit aantal te laag is zodra u meer winkelbestanden begint toe te voegen tijdens normale memstore-flushes, d.w.z. zodra u begint met gegevens toevoegen aan de HBase-tabellen. Of als u MapReduce ook gebruikt op datzelfde cluster, Flume-logboekaggregatie, enzovoort. Je moet rekening houden met die extra bestanden en, belangrijker nog, blokken openen voor lezen en schrijven.
Merk nogmaals op dat de voorbeelden in dit bericht een enkele DataNode gebruiken, iets wat je niet zult hebben op een echt cluster. Daartoe moet u het totale aantal winkelbestanden (volgens de HBase-metriek) delen door het aantal DataNodes dat u heeft. Als u bijvoorbeeld een winkelbestand van 1000 heeft en uw cluster 10 DataNodes heeft, dan zou u in orde moeten zijn met de standaard 256 xceiver-threads per DataNode.
In het slechtste geval zou het aantal actieve lezers en schrijvers zijn, dat wil zeggen degenen die momenteel gegevens verzenden of ontvangen. Maar aangezien dit moeilijk van tevoren te bepalen is, kunt u overwegen om een behoorlijke reserve in te bouwen. Omdat het schrijfproces een extra - hoewel kortere levensduur - draad nodig heeft (voor de PacketResponder), moet je daar ook rekening mee houden. Een redelijke, maar nogal simplistische formule zou dus kunnen zijn:
Deze formule houdt er rekening mee dat je ongeveer twee threads nodig hebt voor een actieve schrijver en een andere voor een actieve lezer. Dit wordt vervolgens opgeteld en gedeeld door het aantal DataNodes, aangezien u de “dfs.datanode.max.xcievers” per DataNode moet specificeren.
Als je teruggaat naar de bovenstaande schermafbeelding van HBase RegionServer, zag je dat er 22 winkelbestanden waren. These are immutable and will only be read, or in other words occupy one thread only. For all memstores that are flushed to disk you need two threads – but only until they are fully written. The files are finalized and closed for good, cleaning up any thread in the process. So these come and go based on your flush frequency. Same goes for compactions, they will read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these will occupy a thread once you have started to add data to any table. There is a log file per server, meaning that you can only have twice as many active threads for these files as you have RegionServers.
For a pure HBase setup (HBase plus its own HDFS, with no other user), we can estimate the number of needed DataXceiver’s with the following formula:
Since you will be hard pressed to determine the active number of store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time. The maximum number of logs you can have active matches the number of RegionServers, leading us to this formula:
Obviously, the number of store files will increase over time, and the number of regions typically as well. Same for the numbers of servers, so keep in mind to adjust this number over time. In practice, you can add a buffer of, for example, 20%, as shown in the formula below – in an attempt to not force you to change the value too often.
On the other hand, if you keep the number of regions fixed per server[9], and rather split them manually, while adding new servers as you grow, you should be able to keep this configuration property stable for each server.
Final Advice &TL;DR
Here is the final formula you want to use:
It computes the maximum number of threads needed, based on your current HBase vitals (no. of store files, regions, and region servers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of the vitals goes over a certain percentage of change.
Note:Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (default is often 1024), you will get connection issues first.
Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] – if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13].
Links:
- [1] http://old.nabble.com/Re%3A-xceiverCount-257-exceeds-the-limit-of-concurrent-xcievers-256-p20469958.html
- [2] http://ccgtech.blogspot.com/2010/02/hadoop-hdfs-deceived-by-xciever.html
- [3] https://issues.apache.org/jira/browse/HDFS-1861 “Rename dfs.datanode.max.xcievers and bump its default value”
- [4] https://issues.apache.org/jira/browse/HDFS-1866 “Document dfs.datanode.max.transfer.threads in hdfs-default.xml”
- [5] http://hbase.apache.org/book.html#dfs.datanode.max.xcievers
- [6] http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom
- [7] https://issues.apache.org/jira/browse/HDFS-200 “In HDFS, sync() not yet guarantees data available to the new readers”
- [8] https://issues.apache.org/jira/browse/HDFS-265 “Revisit append”
- [9] http://search-hadoop.com/m/CBBoV3z24H1 “HBase, mail # user – region size/count per regionserver”
- [10] http://hbase.apache.org/book.html#ulimit “ulimit and nproc”
- [11] http://akka.io/ “Akka”
- [12] https://issues.apache.org/jira/browse/HDFS-223 “Asynchronous IO Handling in Hadoop and HDFS”
- [13] https://issues.apache.org/jira/browse/HDFS-918 “Use single Selector and small thread pool to replace many instances of BlockSender for reads”