Go to the documentation of this file.00001
00002
00003
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _CHOMP_MULTIWORK_MWTASK_H_
00035 #define _CHOMP_MULTIWORK_MWTASK_H_
00036
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <new>
#include <string>
#include <vector>

#include "chomp/multiwork/mwconfig.h"
#include "chomp/multiwork/mwlowlev.h"
#include "chomp/multiwork/mwdata.h"
00046
00047
00048 namespace chomp {
00049 namespace multiwork {
00050
00051
00052
00053
00054
/// Message codes carried in the "code" field of a MultiWork message.
/// The values are single bits, so codes can be OR-ed together
/// (mwTask::QuitWorkers sends mwByeMsg | mwDontKeepMsg).
/// NOTE(review): mwRejectedMsg and mwPortMsg reuse the numeric values
/// of mwInitMsg and mwStdMsg; presumably the two groups travel in
/// opposite directions between the communicating parties - confirm
/// before changing any value, since these numbers go over the wire.
enum mwCodes
{
    /// No code bits set.
    mwNoMsg = 0,

    /// Bit 0 (value 1); presumably "initialize" - confirm with the users.
    mwInitMsg = 1 << 0,

    /// Bit 1 (value 2); presumably a standard data message - confirm.
    mwStdMsg = 1 << 1,

    /// Bit 2 (value 4); presumably "keep running" - confirm.
    mwKeepMsg = 1 << 2,

    /// Bit 3 (value 8); sent together with mwByeMsg by QuitWorkers.
    mwDontKeepMsg = 1 << 3,

    /// Bit 4 (value 16); sent by QuitWorkers to turn a worker off.
    mwByeMsg = 1 << 4,

    /// Same value as mwInitMsg (1); presumably "data rejected" - confirm.
    mwRejectedMsg = 1 << 0,

    /// Same value as mwStdMsg (2); presumably carries a port number -
    /// confirm.
    mwPortMsg = 1 << 1

};
00085
00086
00087
00088
00089
00090
00099 class mwTask
00100 {
00101 public:
00103 mwTask ();
00104
00106 virtual ~mwTask ();
00107
00108
00109
00111 void Port (int number);
00112
00114 int Port () const;
00115
00117 void ControlNumber (unsigned int number);
00118
00120 unsigned int ControlNumber () const;
00121
00123 void TimeOut (int seconds);
00124
00126 int TimeOut () const;
00127
00128
00129
00133 int LogFile (const char *filename);
00134
00136 void LogFile (const mwTask &other);
00137
00140 void LogClose ();
00141
00142
00143
00148 int Add (const char *name, int port = -1);
00149
00152 int Load (const char *filename);
00153
00154
00155
00159 int QuitWorkers ();
00160
00161 private:
00162
00163
00165 int portnum;
00166
00169 unsigned int ctrlnum;
00170
00172 int timeout;
00173
00174
00175
00176 protected:
00178 std::ofstream *logFile;
00179
00180 private:
00182 bool logBorrowed;
00183
00184 protected:
00185
00186
00190 static int SendMessage (int fd, unsigned int ctrl,
00191 unsigned int code, const mwData &x);
00192
00196 static int RecvMessage (int fd, unsigned int &ctrl,
00197 unsigned int &code, mwData &x);
00198
00199
00200
00202 std::vector<std::string> computers;
00203
00205 std::vector<int> ports;
00206
00207 private:
00208
00209
00211 mwTask (const mwTask &) {}
00212
00214 mwTask &operator = (const mwTask &) {return *this;}
00215
00216 };
00217
00218
00219
00220 inline mwTask::mwTask ():
00221 portnum (mwPORT),
00222 ctrlnum (mwCTRLNUM),
00223 timeout (mwTIMEOUT),
00224 logFile (0),
00225 logBorrowed (false)
00226 {
00227 return;
00228 }
00229
00230
00231
00232 inline void mwTask::LogClose ()
00233 {
00234 if (!logFile)
00235 return;
00236 if (logBorrowed)
00237 {
00238 logFile = 0;
00239 logBorrowed = false;
00240 return;
00241 }
00242 std::time_t stop_time;
00243 std::time (&stop_time);
00244 *logFile << "\nMultiWork log file closed on " <<
00245 std::asctime (std::localtime (&stop_time)) << "\n"
00246 "-----------------------------------------------------\n" <<
00247 std::endl;
00248 delete logFile;
00249 logFile = 0;
00250 return;
00251 }
00252
00253 inline int mwTask::LogFile (const char *filename)
00254 {
00255
00256 if (logFile)
00257 LogClose ();
00258
00259
00260 if (!filename || !*filename)
00261 return mwOk;
00262
00263
00264 logFile = new std::ofstream;
00265 if (!logFile)
00266 return mwError;
00267
00268
00269 logFile -> open (filename, std::ios::app);
00270
00271
00272 std::time_t start_time;
00273 std::time (&start_time);
00274 *logFile << "MultiWork log file opened on " <<
00275 std::asctime (std::localtime (&start_time)) << std::endl;
00276
00277
00278 if (!*logFile)
00279 {
00280 delete logFile;
00281 return mwError;
00282 }
00283
00284 return mwOk;
00285 }
00286
00287 inline void mwTask::LogFile (const mwTask &other)
00288 {
00289
00290 LogClose ();
00291
00292
00293 if (!other. logFile)
00294 return;
00295
00296
00297 logBorrowed = true;
00298 logFile = other. logFile;
00299
00300 return;
00301 }
00302
00303
00304
00305 inline mwTask::~mwTask ()
00306 {
00307
00308 LogClose ();
00309
00310 return;
00311 }
00312
00313
00314
00315 inline void mwTask::Port (int number)
00316 {
00317 if (number >= 0)
00318 portnum = static_cast<short int> (number);
00319 return;
00320 }
00321
00322 inline int mwTask::Port () const
00323 {
00324 return portnum;
00325 }
00326
00327 inline void mwTask::ControlNumber (unsigned int number)
00328 {
00329 ctrlnum = number;
00330 return;
00331 }
00332
00333 inline unsigned int mwTask::ControlNumber () const
00334 {
00335 return ctrlnum;
00336 }
00337
00338 inline void mwTask::TimeOut (int seconds)
00339 {
00340 timeout = seconds;
00341 return;
00342 }
00343
00344 inline int mwTask::TimeOut () const
00345 {
00346 return timeout;
00347 }
00348
00349
00350
00351 inline int mwTask::Add (const char *name, int port)
00352 {
00353
00354 if (!name || !*name)
00355 return mwError;
00356
00357
00358 int pos = 1;
00359 while (name [pos])
00360 ++ pos;
00361 -- pos;
00362 while (pos && (name [pos] != ':') &&
00363 (name [pos] >= '0') && (name [pos] <= '9'))
00364 {
00365 -- pos;
00366 }
00367
00368
00369 if (pos && (name [pos] == ':') && name [pos + 1])
00370 {
00371
00372 char *compname = new char [pos + 1];
00373 for (int i = 0; i < pos; ++ i)
00374 compname [i] = name [i];
00375 compname [pos] = '\0';
00376 port = std::atoi (name + pos + 1);
00377 if (port <= 0)
00378 return mwError;
00379 computers. push_back (std::string (compname));
00380 ports. push_back (port);
00381 delete [] compname;
00382 }
00383 else
00384 {
00385
00386 if (port <= 0)
00387 port = portnum;
00388 if (port <= 0)
00389 return mwError;
00390 computers. push_back (std::string (name));
00391 ports. push_back (port);
00392 }
00393 return mwOk;
00394 }
00395
00396 inline int mwTask::Load (const char *filename)
00397 {
00398 std::ifstream f (filename);
00399 if (!f)
00400 return mwError;
00401
00402 char buf [512];
00403 int counter = 0;
00404 while (1)
00405 {
00406 *buf = '\0';
00407 f. getline (buf, 512, '\n');
00408 if ((*buf == ';') || (*buf == '#') || (*buf == '/'))
00409 continue;
00410 if (*buf)
00411 {
00412 int result = this -> Add (buf, portnum);
00413 if (result == mwOk)
00414 ++ counter;
00415 }
00416 if (!f)
00417 return counter;
00418 }
00419 }
00420
00421
00422
00423
00424
00425
00428 inline int mwTask::SendMessage (int fd, unsigned int ctrl,
00429 unsigned int code, const mwData &x)
00430 {
00431 mwData sending;
00432 sending << ctrl << code << x. Length () << x;
00433 return mwSendBytes (fd, sending. Buffer (), sending. Length ());
00434 }
00435
00438 inline int mwTask::RecvMessage (int fd, unsigned int &ctrl,
00439 unsigned int &code, mwData &x)
00440 {
00441
00442 char buf00 [12];
00443 int result = mwRecvBytes (fd, buf00, 12);
00444 if (result < 0)
00445 return result;
00446 unsigned char *buf0 = reinterpret_cast<unsigned char *> (buf00);
00447
00448
00449 int len = (int) (buf0 [8]) << 24;
00450 len |= (int) (buf0 [9]) << 16;
00451 len |= (int) (buf0 [10]) << 8;
00452 len |= (int) (buf0 [11]);
00453 if (len < 0)
00454 return mwError;
00455
00456
00457 ctrl = (int) (buf0 [0]) << 24;
00458 ctrl |= (int) (buf0 [1]) << 16;
00459 ctrl |= (int) (buf0 [2]) << 8;
00460 ctrl |= (int) (buf0 [3]);
00461
00462
00463 code = (int) (buf0 [4]) << 24;
00464 code |= (int) (buf0 [5]) << 16;
00465 code |= (int) (buf0 [6]) << 8;
00466 code |= (int) (buf0 [7]);
00467
00468
00469 if (!len)
00470 {
00471 x. Reset ();
00472 return mwOk;
00473 }
00474
00475
00476 char *buf = new char [len];
00477 if (!buf)
00478 return mwError;
00479
00480
00481 result = mwRecvBytes (fd, buf, len);
00482 if (result < 0)
00483 {
00484 delete [] buf;
00485 return result;
00486 }
00487
00488
00489 x. Take (buf, len);
00490 return mwOk;
00491 }
00492
00493 inline int mwTask::QuitWorkers ()
00494 {
00495
00496 if (logFile)
00497 *logFile << "Turning workers off..." << std::endl;
00498
00499
00500 int counter = 0;
00501 for (unsigned int n = 0; n < computers. size (); ++ n)
00502 {
00503
00504 const char *name = computers [n]. c_str ();
00505 int port = ports [n];
00506
00507
00508 if (!*name || !port)
00509 continue;
00510
00511
00512 int fd = mwConnect (name, port);
00513
00514
00515 if (fd < 0)
00516 {
00517 if (logFile)
00518 {
00519 *logFile << "Worker " << name << ":" <<
00520 port << " could not be contacted." <<
00521 std::endl;
00522 }
00523 continue;
00524 }
00525
00526
00527 unsigned int code = mwByeMsg | mwDontKeepMsg;
00528
00529
00530 mwData empty;
00531 int result = this -> SendMessage (fd, ctrlnum, code, empty);
00532 mwDisconnect (fd);
00533
00534
00535 if (result == mwOk)
00536 {
00537 ++ counter;
00538 if (logFile)
00539 {
00540 *logFile << "Worker " << name << ":" <<
00541 port << " exited successfully." <<
00542 std::endl;
00543 }
00544 }
00545 else if (logFile)
00546 {
00547 *logFile << "Error while sending the disconnect "
00548 "message to " << name << ":" << port <<
00549 "." << std::endl;
00550 }
00551 }
00552
00553
00554 if (logFile)
00555 *logFile << counter << " worker(s) have been shut down." <<
00556 std::endl;
00557
00558 return mwOk;
00559 }
00560
00561
00562 }
00563 }
00564
00565 #endif // _CHOMP_MULTIWORK_MWTASK_H_
00566
00568