Skip to content
Snippets Groups Projects
Verified Commit bbf400ce authored by David Beniamine's avatar David Beniamine
Browse files

New version using imap

parent 7205826a
Branches
No related tags found
No related merge requests found
# Parallel run and write in python # Parallel run and write in python
Do a parallel map(func, tasks) with numProc processes Do a parallel imap[_unordered](func, tasks) with numProc processes
+ func should always return something + func should always return something
+ If bar is true, show a progress bar
+ Tasks should be an iterable whose items are acceptable args for func + Tasks should be an iterable whose items are acceptable args for func
+ The output of func is logged gradually in outFileName + The output of func is logged gradually in outFileName
+ If bar is true, show a progress bar
+ If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
+ numProc sets the number of processes to use
+ if ordered is true, results are output in the same order as tasks
+ chunksize: chunk size for imap
+ if debug is set to true, errors are also printed to stderr
## How to use ## How to use
...@@ -19,6 +24,8 @@ from random import randint ...@@ -19,6 +24,8 @@ from random import randint
def testFunc(arg): def testFunc(arg):
ret=str(arg)+str(os.getpid()) ret=str(arg)+str(os.getpid())
sleep(randint(2,5)) sleep(randint(2,5))
if(randint(0,100)%2 ==0):
raise Exception("COUCOU"+ret)
return ret return ret
prw.parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") prw.parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors")
......
...@@ -20,77 +20,96 @@ import multiprocessing as mp ...@@ -20,77 +20,96 @@ import multiprocessing as mp
import os import os
import traceback import traceback
from pyprind import ProgBar from pyprind import ProgBar
from time import sleep from time import sleep, time
from random import randint from random import randint
from contextlib import ExitStack
from sys import stderr
MAX_PROC = int(mp.cpu_count()) MAX_PROC = int(mp.cpu_count())
# Do a parallel map(func, tasks) with numProc processes # Do a parallel imap[_unordered](func, tasks) with numProc processes
# + func should always return something # + func should always return something
# + If bar is true, show a progress bar
# + Tasks should be an iterable whose items are acceptable args for func # + Tasks should be an iterable whose items are acceptable args for func
# + The output of func is logged gradually in outFileName # + The output of func is logged gradually in outFileName
# + If bar is true, show a progress bar
# + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName # + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC): # + numProc sets the number of processes to use
outListener, outQueue = startQueueListener(outFileName, len(tasks) if bar else 0) # + if ordered is true, results are output in the same order as tasks
errListener, errQueue = startQueueListener(errFileName, 0) # + chunksize: chunk size for imap
# + if debug is set to true, errors are also printed to stderr
def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC, ordered=False, chunksize=1, debug=False):
with mp.Pool(processes=numProc) as pool: # prepare progressbar
pool.map(worker, [ (func, t, outQueue, errQueue) for t in tasks]) if(bar):
progressBar = ProgBar(len(tasks)+1, update_interval=2)
progressBar.update()
endListeners([outListener, errListener]) with mp.Pool(processes=numProc) as pool:
# Define imap function
if(ordered):
imap_func=pool.imap
else:
imap_func=pool.imap_unordered
# wrapper to run user function with ExitStack() as stack:
def worker(args): # open files
func, task, outQueue, errQueue = args out = stack.enter_context(open(outFileName, 'w+'))
try: if(errFileName):
outQueue.put(func(task)) err=stack.enter_context(open(errFileName, 'w+'))
except Exception as e: numErrors=0
if(errQueue is None):
raise e
else: else:
errQueue.put(traceback.format_exc()) err=None
# Write all messages from queue to outFileName start = time()
def listener(queue, outFileName, barSize): # actually do stuff
if(barSize>0): for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize):
progressBar = ProgBar(barSize+1, update_interval=2) success, message = result
if success:
f=out
else:
f=err
numErrors+=1
if(debug):
stderr.write(message + '\n')
f = open(outFileName, 'w+') f.write(message + '\n')
while True:
m = queue.get()
f.write(str(m) + '\n')
f.flush() f.flush()
if(barSize>0):
progressBar.update() progressBar.update()
f.close()
# Launch a process listening on a queue and writing its content to fname # Report status
def startQueueListener(outFileName, bar): if(err):
if(outFileName is None): errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName)
return None, None else:
queue = mp.Manager().Queue() errmsg=''
p = mp.Process(target=listener, args=(queue, outFileName, bar)) print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start)))
p.start()
return p, queue
# Kill and wait for the end of all given listeners # wrapper to run user function
def endListeners(listeners): def worker(args):
for l in listeners: func, task, handleErrors = args
if(l is not None): try:
l.terminate() return (True,func(task))
l.join() except:
if(handleErrors):
return (False,traceback.format_exc())
else:
raise
# Testing # Testing
def testFunc(arg): def testFunc(arg):
# raise Exception("COUCOU")
ret=str(arg)+str(os.getpid()) ret=str(arg)+str(os.getpid())
sleep(randint(2,5)) sleep(randint(2,5))
if(randint(0,100)%2 ==0):
raise Exception("COUCOU"+ret)
return ret return ret
def test(): def test():
parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") print("Error file + debug")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors", debug=True)
print("Error file")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors")
print("No error file")
parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2)
if __name__ == "__main__": if __name__ == "__main__":
test() test()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment