Classes | Typedefs | Enumerations | Functions

chomp::multiwork Namespace Reference

This namespace contains an interface to the MultiWork module which allows one to easily distribute a computational task over multiple processes and run concurrent computations. More...

Classes

class  mwWorkerData
 A helper class for storing data on a single worker. More...
class  mwCoordinator
 This class defines a generic coordinator task object for the multi-work distributed computations framework. More...
class  mwData
 This class is used to convert data structures into a single sequence of bytes and to retrieve this data for the purpose of communication between a coordinator and workers. More...
class  mwSubWorker
 This class defines a worker for the multi-work subdivision framework. More...
class  mwIniProbes
 This is a helper class for producing an initial set of probes to test the interior of the requested area of parameters. More...
class  mwBoxes
 This is a helper class for iterating all the boxes which share a given vertex. More...
class  mwCorners
 This is a helper class for iterating all the corners of a given box. More...
class  mwSubDataPack
 This is a helper class which defines a single data pack used in the communication between coordinator and workers in the multi-work subdivision framework. More...
class  mwSubCoordinator
 This class defines a coordinator for the multi-work subdivision framework. More...
class  mwTask
 This class defines a generic task object (coordinator or worker) for the multi-work distributed computations framework. More...
class  mwWorker
 This class defines a generic worker task object for the multi-work distributed computations framework. More...

Typedefs

typedef int(* fcompute )(const double *left, const double *right, int dim, int level)
 The type of a function which computes a value of interest for the given product of intervals.

Enumerations

enum  mwConstants { mwOk = 0, mwError = -1, mwNoData = -2, mwReject = -3 }
 

Various error codes used during the communication.

More...
enum  mwIOconstants {
  mwNone = 0, mwCanRead = 0x01, mwCanWrite = 0x02, mwTimeOut = -4,
  mwLost = -5
}
 

Input/output flags and error codes used for network communication, mainly for the 'select' function to check which sockets are available for reading/writing.

More...
enum  mwCodes {
  mwNoMsg = 0x0000, mwInitMsg = 0x0001, mwStdMsg = 0x0002, mwKeepMsg = 0x0004,
  mwDontKeepMsg = 0x0008, mwByeMsg = 0x0010, mwRejectedMsg = 0x0001, mwPortMsg = 0x0002
}
 

Various message codes used for the communication between the coordinator and workers.

More...

Functions

void swap (mwWorkerData &data1, mwWorkerData &data2)
void swap (mwData &x, mwData &y)
template<class type >
mwDataoperator<< (mwData &m, const type &x)
template<class type >
mwDataoperator>> (mwData &m, type &x)
mwDataoperator<< (mwData &m, const char *x)
mwDataoperator<< (mwData &m, const unsigned char *x)
mwDataoperator>> (mwData &m, char *x)
mwDataoperator>> (mwData &m, unsigned char *x)
std::ostream & operator<< (std::ostream &s, const mwData &m)
std::istream & operator>> (std::istream &s, mwData &m)
int mwSendBytes (int fd, const char *buf, int len)
 Sends the given buffer to the given socket.
int mwRecvBytes (int fd, char *buf, int len)
 Receives the given amount of data from the given socket.
int mwConnect (const char *name, int port)
 Connects to the given computer at the given port.
int mwListen (int port, int queuesize)
 Begins listening at the given port.
int mwAccept (int fd, std::string &computer, int timeout=-1)
 Waits for and accepts a connection at the given socket.
int mwSelect (const int *workers, int nworkers, int listensocket, int *ioflags, int timeout)
 Determines IOflags for each of the workers and additionally the listensocket (the last flag).
void mwDisconnect (int fd)
 Disconnects the given socket.
template<class coord >
double mwSubdivPoint (const double &left, const double &right, coord part, coord full)
 Returns a subdivision point corresponding to the given fraction of the provided interval.
template<int dim, class coord >
int mwSubdivMain (int argc, char *argv[], const char *title, const char *helpinfo, int defaultPortNumber, int controlNumber, const double *paramLeft, const double *paramRight, int minSubdivLevel, fcompute compute)
 The main procedure for running the computations in the multiwork subdivision framework.

