Public Member Functions | Private Member Functions | Private Attributes | Friends

chomp::multiwork::mwWorker Class Reference

This class defines a generic worker task object for the multi-work distributed computations framework. More...

#include <mwworker.h>

Inheritance diagram for chomp::multiwork::mwWorker:
chomp::multiwork::mwTask chomp::multiwork::mwSubWorker< dim, coord >

List of all members.

Public Member Functions

 mwWorker ()
 The default constructor.
virtual ~mwWorker ()
 The destructor.
int Work ()
 Runs the worker on this computer.
void KeepWorker (bool keep=true)
 Makes the worker keep running after the coordinator has disconnected.

Private Member Functions

virtual int Process (mwData &data)
 This function is called to process a data portion and replace it with the result of computation.
virtual int Initialize (mwData &data)
 This function is called to process initialization data, if any.
int WorkOne (int fd)
 Runs one working session after having connected to the coordinator at the socket identified by 'fd'.
int SendMessageW (int fd, unsigned int code, const mwData &x) const
 Sends a message with data to the given socket as a worker.
int RecvMessageW (int fd, unsigned int &code, mwData &x) const
 Receives a message with data from the socket of a coordinator.

Private Attributes

bool keepWorker
 Should the worker remain running after coordinator disconnects?

Friends

class mwCoordinator

Detailed Description

This class defines a generic worker task object for the multi-work distributed computations framework.

Each worker class defined by the user must inherit from this class.

Definition at line 55 of file mwworker.h.


Constructor & Destructor Documentation

chomp::multiwork::mwWorker::mwWorker (  )  [inline]

The default constructor.

Definition at line 120 of file mwworker.h.

                          : keepWorker (false)
{
        return;
} /* mwWorker::mwWorker */

chomp::multiwork::mwWorker::~mwWorker (  )  [inline, virtual]

The destructor.

Definition at line 125 of file mwworker.h.

{
        return;
} /* mwWorker::~mwWorker */


Member Function Documentation

int chomp::multiwork::mwWorker::Initialize ( mwData data  )  [inline, private, virtual]

This function is called to process initialization data, if any.

This data is sent to a new worker immediately after the connection with the coordinator has been established. It should return mwOk on success or mwError in case of error to quit the program. Please, overload this function in the class you derive from the class 'mwWorker' and program it with the actual code.

Definition at line 145 of file mwworker.h.

Referenced by WorkOne().

{
        return mwOk;
} /* mwWorker::Initialize */

void chomp::multiwork::mwWorker::KeepWorker ( bool  keep = true  )  [inline]

Makes the worker keep running after the coordinator has disconnected.

Otherwise, the worker quits in that situation.

Definition at line 132 of file mwworker.h.

References keepWorker.

{
        keepWorker = keep;
        return;
} /* mwWorker::KeepWorkers */

int chomp::multiwork::mwWorker::Process ( mwData data  )  [inline, private, virtual]

This function is called to process a data portion and replace it with the result of computation.

It should return mwOk on success, mwReject to reject the data, or mwError in case of error to quit the program. Please, overload this function in the class you derive from the class 'mwWorker' and program it with the actual task.

Reimplemented in chomp::multiwork::mwSubWorker< dim, coord >.

Definition at line 140 of file mwworker.h.

Referenced by WorkOne().

{
        return mwOk;
} /* mwWorker::Process */

int chomp::multiwork::mwWorker::RecvMessageW ( int  fd,
unsigned int &  code,
mwData x 
) const [inline, private]

Receives a message with data from the socket of a coordinator.

Returns mwOk on success or mwError in the case of failure.

Definition at line 162 of file mwworker.h.

References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().

Referenced by WorkOne().

{
        // receive the message
        unsigned int ctrl = 0;
        int result = this -> RecvMessage (fd, ctrl, code, x);

        // if there was an error then return its code
        if (result != mwOk)
                return result;

        // if the control number is correct then finish successfully
        if (ctrl == this -> ControlNumber ())
                return mwOk;

        // if the control number is wrong then return an error code
        if (logFile)
                *logFile << "Wrong control code received "
                        "from the coordinator: " << ctrl << "." << std::endl;
        return mwError;
} /* mwWorker::RecvMessageW */

int chomp::multiwork::mwWorker::SendMessageW ( int  fd,
unsigned int  code,
const mwData x 
) const [inline, private]

Sends a message with data to the given socket as a worker.

Returns mwOk on success and mwError in the case of failure.

Definition at line 152 of file mwworker.h.

References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().

Referenced by WorkOne().

{
        // prepare the negation of the control number to send
        unsigned int ctrl = ~(this -> ControlNumber ());

        // send the message with the control number and the message code
        return this -> SendMessage (fd, ctrl, code, x);
} /* mwWorker::SendMessageW */

