001:
002:
003: #include "DemandMediator.hh"
004:
005:
006:
007: #include <unistd.h>
008: #include <sys/stat.h>
009: #include <sys/time.h>
010: #include <sys/types.h>
011: #include <fcntl.h>
012:
013: #include <iostream>
014: #include <iomanip>
015: #include <sstream>
016: #include <vector>
017:
018: #include "RZError.hh"
019: #include "RZNetOrder.hh"
020: #include "RZIO.hh"
021: #include "RZMagic.hh"
022: #include "MMap.hh"
023:
024: #include "GetoptVerbose.hh"
025: #include "DataPackets.hh"
026:
027: #include "ControlRemote.hh"
028: #include "LocalScan.hh"
029:
030: #include "DemandWrapper.hh"
031: #include "DataLocalScanner.hh"
032: #include "DataRemoteController.hh"
033: #include "DataRemoteRetriever.hh"
034: #include "DemandMediator.hh"
035: #include "DemandReconstituter.hh"
036:
037: using namespace RZ;
038:
039:
040:
041: void DemandMediator::demand_mediator(
042: std::string const included,
043: int const keepopenfd, int const rangefd, int const remotefd,
044: int const reconstitutedfd,
045: std::vector< int >::iterator givenfdi,
046: std::vector< int >::iterator const givenfdiend,
047: std::vector< std::string >::iterator giveni,
048: std::vector< std::string >::iterator const giveniend)
049: {
050:
051: DataRemoteController::RANGE.file_no = rangefd;
052: DataRemoteRetriever::REMOTE.file_no = remotefd;
053:
054:
055:
056: DataRemoteRetriever::require();
057:
058:
059:
060:
061:
062:
063: {
064: int pfd[2];
065: if (pipe(pfd)) throw RZ::IOError("Couldn't create pipe");
066: int pid = fork();
067: if (pid == -1) throw RZ::AssertionError(
068: "Can't start file reconstituter child process");
069: else if (pid) {
070: RZ::closer(pfd[0]);
071: DemandReconstituter::WORKER.file_no = pfd[1];
072: DemandReconstituter::WORKER.process_id = pid;
073: }
074: else {
075: RZ::closer(pfd[1]);
076: if (dup2(pfd[0],0) != 0) throw RZ::IOError("Can't dup to stdin");
077: RZ::closer(pfd[0]);
078: RZ::closer(DataRemoteRetriever::REMOTE.file_no);
079: RZ::closer(DataRemoteController::RANGE.file_no);
080: if (keepopenfd) RZ::closer(keepopenfd);
081:
082:
083: std::stringstream r; r << reconstitutedfd;
084: std::string r0(included + "/Reconstitute.gateway");
085: std::string r1(included + "/DemandReconstituter");
086: execl(r0.c_str(), r1.c_str(), r.str().c_str(), NULL);
087: throw RZ::IOError("Reconstituter failure");
088:
089:
090:
091: }
092: }
093:
094: RZ::closer(reconstitutedfd);
095:
096:
097:
098:
099:
100:
101: if (RZ::debug) std::cerr << " scan <- givens" << std::endl;
102:
103:
104:
105: char volatile * const ltab_mmap = RZ::anonymous_memory_map(
106: RZ::rz_wrapper.n_blks, true, "local blocks");
107: char volatile * const rtab_mmap = RZ::anonymous_memory_map(
108: RZ::rz_wrapper.n_blks, true, "remote blocks");
109:
110:
111:
112: {
113: int pfd[2];
114: if (pipe(pfd)) throw RZ::AssertionError("Couldn't create pipe");
115: int pid = fork();
116: if (pid == -1) throw RZ::AssertionError(
117: "Can't start local scanner child process");
118: else if (pid) {
119: closer(pfd[1]);
120: DataLocalScanner::LOCAL.file_no = pfd[0];
121: DataLocalScanner::LOCAL.process_id = pid;
122: }
123: else {
124: RZ::closer(pfd[0]);
125: RZ::closer(DemandReconstituter::WORKER.file_no);
126: RZ::closer(DataRemoteRetriever::REMOTE.file_no);
127: RZ::closer(DataRemoteController::RANGE.file_no);
128: if (keepopenfd) RZ::closer(keepopenfd);
129: if (DataRemoteController::RANGE.file_no != 1) RZ::closer(1);
130:
131:
132: RZ::Scanner scanner(pfd[1], RZ::rz_wrapper, ltab_mmap, rtab_mmap);
133:
134:
135: for (; givenfdi < givenfdiend; ++givenfdi) {
136: int givenfile = *givenfdi;
137: try {
138: scanner.scan_mmap(givenfile);
139: }
140: catch (MMapError x) {
141: scanner.scan_circular(givenfile);
142: }
143: RZ::closer(givenfile);
144: }
145:
146:
147: for (; giveni < giveniend; ++giveni) {
148: int givenfile = open(&(*giveni)[0], O_RDONLY | O_NOATIME);
149: try {
150: scanner.scan_mmap(givenfile);
151: }
152: catch (MMapError x) {
153: scanner.scan_circular(givenfile);
154: }
155: RZ::closer(givenfile);
156: }
157:
158: _exit(0);
159:
160:
161:
162:
163: }
164: }
165:
166: DataLocalScanner::require();
167: DataRemoteController::require(ltab_mmap, rtab_mmap);
168:
169:
170:
171: if (RZ::debug) std::cerr << " synthesis -> tempfiles" << std::endl;
172:
173: DemandReconstituter::require();
174:
175:
176:
177:
178: if (RZ::debug || RZ::verbose)
179: std::cerr << " processing < local <> remote >" << std::endl;
180:
181: int REMO = DataRemoteRetriever::REMOTE.file_no;
182: int LOCA = DataLocalScanner::LOCAL.file_no;
183: int RANG = DataRemoteController::RANGE.file_no;
184: int WORK = DemandReconstituter::WORKER.file_no;
185: int MAXX = std::max(std::max(REMO, LOCA), std::max(RANG, WORK)) + 1;
186:
187:
188:
189: {
190: struct timeval timer = { 0, 10000 };
191:
192: fd_set r, w, x;
193: FD_ZERO(&r);
194: FD_ZERO(&w);
195: FD_ZERO(&x);
196:
197: while (timer.tv_usec > 0) {
198: bool remof = DataRemoteRetriever::is_pending(1);
199: bool locaf = DataLocalScanner::is_pending();
200: bool workf = DemandReconstituter::is_pending();
201:
202: if (!(remof || locaf || workf)) break;
203:
204: if (remof) FD_SET(REMO, &r); else FD_CLR(REMO, &r);
205: if (locaf) FD_SET(LOCA, &r); else FD_CLR(LOCA, &r);
206: if (workf) FD_SET(WORK, &w); else FD_CLR(WORK, &w);
207:
208: int s = select(MAXX, &r, &w, &x, &timer);
209: if (s == -1 && errno != EINTR)
210: throw AssertionError("Select loop failure");
211:
212: if (FD_ISSET(REMO, &r)) DataRemoteRetriever::select_processing(1);
213: if (FD_ISSET(LOCA, &r)) DataLocalScanner::select_processing();
214: if (FD_ISSET(WORK, &w)) DemandReconstituter::select_processing();
215: }
216: }
217:
218: if (DataRemoteController::first_packet_need)
219: DataRemoteController::need_more();
220:
221:
222:
223: {
224: fd_set r, w, x;
225: FD_ZERO(&r);
226: FD_ZERO(&w);
227: FD_ZERO(&x);
228:
229: for (;;) {
230: bool remof = DataRemoteRetriever::is_pending(0);
231: bool locaf = DataLocalScanner::is_pending();
232: bool rangf = DataRemoteController::is_pending();
233: bool workf = DemandReconstituter::is_pending();
234:
235: if (!(remof || locaf || rangf || workf)) break;
236:
237: if (remof) FD_SET(REMO, &r); else FD_CLR(REMO, &r);
238: if (locaf) FD_SET(LOCA, &r); else FD_CLR(LOCA, &r);
239: if (rangf) FD_SET(RANG, &w); else FD_CLR(RANG, &w);
240: if (workf) FD_SET(WORK, &w); else FD_CLR(WORK, &w);
241:
242: int s = select(MAXX, &r, &w, &x, 0);
243: if (s == -1 && errno != EINTR)
244: throw AssertionError("Select loop failure");
245:
246: if (FD_ISSET(REMO, &r)) DataRemoteRetriever::select_processing(0);
247: if (FD_ISSET(LOCA, &r)) DataLocalScanner::select_processing();
248: if (FD_ISSET(RANG, &w)) DataRemoteController::select_processing();
249: if (FD_ISSET(WORK, &w)) DemandReconstituter::select_processing();
250: }
251: }
252:
253: if (RZ::debug || RZ::verbose) std::cerr << std::endl;
254:
255:
256:
257: DemandReconstituter::assure_finished();
258: if (keepopenfd) RZ::closer(keepopenfd);
259:
260:
261:
262: using DataLocalScanner::local_total;
263: using DataRemoteRetriever::remote_total;
264: using RZ::rz_wrapper;
265:
266: if (RZ::debug || RZ::verbose) {
267: float scend = 0.0;
268: if (rz_wrapper.sc_end) scend = 100.0 * local_total / rz_wrapper.sc_end;
269:
270: float rzend = 100.0;
271: if (rz_wrapper.rz_end) rzend = 100.0
272: * (remote_total - rz_wrapper.wrapper_size) / rz_wrapper.rz_end;
273:
274: std::cerr
275: <<" processed "
276: <<"< locally: "
277: <<std::setw(2) <<std::setprecision(2) <<std::setfill('0')
278: <<scend <<"% <"
279: <<"> remotely: "
280: <<std::setw(2) <<std::setprecision(2) <<std::setfill('0')
281: <<rzend <<"% >"
282: <<std::endl;
283: }
284: }
285:
286:
287:
288: