Parallel Monte Carlo with Dask
Today, we will look at how to implement a Monte Carlo simulation
to value an Asian option in a Jupyter notebook; and how to distribute that Monte Carlo
simluation across a cluster using Dask. Don’t worry if you are not a
Quantitative Analyst, this blog is more about Dask,
from dask import delayed and exciting
cluster stuff than about financial mathematics.
An Asian option is an exotic type of financial option contract. The payoff at expiry depends on the average underlying price over some pre-set period of time - different to a plain vanilla option where the payoff depends on the underlying price at expiry (fun fact: they are called Asian options because legend has it that Bankers Trust traded the first option linked to the average price of crude oil in in Tokyo back in 1987).
The payoff of a fixed-strike Asian call option at expiry is defined as:
C(T) = max(0, A(T) - K).
There is no closed form analytical formula to determine the fair value any time before
expiry for this kind of option. But we can determine the fair value by simulating a
large number of paths for the underlying asset price and calculate the payout at expiry
for each path. The fair value is then the discounted average over all these Monte Carlo
We assume that the current value of the underlying asset price is composed of the past value and
a determinstic upward trend plus an error term defined as a white noise (a normal variable with zero mean and variance one). This is called a random walk with drift. Given a start price
s0, a constant drift
mu and volatility
sigma, we can simulate the price path over
days number of days like this:
import numpy as np def random_walk(s0, mu, sigma, days): dt = 1/365. prices = np.zeros(days) shocks = np.zeros(days) prices = s0 for i in range(1, days): e = np.random.normal(loc=mu * dt, scale=sigma * np.sqrt(dt)) prices[i] = prices[i-1] * (1 + e) return prices
For any random path, we can now calculate the average price and, given the strike price
K, the payoff at expiry:
days = 365 * 4 # days to expiry s0 = 100 # current underlying price mu = 0.02 # drift sigma = 0.2 # volatility K = 100 # strike price A = np.average(random_walk(s0, mu, sigma, days) C = max(0, A - K)
Monte Carlo Simulation
We need to generate a large number of random price paths for the underlying. For each price path we calculate the associated payoff. These payoffs are averaged and discounted to today. This result is the value of the option.
The actual number of simulations
n required depends on the confidence level you are comfortable with for
the estimate. That, in turn, depends on your option’s parameters. I won’t go into details here, so let’s just assume that
n = 10000. We need to generate
n random paths and calculate the average value of the option at expiry:
n = 10000 %time np.average([max(0, np.average(random_walk(s0, mu, sigma, days)) - K) for i in range(0, n)]) Wall time: 1min 10s 11.673350929139112
Dask task scheduler: dask.distributed
If you have more than one CPU at your disposal, you can bring down the calculation time by
distributing the random walk generation across multiple CPUs. This is where Dask comes in.
Dask is a flexible library for parallel computing in Python, optimised for
interactive computational workloads. Instead of running
n random walks in a single
Jupyter notebook process, we make Dask distribute these calculations across
a number of processes.
Distributing these calculations is the job of the
task scheduler. Think of the task
scheduler as a server that manages where and how calculations as executed. There are different types of schedulers: single machine scheduler (limited to local machine) and distributed scheduler (distributed across a cluster). Here, I will
use a simple single machine scheduler.
In a new shell/command window, start a local
dask.distributed process scheduler:
~$ python ... >>> from dask.distributed import Client >>> client = Client(scheduler_port=8786)
You can interrogate the
client object to confirm it runs on port 8786 on localhost.
By default, it spins up as many processes as you have CPUs on your computer, in my case I have 4 cores:
>>> client <Client: scheduler='tcp://127.0.0.1:8786' processes=4 cores=4>
Wrapping functions with dask.delayed
We need to instruct our
np.average function calls
to execute lazily. Instead of executing the functions immediately,
we want to defer execution via the Dask task scheduler. We can achieve
this by wrapping functions in
dask.delayed. Alternatively, you can decorate
your functions with
@dask.delayed (not covered in this blog post, though
have a look at http://dask.pydata.org/en/latest/delayed.html#decorator).
This delays the execution of the function and generates a task graph instead.
from dask import delayed s0 = 100 K = 100 mu = 0.02 sigma = 0.2 days = 365*4 n = 10000 result = delayed(np.average)([ delayed(max)( 0, delayed(np.average)(random_walk(s0, mu, sigma, days)) - K ) for i in range(0, n) ])
result is now a
Delayed object and contains the task graph. The task graph is a
directed acyclic graph (DAG) and models the dependencies between the
random_walk function calls - without executing them. This allows Dask to optimise
its task execution strategy.
Execute the task graph
We have everything in place now to execute the actual calculation of
result up to this point is just a Delayed object. Execute the computation:
%time result.compute() Wall time: 39.9 s 11.617145839123664
In essence, the calculation time has roughly halved on 4 cores compared to 1 core. The reason it has not reduced to 25% is that there is some overhead involved in managing and distributing the task themselves. Have a look at the Dask docs about dask.delayed and task scheduling.