sql >> Database >  >> RDS >> PostgreSQL

Hang in Python-script met SQLAlchemy en multiprocessing

Ik geloof dat de TypeError komt van multiprocessing 's get .

Ik heb alle DB-code uit je script verwijderd. Kijk hier eens naar:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Met behulp van r.wait geeft het verwachte resultaat terug, maar gebruikt r.get verhoogt TypeError . Zoals beschreven in python-documenten , gebruik r.wait na een map_async .

Bewerken :Ik moet mijn vorige antwoord aanpassen. Ik geloof nu dat de TypeError komt van SQLAlchemy. Ik heb mijn script aangepast om de fout te reproduceren.

Bewerk 2 :Het lijkt erop dat het probleem is dat multiprocessing.pool speelt niet goed als een worker een Exception verhoogt waarvan de constructor een parameter vereist (zie ook hier ).

Ik heb mijn script aangepast om dit te benadrukken.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

In jouw geval, aangezien je code een SQLAlchemy-uitzondering oproept, is de enige oplossing die ik kan bedenken om alle uitzonderingen op te vangen in de do functie en re-raise een normale Exception in plaats van. Zoiets als dit:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Bewerk 3 :het lijkt dus een bug met Python te zijn , maar de juiste uitzonderingen in SQLAlchemy zouden dit omzeilen:daarom heb ik het probleem aan de orde gesteld met SQLAlchemy , ook.

Als een tijdelijke oplossing voor het probleem, denk ik dat de oplossing aan het einde van Edit 2 zou doen (callbacks inpakken in try-behalve en re-raise).



  1. Hoe wijzig ik de naam van een primaire sleutelkolom in MySQL?

  2. prestatieprobleem:verschil tussen select s.* vs select *

  3. Maak een lijst van alle dagen per maand en verdeel de waarde gelijkelijk over elke dag

  4. PostgreSQL zoeken en vervangen waar voorwaarde