Detailed Description

This namespace contains an interface to the MultiWork module which allows one to easily distribute a computational task over multiple processes and run concurrent computations.


Typedef Documentation

typedef int(* chomp::multiwork::fcompute)(const double *left, const double *right, int dim, int level)

The type of a function which computes a value of interest for the given product of intervals.

The subdivision level is provided in case the computations on higher levels need to be done in a more thorough way.

Definition at line 103 of file mwsubdiv.h.


Enumeration Type Documentation

Various message codes used for the communication between the coordinator and workers.

Enumerator:
mwNoMsg 

Message: No special code information included.

mwInitMsg 

Message to the Worker: A piece of initialization data to be processed.

mwStdMsg 

Message to the Worker: A standard piece of data to be processed.

mwKeepMsg 

Message to the Worker: Keep running after having disconnected.

mwDontKeepMsg 

Message to the Worker: Don't keep running after disconnecting.

mwByeMsg 

Message to the Worker: Please, disconnect.

mwRejectedMsg 

Message to the Coordinator: The data has been rejected.

mwPortMsg 

Message to the Coordinator: I will listen at this port number.

Definition at line 57 of file mwtask.h.

{
        mwNoMsg = 0x0000,

        mwInitMsg = 0x0001,

        mwStdMsg = 0x0002,

        mwKeepMsg = 0x0004,

        mwDontKeepMsg = 0x0008,

        mwByeMsg = 0x0010,

        mwRejectedMsg = 0x0001,

        mwPortMsg = 0x0002

}; /* enum mwCodes */

Various error codes used during the communication.

The OK code must equal zero, and error codes must be negative.

Enumerator:
mwOk 

Everything is fine.

mwError 

A serious error occurred.

mwNoData 

There is no data to be sent to workers, for example, because everything has been already sent.

mwReject 

The data has been rejected.

Definition at line 157 of file mwconfig.h.

{
        mwOk = 0,

        mwError = -1,

        mwNoData = -2,

        mwReject = -3

}; /* enum mwConstants */

Input/output flags and error codes used for network communication, mainly for the 'select' function to check which sockets are available for reading/writing.

Enumerator:
mwNone 

No flag selected.

mwCanRead 

Reading possible.

mwCanWrite 

Writing possible.

mwTimeOut 

A connection time out has occurred.

mwLost 

The network connection has been lost.

Definition at line 52 of file mwlowlev.h.

{
        mwNone = 0,

        mwCanRead = 0x01,

        mwCanWrite = 0x02,

        mwTimeOut = -4,

        mwLost = -5

}; /* enum mwIOconstants */


Function Documentation

int chomp::multiwork::mwAccept ( int  fd,
std::string &  computer,
int  timeout = -1 
)

Waits for and accepts a connection at the given socket.

Saves the computer name in the string provided. Returns the new socket number (non-negative), mwTimeOut or mwError.

Referenced by chomp::multiwork::mwCoordinator::RunLoop(), and chomp::multiwork::mwWorker::Work().

int chomp::multiwork::mwConnect ( const char *  name,
int  port 
)

Connects to the given computer at the given port.

Returns the socket number (non-negative) or mwError.

Referenced by chomp::multiwork::mwCoordinator::ConnectWorkers(), chomp::multiwork::mwTask::QuitWorkers(), and chomp::multiwork::mwWorker::Work().

void chomp::multiwork::mwDisconnect ( int  fd  ) 

Disconnects the given socket.

If the number is negative, then this function call is ignored.

Referenced by chomp::multiwork::mwCoordinator::DisconnectAll(), chomp::multiwork::mwTask::QuitWorkers(), chomp::multiwork::mwCoordinator::RunLoop(), and chomp::multiwork::mwWorker::Work().

int chomp::multiwork::mwListen ( int  port,
int  queuesize 
)

Begins listening at the given port.

Allows for the given queue length. Returns the socket number (non-negative) or mwError.

Referenced by chomp::multiwork::mwCoordinator::BeginListening(), and chomp::multiwork::mwWorker::Work().

int chomp::multiwork::mwRecvBytes ( int  fd,
char *  buf,
int  len 
)

