C / C++

Parallel implementation of „Monte Carlo” algorithm – reactive way

Monte Carlo method lets us to solve some kind of problems using statistic. One of them is to estimate value of PI number (just like trial and error method). To achieve this, we are going to place N random points inside a square (which radius equals 1) and count the number of points that are inside the circle.

How it works?

There is only one formula for the area of a circle A = πr2. Our circle has the radius 1, so it makes that area is just π. We can split our circle on 4 parts, so area of each part will be π/4. The rest is just a statistic. Take a look at the image below:

pi_30k

This is exactly what we are gonna do. Get N  random points and check how many of them is inside. Then, multiply our result by 4 and voilà! PI number is estimated.

What we will use?

To solve this problem, we are going to use windows.h library, which give us old and really ugly threads API. What is important – our algorithm have to be reactive. Observer pattern should helps us with this. We want to react on „messages” from threads and when all of them will end their calculations – show the result on the screen.

It is not a good solution, critical sections will makes our program slower, we know this. What makes this idea different? The point is that main thread can work in background.

Let’s start coding

Firstly, we must declare interfaces.


class Observable
{
    public:
        virtual void notify(const int) = 0;
};

class Observer
{
	public:
		virtual void onNext(const int) = 0;
		virtual void onComplete() const = 0;
};

Observable and Observer should provide methods, which allows to communicate between them. I think everything is clear here, so now we have to prepare our thread class, which will encapsulate real thread handle.


class ObservableThread : public Observable
{
	public:
		ObservableThread(Observer&, const int, const int);
		~ObservableThread();

		void notify(const int);
		void run();

		inline const HANDLE getHandle() const { return handle; }

	private:
		static DWORD WINAPI start(void *);

		HANDLE handle;
		Observer& observer;
		int id count, result;
};

So this is our ObservableThread, which contains some properties:

  • thread handle,
  • reference to Observer (just one, not list in our case!),
  • id – thread universal identifier,
  • count – number of iterations,
  • result – number of points in circle area.

Okay, now our application class:


class ParallelMCSolver : public Observer
{
	public:
		ParallelMCSolver(const int, const int);
		~ParallelMCSolver();

		static const double random();
		static const bool isInArea(const double, const double, const double);

		void solve();
		void waitAll() const;

		void onNext(const int);
		void onComplete() const;

	private:
		int nThreads, nRunning, count, in;
		std::vector<ObservableThread> threads;
		HANDLE * handlers;
};

ParallelMCSolver contains:

  • threads vector, which will store our ObservableThread instances,
  • handlers array (only to make waitAll() method work properly),
  • nThreads – number of threads,
  • nRunning – number of running threads,
  • count – number of iterations,
  • in – number of points in circle from all threads.

Uninteresting stuff

We created class definitions, now is time to write a lot of uniteresting code, which do not need explanation (I think), such as constructors, destructors and simple methods.


ObservableThread::ObservableThread(Observer& observer, const int id, const int count) :
	observer(observer), 
	handle(NULL), 
	count(count), 
	result(0), 
	id(id)
{}

ObservableThread::~ObservableThread()  
{
	if (handle != NULL) CloseHandle(handle);
}

void ObservableThread::notify(const int val)
{
	observer.onNext(val);
}

ParallelMCSolver::ParallelMCSolver(const int nThreads, const int count) :
	nThreads(nThreads), 
	count(count), 
	in(0)
{
	if (nThreads <= 0 || count <= 0) throw exception();

	const int tCount = count / nThreads;
	handlers = new HANDLE[nThreads];
	for (int i = 0; i < nThreads; i++)
	{
		threads.push_back(ObservableThread(*this, i, tCount));
	}
}

ParallelMCSolver::~ParallelMCSolver()
{
	delete[] handlers;
}

void ParallelMCSolver::solve()
{
	nRunning = nThreads;
	int i = 0;
	for (vector::iterator it = threads.begin(); 
		it != threads.end(); it++, i++)
	{
		it->run();
		handlers[i] = it->getHandle();
	} 
}

void ParallelMCSolver::waitAll() const
{
	WaitForMultipleObjects(nThreads, handlers, true, INFINITE);
}

const double ParallelMCSolver::random()
{
	return (double)rand() / (RAND_MAX + 1);
}

const bool ParallelMCSolver::isInArea(const double x, const double y, const double r)
{
	return ((x * x) + (y * y)) <= (r * r);
}

Make it work

We see that ParallelMCSolver::solve() method invokes run() of every thread. How run() really looks?


void ObservableThread::run()
{
	handle = CreateThread(
		NULL, 0, 
		&ObservableThread::start, 
		static_cast<void*>(this), 
		0, NULL);

	if (handle == NULL) throw exception();
}

It just create thread using windows API and pass to it this pointer of instance, what means, that each thread has access to properties and method of his owner. The start point for thread is start(void*) static method.


DWORD WINAPI ObservableThread::start(void * arg)
{
	ObservableThread * thread = static_cast<ObservableThread*>(arg);
	double x, y;
	srand(time(NULL) * thread->id); 
	 
	int in = 0;
	for (int i = 0; i < thread->count; i++)
	{
		x = ParallelMCSolver::random();
		y = ParallelMCSolver::random();

		if (ParallelMCSolver::isInArea(x, y, 1.0)) ++in;
	}
	thread->notify(in);
	ExitThread(0);
	return 0;
}

It is the most important function. What’s going on here? Firstly, we have to cast arg, which is thread owner – ObservableThread.


ObservableThread * thread = static_cast<ObservableThread*>(arg);

Then, finally (!) we can take care of our algorithm implementation:

  1. Generate random point (x, y).
  2. Check if point is inside the circle.
  3. If yes – increment in variable.

x = ParallelMCSolver::random();
y = ParallelMCSolver::random();

if (ParallelMCSolver::isInArea(x, y, 1.0)) ++in;

After loop, thread has to notify Observer and pass him in variable. As we know, notify() invokes onNext() method.


void ParallelMCSolver::onNext(const int value)
{
	{
		Lock lock(cs);
		in += value;
		if (--nRunning == 0) onComplete();
	}
}

I decided to add some syntatic sugar here. A Lock object, which is responsible for enter and leave critical section (implementation below).


class Lock
{
	public:
		Lock(CRITICAL_SECTION&);
		~Lock();

	private:
		CRITICAL_SECTION& cs;
};

Lock::Lock(CRITICAL_SECTION& cs) : cs(cs)
{
	EnterCriticalSection(&cs);
}

Lock::~Lock()
{
	LeaveCriticalSection(&cs);
}

When nRunning equals 0, which means that all threads finished theirs tasks, onComplete() method is called.


void ParallelMCSolver::onComplete() const
{
	double PI = 4.0 * (in / (double)count);
	cout << "PI: " << PI << endl << endl; 
}

onComplete() shows us PI result.

Summary

It’s time to test our code. The last thing is – of course – main() function.


CRITICAL_SECTION cs;
int main()
{
	const int count = 10000000;
	int nThreads;

	InitializeCriticalSection(&cs);
	cout << fixed << setprecision(8);
	try
	{
		while (true)
		{
			cout << "Threads: "; cin >> nThreads;

			ParallelMCSolver instance(nThreads, count);
			instance.solve();

			/*
				...
				background stuff...
				...
			*/

			instance.waitAll();
		}
	}
	catch (exception& e)
	{
		cout << e.what() << endl;
	}

	DeleteCriticalSection(&cs);
	cin.get();
	return 0;
}

Full source code available at GitHub.