RZ

001: // DemandMediator.cc # mediates between data driven and demand driven processes
002:
003: #include "DemandMediator.hh"
004:
005: // ######## ######## ######## ######## ######## ######## ######## ########
006:
007: #include <unistd.h>
008: #include <sys/stat.h>
009: #include <sys/time.h>
010: #include <sys/types.h>
011: #include <fcntl.h>
012:
013: #include <iostream>
014: #include <iomanip>
015: #include <sstream>
016: #include <vector>
017:
018: #include "RZError.hh"
019: #include "RZNetOrder.hh"
020: #include "RZIO.hh"
021: #include "RZMagic.hh"
022: #include "MMap.hh"
023:
024: #include "GetoptVerbose.hh"
025: #include "DataPackets.hh"
026:
027: #include "ControlRemote.hh"
028: #include "LocalScan.hh"
029:
030: #include "DemandWrapper.hh"
031: #include "DataLocalScanner.hh"
032: #include "DataRemoteController.hh"
033: #include "DataRemoteRetriever.hh"
034: #include "DemandMediator.hh"
035: #include "DemandReconstituter.hh"
036:
037: using namespace RZ;
038:
039: // ######## mediate between data driven and demand driven processes ########
040:
041: void DemandMediator::demand_mediator(
042:     std::string const included,
043:     int const keepopenfd, int const rangefd, int const remotefd, 
044:     int const reconstitutedfd,
045:     std::vector< int >::iterator givenfdi,
046:     std::vector< int >::iterator const givenfdiend,
047:     std::vector< std::string >::iterator giveni,
048:     std::vector< std::string >::iterator const giveniend)
049: {
050:     // keeping keepopenfd open to avoid a SIGPIPE
051:     DataRemoteController::RANGE.file_no = rangefd;
052:     DataRemoteRetriever::REMOTE.file_no = remotefd;
053:
054:     // ######## open transaction ########
055:
056:     DataRemoteRetriever::require();
057:
058:     // ######## ######## ######## ######## ######## ######## ######## ########
059:     // ######## ######## ######## ######## ######## ######## ######## ########
060:
061:     // ######## fork and exec the 'Reconstituter subprocess ########
062:
063:     {
064:         int pfd[2];
065:         if (pipe(pfd)) throw RZ::IOError("Couldn't create pipe");
066:         int pid = fork();
067:         if (pid == -1) throw RZ::AssertionError(
068:             "Can't start file reconstituter child process");
069:         else if (pid) {
070:             RZ::closer(pfd[0]);
071:             DemandReconstituter::WORKER.file_no = pfd[1];
072:             DemandReconstituter::WORKER.process_id = pid;
073:         }
074:         else {
075:             RZ::closer(pfd[1]);
076:             if (dup2(pfd[0],0) != 0) throw RZ::IOError("Can't dup to stdin");
077:             RZ::closer(pfd[0]);
078:             RZ::closer(DataRemoteRetriever::REMOTE.file_no);
079:             RZ::closer(DataRemoteController::RANGE.file_no);
080:             if (keepopenfd) RZ::closer(keepopenfd);
081:             // use reconstitutedfd
082:
083:             std::stringstream r; r << reconstitutedfd;
084:             std::string r0(included + "/Reconstitute.gateway");
085:             std::string r1(included + "/DemandReconstituter");
086:             execl(r0.c_str(), r1.c_str(), r.str().c_str(), NULL);
087:             throw RZ::IOError("Reconstituter failure");
088:
089:             // implied exit closing reconstitutedfd
090:             // implied exit closing STDIN {from DemandReconstituter::WORKER}
091:         }
092:     }
093:
094:     RZ::closer(reconstitutedfd);
095:
096:     // ######## ######## ######## ######## ######## ######## ######## ########
097:     // ######## ######## ######## ######## ######## ######## ######## ########
098:
099:     // ######## start the race ########
100:
101:     if (RZ::debug) std::cerr << "  scan <- givens" << std::endl;
102:
103:     // ######## tally competitors at checkpoints ########
104:
105:     char volatile * const ltab_mmap = RZ::anonymous_memory_map(
106:         RZ::rz_wrapper.n_blks, true, "local blocks");
107:     char volatile * const rtab_mmap = RZ::anonymous_memory_map(
108:         RZ::rz_wrapper.n_blks, true, "remote blocks");
109:
110:     // ######## fork 'Scanner subprocess ########
111:
112:     {
113:         int pfd[2];
114:         if (pipe(pfd)) throw RZ::AssertionError("Couldn't create pipe");
115:         int pid = fork();
116:         if (pid == -1) throw RZ::AssertionError(
117:             "Can't start local scanner child process");
118:         else if (pid) {
119:             closer(pfd[1]);
120:             DataLocalScanner::LOCAL.file_no = pfd[0];
121:             DataLocalScanner::LOCAL.process_id = pid;
122:         }
123:         else {
124:             RZ::closer(pfd[0]);
125:             RZ::closer(DemandReconstituter::WORKER.file_no);
126:             RZ::closer(DataRemoteRetriever::REMOTE.file_no);
127:             RZ::closer(DataRemoteController::RANGE.file_no);//close writin end
128:             if (keepopenfd) RZ::closer(keepopenfd);//of pipe before readin end
129:             if (DataRemoteController::RANGE.file_no != 1) RZ::closer(1);
130:             // may use inherited STDIN
131:
132:             RZ::Scanner scanner(pfd[1], RZ::rz_wrapper, ltab_mmap, rtab_mmap);
133:
134:             // scan each --givenfd
135:             for (; givenfdi < givenfdiend; ++givenfdi) {
136:                 int givenfile = *givenfdi;
137:                 try {
138:                     scanner.scan_mmap(givenfile);
139:                 }
140:                 catch (MMapError x) {
141:                     scanner.scan_circular(givenfile);
142:                 }
143:                 RZ::closer(givenfile);
144:             }
145:
146:             // scan each --given filename
147:             for (; giveni < giveniend; ++giveni) {
148:                 int givenfile = open(&(*giveni)[0], O_RDONLY | O_NOATIME);
149:                 try {
150:                     scanner.scan_mmap(givenfile);
151:                 }
152:                 catch (MMapError x) {
153:                     scanner.scan_circular(givenfile);
154:                 }
155:                 RZ::closer(givenfile);
156:             }
157:
158:             _exit(0);
159:             // exit closing rtab_mmap
160:             // exit closing ltab_mmap
161:             // exit closing STDOUT {to 'Scanner::LOCAL}
162:             // exit closing STDIN
163:         }
164:     }
165:
166:     DataLocalScanner::require();
167:     DataRemoteController::require(ltab_mmap, rtab_mmap);
168:
169:     // ######## tempfiles from 'Reconstituter subprocesses ########
170:
171:     if (RZ::debug) std::cerr << "  synthesis -> tempfiles" << std::endl;
172:
173:     DemandReconstituter::require();
174:
175:     // ######## ######## ######## ######## ######## ######## ######## ########
176:     // ######## ######## ######## ######## ######## ######## ######## ########
177:
178:     if (RZ::debug || RZ::verbose)
179:         std::cerr << "  processing < local <> remote >" << std::endl;
180:
181:     int REMO =  DataRemoteRetriever::REMOTE.file_no;
182:     int LOCA =     DataLocalScanner::LOCAL.file_no;
183:     int RANG = DataRemoteController::RANGE.file_no;
184:     int WORK =  DemandReconstituter::WORKER.file_no;
185:     int MAXX = std::max(std::max(REMO, LOCA), std::max(RANG, WORK)) + 1;
186:
187:     // ######## spinup: give the local scanner a head start ########
188:
189:     {
190:         struct timeval timer = { 0, 10000 };
191:         
192:         fd_set r, w, x; // read/write/out-of-band pending
193:         FD_ZERO(&r);  // read pending
194:         FD_ZERO(&w); // write pending
195:         FD_ZERO(&x);
196:         
197:         while (timer.tv_usec > 0) {
198:             bool remof =  DataRemoteRetriever::is_pending(1);
199:             bool locaf =     DataLocalScanner::is_pending();
200:             bool workf =  DemandReconstituter::is_pending();
201:
202:             if (!(remof || locaf || workf)) break;
203:
204:             if (remof) FD_SET(REMO, &r); else FD_CLR(REMO, &r);
205:             if (locaf) FD_SET(LOCA, &r); else FD_CLR(LOCA, &r);
206:             if (workf) FD_SET(WORK, &w); else FD_CLR(WORK, &w);
207:
208:             int s = select(MAXX, &r, &w, &x, &timer);
209:             if (s == -1 && errno != EINTR)
210:                 throw AssertionError("Select loop failure");
211:
212:             if (FD_ISSET(REMO, &r)) DataRemoteRetriever::select_processing(1);
213:             if (FD_ISSET(LOCA, &r))    DataLocalScanner::select_processing();
214:             if (FD_ISSET(WORK, &w)) DemandReconstituter::select_processing();
215:         }
216:     }
217:
218:     if (DataRemoteController::first_packet_need)
219:         DataRemoteController::need_more();
220:
221:     // ######## main polling loop ########
222:
223:     {
224:         fd_set r, w, x; // read/write/out-of-band pending
225:         FD_ZERO(&r);  // read pending
226:         FD_ZERO(&w); // write pending
227:         FD_ZERO(&x);
228:         
229:         for (;;) {
230:             bool remof =  DataRemoteRetriever::is_pending(0);
231:             bool locaf =     DataLocalScanner::is_pending();
232:             bool rangf = DataRemoteController::is_pending();
233:             bool workf =  DemandReconstituter::is_pending();
234:
235:             if (!(remof || locaf || rangf || workf)) break;
236:
237:             if (remof) FD_SET(REMO, &r); else FD_CLR(REMO, &r);
238:             if (locaf) FD_SET(LOCA, &r); else FD_CLR(LOCA, &r);
239:             if (rangf) FD_SET(RANG, &w); else FD_CLR(RANG, &w);
240:             if (workf) FD_SET(WORK, &w); else FD_CLR(WORK, &w);
241:
242:             int s = select(MAXX, &r, &w, &x, 0);
243:             if (s == -1 && errno != EINTR)
244:                 throw AssertionError("Select loop failure");
245:
246:             if (FD_ISSET(REMO, &r))  DataRemoteRetriever::select_processing(0);
247:             if (FD_ISSET(LOCA, &r))     DataLocalScanner::select_processing();
248:             if (FD_ISSET(RANG, &w)) DataRemoteController::select_processing();
249:             if (FD_ISSET(WORK, &w))  DemandReconstituter::select_processing();
250:         }
251:     }
252:
253:     if (RZ::debug || RZ::verbose) std::cerr << std::endl;
254:
255:     // ######## reap all the children ########
256:
257:     DemandReconstituter::assure_finished();
258:     if (keepopenfd) RZ::closer(keepopenfd);
259:
260:     // ######## summary statistics ########
261:
262:     using DataLocalScanner::local_total;
263:     using DataRemoteRetriever::remote_total;
264:     using RZ::rz_wrapper;
265:     
266:     if (RZ::debug || RZ::verbose) {
267:         float scend = 0.0;
268:         if (rz_wrapper.sc_end) scend = 100.0 * local_total / rz_wrapper.sc_end;
269:
270:         float rzend = 100.0;
271:         if (rz_wrapper.rz_end) rzend = 100.0
272:             * (remote_total - rz_wrapper.wrapper_size) / rz_wrapper.rz_end;
273:
274:         std::cerr
275:             <<"  processed "
276:             <<"< locally: " 
277:                 <<std::setw(2) <<std::setprecision(2) <<std::setfill('0')
278:                 <<scend <<"% <"
279:             <<"> remotely: " 
280:                 <<std::setw(2) <<std::setprecision(2) <<std::setfill('0') 
281:                 <<rzend <<"% >"
282:             <<std::endl;
283:     }
284: }
285:
286: // ######## ######## ######## ######## ######## ######## ######## ########
287:
288: // DemandMediator.cc # mediates between data driven and demand driven processes