int chomp::multiwork::mwWorker::Work (  )  [inline]

Runs the worker on this computer.

Returns mwOk or mwError.

Definition at line 388 of file mwworker.h.

References chomp::multiwork::mwTask::computers, keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwConnect(), chomp::multiwork::mwDisconnect(), chomp::multiwork::mwListen(), chomp::multiwork::mwOk, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), chomp::multiwork::mwTask::ports, chomp::multiwork::mwTask::TimeOut(), chomp::multiwork::mwTask::timeout, and WorkOne().

{
        if (logFile)
        {
                *logFile << "Running as a WORKER." << std::endl;

                // indicate what kind of network connection is in use
                #if !mwNETWORK
                *logFile << "There is no network in use, "
                        "so exiting right now." << std::endl;
                #elif mwWXWIN
                *logFile << "Using the sockets interface "
                        "provided by wxWindows." << std::endl;
                #else
                *logFile << "Using the standard sockets "
                        "for network connections." << std::endl;
                #endif
        }

        // connect to the first comp. on the list that is running
        int fd = -1;
        for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
        {
                // retrieve the computer name and port from the list
                int port = this -> ports [cur];
                const char *name = this -> computers [cur]. c_str ();

                // if no valid name or no port read, skip this data
                if (!*name || !port)
                        continue;

                // try connecting to the computer
                fd = mwConnect (name, port);

                // add an appropriate message to the log file
                if (logFile)
                {
                        *logFile << "Connection to " << name << ":" <<
                                port << ((fd < 0) ? " refused." :
                                " established.") << std::endl;
                }
        }

        while (1)
        {
                // if not connected then try listening
                if ((fd < 0) && (this -> Port () > 0))
                {
                        // listen at the given port (use a very short queue)
                        if (logFile)
                                *logFile << "Waiting for a coordinator at "
                                        "port " << this -> Port () << "." <<
                                        std::endl;
                        int fdlisten = mwListen (this -> Port (), 1);

                        // if the listening failed, return with an error
                        if (fdlisten < 0)
                        {
                                if (logFile)
                                        *logFile << "Error: This port is "
                                                "probably in use." <<
                                                std::endl;
                                return mwError;
                        }

                        // accept a connection and stop listening
                        std::string computer;
                        int timeout = this -> TimeOut ();
                        fd = mwAccept (fdlisten, computer, timeout);
                        mwDisconnect (fdlisten);
        
                        // if too much time elapsed, quit the job
                        if (fd == mwTimeOut)
                        {
                                if (logFile)
                                        *logFile << "Time out. Exiting." <<
                                                std::endl;
                                return mwOk;
                        }

                        // if an error occurred, quit the job
                        else if (fd < 0)
                        {
                                if (logFile)
                                        *logFile << "Error while connecting "
                                                "to a coordinator." <<
                                                std::endl;
                                return mwError;
                        }

                        // report the fact of connection to the log file
                        if (logFile)
                                *logFile << "Connected to a coordinator "
                                        "at '" << computer << "'." <<
                                        std::endl;
                }

                // receive messages and work
                int result = WorkOne (fd);
                mwDisconnect (fd);
                fd = -1;

                // quit if there is no point to listen to another coordinator
                // or if an error occurred
                if ((this -> Port () <= 0) || !keepWorker ||
                        (result != mwOk))
                {
                        return result;
                }

                // add a line to the log file between working sessions
                if (logFile)
                        *logFile << "============================" <<
                                std::endl;
        }
} /* mwWorker::Work */

int chomp::multiwork::mwWorker::WorkOne ( int  fd  )  [inline, private]

Runs one working session after having connected to the coordinator at the socket identified by 'fd'.

Returns mwOK if everything was fine or mwError if some error occurred and the worker should better exit.

Definition at line 186 of file mwworker.h.

References Initialize(), keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, chomp::multiwork::mwOk, chomp::multiwork::mwReject, chomp::multiwork::mwSelect(), chomp::multiwork::mwStdMsg, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), Process(), RecvMessageW(), SendMessageW(), and chomp::multiwork::mwTask::TimeOut().

Referenced by Work().

