Add support to decompress brotli compressed new data

Add a new writer that can decode the brotli-compressed system/vendor
new data stored in the OTA zip.

Brotli generally gives a better compression ratio at the cost of slightly
increased decompression time. The patch.dat is already compressed with
bzip2, so there's no point in compressing it further.

For the given 1.9G bullhead system image:
Size: 875M -> 787M; ~10% reduction of package size.
Time: 147s -> 153s; ~4% increase of the block_image_update execution time.
(I guess I/O takes much longer time than decompression.)

Also, it takes 4 minutes to compress the system image on my local
machine, 3 minutes more than zip.

Test: recovery tests pass && apply a full OTA with brotli compressed
system/vendor.new.dat on bullhead

Change-Id: I232335ebf662a9c55579ca073ad45265700a621e
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index df366b0..2bec487 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -44,6 +44,7 @@
 #include <android-base/strings.h>
 #include <android-base/unique_fd.h>
 #include <applypatch/applypatch.h>
+#include <brotli/decode.h>
 #include <openssl/sha.h>
 #include <private/android_filesystem_config.h>
 #include <ziparchive/zip_archive.h>
@@ -149,40 +150,32 @@
 class RangeSinkWriter {
  public:
   RangeSinkWriter(int fd, const RangeSet& tgt)
-      : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0), bytes_written_(0) {
+      : fd_(fd),
+        tgt_(tgt),
+        next_range_(0),
+        current_range_left_(0),
+        bytes_written_(0) {
     CHECK_NE(tgt.size(), static_cast<size_t>(0));
   };
 
+  virtual ~RangeSinkWriter() {};
+
   bool Finished() const {
     return next_range_ == tgt_.size() && current_range_left_ == 0;
   }
 
-  size_t Write(const uint8_t* data, size_t size) {
+  // Returns the number of bytes consumed; 0 indicates a write failure.
+  virtual size_t Write(const uint8_t* data, size_t size) {
     if (Finished()) {
       LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
       return 0;
     }
 
-    size_t written = 0;
+    size_t consumed = 0;
     while (size > 0) {
       // Move to the next range as needed.
-      if (current_range_left_ == 0) {
-        if (next_range_ < tgt_.size()) {
-          const Range& range = tgt_[next_range_];
-          off64_t offset = static_cast<off64_t>(range.first) * BLOCKSIZE;
-          current_range_left_ = (range.second - range.first) * BLOCKSIZE;
-          next_range_++;
-          if (!discard_blocks(fd_, offset, current_range_left_)) {
-            break;
-          }
-
-          if (!check_lseek(fd_, offset, SEEK_SET)) {
-            break;
-          }
-        } else {
-          // We can't write any more; return how many bytes have been written so far.
-          break;
-        }
+      if (!SeekToOutputRange()) {
+        break;
       }
 
       size_t write_now = size;
@@ -198,21 +191,47 @@
       size -= write_now;
 
       current_range_left_ -= write_now;
-      written += write_now;
+      consumed += write_now;
     }
 
-    bytes_written_ += written;
-    return written;
+    bytes_written_ += consumed;
+    return consumed;
   }
 
   size_t BytesWritten() const {
     return bytes_written_;
   }
 
- private:
-  // The input data.
+ protected:
+  // Sets up the output cursor, advancing to the next target range if needed.
+  bool SeekToOutputRange() {
+    // We haven't finished the current range yet.
+    if (current_range_left_ != 0) {
+      return true;
+    }
+    // We can't write any more; let the write function return how many bytes have been written
+    // so far.
+    if (next_range_ >= tgt_.size()) {
+      return false;
+    }
+
+    const Range& range = tgt_[next_range_];
+    off64_t offset = static_cast<off64_t>(range.first) * BLOCKSIZE;
+    current_range_left_ = (range.second - range.first) * BLOCKSIZE;
+    next_range_++;
+
+    if (!discard_blocks(fd_, offset, current_range_left_)) {
+      return false;
+    }
+    if (!check_lseek(fd_, offset, SEEK_SET)) {
+      return false;
+    }
+    return true;
+  }
+
+  // The output file descriptor.
   int fd_;
-  // The destination for the data.
+  // The destination ranges for the data.
   const RangeSet& tgt_;
   // The next range that we should write to.
   size_t next_range_;
@@ -222,6 +241,75 @@
   size_t bytes_written_;
 };
 
+class BrotliNewDataWriter : public RangeSinkWriter {
+ public:
+  BrotliNewDataWriter(int fd, const RangeSet& tgt, BrotliDecoderState* state)
+      : RangeSinkWriter(fd, tgt), state_(state) {}
+
+  size_t Write(const uint8_t* data, size_t size) override {
+    if (Finished()) {
+      LOG(ERROR) << "Brotli new data write overrun; can't write " << size << " bytes";
+      return 0;
+    }
+    CHECK(state_ != nullptr);
+
+    size_t consumed = 0;
+    while (true) {
+      // Move to the next range as needed.
+      if (!SeekToOutputRange()) {
+        break;
+      }
+
+      size_t available_in = size;
+      size_t write_now = std::min<size_t>(32768, current_range_left_);
+      uint8_t buffer[write_now];
+
+      size_t available_out = write_now;
+      uint8_t* next_out = buffer;
+
+      // The brotli decoder will update |data|, |available_in|, |next_out| and |available_out|.
+      BrotliDecoderResult result = BrotliDecoderDecompressStream(
+          state_, &available_in, &data, &available_out, &next_out, nullptr);
+
+      // We don't have a way to recover from the decode error; report the failure.
+      if (result == BROTLI_DECODER_RESULT_ERROR) {
+        LOG(ERROR) << "Decompression failed with "
+                   << BrotliDecoderErrorString(BrotliDecoderGetErrorCode(state_));
+        return 0;
+      }
+
+      if (write_all(fd_, buffer, write_now - available_out) == -1) {
+        return 0;
+      }
+
+      LOG(DEBUG) << "bytes written: " << write_now - available_out << ", bytes consumed "
+                 << size - available_in << ", decoder status " << result;
+
+      // Update the total bytes written to output by the current writer; this is different from the
+      // consumed input bytes.
+      bytes_written_ += write_now - available_out;
+      current_range_left_ -= (write_now - available_out);
+      consumed += (size - available_in);
+
+      // Update the remaining input size; the input data pointer has already been advanced
+      // by the brotli decoder.
+      size = available_in;
+
+      // Continue if we have more output to write, or more input to consume.
+      if (result == BROTLI_DECODER_RESULT_SUCCESS ||
+          (result == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && size == 0)) {
+        break;
+      }
+    }
+
+    return consumed;
+  }
+
+ private:
+  // Pointer to the decoder state (initialized by PerformBlockImageUpdate).
+  BrotliDecoderState* state_;
+};
+
 /**
  * All of the data for all the 'new' transfers is contained in one file in the update package,
  * concatenated together in the order in which transfers.list will need it. We want to stream it out
@@ -243,8 +331,10 @@
 struct NewThreadInfo {
   ZipArchiveHandle za;
   ZipEntry entry;
+  bool brotli_compressed;
 
-  RangeSinkWriter* writer;
+  std::unique_ptr<RangeSinkWriter> writer;
+  BrotliDecoderState* brotli_decoder_state;
   bool receiver_available;
 
   pthread_mutex_t mu;
@@ -264,9 +354,16 @@
 
     // At this point nti->writer is set, and we own it. The main thread is waiting for it to
     // disappear from nti.
-    size_t written = nti->writer->Write(data, size);
-    data += written;
-    size -= written;
+    size_t consumed = nti->writer->Write(data, size);
+
+    // We encounter a fatal error if we fail to consume any input bytes. If this happens, abort the
+    // extraction.
+    if (consumed == 0) {
+      LOG(ERROR) << "Failed to process " << size << " input bytes.";
+      return false;
+    }
+    data += consumed;
+    size -= consumed;
 
     if (nti->writer->Finished()) {
       // We have written all the bytes desired by this writer.
@@ -1142,9 +1239,13 @@
   if (params.canwrite) {
     LOG(INFO) << " writing " << tgt.blocks() << " blocks of new data";
 
-    RangeSinkWriter writer(params.fd, tgt);
     pthread_mutex_lock(&params.nti.mu);
-    params.nti.writer = &writer;
+    if (params.nti.brotli_compressed) {
+      params.nti.writer =
+          std::make_unique<BrotliNewDataWriter>(params.fd, tgt, params.nti.brotli_decoder_state);
+    } else {
+      params.nti.writer = std::make_unique<RangeSinkWriter>(params.fd, tgt);
+    }
     pthread_cond_broadcast(&params.nti.cv);
 
     while (params.nti.writer != nullptr) {
@@ -1384,6 +1485,12 @@
   if (params.canwrite) {
     params.nti.za = za;
     params.nti.entry = new_entry;
+    // The entry is brotli-compressed if it has a '.br' extension.
+    params.nti.brotli_compressed = android::base::EndsWith(new_data_fn->data, ".br");
+    if (params.nti.brotli_compressed) {
+      // Initialize brotli decoder state.
+      params.nti.brotli_decoder_state = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
+    }
     params.nti.receiver_available = true;
 
     pthread_mutex_init(&params.nti.mu, nullptr);
@@ -1526,6 +1633,10 @@
   }
   // params.fd will be automatically closed because it's a unique_fd.
 
+  if (params.nti.brotli_decoder_state != nullptr) {
+    BrotliDecoderDestroyInstance(params.nti.brotli_decoder_state);
+  }
+
   // Only delete the stash if the update cannot be resumed, or it's a verification run and we
   // created the stash.
   if (params.isunresumable || (!params.canwrite && params.createdstash)) {