Ik geloof dat de TypeError
komt van multiprocessing
's get
.
Ik heb alle DB-code uit je script verwijderd. Kijk hier eens naar:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Met behulp van r.wait
geeft het verwachte resultaat terug, maar gebruikt r.get
verhoogt TypeError
. Zoals beschreven in python-documenten
, gebruik r.wait
na een map_async
.
Bewerken :Ik moet mijn vorige antwoord aanpassen. Ik geloof nu dat de TypeError
komt van SQLAlchemy. Ik heb mijn script aangepast om de fout te reproduceren.
Bewerk 2 :Het lijkt erop dat het probleem is dat multiprocessing.pool
speelt niet goed als een worker een Exception verhoogt waarvan de constructor een parameter vereist (zie ook hier
).
Ik heb mijn script aangepast om dit te benadrukken.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
In jouw geval, aangezien je code een SQLAlchemy-uitzondering oproept, is de enige oplossing die ik kan bedenken om alle uitzonderingen op te vangen in de do
functie en re-raise een normale Exception
in plaats van. Zoiets als dit:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Bewerk 3 :het lijkt dus een bug met Python te zijn , maar de juiste uitzonderingen in SQLAlchemy zouden dit omzeilen:daarom heb ik het probleem aan de orde gesteld met SQLAlchemy , ook.
Als een tijdelijke oplossing voor het probleem, denk ik dat de oplossing aan het einde van Edit 2 zou doen (callbacks inpakken in try-behalve en re-raise).