Deze blogpost is gepubliceerd op Hortonworks.com vóór de fusie met Cloudera. Sommige links, bronnen of verwijzingen zijn mogelijk niet langer nauwkeurig.
In 2016 hebben we de tweede versie v1.0.1 van Spark HBase Connector (SHC) gepubliceerd. In deze blog bespreken we de belangrijkste functies die we dit jaar hebben geïmplementeerd.
Ondersteuning Phoenix-coder
SHC kan worden gebruikt om gegevens naar het HBase-cluster te schrijven voor verdere stroomafwaartse verwerking. Het ondersteunt Avro-serialisatie voor invoer- en uitvoergegevens en is standaard ingesteld op een aangepaste serialisatie met behulp van een eenvoudig native coderingsmechanisme. Bij het lezen van invoergegevens duwt SHC filters naar HBase voor efficiënte scans van gegevens. Gezien de populariteit van Phoenix-gegevens in HBase, lijkt het logisch om naast Avro-gegevens ook Phoenix-gegevens te ondersteunen als invoer voor HBase. Bovendien lijkt het in gebreke blijven van de eenvoudige native binaire codering vatbaar voor toekomstige wijzigingen en vormt dit een risico voor gebruikers die gegevens van SHC naar HBase schrijven. Met SHC in de toekomst bijvoorbeeld, moet achterwaartse compatibiliteit op de juiste manier worden afgehandeld. Dus de standaard, SHC moet veranderen naar een meer standaard en goed getest formaat zoals Phoenix.
Voor de ondersteuning van samengestelde sleutels moest vóór deze functie de waardelengte van elke dimensie worden vastgesteld, met uitzondering van de laatste dimensie van de samengestelde sleutel. Deze beperking is verwijderd door Phoenix coder. Als gebruikers momenteel Phoenix als datacoder kiezen, hoeven ze niet de lengte van elk deel van de samengestelde sleutel in de catalogus op te geven.
Aangezien Phoenix de standaardcodeerder is, is de enige verandering voor de gebruikers dat als ze PrimitiveType willen gebruiken als de gegevenscodeerder, ze "tableCoder":"PrimitiveType" in hun catalogi moeten specificeren om SHC te informeren dat ze in plaats daarvan PrimitiveType willen gebruiken van Phoenix als "tableCoder".
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”key”,
|”kolommen”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf2″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf3″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf4″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf5″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf6″, “col”:”col6 ″, “type”:”smallint”},
|”col7″:{“cf”:”cf7″, “col”:”col7″, “type”:”string”},
|”col8″:{“cf”:”cf8″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
Cache Spark HBase-verbindingen
SHC heeft eerder geen verbindingsobjecten naar HBase in de cache opgeslagen. In het bijzonder werd de aanroep naar 'ConnectionFactory.createConnection' elke keer gedaan wanneer SHC HBase-tabellen en -regio's moest bezoeken. Gebruikers konden dit eenvoudig zien door naar de logboeken van de uitvoerder te kijken en te observeren dat er voor elk verzoek verbindingen met dierenverzorgers tot stand werden gebracht. In de documentatie van interface Connection staat dat het maken van een verbinding een zware operatie is en dat implementaties van verbindingen thread-safe zijn. Daarom zou het voor processen met een lange levensduur erg handig zijn voor SHC om een verbinding in de cache te bewaren. Met deze functie vermindert SHC het aantal gemaakte verbindingen drastisch en verbetert het de prestaties in het proces aanzienlijk.
Ondersteuning van dubbele kolomfamilies
SHC heeft ondersteuning voor dubbele kolomfamilies ondersteund. Nu kunnen gebruikers hun catalogi als volgt definiëren:
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”key”,
|”kolommen”:{
|”col0″:{“cf”:”rowkey”, “col”:”key”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf1″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf1″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf2″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf2″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf3″, “col”:”col6 ″, “type”:”smallint”},
|”col7″:{“cf”:”cf3″, “col”:”col7″, “type”:”string”},
|”col8″:{“cf”:”cf3″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
In de bovenstaande catalogusdefinitie hebben kolom 'col0', 'col1' en 'col2' dezelfde kolomfamilie 'cf1'.
Gebruik Spark UnhandledFilters API
SHC heeft ook de Spark API unhandledFilters geïmplementeerd, wat een effectieve optimalisatie is. Deze API vertelt Spark over filters die SHC niet implementeert, in tegenstelling tot het retourneren van alle filters. Het vorige gedrag, in dit geval, was om alle filters opnieuw toe te passen zodra de gegevens in Spark zijn opgehaald. Dit zou idempotent moeten zijn, dus er worden geen gegevens gewijzigd, maar het kan duur zijn als de filters ingewikkeld zijn.
SHC-gemeenschap
De SHC-gemeenschap is groter en invloedrijker dan een jaar geleden. In 2016 gaven we lezingen in Hadoop Summit en in HBase/Spark meetup, en schreven we uitgebreide blogs. Nu het aantal SHC-gebruikers toeneemt, ontvangen we een groter aantal gebruikersvragen. We zijn erg blij met de toegenomen acceptatie van SHC en als je ideeën hebt over hoe je het verder kunt verbeteren, geef ons dan feedback via Hortonworks Community Connection.
BEVESTIGING
We willen het Bloomberg-team bedanken voor het begeleiden van dit werk en het helpen valideren van dit werk. We willen ook de HBase-community bedanken voor het geven van feedback en het verbeteren hiervan. Ten slotte heeft dit werk gebruik gemaakt van de lessen uit eerdere Spark HBase-integraties en we willen hun ontwikkelaars bedanken voor het effenen van de weg.
REFERENTIE:
SHC: https://github.com/hortonworks-spark/shc
Apache HBase: https://hbase.apache.org/
Apache Spark: http://spark.apache.org/
Apache Phoenix: https://phoenix.apache.org/