00001
00002
00003
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #ifndef _CHOMP_MULTIWORK_MWCOORD_H_
00035 #define _CHOMP_MULTIWORK_MWCOORD_H_
00036
00037 #include <algorithm>
00038 #include <string>
00039 #include <cstring>
00040
00041 #include "chomp/multiwork/mwconfig.h"
00042 #include "chomp/multiwork/mwlowlev.h"
00043 #include "chomp/multiwork/mwdata.h"
00044 #include "chomp/multiwork/mwtask.h"
00045
00046
00047 namespace chomp {
00048 namespace multiwork {
00049
00050 class mwWorker;
00051
00052
00053
00054
00055
00056
00059 class mwWorkerData
00060 {
00061 public:
00063 mwWorkerData ();
00064
00066 ~mwWorkerData ();
00067
00069 mwData data;
00070
00074 int fd;
00075
00077 std::string name;
00078
00080 int port;
00081
00084 int status;
00085
00087 friend void swap (mwWorkerData &data1, mwWorkerData &data2);
00088
00089 private:
00091 mwWorkerData (const mwWorkerData &) {return;};
00092
00094 mwWorkerData &operator = (const mwWorkerData &) {return *this;};
00095
00096 };
00097
00098 inline mwWorkerData::mwWorkerData ():
00099 fd (-1), name (""), port (0), status (0)
00100 {
00101 return;
00102 }
00103
00104 inline mwWorkerData::~mwWorkerData ()
00105 {
00106 return;
00107 }
00108
00109 inline void swap (mwWorkerData &data1, mwWorkerData &data2)
00110 {
00111 swap (data1. data, data2. data);
00112 std::swap (data1. fd, data2. fd);
00113 std::swap (data1. name, data2. name);
00114 std::swap (data1. port, data2. port);
00115 std::swap (data1. status, data2. status);
00116 return;
00117 }
00118
00119
00120
00121
00122
00123
00127 class mwCoordinator: public virtual mwTask
00128 {
00129 public:
00130
00131
00133 mwCoordinator ();
00134
00136 virtual ~mwCoordinator ();
00137
00138
00139
00141 void KeepWorkers (bool keep = true);
00142
00145 int SaveWorkers (const char *filename);
00146
00147
00148
00152 void Init (mwData &data);
00153
00154
00155
00160 int Coordinate (mwWorker *w = NULL);
00161
00162 private:
00163
00164
00171 virtual int Prepare (mwData &data);
00172
00177 virtual int Accept (mwData &data);
00178
00185 virtual int Reject (mwData &data);
00186
00187
00188
00196 int RunLoop (bool no_more_data);
00197
00200 int RunLoopLocally ();
00201
00203 void ConnectWorkers ();
00204
00206 void BeginListening ();
00207
00209 void DisconnectAll ();
00210
00211
00212
00214 bool singleWork;
00215
00217 mwWorker *localWorker;
00218
00220 bool keepWorkers;
00221
00222
00223
00225 mwData initData;
00226
00227
00228
00230 int nWaiting;
00231
00233 mwWorkerData xWaiting [mwMAXWORK];
00234
00236 int nWorking;
00237
00239 mwWorkerData xWorking [mwMAXWORK];
00240
00242 int nToDo;
00243
00245 mwData xToDo [mwMAXWORK];
00246
00248 int nRejected;
00249
00251 mwData xRejected [mwMAXWORK];
00252
00254 int nDone;
00255
00257 mwData xDone [mwMAXWORK];
00258
00259
00260
00262 int listensocket;
00263
00264
00265
00268 static void mwTableDel (int *tab, int len, int pos);
00269
00273 static void mwTableDel (mwData *tab, int len, int pos);
00274
00275
00276
00279 int SendMessageC (int fd, unsigned int code, const mwData &x) const;
00280
00283 int RecvMessageC (int fd, unsigned int &code, mwData &x) const;
00284
00285 };
00286
00287
00288
00289 inline mwCoordinator::mwCoordinator ():
00290 #if mwNETWORK
00291 singleWork (false),
00292 #else
00293 singleWork (true),
00294 #endif
00295 localWorker (0),
00296 keepWorkers (false),
00297 nWaiting (0),
00298 nWorking (0),
00299 nToDo (0),
00300 nRejected (0),
00301 nDone (0),
00302 listensocket (-1)
00303 {
00304 return;
00305 }
00306
00307 inline void mwCoordinator::DisconnectAll ()
00308 {
00309
00310 for (int i = 0; i < nWaiting + nWorking; ++ i)
00311 {
00312
00313 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
00314 xWorking [i - nWaiting];
00315
00316
00317 if (w. fd < 0)
00318 continue;
00319
00320
00321 unsigned int code = mwByeMsg;
00322 code |= keepWorkers ? mwKeepMsg : mwDontKeepMsg;
00323
00324
00325 mwData empty;
00326 SendMessageC (w. fd, code, empty);
00327
00328
00329 mwDisconnect (w. fd);
00330 w. fd = -1;
00331
00332
00333 if (logFile)
00334 *logFile << "Worker " << i << " (" << w. name <<
00335 ":" << w. port << ") disconnected and " <<
00336 (keepWorkers ? "waiting." : "exited.") <<
00337 std::endl;
00338 }
00339
00340 return;
00341 }
00342
00343 inline mwCoordinator::~mwCoordinator ()
00344 {
00345
00346 DisconnectAll ();
00347
00348 return;
00349 }
00350
00351 inline void mwCoordinator::KeepWorkers (bool keep)
00352 {
00353 keepWorkers = keep;
00354 return;
00355 }
00356
00357 inline int mwCoordinator::SaveWorkers (const char *filename)
00358 {
00359
00360
00361 std::ofstream f;
00362 bool first = false;
00363
00364
00365 int counter = 0;
00366 for (int i = 0; i < nWaiting + nWorking; ++ i)
00367 {
00368
00369 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
00370 xWorking [i - nWaiting];
00371
00372
00373 const char *host = w. name. c_str ();
00374
00375
00376 if (!host || !*host || (w. port <= 0))
00377 continue;
00378
00379
00380 int j;
00381 for (j = 0; j < i; ++ j)
00382 {
00383
00384 mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
00385 xWorking [j - nWaiting];
00386
00387
00388 const char *host_j = v. name. c_str ();
00389
00390
00391 if (!host_j || !*host_j || (v. port <= 0))
00392 continue;
00393
00394
00395 if (w. port != v. port)
00396 continue;
00397
00398
00399 if (std::strcmp (host, host_j))
00400 continue;
00401
00402
00403 j = i + 1;
00404 break;
00405 }
00406 if (j > i)
00407 continue;
00408
00409
00410 if (first)
00411 {
00412 f. open (filename, std::ios::out | std::ios::trunc);
00413 if (!f)
00414 return mwError;
00415 f << "; A list of currently running workers:\n";
00416 first = false;
00417 }
00418
00419
00420 f << host << ":" << w. port << "\n";
00421 ++ counter;
00422 }
00423
00424
00425 if (!counter)
00426 return mwOk;
00427
00428
00429 f << "; A total of " << counter << " workers";
00430 if (counter != nWaiting + nWorking)
00431 f << " out of " << (nWaiting + nWorking);
00432 f << " saved.\n";
00433
00434 if (!keepWorkers)
00435 f << "; The workers will exit upon disconnection.\n";
00436 else
00437 f << "; The workers will remain running "
00438 "after disconnection.\n";
00439 f. close ();
00440 if (!f)
00441 return mwError;
00442 else
00443 return mwOk;
00444 }
00445
00446
00447
00448 inline int mwCoordinator::Prepare (mwData &)
00449 {
00450 return mwNoData;
00451 }
00452
00453 inline int mwCoordinator::Accept (mwData &)
00454 {
00455 return mwOk;
00456 }
00457
00458 inline int mwCoordinator::Reject (mwData &)
00459 {
00460 return mwOk;
00461 }
00462
00463
00464
00465 inline void mwCoordinator::Init (mwData &data)
00466 {
00467 initData. Take (data);
00468 return;
00469 }
00470
00471
00472
00473
00474 inline int mwCoordinator::SendMessageC (int fd, unsigned int code,
00475 const mwData &x) const
00476 {
00477
00478 unsigned int ctrl = this -> ControlNumber ();
00479
00480
00481 return this -> SendMessage (fd, ctrl, code, x);
00482 }
00483
00484 inline int mwCoordinator::RecvMessageC (int fd, unsigned int &code,
00485 mwData &x) const
00486 {
00487
00488 unsigned int ctrl = 0;
00489 int result = this -> RecvMessage (fd, ctrl, code, x);
00490
00491
00492 if (result != mwOk)
00493 return result;
00494
00495
00496 if (ctrl == ~(this -> ControlNumber ()))
00497 return mwOk;
00498
00499
00500 if (logFile)
00501 *logFile << "Wrong control code received "
00502 "from the worker: " << ~ctrl << "." << std::endl;
00503 return mwError;
00504 }
00505
00506
00507
00508 inline int mwCoordinator::RunLoopLocally ()
00509 {
00510 if (!localWorker)
00511 return mwError;
00512
00513
00514 if (!nWaiting)
00515 ++ nWaiting;
00516
00517
00518 while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
00519 {
00520
00521 -- nToDo;
00522 xToDo [nToDo]. Rewind ();
00523 int result = localWorker -> Process (xToDo [nToDo]);
00524 xToDo [nToDo]. Rewind ();
00525
00526
00527 if (result == mwError)
00528 {
00529 if (logFile)
00530 *logFile << "Data processing failed." <<
00531 std::endl;
00532 return mwError;
00533 }
00534
00535
00536
00537 else if (result == mwReject)
00538 {
00539 xRejected [nRejected]. Take (xToDo [nToDo]);
00540 ++ nRejected;
00541 }
00542
00543
00544
00545 else
00546 {
00547 xDone [nDone]. Take (xToDo [nToDo]);
00548 ++ nDone;
00549 }
00550 }
00551
00552 return mwOk;
00553 }
00554
00555 inline void mwCoordinator::ConnectWorkers ()
00556 {
00557 int nComputers = computers. size ();
00558 for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
00559 {
00560
00561 const std::string &name = computers [i];
00562 int port = ports [i];
00563
00564
00565 int fd = mwConnect (name. c_str (), port);
00566
00567
00568 if (fd >= 0)
00569 {
00570 if (logFile)
00571 *logFile << "Connected to " << name << ":" <<
00572 port << "." << std::endl;
00573 mwWorkerData &w = xWaiting [nWaiting];
00574 w. fd = fd;
00575 w. name = name;
00576 w. port = port;
00577 ++ nWaiting;
00578
00579
00580 int code = mwInitMsg |
00581 (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
00582
00583
00584 int initResult = SendMessageC (w. fd, code,
00585 initData);
00586 if (initResult != mwOk)
00587 {
00588 if (logFile)
00589 *logFile << "Error while sending "
00590 "the initialization data." <<
00591 std::endl;
00592 -- nWaiting;
00593 }
00594 }
00595
00596
00597 else if (logFile)
00598 *logFile << "Connection attempt to " << name <<
00599 ":" << port << " failed." << std::endl;
00600 }
00601 return;
00602 }
00603
00604 inline void mwCoordinator::BeginListening ()
00605 {
00606
00607 if (this -> Port () <= 0)
00608 return;
00609
00610
00611 listensocket = mwListen (this -> Port (), 15);
00612
00613
00614 if (logFile)
00615 {
00616 if (listensocket < 0)
00617 *logFile << "Listening attempt at port " <<
00618 this -> Port () << " failed." << std::endl;
00619 else
00620 *logFile << "Waiting for workers at port " <<
00621 this -> Port () << "." << std::endl;
00622 }
00623
00624 return;
00625 }
00626
00627 inline int mwCoordinator::RunLoop (bool no_more_data)
00628 {
00629 if (false && logFile)
00630 *logFile << "\nDebug0: " << nWaiting << " waiting, " <<
00631 nWorking << " working, " << nToDo <<
00632 " data pieces." << std::endl;
00633
00634
00635
00636 if ((listensocket < 0) && !nWorking && !nWaiting)
00637 {
00638
00639 if (localWorker)
00640 {
00641 singleWork = true;
00642 if (logFile)
00643 *logFile << "All workers disconnected. "
00644 "Switching to the single-work "
00645 "mode." << std::endl;
00646 return mwOk;
00647 }
00648
00649
00650 else
00651 {
00652 if (logFile)
00653 *logFile << "Failure: All workers "
00654 "disconnected. The work cannot be "
00655 "continued." << std::endl;
00656 return mwError;
00657 }
00658 }
00659
00660
00661
00662
00663
00664
00665
00666 int ioflags [mwMAXWORK];
00667 int sockets [mwMAXWORK];
00668 int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;
00669
00670
00671 for (int i = 0; i < nWorking; ++ i)
00672 {
00673 ioflags [i] = mwCanRead;
00674 sockets [i] = xWorking [i]. fd;
00675 }
00676 if (nWorkers > nWorking)
00677 {
00678 for (int i = 0; i < nWaiting; ++ i)
00679 {
00680 ioflags [nWorking + i] = mwCanWrite;
00681 sockets [nWorking + i] = xWaiting [i]. fd;
00682 }
00683 }
00684
00685
00686 bool listening = false;
00687 int listenflag = nWorkers;
00688 if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
00689 {
00690 ioflags [listenflag] = mwCanRead;
00691 listening = true;
00692 }
00693 else
00694 ioflags [listenflag] = mwNone;
00695
00696
00697 int timelimit = this -> TimeOut ();
00698 if (localWorker && !nWorking && !nWaiting)
00699 timelimit = 0;
00700 if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
00701 timelimit = 0;
00702 if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
00703 timelimit = 0;
00704
00705
00706
00707
00708 if (logFile)
00709 {
00710 *logFile << ">>> Select, t = " << timelimit << ", flags =";
00711 for (int i = 0; i <= nWorkers; ++ i)
00712 *logFile << " " << ioflags [i];
00713 *logFile << "." << std::endl;
00714 }
00715
00716
00717 int result = mwSelect (sockets, nWorkers,
00718 listening ? listensocket : -1, ioflags, timelimit);
00719
00720
00721 if (logFile)
00722 {
00723 *logFile << ">>> Returned flags =";
00724 for (int i = 0; i <= nWorkers; ++ i)
00725 *logFile << " " << ioflags [i];
00726 *logFile << "." << std::endl;
00727 }
00728
00729
00730 if (result == mwError)
00731 {
00732 if (logFile)
00733 *logFile << "Error: The 'select' function failed." <<
00734 std::endl;
00735 return mwError;
00736 }
00737
00738
00739 if ((timelimit > 0) && (result == mwTimeOut))
00740 {
00741 if (logFile)
00742 *logFile << "Time-out occurred at 'select'." <<
00743 std::endl;
00744 }
00745
00746
00747 for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
00748 (nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
00749 {
00750
00751 if (!(ioflags [i] & mwCanRead))
00752 continue;
00753
00754
00755 unsigned int code = 0;
00756 int result = RecvMessageC (sockets [i], code, xDone [nDone]);
00757
00758
00759 mwWorkerData &w = xWorking [i];
00760
00761
00762 if (result < 0)
00763 {
00764
00765 if (logFile)
00766 {
00767 *logFile << "Worker " << i;
00768 if (!w. name. empty ())
00769 *logFile << " (" << w. name << ")";
00770 *logFile << " disconnected: ";
00771 if (result == mwLost)
00772 *logFile << "Connection lost.";
00773 else
00774 *logFile << "An error occurred.";
00775 *logFile << std::endl;
00776 }
00777
00778
00779 mwDisconnect (sockets [i]);
00780
00781
00782 xToDo [nToDo]. Take (w. data);
00783 ++ nToDo;
00784
00785
00786 w. fd = -1;
00787 w. status = -1;
00788 }
00789
00790
00791 else if (code & mwPortMsg)
00792 {
00793
00794 xDone [nDone] >> w. port;
00795
00796
00797 if (logFile)
00798 *logFile << "Port number " << w. port <<
00799 " received from worker " << i <<
00800 "." << std::endl;
00801 }
00802
00803
00804 else if (code & mwRejectedMsg)
00805 {
00806
00807 if (logFile)
00808 *logFile << "Data was rejected by worker " <<
00809 i << "." << std::endl;
00810
00811
00812 xRejected [nRejected]. Take (w. data);
00813 ++ nRejected;
00814
00815
00816 w. status = 1;
00817 }
00818
00819
00820 else
00821 {
00822
00823 if (logFile)
00824 *logFile << "Processed data received from "
00825 "worker " << i << "." << std::endl;
00826
00827
00828 ++ nDone;
00829
00830
00831 w. status = 1;
00832 }
00833 }
00834
00835
00836 for (int i = 0; (i < nWaiting) && nToDo; ++ i)
00837 {
00838
00839 mwWorkerData &w = xWaiting [i];
00840 int offset = nWorking + i;
00841
00842
00843 if (!(ioflags [offset] & mwCanWrite))
00844 continue;
00845
00846
00847 unsigned int code = mwStdMsg;
00848
00849
00850 int result = SendMessageC (sockets [offset], code,
00851 xToDo [nToDo - 1]);
00852
00853
00854 if (result == mwOk)
00855 {
00856
00857 -- nToDo;
00858 w. data. Take (xToDo [nToDo]);
00859
00860
00861 w. status = 1;
00862
00863
00864 if (logFile)
00865 *logFile << "Data " << nToDo << " sent to "
00866 "worker " << i << "." << std::endl;
00867 }
00868
00869
00870 else
00871 {
00872
00873 if (logFile)
00874 *logFile << "Worker " << i << " disconnected"
00875 ": " << ((result == mwLost) ?
00876 "Connection lost." :
00877 "An error occurred.") << std::endl;
00878
00879
00880 mwDisconnect (sockets [offset]);
00881
00882
00883 w. fd = -1;
00884 w. status = -1;
00885 }
00886 }
00887
00888 if (false && logFile)
00889 {
00890 *logFile << "Debug1: " << nWaiting << " waiting:";
00891 for (int i = 0; i < nWaiting; ++ i)
00892 *logFile << " " << xWaiting [i]. status;
00893 *logFile << "; " << nWorking << " working:";
00894 for (int i = 0; i < nWorking; ++ i)
00895 *logFile << " " << xWorking [i]. status;
00896 *logFile << std::endl;
00897 }
00898
00899
00900
00901 for (int i = 0; i < nWorking; ++ i)
00902 {
00903
00904 if (xWorking [i]. status == 0)
00905 continue;
00906
00907
00908 if (i < nWorking - 1)
00909 swap (xWorking [i], xWorking [nWorking - 1]);
00910
00911
00912 if (xWorking [nWorking - 1]. status > 0)
00913 {
00914 xWorking [nWorking - 1]. status = 0;
00915 swap (xWaiting [nWaiting], xWorking [nWorking - 1]);
00916 ++ nWaiting;
00917 }
00918
00919
00920 -- nWorking;
00921
00922
00923 -- i;
00924 }
00925
00926
00927
00928 for (int i = 0; i < nWaiting; ++ i)
00929 {
00930
00931 if (xWaiting [i]. status == 0)
00932 continue;
00933
00934
00935 if (i < nWaiting - 1)
00936 swap (xWaiting [i], xWaiting [nWaiting - 1]);
00937
00938
00939 if (xWaiting [nWaiting - 1]. status > 0)
00940 {
00941 xWaiting [nWaiting - 1]. status = 0;
00942 swap (xWorking [nWorking], xWaiting [nWaiting - 1]);
00943 ++ nWorking;
00944 }
00945
00946
00947 -- nWaiting;
00948
00949
00950 -- i;
00951 }
00952
00953 if (false && logFile)
00954 *logFile << "Debug2: " << nWaiting << " waiting, " <<
00955 nWorking << " working, " << nToDo <<
00956 " data pieces." << std::endl;
00957
00958
00959 if (listening && (ioflags [listenflag] & mwCanRead) &&
00960 (nWaiting + nWorking < mwMAXWORK))
00961 {
00962
00963 mwWorkerData &w = xWaiting [nWaiting];
00964 w. name = std::string ("");
00965 w. port = 0;
00966 w. status = 0;
00967 w. fd = mwAccept (listensocket, w. name);
00968
00969
00970 if (w. fd >= 0)
00971 {
00972
00973 if (logFile)
00974 *logFile << "A worker from '" << w. name <<
00975 "' accepted." << std::endl;
00976
00977
00978 ++ nWaiting;
00979
00980
00981 int code = mwInitMsg |
00982 (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
00983
00984
00985 int initResult = SendMessageC (w. fd, code,
00986 initData);
00987 if (initResult != mwOk)
00988 {
00989 if (logFile)
00990 *logFile << "Error while sending "
00991 "the initialization data." <<
00992 std::endl;
00993 -- nWaiting;
00994 }
00995 }
00996
00997
00998 else if (logFile)
00999 *logFile << "Unsuccessful connection of a worker "
01000 "from '" << w. name << "'." << std::endl;
01001 }
01002
01003 if (false && logFile)
01004 *logFile << "Debug3: " << nWaiting << " waiting, " <<
01005 nWorking << " working, " << nToDo <<
01006 " data pieces." << std::endl;
01007
01008
01009
01010 if (localWorker && !nWorking && !nWaiting &&
01011 !no_more_data && (result == mwTimeOut) && !nToDo)
01012 {
01013 if (logFile)
01014 *logFile << "Asking for some data to be "
01015 "processed locally." << std::endl;
01016 return mwOk;
01017 }
01018
01019
01020 if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
01021 nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
01022 {
01023
01024 if (logFile)
01025 *logFile << "Processing data locally." << std::endl;
01026
01027
01028 -- nToDo;
01029 xToDo [nToDo]. Rewind ();
01030 int result = localWorker -> Process (xToDo [nToDo]);
01031 xToDo [nToDo]. Rewind ();
01032
01033 if (result == mwReject)
01034 {
01035 xRejected [nRejected]. Take (xToDo [nToDo]);
01036 ++ nRejected;
01037 }
01038 else if (result == mwError)
01039 {
01040 if (logFile)
01041 *logFile << "Data processing failed." <<
01042 std::endl;
01043 return mwError;
01044 }
01045 else
01046 {
01047 xDone [nDone]. Take (xToDo [nToDo]);
01048 ++ nDone;
01049 }
01050 }
01051
01052 if (false && logFile)
01053 *logFile << "Debug4: " << nWaiting << " waiting, " <<
01054 nWorking << " working, " << nToDo <<
01055 " data pieces." << std::endl;
01056
01057 return mwOk;
01058 }
01059
01060
01061
01062 inline int mwCoordinator::Coordinate (mwWorker *w)
01063 {
01064
01065 localWorker = w;
01066
01067
01068 if (localWorker)
01069 localWorker -> Initialize (initData);
01070
01071 if (logFile)
01072 {
01073 *logFile << "Running as a COORDINATOR." << std::endl;
01074
01075
01076 #if !mwNETWORK
01077 *logFile << "There is no network in use." << std::endl;
01078 #elif mwWXWIN
01079 *logFile << "Using the sockets interface "
01080 "provided by wxWindows." << std::endl;
01081 #else
01082 *logFile << "Using the standard sockets "
01083 "for network connections." << std::endl;
01084 #endif
01085
01086
01087 if (singleWork && localWorker)
01088 *logFile << "Running in the single-work mode." <<
01089 std::endl;
01090 }
01091
01092
01093 if (!singleWork)
01094 {
01095 this -> ConnectWorkers ();
01096 this -> BeginListening ();
01097 }
01098
01099
01100
01101 if (!singleWork && (listensocket < 0) && !nWaiting)
01102 {
01103
01104 if (localWorker)
01105 {
01106 singleWork = true;
01107 if (logFile)
01108 *logFile << "No remote workers. Switching "
01109 "to the single-work mode." <<
01110 std::endl;
01111 }
01112
01113
01114 else
01115 {
01116 if (logFile)
01117 *logFile << "Failure: No workers." <<
01118 std::endl;
01119 return mwError;
01120 }
01121 }
01122
01123
01124 bool no_more_data = false;
01125
01126 while (1)
01127 {
01128
01129 int loopresult = singleWork ? this -> RunLoopLocally () :
01130 this -> RunLoop (no_more_data);
01131
01132
01133 if (loopresult == mwError)
01134 return mwError;
01135
01136
01137 while (nRejected > 0)
01138 {
01139
01140 -- nRejected;
01141 xRejected [nRejected]. Rewind ();
01142 int result = this -> Reject (xRejected [nRejected]);
01143
01144
01145 if (result != mwOk)
01146 return mwError;
01147
01148
01149 xRejected [nRejected]. Reset ();
01150 no_more_data = false;
01151 }
01152
01153
01154 while (nDone > 0)
01155 {
01156
01157 -- nDone;
01158 xDone [nDone]. Rewind ();
01159 int result = this -> Accept (xDone [nDone]);
01160
01161
01162 if (result != mwOk)
01163 return mwError;
01164
01165
01166 xDone [nDone]. Reset ();
01167 no_more_data = false;
01168 }
01169
01170
01171
01172 bool hungry = (nWaiting > 0) && !nToDo;
01173
01174 if ((localWorker || (listensocket >= 0)) &&
01175 !nWorking && !nWaiting && !nToDo)
01176 {
01177 hungry = true;
01178 }
01179
01180 if (no_more_data)
01181 hungry = false;
01182
01183
01184 if (hungry)
01185 {
01186
01187 int result = this -> Prepare (xToDo [nToDo]);
01188
01189
01190 if (result == mwError)
01191 return mwError;
01192
01193
01194 else if (result == mwNoData)
01195 no_more_data = true;
01196
01197
01198 else
01199 ++ nToDo;
01200 }
01201
01202
01203 if (no_more_data && !nWorking && !nToDo)
01204 return mwOk;
01205 }
01206 }
01207
01208
01209
01210 inline void mwCoordinator::mwTableDel (int *tab, int len, int pos)
01211
01212
01213 {
01214 for (int i = pos + 1; i < len; ++ i)
01215 tab [i - 1] = tab [i];
01216 return;
01217 }
01218
01219 inline void mwCoordinator::mwTableDel (mwData *tab, int len, int pos)
01220
01221
01222 {
01223 for (int i = pos + 1; i < len; ++ i)
01224 tab [i - 1]. Take (tab [i]);
01225 return;
01226 }
01227
01228
01229 }
01230 }
01231
01232 #endif // _CHOMP_MULTIWORK_MWCOORD_H_
01233
01235