Project

General

Profile

Developing multithreaded algorithms

For this task, OpenWalnut provides so-called WThreadedFunction objects, which can be used as shown in the following examples.

// Shorthand notation: WThreadedFunction< T > WTF

// Example functor handed to a WThreadedFunction.
// Every invocation increments a shared counter by one.
class A
{
    // Shared counter; it is only modified through a write ticket.
    WSharedObject< int > p;

public:
    // Entry point executed by the worker threads.
    // id         - presumably the index of the calling thread (unused here)
    // numThreads - presumably the total number of threads (unused here)
    // b          - flag passed in by the caller (unused here); TODO confirm its meaning
    void operator()( size_t id, size_t numThreads, WBoolFlag& b )
    {
        // The ticket is a temporary that lives until the end of this statement.
        p.getWriteTicket()->get() += 1;
    }
};

boost::shared_ptr< A > a = ... // shared pointer to the object that does the computation

WTF< A > n( 4, a ); // construction of the WThreadedFunction object with the object of type A that does the computation
                    // 4 means 4 threads in this case
                    // If the number of threads equals 0, a good number of threads will be determined by the thread pool.
n.run();  // returns immediately
n.wait(); // returns when all threads are completed
n.stop(); // returns immediately

n.getThreadsDoneCondition(); // get the condition that signals when all threads are completed
n.status(); // one of:
            // W_THREADS_INITIALIZED
            // W_THREADS_RUNNING
            // W_THREADS_STOP_REQUESTED
            // W_THREADS_ABORTED --> finished because of a stop request or an exception
            // W_THREADS_FINISHED

// in WMDingsModule:

// Register the module's member function exc() as the handler for exceptions thrown in the threads.
n.subscribeExceptionSignal( boost::bind( &WMDingsModule::exc, this, _1 ) );

// Handler declaration (member of WMDingsModule).
void exc( const WException& e ); // note that the signal handler runs in the worker threads
                                 // copy the exception into a local module variable if you want to rethrow

Then threaded jobs can be defined like this:

// Sketch of the job interface. J is the job type.
// A derived class hands out jobs via getJob() and processes each of them in compute().
template< class J >
class WThreadedJobs
{
public:
    // Polymorphic base class: allow safe destruction through a base pointer.
    virtual ~WThreadedJobs()
    {
    }

    // Stores the next job in j.
    // Returns true if a job was stored, false if there are no more jobs.
    virtual bool getJob( J& j ) = 0;

    // Does the computation for job j.
    virtual void compute( const J& j ) = 0;
};

// add all entries of a vector
class B : public WThreadedJobs< int >  // int is the job type
{
public:
    // WSharedObject is not shown to zero-initialize its payload, so both
    // shared integers are set to 0 explicitly.
    B()
    {
        c.getWriteTicket()->get() = 0;
        result.getWriteTicket()->get() = 0;
    }

    // Implements the job acquisition: a job is the index of one entry of v.
    // Returns true if a job was stored in j, and false once all jobs have
    // been handed out.
    bool getJob( int& j )
    {
        // c is a WSharedObject and cannot be used like a plain int. Keep ONE
        // write ticket for both the check and the increment, so that two threads
        // can never be handed the same index (this assumes the write ticket
        // grants exclusive access for as long as it is alive).
        WSharedObject< int >::WriteTicket next = c.getWriteTicket();
        if( next->get() >= static_cast< int >( v.size() ) )
        {
            return false;
        }
        j = next->get()++;
        return true;
    }

    // Does the computation for job j: adds entry j of v to the shared sum.
    void compute( const int& j )
    {
        result.getWriteTicket()->get() += v[j];
    }

private:
    WSharedObject< int > c;       // index of the next job to hand out
    WSharedObject< int > result;  // sum of all entries processed so far
    std::vector< int > v;         // the values to add up
};

Per-voxel operations can also be done like this:

// use 15 float values per voxel to compute 1 double value per voxel
typedef WThreadedPerVoxelOperation< float, 15, double, 1> WTPVO;

ptr< WTPVO > t( new WTPVO( ds, boost::bind( &T::keinpansen, this, _1 ))); // ds is a shared_ptr to a WDataSetSingle, which is the input

// in your module

    // Per-voxel callback: calculates the gfa of one voxel.
    // s is a float 'array' of size 15 and contains the values of this voxel.
    // The returned array holds the single output value for this voxel.
    boost::array< double, 1 > keinpansen( const WValueSet< float >::Subarray& s )
    {
        // Make a spherical harmonic from the input data.
        // NOTE: this actually doesn't work, the input has to be a WValue< float >.
        wmath::WSymmetricSphericalHarmonic harmonic( s );

        boost::array< double, 1 > gfa = { { harmonic.calcGFA() } };
        return gfa;
    }

// to execute the operation, you could use this:
WThreadedFunction< WTPVO > p( 4, t ); // run the per-voxel operation t with 4 threads
p.run();
p.wait(); // wait until all threads are completed before fetching the result

// to get the result, use
// (named 'result' because 'ds' already names the input dataset above)
boost::shared_ptr< WDataSetSingle > result = t->getResult();