sql >> Database >  >> RDS >> PostgreSQL

Hoe zet ik een SSH-tunnel in Google Cloud Dataflow op naar een externe databaseserver?

Probleem opgelost ! Ik kan niet geloven dat ik hier twee volle dagen aan heb besteed... Ik keek helemaal in de verkeerde richting.

Het probleem lag niet bij een of andere Dataflow- of GCP-netwerkconfiguratie, en voor zover ik weet...

is waar.

Het probleem zat natuurlijk in mijn code:alleen het probleem werd alleen onthuld in een gedistribueerde omgeving. Ik had de fout gemaakt om de tunnel te openen vanuit de hoofdprocessor van de pijpleiding, in plaats van de arbeiders. Dus de SSH-tunnel was open, maar niet tussen de werkers en de doelserver, alleen tussen de hoofdpijplijn en het doel!

Om dit op te lossen, moest ik mijn verzoek om DoFn wijzigen om de uitvoering van de query met de tunnel te verpakken:

class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

zoals je kunt zien, moest ik enkele stukjes pysql_beam-bibliotheek overschrijven.

Ten slotte opent elke werknemer zijn eigen tunnel voor elk verzoek. Het is waarschijnlijk mogelijk om dit gedrag te optimaliseren, maar het is genoeg voor mijn behoeften.




  1. Hoe kan ik een LIKE-query uitvoeren voor een jsonb-sleutel?

  2. Een aangepaste kolom maken met tekens voor automatisch ophogen

  3. Start en stop lokale mysql-instantie vanuit de toepassing

  4. Sqlplus orakel:Hoe kan ik het sql-commando op bash in 1 regel uitvoeren?