#!/user/bin/env python3 # Copyright (C) 2018 Tetras Libre <Contact@Tetras-Libre.fr> # Author: Beniamine, David <David.Beniamine@Tetras-Libre.fr> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import multiprocessing as mp import os import traceback from pyprind import ProgBar from time import sleep, time from random import randint from contextlib import ExitStack from sys import stderr MAX_PROC = int(mp.cpu_count()) # Do a parallel imap[_unordered](func, tasks) with numProc processes # + func should always return something # + Tasks should be an iterable which items are acceptable args for func # + The output of func is loggedgradually in outFileName # + If bar is true, show a progress bar # + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName # + numProc sets the number of process to use # + if ordered is true, results are outputed in the same order as tasks # + chunksize : chunkszise for imap # + if debug is set to true, errors are also printed to stderr def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC, ordered=False, chunksize=1, debug=False): # prepare progressbar if(bar): progressBar = ProgBar(len(tasks)+1, update_interval=2) progressBar.update() with mp.Pool(processes=numProc) as pool: # Define imap function imap_func = pool.imap_unordered if(not ordered) else pool.imap with ExitStack() as stack: # open files out = stack.enter_context(open(outFileName, 'w+')) err=stack.enter_context(open(errFileName, 'w+')) if(errFileName) else None numErrors=0 start = time() # actually do stuff for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize): success, message = result if success: f=out else: f=err numErrors+=1 if(debug): stderr.write(message + '\n') f.write(message + '\n') f.flush() progressBar.update() # Report status errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName) if(errFileName) else '' print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start))) # wrapper to run user function def worker(args): func, task, handleErrors = args try: return (True,func(task)) except: if(handleErrors): return (False,traceback.format_exc()) raise # Testing def testFunc(arg): if(randint(0,100)%2 ==0): raise Exception("COUCOU"+ret) return testFunc2(arg) def testFunc2(arg): ret=str(arg)+str(os.getpid()) sleep(randint(0,3)) return ret def test(): print("Error file + debug") parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors", debug=True) print("Error file") parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2, errFileName="errors") print("No error file/ no rror") parallelRunAndWrite(testFunc2, range(10), "testplop", numProc=2) print("No error file") parallelRunAndWrite(testFunc, range(10), "testplop", numProc=2) if __name__ == "__main__": test()