This class defines a generic task object (coordinator or worker) for the multi-work distributed computations framework. More...
#include <mwtask.h>
Public Member Functions | |
mwTask () | |
The default constructor. | |
virtual | ~mwTask () |
The destructor. | |
void | Port (int number) |
Sets the port number for the communication or 0 to use none. | |
int | Port () const |
Returns the current port number. | |
void | ControlNumber (unsigned int number) |
Sets the control number for identification. | |
unsigned int | ControlNumber () const |
Returns the currently set identification control number. | |
void | TimeOut (int seconds) |
Sets the network connection time-out interval in seconds. | |
int | TimeOut () const |
Returns the currently set network connection time-out interval. | |
int | LogFile (const char *filename) |
Begins logging detailed communication debug information to the given file. | |
void | LogFile (const mwTask &other) |
Uses another task's log file to log this task's information. | |
void | LogClose () |
Closes the log file and adds a line with the time information unless this log file was borrowed from another task. | |
int | Add (const char *name, int port=-1) |
Adds an address to the list of computers to connect to at the beginning of working or coordinating. | |
int | Load (const char *filename) |
Loads computer addresses from the given file. | |
int | QuitWorkers () |
Quits all the workers whose addresses were added with the 'Add' and 'Load' functions. | |
Static Protected Member Functions | |
static int | SendMessage (int fd, unsigned int ctrl, unsigned int code, const mwData &x) |
Sends a message with data to the given socket. | |
static int | RecvMessage (int fd, unsigned int &ctrl, unsigned int &code, mwData &x) |
Receives a message with data from the given socket. | |
Protected Attributes | |
std::ofstream * | logFile |
The debug log file stream. | |
std::vector< std::string > | computers |
A list of workers or coordinators to connect to at start-up. | |
std::vector< int > | ports |
A list of port numbers of workers to connect to at start-up. | |
Private Member Functions | |
mwTask (const mwTask &) | |
The copy constructor is forbidden. | |
mwTask & | operator= (const mwTask &) |
The assignment operator is forbidden. | |
Private Attributes | |
int | portnum |
The network communication port number. | |
unsigned int | ctrlnum |
The control number that is used to recognize a compatible worker or a compatible coordinator. | |
int | timeout |
The network communication time-out in seconds. | |
bool | logBorrowed |
Is this log file pointer borrowed from another task? |
This class defines a generic task object (coordinator or worker) for the multi-work distributed computations framework.
It is the common part of a worker task and a coordinator task, and contains some general settings that apply to both. Note that this class is inherited by the worker and the coordinator class in the virtual mode, which allows one to create a class that is both a worker and a coordinator at the same time (although this is not recommended in general).
Definition at line 99 of file mwtask.h.
chomp::multiwork::mwTask::mwTask | ( | ) | [inline] |
chomp::multiwork::mwTask::~mwTask | ( | ) | [inline, virtual] |
The destructor.
Definition at line 305 of file mwtask.h.
References LogClose().
{ // close the log file if it was in use LogClose (); return; } /* mwTask::~mwTask */
chomp::multiwork::mwTask::mwTask | ( | const mwTask & | ) | [inline, private] |
int chomp::multiwork::mwTask::Add | ( | const char * | name, | |
int | port = -1 | |||
) | [inline] |
Adds an address to the list of computers to connect to at the beginning of working or coordinating.
The addresses must be in the form "computer.domain:port". If port is not defined then the default port number is used.
Definition at line 351 of file mwtask.h.
References computers, portnum, and ports.
Referenced by Load().
{ // if the name is empty, ignore it if (!name || !*name) return mwError; // determine whether the name contains a colon and port number int pos = 1; while (name [pos]) ++ pos; -- pos; while (pos && (name [pos] != ':') && (name [pos] >= '0') && (name [pos] <= '9')) { -- pos; } // if the name contains colon and some digits after the colon... if (pos && (name [pos] == ':') && name [pos + 1]) { // append the computer name and the chosen port number char *compname = new char [pos + 1]; for (int i = 0; i < pos; ++ i) compname [i] = name [i]; compname [pos] = '\0'; port = std::atoi (name + pos + 1); if (port <= 0) return mwError; computers. push_back (std::string (compname)); ports. push_back (port); delete [] compname; } else { // if the port number is not reasonable, use the default one if (port <= 0) port = portnum; if (port <= 0) return mwError; computers. push_back (std::string (name)); ports. push_back (port); } return mwOk; } /* mwTask::Add */
void chomp::multiwork::mwTask::ControlNumber | ( | unsigned int | number | ) | [inline] |
unsigned int chomp::multiwork::mwTask::ControlNumber | ( | ) | const [inline] |
Returns the currently set identification control number.
Definition at line 333 of file mwtask.h.
References ctrlnum.
Referenced by chomp::multiwork::mwCoordinator::RecvMessageC(), chomp::multiwork::mwWorker::RecvMessageW(), chomp::multiwork::mwCoordinator::SendMessageC(), and chomp::multiwork::mwWorker::SendMessageW().
{ return ctrlnum; } /* mwTask::ControlNumber */
int chomp::multiwork::mwTask::Load | ( | const char * | filename | ) | [inline] |
Loads computer addresses from the given file.
Returns the number of acquired addresses or mwError.
Definition at line 396 of file mwtask.h.
References Add(), chomp::multiwork::mwOk, and portnum.
{ std::ifstream f (filename); if (!f) return mwError; char buf [512]; int counter = 0; while (1) { *buf = '\0'; f. getline (buf, 512, '\n'); if ((*buf == ';') || (*buf == '#') || (*buf == '/')) continue; if (*buf) { int result = this -> Add (buf, portnum); if (result == mwOk) ++ counter; } if (!f) return counter; } } /* mwTask::Load */
void chomp::multiwork::mwTask::LogClose | ( | ) | [inline] |
Closes the log file and adds a line with the time information unless this log file was borrowed from another task.
Definition at line 232 of file mwtask.h.
References logBorrowed, and logFile.
Referenced by LogFile(), and ~mwTask().
{ if (!logFile) return; if (logBorrowed) { logFile = 0; logBorrowed = false; return; } std::time_t stop_time; std::time (&stop_time); *logFile << "\nMultiWork log file closed on " << std::asctime (std::localtime (&stop_time)) << "\n" "-----------------------------------------------------\n" << std::endl; delete logFile; logFile = 0; return; } /* mwTask::LogClose */
void chomp::multiwork::mwTask::LogFile | ( | const mwTask & | other | ) | [inline] |
Uses another task's log file to log this task's information.
Definition at line 287 of file mwtask.h.
References logBorrowed, LogClose(), and logFile.
{ // close the current log if in use LogClose (); // if there is no other log file, do nothing if (!other. logFile) return; // borrow the log file pointer logBorrowed = true; logFile = other. logFile; return; } /* mwTask::LogFile */
int chomp::multiwork::mwTask::LogFile | ( | const char * | filename | ) | [inline] |
Begins logging detailed communication debug information to the given file.
Returns mwOk on success, mwError if cannot open/create the file.
Definition at line 253 of file mwtask.h.
References LogClose(), and logFile.
{ // close the current log if in use if (logFile) LogClose (); // if no file name supplied, return now if (!filename || !*filename) return mwOk; // create a file stream variable logFile = new std::ofstream; if (!logFile) return mwError; // open the log file for appending logFile -> open (filename, std::ios::app); // write the current time to the log std::time_t start_time; std::time (&start_time); *logFile << "MultiWork log file opened on " << std::asctime (std::localtime (&start_time)) << std::endl; // if unable to open the file or an error occurred, return mwError if (!*logFile) { delete logFile; return mwError; } return mwOk; } /* mwTask::LogFile */
int chomp::multiwork::mwTask::Port | ( | ) | const [inline] |
Returns the current port number.
Definition at line 322 of file mwtask.h.
References portnum.
Referenced by chomp::multiwork::mwCoordinator::BeginListening(), chomp::multiwork::mwWorker::Work(), and chomp::multiwork::mwWorker::WorkOne().
{ return portnum; } /* mwTask::Port */
void chomp::multiwork::mwTask::Port | ( | int | number | ) | [inline] |
int chomp::multiwork::mwTask::QuitWorkers | ( | ) | [inline] |
Quits all the workers whose addresses were added with the 'Add' and 'Load' functions.
Returns mwOk or mwError.
Definition at line 493 of file mwtask.h.
References computers, ctrlnum, logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwConnect(), chomp::multiwork::mwDisconnect(), chomp::multiwork::mwOk, ports, and SendMessage().
{ // write to the log file what you are doing if (logFile) *logFile << "Turning workers off..." << std::endl; // try connecting to each worker and ask them to exit int counter = 0; for (unsigned int n = 0; n < computers. size (); ++ n) { // retrieve the computer name and port from the list const char *name = computers [n]. c_str (); int port = ports [n]; // if no valid name or port number read, skip this item if (!*name || !port) continue; // try connecting to the computer int fd = mwConnect (name, port); // if unsuccessful, make a note and take another one if (fd < 0) { if (logFile) { *logFile << "Worker " << name << ":" << port << " could not be contacted." << std::endl; } continue; } // prepare the control code to send to the worker unsigned int code = mwByeMsg | mwDontKeepMsg; // send the 'Bye!' message to the worker and disconnect it mwData empty; int result = this -> SendMessage (fd, ctrlnum, code, empty); mwDisconnect (fd); // add an appropriate message to the log file if (result == mwOk) { ++ counter; if (logFile) { *logFile << "Worker " << name << ":" << port << " exited successfully." << std::endl; } } else if (logFile) { *logFile << "Error while sending the disconnect " "message to " << name << ":" << port << "." << std::endl; } } // write to the log file how many workers were turned off if (logFile) *logFile << counter << " worker(s) have been shut down." << std::endl; return mwOk; } /* mwTask::QuitWorkers */
int chomp::multiwork::mwTask::RecvMessage | ( | int | fd, | |
unsigned int & | ctrl, | |||
unsigned int & | code, | |||
mwData & | x | |||
) | [inline, static, protected] |
Receives a message with data from the given socket.
Receives a message from the given socket.
Verifies the received control code if it is correct. Returns mwOk on success or mwError in the case of failure.
Returns mwOk, mwError or mwLost.
Definition at line 438 of file mwtask.h.
References chomp::multiwork::mwRecvBytes().
Referenced by chomp::multiwork::mwCoordinator::RecvMessageC(), and chomp::multiwork::mwWorker::RecvMessageW().
{ // read the code and length of the message char buf00 [12]; int result = mwRecvBytes (fd, buf00, 12); if (result < 0) return result; unsigned char *buf0 = reinterpret_cast<unsigned char *> (buf00); // extract the length of the message int len = (int) (buf0 [8]) << 24; len |= (int) (buf0 [9]) << 16; len |= (int) (buf0 [10]) << 8; len |= (int) (buf0 [11]); if (len < 0) return mwError; // extract the control code of the message ctrl = (int) (buf0 [0]) << 24; ctrl |= (int) (buf0 [1]) << 16; ctrl |= (int) (buf0 [2]) << 8; ctrl |= (int) (buf0 [3]); // extract the code of the message code = (int) (buf0 [4]) << 24; code |= (int) (buf0 [5]) << 16; code |= (int) (buf0 [6]) << 8; code |= (int) (buf0 [7]); // if the message length is zero, no more reading is necessary if (!len) { x. Reset (); return mwOk; } // prepare a memory buffer for the message char *buf = new char [len]; if (!buf) return mwError; // read the message result = mwRecvBytes (fd, buf, len); if (result < 0) { delete [] buf; return result; } // transform the message to mw data x. Take (buf, len); return mwOk; } /* RecvMessage */
int chomp::multiwork::mwTask::SendMessage | ( | int | fd, | |
unsigned int | ctrl, | |||
unsigned int | code, | |||
const mwData & | x | |||
) | [inline, static, protected] |
Sends a message with data to the given socket.
Sends a message to the given socket.
The message includes a control number. Returns mwOk on success and mwError in the case of failure.
Returns mwOk, mwError or mwLost.
Definition at line 428 of file mwtask.h.
References chomp::multiwork::mwSendBytes().
Referenced by QuitWorkers(), chomp::multiwork::mwCoordinator::SendMessageC(), and chomp::multiwork::mwWorker::SendMessageW().
{ mwData sending; sending << ctrl << code << x. Length () << x; return mwSendBytes (fd, sending. Buffer (), sending. Length ()); } /* SendMessage */
void chomp::multiwork::mwTask::TimeOut | ( | int | seconds | ) | [inline] |
int chomp::multiwork::mwTask::TimeOut | ( | ) | const [inline] |
Returns the currently set network connection time-out interval.
Definition at line 344 of file mwtask.h.
References timeout.
Referenced by chomp::multiwork::mwCoordinator::RunLoop(), chomp::multiwork::mwWorker::Work(), and chomp::multiwork::mwWorker::WorkOne().
{ return timeout; } /* mwTask::TimeOut */
std::vector<std::string> chomp::multiwork::mwTask::computers [protected] |
A list of workers or coordinators to connect to at start-up.
Definition at line 202 of file mwtask.h.
Referenced by Add(), chomp::multiwork::mwCoordinator::ConnectWorkers(), QuitWorkers(), and chomp::multiwork::mwWorker::Work().
unsigned int chomp::multiwork::mwTask::ctrlnum [private] |
The control number that is used to recognize a compatible worker or a compatible coordinator.
Note: The worker sends its negation.
Definition at line 169 of file mwtask.h.
Referenced by ControlNumber(), and QuitWorkers().
bool chomp::multiwork::mwTask::logBorrowed [private] |
Is this log file pointer borrowed from another task?
Definition at line 182 of file mwtask.h.
Referenced by LogClose(), and LogFile().
std::ofstream* chomp::multiwork::mwTask::logFile [protected] |
The debug log file stream.
Definition at line 178 of file mwtask.h.
Referenced by chomp::multiwork::mwCoordinator::BeginListening(), chomp::multiwork::mwCoordinator::ConnectWorkers(), chomp::multiwork::mwCoordinator::Coordinate(), chomp::multiwork::mwCoordinator::DisconnectAll(), LogClose(), LogFile(), QuitWorkers(), chomp::multiwork::mwCoordinator::RecvMessageC(), chomp::multiwork::mwWorker::RecvMessageW(), chomp::multiwork::mwCoordinator::RunLoop(), chomp::multiwork::mwCoordinator::RunLoopLocally(), chomp::multiwork::mwWorker::Work(), and chomp::multiwork::mwWorker::WorkOne().
int chomp::multiwork::mwTask::portnum [private] |
std::vector<int> chomp::multiwork::mwTask::ports [protected] |
A list of port numbers of workers to connect to at start-up.
Definition at line 205 of file mwtask.h.
Referenced by Add(), chomp::multiwork::mwCoordinator::ConnectWorkers(), QuitWorkers(), and chomp::multiwork::mwWorker::Work().
int chomp::multiwork::mwTask::timeout [private] |
The network communication time-out in seconds.
Definition at line 172 of file mwtask.h.
Referenced by TimeOut(), and chomp::multiwork::mwWorker::Work().