chomp::multiwork::mwCoordinator Class Reference

This class defines a generic coordinator task object for the multi-work distributed computations framework.

#include <mwcoord.h>

Inherits chomp::multiwork::mwTask.
Inherited by chomp::multiwork::mwSubCoordinator< dim, coord >.

Public Member Functions

 mwCoordinator ()
 The default constructor.
virtual ~mwCoordinator ()
 The virtual destructor.
void KeepWorkers (bool keep=true)
 Makes workers keep running after the coordinator's completion.
int SaveWorkers (const char *filename)
 Saves addresses of workers to the given file in the form of address:port.
void Init (mwData &data)
 Defines a portion of initialization data which will be sent to every newly connected worker.
int Coordinate (mwWorker *w=NULL)
 Runs the coordinator until all the work has been completed.

Private Member Functions

virtual int Prepare (mwData &data)
 Prepares a piece of data to be sent to a worker.
virtual int Accept (mwData &data)
 Accepts a result received from a worker.
virtual int Reject (mwData &data)
 Acknowledges data that was rejected by a worker.
int RunLoop (bool no_more_data)
 Runs the main communication loop: Sends the data to workers and receives the results of their computations.
int RunLoopLocally ()
 Runs the main communication loop using the local worker.
void ConnectWorkers ()
 Connects to all workers in the list.
void BeginListening ()
 Begins listening at the given port.
void DisconnectAll ()
 Disconnects all the workers (normally called in the destructor).
int SendMessageC (int fd, unsigned int code, const mwData &x) const
 Sends a message with data to the given socket as a coordinator.
int RecvMessageC (int fd, unsigned int &code, mwData &x) const
 Receives a message with data from the socket of a worker.

Static Private Member Functions

static void mwTableDel (int *tab, int len, int pos)
 A helper function for deleting a table entry and shifting the remainder of the table backwards.
static void mwTableDel (mwData *tab, int len, int pos)
 A helper function for deleting a table entry and shifting the remainder of the table backwards using the method "Take" of the mwData class.

Private Attributes

bool singleWork
 Should data be processed locally only?
mwWorker * localWorker
 The address of a local worker or 0 if none is available.
bool keepWorkers
 Should the workers be kept running after coordinator is done?
mwData initData
 The initialization data that has to be sent to workers.
int nWaiting
 The number of workers waiting for their data.
mwWorkerData xWaiting [1024]
 The workers waiting for their tasks.
int nWorking
 The number of workers processing their data.
mwWorkerData xWorking [1024]
 The workers processing their data.
int nToDo
 The number of data pieces to be sent to working tasks.
mwData xToDo [1024]
 The data pieces waiting to be sent to working tasks.
int nRejected
 The number of recently rejected pieces of data.
mwData xRejected [1024]
 The recently rejected pieces of data.
int nDone
 The number of recently finished pieces of data.
mwData xDone [1024]
 The recently finished pieces of data.
int listensocket
 The socket on which the coordinator listens for new workers.

Detailed Description

This class defines a generic coordinator task object for the multi-work distributed computations framework.

Each coordinator class defined by the user must inherit from this class.

Definition at line 127 of file mwcoord.h.
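
The inheritance requirement can be illustrated with a self-contained mock. This is only a sketch: MockData, MockCoordinator, SumCoordinator, and the mockOk/mockError/mockNoData values are hypothetical stand-ins invented here so that the example compiles without mwcoord.h. The real class talks to workers over sockets, whereas the mock simply echoes each prepared piece back to Accept.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Placeholder return codes; the real mwOk, mwError and mwNoData values
// are defined by the framework and are not reproduced here.
enum { mockOk = 0, mockError = -1, mockNoData = -2 };

// Hypothetical stand-in for mwData.
struct MockData { std::vector<int> values; };

// Hypothetical stand-in for mwCoordinator: a purely local loop.
class MockCoordinator {
public:
    virtual ~MockCoordinator() {}
    int Coordinate() {
        for (;;) {
            MockData d;
            int r = Prepare(d);
            if (r == mockError) return mockError;
            if (r == mockNoData) return mockOk;   // nothing left to do
            // a worker would process 'd' here; the mock echoes it back
            if (Accept(d) != mockOk) return mockError;
        }
    }
private:
    // Private virtuals, as in the documented class; derived classes
    // may still override them.
    virtual int Prepare(MockData &) { return mockNoData; }
    virtual int Accept(MockData &) { return mockOk; }
    virtual int Reject(MockData &) { return mockOk; }
};

// A user-defined coordinator: hands out 1..n and sums what comes back.
class SumCoordinator : public MockCoordinator {
public:
    explicit SumCoordinator(int n) : next(1), last(n), total(0) {
        assert(n >= 0);
    }
    int Total() const { return total; }
private:
    int Prepare(MockData &d) override {
        if (next > last) return mockNoData;
        d.values.push_back(next++);
        return mockOk;
    }
    int Accept(MockData &d) override {
        for (std::size_t i = 0; i < d.values.size(); ++i)
            total += d.values[i];
        return mockOk;
    }
    int next, last, total;
};
```

With the real framework, the derived class would inherit from mwCoordinator and take mwData & arguments instead; the overall shape (override Prepare and Accept, then call Coordinate) is the part this sketch is meant to convey.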


Constructor & Destructor Documentation

chomp::multiwork::mwCoordinator::mwCoordinator (  )  [inline]

The default constructor.

Definition at line 289 of file mwcoord.h.

                                    :
#if mwNETWORK
        singleWork (false),
#else
        singleWork (true),
#endif
        localWorker (0),
        keepWorkers (false),
        nWaiting (0),
        nWorking (0),
        nToDo (0),
        nRejected (0),
        nDone (0),
        listensocket (-1)
{
        return;
} /* mwCoordinator::mwCoordinator */

chomp::multiwork::mwCoordinator::~mwCoordinator (  )  [inline, virtual]

The virtual destructor.

Definition at line 343 of file mwcoord.h.

References DisconnectAll().

{
        // disconnect all the workers
        DisconnectAll ();
        
        return;
} /* mwCoordinator::~mwCoordinator */


Member Function Documentation

int chomp::multiwork::mwCoordinator::Accept ( mwData & data  )  [inline, private, virtual]

Accepts a result received from a worker.

Should return mwOk on success or mwError to stop the computations. Override this function in the class you derive from mwCoordinator.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 453 of file mwcoord.h.

Referenced by Coordinate().

{
        return mwOk;
} /* mwCoordinator::Accept */

void chomp::multiwork::mwCoordinator::BeginListening (  )  [inline, private]

Begins listening at the given port.

Definition at line 604 of file mwcoord.h.

References listensocket, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwListen(), and chomp::multiwork::mwTask::Port().

Referenced by Coordinate().

{
        // if there is no valid port number then cancel this operation
        if (this -> Port () <= 0)
                return;

        // open a listening socket at the given port
        listensocket = mwListen (this -> Port (), 15);

        // add a message to the log file whether it was successful or not
        if (logFile)
        {
                if (listensocket < 0)
                        *logFile << "Listening attempt at port " <<
                                this -> Port () << " failed." << std::endl;
                else
                        *logFile << "Waiting for workers at port " <<
                                this -> Port () << "." << std::endl;
        }

        return;
} /* mwCoordinator::BeginListening */

void chomp::multiwork::mwCoordinator::ConnectWorkers (  )  [inline, private]

Connects to all workers in the list.

Definition at line 555 of file mwcoord.h.

References chomp::multiwork::mwTask::computers, initData, keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwConnect(), chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, mwMAXWORK, chomp::multiwork::mwOk, nWaiting, chomp::multiwork::mwTask::ports, SendMessageC(), and xWaiting.

Referenced by Coordinate().

{
        int nComputers = computers. size ();
        for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
        {
                // determine the computer name and port number
                const std::string &name = computers [i];
                int port = ports [i];

                // open the connection
                int fd = mwConnect (name. c_str (), port);

                // if the connection attempt was successful, add this worker
                if (fd >= 0)
                {
                        if (logFile)
                                *logFile << "Connected to " << name << ":" <<
                                        port << "." << std::endl;
                        mwWorkerData &w = xWaiting [nWaiting];
                        w. fd = fd;
                        w. name = name;
                        w. port = port;
                        ++ nWaiting;

                        // prepare the initialization code
                        int code = mwInitMsg |
                                (keepWorkers ? mwKeepMsg : mwDontKeepMsg);

                        // send the initialization data to the worker
                        int initResult = SendMessageC (w. fd, code,
                                initData);
                        if (initResult != mwOk)
                        {
                                if (logFile)
                                        *logFile << "Error while sending "
                                                "the initialization data." <<
                                                std::endl;
                                -- nWaiting;
                        }
                }

                // if unable to connect, make only a note in the log
                else if (logFile)
                        *logFile << "Connection attempt to " << name <<
                                ":" << port << " failed." << std::endl;
        }
        return;
} /* mwCoordinator::ConnectWorkers */

int chomp::multiwork::mwCoordinator::Coordinate ( mwWorker * w = NULL  )  [inline]

Runs the coordinator until all the work has been completed.

If a worker object is supplied, the work will be done locally unless a remote worker is connected to the coordinator. Returns mwOk or mwError.

Definition at line 1062 of file mwcoord.h.

References Accept(), BeginListening(), ConnectWorkers(), initData, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, chomp::multiwork::mwNoData, chomp::multiwork::mwOk, nDone, nRejected, nToDo, nWaiting, nWorking, Prepare(), Reject(), RunLoop(), RunLoopLocally(), singleWork, xDone, xRejected, and xToDo.

{
        // remember the local worker's address
        localWorker = w;

        // initialize the local worker if any
        if (localWorker)
                localWorker -> Initialize (initData);

        if (logFile)
        {
                *logFile << "Running as a COORDINATOR." << std::endl;

                // indicate what kind of network connection is in use
                #if !mwNETWORK
                *logFile << "There is no network in use." << std::endl;
                #elif mwWXWIN
                *logFile << "Using the sockets interface "
                        "provided by wxWindows." << std::endl;
                #else
                *logFile << "Using the standard sockets "
                        "for network connections." << std::endl;
                #endif

                // say if running in the single-work mode only
                if (singleWork && localWorker)
                        *logFile << "Running in the single-work mode." <<
                                std::endl;
        }

        // connect to workers on the list and begin listening if necessary
        if (!singleWork)
        {
                this -> ConnectWorkers ();
                this -> BeginListening ();
        }

        // if not listening and there are no workers then try switching
        // to the single-work mode, unless there is no local worker
        if (!singleWork && (listensocket < 0) && !nWaiting)
        {
                // switch to the single-work mode if allowed to
                if (localWorker)
                {
                        singleWork = true;
                        if (logFile)
                                *logFile << "No remote workers. Switching "
                                        "to the single-work mode." <<
                                        std::endl;
                }

                // if data cannot be processed locally, report a failure
                else
                {
                        if (logFile)
                                *logFile << "Failure: No workers." <<
                                        std::endl;
                        return mwError;
                }
        }

        // is there no more data to be sent?
        bool no_more_data = false;

        while (1)
        {
                // run the communications loop
                int loopresult = singleWork ? this -> RunLoopLocally () :
                        this -> RunLoop (no_more_data);

                // stop if the coordinator failed badly
                if (loopresult == mwError)
                        return mwError;

                // if some data was rejected, process all this data
                while (nRejected > 0)
                {
                        // run the user's procedure to acquire rejected data
                        -- nRejected;
                        xRejected [nRejected]. Rewind ();
                        int result = this -> Reject (xRejected [nRejected]);

                        // interrupt if the user says that an error occurred
                        if (result != mwOk)
                                return mwError;

                        // reset the data acquired by the user
                        xRejected [nRejected]. Reset ();
                        no_more_data = false;
                }

                // if some new data arrived, process all this data
                while (nDone > 0)
                {
                        // call the user's procedure to accept the data piece
                        -- nDone;
                        xDone [nDone]. Rewind ();
                        int result = this -> Accept (xDone [nDone]);

                        // interrupt if the user says that an error occurred
                        if (result != mwOk)
                                return mwError;

                        // reset the data acquired by the user
                        xDone [nDone]. Reset ();
                        no_more_data = false;
                }

                // determine whether a new data item must be prepared:
                // if there are workers waiting and there is no data then YES
                bool hungry = (nWaiting > 0) && !nToDo;
                // if there is no worker and no data then YES
                if ((localWorker || (listensocket >= 0)) &&
                        !nWorking && !nWaiting && !nToDo)
                {
                        hungry = true;
                }
                // if no more data is needed then definitely NO
                if (no_more_data)
                        hungry = false;

                // prepare a new data item if necessary
                if (hungry)
                {
                        // run the user's procedure for preparing data
                        int result = this -> Prepare (xToDo [nToDo]);

                        // break if the user says that an error occurred
                        if (result == mwError)
                                return mwError;

                        // make a note if there is no more data
                        else if (result == mwNoData)
                                no_more_data = true;

                        // add the data piece to the work queue otherwise
                        else
                                ++ nToDo;
                }

                // stop if the tasks are completed and there is no more data
                if (no_more_data && !nWorking && !nToDo)
                        return mwOk;
        }
} /* mwCoordinator::Coordinate */
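
The decision made at the end of each pass of the loop above (whether to ask the user for another data piece, and whether to stop) can be restated as two pure predicates. This is a sketch only: the QueueState struct and the names needsNewData and finished are hypothetical and do not exist in mwcoord.h; the field listening stands for the condition listensocket >= 0.

```cpp
#include <cassert>

// Hypothetical snapshot of the coordinator's counters and flags.
struct QueueState {
    int nWaiting;         // workers waiting for data
    int nWorking;         // workers processing data
    int nToDo;            // prepared data pieces not yet sent
    bool hasLocalWorker;  // localWorker != 0
    bool listening;       // listensocket >= 0
    bool noMoreData;      // Prepare has reported that no data is left
};

// Mirrors the 'hungry' computation in the listing above.
bool needsNewData(const QueueState &s)
{
    assert(s.nWaiting >= 0 && s.nWorking >= 0 && s.nToDo >= 0);

    // workers are waiting and nothing is queued for them
    bool hungry = (s.nWaiting > 0) && (s.nToDo == 0);

    // no workers at all, but one may appear (or work can be done locally)
    if ((s.hasLocalWorker || s.listening) &&
        s.nWorking == 0 && s.nWaiting == 0 && s.nToDo == 0)
        hungry = true;

    // once the data source is exhausted, never ask again
    if (s.noMoreData)
        hungry = false;

    return hungry;
}

// Mirrors the termination test at the bottom of the loop.
bool finished(const QueueState &s)
{
    return s.noMoreData && s.nWorking == 0 && s.nToDo == 0;
}
```

Note that in the listing the no_more_data flag is cleared again whenever a result is accepted or a rejection is acknowledged, so Prepare gets another chance to produce data based on the new results.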

void chomp::multiwork::mwCoordinator::DisconnectAll (  )  [inline, private]

Disconnects all the workers (normally called in the destructor).

Definition at line 307 of file mwcoord.h.

References keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwKeepMsg, nWaiting, nWorking, SendMessageC(), xWaiting, and xWorking.

Referenced by ~mwCoordinator().

{
        // release all the workers if they are still connected
        for (int i = 0; i < nWaiting + nWorking; ++ i)
        {
                // determine the particular worker
                mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
                        xWorking [i - nWaiting];

                // if this worker is already disconnected, take the next one
                if (w. fd < 0)
                        continue;

                // prepare the code to send to the worker
                unsigned int code = mwByeMsg;
                code |= keepWorkers ? mwKeepMsg : mwDontKeepMsg;

                // send the 'Bye!' message to the worker
                mwData empty;
                SendMessageC (w. fd, code, empty);

                // disconnect the worker
                mwDisconnect (w. fd);
                w. fd = -1;

                // make a note of what happened in the log file
                if (logFile)
                        *logFile << "Worker " << i << " (" << w. name <<
                                ":" << w. port << ") disconnected and " <<
                                (keepWorkers ? "waiting." : "exited.") <<
                                std::endl;
        }

        return;
} /* mwCoordinator::DisconnectAll */

void chomp::multiwork::mwCoordinator::Init ( mwData & data  )  [inline]

Defines a portion of initialization data which will be sent to every newly connected worker.

The argument variable is reset and the data is moved to the internal buffer.

Definition at line 465 of file mwcoord.h.

References initData.

{
        initData. Take (data);
        return;
} /* mwCoordinator::Init */
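
The move-and-reset behaviour described for Init can be pictured with a small stand-in. How mwData::Take is actually implemented is not shown on this page, so the Buffer class below is a hypothetical model of the documented effect only: the destination receives the contents and the source is left empty.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for mwData; models only the "Take" semantics.
class Buffer {
public:
    void Append(int v) { items.push_back(v); }
    std::size_t Size() const { return items.size(); }

    // Moves the contents of 'other' into this buffer and resets 'other'.
    void Take(Buffer &other)
    {
        assert(this != &other);    // taking from oneself would lose the data
        items.clear();             // discard whatever was stored before
        items.swap(other.items);   // 'other' now holds the empty vector
    }

private:
    std::vector<int> items;
};
```

A practical consequence for callers of Init is that the argument should be treated as empty afterwards and filled again before any reuse.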

void chomp::multiwork::mwCoordinator::KeepWorkers ( bool  keep = true  )  [inline]

Makes workers keep running after the coordinator's completion.

Definition at line 351 of file mwcoord.h.

References keepWorkers.

{
        keepWorkers = keep;
        return;
} /* mwCoordinator::KeepWorkers */

void chomp::multiwork::mwCoordinator::mwTableDel ( int *  tab,
int  len,
int  pos 
) [inline, static, private]

A helper function for deleting a table entry and shifting the remainder of the table backwards.

Definition at line 1210 of file mwcoord.h.

{
        for (int i = pos + 1; i < len; ++ i)
                tab [i - 1] = tab [i];
        return;
} /* mwTableDel */
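
Because len is passed by value, the helper above only shifts the entries; the caller is responsible for decrementing its own element count. The sketch below shows that pattern with a generic version of the same loop. The names tableDelete and removeSocket are hypothetical and are not part of mwCoordinator.

```cpp
#include <cassert>

// Deletes tab[pos] by shifting tab[pos+1 .. len-1] one slot to the front.
// The last slot keeps its old value; the caller tracks the new length.
template <typename T>
void tableDelete(T *tab, int len, int pos)
{
    assert(pos >= 0 && pos < len);
    for (int i = pos + 1; i < len; ++i)
        tab[i - 1] = tab[i];
}

// Hypothetical caller: removes one entry and returns the new count.
int removeSocket(int *sockets, int count, int pos)
{
    tableDelete(sockets, count, pos);
    return count - 1;
}
```

The shift preserves the relative order of the remaining entries at a cost linear in the table length. The worker queues in RunLoop instead swap the entry with the last one, which is constant-time but does not preserve order.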

void chomp::multiwork::mwCoordinator::mwTableDel ( mwData * tab,
int  len,
int  pos 
) [inline, static, private]

A helper function for deleting a table entry and shifting the remainder of the table backwards using the method "Take" of the mwData class.

Definition at line 1219 of file mwcoord.h.

{
        for (int i = pos + 1; i < len; ++ i)
                tab [i - 1]. Take (tab [i]);
        return;
} /* mwTableDel */

int chomp::multiwork::mwCoordinator::Prepare ( mwData & data  )  [inline, private, virtual]

Prepares a piece of data to be sent to a worker.

Should return mwOk on success, mwNoData if there is no data to send based on the computations completed so far, or mwError to cancel the computations. Override this function in the class you derive from mwCoordinator.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 448 of file mwcoord.h.

Referenced by Coordinate().

{
        return mwNoData;
} /* mwCoordinator::Prepare */

int chomp::multiwork::mwCoordinator::RecvMessageC ( int  fd,
unsigned int &  code,
mwData & x 
) const [inline, private]

Receives a message with data from the socket of a worker.

Returns mwOk on success or mwError in the case of failure.

Definition at line 484 of file mwcoord.h.

References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().

Referenced by RunLoop().

{
        // receive the message
        unsigned int ctrl = 0;
        int result = this -> RecvMessage (fd, ctrl, code, x);

        // if there was an error then return its code
        if (result != mwOk)
                return result;

        // if the control number is correct then finish successfully
        if (ctrl == ~(this -> ControlNumber ()))
                return mwOk;

        // if the control number is wrong then return an error code
        if (logFile)
                *logFile << "Wrong control code received "
                        "from the worker: " << ~ctrl << "." << std::endl;
        return mwError;
} /* mwCoordinator::RecvMessageC */
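
The validity test in the listing above accepts a message only if the received control value equals the bitwise complement of the coordinator's own control number. A minimal sketch of that comparison follows; controlNumberMatches is a hypothetical name, and what exactly a worker puts on the wire is an assumption inferred from this check alone.

```cpp
#include <cassert>

// Returns true if 'received' is the bitwise complement of the
// coordinator's control number, which is the condition tested in
// RecvMessageC before a message is accepted.
bool controlNumberMatches(unsigned int received,
                          unsigned int coordinatorNumber)
{
    return received == ~coordinatorNumber;
}
```

For example, with a control number of 0x1234 a reply carrying ~0x1234u passes the check, while a reply carrying 0x1234 itself does not.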

int chomp::multiwork::mwCoordinator::Reject ( mwData & data  )  [inline, private, virtual]

Acknowledges data that was rejected by a worker.

Should return mwOk if this is fine, or mwError to cancel the computations. Override this function in the class you derive from mwCoordinator unless the default behavior (returning mwOk and ignoring the data) is sufficient.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 458 of file mwcoord.h.

Referenced by Coordinate().

{
        return mwOk;
} /* mwCoordinator::Reject */

int chomp::multiwork::mwCoordinator::RunLoop ( bool  no_more_data  )  [inline, private]

Runs the main communication loop: Sends the data to workers and receives the results of their computations.

If no_more_data is set, waits until the running tasks finish and some results become available. Otherwise, also returns when a worker is ready to accept a portion of data for processing. Returns mwOk or mwError.

Definition at line 627 of file mwcoord.h.

References initData, keepWorkers, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwCanRead, chomp::multiwork::mwCanWrite, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, mwMAXWORK, chomp::multiwork::mwOk, chomp::multiwork::mwPortMsg, chomp::multiwork::mwReject, chomp::multiwork::mwRejectedMsg, chomp::multiwork::mwSelect(), chomp::multiwork::mwTimeOut, nDone, nRejected, nToDo, nWaiting, nWorking, RecvMessageC(), SendMessageC(), singleWork, chomp::multiwork::swap(), chomp::multiwork::mwTask::TimeOut(), xDone, xRejected, xToDo, xWaiting, and xWorking.

Referenced by Coordinate().

{
        if (false && logFile)
                *logFile << "\nDebug0: " << nWaiting << " waiting, " <<
                        nWorking << " working, " << nToDo <<
                        " data pieces." << std::endl;
        
        // if not listening and there are no workers then try switching
        // to the single-work mode, unless there is no local worker
        if ((listensocket < 0) && !nWorking && !nWaiting)
        {
                // switch to the single-work mode if allowed to
                if (localWorker)
                {
                        singleWork = true;
                        if (logFile)
                                *logFile << "All workers disconnected. "
                                        "Switching to the single-work "
                                        "mode." << std::endl;
                        return mwOk;
                }

                // if data cannot be processed locally, report a failure
                else
                {
                        if (logFile)
                                *logFile << "Failure: All workers "
                                        "disconnected. The work cannot be "
                                        "continued." << std::endl;
                        return mwError;
                }
        }

        // TO DO: If there is no more data (options & mwNoMoreData)
        // and some workers are idle, and some others are working
        // for a long time, send their data also to a few idle
        // workers... This requires some A.I., of course. ;)

        // the i/o flags and sockets of the workers + one for 'listensocket'
        int ioflags [mwMAXWORK];
        int sockets [mwMAXWORK];
        int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;

        // prepare flags for the working and waiting workers
        for (int i = 0; i < nWorking; ++ i)
        {
                ioflags [i] = mwCanRead;
                sockets [i] = xWorking [i]. fd;
        }
        if (nWorkers > nWorking)
        {
                for (int i = 0; i < nWaiting; ++ i)
                {
                        ioflags [nWorking + i] = mwCanWrite;
                        sockets [nWorking + i] = xWaiting [i]. fd;
                }
        }

        // prepare the listen socket flag
        bool listening = false;
        int listenflag = nWorkers;
        if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
        {
                ioflags [listenflag] = mwCanRead;
                listening = true;
        }
        else
                ioflags [listenflag] = mwNone;

        // determine whether it is necessary to wait or not
        int timelimit = this -> TimeOut ();
        if (localWorker && !nWorking && !nWaiting)
                timelimit = 0;
        if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
                timelimit = 0;
        if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
                timelimit = 0;
//      if (no_more_data && !nToDo && !nWorking)
//              timelimit = 0;

        // report the select's parameters to the log file
        if (logFile)
        {
                *logFile << ">>> Select, t = " << timelimit << ", flags =";
                for (int i = 0; i <= nWorkers; ++ i)
                        *logFile << " " << ioflags [i];
                *logFile << "." << std::endl;
        }

        // wait until data can be received from or sent to any socket
        int result = mwSelect (sockets, nWorkers,
                listening ? listensocket : -1, ioflags, timelimit);

        // report the returned parameters to the log file
        if (logFile)
        {
                *logFile << ">>> Returned flags =";
                for (int i = 0; i <= nWorkers; ++ i)
                        *logFile << " " << ioflags [i];
                *logFile << "." << std::endl;
        }

        // in case of select's failure, exit the loop with a failure message
        if (result == mwError)
        {
                if (logFile)
                        *logFile << "Error: The 'select' function failed." <<
                                std::endl;
                return mwError;
        }
                                
        // report a time-out if necessary
        if ((timelimit > 0) && (result == mwTimeOut))
        {
                if (logFile)
                        *logFile << "Time-out occurred at 'select'." <<
                                std::endl;
        }

        // receive data from all the workers who are ready
        for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
                (nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
        {
                // if this worker is not ready, yet, then skip it
                if (!(ioflags [i] & mwCanRead))
                        continue;

                // receive the entire data chunk from the worker
                unsigned int code = 0;
                int result = RecvMessageC (sockets [i], code, xDone [nDone]);

                // remember which worker this is
                mwWorkerData &w = xWorking [i];

                // reject the worker in case of error
                if (result < 0)
                {
                        // log the details of what happened
                        if (logFile)
                        {
                                *logFile << "Worker " << i;
                                if (!w. name. empty ())
                                        *logFile << " (" << w. name << ")";
                                *logFile << " disconnected: ";
                                if (result == mwLost)
                                        *logFile << "Connection lost.";
                                else
                                        *logFile << "An error occurred.";
                                *logFile << std::endl;
                        }

                        // disconnect the worker
                        mwDisconnect (sockets [i]);

                        // move the worker's data back to the "to-do" list
                        xToDo [nToDo]. Take (w. data);
                        ++ nToDo;

                        // make a note of this in the worker's data
                        w. fd = -1;
                        w. status = -1;
                }

                // if transmitting the port number only, take it
                else if (code & mwPortMsg)
                {
                        // retrieve the port number
                        xDone [nDone] >> w. port;

                        // report this fact to the log file
                        if (logFile)
                                *logFile << "Port number " << w. port <<
                                        " received from worker " << i <<
                                        "." << std::endl;
                }
        
                // if the data was rejected, move it to 'xRejected'
                else if (code & mwRejectedMsg)
                {
                        // report what happened
                        if (logFile)
                                *logFile << "Data was rejected by worker " <<
                                        i << "." << std::endl;

                        // move the data to the rejected table
                        xRejected [nRejected]. Take (w. data);
                        ++ nRejected;

                        // indicate the status change of this worker
                        w. status = 1;
                }

                // if the data was Ok, the result is in 'xDone'
                else
                {
                        // report this to the log file
                        if (logFile)
                                *logFile << "Processed data received from "
                                        "worker " << i << "." << std::endl;

                        // accept this piece of data
                        ++ nDone;

                        // indicate the status change of this worker
                        w. status = 1;
                }
        }

        // send data to those workers who are ready
        for (int i = 0; (i < nWaiting) && nToDo; ++ i)
        {
                // remember the worker and its offset
                mwWorkerData &w = xWaiting [i];
                int offset = nWorking + i;

                // if this worker is not ready to get data then skip it
                if (!(ioflags [offset] & mwCanWrite))
                        continue;

                // prepare a message code to send
                unsigned int code = mwStdMsg;

                // send a data chunk for processing
                int result = SendMessageC (sockets [offset], code,
                        xToDo [nToDo - 1]);

                // if the data was sent successfully
                if (result == mwOk)
                {
                        // take the data chunk to the worker
                        -- nToDo;
                        w. data. Take (xToDo [nToDo]);

                        // indicate the status of the worker
                        w. status = 1;

                        // report this to the log file
                        if (logFile)
                                *logFile << "Data " << nToDo << " sent to "
                                        "worker " << i << "." << std::endl;
                }
        
                // if an error occurred, reject the worker
                else
                {
                        // report this situation to the log file
                        if (logFile)
                                *logFile << "Worker " << i << " disconnected"
                                        ": " << ((result == mwLost) ?
                                        "Connection lost." :
                                        "An error occurred.") << std::endl;

                        // disconnect the worker
                        mwDisconnect (sockets [offset]);

                        // modify the status of the worker accordingly
                        w. fd = -1;
                        w. status = -1;
                }
        }

        if (false && logFile)
        {
                *logFile << "Debug1: " << nWaiting << " waiting:";
                for (int i = 0; i < nWaiting; ++ i)
                        *logFile << " " << xWaiting [i]. status;
                *logFile << "; " << nWorking << " working:";
                for (int i = 0; i < nWorking; ++ i)
                        *logFile << " " << xWorking [i]. status;
                *logFile << std::endl;
        }
        
        // purge workers who have been disconnected and move workers
        // whose data has been acquired to the 'waiting' queue
        for (int i = 0; i < nWorking; ++ i)
        {
                // skip this worker if it is fine
                if (xWorking [i]. status == 0)
                        continue;

                // swap this worker with the last one if necessary
                if (i < nWorking - 1)
                        swap (xWorking [i], xWorking [nWorking - 1]);
        
                // move the worker to the waiting queue if necessary
                if (xWorking [nWorking - 1]. status > 0)
                {
                        xWorking [nWorking - 1]. status = 0;
                        swap (xWaiting [nWaiting], xWorking [nWorking - 1]);
                        ++ nWaiting;
                }

                // remove this worker from the working queue
                -- nWorking;

                // consider the same entry in the table again
                -- i;
        }

        // purge workers who have been disconnected and move workers
        // who received data to the 'working' queue
        for (int i = 0; i < nWaiting; ++ i)
        {
                // skip this worker if it is fine
                if (xWaiting [i]. status == 0)
                        continue;

                // swap this worker with the last one if necessary
                if (i < nWaiting - 1)
                        swap (xWaiting [i], xWaiting [nWaiting - 1]);

                // move the worker to the working queue if necessary
                if (xWaiting [nWaiting - 1]. status > 0)
                {
                        xWaiting [nWaiting - 1]. status = 0;
                        swap (xWorking [nWorking], xWaiting [nWaiting - 1]);
                        ++ nWorking;
                }

                // remove this worker from the waiting queue
                -- nWaiting;

                // consider the same entry in the table again
                -- i;
        }

        if (false && logFile)
                *logFile << "Debug2: " << nWaiting << " waiting, " <<
                        nWorking << " working, " << nToDo <<
                        " data pieces." << std::endl;
        
        // accept connections from new workers if any
        if (listening && (ioflags [listenflag] & mwCanRead) &&
                (nWaiting + nWorking < mwMAXWORK))
        {
                // accept the connection
                mwWorkerData &w = xWaiting [nWaiting];
                w. name = std::string ("");
                w. port = 0;
                w. status = 0;
                w. fd = mwAccept (listensocket, w. name);

                // if the new worker has been accepted successfully
                if (w. fd >= 0)
                {
                        // report the worker's acceptance
                        if (logFile)
                                *logFile << "A worker from '" << w. name <<
                                        "' accepted." << std::endl;

                        // add the worker to the waiting queue
                        ++ nWaiting;

                        // prepare the initialization code
                        int code = mwInitMsg |
                                (keepWorkers ? mwKeepMsg : mwDontKeepMsg);

                        // send the initialization data to the worker
                        int initResult = SendMessageC (w. fd, code,
                                initData);
                        if (initResult != mwOk)
                        {
                                if (logFile)
                                        *logFile << "Error while sending "
                                                "the initialization data." <<
                                                std::endl;
                                -- nWaiting;
                        }
                }

                // report the error to the log file if not successful
                else if (logFile)
                        *logFile << "Unsuccessful connection of a worker "
                                "from '" << w. name << "'." << std::endl;
        }

        if (false && logFile)
                *logFile << "Debug3: " << nWaiting << " waiting, " <<
                        nWorking << " working, " << nToDo <<
                        " data pieces." << std::endl;
        
        // return to the caller to ask for more data to process locally
        // if no workers are connected and no data is queued
        if (localWorker && !nWorking && !nWaiting &&
                !no_more_data && (result == mwTimeOut) && !nToDo)
        {
                if (logFile)
                        *logFile << "Asking for some data to be "
                                "processed locally." << std::endl;
                return mwOk;
        }

        // run the work locally as a last resort
        if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
                nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
        {
                // make a note in the log file of what is going on
                if (logFile)
                        *logFile << "Processing data locally." << std::endl;

                // process one piece of data
                -- nToDo;
                xToDo [nToDo]. Rewind ();
                int result = localWorker -> Process (xToDo [nToDo]);
                xToDo [nToDo]. Rewind ();

                if (result == mwReject)
                {
                        xRejected [nRejected]. Take (xToDo [nToDo]);
                        ++ nRejected;
                }
                else if (result == mwError)
                {
                        if (logFile)
                                *logFile << "Data processing failed." <<
                                        std::endl;
                        return mwError;
                }
                else
                {
                        xDone [nDone]. Take (xToDo [nToDo]);
                        ++ nDone;
                }
        }

        if (false && logFile)
                *logFile << "Debug4: " << nWaiting << " waiting, " <<
                        nWorking << " working, " << nToDo <<
                        " data pieces." << std::endl;
        
        return mwOk;
} /* mwCoordinator::RunLoop */
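
The two purge loops above use a swap-with-last idiom: a flagged entry is swapped to the end of its queue, optionally moved to the other queue, and then dropped by shrinking the count, after which the same index is examined again. The following is a minimal sketch of that idiom using std::vector in place of the fixed-size tables; WorkerSketch and transferFlagged are hypothetical names introduced for illustration and are not part of mwcoord.h.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for mwWorkerData (assumption).
struct WorkerSketch
{
    int id;
    int status; // 0 = unchanged, > 0 = move to the other queue, < 0 = drop
};

// Removes every entry with a nonzero status from 'from'.
// Entries with a positive status are appended to 'to' with the status
// reset to 0; entries with a negative status are discarded.
inline void transferFlagged(std::vector<WorkerSketch> &from,
                            std::vector<WorkerSketch> &to)
{
    for (std::size_t i = 0; i < from.size(); )
    {
        // skip this entry if nothing has changed for it
        if (from[i].status == 0)
        {
            ++i;
            continue;
        }

        // move the flagged entry to the end of the queue
        std::swap(from[i], from.back());

        // hand it over to the other queue if requested
        if (from.back().status > 0)
        {
            from.back().status = 0;
            to.push_back(from.back());
        }

        // remove it from this queue; 'i' is not advanced, so the entry
        // that was swapped into position 'i' is examined next
        from.pop_back();
    }
}
```

Because moved entries have their status reset to 0, running the same routine on the second queue right afterwards leaves the freshly moved entries untouched, which matches the order of the two loops in RunLoop.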

int chomp::multiwork::mwCoordinator::RunLoopLocally (  )  [inline, private]

Runs the main communication loop using the local worker.

Returns mwOk or mwError.

Definition at line 508 of file mwcoord.h.

References localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, mwMAXWORK, chomp::multiwork::mwReject, nDone, nRejected, nToDo, nWaiting, xDone, xRejected, and xToDo.

Referenced by Coordinate().

{
        if (!localWorker)
                return mwError;

        // indicate that some worker is continuously waiting for data
        if (!nWaiting)
                ++ nWaiting;

        // process the data
        while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
        {
                // process the data and replace it with the result
                -- nToDo;
                xToDo [nToDo]. Rewind ();
                int result = localWorker -> Process (xToDo [nToDo]);
                xToDo [nToDo]. Rewind ();

                // if the data caused an error then abort the loop
                if (result == mwError)
                {
                        if (logFile)
                                *logFile << "Data processing failed." <<
                                        std::endl;
                        return mwError;
                }

                // if the data was rejected
                // then move it to the right place
                else if (result == mwReject)
                {
                        xRejected [nRejected]. Take (xToDo [nToDo]);
                        ++ nRejected;
                }

                // if the data was accepted
                // then move it to the right place
                else
                {
                        xDone [nDone]. Take (xToDo [nToDo]);
                        ++ nDone;
                }
        }

        return mwOk;
} /* mwCoordinator::RunLoopLocally */
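
The loop above pops one data piece at a time from the to-do stack, processes it, and files it as done or rejected until the stack is empty or one of the result buffers is full. A minimal sketch of that control flow follows, with plain integers standing in for mwData; LocalQueues, SketchOutcome, and processLocally are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical result codes standing in for mwOk, mwReject and mwError.
enum SketchOutcome { SketchOk, SketchReject, SketchError };

// Hypothetical stand-in for the xToDo, xDone and xRejected tables.
struct LocalQueues
{
    std::vector<int> todo, done, rejected;
};

// Processes queued items until the to-do stack is empty or one of the
// result buffers reaches 'capacity'. Returns false as soon as the
// processing function reports an error, and true otherwise.
inline bool processLocally(LocalQueues &q, std::size_t capacity,
    const std::function<SketchOutcome(int &)> &process)
{
    while ((q.done.size() < capacity) && (q.rejected.size() < capacity) &&
        !q.todo.empty())
    {
        // take the last item off the stack and process it in place
        int item = q.todo.back();
        q.todo.pop_back();
        SketchOutcome result = process(item);

        // abort the loop if the data caused an error
        if (result == SketchError)
            return false;

        // file the item according to the outcome
        if (result == SketchReject)
            q.rejected.push_back(item);
        else
            q.done.push_back(item);
    }
    return true;
}
```

The capacity check plays the role of the mwMAXWORK bounds: the loop stops early so that the caller can drain the finished and rejected pieces before more data is processed.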

int chomp::multiwork::mwCoordinator::SaveWorkers ( const char *  filename  )  [inline]

Saves addresses of workers to the given file in the form of address:port.

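Each distinct address:port combination is saved only once; workers with no known host name or port number are skipped. Returns mwOk on success and mwError if writing the file fails.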

Definition at line 357 of file mwcoord.h.

References keepWorkers, nWaiting, nWorking, xWaiting, and xWorking.

{
        // prepare to create a file to list the workers;
        // the file is opened only when the first address is written
        std::ofstream f;
        bool first = true;

        // go through all the connected workers
        int counter = 0;
        for (int i = 0; i < nWaiting + nWorking; ++ i)
        {
                // determine the particular worker
                mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
                        xWorking [i - nWaiting];

                // determine the host name
                const char *host = w. name. c_str ();

                // if no host name or port number is known, skip it
                if (!host || !*host || (w. port <= 0))
                        continue;

                // check if the same worker was already listed
                int j;
                for (j = 0; j < i; ++ j)
                {
                        // determine the other worker
                        mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
                                xWorking [j - nWaiting];

                        // determine the name of the other host
                        const char *host_j = v. name. c_str ();

                        // if no host name or port number is known, skip it
                        if (!host_j || !*host_j || (v. port <= 0))
                                continue;

                        // if the ports are different, skip the other worker
                        if (w. port != v. port)
                                continue;

                        // if the names are different, skip the other worker
                        if (std::strcmp (host, host_j))
                                continue;

                        // make a note that this is the same worker
                        j = i + 1;
                        break;
                }
                if (j > i)
                        continue;

                // create the file if this is the first time to write data
                if (first)
                {
                        f. open (filename, std::ios::out | std::ios::trunc);
                        if (!f)
                                return mwError;
                        f << "; A list of currently running workers:\n";
                        first = false;
                }

                // write the address
                f << host << ":" << w. port << "\n";
                ++ counter;
        }

        // exit if no workers have been listed
        if (!counter)
                return mwOk;

        // add a summary if any workers have been listed
        f << "; A total of " << counter << " workers";
        if (counter != nWaiting + nWorking)
                f << " out of " << (nWaiting + nWorking);
        f << " saved.\n";

        if (!keepWorkers)
                f << "; The workers will exit upon disconnection.\n";
        else
                f << "; The workers will remain running "
                        "after disconnection.\n";
        f. close ();
        if (!f)
                return mwError;
        else
                return mwOk;
} /* mwCoordinator::SaveWorkers */
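
The duplicate check above compares each worker with all the preceding ones. As an alternative illustration of the same filtering rule (skip entries without a host name or a positive port, and list each address:port pair once), the sketch below uses a std::set of pairs instead of the nested scan, and writes to any std::ostream so that it can be tried without a file. WorkerAddress and writeUniqueWorkers are hypothetical names, not part of mwcoord.h.

```cpp
#include <cassert>
#include <ostream>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reduced form of mwWorkerData: only the address fields.
struct WorkerAddress
{
    std::string host;
    int port;
};

// Writes each distinct host:port pair once, one per line, and returns
// the number of lines written. Entries with an empty host name or a
// non-positive port number are skipped.
inline int writeUniqueWorkers(const std::vector<WorkerAddress> &workers,
    std::ostream &out)
{
    std::set<std::pair<std::string, int> > seen;
    int counter = 0;
    for (const WorkerAddress &w : workers)
    {
        // if no host name or port number is known, skip the worker
        if (w.host.empty() || (w.port <= 0))
            continue;

        // skip the worker if the same address was already listed
        if (!seen.insert(std::make_pair(w.host, w.port)).second)
            continue;

        // write the address
        out << w.host << ":" << w.port << "\n";
        ++counter;
    }
    return counter;
}
```

The set-based check trades the quadratic scan for a small amount of extra memory; for the number of workers bounded by mwMAXWORK either approach is likely adequate.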

int chomp::multiwork::mwCoordinator::SendMessageC ( int  fd,
unsigned int  code,
const mwData &  x
) const [inline, private]

Sends a message with data to the given socket as a coordinator.

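Returns mwOk on success and an error code such as mwError in the case of failure; RunLoop additionally tests the result for mwLost to detect a lost connection.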

Definition at line 474 of file mwcoord.h.

References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().

Referenced by ConnectWorkers(), DisconnectAll(), and RunLoop().

{
        // prepare the control number to send
        unsigned int ctrl = this -> ControlNumber ();

        // send the message with the control number and the message code
        return this -> SendMessage (fd, ctrl, code, x);
} /* mwCoordinator::SendMessageC */


Member Data Documentation

chomp::multiwork::mwCoordinator::initData

The initialization data that has to be sent to workers.

Definition at line 225 of file mwcoord.h.

Referenced by ConnectWorkers(), Coordinate(), Init(), and RunLoop().

chomp::multiwork::mwCoordinator::keepWorkers

Should the workers be kept running after the coordinator is done?

Definition at line 220 of file mwcoord.h.

Referenced by ConnectWorkers(), DisconnectAll(), KeepWorkers(), RunLoop(), and SaveWorkers().

chomp::multiwork::mwCoordinator::listensocket

The socket number at which new workers are listened to.

Definition at line 262 of file mwcoord.h.

Referenced by BeginListening(), Coordinate(), and RunLoop().

chomp::multiwork::mwCoordinator::localWorker

The address of a local worker or 0 if none is available.

Definition at line 217 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::nDone

The number of recently finished pieces of data.

Definition at line 254 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::nRejected

The number of recently rejected pieces of data.

Definition at line 248 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::nToDo

The number of data pieces to be sent to working tasks.

Definition at line 242 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::nWaiting

The number of workers waiting for their data.

Definition at line 230 of file mwcoord.h.

Referenced by ConnectWorkers(), Coordinate(), DisconnectAll(), RunLoop(), RunLoopLocally(), and SaveWorkers().

chomp::multiwork::mwCoordinator::nWorking

The number of workers processing their data.

Definition at line 236 of file mwcoord.h.

Referenced by Coordinate(), DisconnectAll(), RunLoop(), and SaveWorkers().

Should data be processed locally only?

Definition at line 214 of file mwcoord.h.

Referenced by Coordinate(), and RunLoop().

chomp::multiwork::mwCoordinator::xDone

The recently finished pieces of data.

Definition at line 257 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::xRejected

The recently rejected pieces of data.

Definition at line 251 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::xToDo

The data pieces waiting to be sent to working tasks.

Definition at line 245 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

chomp::multiwork::mwCoordinator::xWaiting

The workers waiting for their tasks.

Definition at line 233 of file mwcoord.h.

Referenced by ConnectWorkers(), DisconnectAll(), RunLoop(), and SaveWorkers().

chomp::multiwork::mwCoordinator::xWorking

The workers processing their data.

Definition at line 239 of file mwcoord.h.

Referenced by DisconnectAll(), RunLoop(), and SaveWorkers().


The documentation for this class was generated from the following file: