RZ

001: // DataRemoteRetriever.cc # the actual synthesize algorithm
002:
003: #include <unistd.h>
004: #include <sys/uio.h>
005:
006: #include <iostream>
007: #include <sstream>
008: #include <vector>
009: #include <deque>
010:
011: #include "RZError.hh"
012: #include "RZNetOrder.hh"
013: #include "RZMagic.hh"
014: #include "RZIO.hh"
015: #include "ExtractWrapper.hh"
016: #include "ExtractRemoteCon.hh"
017:
018: #include "DataPackets.hh"
019: #include "GetoptVerbose.hh"
020:
021: #include "DemandWrapper.hh"
022: #include "DataRemoteController.hh"
023: #include "DataRemotePack.hh"
024: #include "DataRemoteQueue.hh"
025: #include "DataRemoteRetriever.hh"
026: #include "DemandReconstituter.hh"
027:
028: namespace DataRemoteRetriever {
029:     unsigned int remote_total = 0;
030:
031:     struct RZ::Piper REMOTE;
032: }
033:
034: // ######## ######## ######## ######## ######## ######## ######## ########
035:
036: namespace DataRemoteRetriever {
037:     void remote_close(void)
038:     {
039:         DataRemoteController::need_close();
040:         if (!REMOTE) return;
041:         REMOTE = -1;
042:     }
043:
044:     bool is_pending(bool spinup)
045:     {
046:         return !!REMOTE && !(spinup && DataRemoteController::first_packet_need);
047:     }
048: }
049:
050: // ######## ######## ######## ######## ######## ######## ######## ########
051: // PACKET FORMAT
052: //   3-word header (from, to, size) in network byte order
053: //   variable-length body of bytes
054: //   1-word chain flag telling whether another packet follows
055:
056: namespace DataRemoteRetriever {
057:     static char packetheader[16];
058:     static std::vector< struct iovec > vec;
059:
060:     static struct iovec *vbase;
061:     static unsigned int vlen;
062:
063:     // ######## ######## push into buffer ######## ########
064:
065:     void advance_v(unsigned int r)
066:     {
067:         for (; vlen; ++vbase, --vlen)
068:             if (vbase->iov_len <= r)
069:                 r -= vbase->iov_len;
070:             else {
071:                 vbase->iov_base = (char *)(vbase->iov_base) + r;
072:                 vbase->iov_len -= r;
073:                 break;
074:             }
075:     }
076:     
077:     static void fillv(void)
078:     {
079:         while (vlen) {
080:             int r;
081:             do r = readv(REMOTE.file_no, vbase, vlen);
082:             while (r == -1 && errno == EINTR);
083:             RZ::IOError::what_if("Can't read from retriever!");
084:             if (!r) throw RZ::AssertionError("Malformed remote packet");
085:             advance_v(r);
086:         }
087:     }
088:
089:     void body_fill(char *base, unsigned int len)
090:     {
091:         while (len) {
092:             int r;
093:             do r = read(REMOTE.file_no, base, len);
094:             while (r == -1 && errno == EINTR);
095:             RZ::IOError::what_if("Can't read from retriever!");
096:             if (!r) throw RZ::AssertionError("Malformed remote packet");
097:             base += r;
098:             len -= r;
099:         }
100:     }
101:
102:     // ######## ######## ######## ######## ######## ######## ########
103:
104:     void first_fill(
105:         unsigned int &from, unsigned int &to, unsigned int &size,
106:         char *base, unsigned int len)
107:     {
108:         vec.resize(vlen = 2);
109:         vec[0].iov_base = packetheader + 4;
110:         vec[0].iov_len = 12;
111:         vec[1].iov_base = base;
112:         vec[1].iov_len = len;
113:         vbase = &vec.front();
114:
115:         fillv();
116:
117:         RZ::n_in(packetheader + 4, from, to, size);
118:     }
119:
120:     void flush_fill(char *base, unsigned int len)
121:     {
122:         vec.resize(vlen = 2);
123:         vec[0].iov_base = base;
124:         vec[0].iov_len = len;
125:         vec[1].iov_base = packetheader;
126:         vec[1].iov_len = 4;
127:         vbase = &vec.front();
128:
129:         fillv();
130:     }
131:
132:     void set_buffer_vector(std::deque< std::vector< char > > &queue)
133:     {
134:         vec.resize(vlen = queue.size() + 2);
135:         {
136:             std::vector< struct iovec >::iterator v = vec.begin();
137:             for (std::deque< std::vector< char > >::iterator
138:                 d = queue.begin(), dend = queue.end(); d < dend; ++d) 
139:             {
140:                 v->iov_base = &d->front();
141:                 v++->iov_len = d->size();
142:             }
143:             v->iov_base = packetheader;
144:             v++->iov_len = 4;
145:             v->iov_base = packetheader + 4;
146:             v++->iov_len = 12;
147:         }
148:         vbase = &vec.front();
149:     }
150:
151:     // ######## ######## push into buffer queue ######## ########
152:
153:     static char N0[4] = { 0, 0, 0, 0 }; // pack( 'N', 0 );
154:     void select_processing(bool spinup)
155:     {
156:         if (!REMOTE) throw RZ::AssertionError("Can't read from closed pipe");
157:
158:         unsigned int carryvlen = vlen;
159:         {
160:             int r = readv(REMOTE.file_no, vbase, vlen);
161:             if (r == -1 && errno == EINTR) return;
162:             RZ::IOError::what_if("Can't read from retriever!");
163:             if (!r) {
164:                 if (vlen != 1)
165:                     throw RZ::AssertionError("Malformed remote packet");
166:                 remote_close();
167:                 return;
168:             }
169:             advance_v(r);
170:         }
171:         if (vlen <= 1 && carryvlen > 1
172:             && std::equal(packetheader, packetheader + 4, N0))
173:         {
174:             if (vlen != 1) throw RZ::AssertionError("Who ordered that packet?");
175:             if (spinup)
176:                 DataRemoteController::first_packet_need = true;
177:             else if (DemandReconstituter::is_need())
178:                 DataRemoteController::need_more();
179:             else
180:                 remote_close();
181:         }
182:
183:         reduce_queue(std::max(carryvlen, 2u) - std::max(vlen, 2u));
184:
185:         if (!vlen) {
186:             unsigned int from, to, size;
187:             RZ::n_in(packetheader + 4, from, to, size);
188:             push_header(from, to, size);
189:         }
190:     }
191: }
192:
193: // ######## ######## ######## ######## ######## ######## ######## ########
194:
195: // DataRemoteRetriever.cc # the actual synthesize algorithm