This class defines a generic worker task object for the multi-work distributed computations framework. More...
#include <mwworker.h>
Public Member Functions | |
mwWorker () | |
The default constructor. | |
virtual | ~mwWorker () |
The destructor. | |
int | Work () |
Runs the worker on this computer. | |
void | KeepWorker (bool keep=true) |
Makes the worker keep running after the coordinator has disconnected. | |
Private Member Functions | |
virtual int | Process (mwData &data) |
This function is called to process a data portion and replace it with the result of computation. | |
virtual int | Initialize (mwData &data) |
This function is called to process initialization data, if any. | |
int | WorkOne (int fd) |
Runs one working session after having connected to the coordinator at the socket identified by 'fd'. | |
int | SendMessageW (int fd, unsigned int code, const mwData &x) const |
Sends a message with data to the given socket as a worker. | |
int | RecvMessageW (int fd, unsigned int &code, mwData &x) const |
Receives a message with data from the socket of a coordinator. | |
Private Attributes | |
bool | keepWorker |
Should the worker remain running after coordinator disconnects? | |
Friends | |
class | mwCoordinator |
This class defines a generic worker task object for the multi-work distributed computations framework.
Each worker class defined by the user must inherit from this class.
Definition at line 55 of file mwworker.h.
chomp::multiwork::mwWorker::mwWorker | ( | ) | [inline] |
The default constructor.
Definition at line 120 of file mwworker.h.
: keepWorker (false)
{
	// nothing else to set up; with 'keepWorker' cleared, Work () returns
	// after the first working session instead of waiting for another one
} /* mwWorker::mwWorker */
chomp::multiwork::mwWorker::~mwWorker | ( | ) | [inline, virtual] |
int chomp::multiwork::mwWorker::Initialize | ( | mwData & | data | ) | [inline, private, virtual] |
This function is called to process initialization data, if any.
This data is sent to a new worker immediately after the connection with the coordinator has been established. The function should return mwOk on success, or mwError in case of an error, which makes the worker quit. Override this function in the class you derive from 'mwWorker' and implement the actual initialization code there.
Definition at line 145 of file mwworker.h.
Referenced by WorkOne().
{
	// default implementation: the initialization data is not used
	// and success is reported; derived classes provide the real code
	return mwOk;
} /* mwWorker::Initialize */
void chomp::multiwork::mwWorker::KeepWorker | ( | bool | keep = true |
) | [inline] |
Makes the worker keep running after the coordinator has disconnected.
Otherwise, the worker quits in that situation.
Definition at line 132 of file mwworker.h.
References keepWorker.
{
	// remember whether Work () should wait for another coordinator
	// once the current working session has ended
	this -> keepWorker = keep;
} /* mwWorker::KeepWorker */
int chomp::multiwork::mwWorker::Process | ( | mwData & | data | ) | [inline, private, virtual] |
This function is called to process a data portion and replace it with the result of computation.
It should return mwOk on success, mwReject to reject the data, or mwError in case of an error, which makes the worker quit. Override this function in the class you derive from 'mwWorker' and implement the actual task there.
Reimplemented in chomp::multiwork::mwSubWorker< dim, coord >.
Definition at line 140 of file mwworker.h.
Referenced by WorkOne().
{
	// default implementation: the data is left untouched
	// and success is reported; derived classes provide the real task
	return mwOk;
} /* mwWorker::Process */
int chomp::multiwork::mwWorker::RecvMessageW | ( | int | fd, | |
unsigned int & | code, | |||
mwData & | x | |||
) | const [inline, private] |
Receives a message with data from the socket of a coordinator.
Returns mwOk on success or mwError in the case of failure.
Definition at line 162 of file mwworker.h.
References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().
Referenced by WorkOne().
{
	// read one message; 'received' gets the control number it carries
	unsigned int received = 0;
	const int status = this -> RecvMessage (fd, received, code, x);

	// pass any failure code straight back to the caller
	if (status != mwOk)
		return status;

	// a message from the coordinator must carry exactly our control
	// number; anything else is logged and treated as an error
	if (received != this -> ControlNumber ())
	{
		if (logFile)
			*logFile << "Wrong control code received "
				"from the coordinator: " << received <<
				"." << std::endl;
		return mwError;
	}

	// the message was received and its control number is correct
	return mwOk;
} /* mwWorker::RecvMessageW */
int chomp::multiwork::mwWorker::SendMessageW | ( | int | fd, | |
unsigned int | code, | |||
const mwData & | x | |||
) | const [inline, private] |
Sends a message with data to the given socket as a worker.
Returns mwOk on success and mwError in the case of failure.
Definition at line 152 of file mwworker.h.
References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().
Referenced by WorkOne().
{
	// a worker marks its messages with the bitwise negation
	// of the control number
	unsigned int negatedControl = ~(this -> ControlNumber ());

	// forward the message together with that marker and the message
	// code, and hand the outcome of the transmission back to the caller
	return this -> SendMessage (fd, negatedControl, code, x);
} /* mwWorker::SendMessageW */
int chomp::multiwork::mwWorker::Work | ( | ) | [inline] |
Runs the worker on this computer.
Returns mwOk or mwError.
Definition at line 388 of file mwworker.h.
References chomp::multiwork::mwTask::computers, keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwConnect(), chomp::multiwork::mwDisconnect(), chomp::multiwork::mwListen(), chomp::multiwork::mwOk, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), chomp::multiwork::mwTask::ports, chomp::multiwork::mwTask::TimeOut(), chomp::multiwork::mwTask::timeout, and WorkOne().
{
	// Runs the worker: first tries to connect to one of the listed
	// computers, otherwise listens at the worker's own port (if any)
	// for a coordinator, and then runs working sessions with WorkOne ().
	// Returns mwError if listening or accepting a connection fails,
	// mwOk if waiting for a coordinator times out, and otherwise
	// the result of the last working session.
	if (logFile)
	{
		*logFile << "Running as a WORKER." << std::endl;

		// indicate what kind of network connection is in use
		#if !mwNETWORK
		*logFile << "There is no network in use, "
			"so exiting right now." << std::endl;
		#elif mwWXWIN
		*logFile << "Using the sockets interface "
			"provided by wxWindows." << std::endl;
		#else
		*logFile << "Using the standard sockets "
			"for network connections." << std::endl;
		#endif
	}

	// connect to the first comp. on the list that is running
	int fd = -1;
	for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
	{
		// retrieve the computer name and port from the list
		int port = this -> ports [cur];
		const char *name = this -> computers [cur]. c_str ();

		// if no valid name or no port read, skip this data
		if (!*name || !port)
			continue;

		// try connecting to the computer
		fd = mwConnect (name, port);

		// add an appropriate message to the log file
		if (logFile)
		{
			*logFile << "Connection to " << name << ":" <<
				port << ((fd < 0) ? " refused." :
				" established.") << std::endl;
		}

		// stop at the first successful connection; trying the remaining
		// computers would overwrite (and thus leak) this socket and
		// could leave 'fd' negative if the last attempt was refused
		if (fd >= 0)
			break;
	}

	while (1)
	{
		// if not connected then try listening
		if ((fd < 0) && (this -> Port () > 0))
		{
			// listen at the given port (use a very short queue)
			if (logFile)
				*logFile << "Waiting for a coordinator at "
					"port " << this -> Port () << "." <<
					std::endl;
			int fdlisten = mwListen (this -> Port (), 1);

			// if the listening failed, return with an error
			if (fdlisten < 0)
			{
				if (logFile)
					*logFile << "Error: This port is "
						"probably in use." <<
						std::endl;
				return mwError;
			}

			// accept a connection and stop listening
			// (the local is named so that it does not hide
			// the 'timeout' member of the base class)
			std::string computer;
			int acceptTimeout = this -> TimeOut ();
			fd = mwAccept (fdlisten, computer, acceptTimeout);
			mwDisconnect (fdlisten);

			// if too much time elapsed, quit the job
			if (fd == mwTimeOut)
			{
				if (logFile)
					*logFile << "Time out. Exiting." <<
						std::endl;
				return mwOk;
			}

			// if an error occurred, quit the job
			else if (fd < 0)
			{
				if (logFile)
					*logFile << "Error while connecting "
						"to a coordinator." <<
						std::endl;
				return mwError;
			}

			// report the fact of connection to the log file
			if (logFile)
				*logFile << "Connected to a coordinator "
					"at '" << computer << "'." <<
					std::endl;
		}

		// receive messages and work
		// NOTE(review): if no connection was established and
		// Port () <= 0, 'fd' is still negative at this point;
		// confirm that WorkOne () and mwDisconnect () cope with that
		int result = WorkOne (fd);
		mwDisconnect (fd);
		fd = -1;

		// quit if there is no point to listen to another coordinator
		// or if an error occurred
		if ((this -> Port () <= 0) || !keepWorker ||
			(result != mwOk))
		{
			return result;
		}

		// add a line to the log file between working sessions
		if (logFile)
			*logFile << "============================" <<
				std::endl;
	}
} /* mwWorker::Work */
int chomp::multiwork::mwWorker::WorkOne | ( | int | fd | ) | [inline, private] |
Runs one working session after having connected to the coordinator at the socket identified by 'fd'.
Returns mwOk if everything was fine, or mwError if an error occurred and the worker should exit.
Definition at line 186 of file mwworker.h.
References Initialize(), keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, chomp::multiwork::mwOk, chomp::multiwork::mwReject, chomp::multiwork::mwSelect(), chomp::multiwork::mwStdMsg, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), Process(), RecvMessageW(), SendMessageW(), and chomp::multiwork::mwTask::TimeOut().
Referenced by Work().
{
	// the worker's port number is reported to the coordinator once,
	// together with the first processed result, if the worker has a port
	bool portPending = (this -> Port () > 0);

	// how long to wait for the coordinator (no waiting limit if <= 0)
	int waitLimit = this -> TimeOut ();

	for (;;)
	{
		// with a time limit in force, wait until there is something
		// to read, and give up if the coordinator does not respond
		if (waitLimit > 0)
		{
			int sockets [1] = {fd};
			int flags [2];
			flags [0] = mwCanRead;
			flags [1] = mwNone;
			int waitResult = mwSelect (sockets, 1, fd, flags,
				waitLimit);

			// an I/O error ends the session with an error
			if (waitResult == mwError)
			{
				if (logFile)
					*logFile << "Error while "
						"waiting for data from the "
						"coordinator." << std::endl;
				return mwError;
			}

			// a lost connection ends the session quietly
			if (waitResult == mwLost)
			{
				if (logFile)
					*logFile << "Disconnected while "
						"waiting for data from the "
						"coordinator." << std::endl;
				return mwOk;
			}

			// so does a coordinator that is not responding
			if (waitResult == mwTimeOut)
			{
				if (logFile)
					*logFile << "Time out reached while "
						"waiting for data from the "
						"coordinator." << std::endl;
				return mwOk;
			}
		}

		// fetch the next message from the coordinator
		mwData packet;
		unsigned int request = 0;
		int status = RecvMessageW (fd, request, packet);

		// an I/O error ends the session with an error
		if (status == mwError)
		{
			if (logFile)
				*logFile << "Error while receiving data "
					"from the coordinator." << std::endl;
			return mwError;
		}

		// a connection closed by the other side ends it quietly
		if (status == mwLost)
		{
			if (logFile)
				*logFile << "The connection closed "
					"by the coordinator." << std::endl;
			return mwOk;
		}

		// initialization request
		if (request & mwInitMsg)
		{
			if (logFile)
				*logFile << "Initializing the worker." <<
					std::endl;

			// the coordinator may switch 'keepWorker' on or off
			if (request & mwKeepMsg)
				keepWorker = true;
			if (request & mwDontKeepMsg)
				keepWorker = false;

			// pass the data to the user's initialization routine;
			// any failure there ends the session with an error
			if (Initialize (packet) != mwOk)
			{
				if (logFile)
					*logFile << "The initialization "
						"failed." << std::endl;
				return mwError;
			}
		}

		// request to disconnect
		if (request & mwByeMsg)
		{
			// the coordinator may switch 'keepWorker' on or off
			if (request & mwKeepMsg)
				keepWorker = true;
			if (request & mwDontKeepMsg)
				keepWorker = false;

			if (logFile)
				*logFile << "Disconnecting upon "
					"coordinator's request." << std::endl;
			return mwOk;
		}

		// nothing more to do unless there is a data portion to process
		if (!(request & mwStdMsg))
			continue;

		// the code that will accompany the reply
		unsigned int replyCode = 0;

		// run the user's computation on the data
		status = Process (packet);

		// a failed computation ends the session with an error
		if (status == mwError)
		{
			if (logFile)
				*logFile << "Data processing failed." <<
					std::endl;
			return mwError;
		}

		if (status == mwReject)
		{
			// rejected data is sent back empty and flagged
			if (logFile)
				*logFile << "* Data rejected." << std::endl;
			packet. Reset ();
			replyCode |= mwRejectedMsg;
		}
		else
		{
			// otherwise just make a note of the success
			if (logFile)
				*logFile << "* Data processed." << std::endl;
		}

		// report the listening port before the first result
		if (portPending)
		{
			mwData portData;
			portData << this -> Port ();
			unsigned int portCode = mwPortMsg;
			int sent = SendMessageW (fd, portCode, portData);

			// quit in case of failure
			if (sent < 0)
			{
				if (logFile)
					*logFile << "Error while sending "
						"the port number." <<
						std::endl;
				return mwError;
			}

			// the port number does not have to be sent again
			portPending = false;
			if (logFile)
				*logFile << "* Port number sent." <<
					std::endl;
		}

		// send back the result of the computation
		status = SendMessageW (fd, replyCode, packet);

		// a connection lost at this stage counts as an error
		if (status == mwLost)
		{
			if (logFile)
				*logFile << "Connection lost while sending "
					"data to the coordinator." <<
					std::endl;
			return mwError;
		}

		// and so does any other failure
		if (status != mwOk)
		{
			if (logFile)
				*logFile << "Error while sending data." <<
					std::endl;
			return mwError;
		}
	}
} /* mwWorker::WorkOne */
friend class mwCoordinator [friend] |
Definition at line 104 of file mwworker.h.
bool chomp::multiwork::mwWorker::keepWorker [private] |
Should the worker remain running after coordinator disconnects?
Definition at line 101 of file mwworker.h.
Referenced by KeepWorker(), Work(), and WorkOne().