RZ

001: // DataLocalScanner.cc # the actual synthesize algorithm
002:
003: #include <unistd.h>
004: #include <sys/uio.h>
005:
006: #include <vector>
007:
008: #include "RZError.hh"
009: #include "RZNetOrder.hh"
010: #include "ExtractWrapper.hh"
011:
012: #include "GetoptVerbose.hh"
013:
014: #include "DemandWrapper.hh"
015: #include "DataLocalScanner.hh"
016: #include "DemandReconstituter.hh"
017:
018: namespace DataLocalScanner {
019:     unsigned int local_total = 0;
020:
021:     struct RZ::Worker LOCAL;
022: }
023:
024: // ######## ######## ######## ######## ######## ######## ######## ########
025:
026: namespace DataLocalScanner {
027:     void local_close(void)
028:     {
029:         if (!LOCAL) return;
030:         LOCAL.close();
031:         // close LOCAL or $! and warn "Warning $!"; # may be openned "-|"
032:     }
033:
034:     bool is_pending(void)
035:     {
036:         return !!LOCAL;
037:     }
038: }
039:
040: // ######## ######## ######## ######## ######## ######## ######## ########
041:
042: namespace DataLocalScanner {
043:
044:     // ######## ######## ######## ######## ######## ######## ########
045:     // PACKET FORMAT
046:     //   1 word header in net order
047:     //   fixed block_size byte length body
048:     // fixed size packets simplifies processing
049:
050:     static std::vector< char > pkheader(4);
051:     static std::vector< char > pkbody;
052:
053:     static struct iovec vec[2];
054:
055:     static struct iovec *vbase;
056:     static unsigned int vlen = 0;
057:
058:     static inline void setbuffervector(void)
059:     {
060:         pkbody.resize(RZ::rz_wrapper.block_size);
061:
062:         vec[0].iov_base = &pkheader.front();
063:         vec[0].iov_len = 4;
064:         vec[1].iov_base = &pkbody.front();
065:         vec[1].iov_len = RZ::rz_wrapper.block_size;
066:
067:         vbase = vec; // buffer remaining
068:         vlen = 2;    // buffer remaining
069:     }
070:
071:     // ######## ######## ######## ######## ######## ######## ########
072:
073:     void select_processing(void)
074:     {
075:         if (!LOCAL) return;
076:
077:         {
078:             int r;
079:             if (!vlen) {
080:                 setbuffervector();
081:                 r = readv(LOCAL.file_no, vbase, vlen);
082:                 if (r == -1 && errno == EINTR) return;
083:                 RZ::IOError::what_if("Can't read from local scanner!");
084:                 if (!r) { local_close(); return; }
085:                 local_total += RZ::rz_wrapper.block_size;
086:             }
087:             else {
088:                 r = readv(LOCAL.file_no, vbase, vlen);
089:                 if (r == -1 && errno == EINTR) return;
090:                 RZ::IOError::what_if("Can't read from local scanner!");
091:                 if (!r) throw RZ::AssertionError("Truncated local packet");
092:             }
093:             for (; vlen; ++vbase, --vlen)
094:                 if (vbase->iov_len <= (unsigned int)r)
095:                     r -= vbase->iov_len;
096:                 else {
097:                     vbase->iov_base = (char *)(vbase->iov_base) + r;
098:                     vbase->iov_len -= r;
099:                     break;
100:                 }
101:         }
102:
103:         if (!vlen) {
104:             unsigned int bki = RZ::net_in(pkheader.begin());
105:             if (bki >= RZ::rz_wrapper.n_blks)
106:                 throw RZ::AssertionError("Malformed local packet");
107:             if (!DemandReconstituter::sc_buf[ bki ].empty())
108:                 throw RZ::AssertionError("Repeated local packet");
109:             DemandReconstituter::sc_buf[ bki ].swap(pkbody);
110:             DemandReconstituter::advance_pointer();
111:         }
112:     }
113: }
114:
115:
116: // ######## ######## ######## ######## ######## ######## ######## ########
117:
118: void DataLocalScanner::require(void) {}
119:
120: // DataLocalScanner.cc # the actual synthesize algorithm