RZ

001: // Reconstitute.cc # the actual synthesize algorithm
002:
003: #include "Reconstitute.hh"
004:
005: // ######## ######## ######## ######## ######## ######## ######## ########
006:
007: //#include <cassert>
008:
009: //#include <unistd.h>
010: #include <sys/stat.h>
011: //#include <sys/mman.h>
012:
013: #include <string>
014: #include <sstream>
015: #include <iostream>
016: #include <vector>
017: #include <algorithm>
018:
019: #include "RZError.hh"
020: #include "RZNetOrder.hh"
021: #include "RZIO.hh"
022: #include "RZMagic.hh"
023: #include "AlgoZlib.hh"
024:
025: using namespace RZ;
026:
027: // ###################### Reconstitute the source stream ######################
028:
029: // Reconstitute the source stream from the interprocess protocol read on
030: // std::cin, writing the decompressed source bytes to reconsfd.
031: //
032: // Expected input, in order:
033: //   ">>>>", a 32-byte header (16-byte magic, then the RZModuli of the
034: //   block signature sums), a 32-byte stream structure, and one 16-byte
035: //   signature entry per complete 8192-byte source block; then, for each
036: //   block, either "<<<<" + 8192 source bytes (a "local" block) or
037: //   ">>>>" + (rz_to - rzi) compressed bytes (a "remote" block); then,
038: //   optionally, ">>>>" + the compressed tail; then end of input.
039: // Each complete block is round-tripped through deflate/inflate so that the
040: // CRC of the compressed stream recorded in the structure can be verified;
041: // remote blocks are also checked against their signature entry.
042: //
043: // reconsfd: output descriptor; it is rewound and fstat'ed at the end.
044: // Throws AssertionError on interprocess (protocol) failures and
045: // ContentError on malformed or inconsistent content; the final output
046: // size check reports through IOError::what_if.
047: void RZ::reconstitute(int reconsfd)
048: {
049:     // source block size (the structure's block count is scend >> 13)
050:     const int blocksize = 8192;
051:     // capacity of the compressed-data buffers; chunk lengths come from the
052:     // file, so anything larger is rejected before it is read
053:     const int rzcapacity = 16384;
054:
055:     // read initial >>>>
056:     char whence[ 4 ];
057:     std::cin.read(whence, 4);
058:     if (std::cin.fail())
059:         throw AssertionError(
060:             "Interprocess communication failure before block 0");
061:     if (!std::equal(whence, whence + 4, ">>>>"))
062:         throw AssertionError(
063:             "Interprocess communication failure before block 0");
064:
065:     // read the header --- the 16-byte magic, then the RZModuli values
066:     char header[ 32 ];
067:     std::cin.read(header, 32);
068:     if (std::cin.fail())
069:         throw ContentError("Can't read the file header");
070:     if (!std::equal(RZMagic::file_magic, RZMagic::file_magic + 16, header))
071:         throw ContentError("The magic goes away --- not an rz file");
072:
073:     struct Sums const sums(net4in< RZModuli >(header + 16));
074:
075:     // read the stream structure: eight 32-bit fields decoded by net_in
076:     char structure[ 32 ];
077:     std::cin.read(structure, 32);
078:     if (std::cin.fail())
079:         throw ContentError("Can't read the file stream structure");
080:
081:     int nblks = net_in(&structure[  0 ]); // num complete 8192-byte src blks
082:     int rzcrc = net_in(&structure[  4 ]); // crc of the compressed stream
083:     int scend = net_in(&structure[  8 ]); // end (size) source stream
084:     int rzend = net_in(&structure[ 12 ]); // end (size) compressed stream
085:
086:     //int ncont = net_in(&structure[ 16 ]); // num blks before comp reinits
087:     int rzc   = net_in(&structure[ 20 ]); // init (zero) crc value
088:     int scoff = net_in(&structure[ 24 ]); // init offset src before blk 0
089:     int rzoff = net_in(&structure[ 28 ]); // init offset comprs bef blk 0
090:
091:     // the sizes are untrusted: a negative scend would let a negative
092:     // nblks pass the quotient check and then size an allocation
093:     if (scend < 0) {
094:         std::ostringstream err;
095:         err << "Negative source stream size: " << scend;
096:         throw ContentError(err.str());
097:     }
098:     if ((scend >> 13) != nblks)
099:         throw ContentError("Quotient is wrong!");
100:
101:     // read the signature table, 16 bytes per complete source block
102:     std::vector< char > signature(16 * nblks);
103:     if (nblks > 0) {
104:         // &signature[ 0 ] is only valid on a non-empty vector
105:         std::cin.read(&signature[ 0 ], 16 * nblks);
106:         if (std::cin.fail())
107:             throw ContentError("Can't read the file signature table");
108:     }
109:     std::vector< struct RZSignatureTable::SignatureEntry > sigtab;
110:     sigtab.reserve(nblks);
111:     for (int i = 0; i < nblks; ++i) {
112:         char *entry = &signature[ i * 16 ];
113:
114:         struct RZSignatureTable::SignatureEntry e;
115:         e.sig_a = net_in(&entry[  0 ]);
116:         e.sig_b = net_in(&entry[  4 ]);
117:         e.sig_c = net_in(&entry[  8 ]);
118:         e.rz_to = net_in(&entry[ 12 ]); // compressed offset after block i
119:         sigtab.push_back(e);
120:     }
121:
122:     // set up the decompression/recompression streams
123:     struct Zlib::Inflate z0("Decompression");
124:     struct Zlib::Deflate z1("Recompression");
125:     int s;
126:
127:     // the indices: rz* walk the compressed stream, sc* the source stream;
128:     // *i is the offset of the current chunk and *r its length
129:     int rzi, rzr, sci, scr;
130:
131:     // leader
132:     if (scoff != 0 || rzoff != 0)
133:         throw ContentError("I don't know how to skip an initial offset");
134:
135:     // data buffers; the spare byte lets each zlib call prove its output
136:     // had exactly the expected length (avail_out left at 1)
137:     char buffer8KB[ blocksize + 1 ];
138:     char buffer[ rzcapacity + 1 ];
139:     char deflated[ rzcapacity + 1 ];
140:     char inflated8KB[ blocksize + 1 ];
141:
142:     // synthesize the local and remote data blocks; an entry whose rz_to
143:     // does not advance (rzr == 0) ends this phase early
144:     int bki;
145:     for (
146:         bki = 0, rzi = 0, sci = 0;
147:         bki < nblks && (rzr = sigtab[ bki ].rz_to - rzi);
148:         ++bki, rzi += rzr, sci += blocksize)
149:     {
150:         // rz_to is untrusted: a chunk that runs backwards or exceeds the
151:         // capacity would otherwise overrun the fixed-size buffers below
152:         if (rzr < 0 || rzr > rzcapacity) {
153:             std::ostringstream err;
154:             err << "Compressed block size out of range at block " << bki
155:                 << ": " << rzr;
156:             throw ContentError(err.str());
157:         }
158:
159:         std::cin.read(whence, 4);
160:         if (std::cin.fail()) {
161:             std::ostringstream err;
162:             err << "Interprocess communication failure at block " << bki;
163:             throw AssertionError(err.str());
164:         }
165:         if (std::equal(whence, whence + 4, "<<<<")) {
166:             // local block: the 8192 source bytes arrive verbatim
167:             std::cin.read(buffer8KB, blocksize);
168:             if (std::cin.fail()) {
169:                 std::ostringstream err;
170:                 err << "Interprocess communication failure at local block "
171:                     << bki;
172:                 throw AssertionError(err.str());
173:             }
174:
175:             // deflated <= deflate(buffer8KB); must be exactly rzr bytes
176:             z1.z_deflate.next_in = reinterpret_cast< Zlib::Bytef * >(buffer8KB);
177:             z1.z_deflate.avail_in = blocksize;
178:             z1.z_deflate.next_out = reinterpret_cast< Zlib::Bytef * >(deflated);
179:             z1.z_deflate.avail_out = rzr + 1;
180:             s = z1.zlib_deflate(Z_SYNC_FLUSH);
181:             if (s != Z_OK) {
182:                 std::ostringstream err;
183:                 err << "Recompression synchronization failed at block " << bki
184:                     << ": " << s;
185:                 throw ContentError(err.str());
186:             }
187:             if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
188:                 std::ostringstream err;
189:                 err << "Mismatch in recompression length at block " << bki;
190:                 throw ContentError(err.str());
191:             }
192:
193:             rzc = crc32(rzc, reinterpret_cast< Zlib::Bytef * >(deflated), rzr);
194:
195:             // inflated8KB <= inflate(deflated); keeps z0 in step and must
196:             // give back exactly one block
197:             z0.z_inflate.next_in = reinterpret_cast< Zlib::Bytef * >(deflated);
198:             z0.z_inflate.avail_in = rzr;
199:             z0.z_inflate.next_out =
200:                 reinterpret_cast< Zlib::Bytef * >(inflated8KB);
201:             z0.z_inflate.avail_out = blocksize + 1;
202:             s = z0.zlib_inflate();
203:             if (!(s == Z_OK || s == Z_STREAM_END)) {
204:                 std::ostringstream err;
205:                 err << "Decompression inflation failed at block " << bki
206:                     << ": " << s;
207:                 throw ContentError(err.str());
208:             }
209:             if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1))
210:             {
211:                 std::ostringstream err;
212:                 err << "Roundtrip inflation failure at block " << bki;
213:                 throw ContentError(err.str());
214:             }
215:
216:             fd_write(reconsfd, inflated8KB, blocksize);
217:         }
218:         else if (std::equal(whence, whence + 4, ">>>>")) {
219:             // remote block: rzr bytes of the original compressed stream
220:             std::cin.read(buffer, rzr);
221:             if (std::cin.fail()) {
222:                 std::ostringstream err;
223:                 err << "Interprocess communication failure at remote block "
224:                     << bki;
225:                 throw AssertionError(err.str());
226:             }
227:
228:             rzc = crc32(rzc, reinterpret_cast< Zlib::Bytef * >(buffer), rzr);
229:
230:             // inflated8KB <= inflate(buffer); must give exactly one block
231:             z0.z_inflate.next_in = reinterpret_cast< Zlib::Bytef * >(buffer);
232:             z0.z_inflate.avail_in = rzr;
233:             z0.z_inflate.next_out =
234:                 reinterpret_cast< Zlib::Bytef * >(inflated8KB);
235:             z0.z_inflate.avail_out = blocksize + 1;
236:             s = z0.zlib_inflate();
237:             if (!(s == Z_OK || s == Z_STREAM_END)) {
238:                 std::ostringstream err;
239:                 err << "Decompression inflation failed at block " << bki
240:                     << ": " << s;
241:                 throw ContentError(err.str());
242:             }
243:             if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1)) {
244:                 std::ostringstream err;
245:                 err << "Inflated block too small at block " << bki;
246:                 throw ContentError(err.str());
247:             }
248:
249:             // deflated <= deflate(inflated8KB); keeps z1 in step and must
250:             // come out exactly rzr bytes long again
251:             z1.z_deflate.next_in =
252:                 reinterpret_cast< Zlib::Bytef * >(inflated8KB);
253:             z1.z_deflate.avail_in = blocksize;
254:             z1.z_deflate.next_out = reinterpret_cast< Zlib::Bytef * >(deflated);
255:             z1.z_deflate.avail_out = rzr + 1;
256:             s = z1.zlib_deflate(Z_SYNC_FLUSH);
257:             if (s != Z_OK) {
258:                 std::ostringstream err;
259:                 err << "Recompression deflation failed at block " << bki
260:                     << ": " << s;
261:                 throw ContentError(err.str());
262:             }
263:             if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
264:                 std::ostringstream err;
265:                 err << "Roundtrip deflation failure at block " << bki << '\n';
266:                 err << "  avail_in = " << z1.z_deflate.avail_in
267:                     << "  avail_out = " << z1.z_deflate.avail_out << '\n';
268:                 err << "  bki = " << bki << "  nblks = " << nblks << '\n';
269:                 throw ContentError(err.str());
270:             }
271:
272:             // verify the block against its signature before emitting it
273:             if (!( sums.a.checksum32(inflated8KB) == sigtab[ bki ].sig_a
274:                 && sums.b.checksum32(inflated8KB) == sigtab[ bki ].sig_b
275:                 && sums.c.checksum32(inflated8KB) == sigtab[ bki ].sig_c))
276:             {
277:                 std::ostringstream err;
278:                 err << "Inflated block signature mismatch at block " << bki;
279:                 throw ContentError(err.str());
280:             }
281:
282:             fd_write(reconsfd, inflated8KB, blocksize);
283:         }
284:         else {
285:             std::ostringstream err;
286:             err << "Interprocess communication failure at block " << bki;
287:             throw AssertionError(err.str());
288:         }
289:     }
290:
291:     // an optional final >>>> introduces the compressed tail: the source
292:     // bytes past the last complete block plus the end of the stream
293:     std::cin.read(whence, 4);
294:     if (std::cin.fail()) {
295:         // a clean end of input (no marker bytes at all) means no tail; a
296:         // truncated marker or a stream error is an IPC failure
297:         if (!std::cin.eof() || std::cin.gcount() != 0)
298:             throw AssertionError(
299:                 "Interprocess communication failure after last block");
300:     }
301:     else {
302:         if (!std::equal(whence, whence + 4, ">>>>"))
303:             throw AssertionError(
304:                 "Interprocess communication failure after last block");
305:
306:         // the tail length is implied by the recorded compressed size and
307:         // is as untrusted as the per-block lengths
308:         rzr = rzend - rzi;
309:         if (rzr < 0 || rzr > rzcapacity) {
310:             std::ostringstream err;
311:             err << "Compressed tail size out of range: " << rzr;
312:             throw ContentError(err.str());
313:         }
314:
315:         // block until the entire tail has arrived
316:         std::cin.read(buffer, rzr);
317:         if (std::cin.fail()) {
318:             std::ostringstream err;
319:             err << "Interprocess communication failure at the end of file"
320:                 << '\n';
321:             err << "(" << rzr << ")";
322:             throw AssertionError(err.str());
323:         }
324:
325:         rzc = crc32(rzc, reinterpret_cast< Zlib::Bytef * >(buffer), rzr);
326:
327:         // inflated8KB <= inflate(buffer); a tail that inflates to more than
328:         // one block is cut short here and caught by the size check below
329:         z0.z_inflate.next_in = reinterpret_cast< Zlib::Bytef * >(buffer);
330:         z0.z_inflate.avail_in = rzr;
331:         z0.z_inflate.next_out = reinterpret_cast< Zlib::Bytef * >(inflated8KB);
332:         z0.z_inflate.avail_out = blocksize + 1;
333:         s = z0.zlib_inflate();
334:         if (!(s == Z_OK || s == Z_STREAM_END)) {
335:             std::ostringstream err;
336:             err << "Decompression inflation failed: " << s;
337:             throw ContentError(err.str());
338:         }
339:
340:         scr = (blocksize + 1 - z0.z_inflate.avail_out);
341:
342:         fd_write(reconsfd, inflated8KB, scr);
343:
344:         rzi += rzr;
345:         sci += scr;
346:
347:         // ensure the entire input stream is exhausted (the no-tail path
348:         // above has already reached end of input)
349:         if (!std::cin.good()) {
350:             std::ostringstream err;
351:             err << "Interprocess communication failure at the end of file"
352:                 << '\n';
353:             err << "(not good)";
354:             throw AssertionError(err.str());
355:         }
356:         std::cin.read(buffer, 1);
357:         if (!std::cin.eof()) {
358:             std::ostringstream err;
359:             err << "Interprocess communication failure at the end of file"
360:                 << '\n';
361:             err << "(no more please)" << '\n';
362:             throw AssertionError(err.str());
363:         }
364:     }
365:
366:     // sanity checks (after a tail, rzi == rzend by construction, so the
367:     // first one catches an input that ended without its tail)
368:     if (rzend != rzi) {
369:         std::ostringstream err;
370:         err << "Mismatch in calculated compressed stream size: "
371:             << rzi << " =/= " << rzend;
372:         throw ContentError(err.str());
373:     }
374:     if (scend != sci) {
375:         std::ostringstream err;
376:         err << "Mismatch in calculated source file size: "
377:             << sci << " =/= " << scend;
378:         throw ContentError(err.str());
379:     }
380:     if (rzcrc != rzc)
381:         throw ContentError("Mismatch in calculated compressed stream CRC");
382:
383:     // one final sanity check: rewind the output and make sure it holds
384:     // exactly scend bytes
385:     lseek(reconsfd, 0, SEEK_SET);
386:     {
387:         struct stat stbuf;
388:         if (fstat(reconsfd, &stbuf))
389:             IOError::what_if("Can't stat file", 1);
390:         // an empty output is only suspicious when the source is non-empty
391:         IOError::what_if("Can't stat file size", !stbuf.st_size && scend != 0);
392:         if (scend != stbuf.st_size)
393:             throw ContentError("Mismatch in calculated source file size");
394:     }
395: }
396:
397: // ######## ######## ######## ######## ######## ######## ######## ########
398:
399: // Reconstitute.cc # the actual synthesize algorithm