mwtask.h

Go to the documentation of this file.
00001 
00002 
00003 
00013 
00014 // Copyright (C) 1997-2010 by Pawel Pilarczyk.
00015 //
00016 // This file is part of the Homology Library.  This library is free software;
00017 // you can redistribute it and/or modify it under the terms of the GNU
00018 // General Public License as published by the Free Software Foundation;
00019 // either version 2 of the License, or (at your option) any later version.
00020 //
00021 // This library is distributed in the hope that it will be useful,
00022 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00023 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00024 // GNU General Public License for more details.
00025 //
00026 // You should have received a copy of the GNU General Public License along
00027 // with this software; see the file "license.txt".  If not, write to the
00028 // Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
00029 // MA 02111-1307, USA.
00030 
00031 // Started on August 11, 2004. Last revision: November 29, 2007.
00032 
00033 
00034 #ifndef _CHOMP_MULTIWORK_MWTASK_H_
00035 #define _CHOMP_MULTIWORK_MWTASK_H_
00036 
00037 #include <fstream>
00038 #include <string>
00039 #include <vector>
00040 #include <cstdlib> // for "atoi"
00041 #include <ctime> // for log start/stop time
00042 
00043 #include "chomp/multiwork/mwconfig.h"
00044 #include "chomp/multiwork/mwlowlev.h"
00045 #include "chomp/multiwork/mwdata.h"
00046 
00047 
00048 namespace chomp {
00049 namespace multiwork {
00050 
00051 // --------------------------------------------------
00052 // ----------------- message codes ------------------
00053 // --------------------------------------------------
00054 
00057 enum mwCodes
00058 {
00060         mwNoMsg = 0x0000,
00061 
00064         mwInitMsg = 0x0001,
00065 
00067         mwStdMsg = 0x0002,
00068 
00070         mwKeepMsg = 0x0004,
00071 
00073         mwDontKeepMsg = 0x0008,
00074 
00076         mwByeMsg = 0x0010,
00077 
00079         mwRejectedMsg = 0x0001,
00080 
00082         mwPortMsg = 0x0002
00083 
00084 }; /* enum mwCodes */
00085 
00086 
00087 // --------------------------------------------------
00088 // --------------------- mwTask ---------------------
00089 // --------------------------------------------------
00090 
00099 class mwTask
00100 {
00101 public:
00103         mwTask ();
00104                 
00106         virtual ~mwTask ();
00107 
00108         // --- configuration ---
00109 
00111         void Port (int number);
00112 
00114         int Port () const;
00115 
00117         void ControlNumber (unsigned int number);
00118 
00120         unsigned int ControlNumber () const;
00121 
00123         void TimeOut (int seconds);
00124 
00126         int TimeOut () const;
00127 
00128         // --- log file ---
00129 
00133         int LogFile (const char *filename);
00134 
00136         void LogFile (const mwTask &other);
00137 
00140         void LogClose ();
00141 
00142         // --- computer addresses ---
00143 
00148         int Add (const char *name, int port = -1);
00149         
00152         int Load (const char *filename);
00153         
00154         // --- quit waiting workers ---
00155 
00159         int QuitWorkers ();
00160 
00161 private:
00162         // --- configuration data ---
00163         
00165         int portnum;
00166 
00169         unsigned int ctrlnum;
00170 
00172         int timeout;
00173 
00174         // --- log file ---
00175 
00176 protected:
00178         std::ofstream *logFile;
00179 
00180 private:
00182         bool logBorrowed;
00183 
00184 protected:
00185         // --- network communication ---
00186 
00190         static int SendMessage (int fd, unsigned int ctrl,
00191                 unsigned int code, const mwData &x);
00192 
00196         static int RecvMessage (int fd, unsigned int &ctrl,
00197                 unsigned int &code, mwData &x);
00198 
00199         // --- computer addresses ---
00200 
00202         std::vector<std::string> computers;
00203 
00205         std::vector<int> ports;
00206 
00207 private:
00208         // --- other stuff ---
00209 
00211         mwTask (const mwTask &) {}
00212                 
00214         mwTask &operator = (const mwTask &) {return *this;}
00215                 
00216 }; /* class mwTask */
00217 
00218 // --------------------------------------------------
00219 
00220 inline mwTask::mwTask ():
00221         portnum (mwPORT),
00222         ctrlnum (mwCTRLNUM),
00223         timeout (mwTIMEOUT),
00224         logFile (0),
00225         logBorrowed (false)
00226 {
00227         return;
00228 } /* mwTask::mwTask */
00229 
00230 // --------------------------------------------------
00231 
00232 inline void mwTask::LogClose ()
00233 {
00234         if (!logFile)
00235                 return;
00236         if (logBorrowed)
00237         {
00238                 logFile = 0;
00239                 logBorrowed = false;
00240                 return;
00241         }
00242         std::time_t stop_time;
00243         std::time (&stop_time);
00244         *logFile << "\nMultiWork log file closed on " <<
00245                 std::asctime (std::localtime (&stop_time)) << "\n"
00246                 "-----------------------------------------------------\n" <<
00247                 std::endl;
00248         delete logFile;
00249         logFile = 0;
00250         return;
00251 } /* mwTask::LogClose */
00252 
00253 inline int mwTask::LogFile (const char *filename)
00254 {
00255         // close the current log if in use
00256         if (logFile)
00257                 LogClose ();
00258 
00259         // if no file name supplied, return now
00260         if (!filename || !*filename)
00261                 return mwOk;
00262 
00263         // create a file stream variable
00264         logFile = new std::ofstream;
00265         if (!logFile)
00266                 return mwError;
00267 
00268         // open the log file for appending
00269         logFile -> open (filename, std::ios::app);
00270 
00271         // write the current time to the log
00272         std::time_t start_time;
00273         std::time (&start_time);
00274         *logFile << "MultiWork log file opened on " <<
00275                 std::asctime (std::localtime (&start_time)) << std::endl;
00276 
00277         // if unable to open the file or an error occurred, return mwError
00278         if (!*logFile)
00279         {
00280                 delete logFile;
00281                 return mwError;
00282         }
00283 
00284         return mwOk;
00285 } /* mwTask::LogFile */
00286 
00287 inline void mwTask::LogFile (const mwTask &other)
00288 {
00289         // close the current log if in use
00290         LogClose ();
00291 
00292         // if there is no other log file, do nothing
00293         if (!other. logFile)
00294                 return;
00295 
00296         // borrow the log file pointer
00297         logBorrowed = true;
00298         logFile = other. logFile;
00299 
00300         return;
00301 } /* mwTask::LogFile */
00302 
00303 // --------------------------------------------------
00304 
00305 inline mwTask::~mwTask ()
00306 {
00307         // close the log file if it was in use
00308         LogClose ();
00309 
00310         return;
00311 } /* mwTask::~mwTask */
00312 
00313 // --------------------------------------------------
00314 
00315 inline void mwTask::Port (int number)
00316 {
00317         if (number >= 0)
00318                 portnum = static_cast<short int> (number);
00319         return;
00320 } /* mwTask::Port */
00321 
00322 inline int mwTask::Port () const
00323 {
00324         return portnum;
00325 } /* mwTask::Port */
00326 
00327 inline void mwTask::ControlNumber (unsigned int number)
00328 {
00329         ctrlnum = number;
00330         return;
00331 } /* mwTask::ControlNumber */
00332 
00333 inline unsigned int mwTask::ControlNumber () const
00334 {
00335         return ctrlnum;
00336 } /* mwTask::ControlNumber */
00337 
00338 inline void mwTask::TimeOut (int seconds)
00339 {
00340         timeout = seconds;
00341         return;
00342 } /* mwTask::TimeOut */
00343 
00344 inline int mwTask::TimeOut () const
00345 {
00346         return timeout;
00347 } /* mwTask::TimeOut */
00348 
00349 // --------------------------------------------------
00350 
00351 inline int mwTask::Add (const char *name, int port)
00352 {
00353         // if the name is empty, ignore it
00354         if (!name || !*name)
00355                 return mwError;
00356 
00357         // determine whether the name contains a colon and port number
00358         int pos = 1;
00359         while (name [pos])
00360                 ++ pos;
00361         -- pos;
00362         while (pos && (name [pos] != ':') &&
00363                 (name [pos] >= '0') && (name [pos] <= '9'))
00364         {
00365                 -- pos;
00366         }
00367 
00368         // if the name contains colon and some digits after the colon...
00369         if (pos && (name [pos] == ':') && name [pos + 1])
00370         {
00371                 // append the computer name and the chosen port number
00372                 char *compname = new char [pos + 1];
00373                 for (int i = 0; i < pos; ++ i)
00374                         compname [i] = name [i];
00375                 compname [pos] = '\0';
00376                 port = std::atoi (name + pos + 1);
00377                 if (port <= 0)
00378                         return mwError;
00379                 computers. push_back (std::string (compname));
00380                 ports. push_back (port);
00381                 delete [] compname;
00382         }
00383         else
00384         {
00385                 // if the port number is not reasonable, use the default one
00386                 if (port <= 0)
00387                         port = portnum;
00388                 if (port <= 0)
00389                         return mwError;
00390                 computers. push_back (std::string (name));
00391                 ports. push_back (port);
00392         }
00393         return mwOk;
00394 } /* mwTask::Add */
00395 
00396 inline int mwTask::Load (const char *filename)
00397 {
00398         std::ifstream f (filename);
00399         if (!f)
00400                 return mwError;
00401 
00402         char buf [512];
00403         int counter = 0;
00404         while (1)
00405         {
00406                 *buf = '\0';
00407                 f. getline (buf, 512, '\n');
00408                 if ((*buf == ';') || (*buf == '#') || (*buf == '/'))
00409                         continue;
00410                 if (*buf)
00411                 {
00412                         int result = this -> Add (buf, portnum);
00413                         if (result == mwOk)
00414                                 ++ counter;
00415                 }
00416                 if (!f)
00417                         return counter;
00418         }
00419 } /* mwTask::Load */
00420 
00421 
00422 // --------------------------------------------------
00423 // -------------------- messages --------------------
00424 // --------------------------------------------------
00425 
00428 inline int mwTask::SendMessage (int fd, unsigned int ctrl,
00429         unsigned int code, const mwData &x)
00430 {
00431         mwData sending;
00432         sending << ctrl << code << x. Length () << x;
00433         return mwSendBytes (fd, sending. Buffer (), sending. Length ());
00434 } /* SendMessage */
00435 
00438 inline int mwTask::RecvMessage (int fd, unsigned int &ctrl,
00439         unsigned int &code, mwData &x)
00440 {
00441         // read the code and length of the message
00442         char buf00 [12];
00443         int result = mwRecvBytes (fd, buf00, 12);
00444         if (result < 0)
00445                 return result;
00446         unsigned char *buf0 = reinterpret_cast<unsigned char *> (buf00);
00447 
00448         // extract the length of the message
00449         int len = (int) (buf0 [8]) << 24;
00450         len |= (int) (buf0 [9]) << 16;
00451         len |= (int) (buf0 [10]) << 8;
00452         len |= (int) (buf0 [11]);
00453         if (len < 0)
00454                 return mwError;
00455 
00456         // extract the control code of the message
00457         ctrl = (int) (buf0 [0]) << 24;
00458         ctrl |= (int) (buf0 [1]) << 16;
00459         ctrl |= (int) (buf0 [2]) << 8;
00460         ctrl |= (int) (buf0 [3]);
00461 
00462         // extract the code of the message
00463         code = (int) (buf0 [4]) << 24;
00464         code |= (int) (buf0 [5]) << 16;
00465         code |= (int) (buf0 [6]) << 8;
00466         code |= (int) (buf0 [7]);
00467 
00468         // if the message length is zero, no more reading is necessary
00469         if (!len)
00470         {
00471                 x. Reset ();
00472                 return mwOk;
00473         }
00474 
00475         // prepare a memory buffer for the message
00476         char *buf = new char [len];
00477         if (!buf)
00478                 return mwError;
00479 
00480         // read the message
00481         result = mwRecvBytes (fd, buf, len);
00482         if (result < 0)
00483         {
00484                 delete [] buf;
00485                 return result;
00486         }
00487 
00488         // transform the message to mw data
00489         x. Take (buf, len);
00490         return mwOk;
00491 } /* RecvMessage */
00492 
00493 inline int mwTask::QuitWorkers ()
00494 {
00495         // write to the log file what you are doing
00496         if (logFile)
00497                 *logFile << "Turning workers off..." << std::endl;
00498         
00499         // try connecting to each worker and ask them to exit
00500         int counter = 0;
00501         for (unsigned int n = 0; n < computers. size (); ++ n)
00502         {
00503                 // retrieve the computer name and port from the list
00504                 const char *name = computers [n]. c_str ();
00505                 int port = ports [n];
00506 
00507                 // if no valid name or port number read, skip this item
00508                 if (!*name || !port)
00509                         continue;
00510 
00511                 // try connecting to the computer
00512                 int fd = mwConnect (name, port);
00513 
00514                 // if unsuccessful, make a note and take another one
00515                 if (fd < 0)
00516                 {
00517                         if (logFile)
00518                         {
00519                                 *logFile << "Worker " << name << ":" <<
00520                                         port << " could not be contacted." <<
00521                                         std::endl;
00522                         }
00523                         continue;
00524                 }
00525                 
00526                 // prepare the control code to send to the worker
00527                 unsigned int code = mwByeMsg | mwDontKeepMsg;
00528 
00529                 // send the 'Bye!' message to the worker and disconnect it
00530                 mwData empty;
00531                 int result = this -> SendMessage (fd, ctrlnum, code, empty);
00532                 mwDisconnect (fd);
00533 
00534                 // add an appropriate message to the log file
00535                 if (result == mwOk)
00536                 {
00537                         ++ counter;
00538                         if (logFile)
00539                         {
00540                                 *logFile << "Worker " << name << ":" <<
00541                                         port << " exited successfully." <<
00542                                         std::endl;
00543                         }
00544                 }
00545                 else if (logFile)
00546                 {
00547                         *logFile << "Error while sending the disconnect "
00548                                 "message to " << name << ":" << port <<
00549                                 "." << std::endl;
00550                 }
00551         }
00552 
00553         // write to the log file how many workers were turned off
00554         if (logFile)
00555                 *logFile << counter << " worker(s) have been shut down." <<
00556                         std::endl;
00557 
00558         return mwOk;
00559 } /* mwTask::QuitWorkers */
00560 
00561 
00562 } // namespace multiwork
00563 } // namespace chomp
00564 
00565 #endif // _CHOMP_MULTIWORK_MWTASK_H_
00566 
00568