Abort the update if there's not enough new data

Right now the update stuck in a deadlock if there's less new data than
expection. Add some checkers and abort the update if such case happens.
Also add a corresponding test.

Bug: 36787146
Test: update aborts correctly on bullhead && recovery_component_test passes
Change-Id: I914e4a2a4cf157b99ef2fc65bd21c6981e38ca47
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index 0f08d17..d5c1704 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -149,7 +149,7 @@
 class RangeSinkWriter {
  public:
   RangeSinkWriter(int fd, const RangeSet& tgt)
-      : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
+      : fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0), bytes_written_(0) {
     CHECK_NE(tgt.size(), static_cast<size_t>(0));
   };
 
@@ -201,9 +201,14 @@
       written += write_now;
     }
 
+    bytes_written_ += written;
     return written;
   }
 
+  size_t BytesWritten() const {
+    return bytes_written_;
+  }
+
  private:
   // The input data.
   int fd_;
@@ -213,6 +218,8 @@
   size_t next_range_;
   // The number of bytes to write before moving to the next range.
   size_t current_range_left_;
+  // Total bytes written by the writer.
+  size_t bytes_written_;
 };
 
 /**
@@ -238,6 +245,7 @@
   ZipEntry entry;
 
   RangeSinkWriter* writer;
+  bool receiver_available;
 
   pthread_mutex_t mu;
   pthread_cond_t cv;
@@ -274,9 +282,16 @@
 }
 
 static void* unzip_new_data(void* cookie) {
-    NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
-    ProcessZipEntryContents(nti->za, &nti->entry, receive_new_data, nti);
-    return nullptr;
+  NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
+  ProcessZipEntryContents(nti->za, &nti->entry, receive_new_data, nti);
+
+  pthread_mutex_lock(&nti->mu);
+  nti->receiver_available = false;
+  if (nti->writer != nullptr) {
+    pthread_cond_broadcast(&nti->cv);
+  }
+  pthread_mutex_unlock(&nti->mu);
+  return nullptr;
 }
 
 static int ReadBlocks(const RangeSet& src, std::vector<uint8_t>& buffer, int fd) {
@@ -1133,6 +1148,12 @@
     pthread_cond_broadcast(&params.nti.cv);
 
     while (params.nti.writer != nullptr) {
+      if (!params.nti.receiver_available) {
+        LOG(ERROR) << "missing " << (tgt.blocks() * BLOCKSIZE - params.nti.writer->BytesWritten())
+                   << " bytes of new data";
+        pthread_mutex_unlock(&params.nti.mu);
+        return -1;
+      }
       pthread_cond_wait(&params.nti.cv, &params.nti.mu);
     }
 
@@ -1361,6 +1382,7 @@
   if (params.canwrite) {
     params.nti.za = za;
     params.nti.entry = new_entry;
+    params.nti.receiver_available = true;
 
     pthread_mutex_init(&params.nti.mu, nullptr);
     pthread_cond_init(&params.nti.cv, nullptr);