diff --git a/Readme.md b/Readme.md index b63ad956084c3feb40c5b835876120ea29795ff5..31ddcd88014814e3a9616ffc76a680dea7a56ec0 100644 --- a/Readme.md +++ b/Readme.md @@ -1,11 +1,16 @@ # Parallel run and write in python -Do a parallel map(func, tasks) with numProc processes +Do a parallel imap[_unordered](func, tasks) with numProc processes + func should always return something -+ If bar is true, show a progress bar + Tasks should be an iterable which items are acceptable args for func + The output of func is loggedgradually in outFileName ++ If bar is true, show a progress bar + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName ++ numProc sets the number of process to use ++ if ordered is true, results are outputed in the same order as tasks ++ chunksize : chunkszise for imap ++ if debug is set to true, errors are also printed to stderr + ## How to use @@ -19,6 +24,8 @@ from random import randint def testFunc(arg): ret=str(arg)+str(os.getpid()) sleep(randint(2,5)) + if(randint(0,100)%2 ==0): + raise Exception("COUCOU"+ret) return ret prw.parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") diff --git a/parallelRunAndWrite.py b/parallelRunAndWrite.py index 94ab5c95cd1eb884a05023670cc36ab333971f04..837af07744ea062e976023bc1f06ddfb5434d65f 100644 --- a/parallelRunAndWrite.py +++ b/parallelRunAndWrite.py @@ -20,77 +20,96 @@ import multiprocessing as mp import os import traceback from pyprind import ProgBar -from time import sleep +from time import sleep, time from random import randint +from contextlib import ExitStack +from sys import stderr MAX_PROC = int(mp.cpu_count()) -# Do a parallel map(func, tasks) with numProc processes +# Do a parallel imap[_unordered](func, tasks) with numProc processes # + func should always return something -# + If bar is true, show a progress bar # + Tasks should be an iterable which items are acceptable args for func # + The output of func is loggedgradually in outFileName +# + If bar is true, show a progress bar # + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName -def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC): - outListener, outQueue = startQueueListener(outFileName, len(tasks) if bar else 0) - errListener, errQueue = startQueueListener(errFileName, 0) +# + numProc sets the number of process to use +# + if ordered is true, results are outputed in the same order as tasks +# + chunksize : chunkszise for imap +# + if debug is set to true, errors are also printed to stderr +def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC, ordered=False, chunksize=1, debug=False): + + # prepare progressbar + if(bar): + progressBar = ProgBar(len(tasks)+1, update_interval=2) + progressBar.update() with mp.Pool(processes=numProc) as pool: - pool.map(worker, [ (func, t, outQueue, errQueue) for t in tasks]) + # Define imap function + if(ordered): + imap_func=pool.imap + else: + imap_func=pool.imap_unordered + + with ExitStack() as stack: + # open files + out = stack.enter_context(open(outFileName, 'w+')) + if(errFileName): + err=stack.enter_context(open(errFileName, 'w+')) + numErrors=0 + else: + err=None + + start = time() + # actually do stuff + for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize): + success, message = result + if success: + f=out + else: + f=err + numErrors+=1 + if(debug): + stderr.write(message + '\n') - endListeners([outListener, errListener]) + f.write(message + '\n') + f.flush() + progressBar.update() + + # Report status + if(err): + errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName) + else: + errmsg='' + print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start))) # wrapper to run user function def worker(args): - func, task, outQueue, errQueue = args + func, task, handleErrors = args try: - outQueue.put(func(task)) - except Exception as e: - if(errQueue is None): - raise e + return (True,func(task)) + except: + if(handleErrors): + return (False,traceback.format_exc()) else: - errQueue.put(traceback.format_exc()) - -# Write all messages from queue to outFileName -def listener(queue, outFileName, barSize): - if(barSize>0): - progressBar = ProgBar(barSize+1, update_interval=2) - - f = open(outFileName, 'w+') - while True: - m = queue.get() - f.write(str(m) + '\n') - f.flush() - if(barSize>0): - progressBar.update() - f.close() - -# Launch a process listening on a queue and writing its content to fname -def startQueueListener(outFileName, bar): - if(outFileName is None): - return None, None - queue = mp.Manager().Queue() - p = mp.Process(target=listener, args=(queue, outFileName, bar)) - p.start() - return p, queue - -# Kill and wait for the end of all given listeners -def endListeners(listeners): - for l in listeners: - if(l is not None): - l.terminate() - l.join() + raise # Testing def testFunc(arg): - # raise Exception("COUCOU") ret=str(arg)+str(os.getpid()) sleep(randint(2,5)) + if(randint(0,100)%2 ==0): + raise Exception("COUCOU"+ret) return ret def test(): - parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") + print("Error file + debug") + parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors", debug=True) + print("Error file") + parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors") + print("No error file") + parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2) if __name__ == "__main__": test()