diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000000000000000000000000000000000000..5c44ff2bc5bd337a950be93a9ce50826e4d0329a --- /dev/null +++ b/Readme.md @@ -0,0 +1,20 @@ +# Parallel run and write in python + +Do a parallel map(func, tasks) with numProc processes ++ func should always return something ++ If bar is true, show a progress bar ++ Tasks should be an iterable which items are acceptable args for func ++ The output of func is loggedgradually in outFileName ++ If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName + +## How to use + +```python +def testFunc(arg): + ret=str(arg)+str(os.getpid()) + sleep(randint(2,5)) + return ret + +def test(): + parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") +``` diff --git a/parallelRunAndWrite.py b/parallelRunAndWrite.py new file mode 100644 index 0000000000000000000000000000000000000000..94ab5c95cd1eb884a05023670cc36ab333971f04 --- /dev/null +++ b/parallelRunAndWrite.py @@ -0,0 +1,96 @@ +#!/user/bin/env python3 + +# Copyright (C) 2018 Tetras Libre <Contact@Tetras-Libre.fr> +# Author: Beniamine, David <David.Beniamine@Tetras-Libre.fr> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import multiprocessing as mp +import os +import traceback +from pyprind import ProgBar +from time import sleep +from random import randint + +MAX_PROC = int(mp.cpu_count()) + +# Do a parallel map(func, tasks) with numProc processes +# + func should always return something +# + If bar is true, show a progress bar +# + Tasks should be an iterable which items are acceptable args for func +# + The output of func is loggedgradually in outFileName +# + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName +def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC): + outListener, outQueue = startQueueListener(outFileName, len(tasks) if bar else 0) + errListener, errQueue = startQueueListener(errFileName, 0) + + with mp.Pool(processes=numProc) as pool: + pool.map(worker, [ (func, t, outQueue, errQueue) for t in tasks]) + + endListeners([outListener, errListener]) + +# wrapper to run user function +def worker(args): + func, task, outQueue, errQueue = args + try: + outQueue.put(func(task)) + except Exception as e: + if(errQueue is None): + raise e + else: + errQueue.put(traceback.format_exc()) + +# Write all messages from queue to outFileName +def listener(queue, outFileName, barSize): + if(barSize>0): + progressBar = ProgBar(barSize+1, update_interval=2) + + f = open(outFileName, 'w+') + while True: + m = queue.get() + f.write(str(m) + '\n') + f.flush() + if(barSize>0): + progressBar.update() + f.close() + +# Launch a process listening on a queue and writing its content to fname +def startQueueListener(outFileName, bar): + if(outFileName is None): + return None, None + queue = mp.Manager().Queue() + p = mp.Process(target=listener, args=(queue, outFileName, bar)) + p.start() + return p, queue + +# Kill and wait for the end of all given listeners +def endListeners(listeners): + for l in listeners: + if(l is not None): + l.terminate() + l.join() + +# Testing + +def testFunc(arg): + # raise Exception("COUCOU") + ret=str(arg)+str(os.getpid()) + sleep(randint(2,5)) + return ret + +def test(): + parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors") + +if __name__ == "__main__": + test()