In part three of this series we looked at how message queues could be used to receive updates from multiple threads and ensure that a data structure remains consistent. Now I would like to look at how message queues can be used to spread work across several queues so that it can be performed in parallel. We will also look at an alternative mechanism for doing this using semaphores.

To illustrate this we’re going to take another look at option pricing, as we did in the “Recalculating Values Only When Dependencies Change” series. We’ll use the same simplified option pricing function found here: http://www.uncarved.com/blog/ocaml_deriv_1.mrk, adapted to F#:

// pricing.fs - A rudimentary European call option pricer designed to
// mimic listings 1.1 to 1.3 in Joshi
//
// Written by Sean Hunter <sean@uncarved.com>,
// Adapted to F# by Robert Pickering
//
// This is demonstration code only.  You are free to use it under the
// Creative Commons Attribution 2.5 license, but don't expect it to
// accurately price real options.
#light
module Pricing

// get a random gaussian using the polar form of the Box-Muller transform,
// described here http://en.wikipedia.org/wiki/Box-Muller_transform
let rec getOneGaussianByBoxMuller (rand:System.Random) =
    (* Generate two uniform numbers from -1 to 1 *)
    let x = (rand.NextDouble() * 2.0) - 1.0
    let y = (rand.NextDouble() * 2.0) - 1.0
    let s = x * x + y * y
    // reject points outside the unit circle, and the origin, where log s is undefined
    if s >= 1.0 || s = 0.0 then getOneGaussianByBoxMuller rand
    else x * sqrt (-2.0 * (log s) / s)

// Price a European call using Monte Carlo.
//
// We pre-compute as much as possible before the simulation, then the
// actual mc paths are done as a nested recursive function.  This seems
// more idiomatically functional even though ocaml has for loops.
let simpleMonteCarlo1 expiry strike spot vol r numPaths =
    let rand = new System.Random()
    let variance = vol * vol * expiry
    let rootVariance = sqrt variance
    let itoCorrection = -0.5 * variance
    let movedSpot = spot * exp (r * expiry + itoCorrection)
    let rec doPath i runningSum =
        if i < numPaths then begin
            let thisGaussian = getOneGaussianByBoxMuller rand
            let thisSpot = movedSpot * (exp (rootVariance * thisGaussian))
            let thisPayoff = max (thisSpot - strike) 0.0
            doPath (i+1) (runningSum + thisPayoff)
        end
        else (runningSum / (float numPaths)) * (exp (-1.0 * r * expiry))
    doPath 0 0.0
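A quick way to sanity-check the pricer is to compare it with the Black-Scholes closed form: the payoff is a plain European call under the same lognormal model that the simulation samples, so the Monte Carlo estimate should converge to the closed-form price as the number of paths grows. The sketch below is self-contained, so it restates the pricer as a loop under the hypothetical name `monteCarloCall`, and approximates the normal CDF with the Abramowitz and Stegun 7.1.26 formula for erf (my choice, since .NET has no built-in erf). The helper names `normCdf`, `blackScholesCall` and `gaussian` are likewise hypothetical; treat this as an illustrative check, not part of the original example.

```fsharp
open System

// Standard normal CDF using the Abramowitz & Stegun 7.1.26 approximation
// of erf (absolute error below about 1.5e-7). Hypothetical helper.
let normCdf (x: float) =
    let erf (z: float) =
        let t = 1.0 / (1.0 + 0.3275911 * abs z)
        // coefficients a5..a1, highest degree first, evaluated with Horner's rule
        let coeffs = [ 1.061405429; -1.453152027; 1.421413741; -0.284496736; 0.254829592 ]
        let poly = t * (coeffs |> List.fold (fun acc c -> acc * t + c) 0.0)
        let y = 1.0 - poly * exp (-z * z)
        if z >= 0.0 then y else -y
    0.5 * (1.0 + erf (x / sqrt 2.0))

// Black-Scholes price of a European call (hypothetical helper),
// same argument order as simpleMonteCarlo1.
let blackScholesCall (expiry: float) (strike: float) (spot: float) (vol: float) (r: float) =
    let sd = vol * sqrt expiry
    let d1 = (log (spot / strike) + (r + 0.5 * vol * vol) * expiry) / sd
    let d2 = d1 - sd
    spot * normCdf d1 - strike * exp (-r * expiry) * normCdf d2

// Polar Box-Muller sample: reject points outside the unit circle and the origin.
let rec gaussian (rand: Random) =
    let x = rand.NextDouble() * 2.0 - 1.0
    let y = rand.NextDouble() * 2.0 - 1.0
    let s = x * x + y * y
    if s >= 1.0 || s = 0.0 then gaussian rand
    else x * sqrt (-2.0 * log s / s)

// Loop-based restatement of simpleMonteCarlo1 (hypothetical name).
let monteCarloCall (expiry: float) (strike: float) (spot: float) (vol: float) (r: float) (numPaths: int) =
    let rand = Random()
    let variance = vol * vol * expiry
    let movedSpot = spot * exp (r * expiry - 0.5 * variance)
    let mutable sum = 0.0
    for _ in 1 .. numPaths do
        let s = movedSpot * exp (sqrt variance * gaussian rand)
        sum <- sum + max (s - strike) 0.0
    sum / float numPaths * exp (-r * expiry)

let closedForm = blackScholesCall 1.0 100.0 100.0 0.2 0.05
let simulated = monteCarloCall 1.0 100.0 100.0 0.2 0.05 200000
printfn "closed form %.4f, Monte Carlo %.4f" closedForm simulated
```

For spot = strike = 100, vol = 0.2, r = 0.05 and one year to expiry the closed form is about 10.45, and the simulated value should land close to it; the typical gap shrinks in proportion to 1/sqrt(numPaths).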

This pricing function will provide the work that we need to do. We’re going to build a small app that lets us see how the price changes as we vary the inputs. Pricing can take a fair amount of time, so we don’t want to lock the GUI while it runs; the user should be able to carry on using the application. This requirement also affects the design of the GUI. If we only allowed one pricing at a time we could just provide a text box or label for the result, but because several pricings can be running at once, a single result slot would mean that when two pricings finish at about the same time the first result is overwritten before the user has a chance to see it. So I came up with a GUI in which the results of each pricing, along with its parameters, are displayed in a DataGridView.

To perform the pricing we use two types of queuing agent: a master queue, which is responsible for dividing the work between a number of worker agents, and the workers themselves. We could have had just one queue, but then the pricing requests would always have been executed serially. These two types of queue give us a way to execute the work in parallel while ensuring that no more than a fixed number of requests execute at once. The design of the worker and master agents is:

open System

// a pricing request: the raw text of each input field, plus the function
// that will receive the calculated price
type PricingRequest = string * string * string * string * string * string * (float -> unit)

// this is the agent that actually performs the pricing of the option
type WorkerAgent() =
    let agent = MailboxProcessor.Start(fun (inbox: MailboxProcessor<PricingRequest>) ->
             // read a message, perform the pricing and feed the result
             // to the result handler
             let rec loop() =
                async { let! expiry, strike, spot, vol, r, numPaths, result = inbox.Receive()
                        let expiry = Double.Parse(expiry)
                        let strike = Double.Parse(strike)
                        let spot = Double.Parse(spot)
                        let vol = Double.Parse(vol)
                        let r = Double.Parse(r)
                        let numPaths = Int32.Parse(numPaths)

                        let price = Pricing.simpleMonteCarlo1
                                        expiry strike spot vol r numPaths

                        // post the result back to the UI via the result function
                        do result price
                        return! loop() }
             loop())
    member x.Post(n) = agent.Post(n)

// This queue is designed to handle the rotation of messages between
// the worker queues
type MasterQueueAgent() =
    let agent = MailboxProcessor.Start(fun (inbox: MailboxProcessor<PricingRequest>) ->
             // initialize a list of workers, limited by the number
             // of processors available
             let workers =
                [ for _ in 1 .. Environment.ProcessorCount -> new WorkerAgent() ]
             // loop which takes the worker at the head of the list, posts the
             // work item to its queue, then moves that worker to the back
             // of the list
             let rec loop(workers: list<WorkerAgent>) =
                async { let! msg = inbox.Receive()
                        let worker = workers.Head
                        do worker.Post(msg)
                        return! loop(workers.Tail @ [workers.Head]) }
             loop(workers))

    member x.Post(n) = agent.Post(n)

// the instance of the master queue we will interact with
let masterQueue = MasterQueueAgent()

 

We can see from the above listing that each agent type offers one simple Post method, to which we post work items. The master queue sits in a loop waiting for messages; once it receives one, it picks a worker from its internal list of workers to execute the request. The method of choosing a worker is very simple: we take the worker at the head of the list and assign it the work, then place that worker at the back of the list, so work is handed out round-robin. I chose to limit the number of workers to the number of processors available because I know the pricing is processor intensive, and therefore CPU bound rather than I/O bound (in fact there should be no I/O at all). Once a work item is posted to the master queue it should be assigned to a worker quickly, as the master does very little work itself.
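The rotation the master performs, taking the head of the list and moving it to the back, is a round-robin schedule. The small sketch below isolates that schedule and compares it with an index-based version; the function names `rotate`, `assignByRotation` and `assignByIndex` are hypothetical and not part of the example. `tail @ [head]` copies the list on every message, which is harmless with a handful of workers, whereas indexing into an array costs the same whatever the pool size.

```fsharp
// One step of the master's rotation: the head worker moves to the back.
// (Hypothetical helper, mirroring workers.Tail @ [workers.Head].)
let rotate (workers: 'T list) =
    match workers with
    | [] -> []
    | head :: tail -> tail @ [ head ]

// The worker chosen for each of `count` messages when the list is rotated
// after every assignment (assumes a non-empty list).
let assignByRotation (workers: 'T list) (count: int) =
    let rec go ws n acc =
        if n = 0 then List.rev acc
        else go (rotate ws) (n - 1) (List.head ws :: acc)
    go workers count []

// The same schedule computed from the message number with a modulo.
let assignByIndex (workers: 'T array) (count: int) =
    List.init count (fun i -> workers.[i % workers.Length])

printfn "%A" (assignByRotation [ "w1"; "w2"; "w3" ] 5)
```

With three workers and five messages both functions return ["w1"; "w2"; "w3"; "w1"; "w2"].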

The master/worker approach seems to work quite well. On my dual-core machine, when two or more requests are running both processors are fully used until all but one of the requests have finished, and the UI stays responsive: we are able to change the input values and submit more pricing requests.

One alternative to the master/worker approach is to use a semaphore. What is a semaphore? Wikipedia defines a semaphore as “a protected variable (or abstract data type) which constitutes the classic method for restricting access to shared resources, such as shared memory, in a multiprogramming environment. A semaphore is a counter for a set of available resources, rather than a locked/unlocked flag of a single resource”. The Windows API supports semaphores, and this is exposed to .NET through the System.Threading.Semaphore class. Although the semaphore is quite a low-level concept, it’s surprisingly straightforward to use, and our semaphore-based solution is a little shorter than the message queue mechanism.
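Before looking at that solution, the counting behaviour from the definition can be seen on its own. This sketch (the helper name `maxConcurrency` is hypothetical) starts more threads than the System.Threading.Semaphore has slots, records how many are inside the guarded region at once, and returns the highest count observed, which should never exceed the semaphore's maximum count.

```fsharp
open System.Threading

// Hypothetical helper: runs `jobs` threads that each hold one of `limit`
// semaphore slots while they "work", and returns the largest number of
// threads seen inside the guarded region at the same time.
let maxConcurrency (limit: int) (jobs: int) =
    use semaphore = new Semaphore(limit, limit)
    let sync = obj ()
    let inside = ref 0
    let maxSeen = ref 0
    let work () =
        semaphore.WaitOne() |> ignore          // block until a slot is free
        try
            lock sync (fun () ->
                inside.Value <- inside.Value + 1
                maxSeen.Value <- max maxSeen.Value inside.Value)
            Thread.Sleep 50                     // stand-in for real work
            lock sync (fun () -> inside.Value <- inside.Value - 1)
        finally
            semaphore.Release() |> ignore      // give the slot back
    let threads = Array.init jobs (fun _ -> Thread(ThreadStart(work)))
    threads |> Array.iter (fun t -> t.Start())
    threads |> Array.iter (fun t -> t.Join())
    maxSeen.Value

printfn "at most %d jobs ran at once" (maxConcurrency 2 8)
```

Here the waiting threads are genuinely blocked in WaitOne; the asynchronous wrapper that follows avoids tying up a thread while it waits.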

open System
open System.Threading

// A type that helps limit the number of active pricing requests
type RequestGate(n: int) =
    let semaphore = new Semaphore(initialCount=n, maximumCount=n)
    member x.AcquireAsync(?timeout: int) =
        // Async.AwaitWaitHandle waits without blocking a thread; it returns
        // false if the timeout (in milliseconds) expires first
        async { let! ok = Async.AwaitWaitHandle(semaphore, ?millisecondsTimeout=timeout)
                if ok then
                   return
                     { new System.IDisposable with
                         member x.Dispose() =
                             semaphore.Release() |> ignore }
                else
                   return! failwith "couldn't acquire a semaphore" }

let requestGate = RequestGate(Environment.ProcessorCount)

let priceOption (expiry: string) (strike: string) (spot: string)
                (vol: string) (r: string) (numPaths: string)
                (result: float -> unit) =
    // use! releases the semaphore when the workflow ends, even on an exception
    async { use! holder = requestGate.AcquireAsync()

            let expiry = Double.Parse(expiry)
            let strike = Double.Parse(strike)
            let spot = Double.Parse(spot)
            let vol = Double.Parse(vol)
            let r = Double.Parse(r)
            let numPaths = Int32.Parse(numPaths)

            let price = Pricing.simpleMonteCarlo1
                            expiry strike spot vol r numPaths

            do result price }

 

The key to understanding this listing is the RequestGate class, which provides a user-friendly way of accessing the semaphore. All we need to do is call the “AcquireAsync” method from inside an asynchronous workflow, and the workflow will wait until it acquires the semaphore. Because this is an asynchronous workflow, no thread is blocked while it waits; the rest of the workflow is invoked via a callback, as discussed in parts one and two. The handle returned by “AcquireAsync” is bound with use!, so the semaphore is released when the workflow finishes, even if the pricing throws an exception. All that remains is to either post a message to the master queue or spawn the workflow that will try to acquire the semaphore. Here is how we post a message to the queue:

        // called on the worker's thread once the price f is ready
        let result f =
            let updateGrid() =
                results.Add({ expiry = expiry;
                              strike = strike;
                              spot = spot;
                              r = r;
                              vol = vol;
                              numPaths = numPaths;
                              price = f })
            // marshal the update onto the GUI thread if we are not already on it
            if resultsGrid.InvokeRequired then
                resultsGrid.Invoke(new ResultAction(fun _ -> updateGrid())) |> ignore
            else
                updateGrid()
        // post the message to the master queue
        masterQueue.Post((expiry, strike, spot, vol, r, numPaths, result))

 

The most important thing to notice here is how the “result” function is created: we test whether we need to invoke back onto the GUI thread (in most cases we will, since the result function is called from the thread that ran the pricing). Calling the version that waits for a semaphore is almost exactly the same:

        // spawn off the request that will wait for the semaphore
        Async.Start(priceOption expiry strike spot vol r numPaths result)
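The timeout path of RequestGate is worth seeing in isolation, since without a timeout a caller simply waits. This sketch restates the gate using Async.AwaitWaitHandle from FSharp.Core, which returns false if the wait handle is not signalled within the given number of milliseconds; the `gate` value and the `tryAcquire` helper are hypothetical. With a gate of size one, a second acquisition times out while the first handle is held and succeeds once it has been disposed.

```fsharp
open System
open System.Threading

// Gate restated for this sketch: at most n holders at a time.
type RequestGate(n: int) =
    let semaphore = new Semaphore(initialCount = n, maximumCount = n)
    member _.AcquireAsync(?timeout: int) =
        async {
            let! ok = Async.AwaitWaitHandle(semaphore, ?millisecondsTimeout = timeout)
            if ok then
                // disposing the returned handle hands the slot back
                return
                    { new IDisposable with
                        member _.Dispose() = semaphore.Release() |> ignore }
            else
                return failwith "couldn't acquire a semaphore"
        }

let gate = RequestGate(1)

// Hypothetical helper: true if a slot could be taken (and then released)
// within timeoutMs milliseconds, false if the acquisition timed out.
let tryAcquire (timeoutMs: int) =
    try
        let holder = gate.AcquireAsync(timeoutMs) |> Async.RunSynchronously
        holder.Dispose()
        true
    with _ -> false

let first = gate.AcquireAsync() |> Async.RunSynchronously   // takes the only slot
printfn "while held: %b" (tryAcquire 100)
first.Dispose()
printfn "after release: %b" (tryAcquire 100)
```

The first check should print false and the second true. In the GUI version no timeout is passed, so a pricing request just queues until a processor's worth of capacity is free.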

 

So which version do I prefer? Well, the version using the semaphore is shorter, which is always a good thing in my book, but we have less control with the semaphore: once the maximum count of a semaphore is fixed it is difficult to change. The master/worker queue implementation offers much more flexibility in this respect; workers could be added to or removed from the pool dynamically in response to heuristics about the state of the host machine, or work could be scheduled based on the number of work items waiting in each worker’s queue. We could also make our implementation of the master/worker queue much more generic: it could be rewritten so that a unit of work was just a function of type “unit -> unit”. However, in many ways this is what the implementers of the “Task Parallel Library” are trying to do, so it may be easier just to use their implementation of queues (the Task/TaskManager classes). Message queues of this style may be more suited to tasks that need to execute in parallel but still need to share data between them, unlike our option pricing, where each price is calculated independently.
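To make the generic variant concrete, here is a sketch of a master/worker pair whose messages are simply functions of type unit -> unit. Instead of strict rotation, the master picks the worker with the fewest items waiting, read from MailboxProcessor's CurrentQueueLength property, which is one of the scheduling heuristics suggested above. The type names GenericWorker and GenericMaster are hypothetical; this is not the code from the downloadable example.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading

// Hypothetical worker whose messages are units of work; a failing item is
// reported rather than killing the agent's loop.
type GenericWorker() =
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () =
                async {
                    let! work = inbox.Receive()
                    try
                        work ()
                    with ex ->
                        eprintfn "work item failed: %s" ex.Message
                    return! loop ()
                }
            loop ())
    member _.Post(work: unit -> unit) = agent.Post(work)
    // Items waiting in the mailbox (not counting the one being executed).
    member _.QueueLength = agent.CurrentQueueLength

// Hypothetical master that hands each item to the least-loaded worker.
type GenericMaster(workerCount: int) =
    let workers = Array.init workerCount (fun _ -> GenericWorker())
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<unit -> unit>) ->
            let rec loop () =
                async {
                    let! work = inbox.Receive()
                    // Array.minBy returns the first worker with the shortest queue
                    let target = workers |> Array.minBy (fun w -> w.QueueLength)
                    target.Post(work)
                    return! loop ()
                }
            loop ())
    member _.Post(work: unit -> unit) = agent.Post(work)

let master = GenericMaster(Environment.ProcessorCount)
let finished = new CountdownEvent(20)
let squares = ConcurrentBag<int>()
for i in 1 .. 20 do
    master.Post(fun () ->
        squares.Add(i * i)
        finished.Signal() |> ignore)
finished.Wait(TimeSpan.FromSeconds 10.0) |> ignore
printfn "completed %d items" squares.Count
```

Because CurrentQueueLength ignores the item a worker is currently executing, a busy worker with an empty mailbox looks the same as an idle one; a more careful version would track in-flight work explicitly.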

I’ve not shown some of the GUI code from the example; if you want to run it for yourself you can download it here.

This will be my last blog post before setting off to Barcelona, hope to see you there!

Feedback:

Feedback was imported from my old blog engine; it’s no longer possible to post feedback here.

re: Concurrency in F# – Part IV – Queuing Working using Semaphores or Message Queues - Joel

Thank you very much for your sharings here. I’ve learnt a lot from it.