Merge "updater: Move RangeSinkWrite into RangeSinkState." am: 65c065b4bb am: 7ccb4090d1 am: 8538fab040
am: 4555745014

Change-Id: I9376c1905d7fa9dd83c225eeff8e63c752432d56
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index 4409cbe..a1a5773 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -232,125 +232,135 @@
     buffer.resize(size);
 }
 
-struct RangeSinkState {
-    explicit RangeSinkState(RangeSet& rs) : tgt(rs) { };
+/**
+ * RangeSinkWriter reads data from the given FD, and writes them to the destination specified by the
+ * given RangeSet.
+ */
+class RangeSinkWriter {
+ public:
+  RangeSinkWriter(int fd, const RangeSet& tgt)
+      : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
+    CHECK_NE(tgt.count, static_cast<size_t>(0));
+  };
 
-    int fd;
-    const RangeSet& tgt;
-    size_t p_block;
-    size_t p_remain;
-};
-
-static size_t RangeSinkWrite(const uint8_t* data, size_t size, RangeSinkState* rss) {
-  if (rss->p_remain == 0) {
-    LOG(ERROR) << "range sink write overrun";
-    return 0;
+  bool Finished() const {
+    return next_range_ == tgt_.count && current_range_left_ == 0;
   }
 
-  size_t written = 0;
-  while (size > 0) {
-    size_t write_now = size;
-
-    if (rss->p_remain < write_now) {
-      write_now = rss->p_remain;
+  size_t Write(const uint8_t* data, size_t size) {
+    if (Finished()) {
+      LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
+      return 0;
     }
 
-    if (write_all(rss->fd, data, write_now) == -1) {
-      break;
-    }
+    size_t written = 0;
+    while (size > 0) {
+      // Move to the next range as needed.
+      if (current_range_left_ == 0) {
+        if (next_range_ < tgt_.count) {
+          off64_t offset = static_cast<off64_t>(tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
+          current_range_left_ =
+              (tgt_.pos[next_range_ * 2 + 1] - tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
+          next_range_++;
+          if (!discard_blocks(fd_, offset, current_range_left_)) {
+            break;
+          }
 
-    data += write_now;
-    size -= write_now;
-
-    rss->p_remain -= write_now;
-    written += write_now;
-
-    if (rss->p_remain == 0) {
-      // Move to the next block.
-      ++rss->p_block;
-      if (rss->p_block < rss->tgt.count) {
-        rss->p_remain =
-            (rss->tgt.pos[rss->p_block * 2 + 1] - rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
-
-        off64_t offset = static_cast<off64_t>(rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
-        if (!discard_blocks(rss->fd, offset, rss->p_remain)) {
+          if (!check_lseek(fd_, offset, SEEK_SET)) {
+            break;
+          }
+        } else {
+          // We can't write any more; return how many bytes have been written so far.
           break;
         }
+      }
 
-        if (!check_lseek(rss->fd, offset, SEEK_SET)) {
-          break;
-        }
+      size_t write_now = size;
+      if (current_range_left_ < write_now) {
+        write_now = current_range_left_;
+      }
 
-      } else {
-        // We can't write any more; return how many bytes have been written so far.
+      if (write_all(fd_, data, write_now) == -1) {
         break;
       }
+
+      data += write_now;
+      size -= write_now;
+
+      current_range_left_ -= write_now;
+      written += write_now;
     }
+
+    return written;
   }
 
-  return written;
-}
+ private:
+  // The input data.
+  int fd_;
+  // The destination for the data.
+  const RangeSet& tgt_;
+  // The next range that we should write to.
+  size_t next_range_;
+  // The number of bytes to write before moving to the next range.
+  size_t current_range_left_;
+};
 
-// All of the data for all the 'new' transfers is contained in one
-// file in the update package, concatenated together in the order in
-// which transfers.list will need it.  We want to stream it out of the
-// archive (it's compressed) without writing it to a temp file, but we
-// can't write each section until it's that transfer's turn to go.
-//
-// To achieve this, we expand the new data from the archive in a
-// background thread, and block that threads 'receive uncompressed
-// data' function until the main thread has reached a point where we
-// want some new data to be written.  We signal the background thread
-// with the destination for the data and block the main thread,
-// waiting for the background thread to complete writing that section.
-// Then it signals the main thread to wake up and goes back to
-// blocking waiting for a transfer.
-//
-// NewThreadInfo is the struct used to pass information back and forth
-// between the two threads.  When the main thread wants some data
-// written, it sets rss to the destination location and signals the
-// condition.  When the background thread is done writing, it clears
-// rss and signals the condition again.
-
+/**
+ * All of the data for all the 'new' transfers is contained in one file in the update package,
+ * concatenated together in the order in which transfers.list will need it. We want to stream it out
+ * of the archive (it's compressed) without writing it to a temp file, but we can't write each
+ * section until it's that transfer's turn to go.
+ *
+ * To achieve this, we expand the new data from the archive in a background thread, and block that
+ * threads 'receive uncompressed data' function until the main thread has reached a point where we
+ * want some new data to be written. We signal the background thread with the destination for the
+ * data and block the main thread, waiting for the background thread to complete writing that
+ * section. Then it signals the main thread to wake up and goes back to blocking waiting for a
+ * transfer.
+ *
+ * NewThreadInfo is the struct used to pass information back and forth between the two threads. When
+ * the main thread wants some data written, it sets writer to the destination location and signals
+ * the condition. When the background thread is done writing, it clears writer and signals the
+ * condition again.
+ */
 struct NewThreadInfo {
-    ZipArchiveHandle za;
-    ZipEntry entry;
+  ZipArchiveHandle za;
+  ZipEntry entry;
 
-    RangeSinkState* rss;
+  RangeSinkWriter* writer;
 
-    pthread_mutex_t mu;
-    pthread_cond_t cv;
+  pthread_mutex_t mu;
+  pthread_cond_t cv;
 };
 
 static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
-    NewThreadInfo* nti = reinterpret_cast<NewThreadInfo*>(cookie);
+  NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
 
-    while (size > 0) {
-        // Wait for nti->rss to be non-null, indicating some of this
-        // data is wanted.
-        pthread_mutex_lock(&nti->mu);
-        while (nti->rss == nullptr) {
-            pthread_cond_wait(&nti->cv, &nti->mu);
-        }
-        pthread_mutex_unlock(&nti->mu);
-
-        // At this point nti->rss is set, and we own it.  The main
-        // thread is waiting for it to disappear from nti.
-        size_t written = RangeSinkWrite(data, size, nti->rss);
-        data += written;
-        size -= written;
-
-        if (nti->rss->p_block == nti->rss->tgt.count) {
-            // we have written all the bytes desired by this rss.
-
-            pthread_mutex_lock(&nti->mu);
-            nti->rss = nullptr;
-            pthread_cond_broadcast(&nti->cv);
-            pthread_mutex_unlock(&nti->mu);
-        }
+  while (size > 0) {
+    // Wait for nti->writer to be non-null, indicating some of this data is wanted.
+    pthread_mutex_lock(&nti->mu);
+    while (nti->writer == nullptr) {
+      pthread_cond_wait(&nti->cv, &nti->mu);
     }
+    pthread_mutex_unlock(&nti->mu);
 
-    return true;
+    // At this point nti->writer is set, and we own it. The main thread is waiting for it to
+    // disappear from nti.
+    size_t written = nti->writer->Write(data, size);
+    data += written;
+    size -= written;
+
+    if (nti->writer->Finished()) {
+      // We have written all the bytes desired by this writer.
+
+      pthread_mutex_lock(&nti->mu);
+      nti->writer = nullptr;
+      pthread_cond_broadcast(&nti->cv);
+      pthread_mutex_unlock(&nti->mu);
+    }
+  }
+
+  return true;
 }
 
 static void* unzip_new_data(void* cookie) {
@@ -381,28 +391,26 @@
 }
 
 static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
-    const uint8_t* data = buffer.data();
-
-    size_t p = 0;
-    for (size_t i = 0; i < tgt.count; ++i) {
-        off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
-        size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
-        if (!discard_blocks(fd, offset, size)) {
-            return -1;
-        }
-
-        if (!check_lseek(fd, offset, SEEK_SET)) {
-            return -1;
-        }
-
-        if (write_all(fd, data + p, size) == -1) {
-            return -1;
-        }
-
-        p += size;
+  size_t written = 0;
+  for (size_t i = 0; i < tgt.count; ++i) {
+    off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
+    size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
+    if (!discard_blocks(fd, offset, size)) {
+      return -1;
     }
 
-    return 0;
+    if (!check_lseek(fd, offset, SEEK_SET)) {
+      return -1;
+    }
+
+    if (write_all(fd, buffer.data() + written, size) == -1) {
+      return -1;
+    }
+
+    written += size;
+  }
+
+  return 0;
 }
 
 // Parameters for transfer list command functions
@@ -1215,45 +1223,31 @@
 }
 
 static int PerformCommandNew(CommandParameters& params) {
+  if (params.cpos >= params.tokens.size()) {
+    LOG(ERROR) << "missing target blocks for new";
+    return -1;
+  }
 
-    if (params.cpos >= params.tokens.size()) {
-        LOG(ERROR) << "missing target blocks for new";
-        return -1;
+  RangeSet tgt = parse_range(params.tokens[params.cpos++]);
+
+  if (params.canwrite) {
+    LOG(INFO) << " writing " << tgt.size << " blocks of new data";
+
+    RangeSinkWriter writer(params.fd, tgt);
+    pthread_mutex_lock(&params.nti.mu);
+    params.nti.writer = &writer;
+    pthread_cond_broadcast(&params.nti.cv);
+
+    while (params.nti.writer != nullptr) {
+      pthread_cond_wait(&params.nti.cv, &params.nti.mu);
     }
 
-    RangeSet tgt = parse_range(params.tokens[params.cpos++]);
+    pthread_mutex_unlock(&params.nti.mu);
+  }
 
-    if (params.canwrite) {
-        LOG(INFO) << " writing " << tgt.size << " blocks of new data";
+  params.written += tgt.size;
 
-        RangeSinkState rss(tgt);
-        rss.fd = params.fd;
-        rss.p_block = 0;
-        rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
-
-        off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
-        if (!discard_blocks(params.fd, offset, tgt.size * BLOCKSIZE)) {
-            return -1;
-        }
-
-        if (!check_lseek(params.fd, offset, SEEK_SET)) {
-            return -1;
-        }
-
-        pthread_mutex_lock(&params.nti.mu);
-        params.nti.rss = &rss;
-        pthread_cond_broadcast(&params.nti.cv);
-
-        while (params.nti.rss) {
-            pthread_cond_wait(&params.nti.cv, &params.nti.mu);
-        }
-
-        pthread_mutex_unlock(&params.nti.mu);
-    }
-
-    params.written += tgt.size;
-
-    return 0;
+  return 0;
 }
 
 static int PerformCommandDiff(CommandParameters& params) {
@@ -1296,40 +1290,28 @@
       LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
       Value patch_value(
           VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
-      RangeSinkState rss(tgt);
-      rss.fd = params.fd;
-      rss.p_block = 0;
-      rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
 
-      off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
-      if (!discard_blocks(params.fd, offset, rss.p_remain)) {
-        return -1;
-      }
-
-      if (!check_lseek(params.fd, offset, SEEK_SET)) {
-        return -1;
-      }
-
+      RangeSinkWriter writer(params.fd, tgt);
       if (params.cmdname[0] == 'i') {  // imgdiff
-        if (ApplyImagePatch(
-                params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
-                std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
-                nullptr, nullptr) != 0) {
+        if (ApplyImagePatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
+                            std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
+                                      std::placeholders::_2),
+                            nullptr, nullptr) != 0) {
           LOG(ERROR) << "Failed to apply image patch.";
           return -1;
         }
       } else {
-        if (ApplyBSDiffPatch(
-                params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
-                std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
-                nullptr) != 0) {
+        if (ApplyBSDiffPatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
+                             std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
+                                       std::placeholders::_2),
+                             nullptr) != 0) {
           LOG(ERROR) << "Failed to apply bsdiff patch.";
           return -1;
         }
       }
 
       // We expect the output of the patcher to fill the tgt ranges exactly.
-      if (rss.p_block != tgt.count || rss.p_remain != 0) {
+      if (!writer.Finished()) {
         LOG(ERROR) << "range sink underrun?";
       }
     } else {