From bbf400ce0bf93ff9740b6aab9365903b0d676898 Mon Sep 17 00:00:00 2001
From: David Beniamine <david.beniamine@tetras-libre.fr>
Date: Wed, 5 Sep 2018 12:35:50 +0200
Subject: [PATCH] New version using imap

---
 Readme.md              |  11 +++-
 parallelRunAndWrite.py | 111 ++++++++++++++++++++++++-----------------
 2 files changed, 74 insertions(+), 48 deletions(-)

diff --git a/Readme.md b/Readme.md
index b63ad95..31ddcd8 100644
--- a/Readme.md
+++ b/Readme.md
@@ -1,11 +1,16 @@
 # Parallel run and write in python
 
-Do a parallel map(func, tasks) with numProc processes
+Do a parallel imap[_unordered](func, tasks) with numProc processes
 + func should always return something
-+ If bar is true, show a progress bar
 + Tasks should be an iterable which items are acceptable args for func
 + The output of func is loggedgradually in outFileName
++ If bar is true, show a progress bar
 + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
++ numProc sets the number of process to use
++ if ordered is true, results are outputed in the same order as tasks
++ chunksize : chunkszise for imap
++ if debug is set to true, errors are also printed to stderr
+
 
 ## How to use
 
@@ -19,6 +24,8 @@ from random import randint
 def testFunc(arg):
     ret=str(arg)+str(os.getpid())
     sleep(randint(2,5))
+    if(randint(0,100)%2 ==0):
+        raise Exception("COUCOU"+ret)
     return ret
 
 prw.parallelRunAndWrite(testFunc, range(5), "testplop",  numProc=2, errFileName="errors")
diff --git a/parallelRunAndWrite.py b/parallelRunAndWrite.py
index 94ab5c9..837af07 100644
--- a/parallelRunAndWrite.py
+++ b/parallelRunAndWrite.py
@@ -20,77 +20,96 @@ import multiprocessing as mp
 import os
 import traceback
 from pyprind import ProgBar
-from time import sleep
+from time import sleep, time
 from random import randint
+from contextlib import ExitStack
+from sys import stderr
 
 MAX_PROC = int(mp.cpu_count())
 
-# Do a parallel map(func, tasks) with numProc processes
+# Do a parallel imap[_unordered](func, tasks) with numProc processes
 # + func should always return something
-# + If bar is true, show a progress bar
 # + Tasks should be an iterable which items are acceptable args for func
 # + The output of func is loggedgradually in outFileName
+# + If bar is true, show a progress bar
 # + If errFileName is not None, all exceptions produced by func are intercepted and gradually logged in errFileName
-def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC):
-    outListener, outQueue = startQueueListener(outFileName, len(tasks) if bar else 0)
-    errListener, errQueue = startQueueListener(errFileName, 0)
+# + numProc sets the number of process to use
+# + if ordered is true, results are outputed in the same order as tasks
+# + chunksize : chunkszise for imap
+# + if debug is set to true, errors are also printed to stderr
+def parallelRunAndWrite(func, tasks, outFileName, bar=True, errFileName=None, numProc=MAX_PROC, ordered=False, chunksize=1, debug=False):
+
+    # prepare progressbar
+    if(bar):
+        progressBar = ProgBar(len(tasks)+1, update_interval=2)
+        progressBar.update()
 
     with mp.Pool(processes=numProc) as pool:
-        pool.map(worker, [ (func, t, outQueue, errQueue) for t in tasks])
+        # Define imap function
+        if(ordered):
+            imap_func=pool.imap
+        else:
+            imap_func=pool.imap_unordered
+
+        with ExitStack() as stack:
+            # open files
+            out = stack.enter_context(open(outFileName, 'w+'))
+            if(errFileName):
+                err=stack.enter_context(open(errFileName, 'w+'))
+                numErrors=0
+            else:
+                err=None
+
+            start = time()
+            # actually do stuff
+            for result in imap_func(worker, [(func,t,err is not None) for t in tasks], chunksize):
+                success, message = result
+                if success:
+                    f=out
+                else:
+                    f=err
+                    numErrors+=1
+                    if(debug):
+                        stderr.write(message + '\n')
 
-    endListeners([outListener, errListener])
+                f.write(message + '\n')
+                f.flush()
+                progressBar.update()
+
+    # Report status
+    if(err):
+        errmsg= '{} errors / {} tasks written to file "{}" \n'.format(numErrors, len(tasks), errFileName)
+    else:
+        errmsg=''
+    print('Output written to file "{}"\n{}Time elapsed: {}s'.format(outFileName, errmsg, int(time() - start)))
 
 # wrapper to run user function
 def worker(args):
-    func, task, outQueue, errQueue = args
+    func, task, handleErrors = args
     try:
-        outQueue.put(func(task))
-    except Exception as e:
-        if(errQueue is None):
-            raise e
+        return (True,func(task))
+    except:
+        if(handleErrors):
+            return (False,traceback.format_exc())
         else:
-            errQueue.put(traceback.format_exc())
-
-# Write all messages from queue to outFileName
-def listener(queue, outFileName, barSize):
-    if(barSize>0):
-        progressBar = ProgBar(barSize+1, update_interval=2)
-
-    f = open(outFileName, 'w+')
-    while True:
-        m = queue.get()
-        f.write(str(m) + '\n')
-        f.flush()
-        if(barSize>0):
-            progressBar.update()
-    f.close()
-
-# Launch a process listening on a queue and writing its content to fname
-def startQueueListener(outFileName, bar):
-    if(outFileName is None):
-        return None, None
-    queue = mp.Manager().Queue()
-    p = mp.Process(target=listener, args=(queue, outFileName, bar))
-    p.start()
-    return p, queue
-
-# Kill and wait for the end of all given listeners
-def endListeners(listeners):
-    for l in listeners:
-        if(l is not None):
-            l.terminate()
-            l.join()
+            raise
 
 # Testing
 
 def testFunc(arg):
-    # raise Exception("COUCOU")
     ret=str(arg)+str(os.getpid())
     sleep(randint(2,5))
+    if(randint(0,100)%2 ==0):
+        raise Exception("COUCOU"+ret)
     return ret
 
 def test():
-    parallelRunAndWrite(testFunc, range(5), "testplop",  numProc=2, errFileName="errors")
+    print("Error file + debug")
+    parallelRunAndWrite(testFunc, range(10), "testplop",  numProc=2, errFileName="errors", debug=True)
+    print("Error file")
+    parallelRunAndWrite(testFunc, range(10), "testplop",  numProc=2, errFileName="errors")
+    print("No error file")
+    parallelRunAndWrite(testFunc, range(10), "testplop",  numProc=2)
 
 if __name__ == "__main__":
    test()
-- 
GitLab