This class defines a generic worker task object for the multi-work distributed computations framework. More...
#include <mwworker.h>
Public Member Functions | |
| mwWorker () | |
| The default constructor. | |
| virtual | ~mwWorker () |
| The destructor. | |
| int | Work () |
| Runs the worker on this computer. | |
| void | KeepWorker (bool keep=true) |
| Makes the worker keep running after the coordinator has disconnected. | |
Private Member Functions | |
| virtual int | Process (mwData &data) |
| This function is called to process a data portion and replace it with the result of computation. | |
| virtual int | Initialize (mwData &data) |
| This function is called to process initialization data, if any. | |
| int | WorkOne (int fd) |
| Runs one working session after having connected to the coordinator at the socket identified by 'fd'. | |
| int | SendMessageW (int fd, unsigned int code, const mwData &x) const |
| Sends a message with data to the given socket as a worker. | |
| int | RecvMessageW (int fd, unsigned int &code, mwData &x) const |
| Receives a message with data from the socket of a coordinator. | |
Private Attributes | |
| bool | keepWorker |
| Should the worker remain running after coordinator disconnects? | |
Friends | |
| class | mwCoordinator |
This class defines a generic worker task object for the multi-work distributed computations framework.
Each worker class defined by the user must inherit from this class.
Definition at line 55 of file mwworker.h.
| chomp::multiwork::mwWorker::mwWorker | ( | ) | [inline] |
The default constructor.
Definition at line 120 of file mwworker.h.
// a fresh worker is set not to keep running after the coordinator disconnects
: keepWorker (false)
{
} /* mwWorker::mwWorker */
| chomp::multiwork::mwWorker::~mwWorker | ( | ) | [inline, virtual] |
| int chomp::multiwork::mwWorker::Initialize | ( | mwData & | data | ) | [inline, private, virtual] |
This function is called to process initialization data, if any.
This data is sent to a new worker immediately after the connection with the coordinator has been established. The function should return mwOk on success, or mwError in case of an error that should make the program quit. Override this function in the class you derive from the class 'mwWorker' and implement the actual initialization code there.
Definition at line 145 of file mwworker.h.
Referenced by WorkOne().
{
// default implementation: the initialization data is not used
// and success is reported unconditionally
return mwOk;
} /* mwWorker::Initialize */
| void chomp::multiwork::mwWorker::KeepWorker | ( | bool | keep = true |
) | [inline] |
Makes the worker keep running after the coordinator has disconnected.
Otherwise, the worker quits in that situation.
Definition at line 132 of file mwworker.h.
References keepWorker.
{
// store the requested setting; it is consulted by Work () when a session ends
keepWorker = keep;
return;
} /* mwWorker::KeepWorker */
| int chomp::multiwork::mwWorker::Process | ( | mwData & | data | ) | [inline, private, virtual] |
This function is called to process a data portion and replace it with the result of computation.
The function should return mwOk on success, mwReject to reject the data, or mwError in case of an error that should make the program quit. Override this function in the class you derive from the class 'mwWorker' and implement the actual task there.
Reimplemented in chomp::multiwork::mwSubWorker< dim, coord >.
Definition at line 140 of file mwworker.h.
Referenced by WorkOne().
{
// default implementation: the data is left untouched
// and success is reported unconditionally
return mwOk;
} /* mwWorker::Process */
| int chomp::multiwork::mwWorker::RecvMessageW | ( | int | fd, | |
| unsigned int & | code, | |||
| mwData & | x | |||
| ) | const [inline, private] |
Receives a message with data from the socket of a coordinator.
Returns mwOk on success or mwError in the case of failure.
Definition at line 162 of file mwworker.h.
References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().
Referenced by WorkOne().
{
	// read the message together with the control number attached to it
	unsigned int control = 0;
	const int status = this -> RecvMessage (fd, control, code, x);

	// a failed reception is passed on to the caller unchanged
	if (status != mwOk)
		return status;

	// a control number other than the expected one is logged
	// and reported as an error
	if (control != this -> ControlNumber ())
	{
		if (logFile)
			*logFile << "Wrong control code received "
				"from the coordinator: " << control << "." << std::endl;
		return mwError;
	}

	// the control number matches, so the message is accepted
	return mwOk;
} /* mwWorker::RecvMessageW */
| int chomp::multiwork::mwWorker::SendMessageW | ( | int | fd, | |
| unsigned int | code, | |||
| const mwData & | x | |||
| ) | const [inline, private] |
Sends a message with data to the given socket as a worker.
Returns mwOk on success and mwError in the case of failure.
Definition at line 152 of file mwworker.h.
References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().
Referenced by WorkOne().
{
	// a worker tags its messages with the bitwise negation
	// of the control number
	const unsigned int negatedControl = ~(this -> ControlNumber ());

	// pass the tagged message on and return whatever the sender reports
	return this -> SendMessage (fd, negatedControl, code, x);
} /* mwWorker::SendMessageW */
| int chomp::multiwork::mwWorker::Work | ( | ) | [inline] |
Runs the worker on this computer.
Returns mwOk or mwError.
Definition at line 388 of file mwworker.h.
References chomp::multiwork::mwTask::computers, keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwConnect(), chomp::multiwork::mwDisconnect(), chomp::multiwork::mwListen(), chomp::multiwork::mwOk, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), chomp::multiwork::mwTask::ports, chomp::multiwork::mwTask::TimeOut(), chomp::multiwork::mwTask::timeout, and WorkOne().
{
	// Overview: first try to connect to one of the listed coordinators;
	// then, in a loop, listen for a coordinator if not connected (and a
	// listening port is configured), run one working session, and either
	// quit or wait for the next coordinator, depending on 'keepWorker'.
	if (logFile)
	{
		*logFile << "Running as a WORKER." << std::endl;
		// indicate what kind of network connection is in use
#if !mwNETWORK
		*logFile << "There is no network in use, "
			"so exiting right now." << std::endl;
#elif mwWXWIN
		*logFile << "Using the sockets interface "
			"provided by wxWindows." << std::endl;
#else
		*logFile << "Using the standard sockets "
			"for network connections." << std::endl;
#endif
	}

	// connect to the first computer on the list that is running
	int fd = -1;
	for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
	{
		// retrieve the computer name and port from the list
		int port = this -> ports [cur];
		const char *name = this -> computers [cur]. c_str ();

		// if no valid name or no port read, skip this data
		if (!*name || !port)
			continue;

		// try connecting to the computer
		fd = mwConnect (name, port);

		// add an appropriate message to the log file
		if (logFile)
		{
			*logFile << "Connection to " << name << ":" <<
				port << ((fd < 0) ? " refused." :
				" established.") << std::endl;
		}

		// stop at the first successful connection: trying the
		// remaining computers would overwrite 'fd', leaving the open
		// socket unreachable and possibly ending up with no connection
		if (fd >= 0)
			break;
	}

	while (1)
	{
		// if not connected then try listening
		if ((fd < 0) && (this -> Port () > 0))
		{
			// listen at the given port (use a very short queue)
			if (logFile)
				*logFile << "Waiting for a coordinator at "
					"port " << this -> Port () << "." <<
					std::endl;
			int fdlisten = mwListen (this -> Port (), 1);

			// if the listening failed, return with an error
			if (fdlisten < 0)
			{
				if (logFile)
					*logFile << "Error: This port is "
						"probably in use." <<
						std::endl;
				return mwError;
			}

			// accept a connection and stop listening
			std::string computer;
			int timeout = this -> TimeOut ();
			fd = mwAccept (fdlisten, computer, timeout);
			mwDisconnect (fdlisten);

			// if too much time elapsed, quit the job
			if (fd == mwTimeOut)
			{
				if (logFile)
					*logFile << "Time out. Exiting." <<
						std::endl;
				return mwOk;
			}

			// if an error occurred, quit the job
			else if (fd < 0)
			{
				if (logFile)
					*logFile << "Error while connecting "
						"to a coordinator." <<
						std::endl;
				return mwError;
			}

			// report the fact of connection to the log file
			if (logFile)
				*logFile << "Connected to a coordinator "
					"at '" << computer << "'." <<
					std::endl;
		}

		// receive messages and work
		// NOTE(review): if no connection was made and no listening port
		// is configured, 'fd' is still negative here - confirm that
		// WorkOne and mwDisconnect handle an invalid socket gracefully
		int result = WorkOne (fd);
		mwDisconnect (fd);
		fd = -1;

		// quit if there is no point to listen to another coordinator
		// or if an error occurred
		if ((this -> Port () <= 0) || !keepWorker ||
			(result != mwOk))
		{
			return result;
		}

		// add a line to the log file between working sessions
		if (logFile)
			*logFile << "============================" <<
				std::endl;
	}
} /* mwWorker::Work */
| int chomp::multiwork::mwWorker::WorkOne | ( | int | fd | ) | [inline, private] |
Runs one working session after having connected to the coordinator at the socket identified by 'fd'.
Returns mwOk if everything was fine, or mwError if an error occurred and the worker should exit.
Definition at line 186 of file mwworker.h.
References Initialize(), keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, chomp::multiwork::mwOk, chomp::multiwork::mwReject, chomp::multiwork::mwSelect(), chomp::multiwork::mwStdMsg, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), Process(), RecvMessageW(), SendMessageW(), and chomp::multiwork::mwTask::TimeOut().
Referenced by Work().
{
// Session loop: wait for a message from the coordinator, act on the flags
// set in its code (initialization, disconnection request, standard data),
// and send the result of processing back. The function returns only when
// the session ends, either normally (mwOk) or because of a failure (mwError).
// be ready to send the port number at which the worker is listening
bool sendPort = (this -> Port () > 0);
int timelimit = this -> TimeOut ();
while (1)
{
// disconnect if the coordinator is not responding
// (the wait is skipped altogether if the time limit is not positive)
if (timelimit > 0)
{
// call the function 'select' to wait
// until data is available for reading
int portArray [1];
portArray [0] = fd;
// NOTE(review): two flags are prepared for a single socket; presumably
// the trailing mwNone entry corresponds to the separate 'fd' argument
// or terminates the list - confirm against mwSelect
int ioFlags [2];
ioFlags [0] = mwCanRead;
ioFlags [1] = mwNone;
int result = mwSelect (portArray, 1, fd, ioFlags,
timelimit);
// disconnect in case of an I/O error
if (result == mwError)
{
if (logFile)
*logFile << "Error while "
"waiting for data from the "
"coordinator." << std::endl;
return mwError;
}
// finish if disconnected
if (result == mwLost)
{
if (logFile)
*logFile << "Disconnected while "
"waiting for data from the "
"coordinator." << std::endl;
return mwOk;
}
// finish if not responding
if (result == mwTimeOut)
{
if (logFile)
*logFile << "Time out reached while "
"waiting for data from the "
"coordinator." << std::endl;
return mwOk;
}
}
// receive a message
mwData msg;
unsigned int code = 0;
int result = RecvMessageW (fd, code, msg);
// disconnect in case of an I/O error
if (result == mwError)
{
if (logFile)
*logFile << "Error while receiving data "
"from the coordinator." << std::endl;
return mwError;
}
// disconnect if the connection was lost
if (result == mwLost)
{
if (logFile)
*logFile << "The connection closed "
"by the coordinator." << std::endl;
return mwOk;
}
// the flags in the message code are tested one after another,
// so a single message may trigger more than one of the actions below
// initialize the worker if requested to
if (code & mwInitMsg)
{
if (logFile)
*logFile << "Initializing the worker." <<
std::endl;
// update the 'keepWorker' flag if requested to
// (if both flags are set, 'don't keep' wins because it is applied last)
if (code & mwKeepMsg)
keepWorker = true;
if (code & mwDontKeepMsg)
keepWorker = false;
// initialize the data
int initResult = Initialize (msg);
// if the initialization fails then quit the work
if (initResult != mwOk)
{
if (logFile)
*logFile << "The initialization "
"failed." << std::endl;
return mwError;
}
}
// disconnect if requested to
if (code & mwByeMsg)
{
// update the 'keepWorker' flag if requested to
if (code & mwKeepMsg)
keepWorker = true;
if (code & mwDontKeepMsg)
keepWorker = false;
if (logFile)
*logFile << "Disconnecting upon "
"coordinator's request." <<
std::endl;
return mwOk;
}
// skip the rest if there is no standard message to process
if (!(code & mwStdMsg))
continue;
// prepare a basis for the returned code
unsigned int retcode = 0;
// process the message data
result = Process (msg);
// if an error occurred, disconnect and quit
if (result == mwError)
{
if (logFile)
*logFile << "Data processing failed." <<
std::endl;
return mwError;
}
// if rejected, reset the data and set the rejection flag
else if (result == mwReject)
{
if (logFile)
*logFile << "* Data rejected." << std::endl;
msg. Reset ();
retcode |= mwRejectedMsg;
}
// if processed successfully, make a note of it
else
{
if (logFile)
*logFile << "* Data processed." << std::endl;
}
// send port number if relevant
// (this happens at most once per session, just before the first result)
if (sendPort)
{
// send the port number at which the worker listens
mwData d;
d << this -> Port ();
// note: this local 'code' hides the received message code
// within this block only
unsigned int code = mwPortMsg;
int res = SendMessageW (fd, code, d);
// quit in case of failure
if (res < 0)
{
if (logFile)
*logFile << "Error while sending "
"the port number." <<
std::endl;
return mwError;
}
// make a note of having sent the port number
sendPort = false;
if (logFile)
*logFile << "* Port number sent." <<
std::endl;
}
// send the result of the processed data
result = SendMessageW (fd, retcode, msg);
// quit if the connection was lost
// (unlike a connection lost while receiving, this counts as an error)
if (result == mwLost)
{
if (logFile)
*logFile << "Connection lost while sending "
"data to the coordinator." <<
std::endl;
return mwError;
}
// quit if an error occurred
if (result != mwOk)
{
if (logFile)
*logFile << "Error while sending data." <<
std::endl;
return mwError;
}
}
} /* mwWorker::WorkOne */
friend class mwCoordinator [friend] |
Definition at line 104 of file mwworker.h.
bool chomp::multiwork::mwWorker::keepWorker [private] |
Should the worker remain running after coordinator disconnects?
Definition at line 101 of file mwworker.h.
Referenced by KeepWorker(), Work(), and WorkOne().
1.7.1