sql >> Database >  >> NoSQL >> HBase

Procedure:gezouten Apache HBase-tabellen scannen met regiospecifieke sleutelbereiken in MapReduce

Met dank aan Pengyu Wang, softwareontwikkelaar bij FINRA, voor toestemming om dit bericht opnieuw te publiceren.

Salted Apache HBase-tabellen met pre-split is een bewezen effectieve HBase-oplossing om uniforme werklastverdeling over RegionServers te bieden en hotspots tijdens bulkschrijfbewerkingen te voorkomen. In dit ontwerp is een rijsleutel gemaakt met aan het begin een logische sleutel plus salt. Een manier om salt te genereren is door n (aantal regio's) modulo te berekenen op de hash-code van de logische rijsleutel (datum, enz.).

Rijtoetsen zouten

Een tabel die bijvoorbeeld dagelijks gegevens accepteert, kan logische rijsleutels gebruiken die met een datum beginnen, en we willen deze tabel vooraf opsplitsen in 1000 regio's. In dit geval verwachten we 1.000 verschillende zouten te genereren. Het zout kan bijvoorbeeld worden gegenereerd als:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey =2015-04-26|abcrowKey =893|2015-04-26|abc

De uitvoer van hashCode() met modulo biedt willekeurigheid voor zoutwaarde van "000" tot "999". Met deze sleuteltransformatie wordt de tabel vooraf gesplitst op de zoutgrenzen terwijl deze wordt gemaakt. Hierdoor worden rijvolumes uniform verdeeld tijdens het laden van de HFiles met MapReduce bulkload. Het garandeert dat rijtoetsen met hetzelfde zout in dezelfde regio vallen.

In veel gevallen, zoals bij het archiveren van gegevens, moet u de gegevens scannen of kopiëren over een bepaald logisch sleutelbereik (datumbereik) met behulp van MapReduce-taak. Standaardtabel MapReduce-taken worden ingesteld door de Scan . te verstrekken instantie met sleutelbereikkenmerken.

Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015-) 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Stel de tabeltoewijzingstaak in */TableMapReduceUtil.initTableMapperJob(tabelnaam,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);…

Het instellen van een dergelijke taak wordt echter een uitdaging voor gezouten vooraf gesplitste tabellen. Start- en stoprijtoetsen zullen voor elke regio anders zijn omdat elk een uniek zout heeft. En we kunnen niet meerdere bereiken specificeren voor één Scan instantie.

Om dit probleem op te lossen, moeten we onderzoeken hoe de tabel MapReduce werkt. Over het algemeen creëert het MapReduce-framework één kaarttaak om elke invoersplitsing te lezen en te verwerken. Elke splitsing wordt gegenereerd in InputFormat klassenbasis, volgens de methode getSplits() .

In HBase-tabel MapReduce-taak, TableInputFormat wordt gebruikt als InputFormat . Binnen de implementatie, de getSplits() methode wordt overschreven om de start- en stoprijtoetsen op te halen uit de Scan voorbeeld. Omdat de start- en stoprijtoetsen zich over meerdere regio's uitstrekken, wordt het bereik gedeeld door regiogrenzen en retourneert het de lijst met TableSplit objecten die het bereik van de scantoetsen bestrijken. In plaats van te zijn gebaseerd op HDFS-blok, TableSplit s zijn gebaseerd op regio. Door de getSplits() . te overschrijven methode, kunnen we de TableSplit .

Aangepaste tabelinvoerindeling maken

Om het gedrag van de getSplits() te wijzigen methode, een aangepaste klasse die TableInputFormat uitbreidt Is benodigd. Het doel van getSplits() hier is om het logische sleutelbereik in elke regio te dekken, hun rijsleutelbereik te construeren met hun unieke zout. De klasse HTable biedt methode getStartEndKeys() die start- en eindrijsleutels voor elke regio retourneert. Ontleed van elke starttoets het corresponderende zout voor de regio.

Keys koppelen =table.getStartEndKeys();for (int i =0; i  

Taakconfiguratie voldoet aan logisch sleutelbereik

TableInputFormat haalt de start- en stopsleutel op uit Scan voorbeeld. Omdat we Scan niet kunnen gebruiken in onze MapReduce-taak zouden we Configuration . kunnen gebruiken in plaats daarvan om deze twee variabelen door te geven en alleen een logische start- en stopsleutel is goed genoeg (een variabele kan een datum of andere zakelijke informatie zijn). De getSplits() methode heeft JobContext argument, De configuratie-instantie kan worden gelezen als context.getConfiguration() .

In MapReduce-stuurprogramma:

Configuratie conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "27-04-2015");

In Custom TableInputFormat :

@Override openbare lijst getSplits(JobContext-context) genereert IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");…}

Reconstrueer het gezouten sleutelbereik per regio

Nu we de salt- en logische start/stop-sleutel voor elke regio hebben, kunnen we het werkelijke rijsleutelbereik opnieuw opbouwen.

byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);

Een TableSplit maken voor elke regio

Met het rijsleutelbereik kunnen we nu TableSplit initialiseren bijvoorbeeld voor de regio.

Lijst splits =new ArrayList(keys.getFirst().length);for (int i =0; i  

Nog een ding om naar te kijken is de gegevenslocatie. Het framework gebruikt locatie-informatie in elke invoersplitsing om een ​​kaarttaak toe te wijzen aan de lokale host. Voor ons TableInputFormat , gebruiken we de methode getTableRegionLocation() om de regiolocatie op te halen die de rijsleutel bedient.

Deze locatie wordt vervolgens doorgegeven aan de TableSplit aannemer. Dit zorgt ervoor dat de mapper die de tabelsplitsing verwerkt, zich op dezelfde regioserver bevindt. Eén methode, genaamd DNS.reverseDns() , vereist het adres voor de HBase-naamserver. Dit kenmerk wordt opgeslagen in de configuratie "hbase.nameserver.address ".

this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);...public String getTableRegionLocation(HTable table, byte[] rowKey) genereert IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHost regionLocation(); }protected String reverseDNS (InetAddress ipAddress) gooit NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}

Een volledige code van getSplits ziet er als volgt uit:

@Override openbare lijst getSplits(JobContext-context) genereert IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Er is geen tabel opgegeven.");}// Haal het naamserveradres op en de standaardwaarde is null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Sleutels koppelen =table.getStartEndKeys();if (keys ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Er wordt minimaal één regio verwacht");}List splits =new ArrayList(keys.getFirst().length);for (int i =0; i  

Gebruik de Custom TableInoutFormat in de MapReduce Driver

Nu moeten we de TableInputFormat . vervangen klasse met de aangepaste build die we hebben gebruikt voor het instellen van de tabel MapReduce-taak.

Configuratie conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tabelnaam);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Stel de tabeltoewijzingstaak in */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, true, MultiRangeTableInput>Format.class);
 De aanpak van aangepast TableInputFormat biedt een efficiënte en schaalbare scanmogelijkheid voor HBase-tabellen die zijn ontworpen om salt te gebruiken voor een evenwichtige gegevensbelasting. Aangezien de scan alle niet-gerelateerde rijsleutels kan omzeilen, ongeacht hoe groot de tabel is, is de complexiteit van de scan alleen beperkt tot de grootte van de doelgegevens. In de meeste gevallen kan dit een relatief consistente verwerkingstijd garanderen naarmate de tabel groeit.


  1. mongodb documenten van de ene collectie naar de andere collectie verplaatsen

  2. blpop stopt na een tijdje met het verwerken van de wachtrij

  3. Meldings- en nieuwsgedeelte met behulp van Redis

  4. Hoe de MongoDB-insertprestaties te verbeteren