This class defines a generic coordinator task object for the multi-work distributed computations framework.
#include <mwcoord.h>
Public Member Functions
mwCoordinator ()
    The default constructor.
virtual ~mwCoordinator ()
    The virtual destructor.
void KeepWorkers (bool keep=true)
    Makes workers keep running after the coordinator's completion.
int SaveWorkers (const char *filename)
    Saves addresses of workers to the given file in the form of address:port.
void Init (mwData &data)
    Defines a portion of initialization data which will be sent to every newly connected worker.
int Coordinate (mwWorker *w=NULL)
    Runs the coordinator until all the work has been completed.
Private Member Functions
virtual int Prepare (mwData &data)
    Prepares a piece of data to be sent to a worker.
virtual int Accept (mwData &data)
    Accepts a result received from a worker.
virtual int Reject (mwData &data)
    Acknowledges data that was rejected by a worker.
int RunLoop (bool no_more_data)
    Runs the main communication loop: sends the data to workers and receives the results of their computations.
int RunLoopLocally ()
    Runs the main communication loop using the local worker.
void ConnectWorkers ()
    Connects to all workers in the list.
void BeginListening ()
    Begins listening at the given port.
void DisconnectAll ()
    Disconnects all the workers (normally called in the destructor).
int SendMessageC (int fd, unsigned int code, const mwData &x) const
    Sends a message with data to the given socket as a coordinator.
int RecvMessageC (int fd, unsigned int &code, mwData &x) const
    Receives a message with data from the socket of a worker.
Static Private Member Functions
static void mwTableDel (int *tab, int len, int pos)
    A helper function for deleting a table entry and shifting the remainder of the table backwards.
static void mwTableDel (mwData *tab, int len, int pos)
    A helper function for deleting a table entry and shifting the remainder of the table backwards using the method "Take" of the mwData class.
Private Attributes
bool singleWork
    Should data be processed locally only?
mwWorker *localWorker
    The address of a local worker or 0 if none is available.
bool keepWorkers
    Should the workers be kept running after the coordinator is done?
mwData initData
    The initialization data that has to be sent to workers.
int nWaiting
    The number of workers waiting for their data.
mwWorkerData xWaiting [1024]
    The workers waiting for their tasks.
int nWorking
    The number of workers processing their data.
mwWorkerData xWorking [1024]
    The workers processing their data.
int nToDo
    The number of data pieces to be sent to working tasks.
mwData xToDo [1024]
    The data pieces waiting to be sent to working tasks.
int nRejected
    The number of recently rejected pieces of data.
mwData xRejected [1024]
    The recently rejected pieces of data.
int nDone
    The number of recently finished pieces of data.
mwData xDone [1024]
    The recently finished pieces of data.
int listensocket
    The socket on which the coordinator listens for new workers.
This class defines a generic coordinator task object for the multi-work distributed computations framework.
Each coordinator class defined by the user must inherit from this class.
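The inheritance pattern can be sketched without the real library. The sketch below is self-contained and assumes hypothetical stand-ins: `Data` replaces mwData, `CoordinatorSketch` replaces mwCoordinator, and the numeric values of `mwOk`, `mwError` and `mwNoData` are invented. As in mwCoordinator, the base class calls the private virtual hooks Prepare and Accept, and a derived class overrides them; unlike the real Coordinate (), this loop processes every piece locally and synchronously.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result codes; the real values live in the multiwork library.
const int mwOk = 0;
const int mwError = -1;
const int mwNoData = 1;

// Hypothetical minimal stand-in for mwData: a list of ints with a read cursor.
struct Data
{
	std::vector<int> items;
	std::size_t pos = 0;
	void put (int x) { items.push_back (x); }
	int get () { return items [pos ++]; }
};

// Hypothetical stand-in for mwCoordinator with the same three private hooks.
class CoordinatorSketch
{
public:
	virtual ~CoordinatorSketch () {}
	// simplified local loop: prepare, "process" (square), accept
	int Coordinate ()
	{
		while (true)
		{
			Data d;
			int r = Prepare (d);
			if (r == mwError) return mwError;
			if (r == mwNoData) return mwOk;
			int x = d.get ();
			Data result;
			result.put (x * x);
			if (Accept (result) != mwOk) return mwError;
		}
	}
private:
	virtual int Prepare (Data &) { return mwNoData; }
	virtual int Accept (Data &) { return mwOk; }
	// this simplified loop never rejects data, so Reject is not called here
	virtual int Reject (Data &) { return mwOk; }
};

// A user-defined coordinator: hands out 1..n and sums the returned squares.
class SumOfSquares : public CoordinatorSketch
{
public:
	explicit SumOfSquares (int n) : next (1), last (n), sum (0) {}
	long Total () const { return sum; }
private:
	int next;
	int last;
	long sum;
	int Prepare (Data &d) override
	{
		if (next > last) return mwNoData;
		d.put (next ++);
		return mwOk;
	}
	int Accept (Data &d) override
	{
		sum += d.get ();
		return mwOk;
	}
};
```

Keeping the hooks private in the base class while letting derived classes override them is the "non-virtual interface" idiom: users customize what happens to each data piece, but only the coordinator decides when the hooks run.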
Definition at line 127 of file mwcoord.h.
chomp::multiwork::mwCoordinator::mwCoordinator () [inline]
The default constructor.
Definition at line 289 of file mwcoord.h.
:
#if mwNETWORK
singleWork (false),
#else
singleWork (true),
#endif
localWorker (0), keepWorkers (false), nWaiting (0), nWorking (0),
nToDo (0), nRejected (0), nDone (0), listensocket (-1)
{
return;
} /* mwCoordinator::mwCoordinator */
chomp::multiwork::mwCoordinator::~mwCoordinator () [inline, virtual]
The virtual destructor.
Definition at line 343 of file mwcoord.h.
References DisconnectAll().
{
// disconnect all the workers
DisconnectAll ();
return;
} /* mwCoordinator::~mwCoordinator */
int chomp::multiwork::mwCoordinator::Accept (mwData &data) [inline, private, virtual]
Accepts a result received from a worker.
Should return mwOk on success or mwError to stop the computations. Please override this function in the class you derive from mwCoordinator.
Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.
Definition at line 453 of file mwcoord.h.
Referenced by Coordinate().
{
return mwOk;
} /* mwCoordinator::Accept */
void chomp::multiwork::mwCoordinator::BeginListening () [inline, private]
Begins listening for incoming worker connections at the port returned by Port (). Does nothing if that port number is not positive.
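The body of mwListen is not shown on this page. A plausible POSIX equivalent of `mwListen (port, 15)` is sketched below; `listenAt` is a hypothetical name, and the use of `SO_REUSEADDR` and binding to all interfaces are assumptions, not confirmed library behavior. Like BeginListening, it refuses non-positive port numbers.

```cpp
#include <cassert>
#include <cstring>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical analogue of mwListen (port, backlog): opens a TCP socket,
// binds it to all local interfaces at 'port' and starts listening with
// the given backlog. Returns the socket descriptor, or -1 on failure.
int listenAt (int port, int backlog)
{
	// reject invalid ports up front, as BeginListening does for port <= 0
	if (port <= 0 || port > 65535)
		return -1;
	int fd = socket (AF_INET, SOCK_STREAM, 0);
	if (fd < 0)
		return -1;
	// assumption: allow a restarted coordinator to reuse the port at once
	int yes = 1;
	setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof (yes));
	sockaddr_in addr;
	std::memset (&addr, 0, sizeof (addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl (INADDR_ANY);
	addr.sin_port = htons (static_cast<unsigned short> (port));
	if (bind (fd, reinterpret_cast<sockaddr *> (&addr), sizeof (addr)) < 0 ||
		listen (fd, backlog) < 0)
	{
		close (fd);
		return -1;
	}
	return fd;
}
```

The backlog of 15 used by BeginListening only bounds the queue of connections not yet accepted; RunLoop accepts at most one new worker per pass.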
Definition at line 604 of file mwcoord.h.
References listensocket, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwListen(), and chomp::multiwork::mwTask::Port().
Referenced by Coordinate().
{
// if there is no valid port number then cancel this operation
if (this -> Port () <= 0)
return;
// open a listening socket at the given port
listensocket = mwListen (this -> Port (), 15);
// add a message to the log file whether it was successful or not
if (logFile)
{
if (listensocket < 0)
*logFile << "Listening attempt at port " <<
this -> Port () << " failed." << std::endl;
else
*logFile << "Waiting for workers at port " <<
this -> Port () << "." << std::endl;
}
return;
} /* mwCoordinator::BeginListening */
void chomp::multiwork::mwCoordinator::ConnectWorkers () [inline, private]
Connects to all workers in the list.
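ConnectWorkers, RunLoop and DisconnectAll combine message codes with bitwise OR (for example `mwInitMsg | mwKeepMsg`), and RunLoop tests single bits of a worker's reply (`code & mwPortMsg`, `code & mwRejectedMsg`). The sketch below shows that pattern; the numeric flag values are hypothetical, since the real constants are defined elsewhere in the library, and only their use as independent bits is taken from the code on this page.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical bit values; the real mw*Msg constants may differ.
const unsigned int mwInitMsg     = 0x01;
const unsigned int mwByeMsg      = 0x02;
const unsigned int mwKeepMsg     = 0x04;
const unsigned int mwDontKeepMsg = 0x08;
const unsigned int mwPortMsg     = 0x10;
const unsigned int mwRejectedMsg = 0x20;

// Builds the code sent with the initialization data or the 'Bye!' message.
unsigned int makeCode (unsigned int kind, bool keepWorkers)
{
	return kind | (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
}

// Classifies a successfully received reply in the order RunLoop checks it.
const char *classifyReply (unsigned int code)
{
	if (code & mwPortMsg)
		return "port";
	if (code & mwRejectedMsg)
		return "rejected";
	return "result";
}
```

In RunLoop these bits are inspected only after RecvMessageC has succeeded; a negative result is handled first by disconnecting the worker and returning its data to the to-do queue.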
Definition at line 555 of file mwcoord.h.
References chomp::multiwork::mwTask::computers, initData, keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwConnect(), chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, mwMAXWORK, chomp::multiwork::mwOk, nWaiting, chomp::multiwork::mwTask::ports, SendMessageC(), and xWaiting.
Referenced by Coordinate().
{
int nComputers = computers. size ();
for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
{
// determine the computer name and port number
const std::string &name = computers [i];
int port = ports [i];
// open the connection
int fd = mwConnect (name. c_str (), port);
// if the connection attempt was successful, add this worker
if (fd >= 0)
{
if (logFile)
*logFile << "Connected to " << name << ":" <<
port << "." << std::endl;
mwWorkerData &w = xWaiting [nWaiting];
w. fd = fd;
w. name = name;
w. port = port;
++ nWaiting;
// prepare the initialization code
int code = mwInitMsg |
(keepWorkers ? mwKeepMsg : mwDontKeepMsg);
// send the initialization data to the worker
int initResult = SendMessageC (w. fd, code,
initData);
if (initResult != mwOk)
{
if (logFile)
*logFile << "Error while sending "
"the initialization data." <<
std::endl;
-- nWaiting;
}
}
// if unable to connect, make only a note in the log
else if (logFile)
*logFile << "Connection attempt to " << name <<
":" << port << " failed." << std::endl;
}
return;
} /* mwCoordinator::ConnectWorkers */
int chomp::multiwork::mwCoordinator::Coordinate (mwWorker *w = NULL) [inline]
Run the coordinator until all the work has been completed.
If a worker object is supplied, the work is done locally unless a remote worker is connected to the coordinator. Returns mwOk or mwError.
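After each pass of the communication loop, Coordinate decides whether to call Prepare () for a new data piece. The function below restates that decision as a standalone predicate so it can be checked in isolation; `needNewData` is a hypothetical name, and its parameters mirror the private counters and flags of mwCoordinator.

```cpp
#include <cassert>

// Restatement of the "hungry" test in Coordinate ().
bool needNewData (int nWaiting, int nWorking, int nToDo,
	bool hasLocalWorker, bool listening, bool noMoreData)
{
	// once Prepare () reported mwNoData, do not ask again; Coordinate
	// clears this flag whenever a result is accepted or rejected
	if (noMoreData)
		return false;
	// some workers are idle and the queue is empty
	if (nWaiting > 0 && nToDo == 0)
		return true;
	// nobody is busy and nothing is queued, but work can still be done
	// locally or by workers that may connect later
	return (hasLocalWorker || listening) &&
		nWorking == 0 && nWaiting == 0 && nToDo == 0;
}
```

Because Coordinate prepares at most one piece per pass, the to-do queue grows only when workers are actually idle, which keeps the 1024-entry tables from filling up.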
Definition at line 1062 of file mwcoord.h.
References Accept(), BeginListening(), ConnectWorkers(), initData, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, chomp::multiwork::mwNoData, chomp::multiwork::mwOk, nDone, nRejected, nToDo, nWaiting, nWorking, Prepare(), Reject(), RunLoop(), RunLoopLocally(), singleWork, xDone, xRejected, and xToDo.
{
// remember the local worker's address
localWorker = w;
// initialize the local worker if any
if (localWorker)
localWorker -> Initialize (initData);
if (logFile)
{
*logFile << "Running as a COORDINATOR." << std::endl;
// indicate what kind of network connection is in use
#if !mwNETWORK
*logFile << "There is no network in use." << std::endl;
#elif mwWXWIN
*logFile << "Using the sockets interface "
"provided by wxWindows." << std::endl;
#else
*logFile << "Using the standard sockets "
"for network connections." << std::endl;
#endif
// say if running in the single-work mode only
if (singleWork && localWorker)
*logFile << "Running in the single-work mode." <<
std::endl;
}
// connect to workers on the list and begin listening if necessary
if (!singleWork)
{
this -> ConnectWorkers ();
this -> BeginListening ();
}
// if not listening and there are no workers then try switching
// to the single-work mode, unless there is no local worker
if (!singleWork && (listensocket < 0) && !nWaiting)
{
// switch to the single-work mode if allowed to
if (localWorker)
{
singleWork = true;
if (logFile)
*logFile << "No remote workers. Switching "
"to the single-work mode." <<
std::endl;
}
// if data cannot be processed locally, report a failure
else
{
if (logFile)
*logFile << "Failure: No workers." <<
std::endl;
return mwError;
}
}
// is there no more data to be sent?
bool no_more_data = false;
while (1)
{
// run the communications loop
int loopresult = singleWork ? this -> RunLoopLocally () :
this -> RunLoop (no_more_data);
// stop if the coordinator failed badly
if (loopresult == mwError)
return mwError;
// if some data was rejected, process all this data
while (nRejected > 0)
{
// run the user's procedure to acquire rejected data
-- nRejected;
xRejected [nRejected]. Rewind ();
int result = this -> Reject (xRejected [nRejected]);
// interrupt if the user says that an error occurred
if (result != mwOk)
return mwError;
// reset the data acquired by the user
xRejected [nRejected]. Reset ();
no_more_data = false;
}
// if some new data arrived, process all this data
while (nDone > 0)
{
// call the user's procedure to accept the data piece
-- nDone;
xDone [nDone]. Rewind ();
int result = this -> Accept (xDone [nDone]);
// interrupt if the user says that an error occurred
if (result != mwOk)
return mwError;
// reset the data acquired by the user
xDone [nDone]. Reset ();
no_more_data = false;
}
// determine whether a new data item must be prepared:
// if there are workers waiting and there is no data then YES
bool hungry = (nWaiting > 0) && !nToDo;
// if there is no worker and no data then YES
if ((localWorker || (listensocket >= 0)) &&
!nWorking && !nWaiting && !nToDo)
{
hungry = true;
}
// if no more data is needed then definitely NO
if (no_more_data)
hungry = false;
// prepare a new data item if necessary
if (hungry)
{
// run the user's procedure for preparing data
int result = this -> Prepare (xToDo [nToDo]);
// break if the user says that an error occurred
if (result == mwError)
return mwError;
// make a note if there is no more data
else if (result == mwNoData)
no_more_data = true;
// add the data piece to the work queue otherwise
else
++ nToDo;
}
// stop if the tasks are completed and there is no more data
if (no_more_data && !nWorking && !nToDo)
return mwOk;
}
} /* mwCoordinator::Coordinate */
void chomp::multiwork::mwCoordinator::DisconnectAll () [inline, private]
Disconnects all the workers (normally called in the destructor).
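DisconnectAll (and SaveWorkers) walk over the waiting and working tables as if they were one sequence of length nWaiting + nWorking, waiting workers first. The helper below isolates that indexing; `WorkerRec` is a hypothetical, reduced stand-in for mwWorkerData, which also stores the worker's data and status.

```cpp
#include <cassert>
#include <string>

// Hypothetical reduced worker record.
struct WorkerRec
{
	int fd;
	std::string name;
	int port;
};

// Returns the i-th worker of the combined sequence: indices below
// nWaiting refer to the waiting table, the rest to the working table.
// A reference is returned so that callers can update the entry in place,
// as DisconnectAll does when it sets fd to -1.
WorkerRec &workerAt (WorkerRec *waiting, int nWaiting,
	WorkerRec *working, int i)
{
	return (i < nWaiting) ? waiting [i] : working [i - nWaiting];
}
```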
Definition at line 307 of file mwcoord.h.
References keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwKeepMsg, nWaiting, nWorking, SendMessageC(), xWaiting, and xWorking.
Referenced by ~mwCoordinator().
{
// release all the workers if they are still connected
for (int i = 0; i < nWaiting + nWorking; ++ i)
{
// determine the particular worker
mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
xWorking [i - nWaiting];
// if this worker is already disconnected, take the next one
if (w. fd < 0)
continue;
// prepare the code to send to the worker
unsigned int code = mwByeMsg;
code |= keepWorkers ? mwKeepMsg : mwDontKeepMsg;
// send the 'Bye!' message to the worker
mwData empty;
SendMessageC (w. fd, code, empty);
// disconnect the worker
mwDisconnect (w. fd);
w. fd = -1;
// make a note of what happened in the log file
if (logFile)
*logFile << "Worker " << i << " (" << w. name <<
":" << w. port << ") disconnected and " <<
(keepWorkers ? "waiting." : "exited.") <<
std::endl;
}
return;
} /* mwCoordinator::DisconnectAll */
void chomp::multiwork::mwCoordinator::Init (mwData &data) [inline]
Defines a portion of initialization data which will be sent to every newly connected worker.
void chomp::multiwork::mwCoordinator::KeepWorkers (bool keep = true) [inline]
Makes workers keep running after the coordinator's completion.
Definition at line 351 of file mwcoord.h.
References keepWorkers.
{
keepWorkers = keep;
return;
} /* mwCoordinator::KeepWorkers */
void chomp::multiwork::mwCoordinator::mwTableDel (int *tab, int len, int pos) [inline, static, private]
void chomp::multiwork::mwCoordinator::mwTableDel (mwData *tab, int len, int pos) [inline, static, private]
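The bodies of the two mwTableDel overloads are not included on this page. Based only on their brief description ("deleting a table entry and shifting the remainder of the table backwards"), the int overload presumably behaves like the hypothetical `tableDel` below; the mwData overload would use `Take` instead of assignment to move each entry. The bounds check is an assumption.

```cpp
#include <cassert>

// Removes entry 'pos' from a table of length 'len' by shifting all
// later entries one place back. The caller is responsible for
// decrementing its own length counter afterwards.
void tableDel (int *tab, int len, int pos)
{
	if (pos < 0 || pos >= len)
		return;
	for (int i = pos + 1; i < len; ++ i)
		tab [i - 1] = tab [i];
}
```

Unlike the swap-with-last technique used when purging workers in RunLoop, shifting preserves the order of the remaining entries at the cost of moving every element after `pos`.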
int chomp::multiwork::mwCoordinator::Prepare (mwData &data) [inline, private, virtual]
Prepares a piece of data to be sent to a worker.
Should return mwOk on success, mwNoData if there is no data to send based on the computations completed so far, or mwError to cancel the computations. Please override this function in the class you derive from mwCoordinator.
Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.
Definition at line 448 of file mwcoord.h.
Referenced by Coordinate().
{
return mwNoData;
} /* mwCoordinator::Prepare */
int chomp::multiwork::mwCoordinator::RecvMessageC (int fd, unsigned int &code, mwData &x) const [inline, private]
Receives a message with data from the socket of a worker.
Returns mwOk on success or mwError in the case of failure.
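SendMessageC attaches `ControlNumber ()` to every outgoing message, while RecvMessageC accepts a worker's message only if its control number equals the bitwise complement `~ControlNumber ()`. The sketch below isolates that check; `workerReplyControl` models what a worker is presumably expected to send back, which is inferred from the check and not confirmed by this page.

```cpp
#include <cassert>

// Presumed worker behavior: reply with the complement of the number received.
unsigned int workerReplyControl (unsigned int received)
{
	return ~received;
}

// The test RecvMessageC applies to the control number of an incoming message.
bool replyAccepted (unsigned int controlNumber, unsigned int ctrl)
{
	return ctrl == ~controlNumber;
}
```

Using the complement rather than the same number means that a message carrying the coordinator's own, uncomplemented control number is rejected, so traffic from another coordinator with the same control number is not mistaken for a worker's reply.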
Definition at line 484 of file mwcoord.h.
References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().
Referenced by RunLoop().
{
// receive the message
unsigned int ctrl = 0;
int result = this -> RecvMessage (fd, ctrl, code, x);
// if there was an error then return its code
if (result != mwOk)
return result;
// if the control number is correct then finish successfully
if (ctrl == ~(this -> ControlNumber ()))
return mwOk;
// if the control number is wrong then return an error code
if (logFile)
*logFile << "Wrong control code received "
"from the worker: " << ~ctrl << "." << std::endl;
return mwError;
} /* mwCoordinator::RecvMessageC */
int chomp::multiwork::mwCoordinator::Reject (mwData &data) [inline, private, virtual]
Acknowledges data that was rejected by a worker.
Should return mwOk if this is fine, or mwError to cancel the computations. Please override this function in the class you derive from mwCoordinator unless the default behavior (returning mwOk and ignoring the data) is sufficient.
Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.
Definition at line 458 of file mwcoord.h.
Referenced by Coordinate().
{
return mwOk;
} /* mwCoordinator::Reject */
int chomp::multiwork::mwCoordinator::RunLoop (bool no_more_data) [inline, private]
Runs the main communication loop: Sends the data to workers and receives the results of their computations.
If no_more_data is true, waits until running tasks finish and some results become available. Otherwise, also returns as soon as a new worker is ready to receive a portion of data for processing. Returns mwOk or mwError.
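The waiting step relies on mwSelect, whose implementation is not shown here. Its role can be sketched with POSIX `select ()`: on entry each flag says what to wait for on the corresponding descriptor, and on return it says what is actually possible. `waitReady`, `canRead` and `canWrite` are hypothetical names and values; unlike mwSelect, the listening socket is not a separate argument but can simply be passed as one more descriptor with `canRead`.

```cpp
#include <cassert>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical flag values standing in for mwCanRead and mwCanWrite.
const int canRead = 1;
const int canWrite = 2;

// Waits until some descriptor is ready or the time-out expires.
// timeoutSec < 0 waits indefinitely, 0 only polls. Returns the number of
// ready conditions, 0 on time-out, or -1 if select () failed.
// Descriptors must be below FD_SETSIZE.
int waitReady (const int *fds, int *flags, int n, int timeoutSec)
{
	fd_set rd, wr;
	FD_ZERO (&rd);
	FD_ZERO (&wr);
	int maxfd = -1;
	for (int i = 0; i < n; ++ i)
	{
		if (flags [i] & canRead) FD_SET (fds [i], &rd);
		if (flags [i] & canWrite) FD_SET (fds [i], &wr);
		if ((flags [i] & (canRead | canWrite)) && fds [i] > maxfd)
			maxfd = fds [i];
	}
	timeval tv;
	tv.tv_sec = timeoutSec;
	tv.tv_usec = 0;
	int r = select (maxfd + 1, &rd, &wr, nullptr,
		timeoutSec < 0 ? nullptr : &tv);
	if (r < 0)
		return -1;
	// overwrite each flag with the conditions that are actually satisfied
	for (int i = 0; i < n; ++ i)
	{
		int f = 0;
		if ((flags [i] & canRead) && FD_ISSET (fds [i], &rd)) f |= canRead;
		if ((flags [i] & canWrite) && FD_ISSET (fds [i], &wr)) f |= canWrite;
		flags [i] = f;
	}
	return r;
}
```

RunLoop sets its time limit to 0 (a pure poll) whenever there is something useful to do immediately, for example when idle workers are waiting and more data can still be prepared, so that Coordinate regains control without blocking.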
Definition at line 627 of file mwcoord.h.
References initData, keepWorkers, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwCanRead, chomp::multiwork::mwCanWrite, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, mwMAXWORK, chomp::multiwork::mwOk, chomp::multiwork::mwPortMsg, chomp::multiwork::mwReject, chomp::multiwork::mwRejectedMsg, chomp::multiwork::mwSelect(), chomp::multiwork::mwTimeOut, nDone, nRejected, nToDo, nWaiting, nWorking, RecvMessageC(), SendMessageC(), singleWork, chomp::multiwork::swap(), chomp::multiwork::mwTask::TimeOut(), xDone, xRejected, xToDo, xWaiting, and xWorking.
Referenced by Coordinate().
{
if (false && logFile)
*logFile << "\nDebug0: " << nWaiting << " waiting, " <<
nWorking << " working, " << nToDo <<
" data pieces." << std::endl;
// if not listening and there are no workers then try switching
// to the single-work mode, unless there is no local worker
if ((listensocket < 0) && !nWorking && !nWaiting)
{
// switch to the single-work mode if allowed to
if (localWorker)
{
singleWork = true;
if (logFile)
*logFile << "All workers disconnected. "
"Switching to the single-work "
"mode." << std::endl;
return mwOk;
}
// if data cannot be processed locally, report a failure
else
{
if (logFile)
*logFile << "Failure: All workers "
"disconnected. The work cannot be "
"continued." << std::endl;
return mwError;
}
}
// TO DO: If there is no more data (options & mwNoMoreData)
// and some workers are idle, and some others are working
// for a long time, send their data also to a few idle
// workers... This requires some A.I., of course. ;)
// the i/o flags and sockets of the workers + one for 'listensocket'
int ioflags [mwMAXWORK];
int sockets [mwMAXWORK];
int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;
// prepare flags for the working and waiting workers
for (int i = 0; i < nWorking; ++ i)
{
ioflags [i] = mwCanRead;
sockets [i] = xWorking [i]. fd;
}
if (nWorkers > nWorking)
{
for (int i = 0; i < nWaiting; ++ i)
{
ioflags [nWorking + i] = mwCanWrite;
sockets [nWorking + i] = xWaiting [i]. fd;
}
}
// prepare the listen socket flag
bool listening = false;
int listenflag = nWorkers;
if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
{
ioflags [listenflag] = mwCanRead;
listening = true;
}
else
ioflags [listenflag] = mwNone;
// determine whether it is necessary to wait or not
int timelimit = this -> TimeOut ();
if (localWorker && !nWorking && !nWaiting)
timelimit = 0;
if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
timelimit = 0;
if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
timelimit = 0;
// if (no_more_data && !nToDo && !nWorking)
// timelimit = 0;
// report the select's parameters to the log file
if (logFile)
{
*logFile << ">>> Select, t = " << timelimit << ", flags =";
for (int i = 0; i <= nWorkers; ++ i)
*logFile << " " << ioflags [i];
*logFile << "." << std::endl;
}
// wait until data can be received from or sent to any socket
int result = mwSelect (sockets, nWorkers,
listening ? listensocket : -1, ioflags, timelimit);
// report the returned parameters to the log file
if (logFile)
{
*logFile << ">>> Returned flags =";
for (int i = 0; i <= nWorkers; ++ i)
*logFile << " " << ioflags [i];
*logFile << "." << std::endl;
}
// in case of select's failure, exit the loop with a failure message
if (result == mwError)
{
if (logFile)
*logFile << "Error: The 'select' function failed." <<
std::endl;
return mwError;
}
// report a time-out if necessary
if ((timelimit > 0) && (result == mwTimeOut))
{
if (logFile)
*logFile << "Time-out occurred at 'select'." <<
std::endl;
}
// receive data from all the workers who are ready
for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
(nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
{
// if this worker is not ready, yet, then skip it
if (!(ioflags [i] & mwCanRead))
continue;
// receive the entire data chunk from the worker
unsigned int code = 0;
int result = RecvMessageC (sockets [i], code, xDone [nDone]);
// remember which worker this is
mwWorkerData &w = xWorking [i];
// reject the worker in case of error
if (result < 0)
{
// log the details of what happened
if (logFile)
{
*logFile << "Worker " << i;
if (!w. name. empty ())
*logFile << " (" << w. name << ")";
*logFile << " disconnected: ";
if (result == mwLost)
*logFile << "Connection lost.";
else
*logFile << "An error occurred.";
*logFile << std::endl;
}
// disconnect the worker
mwDisconnect (sockets [i]);
// move the worker's data back to the "to-do" list
xToDo [nToDo]. Take (w. data);
++ nToDo;
// make a note of this in the worker's data
w. fd = -1;
w. status = -1;
}
// if transmitting the port number only, take it
else if (code & mwPortMsg)
{
// retrieve the port number
xDone [nDone] >> w. port;
// report this fact to the log file
if (logFile)
*logFile << "Port number " << w. port <<
" received from worker " << i <<
"." << std::endl;
}
// if the data was rejected, move it to 'xRejected'
else if (code & mwRejectedMsg)
{
// report what happened
if (logFile)
*logFile << "Data was rejected by worker " <<
i << "." << std::endl;
// move the data to the rejected table
xRejected [nRejected]. Take (w. data);
++ nRejected;
// indicate the status change of this worker
w. status = 1;
}
// if the data was Ok, the result is in 'xDone'
else
{
// report this to the log file
if (logFile)
*logFile << "Processed data received from "
"worker " << i << "." << std::endl;
// accept this piece of data
++ nDone;
// indicate the status change of this worker
w. status = 1;
}
}
// send data to those workers who are ready
for (int i = 0; (i < nWaiting) && nToDo; ++ i)
{
// remember the worker and its offset
mwWorkerData &w = xWaiting [i];
int offset = nWorking + i;
// if this worker is not ready to get data then skip it
if (!(ioflags [offset] & mwCanWrite))
continue;
// prepare a message code to send
unsigned int code = mwStdMsg;
// send a data chunk for processing
int result = SendMessageC (sockets [offset], code,
xToDo [nToDo - 1]);
// if the data was sent successfully
if (result == mwOk)
{
// take the data chunk to the worker
-- nToDo;
w. data. Take (xToDo [nToDo]);
// indicate the status of the worker
w. status = 1;
// report this to the log file
if (logFile)
*logFile << "Data " << nToDo << " sent to "
"worker " << i << "." << std::endl;
}
// if an error occurred, reject the worker
else
{
// report this situation to the log file
if (logFile)
*logFile << "Worker " << i << " disconnected"
": " << ((result == mwLost) ?
"Connection lost." :
"An error occurred.") << std::endl;
// disconnect the worker
mwDisconnect (sockets [offset]);
// modify the status of the worker accordingly
w. fd = -1;
w. status = -1;
}
}
if (false && logFile)
{
*logFile << "Debug1: " << nWaiting << " waiting:";
for (int i = 0; i < nWaiting; ++ i)
*logFile << " " << xWaiting [i]. status;
*logFile << "; " << nWorking << " working:";
for (int i = 0; i < nWorking; ++ i)
*logFile << " " << xWorking [i]. status;
*logFile << std::endl;
}
// purge workers who have been disconnected and move workers
// whose data has been acquired to the 'waiting' queue
for (int i = 0; i < nWorking; ++ i)
{
// skip this worker if it is fine
if (xWorking [i]. status == 0)
continue;
// swap this worker with the last one if necessary
if (i < nWorking - 1)
swap (xWorking [i], xWorking [nWorking - 1]);
// move the worker to the waiting queue if necessary
if (xWorking [nWorking - 1]. status > 0)
{
xWorking [nWorking - 1]. status = 0;
swap (xWaiting [nWaiting], xWorking [nWorking - 1]);
++ nWaiting;
}
// remove this worker from the working queue
-- nWorking;
// consider the same entry in the table again
-- i;
}
// purge workers who have been disconnected and move workers
// who received data to the 'working' queue
for (int i = 0; i < nWaiting; ++ i)
{
// skip this worker if it is fine
if (xWaiting [i]. status == 0)
continue;
// swap this worker with the last one if necessary
if (i < nWaiting - 1)
swap (xWaiting [i], xWaiting [nWaiting - 1]);
// move the worker to the working queue if necessary
if (xWaiting [nWaiting - 1]. status > 0)
{
xWaiting [nWaiting - 1]. status = 0;
swap (xWorking [nWorking], xWaiting [nWaiting - 1]);
++ nWorking;
}
// remove this worker from the waiting queue
-- nWaiting;
// consider the same entry in the table again
-- i;
}
if (false && logFile)
*logFile << "Debug2: " << nWaiting << " waiting, " <<
nWorking << " working, " << nToDo <<
" data pieces." << std::endl;
// accept connections from new workers if any
if (listening && (ioflags [listenflag] & mwCanRead) &&
(nWaiting + nWorking < mwMAXWORK))
{
// accept the connection
mwWorkerData &w = xWaiting [nWaiting];
w. name = std::string ("");
w. port = 0;
w. status = 0;
w. fd = mwAccept (listensocket, w. name);
// if the new worker has been accepted successfully
if (w. fd >= 0)
{
// report the worker's acceptance
if (logFile)
*logFile << "A worker from '" << w. name <<
"' accepted." << std::endl;
// add the worker to the waiting queue
++ nWaiting;
// prepare the initialization code
int code = mwInitMsg |
(keepWorkers ? mwKeepMsg : mwDontKeepMsg);
// send the initialization data to the worker
int initResult = SendMessageC (w. fd, code,
initData);
if (initResult != mwOk)
{
if (logFile)
*logFile << "Error while sending "
"the initialization data." <<
std::endl;
-- nWaiting;
}
}
// report the error to the log file if not successful
else if (logFile)
*logFile << "Unsuccessful connection of a worker "
"from '" << w. name << "'." << std::endl;
}
if (false && logFile)
*logFile << "Debug3: " << nWaiting << " waiting, " <<
nWorking << " working, " << nToDo <<
" data pieces." << std::endl;
// ask for some data to process if none is waiting
// or too much data acquired from workers is accumulated
if (localWorker && !nWorking && !nWaiting &&
!no_more_data && (result == mwTimeOut) && !nToDo)
{
if (logFile)
*logFile << "Asking for some data to be "
"processed locally." << std::endl;
return mwOk;
}
// run the work locally as a last resort
if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
{
// make a note in the log file of what is going on
if (logFile)
*logFile << "Processing data locally." << std::endl;
// process one piece of data
-- nToDo;
xToDo [nToDo]. Rewind ();
int result = localWorker -> Process (xToDo [nToDo]);
xToDo [nToDo]. Rewind ();
if (result == mwReject)
{
xRejected [nRejected]. Take (xToDo [nToDo]);
++ nRejected;
}
else if (result == mwError)
{
if (logFile)
*logFile << "Data processing failed." <<
std::endl;
return mwError;
}
else
{
xDone [nDone]. Take (xToDo [nToDo]);
++ nDone;
}
}
if (false && logFile)
*logFile << "Debug4: " << nWaiting << " waiting, " <<
nWorking << " working, " << nToDo <<
" data pieces." << std::endl;
return mwOk;
} /* mwCoordinator::RunLoop */
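Near its end, RunLoop compacts the working and waiting tables with a swap-with-last technique: an entry with status 0 stays, an entry with a positive status is moved to the other queue, and an entry with a negative status (a disconnected worker) is dropped. The sketch below reproduces that pass on `std::vector`; `Worker` and `purge` are hypothetical names, and the status encoding is taken from the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical reduced worker record: status 0 = unchanged,
// > 0 = move to the other queue, < 0 = disconnected (drop).
struct Worker
{
	int id;
	int status;
};

// Removes every entry with non-zero status from 'from'; entries with a
// positive status are appended to 'to' with their status reset to 0.
// The order of the entries remaining in 'from' is not preserved.
void purge (std::vector<Worker> &from, std::vector<Worker> &to)
{
	for (std::size_t i = 0; i < from.size (); )
	{
		if (from [i].status == 0)
		{
			++ i;
			continue;
		}
		std::swap (from [i], from.back ());
		Worker w = from.back ();
		from.pop_back ();
		if (w.status > 0)
		{
			w.status = 0;
			to.push_back (w);
		}
		// i is not advanced: the entry swapped into place is checked next
	}
}
```

Swapping with the last entry makes each removal O(1), which matters little for 1024 entries but avoids the element shifting that mwTableDel performs.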
int chomp::multiwork::mwCoordinator::RunLoopLocally () [inline, private]
Runs the main communication loop using the local worker.
Returns mwOk or mwError.
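RunLoopLocally drains the to-do table from its end (last in, first out), lets the local worker process each piece in place, and moves the result to the done or rejected table, stopping when either of those is full. The template below expresses the same flow with `std::vector`; `runLocally`, the `process` callback (standing in for mwWorker::Process) and the result-code values are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result codes standing in for mwOk, mwReject and mwError.
const int okCode = 0;
const int rejectCode = 1;
const int errorCode = -1;

// Processes pieces from the back of 'toDo' until it is empty or one of
// the output tables reaches 'capacity'. Each piece is modified in place
// by 'process' and then moved to 'done' or 'rejected'.
template <typename T, typename F>
int runLocally (std::vector<T> &toDo, std::vector<T> &done,
	std::vector<T> &rejected, F process, std::size_t capacity = 1024)
{
	while (!toDo.empty () && done.size () < capacity &&
		rejected.size () < capacity)
	{
		T piece = toDo.back ();
		toDo.pop_back ();
		int result = process (piece);
		if (result == errorCode)
			return errorCode;
		if (result == rejectCode)
			rejected.push_back (piece);
		else
			done.push_back (piece);
	}
	return okCode;
}
```

For example, a callback that squares even numbers and rejects odd ones, applied to {1, 2, 3, 4}, leaves {16, 4} in `done` and {3, 1} in `rejected`, reflecting the last-in, first-out order.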
Definition at line 508 of file mwcoord.h.
References localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, mwMAXWORK, chomp::multiwork::mwReject, nDone, nRejected, nToDo, nWaiting, xDone, xRejected, and xToDo.
Referenced by Coordinate().
{
if (!localWorker)
return mwError;
// indicate that some worker is continuously waiting for data
if (!nWaiting)
++ nWaiting;
// process the data
while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
{
// process the data and replace it with the result
-- nToDo;
xToDo [nToDo]. Rewind ();
int result = localWorker -> Process (xToDo [nToDo]);
xToDo [nToDo]. Rewind ();
// if the data caused an error then abort the loop
if (result == mwError)
{
if (logFile)
*logFile << "Data processing failed." <<
std::endl;
return mwError;
}
// if the data was rejected
// then move it to the right place
else if (result == mwReject)
{
xRejected [nRejected]. Take (xToDo [nToDo]);
++ nRejected;
}
// if the data was accepted
// then move it to the right place
else
{
xDone [nDone]. Take (xToDo [nToDo]);
++ nDone;
}
}
return mwOk;
} /* mwCoordinator::RunLoopLocally */
int chomp::multiwork::mwCoordinator::SaveWorkers (const char *filename) [inline]
Saves addresses of workers to the given file in the form of address:port.
Each distinct address:port combination is saved only once.
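The deduplication rule can be sketched with a `std::set` in place of the nested search over both worker tables. `writeWorkers` and `Address` are hypothetical names; the function writes to any output stream (so it can be tested with a string stream) and reproduces only the header line and the `host:port` lines, not the summary lines that SaveWorkers appends.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical (host, port) pair for one connected worker.
typedef std::pair<std::string, int> Address;

// Writes each distinct, complete address as "host:port", skipping entries
// with an empty host name or a non-positive port. The header comment is
// written only if at least one address is listed. Returns the count.
int writeWorkers (std::ostream &out, const std::vector<Address> &workers)
{
	std::set<Address> seen;
	int counter = 0;
	for (const Address &a : workers)
	{
		if (a.first.empty () || a.second <= 0)
			continue;
		// insert () reports false if this pair was already listed
		if (!seen.insert (a).second)
			continue;
		if (counter == 0)
			out << "; A list of currently running workers:\n";
		out << a.first << ":" << a.second << "\n";
		++ counter;
	}
	return counter;
}
```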
Definition at line 357 of file mwcoord.h.
References keepWorkers, nWaiting, nWorking, xWaiting, and xWorking.
{
// prepare to create a file to list the workers; the file is
// created only when the first worker is about to be written
std::ofstream f;
bool first = true;
// go through all the connected workers
int counter = 0;
for (int i = 0; i < nWaiting + nWorking; ++ i)
{
// determine the particular worker
mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
xWorking [i - nWaiting];
// determine the host name
const char *host = w. name. c_str ();
// if no host name or port number is known, skip it
if (!host || !*host || (w. port <= 0))
continue;
// check if the same worker was already listed
int j;
for (j = 0; j < i; ++ j)
{
// determine the other worker
mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
xWorking [j - nWaiting];
// determine the name of the other host
const char *host_j = v. name. c_str ();
// if no host name or port number is known, skip it
if (!host_j || !*host_j || (v. port <= 0))
continue;
// if the ports are different, skip the other worker
if (w. port != v. port)
continue;
// if the names are different, skip the other worker
if (std::strcmp (host, host_j))
continue;
// make a note that this is the same worker
j = i + 1;
break;
}
if (j > i)
continue;
// create the file if this is the first time to write data
if (first)
{
f. open (filename, std::ios::out | std::ios::trunc);
if (!f)
return mwError;
f << "; A list of currently running workers:\n";
first = false;
}
// write the address
f << host << ":" << w. port << "\n";
++ counter;
}
// exit if no workers have been listed
if (!counter)
return mwOk;
// add a summary if any workers have been listed
f << "; A total of " << counter << " workers";
if (counter != nWaiting + nWorking)
f << " out of " << (nWaiting + nWorking);
f << " saved.\n";
if (!keepWorkers)
f << "; The workers will exit upon disconnection.\n";
else
f << "; The workers will remain running "
"after disconnection.\n";
f. close ();
if (!f)
return mwError;
else
return mwOk;
} /* mwCoordinator::SaveWorkers */
int chomp::multiwork::mwCoordinator::SendMessageC (int fd, unsigned int code, const mwData &x) const [inline, private]
Sends a message with data to the given socket as a coordinator.
Returns mwOk on success and mwError in the case of failure.
Definition at line 474 of file mwcoord.h.
References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().
Referenced by ConnectWorkers(), DisconnectAll(), and RunLoop().
{
// prepare the control number to send
unsigned int ctrl = this -> ControlNumber ();
// send the message with the control number and the message code
return this -> SendMessage (fd, ctrl, code, x);
} /* mwCoordinator::SendMessageC */
mwData chomp::multiwork::mwCoordinator::initData [private]
The initialization data that has to be sent to workers.
Definition at line 225 of file mwcoord.h.
Referenced by ConnectWorkers(), Coordinate(), Init(), and RunLoop().
bool chomp::multiwork::mwCoordinator::keepWorkers [private]
Should the workers be kept running after the coordinator is done?
Definition at line 220 of file mwcoord.h.
Referenced by ConnectWorkers(), DisconnectAll(), KeepWorkers(), RunLoop(), and SaveWorkers().
int chomp::multiwork::mwCoordinator::listensocket [private]
The socket on which the coordinator listens for new workers.
Definition at line 262 of file mwcoord.h.
Referenced by BeginListening(), Coordinate(), and RunLoop().
mwWorker* chomp::multiwork::mwCoordinator::localWorker [private]
The address of a local worker or 0 if none is available.
Definition at line 217 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
int chomp::multiwork::mwCoordinator::nDone [private]
The number of recently finished pieces of data.
Definition at line 254 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
int chomp::multiwork::mwCoordinator::nRejected [private]
The number of recently rejected pieces of data.
Definition at line 248 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
int chomp::multiwork::mwCoordinator::nToDo [private]
The number of data pieces to be sent to working tasks.
Definition at line 242 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
int chomp::multiwork::mwCoordinator::nWaiting [private]
The number of workers waiting for their data.
Definition at line 230 of file mwcoord.h.
Referenced by ConnectWorkers(), Coordinate(), DisconnectAll(), RunLoop(), RunLoopLocally(), and SaveWorkers().
int chomp::multiwork::mwCoordinator::nWorking [private]
The number of workers processing their data.
Definition at line 236 of file mwcoord.h.
Referenced by Coordinate(), DisconnectAll(), RunLoop(), and SaveWorkers().
bool chomp::multiwork::mwCoordinator::singleWork [private]
Should data be processed locally only?
Definition at line 214 of file mwcoord.h.
Referenced by Coordinate(), and RunLoop().
mwData chomp::multiwork::mwCoordinator::xDone[1024] [private]
The recently finished pieces of data.
Definition at line 257 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
mwData chomp::multiwork::mwCoordinator::xRejected[1024] [private]
The recently rejected pieces of data.
Definition at line 251 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
mwData chomp::multiwork::mwCoordinator::xToDo[1024] [private]
The data pieces waiting to be sent to working tasks.
Definition at line 245 of file mwcoord.h.
Referenced by Coordinate(), RunLoop(), and RunLoopLocally().
mwWorkerData chomp::multiwork::mwCoordinator::xWaiting[1024] [private]
The workers waiting for their tasks.
Definition at line 233 of file mwcoord.h.
Referenced by ConnectWorkers(), DisconnectAll(), RunLoop(), and SaveWorkers().
mwWorkerData chomp::multiwork::mwCoordinator::xWorking[1024] [private]
The workers processing their data.
Definition at line 239 of file mwcoord.h.
Referenced by DisconnectAll(), RunLoop(), and SaveWorkers().