Go to the documentation of this file.00001
00002
00003
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _CHOMP_MULTIWORK_MWWORKER_H_
00035 #define _CHOMP_MULTIWORK_MWWORKER_H_
00036
00037 #include "chomp/multiwork/mwconfig.h"
00038 #include "chomp/multiwork/mwlowlev.h"
00039 #include "chomp/multiwork/mwdata.h"
00040 #include "chomp/multiwork/mwtask.h"
00041
00042 namespace chomp {
00043 namespace multiwork {
00044
00045 class mwCoordinator;
00046
00047
00048
00049
00050
00051
00055 class mwWorker: public virtual mwTask
00056 {
00057 public:
00059 mwWorker ();
00060
00062 virtual ~mwWorker ();
00063
00066 int Work ();
00067
00070 void KeepWorker (bool keep = true);
00071
00072 private:
00073
00074
00081 virtual int Process (mwData &data);
00082
00090 virtual int Initialize (mwData &data);
00091
00096 int WorkOne (int fd);
00097
00098
00099
00101 bool keepWorker;
00102
00103
00104 friend class mwCoordinator;
00105
00106
00107
00110 int SendMessageW (int fd, unsigned int code, const mwData &x) const;
00111
00114 int RecvMessageW (int fd, unsigned int &code, mwData &x) const;
00115
00116 };
00117
00118
00119
00120 inline mwWorker::mwWorker (): keepWorker (false)
00121 {
00122 return;
00123 }
00124
00125 inline mwWorker::~mwWorker ()
00126 {
00127 return;
00128 }
00129
00130
00131
00132 inline void mwWorker::KeepWorker (bool keep)
00133 {
00134 keepWorker = keep;
00135 return;
00136 }
00137
00138
00139
00140 inline int mwWorker::Process (mwData &)
00141 {
00142 return mwOk;
00143 }
00144
00145 inline int mwWorker::Initialize (mwData &)
00146 {
00147 return mwOk;
00148 }
00149
00150
00151
00152 inline int mwWorker::SendMessageW (int fd, unsigned int code,
00153 const mwData &x) const
00154 {
00155
00156 unsigned int ctrl = ~(this -> ControlNumber ());
00157
00158
00159 return this -> SendMessage (fd, ctrl, code, x);
00160 }
00161
00162 inline int mwWorker::RecvMessageW (int fd, unsigned int &code,
00163 mwData &x) const
00164 {
00165
00166 unsigned int ctrl = 0;
00167 int result = this -> RecvMessage (fd, ctrl, code, x);
00168
00169
00170 if (result != mwOk)
00171 return result;
00172
00173
00174 if (ctrl == this -> ControlNumber ())
00175 return mwOk;
00176
00177
00178 if (logFile)
00179 *logFile << "Wrong control code received "
00180 "from the coordinator: " << ctrl << "." << std::endl;
00181 return mwError;
00182 }
00183
00184
00185
00186 inline int mwWorker::WorkOne (int fd)
00187 {
00188
00189 bool sendPort = (this -> Port () > 0);
00190 int timelimit = this -> TimeOut ();
00191
00192 while (1)
00193 {
00194
00195 if (timelimit > 0)
00196 {
00197
00198
00199 int portArray [1];
00200 portArray [0] = fd;
00201 int ioFlags [2];
00202 ioFlags [0] = mwCanRead;
00203 ioFlags [1] = mwNone;
00204 int result = mwSelect (portArray, 1, fd, ioFlags,
00205 timelimit);
00206
00207
00208 if (result == mwError)
00209 {
00210 if (logFile)
00211 *logFile << "Error while "
00212 "waiting for data from the "
00213 "coordinator." << std::endl;
00214 return mwError;
00215 }
00216
00217
00218 if (result == mwLost)
00219 {
00220 if (logFile)
00221 *logFile << "Disconnected while "
00222 "waiting for data from the "
00223 "coordinator." << std::endl;
00224 return mwOk;
00225 }
00226
00227
00228 if (result == mwTimeOut)
00229 {
00230 if (logFile)
00231 *logFile << "Time out reached while "
00232 "waiting for data from the "
00233 "coordinator." << std::endl;
00234 return mwOk;
00235 }
00236 }
00237
00238
00239 mwData msg;
00240 unsigned int code = 0;
00241 int result = RecvMessageW (fd, code, msg);
00242
00243
00244 if (result == mwError)
00245 {
00246 if (logFile)
00247 *logFile << "Error while receiving data "
00248 "from the coordinator." << std::endl;
00249 return mwError;
00250 }
00251
00252
00253 if (result == mwLost)
00254 {
00255 if (logFile)
00256 *logFile << "The connection closed "
00257 "by the coordinator." << std::endl;
00258 return mwOk;
00259 }
00260
00261
00262 if (code & mwInitMsg)
00263 {
00264 if (logFile)
00265 *logFile << "Initializing the worker." <<
00266 std::endl;
00267
00268
00269 if (code & mwKeepMsg)
00270 keepWorker = true;
00271 if (code & mwDontKeepMsg)
00272 keepWorker = false;
00273
00274
00275 int initResult = Initialize (msg);
00276
00277
00278 if (initResult != mwOk)
00279 {
00280 if (logFile)
00281 *logFile << "The initialization "
00282 "failed." << std::endl;
00283 return mwError;
00284 }
00285 }
00286
00287
00288 if (code & mwByeMsg)
00289 {
00290
00291 if (code & mwKeepMsg)
00292 keepWorker = true;
00293 if (code & mwDontKeepMsg)
00294 keepWorker = false;
00295
00296 if (logFile)
00297 *logFile << "Disconnecting upon "
00298 "coordinator's request." <<
00299 std::endl;
00300 return mwOk;
00301 }
00302
00303
00304 if (!(code & mwStdMsg))
00305 continue;
00306
00307
00308 unsigned int retcode = 0;
00309
00310
00311 result = Process (msg);
00312
00313
00314 if (result == mwError)
00315 {
00316 if (logFile)
00317 *logFile << "Data processing failed." <<
00318 std::endl;
00319 return mwError;
00320 }
00321
00322
00323 else if (result == mwReject)
00324 {
00325 if (logFile)
00326 *logFile << "* Data rejected." << std::endl;
00327 msg. Reset ();
00328 retcode |= mwRejectedMsg;
00329 }
00330
00331
00332 else
00333 {
00334 if (logFile)
00335 *logFile << "* Data processed." << std::endl;
00336 }
00337
00338
00339 if (sendPort)
00340 {
00341
00342 mwData d;
00343 d << this -> Port ();
00344 unsigned int code = mwPortMsg;
00345 int res = SendMessageW (fd, code, d);
00346
00347
00348 if (res < 0)
00349 {
00350 if (logFile)
00351 *logFile << "Error while sending "
00352 "the port number." <<
00353 std::endl;
00354 return mwError;
00355 }
00356
00357
00358 sendPort = false;
00359 if (logFile)
00360 *logFile << "* Port number sent." <<
00361 std::endl;
00362 }
00363
00364
00365 result = SendMessageW (fd, retcode, msg);
00366
00367
00368 if (result == mwLost)
00369 {
00370 if (logFile)
00371 *logFile << "Connection lost while sending "
00372 "data to the coordinator." <<
00373 std::endl;
00374 return mwError;
00375 }
00376
00377
00378 if (result != mwOk)
00379 {
00380 if (logFile)
00381 *logFile << "Error while sending data." <<
00382 std::endl;
00383 return mwError;
00384 }
00385 }
00386 }
00387
00388 inline int mwWorker::Work ()
00389 {
00390 if (logFile)
00391 {
00392 *logFile << "Running as a WORKER." << std::endl;
00393
00394
00395 #if !mwNETWORK
00396 *logFile << "There is no network in use, "
00397 "so exiting right now." << std::endl;
00398 #elif mwWXWIN
00399 *logFile << "Using the sockets interface "
00400 "provided by wxWindows." << std::endl;
00401 #else
00402 *logFile << "Using the standard sockets "
00403 "for network connections." << std::endl;
00404 #endif
00405 }
00406
00407
00408 int fd = -1;
00409 for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
00410 {
00411
00412 int port = this -> ports [cur];
00413 const char *name = this -> computers [cur]. c_str ();
00414
00415
00416 if (!*name || !port)
00417 continue;
00418
00419
00420 fd = mwConnect (name, port);
00421
00422
00423 if (logFile)
00424 {
00425 *logFile << "Connection to " << name << ":" <<
00426 port << ((fd < 0) ? " refused." :
00427 " established.") << std::endl;
00428 }
00429 }
00430
00431 while (1)
00432 {
00433
00434 if ((fd < 0) && (this -> Port () > 0))
00435 {
00436
00437 if (logFile)
00438 *logFile << "Waiting for a coordinator at "
00439 "port " << this -> Port () << "." <<
00440 std::endl;
00441 int fdlisten = mwListen (this -> Port (), 1);
00442
00443
00444 if (fdlisten < 0)
00445 {
00446 if (logFile)
00447 *logFile << "Error: This port is "
00448 "probably in use." <<
00449 std::endl;
00450 return mwError;
00451 }
00452
00453
00454 std::string computer;
00455 int timeout = this -> TimeOut ();
00456 fd = mwAccept (fdlisten, computer, timeout);
00457 mwDisconnect (fdlisten);
00458
00459
00460 if (fd == mwTimeOut)
00461 {
00462 if (logFile)
00463 *logFile << "Time out. Exiting." <<
00464 std::endl;
00465 return mwOk;
00466 }
00467
00468
00469 else if (fd < 0)
00470 {
00471 if (logFile)
00472 *logFile << "Error while connecting "
00473 "to a coordinator." <<
00474 std::endl;
00475 return mwError;
00476 }
00477
00478
00479 if (logFile)
00480 *logFile << "Connected to a coordinator "
00481 "at '" << computer << "'." <<
00482 std::endl;
00483 }
00484
00485
00486 int result = WorkOne (fd);
00487 mwDisconnect (fd);
00488 fd = -1;
00489
00490
00491
00492 if ((this -> Port () <= 0) || !keepWorker ||
00493 (result != mwOk))
00494 {
00495 return result;
00496 }
00497
00498
00499 if (logFile)
00500 *logFile << "============================" <<
00501 std::endl;
00502 }
00503 }
00504
00505
00506 }
00507 }
00508
00509 #endif // _CHOMP_MULTIWORK_MWWORKER_H_
00510
00512