RZ

001: // PullReconstitute.cc # the actual synthesize algorithm
002:
003: #include "PullReconstitute.hh"
004:
005: // ######## ######## ######## ######## ######## ######## ######## ########
006:
007: //#include <cassert>
008:
009: //#include <unistd.h>
010: #include <sys/stat.h>
011: //#include <sys/mman.h>
012:
013: #include <string>
014: #include <sstream>
015: #include <iostream>
016: #include <vector>
017: #include <algorithm>
018:
019: #include "RZError.hh"
020: #include "RZNetOrder.hh"
021: #include "RZIO.hh"
022: #include "RZMagic.hh"
023: #include "AlgoZlib.hh"
024:
025: using namespace RZ;
026:
027: // ###################### Reconstitute the source stream ######################
028:
029: void RZ::reconstitute(int reconsfd)
030: {
031:     // read initial >>>>
032:     char whence[ 4 ];
033:     //fd_read(0, whence, 4);
034:     std::cin.read(whence, 4);
035:     if (std::cin.fail())
036:         throw AssertionError(
037:             "Interprocess communication failure before block 0");
038:     if (!std::equal(whence, whence + 4, ">>>>"))
039:         throw AssertionError(
040:             "Interprocess communication failure before block 0");
041:
042:     struct Sums const sums(RZ::rz_wrapper);
043:
044:     // set up the decompression/recompression streams
045:     struct Zlib::Inflate z0("Decompression");
046:     struct Zlib::Deflate z1("Recompression");
047:     int s;
048:
049:     // the indices
050:     int rzi, rzr, sci, scr;
051:
052:     // leader
053:     if (sc_off != 0 || rz_off != 0)
054:         throw ContentError("I don't know how to skip an initial offset");
055:
056:     // data buffers
057:     char buffer8KB[ 8193 ];
058:     char buffer[ 16385 ];
059:     char deflated[ 16385 ];
060:     char inflated8KB[ 8193 ];
061:     char * const inflated = inflated8KB;
062:
063:     // synthesize the local and remote data blocks
064:     int bki;
065:     for (
066:         bki = 0, rzi = 0, sci = 0;
067:         bki < nblks && (rzr = sigtab[ bki ].rz_to - rzi);
068:         ++bki, rzi += rzr, sci += 8192)
069:     {
070:         //fd_read(0, whence, 4);
071:         std::cin.read(whence, 4);
072:         if (std::cin.fail()) {
073:             std::ostringstream err;
074:             err << "Interprocess communication failure at block " << bki;
075:             throw AssertionError(err.str());
076:         }
077:         if (std::equal(whence, whence + 4, "<<<<")) {
078:             //fd_read(0, buffer8KB, 8192);
079:             std::cin.read(buffer8KB, 8192);
080:             if (std::cin.fail()) {
081:                 std::ostringstream err;
082:                 err << "Interprocess communication failure at local block "
083:                     << bki;
084:                 throw AssertionError(err.str());
085:             }
086:
087:             // deflated <= deflate(buffer8KB)
088:             z1.z_deflate.next_in = (Zlib::Bytef*)buffer8KB;
089:             z1.z_deflate.avail_in = 8192;
090:             z1.z_deflate.next_out = (Zlib::Bytef*)deflated;
091:             z1.z_deflate.avail_out = rzr + 1;
092:             s = z1.zlib_deflate(Z_SYNC_FLUSH);
093:             if (s != Z_OK) {
094:                 std::ostringstream err;
095:                 err << "Recompression synchronization failed at block " << bki
096:                     << ": " << s;
097:                 throw ContentError(err.str());
098:             }
099:             if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
100:                 std::ostringstream err;
101:                 err << "Mismatch in recompression length at block " << bki;
102:                 throw ContentError(err.str());
103:             }
104:
105:             rzc = crc32(rzc, (Zlib::Bytef*)deflated, rzr);
106:
107:             // inflated8KB <= inflate(deflated);
108:             z0.z_inflate.next_in = (Zlib::Bytef*)deflated;
109:             z0.z_inflate.avail_in = rzr;
110:             z0.z_inflate.next_out = (Zlib::Bytef*)inflated8KB;
111:             z0.z_inflate.avail_out = 8192 + 1;
112:             s = z0.zlib_inflate();
113:             if (!(s == Z_OK || s == Z_STREAM_END)) {
114:                 std::ostringstream err;
115:                 err << "Decompression inflation failed at block " << bki
116:                     << ": " << s;
117:                 throw ContentError(err.str());
118:             }
119:             if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1))
120:             {
121:                 std::ostringstream err;
122:                 err << "Roundtrip inflation failure at block " << bki;
123:                 throw ContentError(err.str());
124:             }
125:
126:             fd_write(reconsfd, inflated8KB, 8192);
127:         }
128:         else if (std::equal(whence, whence + 4, ">>>>")) {
129:             //fd_read(0, buffer, rzr);
130:             std::cin.read(buffer, rzr);
131:             if (std::cin.fail()) {
132:                 std::ostringstream err;
133:                 err << "Interprocess communication failure at remote block "
134:                     << bki;
135:                 throw AssertionError(err.str());
136:             }
137:
138:             rzc = crc32(rzc, (Zlib::Bytef*)buffer, rzr);
139:
140:             // inflated8KB <= inflate(buffer)
141:             z0.z_inflate.next_in = (Zlib::Bytef*)buffer;
142:             z0.z_inflate.avail_in = rzr;
143:             z0.z_inflate.next_out = (Zlib::Bytef*)inflated8KB;
144:             z0.z_inflate.avail_out = 8192 + 1;
145:             s = z0.zlib_inflate();
146:             if (!(s == Z_OK || s == Z_STREAM_END)) {
147:                 std::ostringstream err;
148:                 err << "Decompression inflation failed at block " << bki
149:                     << ": " << s;
150:                 throw ContentError(err.str());
151:             }
152:             if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1)) {
153:                 std::ostringstream err;
154:                 err << "Inflated block too small at block " << bki;
155:                 throw ContentError(err.str());
156:             }
157:
158:             // deflated <= deflate(inflated8KB)
159:             z1.z_deflate.next_in = (Zlib::Bytef*)inflated8KB;
160:             z1.z_deflate.avail_in = 8192;
161:             z1.z_deflate.next_out = (Zlib::Bytef*)deflated;
162:             z1.z_deflate.avail_out = rzr + 1;
163:             s = z1.zlib_deflate(Z_SYNC_FLUSH);
164:             if (s != Z_OK) {
165:                 std::ostringstream err;
166:                 err << "Recompression deflation failed at block " << bki
167:                     << ": " << s;
168:                 throw ContentError(err.str());
169:             }
170:             if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
171:                 std::ostringstream err;
172:                 err << "Roundtrip deflation failure at block " << bki
173:                     << std::endl;
174:                 err << "  avail_in = " << z1.z_deflate.avail_in
175:                     << "  avail_out = " << z1.z_deflate.avail_out
176:                     << std::endl;
177:                 err << "  bki = " << bki << "  nblks = " << nblks
178:                     << std::endl;
179:                 throw ContentError(err.str());
180:             }
181:
182:             fd_write(reconsfd, inflated8KB, 8192);
183:
184:             if (!( sums.a.checksum32(inflated8KB) == sigtab[ bki ].sig_a
185:                 && sums.b.checksum32(inflated8KB) == sigtab[ bki ].sig_b
186:                 && sums.c.checksum32(inflated8KB) == sigtab[ bki ].sig_c))
187:             {
188:                 std::ostringstream err;
189:                 err << "Inflated block signature mismatch at block " << bki;
190:                 throw ContentError(err.str());
191:             }
192:         }
193:         else {
194:             std::ostringstream err;
195:             err << "Interprocess communication failure at block " << bki;
196:             throw AssertionError(err.str());
197:         }
198:     }
199:
200:     //if (fd_read_if(0, whence, 4)) {
201:     std::cin.read(whence, 4);
202:     if (std::cin.fail()) {
203:         if (!std::cin.eof())
204:             throw AssertionError(
205:                 "Interprocess communication failure after last block");
206:     }
207:     else {
208:         if (!std::equal(whence, whence + 4, ">>>>"))
209:             throw AssertionError(
210:                 "Interprocess communication failure after last block");
211:
212:         // append the remaining bytes which are not divisible by 8192
213:         //for (
214:         //      ;
215:         //      rzr = read(0, buffer, 16384);
216:         //      rzi += rzr)
217:         //{
218:         
219:         rzr = rzend - rzi; // lose a sanity check -- rotten iostreams
220:                    // can't even locate the end of the stream
221:
222:         // block until the entire input stream is exhausted
223:         std::cin.read(buffer, rzr);
224:         if (std::cin.fail()) {
225:             std::ostringstream err;
226:             err << "Interprocess communication failure at the end of file"
227:                 << std::endl;
228:             err << "(" << rzr << ")";
229:             throw AssertionError(err.str());
230:         }
231:         
232:         rzc = crc32(rzc, (Zlib::Bytef*)buffer, rzr);
233:
234:         // inflated <= inflate(buffer)
235:         z0.z_inflate.next_in = (Zlib::Bytef*)buffer;
236:         z0.z_inflate.avail_in = rzr;
237:         z0.z_inflate.next_out = (Zlib::Bytef*)inflated;
238:         z0.z_inflate.avail_out = 8192 + 1;
239:         s = z0.zlib_inflate();
240:         if (!(s == Z_OK || s == Z_STREAM_END)) {
241:             std::ostringstream err;
242:             err << "Decompression inflation failed: " << s;
243:             throw ContentError(err.str());
244:         }
245:
246:         scr = (8192 + 1 - z0.z_inflate.avail_out);
247:
248:         fd_write(reconsfd, inflated, scr);
249:
250:         rzi += rzr;
251:         sci += scr;
252:     }
253:
254:     // ensure the entire input stream is exhausted
255:     if (!std::cin.good()) {
256:         std::ostringstream err;
257:         err << "Interprocess communication failure at the end of file"
258:             << std::endl;
259:         err << "(not good)";
260:         throw AssertionError(err.str());
261:     }
262:     std::cin.read(buffer, 1);
263:     if (!std::cin.eof()) {
264:         std::ostringstream err;
265:         err << "Interprocess communication failure at the end of file"
266:             << std::endl;
267:         err << "(no more please)" << std::endl;
268:         throw AssertionError(err.str());
269:     }
270:
271:     // sanity checks
272:     //if (rzend != rzi) {
273:     //        std::ostringstream err;
274:     //        err << "Mismatch in calculated compressed stream size: "
275:     //      << rzi << " =/= " << rzend;
276:     //        throw ContentError(err.str());
277:     //}
278:     if (scend != sci) {
279:         std::ostringstream err;
280:         err << "Mismatch in calculated source file size: "
281:             << sci << " =/= " << scend;
282:         throw ContentError(err.str());
283:     }
284:     if (rzcrc != rzc)
285:         throw ContentError("Mismatch in calculated compressed stream CRC");
286:
287:     // one final sanity check
288:     lseek(reconsfd, 0 , 0);
289:     {
290:         struct stat stbuf;
291:         if (fstat(reconsfd, &stbuf))
292:             IOError::what_if("Can't stat file", 1);
293:         IOError::what_if("Can't stat file size", !stbuf.st_size);
294:         if (scend != stbuf.st_size)
295:             throw ContentError("Mismatch in calculated source file size");
296:     }
297: }
298:
299: // ######## ######## ######## ######## ######## ######## ######## ########
300:
301: // PullReconstitute.cc # the actual synthesize algorithm