mwcoord.h

Go to the documentation of this file.
00001 
00002 
00003 
00013 
00014 // Copyright (C) 1997-2010 by Pawel Pilarczyk.
00015 //
00016 // This file is part of the Homology Library.  This library is free software;
00017 // you can redistribute it and/or modify it under the terms of the GNU
00018 // General Public License as published by the Free Software Foundation;
00019 // either version 2 of the License, or (at your option) any later version.
00020 //
00021 // This library is distributed in the hope that it will be useful,
00022 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00023 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00024 // GNU General Public License for more details.
00025 //
00026 // You should have received a copy of the GNU General Public License along
00027 // with this software; see the file "license.txt".  If not, write to the
00028 // Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00029 // MA 02111-1307, USA.
00030 
00031 // Started on August 11, 2004. Last revision: December 11, 2007.
00032 
00033 
00034 #ifndef _CHOMP_MULTIWORK_MWCOORD_H_
00035 #define _CHOMP_MULTIWORK_MWCOORD_H_
00036 
00037 #include <algorithm>
00038 #include <string>
00039 #include <cstring>
00040 
00041 #include "chomp/multiwork/mwconfig.h"
00042 #include "chomp/multiwork/mwlowlev.h"
00043 #include "chomp/multiwork/mwdata.h"
00044 #include "chomp/multiwork/mwtask.h"
00045 
00046 
00047 namespace chomp {
00048 namespace multiwork {
00049 
00050 class mwWorker;
00051 
00052 
00053 // --------------------------------------------------
00054 // ------------------ mwWorkerData ------------------
00055 // --------------------------------------------------
00056 
00059 class mwWorkerData
00060 {
00061 public:
00063         mwWorkerData ();
00064 
00066         ~mwWorkerData ();
00067 
00069         mwData data;
00070 
00074         int fd;
00075 
00077         std::string name;
00078 
00080         int port;
00081 
00084         int status;
00085 
00087         friend void swap (mwWorkerData &data1, mwWorkerData &data2);
00088 
00089 private:
00091         mwWorkerData (const mwWorkerData &) {return;};
00092 
00094         mwWorkerData &operator = (const mwWorkerData &) {return *this;};
00095 
00096 }; /* class mwWorkerData */
00097 
00098 inline mwWorkerData::mwWorkerData ():
00099         fd (-1), name (""), port (0), status (0)
00100 {
00101         return;
00102 } /* mwWorkerData::mwWorkerData */
00103 
00104 inline mwWorkerData::~mwWorkerData ()
00105 {
00106         return;
00107 } /* mwWorkerData::~mwWorkerData */
00108 
00109 inline void swap (mwWorkerData &data1, mwWorkerData &data2)
00110 {
00111         swap (data1. data, data2. data);
00112         std::swap (data1. fd, data2. fd);
00113         std::swap (data1. name, data2. name);
00114         std::swap (data1. port, data2. port);
00115         std::swap (data1. status, data2. status);
00116         return;
00117 } /* swap */
00118 
00119 
00120 // --------------------------------------------------
00121 // ----------------- mwCoordinator ------------------
00122 // --------------------------------------------------
00123 
00127 class mwCoordinator: public virtual mwTask
00128 {
00129 public:
00130         // --- constructor/destructor ---
00131         
00133         mwCoordinator ();
00134                 
00136         virtual ~mwCoordinator ();
00137 
00138         // --- configuration ---
00139 
00141         void KeepWorkers (bool keep = true);
00142 
00145         int SaveWorkers (const char *filename);
00146 
00147         // --- initialization data ---
00148 
00152         void Init (mwData &data);
00153 
00154         // --- run the coordinator ---
00155 
00160         int Coordinate (mwWorker *w = NULL);
00161 
00162 private:
00163         // --- coordinator's job ---
00164 
00171         virtual int Prepare (mwData &data);
00172         
00177         virtual int Accept (mwData &data);
00178         
00185         virtual int Reject (mwData &data);
00186 
00187         // --- the coordinator's functions ---
00188 
00196         int RunLoop (bool no_more_data);
00197 
00200         int RunLoopLocally ();
00201 
00203         void ConnectWorkers ();
00204 
00206         void BeginListening ();
00207 
00209         void DisconnectAll ();
00210 
00211         // --- configuration ---
00212 
00214         bool singleWork;
00215         
00217         mwWorker *localWorker;
00218 
00220         bool keepWorkers;
00221 
00222         // --- initialization data ---
00223 
00225         mwData initData;
00226 
00227         // --- data processing ---
00228 
00230         int nWaiting;
00231 
00233         mwWorkerData xWaiting [mwMAXWORK];
00234 
00236         int nWorking;
00237 
00239         mwWorkerData xWorking [mwMAXWORK];
00240 
00242         int nToDo;
00243 
00245         mwData xToDo [mwMAXWORK];
00246 
00248         int nRejected;
00249 
00251         mwData xRejected [mwMAXWORK];
00252 
00254         int nDone;
00255 
00257         mwData xDone [mwMAXWORK];
00258 
00259         // --- data used for the communication with workers ---
00260 
00262         int listensocket;
00263 
00264         // --- operations on arrays (tables) ---
00265 
00268         static void mwTableDel (int *tab, int len, int pos);
00269 
00273         static void mwTableDel (mwData *tab, int len, int pos);
00274 
00275         // --- network communiation ---
00276 
00279         int SendMessageC (int fd, unsigned int code, const mwData &x) const;
00280 
00283         int RecvMessageC (int fd, unsigned int &code, mwData &x) const;
00284 
00285 }; /* class mwCoordinator */
00286 
00287 // --------------------------------------------------
00288 
00289 inline mwCoordinator::mwCoordinator ():
00290 #if mwNETWORK
00291         singleWork (false),
00292 #else
00293         singleWork (true),
00294 #endif
00295         localWorker (0),
00296         keepWorkers (false),
00297         nWaiting (0),
00298         nWorking (0),
00299         nToDo (0),
00300         nRejected (0),
00301         nDone (0),
00302         listensocket (-1)
00303 {
00304         return;
00305 } /* mwCoordinator::mwCoordinator */
00306 
00307 inline void mwCoordinator::DisconnectAll ()
00308 {
00309         // release all the workers if they are still connected
00310         for (int i = 0; i < nWaiting + nWorking; ++ i)
00311         {
00312                 // determine the particular worker
00313                 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
00314                         xWorking [i - nWaiting];
00315 
00316                 // if this worker is already disconnected, take the next one
00317                 if (w. fd < 0)
00318                         continue;
00319 
00320                 // prepare the code to send to the worker
00321                 unsigned int code = mwByeMsg;
00322                 code |= keepWorkers ? mwKeepMsg : mwDontKeepMsg;
00323 
00324                 // send the 'Bye!' message to the worker
00325                 mwData empty;
00326                 SendMessageC (w. fd, code, empty);
00327 
00328                 // disconnect the worker
00329                 mwDisconnect (w. fd);
00330                 w. fd = -1;
00331 
00332                 // make a note of what happened in the log file
00333                 if (logFile)
00334                         *logFile << "Worker " << i << " (" << w. name <<
00335                                 ":" << w. port << ") disconnected and " <<
00336                                 (keepWorkers ? "waiting." : "exited.") <<
00337                                 std::endl;
00338         }
00339 
00340         return;
00341 } /* mwCoordinator::Disconnect */
00342 
00343 inline mwCoordinator::~mwCoordinator ()
00344 {
00345         // disconnect all the workers
00346         DisconnectAll ();
00347         
00348         return;
00349 } /* mwCoordinator::~mwCoordinator */
00350 
00351 inline void mwCoordinator::KeepWorkers (bool keep)
00352 {
00353         keepWorkers = keep;
00354         return;
00355 } /* mwCoordinator::KeepWorkers */
00356 
00357 inline int mwCoordinator::SaveWorkers (const char *filename)
00358 {
00359         // create a file for the list of workers
00360         // prepare to create a file to list the workers
00361         std::ofstream f;
00362         bool first = false;
00363 
00364         // go through all the connected workers
00365         int counter = 0;
00366         for (int i = 0; i < nWaiting + nWorking; ++ i)
00367         {
00368                 // determine the particular worker
00369                 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
00370                         xWorking [i - nWaiting];
00371 
00372                 // determine the host name
00373                 const char *host = w. name. c_str ();
00374 
00375                 // if no host name or port number is known, skip it
00376                 if (!host || !*host || (w. port <= 0))
00377                         continue;
00378 
00379                 // check if the same worker was already listed
00380                 int j;
00381                 for (j = 0; j < i; ++ j)
00382                 {
00383                         // determine the other worker
00384                         mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
00385                                 xWorking [j - nWaiting];
00386 
00387                         // determine the name of the other host
00388                         const char *host_j = v. name. c_str ();
00389 
00390                         // if no host name or port number is known, skip it
00391                         if (!host_j || !*host_j || (v. port <= 0))
00392                                 continue;
00393 
00394                         // if the ports are different, skip the other worker
00395                         if (w. port != v. port)
00396                                 continue;
00397 
00398                         // if the names are different, skip the other worker
00399                         if (std::strcmp (host, host_j))
00400                                 continue;
00401 
00402                         // make a note that this is the same worker
00403                         j = i + 1;
00404                         break;
00405                 }
00406                 if (j > i)
00407                         continue;
00408 
00409                 // create the file if this is the first time to write data
00410                 if (first)
00411                 {
00412                         f. open (filename, std::ios::out | std::ios::trunc);
00413                         if (!f)
00414                                 return mwError;
00415                         f << "; A list of currently running workers:\n";
00416                         first = false;
00417                 }
00418 
00419                 // write the address
00420                 f << host << ":" << w. port << "\n";
00421                 ++ counter;
00422         }
00423 
00424         // exit if no workers have been listed
00425         if (!counter)
00426                 return mwOk;
00427 
00428         // add a summary if any workers have been listed
00429         f << "; A total of " << counter << " workers";
00430         if (counter != nWaiting + nWorking)
00431                 f << " out of " << (nWaiting + nWorking);
00432         f << " saved.\n";
00433 
00434         if (!keepWorkers)
00435                 f << "; The workers will exit upon disconnection.\n";
00436         else
00437                 f << "; The workers will remain running "
00438                         "after disconnection.\n";
00439         f. close ();
00440         if (!f)
00441                 return mwError;
00442         else
00443                 return mwOk;
00444 } /* mwCoordinator::SaveWorkers */
00445 
00446 // --------------------------------------------------
00447 
00448 inline int mwCoordinator::Prepare (mwData &)
00449 {
00450         return mwNoData;
00451 } /* mwCoordinator::Prepare */
00452 
00453 inline int mwCoordinator::Accept (mwData &)
00454 {
00455         return mwOk;
00456 } /* mwCoordinator::Accept */
00457 
00458 inline int mwCoordinator::Reject (mwData &)
00459 {
00460         return mwOk;
00461 } /* mwCoordinator::Reject */
00462 
00463 // --------------------------------------------------
00464 
00465 inline void mwCoordinator::Init (mwData &data)
00466 {
00467         initData. Take (data);
00468         return;
00469 } /* mwCoordinator::Init */
00470 
00471 
00472 // --------------------------------------------------
00473 
00474 inline int mwCoordinator::SendMessageC (int fd, unsigned int code,
00475         const mwData &x) const
00476 {
00477         // prepare the control number to send
00478         unsigned int ctrl = this -> ControlNumber ();
00479 
00480         // send the message with the control number and the message code
00481         return this -> SendMessage (fd, ctrl, code, x);
00482 } /* mwCoordinator::SendMessageC */
00483 
00484 inline int mwCoordinator::RecvMessageC (int fd, unsigned int &code,
00485         mwData &x) const
00486 {
00487         // receive the message
00488         unsigned int ctrl = 0;
00489         int result = this -> RecvMessage (fd, ctrl, code, x);
00490 
00491         // if there was an error then return its code
00492         if (result != mwOk)
00493                 return result;
00494 
00495         // if the control number is correct then finish successfully
00496         if (ctrl == ~(this -> ControlNumber ()))
00497                 return mwOk;
00498 
00499         // if the control number is wrong then return an error code
00500         if (logFile)
00501                 *logFile << "Wrong control code received "
00502                         "from the worker: " << ~ctrl << "." << std::endl;
00503         return mwError;
00504 } /* mwCoordinator::SendMessageC */
00505 
00506 // --------------------------------------------------
00507 
00508 inline int mwCoordinator::RunLoopLocally ()
00509 {
00510         if (!localWorker)
00511                 return mwError;
00512 
00513         // indicate that some worker is continuously waiting for data
00514         if (!nWaiting)
00515                 ++ nWaiting;
00516 
00517         // process the data
00518         while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
00519         {
00520                 // process the data and replace it with the result
00521                 -- nToDo;
00522                 xToDo [nToDo]. Rewind ();
00523                 int result = localWorker -> Process (xToDo [nToDo]);
00524                 xToDo [nToDo]. Rewind ();
00525 
00526                 // if the data caused an error then abort the loop
00527                 if (result == mwError)
00528                 {
00529                         if (logFile)
00530                                 *logFile << "Data processing failed." <<
00531                                         std::endl;
00532                         return mwError;
00533                 }
00534 
00535                 // if the data was rejected
00536                 // then move it to the right place
00537                 else if (result == mwReject)
00538                 {
00539                         xRejected [nRejected]. Take (xToDo [nToDo]);
00540                         ++ nRejected;
00541                 }
00542 
00543                 // if the data was accepted
00544                 // then move it to the right place
00545                 else
00546                 {
00547                         xDone [nDone]. Take (xToDo [nToDo]);
00548                         ++ nDone;
00549                 }
00550         }
00551 
00552         return mwOk;
00553 } /* mwCoordinator::RunLoopLocally */
00554 
00555 inline void mwCoordinator::ConnectWorkers ()
00556 {
00557         int nComputers = computers. size ();
00558         for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
00559         {
00560                 // determine the computer name and port number
00561                 const std::string &name = computers [i];
00562                 int port = ports [i];
00563 
00564                 // open the connection
00565                 int fd = mwConnect (name. c_str (), port);
00566 
00567                 // if the connection attempt was successful, add this worker
00568                 if (fd >= 0)
00569                 {
00570                         if (logFile)
00571                                 *logFile << "Connected to " << name << ":" <<
00572                                         port << "." << std::endl;
00573                         mwWorkerData &w = xWaiting [nWaiting];
00574                         w. fd = fd;
00575                         w. name = name;
00576                         w. port = port;
00577                         ++ nWaiting;
00578 
00579                         // prepare the initialization code
00580                         int code = mwInitMsg |
00581                                 (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
00582 
00583                         // send the initialization data to the worker
00584                         int initResult = SendMessageC (w. fd, code,
00585                                 initData);
00586                         if (initResult != mwOk)
00587                         {
00588                                 if (logFile)
00589                                         *logFile << "Error while sending "
00590                                                 "the initialization data." <<
00591                                                 std::endl;
00592                                 -- nWaiting;
00593                         }
00594                 }
00595 
00596                 // if unable to connect, make only a note in the log
00597                 else if (logFile)
00598                         *logFile << "Connection attempt to " << name <<
00599                                 ":" << port << " failed." << std::endl;
00600         }
00601         return;
00602 } /* mwCoordinator::ConnectWorkers */
00603 
00604 inline void mwCoordinator::BeginListening ()
00605 {
00606         // if there is no valid port number then cancel this operation
00607         if (this -> Port () <= 0)
00608                 return;
00609 
00610         // open a listening socket at the given port
00611         listensocket = mwListen (this -> Port (), 15);
00612 
00613         // add a message to the log file whether it was successful or not
00614         if (logFile)
00615         {
00616                 if (listensocket < 0)
00617                         *logFile << "Listening attempt at port " <<
00618                                 this -> Port () << " failed." << std::endl;
00619                 else
00620                         *logFile << "Waiting for workers at port " <<
00621                                 this -> Port () << "." << std::endl;
00622         }
00623 
00624         return;
00625 } /* mwCoordinator::BeginListening */
00626 
00627 inline int mwCoordinator::RunLoop (bool no_more_data)
00628 {
00629         if (false && logFile)
00630                 *logFile << "\nDebug0: " << nWaiting << " waiting, " <<
00631                         nWorking << " working, " << nToDo <<
00632                         " data pieces." << std::endl;
00633         
00634         // if not listening and there are no workers then try switching
00635         // to the single-work mode, unless there is no local worker
00636         if ((listensocket < 0) && !nWorking && !nWaiting)
00637         {
00638                 // switch to the single-work mode if allowed to
00639                 if (localWorker)
00640                 {
00641                         singleWork = true;
00642                         if (logFile)
00643                                 *logFile << "All workers disconnected. "
00644                                         "Switching to the single-work "
00645                                         "mode." << std::endl;
00646                         return mwOk;
00647                 }
00648 
00649                 // if data cannot be processed locally, report a failure
00650                 else
00651                 {
00652                         if (logFile)
00653                                 *logFile << "Failure: All workers "
00654                                         "disconnected. The work cannot be "
00655                                         "continued." << std::endl;
00656                         return mwError;
00657                 }
00658         }
00659 
00660         // TO DO: If there is no more data (options & mwNoMoreData)
00661         // and some workers are idle, and some others are working
00662         // for a long time, send their data also to a few idle
00663         // workers... This requires some A.I., of course. ;)
00664 
00665         // the i/o flags and sockets of the workers + one for 'listensocket'
00666         int ioflags [mwMAXWORK];
00667         int sockets [mwMAXWORK];
00668         int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;
00669 
00670         // prepare flags for the working and waiting workers
00671         for (int i = 0; i < nWorking; ++ i)
00672         {
00673                 ioflags [i] = mwCanRead;
00674                 sockets [i] = xWorking [i]. fd;
00675         }
00676         if (nWorkers > nWorking)
00677         {
00678                 for (int i = 0; i < nWaiting; ++ i)
00679                 {
00680                         ioflags [nWorking + i] = mwCanWrite;
00681                         sockets [nWorking + i] = xWaiting [i]. fd;
00682                 }
00683         }
00684 
00685         // prepare the listen socket flag
00686         bool listening = false;
00687         int listenflag = nWorkers;
00688         if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
00689         {
00690                 ioflags [listenflag] = mwCanRead;
00691                 listening = true;
00692         }
00693         else
00694                 ioflags [listenflag] = mwNone;
00695 
00696         // determine whether it is necessary to wait or not
00697         int timelimit = this -> TimeOut ();
00698         if (localWorker && !nWorking && !nWaiting)
00699                 timelimit = 0;
00700         if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
00701                 timelimit = 0;
00702         if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
00703                 timelimit = 0;
00704 //      if (no_more_data && !nToDo && !nWorking)
00705 //              timelimit = 0;
00706 
00707         // report the select's parameters to the log file
00708         if (logFile)
00709         {
00710                 *logFile << ">>> Select, t = " << timelimit << ", flags =";
00711                 for (int i = 0; i <= nWorkers; ++ i)
00712                         *logFile << " " << ioflags [i];
00713                 *logFile << "." << std::endl;
00714         }
00715 
00716         // wait until data can be received from or sent to any socket
00717         int result = mwSelect (sockets, nWorkers,
00718                 listening ? listensocket : -1, ioflags, timelimit);
00719 
00720         // report the returned parameters to the log file
00721         if (logFile)
00722         {
00723                 *logFile << ">>> Returned flags =";
00724                 for (int i = 0; i <= nWorkers; ++ i)
00725                         *logFile << " " << ioflags [i];
00726                 *logFile << "." << std::endl;
00727         }
00728 
00729         // in case of select's failure, exit the loop with a failure message
00730         if (result == mwError)
00731         {
00732                 if (logFile)
00733                         *logFile << "Error: The 'select' function failed." <<
00734                                 std::endl;
00735                 return mwError;
00736         }
00737                                 
00738         // report a time-out if necessary
00739         if ((timelimit > 0) && (result == mwTimeOut))
00740         {
00741                 if (logFile)
00742                         *logFile << "Time-out occurred at 'select'." <<
00743                                 std::endl;
00744         }
00745 
00746         // receive data from all the workers who are ready
00747         for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
00748                 (nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
00749         {
00750                 // if this worker is not ready, yet, then skip it
00751                 if (!(ioflags [i] & mwCanRead))
00752                         continue;
00753 
00754                 // receive the entire data chunk from the worker
00755                 unsigned int code = 0;
00756                 int result = RecvMessageC (sockets [i], code, xDone [nDone]);
00757 
00758                 // remember which worker this is
00759                 mwWorkerData &w = xWorking [i];
00760 
00761                 // reject the worker in case of error
00762                 if (result < 0)
00763                 {
00764                         // log the details of what happened
00765                         if (logFile)
00766                         {
00767                                 *logFile << "Worker " << i;
00768                                 if (!w. name. empty ())
00769                                         *logFile << " (" << w. name << ")";
00770                                 *logFile << " disconnected: ";
00771                                 if (result == mwLost)
00772                                         *logFile << "Connection lost.";
00773                                 else
00774                                         *logFile << "An error occurred.";
00775                                 *logFile << std::endl;
00776                         }
00777 
00778                         // disconnect the worker
00779                         mwDisconnect (sockets [i]);
00780 
00781                         // move the worker's data back to the "to-do" list
00782                         xToDo [nToDo]. Take (w. data);
00783                         ++ nToDo;
00784 
00785                         // make a note of this in the worker's data
00786                         w. fd = -1;
00787                         w. status = -1;
00788                 }
00789 
00790                 // if transmitting the port number only, take it
00791                 else if (code & mwPortMsg)
00792                 {
00793                         // retrieve the port number
00794                         xDone [nDone] >> w. port;
00795 
00796                         // report this fact to the log file
00797                         if (logFile)
00798                                 *logFile << "Port number " << w. port <<
00799                                         " received from worker " << i <<
00800                                         "." << std::endl;
00801                 }
00802         
00803                 // if the data was rejected, move it to 'xRejected'
00804                 else if (code & mwRejectedMsg)
00805                 {
00806                         // report what happened
00807                         if (logFile)
00808                                 *logFile << "Data was rejected by worker " <<
00809                                         i << "." << std::endl;
00810 
00811                         // move the data to the rejected table
00812                         xRejected [nRejected]. Take (w. data);
00813                         ++ nRejected;
00814 
00815                         // indicate the status change of this worker
00816                         w. status = 1;
00817                 }
00818 
00819                 // if the data was Ok, the result is in 'xDone'
00820                 else
00821                 {
00822                         // report this to the log file
00823                         if (logFile)
00824                                 *logFile << "Processed data received from "
00825                                         "worker " << i << "." << std::endl;
00826 
00827                         // accept this piece of data
00828                         ++ nDone;
00829 
00830                         // indicate the status change of this worker
00831                         w. status = 1;
00832                 }
00833         }
00834 
00835         // send data to those workers who are ready
00836         for (int i = 0; (i < nWaiting) && nToDo; ++ i)
00837         {
00838                 // remember the worker and its offset
00839                 mwWorkerData &w = xWaiting [i];
00840                 int offset = nWorking + i;
00841 
00842                 // if this worker is not ready to get data then skip it
00843                 if (!(ioflags [offset] & mwCanWrite))
00844                         continue;
00845 
00846                 // prepare a message code to send
00847                 unsigned int code = mwStdMsg;
00848 
00849                 // send a data chunk for processing
00850                 int result = SendMessageC (sockets [offset], code,
00851                         xToDo [nToDo - 1]);
00852 
00853                 // if the data was sent successfully
00854                 if (result == mwOk)
00855                 {
00856                         // take the data chunk to the worker
00857                         -- nToDo;
00858                         w. data. Take (xToDo [nToDo]);
00859 
00860                         // indicate the status of the worker
00861                         w. status = 1;
00862 
00863                         // report this to the log file
00864                         if (logFile)
00865                                 *logFile << "Data " << nToDo << " sent to "
00866                                         "worker " << i << "." << std::endl;
00867                 }
00868         
00869                 // if an error occurred, reject the worker
00870                 else
00871                 {
00872                         // report this situation to the log file
00873                         if (logFile)
00874                                 *logFile << "Worker " << i << " disconnected"
00875                                         ": " << ((result == mwLost) ?
00876                                         "Connection lost." :
00877                                         "An error occurred.") << std::endl;
00878 
00879                         // disconnect the worker
00880                         mwDisconnect (sockets [offset]);
00881 
00882                         // modify the status of the worker accordingly
00883                         w. fd = -1;
00884                         w. status = -1;
00885                 }
00886         }
00887 
00888         if (false && logFile)
00889         {
00890                 *logFile << "Debug1: " << nWaiting << " waiting:";
00891                 for (int i = 0; i < nWaiting; ++ i)
00892                         *logFile << " " << xWaiting [i]. status;
00893                 *logFile << "; " << nWorking << " working:";
00894                 for (int i = 0; i < nWorking; ++ i)
00895                         *logFile << " " << xWorking [i]. status;
00896                 *logFile << std::endl;
00897         }
00898         
00899         // purge workers who have been disconnected and move workers
00900         // whose data has been acquired to the 'waiting' queue
00901         for (int i = 0; i < nWorking; ++ i)
00902         {
00903                 // skip this worker if it is fine
00904                 if (xWorking [i]. status == 0)
00905                         continue;
00906 
00907                 // swap this worker with the last one if necessary
00908                 if (i < nWorking - 1)
00909                         swap (xWorking [i], xWorking [nWorking - 1]);
00910         
00911                 // move the worker to the waiting queue if necessary
00912                 if (xWorking [nWorking - 1]. status > 0)
00913                 {
00914                         xWorking [nWorking - 1]. status = 0;
00915                         swap (xWaiting [nWaiting], xWorking [nWorking - 1]);
00916                         ++ nWaiting;
00917                 }
00918 
00919                 // remove this worker from the working queue
00920                 -- nWorking;
00921 
00922                 // consider the same entry in the table again
00923                 -- i;
00924         }
00925 
00926         // purge workers who have been disconnected and move workers
00927         // who received data to the 'working' queue
00928         for (int i = 0; i < nWaiting; ++ i)
00929         {
00930                 // skip this worker if it is fine
00931                 if (xWaiting [i]. status == 0)
00932                         continue;
00933 
00934                 // swap this worker with the last one if necessary
00935                 if (i < nWaiting - 1)
00936                         swap (xWaiting [i], xWaiting [nWaiting - 1]);
00937 
00938                 // move the worker to the working queue if necessary
00939                 if (xWaiting [nWaiting - 1]. status > 0)
00940                 {
00941                         xWaiting [nWaiting - 1]. status = 0;
00942                         swap (xWorking [nWorking], xWaiting [nWaiting - 1]);
00943                         ++ nWorking;
00944                 }
00945 
00946                 // remove this worker from the waiting queue
00947                 -- nWaiting;
00948 
00949                 // consider the same entry in the table again
00950                 -- i;
00951         }
00952 
00953         if (false && logFile)
00954                 *logFile << "Debug2: " << nWaiting << " waiting, " <<
00955                         nWorking << " working, " << nToDo <<
00956                         " data pieces." << std::endl;
00957         
00958         // accept connections from new workers if any
00959         if (listening && (ioflags [listenflag] & mwCanRead) &&
00960                 (nWaiting + nWorking < mwMAXWORK))
00961         {
00962                 // accept the connection
00963                 mwWorkerData &w = xWaiting [nWaiting];
00964                 w. name = std::string ("");
00965                 w. port = 0;
00966                 w. status = 0;
00967                 w. fd = mwAccept (listensocket, w. name);
00968 
00969                 // if the new worker has been accepted successfully
00970                 if (w. fd >= 0)
00971                 {
00972                         // report the worker's acceptance
00973                         if (logFile)
00974                                 *logFile << "A worker from '" << w. name <<
00975                                         "' accepted." << std::endl;
00976 
00977                         // add the worker to the waiting queue
00978                         ++ nWaiting;
00979 
00980                         // prepare the initialization code
00981                         int code = mwInitMsg |
00982                                 (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
00983 
00984                         // send the initialization data to the worker
00985                         int initResult = SendMessageC (w. fd, code,
00986                                 initData);
00987                         if (initResult != mwOk)
00988                         {
00989                                 if (logFile)
00990                                         *logFile << "Error while sending "
00991                                                 "the initialization data." <<
00992                                                 std::endl;
00993                                 -- nWaiting;
00994                         }
00995                 }
00996 
00997                 // report the error to the log file if not successful
00998                 else if (logFile)
00999                         *logFile << "Unsuccessful connection of a worker "
01000                                 "from '" << w. name << "'." << std::endl;
01001         }
01002 
01003         if (false && logFile)
01004                 *logFile << "Debug3: " << nWaiting << " waiting, " <<
01005                         nWorking << " working, " << nToDo <<
01006                         " data pieces." << std::endl;
01007         
01008         // ask for some data to process if none is waiting
01009         // or too much data acquired from workers is accumulated
01010         if (localWorker && !nWorking && !nWaiting &&
01011                 !no_more_data && (result == mwTimeOut) && !nToDo)
01012         {
01013                 if (logFile)
01014                         *logFile << "Asking for some data to be "
01015                                 "processed locally." << std::endl;
01016                 return mwOk;
01017         }
01018 
01019         // run the work locally as a last resort
01020         if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
01021                 nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
01022         {
01023                 // make a note in the log file of what is going on
01024                 if (logFile)
01025                         *logFile << "Processing data locally." << std::endl;
01026 
01027                 // process one piece of data
01028                 -- nToDo;
01029                 xToDo [nToDo]. Rewind ();
01030                 int result = localWorker -> Process (xToDo [nToDo]);
01031                 xToDo [nToDo]. Rewind ();
01032 
01033                 if (result == mwReject)
01034                 {
01035                         xRejected [nRejected]. Take (xToDo [nToDo]);
01036                         ++ nRejected;
01037                 }
01038                 else if (result == mwError)
01039                 {
01040                         if (logFile)
01041                                 *logFile << "Data processing failed." <<
01042                                         std::endl;
01043                         return mwError;
01044                 }
01045                 else
01046                 {
01047                         xDone [nDone]. Take (xToDo [nToDo]);
01048                         ++ nDone;
01049                 }
01050         }
01051 
01052         if (false && logFile)
01053                 *logFile << "Debug4: " << nWaiting << " waiting, " <<
01054                         nWorking << " working, " << nToDo <<
01055                         " data pieces." << std::endl;
01056         
01057         return mwOk;
01058 } /* mwCoordinator::RunLoop */
01059 
01060 // --------------------------------------------------
01061 
01062 inline int mwCoordinator::Coordinate (mwWorker *w)
      // Runs the coordinator's main procedure. 'w' is the optional local worker;
      // if it is null then no data is processed locally by this object.
      // Unless the single-work mode is on, ConnectWorkers () and
      // BeginListening () are called first. Then the procedure repeats:
      // run one round of RunLoop () or RunLoopLocally (), hand every rejected
      // data piece to Reject () and every completed one to Accept (), and call
      // Prepare () whenever a new data piece is needed.
      // Returns mwOk once Prepare () has reported mwNoData and neither busy
      // workers nor unsent data remain. Returns mwError if there is no worker
      // to use at all, if a loop round returns mwError, if Reject () or
      // Accept () returns anything other than mwOk, or if Prepare () fails.
01063 {
01064         // remember the local worker's address (null means: no local worker)
01065         localWorker = w;
01066 
01067         // initialize the local worker if any
              // NOTE(review): whatever Initialize () returns is ignored here
01068         if (localWorker)
01069                 localWorker -> Initialize (initData);
01070 
01071         if (logFile)
01072         {
01073                 *logFile << "Running as a COORDINATOR." << std::endl;
01074 
01075                 // indicate what kind of network connection is in use;
                      // the message is selected at compile time
01076                 #if !mwNETWORK
01077                 *logFile << "There is no network in use." << std::endl;
01078                 #elif mwWXWIN
01079                 *logFile << "Using the sockets interface "
01080                         "provided by wxWindows." << std::endl;
01081                 #else
01082                 *logFile << "Using the standard sockets "
01083                         "for network connections." << std::endl;
01084                 #endif
01085 
01086                 // say if running in the single-work mode only
01087                 if (singleWork && localWorker)
01088                         *logFile << "Running in the single-work mode." <<
01089                                 std::endl;
01090         }
01091 
01092         // connect to workers on the list and begin listening if necessary
01093         if (!singleWork)
01094         {
01095                 this -> ConnectWorkers ();
01096                 this -> BeginListening ();
01097         }
01098 
01099         // if not listening and there are no workers then try switching
01100         // to the single-work mode, unless there is no local worker
              // (a negative 'listensocket' presumably means that no listening
              // socket is open -- confirm in BeginListening ())
01101         if (!singleWork && (listensocket < 0) && !nWaiting)
01102         {
01103                 // switch to the single-work mode if allowed to
01104                 if (localWorker)
01105                 {
01106                         singleWork = true;
01107                         if (logFile)
01108                                 *logFile << "No remote workers. Switching "
01109                                         "to the single-work mode." <<
01110                                         std::endl;
01111                 }
01112 
01113                 // if data cannot be processed locally, report a failure
01114                 else
01115                 {
01116                         if (logFile)
01117                                 *logFile << "Failure: No workers." <<
01118                                         std::endl;
01119                         return mwError;
01120                 }
01121         }
01122 
01123         // is there no more data to be sent?
              // (set when Prepare () returns mwNoData, cleared after each
              // data piece passed to Reject () or Accept ())
01124         bool no_more_data = false;
01125 
              // the main loop; it is left only by the 'return' statements inside
01126         while (1)
01127         {
01128                 // run the communications loop: the local variant in the
                      // single-work mode, the network variant otherwise
01129                 int loopresult = singleWork ? this -> RunLoopLocally () :
01130                         this -> RunLoop (no_more_data);
01131 
01132                 // stop if the coordinator failed badly
01133                 if (loopresult == mwError)
01134                         return mwError;
01135 
01136                 // if some data was rejected, process all this data,
                      // taking the pieces from the end of the 'xRejected' table
01137                 while (nRejected > 0)
01138                 {
01139                         // run the user's procedure to acquire rejected data
01140                         -- nRejected;
01141                         xRejected [nRejected]. Rewind ();
01142                         int result = this -> Reject (xRejected [nRejected]);
01143 
01144                         // interrupt if the user says that an error occurred
01145                         if (result != mwOk)
01146                                 return mwError;
01147 
01148                         // reset the data acquired by the user and clear
                              // the flag so that Prepare () may be called again
01149                         xRejected [nRejected]. Reset ();
01150                         no_more_data = false;
01151                 }
01152 
01153                 // if some new data arrived, process all this data,
                      // taking the pieces from the end of the 'xDone' table
01154                 while (nDone > 0)
01155                 {
01156                         // call the user's procedure to accept the data piece
01157                         -- nDone;
01158                         xDone [nDone]. Rewind ();
01159                         int result = this -> Accept (xDone [nDone]);
01160 
01161                         // interrupt if the user says that an error occurred
01162                         if (result != mwOk)
01163                                 return mwError;
01164 
01165                         // reset the data acquired by the user and clear
                              // the flag so that Prepare () may be called again
01166                         xDone [nDone]. Reset ();
01167                         no_more_data = false;
01168                 }
01169 
01170                 // determine whether a new data item must be prepared:
01171                 // if there are workers waiting and there is no data then YES
01172                 bool hungry = (nWaiting > 0) && !nToDo;
01173                 // if there is no worker and no data then YES, provided that
                      // there is a local worker or 'listensocket' is non-negative
01174                 if ((localWorker || (listensocket >= 0)) &&
01175                         !nWorking && !nWaiting && !nToDo)
01176                 {
01177                         hungry = true;
01178                 }
01179                 // if no more data is needed then definitely NO
01180                 if (no_more_data)
01181                         hungry = false;
01182 
01183                 // prepare a new data item if necessary
01184                 if (hungry)
01185                 {
01186                         // run the user's procedure for preparing data;
                              // 'hungry' implies nToDo == 0, so slot 0 is filled
01187                         int result = this -> Prepare (xToDo [nToDo]);
01188 
01189                         // break if the user says that an error occurred
01190                         if (result == mwError)
01191                                 return mwError;
01192 
01193                         // make a note if there is no more data
01194                         else if (result == mwNoData)
01195                                 no_more_data = true;
01196 
01197                         // add the data piece to the work queue otherwise
01198                         else
01199                                 ++ nToDo;
01200                 }
01201 
01202                 // stop if the tasks are completed and there is no more data
                      // (workers that are merely waiting do not prevent this)
01203                 if (no_more_data && !nWorking && !nToDo)
01204                         return mwOk;
01205         }
01206 } /* mwCoordinator::Coordinate */
01207 
01208 // --------------------------------------------------
01209 
01210 inline void mwCoordinator::mwTableDel (int *tab, int len, int pos)
01211 // Delete the entry at position 'pos' from the table 'tab' of 'len' entries
01212 // by moving each entry that follows 'pos' one position towards the front.
      // The last slot, tab [len - 1], is not modified by this operation.
      // If 'pos' is not a valid index (it is negative, or not less than 'len')
      // then the table is left untouched; in particular, a negative 'pos'
      // no longer causes a write in front of the table.
01213 {
01214         if ((pos < 0) || (pos >= len))
01215                 return;
01216         std::copy (tab + pos + 1, tab + len, tab + pos);
01217 } /* mwTableDel */
01218 
01219 inline void mwCoordinator::mwTableDel (mwData *tab, int len, int pos)
01220 // Delete the entry at position 'pos' from the table 'tab' of 'len' entries:
01221 // each entry that follows 'pos' is handed over with Take () to the slot
      // directly in front of it, in the order of increasing positions.
      // Nothing is done if 'pos' is negative (so that no slot in front of
      // the table is ever accessed) or if no entry follows 'pos'.
01222 {
01223         if (pos >= 0)
01224                 for (int i = pos + 1; i < len; ++ i)
01225                         tab [i - 1]. Take (tab [i]);
01226 } /* mwTableDel */
01227 
01228 
01229 } // namespace multiwork
01230 } // namespace chomp
01231 
01232 #endif // _CHOMP_MULTIWORK_MWCOORD_H_
01233 
01235