Receives the given amount of data from the given socket.

Returns mwOk, mwError or mwLost.

Referenced by chomp::multiwork::mwTask::RecvMessage().

int chomp::multiwork::mwSelect ( const int *  workers,
int  nworkers,
int  listensocket,
int *  ioflags,
int  timeout 
)

Determines IOflags for each of the workers and additionally the listensocket (the last flag).

The IOflags of interest are initially set. Returns mwOk, mwTimeOut or mwError.

Referenced by chomp::multiwork::mwCoordinator::RunLoop(), and chomp::multiwork::mwWorker::WorkOne().

int chomp::multiwork::mwSendBytes ( int  fd,
const char *  buf,
int  len 
)

Sends the given buffer to the given socket.

Returns mwOk, mwError or mwLost.

Referenced by chomp::multiwork::mwTask::SendMessage().

template<int dim, class coord >
int chomp::multiwork::mwSubdivMain ( int  argc,
char *  argv[],
const char *  title,
const char *  helpinfo,
int  defaultPortNumber,
int  controlNumber,
const double *  paramLeft,
const double *  paramRight,
int  minSubdivLevel,
fcompute  compute 
)

The main procedure for running the computations in the multiwork subdivision framework.

Returns: 0 = Ok, -1 = Error, 1 = Help displayed, 2 = Wrong arguments.

Definition at line 1465 of file mwsubdiv.h.

References chomp::homology::arg(), chomp::homology::arghelp(), argstreamprepare, argstreamset, chomp::homology::argswitch(), mwOk, chomp::homology::program_time, and chomp::homology::sout.

