001:
002:
003: #include "Reconstitute.hh"
004:
005:
006:
007:
008:
009:
010: #include <sys/stat.h>
011:
012:
013: #include <string>
014: #include <sstream>
015: #include <iostream>
016: #include <vector>
017: #include <algorithm>
018:
019: #include "RZError.hh"
020: #include "RZNetOrder.hh"
021: #include "RZIO.hh"
022: #include "RZMagic.hh"
023: #include "AlgoZlib.hh"
024:
025: using namespace RZ;
026:
027:
028:
029: void RZ::reconstitute(int reconsfd)
030: {
031:
032: char whence[ 4 ];
033:
034: std::cin.read(whence, 4);
035: if (std::cin.fail())
036: throw AssertionError(
037: "Interprocess communication failure before block 0");
038: if (!std::equal(whence, whence + 4, ">>>>"))
039: throw AssertionError(
040: "Interprocess communication failure before block 0");
041:
042:
043: char header[ 32 ];
044:
045: std::cin.read(header, 32);
046: if (std::cin.fail())
047: throw ContentError("Can't read the file header");
048: if (!std::equal(RZMagic::file_magic, RZMagic::file_magic + 16, header))
049: throw ContentError("The magic goes away --- not an rz file");
050:
051: struct Sums const sums(net4in< RZModuli >(header + 16));
052:
053:
054: char structure[ 32 ];
055:
056: std::cin.read(structure, 32);
057: if (std::cin.fail())
058: throw ContentError("Can't read the file stream structure");
059:
060: int nblks = net_in(&structure[ 0 ]);
061: int rzcrc = net_in(&structure[ 4 ]);
062: int scend = net_in(&structure[ 8 ]);
063: int rzend = net_in(&structure[ 12 ]);
064:
065:
066: int rzc = net_in(&structure[ 20 ]);
067: int scoff = net_in(&structure[ 24 ]);
068: int rzoff = net_in(&structure[ 28 ]);
069:
070: if ((scend >> 13) != nblks)
071: throw ContentError("Quotient is wrong!");
072:
073:
074: std::vector< char > signature(16 * nblks);
075:
076: std::cin.read(&signature[ 0 ], 16 * nblks);
077: if (std::cin.fail())
078: throw ContentError("Can't read the file signature table");
079: std::vector< struct RZSignatureTable::SignatureEntry > sigtab;
080: sigtab.reserve(nblks);
081: for (int i = 0; i < nblks; ++i) {
082: char *buffer = &signature[ i * 16 ];
083:
084: struct RZSignatureTable::SignatureEntry e;
085: e.sig_a = net_in(&buffer[ 0 ]);
086: e.sig_b = net_in(&buffer[ 4 ]);
087: e.sig_c = net_in(&buffer[ 8 ]);
088: e.rz_to = net_in(&buffer[ 12 ]);
089: sigtab.push_back(e);
090: }
091:
092:
093: struct Zlib::Inflate z0("Decompression");
094: struct Zlib::Deflate z1("Recompression");
095: int s;
096:
097:
098: int rzi, rzr, sci, scr;
099:
100:
101: if (scoff != 0 || rzoff != 0)
102: throw ContentError("I don't know how to skip an initial offset");
103:
104:
105: char buffer8KB[ 8193 ];
106: char buffer[ 16385 ];
107: char deflated[ 16385 ];
108: char inflated8KB[ 8193 ];
109: char * const inflated = inflated8KB;
110:
111:
112: int bki;
113: for (
114: bki = 0, rzi = 0, sci = 0;
115: bki < nblks && (rzr = sigtab[ bki ].rz_to - rzi);
116: ++bki, rzi += rzr, sci += 8192)
117: {
118:
119: std::cin.read(whence, 4);
120: if (std::cin.fail()) {
121: std::ostringstream err;
122: err << "Interprocess communication failure at block " << bki;
123: throw AssertionError(err.str());
124: }
125: if (std::equal(whence, whence + 4, "<<<<")) {
126:
127: std::cin.read(buffer8KB, 8192);
128: if (std::cin.fail()) {
129: std::ostringstream err;
130: err << "Interprocess communication failure at local block "
131: << bki;
132: throw AssertionError(err.str());
133: }
134:
135:
136: z1.z_deflate.next_in = (Zlib::Bytef*)buffer8KB;
137: z1.z_deflate.avail_in = 8192;
138: z1.z_deflate.next_out = (Zlib::Bytef*)deflated;
139: z1.z_deflate.avail_out = rzr + 1;
140: s = z1.zlib_deflate(Z_SYNC_FLUSH);
141: if (s != Z_OK) {
142: std::ostringstream err;
143: err << "Recompression synchronization failed at block " << bki
144: << ": " << s;
145: throw ContentError(err.str());
146: }
147: if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
148: std::ostringstream err;
149: err << "Mismatch in recompression length at block " << bki;
150: throw ContentError(err.str());
151: }
152:
153: rzc = crc32(rzc, (Zlib::Bytef*)deflated, rzr);
154:
155:
156: z0.z_inflate.next_in = (Zlib::Bytef*)deflated;
157: z0.z_inflate.avail_in = rzr;
158: z0.z_inflate.next_out = (Zlib::Bytef*)inflated8KB;
159: z0.z_inflate.avail_out = 8192 + 1;
160: s = z0.zlib_inflate();
161: if (!(s == Z_OK || s == Z_STREAM_END)) {
162: std::ostringstream err;
163: err << "Decompression inflation failed at block " << bki
164: << ": " << s;
165: throw ContentError(err.str());
166: }
167: if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1))
168: {
169: std::ostringstream err;
170: err << "Roundtrip inflation failure at block " << bki;
171: throw ContentError(err.str());
172: }
173:
174: fd_write(reconsfd, inflated8KB, 8192);
175: }
176: else if (std::equal(whence, whence + 4, ">>>>")) {
177:
178: std::cin.read(buffer, rzr);
179: if (std::cin.fail()) {
180: std::ostringstream err;
181: err << "Interprocess communication failure at remote block "
182: << bki;
183: throw AssertionError(err.str());
184: }
185:
186: rzc = crc32(rzc, (Zlib::Bytef*)buffer, rzr);
187:
188:
189: z0.z_inflate.next_in = (Zlib::Bytef*)buffer;
190: z0.z_inflate.avail_in = rzr;
191: z0.z_inflate.next_out = (Zlib::Bytef*)inflated8KB;
192: z0.z_inflate.avail_out = 8192 + 1;
193: s = z0.zlib_inflate();
194: if (!(s == Z_OK || s == Z_STREAM_END)) {
195: std::ostringstream err;
196: err << "Decompression inflation failed at block " << bki
197: << ": " << s;
198: throw ContentError(err.str());
199: }
200: if (!(z0.z_inflate.avail_in == 0 && z0.z_inflate.avail_out == 1)) {
201: std::ostringstream err;
202: err << "Inflated block too small at block " << bki;
203: throw ContentError(err.str());
204: }
205:
206:
207: z1.z_deflate.next_in = (Zlib::Bytef*)inflated8KB;
208: z1.z_deflate.avail_in = 8192;
209: z1.z_deflate.next_out = (Zlib::Bytef*)deflated;
210: z1.z_deflate.avail_out = rzr + 1;
211: s = z1.zlib_deflate(Z_SYNC_FLUSH);
212: if (s != Z_OK) {
213: std::ostringstream err;
214: err << "Recompression deflation failed at block " << bki
215: << ": " << s;
216: throw ContentError(err.str());
217: }
218: if (!(z1.z_deflate.avail_in == 0 && z1.z_deflate.avail_out == 1)) {
219: std::ostringstream err;
220: err << "Roundtrip deflation failure at block " << bki
221: << std::endl;
222: err << " avail_in = " << z1.z_deflate.avail_in
223: << " avail_out = " << z1.z_deflate.avail_out
224: << std::endl;
225: err << " bki = " << bki << " nblks = " << nblks
226: << std::endl;
227: throw ContentError(err.str());
228: }
229:
230: fd_write(reconsfd, inflated8KB, 8192);
231:
232: if (!( sums.a.checksum32(inflated8KB) == sigtab[ bki ].sig_a
233: && sums.b.checksum32(inflated8KB) == sigtab[ bki ].sig_b
234: && sums.c.checksum32(inflated8KB) == sigtab[ bki ].sig_c))
235: {
236: std::ostringstream err;
237: err << "Inflated block signature mismatch at block " << bki;
238: throw ContentError(err.str());
239: }
240: }
241: else {
242: std::ostringstream err;
243: err << "Interprocess communication failure at block " << bki;
244: throw AssertionError(err.str());
245: }
246: }
247:
248:
249: std::cin.read(whence, 4);
250: if (std::cin.fail()) {
251: if (!std::cin.eof())
252: throw AssertionError(
253: "Interprocess communication failure after last block");
254: }
255: else {
256: if (!std::equal(whence, whence + 4, ">>>>"))
257: throw AssertionError(
258: "Interprocess communication failure after last block");
259:
260:
261:
262:
263:
264:
265:
266:
267: rzr = rzend - rzi;
268:
269:
270:
271: std::cin.read(buffer, rzr);
272: if (std::cin.fail()) {
273: std::ostringstream err;
274: err << "Interprocess communication failure at the end of file"
275: << std::endl;
276: err << "(" << rzr << ")";
277: throw AssertionError(err.str());
278: }
279:
280: rzc = crc32(rzc, (Zlib::Bytef*)buffer, rzr);
281:
282:
283: z0.z_inflate.next_in = (Zlib::Bytef*)buffer;
284: z0.z_inflate.avail_in = rzr;
285: z0.z_inflate.next_out = (Zlib::Bytef*)inflated;
286: z0.z_inflate.avail_out = 8192 + 1;
287: s = z0.zlib_inflate();
288: if (!(s == Z_OK || s == Z_STREAM_END)) {
289: std::ostringstream err;
290: err << "Decompression inflation failed: " << s;
291: throw ContentError(err.str());
292: }
293:
294: scr = (8192 + 1 - z0.z_inflate.avail_out);
295:
296: fd_write(reconsfd, inflated, scr);
297:
298: rzi += rzr;
299: sci += scr;
300: }
301:
302:
303: if (!std::cin.good()) {
304: std::ostringstream err;
305: err << "Interprocess communication failure at the end of file"
306: << std::endl;
307: err << "(not good)";
308: throw AssertionError(err.str());
309: }
310: std::cin.read(buffer, 1);
311: if (!std::cin.eof()) {
312: std::ostringstream err;
313: err << "Interprocess communication failure at the end of file"
314: << std::endl;
315: err << "(no more please)" << std::endl;
316: throw AssertionError(err.str());
317: }
318:
319:
320:
321:
322:
323:
324:
325:
326: if (scend != sci) {
327: std::ostringstream err;
328: err << "Mismatch in calculated source file size: "
329: << sci << " =/= " << scend;
330: throw ContentError(err.str());
331: }
332: if (rzcrc != rzc)
333: throw ContentError("Mismatch in calculated compressed stream CRC");
334:
335:
336: lseek(reconsfd, 0 , 0);
337: {
338: struct stat stbuf;
339: if (fstat(reconsfd, &stbuf))
340: IOError::what_if("Can't stat file", 1);
341: IOError::what_if("Can't stat file size", !stbuf.st_size);
342: if (scend != stbuf.st_size)
343: throw ContentError("Mismatch in calculated source file size");
344: }
345: }
346:
347:
348:
349: