Skip to content
Snippets Groups Projects
Verified Commit 8526d3e6 authored by David Beniamine's avatar David Beniamine
Browse files

Small refactor

parent bbf400ce
No related branches found
No related tags found
No related merge requests found
...@@ -46,20 +46,14 @@ def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, nu ...@@ -46,20 +46,14 @@ def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, nu
with mp.Pool(processes=numProc) as pool: with mp.Pool(processes=numProc) as pool:
# Define imap function # Define imap function
if(ordered): imap_func = pool.imap_unordered if(not ordered) else pool.imap
imap_func=pool.imap
else:
imap_func=pool.imap_unordered
with ExitStack() as stack: with ExitStack() as stack:
# open files # open files
out = stack.enter_context(open(outFileName, 'w+')) out = stack.enter_context(open(outFileName, 'w+'))
if(errFileName): err=stack.enter_context(open(errFileName, 'w+')) if(errFileName) else None
err=stack.enter_context(open(errFileName, 'w+'))
numErrors=0
else:
err=None
numErrors=0
start = time() start = time()
# actually do stuff # actually do stuff
for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize): for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize):
...@@ -77,10 +71,7 @@ def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, nu ...@@ -77,10 +71,7 @@ def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, nu
progressBar.update() progressBar.update()
# Report status # Report status
if(err): errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName) if(errFileName) else ''
errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName)
else:
errmsg=''
print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start))) print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start)))
# wrapper to run user function # wrapper to run user function
...@@ -91,23 +82,28 @@ def worker(args): ...@@ -91,23 +82,28 @@ def worker(args):
except: except:
if(handleErrors): if(handleErrors):
return (False,traceback.format_exc()) return (False,traceback.format_exc())
else:
raise raise
# Testing # Testing
def testFunc(arg): def testFunc(arg):
ret=str(arg)+str(os.getpid())
sleep(randint(2,5))
if(randint(0,100)%2 ==0): if(randint(0,100)%2 ==0):
raise Exception("COUCOU"+ret) raise Exception("COUCOU"+ret)
return testFunc2(arg)
def testFunc2(arg):
ret=str(arg)+str(os.getpid())
sleep(randint(0,3))
return ret return ret
def test(): def test():
print("Error file + debug") print("Error file + debug")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors", debug=True) parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors", debug=True)
print("Error file") print("Error file")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors") parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors")
print("No error file/ no rror")
parallelRunAndWrite(testFunc2, range(10), "testplop", numProc=2)
print("No error file") print("No error file")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2) parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment