Probleem opgelost ! Ik kan niet geloven dat ik hier twee volle dagen aan heb besteed... Ik keek helemaal in de verkeerde richting.
Het probleem lag niet bij een of andere Dataflow- of GCP-netwerkconfiguratie, en voor zover ik weet...
is waar.
Het probleem zat natuurlijk in mijn code:alleen het probleem werd alleen onthuld in een gedistribueerde omgeving. Ik had de fout gemaakt om de tunnel te openen vanuit de hoofdprocessor van de pijpleiding, in plaats van de arbeiders. Dus de SSH-tunnel was open, maar niet tussen de werkers en de doelserver, alleen tussen de hoofdpijplijn en het doel!
Om dit op te lossen, moest ik mijn verzoek om DoFn wijzigen om de uitvoering van de query met de tunnel te verpakken:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
zoals je kunt zien, moest ik enkele stukjes pysql_beam-bibliotheek overschrijven.
Ten slotte opent elke werknemer zijn eigen tunnel voor elk verzoek. Het is waarschijnlijk mogelijk om dit gedrag te optimaliseren, maar het is genoeg voor mijn behoeften.