{
        const char *arginfo = "\
Command line arguments (at least '-w' or '-m N' must be specified):\n\
-w [port] - run as a worker (by default the program runs as a coordinator),\n\
-c [port] - run as a coordinator only (don't process any data locally),\n\
-p port - set the port number for the multi-work communication,\n\
-k - keep workers waiting after the computations have been completed,\n\
computer:port - use this connection at start-up (can be repeated),\n\
-s FILE - save the workers' list to this file (default: mwsubdiv.txt),\n\
-r FILE - retrieve the workers' list from this file (def: mwsubdiv.txt),\n\
-f filename - results file (new results will be appended); coord only!\n\
--flush - flush the results file very frequently (slows down the program),\n\
-i N - set the initial subdivision level (some minimum is enforced),\n\
-m N - set the maximal subdivision level (default: inilevel + 2),\n\
-q - quit all the workers who are waiting (provide addresses or use -r),\n\
--quiet - do not display any messages on the standard output,\n\
--log filename - save the console output to the given file,\n\
--help - show this brief help information and exit.\n\
For more information ask the author at http://www.PawelPilarczyk.com/.";

        // prepare user-configurable data
        char *retrieveworkers = 0;
        char *saveworkers = 0;
        const int maxaddr = 1024;
        char *addr [maxaddr];
        int naddr = 0;
        int portnum = -1;
        int workport = -1;
        int coordport = -1;
        bool keepworkers = false;
        bool quitworkers = false;
        char *filename = 0;
        int inilevel = 0;
        int maxlevel = 0;
        bool flushfile = false;

        // interprete the command-line arguments
        arguments a;
        arg (a, 0, addr, naddr, maxaddr);
        arg (a, "r", retrieveworkers, "mwsubdiv.txt");
        arg (a, "s", saveworkers, "mwsubdiv.txt");
        arg (a, "f", filename);
        arg (a, "i", inilevel);
        arg (a, "m", maxlevel);
        arg (a, "w", workport, defaultPortNumber);
        arg (a, "p", portnum);
        arg (a, "c", coordport, defaultPortNumber);
        argswitch (a, "k", keepworkers, true);
        argswitch (a, "q", quitworkers, true);
        argswitch (a, "-flush", flushfile, true);
        arghelp (a);

        argstreamprepare (a);
        int argresult = a. analyze (argc, argv);
        argstreamset ();

        // show the program's main title
        if (argresult >= 0)
                sout << title << '\n';

        // if something was incorrect, show an additional message and exit
        if (argresult < 0)
        {
                sout << "Call with '--help' for help.\n";
                return 2;
        }

        // set the right port number and determine if to run as a worker
        // or as a coordinator, with or without local work
        int port = defaultPortNumber;
        bool localwork = (coordport < 0);
        if (coordport >= 0)
                port = coordport;
        bool worker = (workport >= 0);
        if (workport >= 0)
                port = workport;
        if (portnum >= 0)
                port = portnum;

        // if no data packs are to be processed, don't run the program
        if (!maxlevel && !worker && !quitworkers)
                argresult = 1;
        
        // if help requested, show help information
        if (argresult > 0)
        {
                sout << helpinfo << '\n' << arginfo << '\n';
                return 1;
        }

        // try running the main function and catch an error message if thrown
        try
        {
                // set an appropriate program time message
                program_time = "Aborted after:";
                program_time = 1;

                // quit all the workers from the list if requested to
                if (quitworkers)
                {
                        // prepare a dummy coordinator class
                        mwSubCoordinator<dim,coord> c;

                        // set up the parameters necessary for identification
                        c. Port (port);
                        c. ControlNumber (controlNumber);

                        // prepare a list of workers' addresses
                        if (retrieveworkers)
                                c. Load (retrieveworkers);
                        for (int i = 0; i < naddr; ++ i)
                                c. Add (addr [i]);

                        // quit all the workers which appear in the list
                        sout << "Quitting workers... ";
                        c. QuitWorkers ();
                        sout << "Done.\n";
                }

                // run as a worker if requested to
                else if (worker)
                {
                        // prepare a worker object
                        mwSubWorker<dim,coord> w (compute);

                        // set up various options of the worker object
                        w. Port (port);
                        w. ControlNumber (controlNumber);
                        for (int i = 0; i < naddr; ++ i)
                                w. Add (addr [i]);

                        // run the computations
                        sout << "Running as a worker...\n";
                        int result = w. Work ();
                        if (result == mwOk)
                                sout << "Work completed successfully.\n";
                        else
                                sout << "Could not work - probably "
                                        "an error occurred.\n";
                }

                // run as a coordinator otherwise
                else
                {
                        // make a correction to the subdivision level bounds
                        if (inilevel <= minSubdivLevel)
                                inilevel = minSubdivLevel;
                        if (maxlevel <= inilevel)
                                maxlevel = inilevel + 2;

                        // prepare a local worker and a coordinator
                        mwSubWorker<dim,coord> w (compute);
                        mwSubCoordinator<dim,coord> c (filename, inilevel,
                                maxlevel, flushfile, paramLeft, paramRight);

                        // set up various options of the coordinator
                        c. KeepWorkers (keepworkers);
                        c. Port (port);
                        c. ControlNumber (controlNumber);
                        for (int i = 0; i < naddr; ++ i)
                                c. Add (addr [i]);
                        if (retrieveworkers)
                                c. Load (retrieveworkers);

                        // run the computations
                        sout << "Running as a coordinator...\n";
                        int result = c. Coordinate (localwork ? &w : 0);
                        if (result == mwOk)
                                sout << "The task completed successfully.\n";
                        else
                                sout << "Could not coordinate - probably "
                                        "an error occurred.\n";

                        // save the connected workers if necessary
                        if (saveworkers)
                                c. SaveWorkers (saveworkers);
                }

                // set an appropriate program time message
                program_time = "Total time used:";

                // finalize
                return 0;
        }
        catch (const char *msg)
        {
                sout << "ERROR: " << msg << '\n';
                return -1;
        }
        catch (const std::exception &e)
        {
                sout << "ERROR: " << e. what () << '\n';
                return -1;
        }
        catch (...)
        {
                sout << "ABORT: An unknown error occurred.\n";
                return -1;
        }
} /* mwSubdivMain */

template<class coord >
double chomp::multiwork::mwSubdivPoint ( const double &  left,
const double &  right,
coord  part,
coord  full 
) [inline]

Returns a subdivision point corresponding to the given fraction of the provided interval.

Should be called with part = 0, ..., full, to get a subdivision of the entire interval, including the endpoints.

Definition at line 203 of file mwsubdiv.h.

