Skip to content
Snippets Groups Projects
Verified Commit fc435ee2 authored by David Beniamine's avatar David Beniamine
Browse files

Python file and readme

parent 8c7bf056
No related branches found
No related tags found
No related merge requests found
# Parallel run and write in python
Do a parallel map(func, tasks) with numProc processes
+ func should always return something
+ If bar is true, show a progress bar
+ Tasks should be an iterable which items are acceptable args for func
+ The output of func is loggedgradually in outFileName
+ If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
## How to use
```python
def testFunc(arg):
ret=str(arg)+str(os.getpid())
sleep(randint(2,5))
return ret
def test():
parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors")
```
#!/user/bin/env python3
# Copyright (C) 2018 Tetras Libre <Contact@Tetras-Libre.fr>
# Author: Beniamine, David <David.Beniamine@Tetras-Libre.fr>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import multiprocessing as mp
import os
import traceback
from pyprind import ProgBar
from time import sleep
from random import randint
MAX_PROC = int(mp.cpu_count())
# Do a parallel map(func, tasks) with numProc processes
# + func should always return something
# + If bar is true, show a progress bar
# + Tasks should be an iterable which items are acceptable args for func
# + The output of func is loggedgradually in outFileName
# + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC):
outListener, outQueue = startQueueListener(outFileName, len(tasks) if bar else 0)
errListener, errQueue = startQueueListener(errFileName, 0)
with mp.Pool(processes=numProc) as pool:
pool.map(worker, [ (func, t, outQueue, errQueue) for t in tasks])
endListeners([outListener, errListener])
# wrapper to run user function
def worker(args):
func, task, outQueue, errQueue = args
try:
outQueue.put(func(task))
except Exception as e:
if(errQueue is None):
raise e
else:
errQueue.put(traceback.format_exc())
# Write all messages from queue to outFileName
def listener(queue, outFileName, barSize):
if(barSize>0):
progressBar = ProgBar(barSize+1, update_interval=2)
f = open(outFileName, 'w+')
while True:
m = queue.get()
f.write(str(m) + '\n')
f.flush()
if(barSize>0):
progressBar.update()
f.close()
# Launch a process listening on a queue and writing its content to fname
def startQueueListener(outFileName, bar):
if(outFileName is None):
return None, None
queue = mp.Manager().Queue()
p = mp.Process(target=listener, args=(queue, outFileName, bar))
p.start()
return p, queue
# Kill and wait for the end of all given listeners
def endListeners(listeners):
for l in listeners:
if(l is not None):
l.terminate()
l.join()
# Testing
def testFunc(arg):
# raise Exception("COUCOU")
ret=str(arg)+str(os.getpid())
sleep(randint(2,5))
return ret
def test():
parallelRunAndWrite(testFunc, range(5), "testplop", numProc=2, errFileName="errors")
if __name__ == "__main__":
test()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment