RZ

001: // DataRemoteRetriever.cc # the actual synthesize algorithm
002:
003: #include <iostream>
004: #include <sstream>
005: #include <vector>
006: #include <deque>
007:
008: #include "RZError.hh"
009: #include "RZNetOrder.hh"
010: #include "RZMagic.hh"
011: #include "ExtractWrapper.hh"
012: #include "ExtractRemoteCon.hh"
013:
014: #include "DataPackets.hh"
015: #include "GetoptVerbose.hh"
016:
017: #include "DemandWrapper.hh"
018: #include "DataRemoteController.hh"
019: #include "DataRemotePack.hh"
020: #include "DataRemoteQueue.hh"
021: #include "DataRemoteRetriever.hh"
022: #include "DemandReconstituter.hh"
023:
024: // ######## ######## ######## ######## ######## ######## ######## ########
025:
026: namespace DataRemoteRetriever {
027: // private
028:     static void monotonic(unsigned int a, unsigned int b, unsigned int c)
029:     {
030:         if (0 > a || a > b || b > c)
031:             throw RZ::AssertionError("Malformed packet");
032:     }
033:
// private
    // Offset one past the last byte of the most recently accepted packet;
    // written by require() and push_header().
    static unsigned int packetto = 0;
    // Size of the remote file as reported with the first packet; every
    // later packet header is required to repeat the same value.
    static unsigned int filesize = 0;

// private
    // Index (into RZ::rz_tab) of the block at the front of bkqueue, i.e.
    // the start of the current block.
    static int bki = 0;
    // Block queue: one vector per block currently being received.
    static std::deque< std::vector< char > > bkqueue;
040:
041: // public
042:     void reduce_queue(unsigned int reduction) 
043:     {
044:         for (; reduction > 0; --reduction) {
045:             std::vector< char > &f = bkqueue.front();
046:
047:             if (f.size() < RZ::rz_tab.rz_size(bki)) return;
048:             if (f.size() > RZ::rz_tab.rz_size(bki))
049:                 throw RZ::AssertionError("Impossible block reduction");
050:
051:             DemandReconstituter::rz_buf[bki++].swap(f);
052:             bkqueue.pop_front();
053:             DemandReconstituter::advance_pointer();
054:         }
055:     }
056:
057: // private
058:     static inline void enqueue(int &i, int &remaining)
059:     {
060:         std::vector< char > t;
061:         for (; remaining > 0; ++i) {
062:             {
063:                 unsigned int rzr = RZ::rz_tab.rz_size(i);
064:                 remaining -= rzr;
065:                 t.resize(rzr + std::min(0, remaining));
066:             }
067:             bkqueue.push_back(std::vector< char >());
068:             bkqueue.back().swap(t);
069:         }
070:     }        
071:
072: // public
073:     void push_header(
074:         unsigned int nextfrom, unsigned int nextto, unsigned int nextfs)
075:     {
076:         monotonic(nextfrom, nextto, nextfs);
077:
078:         // unless packets contiguous, require pkbuf start on a rz splice pt
079:         if (packetto != nextfrom) {
080:             bkqueue.clear();
081:
082:             unsigned int rzj = nextfrom - RZ::rz_tab.wrapper_size;
083:             while (rzj > RZ::rz_tab.rz_at[bki]) ++bki;
084:             while (rzj < RZ::rz_tab.rz_at[bki]) --bki;
085:             if (rzj != RZ::rz_tab.rz_at[bki])
086:                 throw RZ::AssertionError("Malformed remote packet");
087:         }
088:
089:         packetto = nextto;
090:
091:         if (filesize != nextfs)
092:             throw RZ::AssertionError("Malformed remote packet");
093:
094:         if (RZ::debug)
095:             std::cerr << "{" << nextfrom << "-" << nextto - 1 << "}";
096:
097:         unsigned int advance = 0;
098:         int bkiend = bki, remaining = nextto - nextfrom;
099:         remote_total += remaining;
100:             
101:         if (!bkqueue.empty()) {
102:             if (bkqueue.size() != 1) throw RZ::AssertionError("Queue too big");
103:
104:             std::vector< char > &f = bkqueue.front();
105:             advance = f.size();
106:
107:             unsigned int rzr = RZ::rz_tab.rz_size(bki);
108:             if (advance >= rzr) throw RZ::AssertionError("Remnant too large");
109:             f.resize(rzr);
110:             
111:             ++bkiend;
112:             remaining += advance - rzr;
113:         }
114:
115:         enqueue(bkiend, remaining);
116:
117:         set_buffer_vector(bkqueue);
118:         if (advance) advance_v(advance);
119:     }
120:
121: // private
122:     static void firstqueue(void)
123:     {
124:         bkqueue.clear();
125:         bki = RZ::rz_tab.rz_at_begin;
126:
127:         int bkiend = bki, remaining = packetto - RZ::rz_tab.wrapper_size;
128:         enqueue(bkiend, remaining); 
129:
130:         set_buffer_vector(bkqueue);
131:     }
132:
133: // public
134:     void require(void)
135:     {
136:         //######## tell the remote retrieval process what we need ########
137:         {
138:             std::ostringstream rangeout;
139:             rangeout << "bytes=0-" << DataPackets::FIRST_PACKET_SIZE - 1
140:                 << std::endl;
141:             std::string range = rangeout.str();
142:             RZ::fd_write(DataRemoteController::RANGE.file_no,
143:                 range.c_str(), range.size());
144:         }
145:
146:         using DemandReconstituter::rz_wrapper_buf;
147:         rz_wrapper_buf.resize(64);
148:
149:         {
150:             unsigned int packetfrom = 0;
151:             first_fill(packetfrom, packetto, filesize,
152:                 &rz_wrapper_buf.front(), 64);
153:             monotonic(packetfrom, packetto, filesize);
154:             if (packetfrom != 0 || packetto < 64)
155:                 throw RZ::AssertionError("Malformed remote packet");
156:
157:             remote_total += packetto;
158:         }
159:
160:         if (RZ::debug) std::cerr << "  receiving {bytes 0-" << packetto - 1
161:             << "/" << filesize <<"}" << std::endl;
162:
163:         RZ::rz_wrapper.extractor_header(rz_wrapper_buf.begin());
164:         
165:         if (RZ::rz_wrapper.rz_off != 0)
166:             throw RZ::AssertionError("Offset not implemented");
167:         if (filesize != RZ::rz_wrapper.file_size) throw RZ::AssertionError(
168:             "Mismatch in actual and calculated remote file size\n");
169:
170:         unsigned int ws = RZ::rz_wrapper.wrapper_size;
171:         rz_wrapper_buf.resize(ws);
172:
173:         if (RZ::rz_wrapper.wrapper_size > packetto) {
174:             // original packet was too small
175:
176:             // buffer the remainder of the packet
177:             flush_fill(&rz_wrapper_buf[64], packetto - 64); 
178:
179:             //###### tell the remote retrieval process what more we need ######
180:             {
181:                 std::ostringstream rangeout;
182:                 rangeout << "bytes=" << packetto << "-" << ws - 1 << std::endl;
183:                 std::string range = rangeout.str();
184:                 RZ::fd_write(DataRemoteController::RANGE.file_no,
185:                     range.c_str(), range.size());
186:             }
187:
188:             unsigned int oldpacketto = packetto, packetfrom = 0, fs = 0;
189:             first_fill(packetfrom, packetto, fs,
190:                 &rz_wrapper_buf[oldpacketto], ws - oldpacketto);
191:             monotonic(packetfrom, packetto, fs);
192:             if (packetfrom != oldpacketto || packetto != ws || filesize != fs)
193:                 throw RZ::AssertionError("Malformed remote packet");
194:
195:             remote_total += packetto - packetfrom;
196:
197:             if (RZ::debug) std::cerr << "  receiving {" << packetfrom
198:                 << "-" << packetto - 1 << "}" << std::endl;
199:         }
200:         else {
201:             body_fill(&rz_wrapper_buf[64], ws - 64); 
202:         }
203:
204:         // set up controller for next needed packet
205:         DataRemoteController::first_packet_from = packetto;
206:         DataRemoteController::first_packet_need = (
207:             packetto == ws && RZ::rz_wrapper.rz_end);
208:         
209:         RZ::rz_wrapper.extractor_signatures(rz_wrapper_buf.begin());
210:         RZ::rz_tab = RZ::rz_wrapper;
211:
212:         firstqueue();
213:     }
214: }
215:
216: // ######## ######## ######## ######## ######## ######## ######## ########
217:
218: // DataRemoteRetriever.cc # the actual synthesize algorithm