Referenced by chomp::multiwork::mwSubCoordinator< dim, coord >::Prepare().

{
        // uncomment if using CAPD to switch the rounding mode to the nearest
//      round_nearest ();

        // if this is a boundary subdivision point then return the bound
        if (part == 0)
                return left;
        else if (part == full)
                return right;

        // divide 'part' and 'full' by their largest common divisor
        // which is a power of two
        while (!(part & 1) && !(full & 1))
        {
                part >>= 1;
                full >>= 1;
        }

        // compute the corresponding subdivision point of the interval
        double fraction = static_cast<double> (part) / full;
        return (left + (right - left) * fraction);
} /* mwSubdivPoint */

std::ostream& chomp::multiwork::operator<< ( std::ostream &  s,
const mwData &  m 
) [inline]

Definition at line 864 of file mwdata.h.

References chomp::homology::write().

{
        if (m. Length ())
                s. write (m. Buffer (), m. Length ());
        return s;
} /* operator << */

template<class type >
mwData& chomp::multiwork::operator<< ( mwData &  m,
const type &  x 
) [inline]

Definition at line 832 of file mwdata.h.

{
        return m. Append (x);
} /* operator << */

mwData& chomp::multiwork::operator<< ( mwData &  m,
const char *  x 
) [inline]

Definition at line 843 of file mwdata.h.

{
        return m. Append (x);
} /* operator << */

mwData& chomp::multiwork::operator<< ( mwData &  m,
const unsigned char *  x 
) [inline]

Definition at line 848 of file mwdata.h.

{
        return m. Append (x);
} /* operator << */

mwData& chomp::multiwork::operator>> ( mwData &  m,
unsigned char *  x 
) [inline]

Definition at line 858 of file mwdata.h.

{
        return m. Retrieve (x);
} /* operator >> */

template<class type >
mwData& chomp::multiwork::operator>> ( mwData &  m,
type &  x 
) [inline]

Definition at line 838 of file mwdata.h.

{
        return m. Retrieve (x);
} /* operator >> */

std::istream& chomp::multiwork::operator>> ( std::istream &  s,
mwData &  m 
) [inline]

Definition at line 871 of file mwdata.h.

{
        char *buf = NULL;
        int pos = 0, len = 0;

        // read the entire stream to the given buffer
        int ch = s. get ();
        while (ch != EOF)
        {
                if (len <= pos)
                {
                        len = pos + pos + 3;
                        char *newbuf = new char [len];
                        if (!newbuf)
                                break;
                        for (int i = 0; i < pos; ++ i)
                                newbuf [i] = buf [i];
                        delete [] buf;
                        buf = newbuf;
                }
                buf [pos ++] = (unsigned char) (ch);
                ch = s. get ();
        }

        // allocate a new buffer of the exact size and take the data
        if (pos)
        {
                char *newbuf = new char [pos];
                for (int i = 0; i < pos; ++ i)
                        newbuf [i] = buf [i];
                delete [] buf;
                m. Take (newbuf, pos);
        }

        return s;
} /* operator >> */

mwData& chomp::multiwork::operator>> ( mwData &  m,
char *  x 
) [inline]

Definition at line 853 of file mwdata.h.

{
        return m. Retrieve (x);
} /* operator >> */

void chomp::multiwork::swap ( mwData &  x,
mwData &  y 
) [inline]

Definition at line 268 of file mwdata.h.

{
        x. Swap (y);
        return;
} /* swap */

void chomp::multiwork::swap ( mwWorkerData &  data1,
mwWorkerData &  data2 
) [inline]

Definition at line 109 of file mwcoord.h.

Referenced by chomp::multiwork::mwCoordinator::RunLoop(), chomp::homology::mvmap< domelement, imgelement >::swap(), chomp::homology::mmatrix< euclidom >::swapcols(), and chomp::homology::mmatrix< euclidom >::swaprows().

{
        swap (data1. data, data2. data);
        std::swap (data1. fd, data2. fd);
        std::swap (data1. name, data2. name);
        std::swap (data1. port, data2. port);
        std::swap (data1. status, data2. status);
        return;
} /* swap */