updater: Move RangeSinkWrite into RangeSinkState.
Then rename RangeSinkState to RangeSinkWriter. RangeSinkWriter reads
data from the given FD, and writes them to the desination RangeSet.
Test: Apply an incremental with the new updater.
Change-Id: I5e3ab6fc082efa1726562c55b56e2d418fe4acaf
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index 8c0f885..e39fac3 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -231,125 +231,135 @@
buffer.resize(size);
}
-struct RangeSinkState {
- explicit RangeSinkState(RangeSet& rs) : tgt(rs) { };
+/**
+ * RangeSinkWriter reads data from the given FD, and writes them to the destination specified by the
+ * given RangeSet.
+ */
+class RangeSinkWriter {
+ public:
+ RangeSinkWriter(int fd, const RangeSet& tgt)
+ : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
+ CHECK_NE(tgt.count, static_cast<size_t>(0));
+ };
- int fd;
- const RangeSet& tgt;
- size_t p_block;
- size_t p_remain;
-};
-
-static size_t RangeSinkWrite(const uint8_t* data, size_t size, RangeSinkState* rss) {
- if (rss->p_remain == 0) {
- LOG(ERROR) << "range sink write overrun";
- return 0;
+ bool Finished() const {
+ return next_range_ == tgt_.count && current_range_left_ == 0;
}
- size_t written = 0;
- while (size > 0) {
- size_t write_now = size;
-
- if (rss->p_remain < write_now) {
- write_now = rss->p_remain;
+ size_t Write(const uint8_t* data, size_t size) {
+ if (Finished()) {
+ LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
+ return 0;
}
- if (write_all(rss->fd, data, write_now) == -1) {
- break;
- }
+ size_t written = 0;
+ while (size > 0) {
+ // Move to the next range as needed.
+ if (current_range_left_ == 0) {
+ if (next_range_ < tgt_.count) {
+ off64_t offset = static_cast<off64_t>(tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
+ current_range_left_ =
+ (tgt_.pos[next_range_ * 2 + 1] - tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
+ next_range_++;
+ if (!discard_blocks(fd_, offset, current_range_left_)) {
+ break;
+ }
- data += write_now;
- size -= write_now;
-
- rss->p_remain -= write_now;
- written += write_now;
-
- if (rss->p_remain == 0) {
- // Move to the next block.
- ++rss->p_block;
- if (rss->p_block < rss->tgt.count) {
- rss->p_remain =
- (rss->tgt.pos[rss->p_block * 2 + 1] - rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
-
- off64_t offset = static_cast<off64_t>(rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
- if (!discard_blocks(rss->fd, offset, rss->p_remain)) {
+ if (!check_lseek(fd_, offset, SEEK_SET)) {
+ break;
+ }
+ } else {
+ // We can't write any more; return how many bytes have been written so far.
break;
}
+ }
- if (!check_lseek(rss->fd, offset, SEEK_SET)) {
- break;
- }
+ size_t write_now = size;
+ if (current_range_left_ < write_now) {
+ write_now = current_range_left_;
+ }
- } else {
- // We can't write any more; return how many bytes have been written so far.
+ if (write_all(fd_, data, write_now) == -1) {
break;
}
+
+ data += write_now;
+ size -= write_now;
+
+ current_range_left_ -= write_now;
+ written += write_now;
}
+
+ return written;
}
- return written;
-}
+ private:
+ // The input data.
+ int fd_;
+ // The destination for the data.
+ const RangeSet& tgt_;
+ // The next range that we should write to.
+ size_t next_range_;
+ // The number of bytes to write before moving to the next range.
+ size_t current_range_left_;
+};
-// All of the data for all the 'new' transfers is contained in one
-// file in the update package, concatenated together in the order in
-// which transfers.list will need it. We want to stream it out of the
-// archive (it's compressed) without writing it to a temp file, but we
-// can't write each section until it's that transfer's turn to go.
-//
-// To achieve this, we expand the new data from the archive in a
-// background thread, and block that threads 'receive uncompressed
-// data' function until the main thread has reached a point where we
-// want some new data to be written. We signal the background thread
-// with the destination for the data and block the main thread,
-// waiting for the background thread to complete writing that section.
-// Then it signals the main thread to wake up and goes back to
-// blocking waiting for a transfer.
-//
-// NewThreadInfo is the struct used to pass information back and forth
-// between the two threads. When the main thread wants some data
-// written, it sets rss to the destination location and signals the
-// condition. When the background thread is done writing, it clears
-// rss and signals the condition again.
-
+/**
+ * All of the data for all the 'new' transfers is contained in one file in the update package,
+ * concatenated together in the order in which transfers.list will need it. We want to stream it out
+ * of the archive (it's compressed) without writing it to a temp file, but we can't write each
+ * section until it's that transfer's turn to go.
+ *
+ * To achieve this, we expand the new data from the archive in a background thread, and block that
+ * threads 'receive uncompressed data' function until the main thread has reached a point where we
+ * want some new data to be written. We signal the background thread with the destination for the
+ * data and block the main thread, waiting for the background thread to complete writing that
+ * section. Then it signals the main thread to wake up and goes back to blocking waiting for a
+ * transfer.
+ *
+ * NewThreadInfo is the struct used to pass information back and forth between the two threads. When
+ * the main thread wants some data written, it sets writer to the destination location and signals
+ * the condition. When the background thread is done writing, it clears writer and signals the
+ * condition again.
+ */
struct NewThreadInfo {
- ZipArchiveHandle za;
- ZipEntry entry;
+ ZipArchiveHandle za;
+ ZipEntry entry;
- RangeSinkState* rss;
+ RangeSinkWriter* writer;
- pthread_mutex_t mu;
- pthread_cond_t cv;
+ pthread_mutex_t mu;
+ pthread_cond_t cv;
};
static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
- NewThreadInfo* nti = reinterpret_cast<NewThreadInfo*>(cookie);
+ NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
- while (size > 0) {
- // Wait for nti->rss to be non-null, indicating some of this
- // data is wanted.
- pthread_mutex_lock(&nti->mu);
- while (nti->rss == nullptr) {
- pthread_cond_wait(&nti->cv, &nti->mu);
- }
- pthread_mutex_unlock(&nti->mu);
-
- // At this point nti->rss is set, and we own it. The main
- // thread is waiting for it to disappear from nti.
- size_t written = RangeSinkWrite(data, size, nti->rss);
- data += written;
- size -= written;
-
- if (nti->rss->p_block == nti->rss->tgt.count) {
- // we have written all the bytes desired by this rss.
-
- pthread_mutex_lock(&nti->mu);
- nti->rss = nullptr;
- pthread_cond_broadcast(&nti->cv);
- pthread_mutex_unlock(&nti->mu);
- }
+ while (size > 0) {
+ // Wait for nti->writer to be non-null, indicating some of this data is wanted.
+ pthread_mutex_lock(&nti->mu);
+ while (nti->writer == nullptr) {
+ pthread_cond_wait(&nti->cv, &nti->mu);
}
+ pthread_mutex_unlock(&nti->mu);
- return true;
+ // At this point nti->writer is set, and we own it. The main thread is waiting for it to
+ // disappear from nti.
+ size_t written = nti->writer->Write(data, size);
+ data += written;
+ size -= written;
+
+ if (nti->writer->Finished()) {
+ // We have written all the bytes desired by this writer.
+
+ pthread_mutex_lock(&nti->mu);
+ nti->writer = nullptr;
+ pthread_cond_broadcast(&nti->cv);
+ pthread_mutex_unlock(&nti->mu);
+ }
+ }
+
+ return true;
}
static void* unzip_new_data(void* cookie) {
@@ -380,28 +390,26 @@
}
static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
- const uint8_t* data = buffer.data();
-
- size_t p = 0;
- for (size_t i = 0; i < tgt.count; ++i) {
- off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
- size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
- if (!discard_blocks(fd, offset, size)) {
- return -1;
- }
-
- if (!check_lseek(fd, offset, SEEK_SET)) {
- return -1;
- }
-
- if (write_all(fd, data + p, size) == -1) {
- return -1;
- }
-
- p += size;
+ size_t written = 0;
+ for (size_t i = 0; i < tgt.count; ++i) {
+ off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
+ size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
+ if (!discard_blocks(fd, offset, size)) {
+ return -1;
}
- return 0;
+ if (!check_lseek(fd, offset, SEEK_SET)) {
+ return -1;
+ }
+
+ if (write_all(fd, buffer.data() + written, size) == -1) {
+ return -1;
+ }
+
+ written += size;
+ }
+
+ return 0;
}
// Parameters for transfer list command functions
@@ -1214,45 +1222,31 @@
}
static int PerformCommandNew(CommandParameters& params) {
+ if (params.cpos >= params.tokens.size()) {
+ LOG(ERROR) << "missing target blocks for new";
+ return -1;
+ }
- if (params.cpos >= params.tokens.size()) {
- LOG(ERROR) << "missing target blocks for new";
- return -1;
+ RangeSet tgt = parse_range(params.tokens[params.cpos++]);
+
+ if (params.canwrite) {
+ LOG(INFO) << " writing " << tgt.size << " blocks of new data";
+
+ RangeSinkWriter writer(params.fd, tgt);
+ pthread_mutex_lock(¶ms.nti.mu);
+ params.nti.writer = &writer;
+ pthread_cond_broadcast(¶ms.nti.cv);
+
+ while (params.nti.writer != nullptr) {
+ pthread_cond_wait(¶ms.nti.cv, ¶ms.nti.mu);
}
- RangeSet tgt = parse_range(params.tokens[params.cpos++]);
+ pthread_mutex_unlock(¶ms.nti.mu);
+ }
- if (params.canwrite) {
- LOG(INFO) << " writing " << tgt.size << " blocks of new data";
+ params.written += tgt.size;
- RangeSinkState rss(tgt);
- rss.fd = params.fd;
- rss.p_block = 0;
- rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
-
- off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
- if (!discard_blocks(params.fd, offset, tgt.size * BLOCKSIZE)) {
- return -1;
- }
-
- if (!check_lseek(params.fd, offset, SEEK_SET)) {
- return -1;
- }
-
- pthread_mutex_lock(¶ms.nti.mu);
- params.nti.rss = &rss;
- pthread_cond_broadcast(¶ms.nti.cv);
-
- while (params.nti.rss) {
- pthread_cond_wait(¶ms.nti.cv, ¶ms.nti.mu);
- }
-
- pthread_mutex_unlock(¶ms.nti.mu);
- }
-
- params.written += tgt.size;
-
- return 0;
+ return 0;
}
static int PerformCommandDiff(CommandParameters& params) {
@@ -1295,40 +1289,28 @@
LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
Value patch_value(
VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
- RangeSinkState rss(tgt);
- rss.fd = params.fd;
- rss.p_block = 0;
- rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
- off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
- if (!discard_blocks(params.fd, offset, rss.p_remain)) {
- return -1;
- }
-
- if (!check_lseek(params.fd, offset, SEEK_SET)) {
- return -1;
- }
-
+ RangeSinkWriter writer(params.fd, tgt);
if (params.cmdname[0] == 'i') { // imgdiff
- if (ApplyImagePatch(
- params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
- std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
- nullptr, nullptr) != 0) {
+ if (ApplyImagePatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
+ std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
+ std::placeholders::_2),
+ nullptr, nullptr) != 0) {
LOG(ERROR) << "Failed to apply image patch.";
return -1;
}
} else {
- if (ApplyBSDiffPatch(
- params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
- std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
- nullptr) != 0) {
+ if (ApplyBSDiffPatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
+ std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
+ std::placeholders::_2),
+ nullptr) != 0) {
LOG(ERROR) << "Failed to apply bsdiff patch.";
return -1;
}
}
// We expect the output of the patcher to fill the tgt ranges exactly.
- if (rss.p_block != tgt.count || rss.p_remain != 0) {
+ if (!writer.Finished()) {
LOG(ERROR) << "range sink underrun?";
}
} else {