mwworker.h

Go to the documentation of this file.
00001 
00002 
00003 
00013 
00014 // Copyright (C) 1997-2010 by Pawel Pilarczyk.
00015 //
00016 // This file is part of the Homology Library.  This library is free software;
00017 // you can redistribute it and/or modify it under the terms of the GNU
00018 // General Public License as published by the Free Software Foundation;
00019 // either version 2 of the License, or (at your option) any later version.
00020 //
00021 // This library is distributed in the hope that it will be useful,
00022 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00023 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00024 // GNU General Public License for more details.
00025 //
00026 // You should have received a copy of the GNU General Public License along
00027 // with this software; see the file "license.txt".  If not, write to the
00028 // Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00029 // MA 02111-1307, USA.
00030 
00031 // Started on August 11, 2004. Last revision: April 11, 2008.
00032 
00033 
00034 #ifndef _CHOMP_MULTIWORK_MWWORKER_H_
00035 #define _CHOMP_MULTIWORK_MWWORKER_H_
00036 
00037 #include "chomp/multiwork/mwconfig.h"
00038 #include "chomp/multiwork/mwlowlev.h"
00039 #include "chomp/multiwork/mwdata.h"
00040 #include "chomp/multiwork/mwtask.h"
00041 
00042 namespace chomp {
00043 namespace multiwork {
00044 
00045 class mwCoordinator;
00046 
00047 
00048 // --------------------------------------------------
00049 // -------------------- mwWorker --------------------
00050 // --------------------------------------------------
00051 
00055 class mwWorker: public virtual mwTask
00056 {
00057 public:
00059         mwWorker ();
00060                 
00062         virtual ~mwWorker ();
00063 
00066         int Work ();
00067                 
00070         void KeepWorker (bool keep = true);
00071 
00072 private:
00073         // --- worker's job ---
00074 
00081         virtual int Process (mwData &data);
00082 
00090         virtual int Initialize (mwData &data);
00091 
00096         int WorkOne (int fd);
00097 
00098         // --- configuration ---
00099 
00101         bool keepWorker;
00102 
00103         // grant access to the protected functions and data to a coordinator
00104         friend class mwCoordinator;
00105 
00106         // --- network communiation ---
00107 
00110         int SendMessageW (int fd, unsigned int code, const mwData &x) const;
00111 
00114         int RecvMessageW (int fd, unsigned int &code, mwData &x) const;
00115 
00116 }; /* class mwWorker */
00117 
00118 // --------------------------------------------------
00119 
00120 inline mwWorker::mwWorker (): keepWorker (false)
00121 {
00122         return;
00123 } /* mwWorker::mwWorker */
00124 
00125 inline mwWorker::~mwWorker ()
00126 {
00127         return;
00128 } /* mwWorker::~mwWorker */
00129 
00130 // --------------------------------------------------
00131 
00132 inline void mwWorker::KeepWorker (bool keep)
00133 {
00134         keepWorker = keep;
00135         return;
00136 } /* mwWorker::KeepWorkers */
00137 
00138 // --------------------------------------------------
00139 
00140 inline int mwWorker::Process (mwData &)
00141 {
00142         return mwOk;
00143 } /* mwWorker::Process */
00144 
00145 inline int mwWorker::Initialize (mwData &)
00146 {
00147         return mwOk;
00148 } /* mwWorker::Initialize */
00149 
00150 // --------------------------------------------------
00151 
00152 inline int mwWorker::SendMessageW (int fd, unsigned int code,
00153         const mwData &x) const
00154 {
00155         // prepare the negation of the control number to send
00156         unsigned int ctrl = ~(this -> ControlNumber ());
00157 
00158         // send the message with the control number and the message code
00159         return this -> SendMessage (fd, ctrl, code, x);
00160 } /* mwWorker::SendMessageW */
00161 
00162 inline int mwWorker::RecvMessageW (int fd, unsigned int &code,
00163         mwData &x) const
00164 {
00165         // receive the message
00166         unsigned int ctrl = 0;
00167         int result = this -> RecvMessage (fd, ctrl, code, x);
00168 
00169         // if there was an error then return its code
00170         if (result != mwOk)
00171                 return result;
00172 
00173         // if the control number is correct then finish successfully
00174         if (ctrl == this -> ControlNumber ())
00175                 return mwOk;
00176 
00177         // if the control number is wrong then return an error code
00178         if (logFile)
00179                 *logFile << "Wrong control code received "
00180                         "from the coordinator: " << ctrl << "." << std::endl;
00181         return mwError;
00182 } /* mwWorker::RecvMessageW */
00183 
00184 // --------------------------------------------------
00185 
00186 inline int mwWorker::WorkOne (int fd)
00187 {
00188         // be ready to send the port number at which the worker is listening
00189         bool sendPort = (this -> Port () > 0);
00190         int timelimit = this -> TimeOut ();
00191 
00192         while (1)
00193         {
00194                 // disconnect if the coordinator is not responding
00195                 if (timelimit > 0)
00196                 {
00197                         // call the function 'select' to wait
00198                         // until data is available for reading
00199                         int portArray [1];
00200                         portArray [0] = fd;
00201                         int ioFlags [2];
00202                         ioFlags [0] = mwCanRead;
00203                         ioFlags [1] = mwNone;
00204                         int result = mwSelect (portArray, 1, fd, ioFlags,
00205                                 timelimit);
00206 
00207                         // disconnect in case of an I/O error
00208                         if (result == mwError)
00209                         {
00210                                 if (logFile)
00211                                         *logFile << "Error while "
00212                                                 "waiting for data from the "
00213                                                 "coordinator." << std::endl;
00214                                 return mwError;
00215                         }
00216 
00217                         // finish if disconnected
00218                         if (result == mwLost)
00219                         {
00220                                 if (logFile)
00221                                         *logFile << "Disconnected while "
00222                                                 "waiting for data from the "
00223                                                 "coordinator." << std::endl;
00224                                 return mwOk;
00225                         }
00226 
00227                         // finish if not responding
00228                         if (result == mwTimeOut)
00229                         {
00230                                 if (logFile)
00231                                         *logFile << "Time out reached while "
00232                                                 "waiting for data from the "
00233                                                 "coordinator." << std::endl;
00234                                 return mwOk;
00235                         }
00236                 }
00237 
00238                 // receive a message
00239                 mwData msg;
00240                 unsigned int code = 0;
00241                 int result = RecvMessageW (fd, code, msg);
00242 
00243                 // disconnect in case of an I/O error
00244                 if (result == mwError)
00245                 {
00246                         if (logFile)
00247                                 *logFile << "Error while receiving data "
00248                                         "from the coordinator." << std::endl;
00249                         return mwError;
00250                 }
00251         
00252                 // disconnect if the connection was lost
00253                 if (result == mwLost)
00254                 {
00255                         if (logFile)
00256                                 *logFile << "The connection closed "
00257                                         "by the coordinator." << std::endl;
00258                         return mwOk;
00259                 }
00260 
00261                 // initialize the worker if requested to
00262                 if (code & mwInitMsg)
00263                 {
00264                         if (logFile)
00265                                 *logFile << "Initializing the worker." <<
00266                                         std::endl;
00267 
00268                         // update the 'keepWorker' flag if requested to
00269                         if (code & mwKeepMsg)
00270                                 keepWorker = true;
00271                         if (code & mwDontKeepMsg)
00272                                 keepWorker = false;
00273 
00274                         // initialize the data
00275                         int initResult = Initialize (msg);
00276 
00277                         // if the initialization fails then quit the work
00278                         if (initResult != mwOk)
00279                         {
00280                                 if (logFile)
00281                                         *logFile << "The initialization "
00282                                                 "failed." << std::endl;
00283                                 return mwError;
00284                         }
00285                 }
00286 
00287                 // disconnect if requested to
00288                 if (code & mwByeMsg)
00289                 {
00290                         // update the 'keepWorker' flag if requested to
00291                         if (code & mwKeepMsg)
00292                                 keepWorker = true;
00293                         if (code & mwDontKeepMsg)
00294                                 keepWorker = false;
00295 
00296                         if (logFile)
00297                                 *logFile << "Disconnecting upon "
00298                                         "coordinator's request." <<
00299                                         std::endl;
00300                         return mwOk;
00301                 }
00302 
00303                 // skip the rest if there is no standard message to process
00304                 if (!(code & mwStdMsg))
00305                         continue;
00306 
00307                 // prepare a basis for the returned code
00308                 unsigned int retcode = 0;
00309                 
00310                 // process the message data
00311                 result = Process (msg);
00312 
00313                 // if an error occurred, disconnect and quit
00314                 if (result == mwError)
00315                 {
00316                         if (logFile)
00317                                 *logFile << "Data processing failed." <<
00318                                         std::endl;
00319                         return mwError;
00320                 }
00321 
00322                 // if rejected, reset the data and set the rejection flag
00323                 else if (result == mwReject)
00324                 {
00325                         if (logFile)
00326                                 *logFile << "* Data rejected." << std::endl;
00327                         msg. Reset ();
00328                         retcode |= mwRejectedMsg;
00329                 }
00330 
00331                 // if processed successfully, make a note of it
00332                 else
00333                 {
00334                         if (logFile)
00335                                 *logFile << "* Data processed." << std::endl;
00336                 }
00337 
00338                 // send port number if relevant
00339                 if (sendPort)
00340                 {
00341                         // send the port number at which the worker listens
00342                         mwData d;
00343                         d << this -> Port ();
00344                         unsigned int code = mwPortMsg;
00345                         int res = SendMessageW (fd, code, d);
00346 
00347                         // quit in case of failure
00348                         if (res < 0)
00349                         {
00350                                 if (logFile)
00351                                         *logFile << "Error while sending "
00352                                                 "the port number." <<
00353                                                 std::endl;
00354                                 return mwError;
00355                         }
00356 
00357                         // make a note of having sent the port number
00358                         sendPort = false;
00359                         if (logFile)
00360                                 *logFile << "* Port number sent." <<
00361                                         std::endl;
00362                 }
00363 
00364                 // send the result of the processed data
00365                 result = SendMessageW (fd, retcode, msg);
00366 
00367                 // quit if the connection was lost
00368                 if (result == mwLost)
00369                 {
00370                         if (logFile)
00371                                 *logFile << "Connection lost while sending "
00372                                         "data to the coordinator." <<
00373                                         std::endl;
00374                         return mwError;
00375                 }
00376                 
00377                 // quit if an error occurred
00378                 if (result != mwOk)
00379                 {
00380                         if (logFile)
00381                                 *logFile << "Error while sending data." <<
00382                                         std::endl;
00383                         return mwError;
00384                 }
00385         }
00386 } /* mwWorker::WorkOne */
00387 
00388 inline int mwWorker::Work ()
00389 {
00390         if (logFile)
00391         {
00392                 *logFile << "Running as a WORKER." << std::endl;
00393 
00394                 // indicate what kind of network connection is in use
00395                 #if !mwNETWORK
00396                 *logFile << "There is no network in use, "
00397                         "so exiting right now." << std::endl;
00398                 #elif mwWXWIN
00399                 *logFile << "Using the sockets interface "
00400                         "provided by wxWindows." << std::endl;
00401                 #else
00402                 *logFile << "Using the standard sockets "
00403                         "for network connections." << std::endl;
00404                 #endif
00405         }
00406 
00407         // connect to the first comp. on the list that is running
00408         int fd = -1;
00409         for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
00410         {
00411                 // retrieve the computer name and port from the list
00412                 int port = this -> ports [cur];
00413                 const char *name = this -> computers [cur]. c_str ();
00414 
00415                 // if no valid name or no port read, skip this data
00416                 if (!*name || !port)
00417                         continue;
00418 
00419                 // try connecting to the computer
00420                 fd = mwConnect (name, port);
00421 
00422                 // add an appropriate message to the log file
00423                 if (logFile)
00424                 {
00425                         *logFile << "Connection to " << name << ":" <<
00426                                 port << ((fd < 0) ? " refused." :
00427                                 " established.") << std::endl;
00428                 }
00429         }
00430 
00431         while (1)
00432         {
00433                 // if not connected then try listening
00434                 if ((fd < 0) && (this -> Port () > 0))
00435                 {
00436                         // listen at the given port (use a very short queue)
00437                         if (logFile)
00438                                 *logFile << "Waiting for a coordinator at "
00439                                         "port " << this -> Port () << "." <<
00440                                         std::endl;
00441                         int fdlisten = mwListen (this -> Port (), 1);
00442 
00443                         // if the listening failed, return with an error
00444                         if (fdlisten < 0)
00445                         {
00446                                 if (logFile)
00447                                         *logFile << "Error: This port is "
00448                                                 "probably in use." <<
00449                                                 std::endl;
00450                                 return mwError;
00451                         }
00452 
00453                         // accept a connection and stop listening
00454                         std::string computer;
00455                         int timeout = this -> TimeOut ();
00456                         fd = mwAccept (fdlisten, computer, timeout);
00457                         mwDisconnect (fdlisten);
00458         
00459                         // if too much time elapsed, quit the job
00460                         if (fd == mwTimeOut)
00461                         {
00462                                 if (logFile)
00463                                         *logFile << "Time out. Exiting." <<
00464                                                 std::endl;
00465                                 return mwOk;
00466                         }
00467 
00468                         // if an error occurred, quit the job
00469                         else if (fd < 0)
00470                         {
00471                                 if (logFile)
00472                                         *logFile << "Error while connecting "
00473                                                 "to a coordinator." <<
00474                                                 std::endl;
00475                                 return mwError;
00476                         }
00477 
00478                         // report the fact of connection to the log file
00479                         if (logFile)
00480                                 *logFile << "Connected to a coordinator "
00481                                         "at '" << computer << "'." <<
00482                                         std::endl;
00483                 }
00484 
00485                 // receive messages and work
00486                 int result = WorkOne (fd);
00487                 mwDisconnect (fd);
00488                 fd = -1;
00489 
00490                 // quit if there is no point to listen to another coordinator
00491                 // or if an error occurred
00492                 if ((this -> Port () <= 0) || !keepWorker ||
00493                         (result != mwOk))
00494                 {
00495                         return result;
00496                 }
00497 
00498                 // add a line to the log file between working sessions
00499                 if (logFile)
00500                         *logFile << "============================" <<
00501                                 std::endl;
00502         }
00503 } /* mwWorker::Work */
00504 
00505 
00506 } // namespace multiwork
00507 } // namespace chomp
00508 
00509 #endif // _CHOMP_MULTIWORK_MWWORKER_H_
00510 
00512