sql >> Database >  >> RDS >> Mysql

Google Dataflow (Apache beam) JdbcIO bulk invoegen in mysql database

BEWERKEN 27-01-2018:

Het blijkt dat dit probleem verband houdt met de DirectRunner. Als u dezelfde pijplijn uitvoert met de DataflowRunner, zou u batches moeten krijgen die in werkelijkheid maximaal 1000 records bevatten. De DirectRunner maakt altijd bundels van maat 1 na een groeperingshandeling.

Oorspronkelijke antwoord:

Ik ben hetzelfde probleem tegengekomen bij het schrijven naar clouddatabases met JdbcIO van Apache Beam. Het probleem is dat hoewel JdbcIO het schrijven van maximaal 1.000 records in één batch ondersteunt, ik het nog nooit echt meer dan 1 rij tegelijk heb zien schrijven (ik moet toegeven:dit gebruikte altijd de DirectRunner in een ontwikkelomgeving).

Ik heb daarom een ​​functie toegevoegd aan JdbcIO waar je zelf de grootte van de batches kunt bepalen door je gegevens te groeperen en elke groep als één batch te schrijven. Hieronder ziet u een voorbeeld van hoe u deze functie kunt gebruiken, gebaseerd op het originele WordCount-voorbeeld van Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

Het verschil met de normale schrijfmethode van JdbcIO is de nieuwe methode writeIterable() waarvoor een PCollection<Iterable<RowT>> . nodig is als invoer in plaats van PCollection<RowT> . Elke Iterable wordt als één batch naar de database geschreven.

De versie van JdbcIO met deze toevoeging is hier te vinden:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Het volledige voorbeeldproject met het bovenstaande voorbeeld is hier te vinden:https://github.com/ olavloite/spanner-beam-voorbeeld

(Er is ook een pull-verzoek in behandeling bij Apache Beam om dit in het project op te nemen)




  1. MySQL-fout #1054 - Onbekende kolom in 'Veldenlijst'

  2. Hoe te repareren "Server is niet geconfigureerd voor RPC" Msg 7411 met behulp van T-SQL

  3. Hoe de activatielink in PHP laten verlopen?

  4. Hoe kan ik een specifiek record bijwerken zonder SELECT-machtiging?