{
        // be ready to send the port number at which the worker is listening
        bool sendPort = (this -> Port () > 0);
        int timelimit = this -> TimeOut ();

        while (1)
        {
                // disconnect if the coordinator is not responding
                if (timelimit > 0)
                {
                        // call the function 'select' to wait
                        // until data is available for reading
                        int portArray [1];
                        portArray [0] = fd;
                        int ioFlags [2];
                        ioFlags [0] = mwCanRead;
                        ioFlags [1] = mwNone;
                        int result = mwSelect (portArray, 1, fd, ioFlags,
                                timelimit);

                        // disconnect in case of an I/O error
                        if (result == mwError)
                        {
                                if (logFile)
                                        *logFile << "Error while "
                                                "waiting for data from the "
                                                "coordinator." << std::endl;
                                return mwError;
                        }

                        // finish if disconnected
                        if (result == mwLost)
                        {
                                if (logFile)
                                        *logFile << "Disconnected while "
                                                "waiting for data from the "
                                                "coordinator." << std::endl;
                                return mwOk;
                        }

                        // finish if not responding
                        if (result == mwTimeOut)
                        {
                                if (logFile)
                                        *logFile << "Time out reached while "
                                                "waiting for data from the "
                                                "coordinator." << std::endl;
                                return mwOk;
                        }
                }

                // receive a message
                mwData msg;
                unsigned int code = 0;
                int result = RecvMessageW (fd, code, msg);

                // disconnect in case of an I/O error
                if (result == mwError)
                {
                        if (logFile)
                                *logFile << "Error while receiving data "
                                        "from the coordinator." << std::endl;
                        return mwError;
                }
        
                // disconnect if the connection was lost
                if (result == mwLost)
                {
                        if (logFile)
                                *logFile << "The connection closed "
                                        "by the coordinator." << std::endl;
                        return mwOk;
                }

                // initialize the worker if requested to
                if (code & mwInitMsg)
                {
                        if (logFile)
                                *logFile << "Initializing the worker." <<
                                        std::endl;

                        // update the 'keepWorker' flag if requested to
                        if (code & mwKeepMsg)
                                keepWorker = true;
                        if (code & mwDontKeepMsg)
                                keepWorker = false;

                        // initialize the data
                        int initResult = Initialize (msg);

                        // if the initialization fails then quit the work
                        if (initResult != mwOk)
                        {
                                if (logFile)
                                        *logFile << "The initialization "
                                                "failed." << std::endl;
                                return mwError;
                        }
                }

                // disconnect if requested to
                if (code & mwByeMsg)
                {
                        // update the 'keepWorker' flag if requested to
                        if (code & mwKeepMsg)
                                keepWorker = true;
                        if (code & mwDontKeepMsg)
                                keepWorker = false;

                        if (logFile)
                                *logFile << "Disconnecting upon "
                                        "coordinator's request." <<
                                        std::endl;
                        return mwOk;
                }

                // skip the rest if there is no standard message to process
                if (!(code & mwStdMsg))
                        continue;

                // prepare a basis for the returned code
                unsigned int retcode = 0;
                
                // process the message data
                result = Process (msg);

                // if an error occurred, disconnect and quit
                if (result == mwError)
                {
                        if (logFile)
                                *logFile << "Data processing failed." <<
                                        std::endl;
                        return mwError;
                }

                // if rejected, reset the data and set the rejection flag
                else if (result == mwReject)
                {
                        if (logFile)
                                *logFile << "* Data rejected." << std::endl;
                        msg. Reset ();
                        retcode |= mwRejectedMsg;
                }

                // if processed successfully, make a note of it
                else
                {
                        if (logFile)
                                *logFile << "* Data processed." << std::endl;
                }

                // send port number if relevant
                if (sendPort)
                {
                        // send the port number at which the worker listens
                        mwData d;
                        d << this -> Port ();
                        unsigned int code = mwPortMsg;
                        int res = SendMessageW (fd, code, d);

                        // quit in case of failure
                        if (res < 0)
                        {
                                if (logFile)
                                        *logFile << "Error while sending "
                                                "the port number." <<
                                                std::endl;
                                return mwError;
                        }

                        // make a note of having sent the port number
                        sendPort = false;
                        if (logFile)
                                *logFile << "* Port number sent." <<
                                        std::endl;
                }

                // send the result of the processed data
                result = SendMessageW (fd, retcode, msg);

                // quit if the connection was lost
                if (result == mwLost)
                {
                        if (logFile)
                                *logFile << "Connection lost while sending "
                                        "data to the coordinator." <<
                                        std::endl;
                        return mwError;
                }
                
                // quit if an error occurred
                if (result != mwOk)
                {
                        if (logFile)
                                *logFile << "Error while sending data." <<
                                        std::endl;
                        return mwError;
                }
        }
} /* mwWorker::WorkOne */


Friends And Related Function Documentation

friend class mwCoordinator [friend]

Definition at line 104 of file mwworker.h.


Member Data Documentation

Should the worker remain running after coordinator disconnects?

Definition at line 101 of file mwworker.h.

Referenced by KeepWorker(), Work(), and WorkOne().


The documentation for this class was generated from the following file: