Introduction

Now that the multiprocessing library comes standard in Python 2.6, I thought I’d migrate some of my apps to take full advantage. However, there aren’t many examples out there showing how to write a basic multiprocessing program with a graphical front-end. Here’s a simple wxPython multiprocessing example. The actual calculation is trivial – the hard bit is making sure all queues are emptied and processes terminated properly upon exit.

A more complete example is listed on my website.

Objects Involved

Process Overview

There are a few key aspects to the design: On Windows, it is important to protect the “entry point” of the program by using if __name__ == '__main__': as recommended in the official documentation.

The worker processes must be independent of any object instance (ie. defined as a class method or similar) and started prior to wx.App instantiation (ie. in the __main__).

  • The task queue should be given tasks only as required to prevent any residual tasks upon the cessation of processing.
  • Special Concerns

    None yet!

    Code Sample

       1 """
       2 Simpler wxPython Multiprocessing Example
       3 ----------------------------------------
       4 
       5 This simple example uses a wx.App to control and monitor a pool of workers
       6 instructed to carry out a list of tasks.
       7 
       8 The program creates the GUI plus a list of tasks, then starts a pool of workers
       9 (processes) implemented with a classmethod. Within the GUI, the user can start
      10 and stop processing the tasks at any time.
      11 
      12 Copyright (c) 2010, Roger Stuckey. All rights reserved.
      13 """
      14 
      15 import getopt, math, random, sys, time, types, wx
      16 
      17 from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
      18 
      19 
      20 class MyFrame(wx.Frame):
      21     """
      22     A simple Frame class.
      23     """
      24     def __init__(self, parent, id, title, processes, taskqueue, donequeue, tasks):
      25         """
      26         Initialise the Frame.
      27         """
      28         self.processes = processes
      29         self.numprocesses = len(processes)
      30         self.taskQueue = taskqueue
      31         self.doneQueue = donequeue
      32         self.Tasks = tasks
      33         self.numtasks = len(tasks)
      34 
      35         wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))
      36 
      37         # Create the panel, sizer and controls
      38         self.panel = wx.Panel(self, wx.ID_ANY)
      39         self.sizer = wx.GridBagSizer(5, 5)
      40 
      41         self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
      42         self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
      43         self.start_bt.SetDefault()
      44         self.start_bt.SetToolTipString('Start the execution of tasks')
      45         self.start_bt.ToolTip.Enable(True)
      46 
      47         self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)
      48 
      49         # Add the controls to the sizer
      50         self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP|wx.RIGHT, border=5)
      51         self.sizer.Add(self.output_tc, (1, 0), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
      52         self.sizer.AddGrowableCol(0)
      53         self.sizer.AddGrowableRow(1)
      54 
      55         self.panel.SetSizer(self.sizer)
      56 
      57         self.Bind(wx.EVT_CLOSE, self.OnClose)
      58 
      59         # Set some program flags
      60         self.keepgoing = True
      61         self.i = 0
      62         self.j = 0
      63 
      64         self.output_tc.AppendText('Number of processes = %d\n' % self.numprocesses)
      65 
      66     def OnStart(self, event):
      67         """
      68         Start the execution of tasks by the processes.
      69         """
      70         self.start_bt.Enable(False)
      71         self.output_tc.AppendText('Unordered results...\n')
      72         # Start processing tasks
      73         self.processTasks(self.update)
      74         if (self.keepgoing):
      75             self.start_bt.Enable(True)
      76 
      77     def OnClose(self, event):
      78         """
      79         Stop the task queue, terminate processes and close the window.
      80         """
      81         if (self.j < self.i):
      82             self.output_tc.AppendText('Completing queued tasks...\n')
      83         self.start_bt.Enable(False)
      84         busy = wx.BusyInfo("Waiting for processes to terminate...")
      85         # Stop processing tasks and terminate the processes
      86         self.processTerm(self.update)
      87         self.Destroy()
      88 
      89     def processTasks(self, resfunc=None):
      90         """
      91         Start the execution of tasks by the processes.
      92         """
      93         self.keepgoing = True
      94 
      95         # Submit first set of tasks
      96         numprocstart = min(self.numprocesses, self.numtasks)
      97         for self.i in range(numprocstart):
      98             self.taskQueue.put(self.Tasks[self.i])
      99 
     100         self.j = -1 # done queue index
     101         self.i = numprocstart - 1 # task queue index
     102         while (self.j < self.i):
     103             # Get and print results
     104             self.j += 1
     105             output = self.doneQueue.get()
     106             # Execute some function (Yield to a wx.Button event)
     107             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
     108                 resfunc(output)
     109             if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
     110                 # Submit another task
     111                 self.i += 1
     112                 self.taskQueue.put(self.Tasks[self.i])
     113 
     114     def update(self, output):
     115         """
     116         Get and print the results from one completed task.
     117         """
     118         self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % output)
     119         # Give the user an opportunity to interact
     120         wx.YieldIfNeeded()
     121 
     122     def processTerm(self, resfunc=None):
     123         """
     124         Stop the execution of tasks by the processes.
     125         """
     126         self.keepgoing = False
     127 
     128         while (self.j < self.i):
     129             # Get and print any results remining in the done queue
     130             self.j += 1
     131             output = self.doneQueue.get()
     132             if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
     133                 resfunc(output)
     134 
     135         for n in range(self.numprocesses):
     136             # Terminate any running processes
     137             self.processes[n].terminate()
     138 
     139         # Wait for all processes to stop
     140         isalive = 1
     141         while isalive:
     142             isalive = 0
     143             for n in range(self.numprocesses):
     144                 isalive = isalive + self.processes[n].is_alive()
     145             time.sleep(0.5)
     146 
     147     def worker(cls, input, output):
     148         """
     149         Create a TaskProcessor object and calculate the result.
     150         """
     151         while True:
     152             args = input.get()
     153             result = 0
     154             # Calculate the result of a task
     155             for i in range(args[0]):
     156                 angle_rad = math.radians(args[1])
     157                 result += math.tanh(angle_rad)/math.cosh(angle_rad)/args[0]
     158             # Put the result on the output queue
     159             output.put(( current_process().name, current_process().pid, args[1], result ))
     160 
     161     # The worker must not require any existing object for execution!
     162     worker = classmethod(worker)
     163 
     164 
     165 class MyApp(wx.App):
     166     """
     167     A simple App class, modified to hold the processes and task queues.
     168     """
     169     def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, processes=[ ], taskqueue=[ ], donequeue=[ ], tasks=[ ]):
     170         """
     171         Initialise the App.
     172         """
     173         self.Processes = processes
     174         self.taskQueue = taskqueue
     175         self.doneQueue = donequeue
     176         self.Tasks = tasks
     177 
     178         wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)
     179 
     180     def OnInit(self):
     181         """
     182         Initialise the App with a Frame.
     183         """
     184         self.frame = MyFrame(None, -1, 'wxSimpler_MP', self.Processes, self.taskQueue, self.doneQueue, self.Tasks)
     185         self.frame.Show(True)
     186         return True
     187 
     188 
     189 if __name__ == '__main__':
     190 
     191     freeze_support()
     192 
     193     numtasks = 20
     194     # Determine the number of CPU's/cores
     195     numproc = cpu_count()
     196 
     197     # Create the task list
     198     Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]
     199 
     200     # Create the queues
     201     taskQueue = Queue()
     202     doneQueue = Queue()
     203 
     204     Processes = [ ]
     205 
     206     # The worker processes must be started here!
     207     for n in range(numproc):
     208         process = Process(target=MyFrame.worker, args=(taskQueue, doneQueue))
     209         process.start()
     210         Processes.append(process)
     211 
     212     # Create the app, including worker processes
     213     app = MyApp(redirect=True, filename='wxsimpler_mp.stderr.log', processes=Processes, taskqueue=taskQueue, donequeue=doneQueue, tasks=Tasks)
     214     app.MainLoop()
    

    Comments

    If you have any questions, please feel free to contact the author, whose information is available in his profile.

    MultiProcessing (last edited 2010-07-23 04:53:24 by digger2)

    NOTE: To edit pages in this wiki you must be a member of the TrustedEditorsGroup.