RZ

001: // DemandReconstituter.cc # the actual synthesize algorithm
002:
003: #include <unistd.h>
004: #include <sys/uio.h>
005:
006: #include <string>
007: #include <iostream>
008:
009: #include "RZError.hh"
010: #include "RZNetOrder.hh"
011:
012: #include "GetoptVerbose.hh"
013:
014: #include "DemandWrapper.hh"
015: #include "DataLocalScanner.hh"
016: #include "DataRemoteController.hh"
017: #include "DataRemoteRetriever.hh"
018: #include "DemandMediator.hh"
019: #include "DemandReconstituter.hh"
020:
021: namespace DemandReconstituter {
022:     struct RZ::Worker WORKER;
023:
024:     std::vector< char > rz_wrapper_buf;
025:
026:     std::vector< std::vector< char > >::iterator rz_buf, sc_buf;
027: }
028:
029: // ######## ######## ######## ######## ######## ######## ######## ########
030:
031: namespace DemandReconstituter {
032:     static int bkend; // INVARIANT 0 <= $bki <= $bkf <= $bkend
033: }
034:
035: // ######## ######## ######## ######## ######## ######## ######## ########
036:
037: namespace DemandReconstituter {
038:     static int bkf = 0; // INVARIANT 0 <= bki <= bkf <= bkend
039:
040:     bool is_need(void)
041:     {
042:         return bkf < bkend;
043:     }
044:
045:     void advance_pointer(void)
046:     {
047:         for (; bkf < bkend; ++bkf) {
048:             if (!rz_buf[ bkf ].empty()) continue;
049:             if (!sc_buf[ bkf ].empty()) continue;
050:             break;
051:         }
052:         if (bkf == bkend) {
053:             DataLocalScanner::local_close();
054:             DataRemoteRetriever::remote_close();
055:         }
056:     }
057: }
058:
059: // ######## ######## ######## ######## ######## ######## ######## ########
060:
061: namespace DemandReconstituter {
062:
063:     // ######## ######## ######## ######## ######## ######## ########
064:     // STREAM FORMAT
065:     //   1 word header "<<<<" as local -- ">>>>" as remote
066:     //   either a local or remote block
067:
068:     static char whencelocal[] = "<<<<", whenceremote[] = ">>>>";
069:
070:     static struct iovec vec[2];
071:
072:     static struct iovec *vbase;
073:     static unsigned int vlen = 0;
074:
075:     static void setbuffervector(bool islocal, std::vector< char > &body)
076:     {
077:         if(body.empty()) throw RZ::AssertionError("Body empty");
078:
079:         vec[0].iov_base = islocal ? whencelocal : whenceremote;
080:         vec[0].iov_len = 4;
081:         vec[1].iov_base = &body.front();
082:         vec[1].iov_len = body.size();
083:
084:         vbase = vec; // buffer remaining
085:         vlen = 2;    // buffer remaining
086:     }
087: }
088:
089: // ######## ######## ######## ######## ######## ######## ######## ########
090:
091: namespace DemandReconstituter {
092:     static int bki = 0; // INVARIANT 0 <= $bki <= $bkf <= $bkend
093:
094:     void worker_close(void)
095:     {
096:         DataLocalScanner::local_close();
097:         DataRemoteRetriever::remote_close();
098:         WORKER.close();
099:         //if (WORKER.close_worker()) throw RZ::AssertionError();
100:           // $! ? warn "Warning $!" : die "\nWorker died at block $bki\n";
101:     }
102:
103:     void assure_finished(void)
104:     {
105:         worker_close();
106:         if (vlen || bki < bkend) throw RZ::AssertionError("Premature death");
107:     }
108:
109:     bool is_pending(void)
110:     {
111:         if (vlen) return true;
112:         if (bki == bkend) return false;
113:         if (!rz_buf[ bki ].empty()) {
114:             if (RZ::debug || RZ::verbose) std::cerr << '>';
115:             if (rz_buf[ bki ].size() != RZ::rz_tab.rz_size(bki))
116:                 throw RZ::AssertionError("Remote block not the right size.");
117:             setbuffervector(false, rz_buf[ bki++ ]);
118:             return true;
119:         }
120:         if (!sc_buf[ bki ].empty()) {
121:             if (RZ::debug || RZ::verbose) std::cerr << '<';
122:             if (sc_buf[ bki ].size() != RZ::rz_wrapper.block_size)
123:                 throw RZ::AssertionError("Local block not the right size.");
124:             setbuffervector(true, sc_buf[ bki++ ]);
125:             return true;
126:         }
127:         return false;
128:     }
129: }
130:
131: // ######## ######## ######## ######## ######## ######## ######## ########
132:
133: namespace DemandReconstituter {
134:     void select_processing(void)
135:     {
136:         int r = writev(WORKER.file_no, vbase, vlen);
137:         if (r == -1 && errno == EINTR) return;
138:         RZ::IOError::what_if("Can't write to worker!");
139:         if (!r) throw RZ::AssertionError("Write failure");
140:         for (; vlen; ++vbase, --vlen)
141:             if (vbase->iov_len <= (unsigned int)r)
142:                 r -= vbase->iov_len;
143:             else {
144:                 vbase->iov_base = (char *)(vbase->iov_base) + r;
145:                 vbase->iov_len -= r;
146:                 break;
147:             }
148:     }
149: }
150:
151: // ######## ######## ######## ######## ######## ######## ######## ########
152:
153: namespace DemandReconstituter {
154:     static std::vector< std::vector< char > > rzcontainer, sccontainer;
155:
156:     void require(void)
157:     {
158:         bkf = bki = RZ::rz_tab.rz_at_begin;
159:         bkend = RZ::rz_tab.rz_at_end;
160:
161:         rzcontainer.resize(bkend - bki);
162:         sccontainer.resize(bkend - bki);
163:
164:         rz_buf = rzcontainer.begin();
165:         sc_buf = sccontainer.begin();
166:         if (bki) {
167:             ++rz_buf;
168:             ++sc_buf;
169:         }
170:
171:         setbuffervector(false, rz_wrapper_buf);
172:     }
173: }
174:
175: // DemandReconstituter.cc # the actual synthesize algorithm