Finish the new data receiver when update fails

The thread to receive new data may still be alive after we exit
PerformBlockImageUpdate() upon failures. This caused memory corruption
when we run the unittest repeatedly. Set the receiver_available flag
to false and make sure the receiver exits when the update fails.

Bug: 65430057
Test: unittests passed with tsan
Change-Id: Icb232d13fb96c78262249ffbd29cdbe5b77f1fce
diff --git a/tests/component/updater_test.cpp b/tests/component/updater_test.cpp
index e6aec4a..266657d 100644
--- a/tests/component/updater_test.cpp
+++ b/tests/component/updater_test.cpp
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <android-base/file.h>
@@ -74,6 +75,23 @@
   ASSERT_EQ(cause_code, state.cause_code);
 }
 
+static void BuildUpdatePackage(const std::unordered_map<std::string, std::string>& entries,
+                               int fd) {
+  FILE* zip_file_ptr = fdopen(fd, "wb");
+  ZipWriter zip_writer(zip_file_ptr);
+
+  for (const auto& entry : entries) {
+    ASSERT_EQ(0, zip_writer.StartEntry(entry.first.c_str(), 0));
+    if (!entry.second.empty()) {
+      ASSERT_EQ(0, zip_writer.WriteBytes(entry.second.data(), entry.second.size()));
+    }
+    ASSERT_EQ(0, zip_writer.FinishEntry());
+  }
+
+  ASSERT_EQ(0, zip_writer.Finish());
+  ASSERT_EQ(0, fclose(zip_file_ptr));
+}
+
 static std::string get_sha1(const std::string& content) {
   uint8_t digest[SHA_DIGEST_LENGTH];
   SHA1(reinterpret_cast<const uint8_t*>(content.c_str()), content.size(), digest);
@@ -420,30 +438,19 @@
   ASSERT_EQ(0, fclose(updater_info.cmd_pipe));
 }
 
-TEST_F(UpdaterTest, block_image_update) {
-  // Create a zip file with new_data and patch_data.
-  TemporaryFile zip_file;
-  FILE* zip_file_ptr = fdopen(zip_file.release(), "wb");
-  ZipWriter zip_writer(zip_file_ptr);
-
-  // Add a dummy new data.
-  ASSERT_EQ(0, zip_writer.StartEntry("new_data", 0));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-
-  // Generate and add the patch data.
+TEST_F(UpdaterTest, block_image_update_patch_data) {
   std::string src_content = std::string(4096, 'a') + std::string(4096, 'c');
   std::string tgt_content = std::string(4096, 'b') + std::string(4096, 'd');
+
+  // Generate the patch data.
   TemporaryFile patch_file;
   ASSERT_EQ(0, bsdiff::bsdiff(reinterpret_cast<const uint8_t*>(src_content.data()),
       src_content.size(), reinterpret_cast<const uint8_t*>(tgt_content.data()),
       tgt_content.size(), patch_file.path, nullptr));
   std::string patch_content;
   ASSERT_TRUE(android::base::ReadFileToString(patch_file.path, &patch_content));
-  ASSERT_EQ(0, zip_writer.StartEntry("patch_data", 0));
-  ASSERT_EQ(0, zip_writer.WriteBytes(patch_content.data(), patch_content.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
 
-  // Add two transfer lists. The first one contains a bsdiff; and we expect the update to succeed.
+  // Create the transfer list that contains a bsdiff.
   std::string src_hash = get_sha1(src_content);
   std::string tgt_hash = get_sha1(tgt_content);
   std::vector<std::string> transfer_list = {
@@ -456,27 +463,16 @@
                                 src_hash.c_str(), tgt_hash.c_str(), src_hash.c_str()),
     "free " + src_hash,
   };
-  ASSERT_EQ(0, zip_writer.StartEntry("transfer_list", 0));
-  std::string commands = android::base::Join(transfer_list, '\n');
-  ASSERT_EQ(0, zip_writer.WriteBytes(commands.data(), commands.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
 
-  // Stash and free some blocks, then fail the 2nd update intentionally.
-  std::vector<std::string> fail_transfer_list = {
-    "4",
-    "2",
-    "0",
-    "2",
-    "stash " + tgt_hash + " 2,0,2",
-    "free " + tgt_hash,
-    "fail",
+  std::unordered_map<std::string, std::string> entries = {
+    { "new_data", "" },
+    { "patch_data", patch_content },
+    { "transfer_list", android::base::Join(transfer_list, '\n') },
   };
-  ASSERT_EQ(0, zip_writer.StartEntry("fail_transfer_list", 0));
-  std::string fail_commands = android::base::Join(fail_transfer_list, '\n');
-  ASSERT_EQ(0, zip_writer.WriteBytes(fail_commands.data(), fail_commands.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  ASSERT_EQ(0, zip_writer.Finish());
-  ASSERT_EQ(0, fclose(zip_file_ptr));
+
+  // Build the update package.
+  TemporaryFile zip_file;
+  BuildUpdatePackage(entries, zip_file.release());
 
   MemMapping map;
   ASSERT_TRUE(map.MapFile(zip_file.path));
@@ -491,7 +487,7 @@
   updater_info.package_zip_addr = map.addr;
   updater_info.package_zip_len = map.length;
 
-  // Execute the commands in the 1st transfer list.
+  // Execute the commands in the transfer list.
   TemporaryFile update_file;
   ASSERT_TRUE(android::base::WriteStringToFile(src_content, update_file.path));
   std::string script = "block_image_update(\"" + std::string(update_file.path) +
@@ -502,44 +498,98 @@
   ASSERT_TRUE(android::base::ReadFileToString(update_file.path, &updated_content));
   ASSERT_EQ(tgt_hash, get_sha1(updated_content));
 
-  // Expect the 2nd update to fail, but expect the stashed blocks to be freed.
-  script = "block_image_update(\"" + std::string(update_file.path) +
-      R"(", package_extract_file("fail_transfer_list"), "new_data", "patch_data"))";
+  ASSERT_EQ(0, fclose(updater_info.cmd_pipe));
+  CloseArchive(handle);
+}
+
+TEST_F(UpdaterTest, block_image_update_fail) {
+  std::string src_content(4096 * 2, 'e');
+  std::string src_hash = get_sha1(src_content);
+  // Stash and free some blocks, then fail the update intentionally.
+  std::vector<std::string> transfer_list = {
+    "4", "2", "0", "2", "stash " + src_hash + " 2,0,2", "free " + src_hash, "fail",
+  };
+
+  // Add a new data of 10 bytes to test the deadlock.
+  std::unordered_map<std::string, std::string> entries = {
+    { "new_data", std::string(10, 0) },
+    { "patch_data", "" },
+    { "transfer_list", android::base::Join(transfer_list, '\n') },
+  };
+
+  // Build the update package.
+  TemporaryFile zip_file;
+  BuildUpdatePackage(entries, zip_file.release());
+
+  MemMapping map;
+  ASSERT_TRUE(map.MapFile(zip_file.path));
+  ZipArchiveHandle handle;
+  ASSERT_EQ(0, OpenArchiveFromMemory(map.addr, map.length, zip_file.path, &handle));
+
+  // Set up the handler, command_pipe, patch offset & length.
+  UpdaterInfo updater_info;
+  updater_info.package_zip = handle;
+  TemporaryFile temp_pipe;
+  updater_info.cmd_pipe = fdopen(temp_pipe.release(), "wbe");
+  updater_info.package_zip_addr = map.addr;
+  updater_info.package_zip_len = map.length;
+
+  TemporaryFile update_file;
+  ASSERT_TRUE(android::base::WriteStringToFile(src_content, update_file.path));
+  // Expect the stashed blocks to be freed.
+  std::string script = "block_image_update(\"" + std::string(update_file.path) +
+                       R"(", package_extract_file("transfer_list"), "new_data", "patch_data"))";
   expect("", script.c_str(), kNoCause, &updater_info);
   // Updater generates the stash name based on the input file name.
   std::string name_digest = get_sha1(update_file.path);
   std::string stash_base = "/cache/recovery/" + name_digest;
   ASSERT_EQ(0, access(stash_base.c_str(), F_OK));
-  ASSERT_EQ(-1, access((stash_base + tgt_hash).c_str(), F_OK));
+  ASSERT_EQ(-1, access((stash_base + src_hash).c_str(), F_OK));
   ASSERT_EQ(0, rmdir(stash_base.c_str()));
 
   ASSERT_EQ(0, fclose(updater_info.cmd_pipe));
   CloseArchive(handle);
 }
 
-TEST_F(UpdaterTest, new_data_short_write) {
-  // Create a zip file with new_data.
+TEST_F(UpdaterTest, new_data_over_write) {
+  std::vector<std::string> transfer_list = {
+    "4", "1", "0", "0", "new 2,0,1",
+  };
+
+  // Write 4096 + 100 bytes of new data.
+  std::unordered_map<std::string, std::string> entries = {
+    { "new_data", std::string(4196, 0) },
+    { "patch_data", "" },
+    { "transfer_list", android::base::Join(transfer_list, '\n') },
+  };
+
+  // Build the update package.
   TemporaryFile zip_file;
-  FILE* zip_file_ptr = fdopen(zip_file.release(), "wb");
-  ZipWriter zip_writer(zip_file_ptr);
+  BuildUpdatePackage(entries, zip_file.release());
 
-  // Add the empty new data.
-  ASSERT_EQ(0, zip_writer.StartEntry("empty_new_data", 0));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  // Add the short written new data.
-  ASSERT_EQ(0, zip_writer.StartEntry("short_new_data", 0));
-  std::string new_data_short = std::string(10, 'a');
-  ASSERT_EQ(0, zip_writer.WriteBytes(new_data_short.data(), new_data_short.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  // Add the data of exactly one block.
-  ASSERT_EQ(0, zip_writer.StartEntry("exact_new_data", 0));
-  std::string new_data_exact = std::string(4096, 'a');
-  ASSERT_EQ(0, zip_writer.WriteBytes(new_data_exact.data(), new_data_exact.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  // Add a dummy patch data.
-  ASSERT_EQ(0, zip_writer.StartEntry("patch_data", 0));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
+  MemMapping map;
+  ASSERT_TRUE(map.MapFile(zip_file.path));
+  ZipArchiveHandle handle;
+  ASSERT_EQ(0, OpenArchiveFromMemory(map.addr, map.length, zip_file.path, &handle));
 
+  // Set up the handler, command_pipe, patch offset & length.
+  UpdaterInfo updater_info;
+  updater_info.package_zip = handle;
+  TemporaryFile temp_pipe;
+  updater_info.cmd_pipe = fdopen(temp_pipe.release(), "wbe");
+  updater_info.package_zip_addr = map.addr;
+  updater_info.package_zip_len = map.length;
+
+  TemporaryFile update_file;
+  std::string script = "block_image_update(\"" + std::string(update_file.path) +
+                       R"(", package_extract_file("transfer_list"), "new_data", "patch_data"))";
+  expect("t", script.c_str(), kNoCause, &updater_info);
+
+  ASSERT_EQ(0, fclose(updater_info.cmd_pipe));
+  CloseArchive(handle);
+}
+
+TEST_F(UpdaterTest, new_data_short_write) {
   std::vector<std::string> transfer_list = {
     "4",
     "1",
@@ -547,12 +597,17 @@
     "0",
     "new 2,0,1",
   };
-  ASSERT_EQ(0, zip_writer.StartEntry("transfer_list", 0));
-  std::string commands = android::base::Join(transfer_list, '\n');
-  ASSERT_EQ(0, zip_writer.WriteBytes(commands.data(), commands.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  ASSERT_EQ(0, zip_writer.Finish());
-  ASSERT_EQ(0, fclose(zip_file_ptr));
+
+  std::unordered_map<std::string, std::string> entries = {
+    { "empty_new_data", "" },
+    { "short_new_data", std::string(10, 'a') },
+    { "exact_new_data", std::string(4096, 'a') },
+    { "patch_data", "" },
+    { "transfer_list", android::base::Join(transfer_list, '\n') },
+  };
+
+  TemporaryFile zip_file;
+  BuildUpdatePackage(entries, zip_file.release());
 
   MemMapping map;
   ASSERT_TRUE(map.MapFile(zip_file.path));
@@ -587,14 +642,6 @@
 }
 
 TEST_F(UpdaterTest, brotli_new_data) {
-  // Create a zip file with new_data.
-  TemporaryFile zip_file;
-  FILE* zip_file_ptr = fdopen(zip_file.release(), "wb");
-  ZipWriter zip_writer(zip_file_ptr);
-
-  // Add a brotli compressed new data entry.
-  ASSERT_EQ(0, zip_writer.StartEntry("new.dat.br", 0));
-
   auto generator = []() { return rand() % 128; };
   // Generate 100 blocks of random data.
   std::string brotli_new_data;
@@ -602,16 +649,12 @@
   generate_n(back_inserter(brotli_new_data), 4096 * 100, generator);
 
   size_t encoded_size = BrotliEncoderMaxCompressedSize(brotli_new_data.size());
-  std::vector<uint8_t> encoded_data(encoded_size);
+  std::string encoded_data(encoded_size, 0);
   ASSERT_TRUE(BrotliEncoderCompress(
       BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, brotli_new_data.size(),
-      reinterpret_cast<const uint8_t*>(brotli_new_data.data()), &encoded_size, encoded_data.data()));
-
-  ASSERT_EQ(0, zip_writer.WriteBytes(encoded_data.data(), encoded_size));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  // Add a dummy patch data.
-  ASSERT_EQ(0, zip_writer.StartEntry("patch_data", 0));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
+      reinterpret_cast<const uint8_t*>(brotli_new_data.data()), &encoded_size,
+      reinterpret_cast<uint8_t*>(const_cast<char*>(encoded_data.data()))));
+  encoded_data.resize(encoded_size);
 
   // Write a few small chunks of new data, then a large chunk, and finally a few small chunks.
   // This helps us to catch potential short writes.
@@ -627,12 +670,15 @@
     "new 2,98,99",
     "new 2,99,100",
   };
-  ASSERT_EQ(0, zip_writer.StartEntry("transfer_list", 0));
-  std::string commands = android::base::Join(transfer_list, '\n');
-  ASSERT_EQ(0, zip_writer.WriteBytes(commands.data(), commands.size()));
-  ASSERT_EQ(0, zip_writer.FinishEntry());
-  ASSERT_EQ(0, zip_writer.Finish());
-  ASSERT_EQ(0, fclose(zip_file_ptr));
+
+  std::unordered_map<std::string, std::string> entries = {
+    { "new.dat.br", std::move(encoded_data) },
+    { "patch_data", "" },
+    { "transfer_list", android::base::Join(transfer_list, '\n') },
+  };
+
+  TemporaryFile zip_file;
+  BuildUpdatePackage(entries, zip_file.release());
 
   MemMapping map;
   ASSERT_TRUE(map.MapFile(zip_file.path));
diff --git a/updater/blockimg.cpp b/updater/blockimg.cpp
index ce3cea4..6c7b3ef 100644
--- a/updater/blockimg.cpp
+++ b/updater/blockimg.cpp
@@ -281,6 +281,11 @@
     // Wait for nti->writer to be non-null, indicating some of this data is wanted.
     pthread_mutex_lock(&nti->mu);
     while (nti->writer == nullptr) {
+      // End the new data receiver if we encounter an error when performing block image update.
+      if (!nti->receiver_available) {
+        pthread_mutex_unlock(&nti->mu);
+        return false;
+      }
       pthread_cond_wait(&nti->cv, &nti->mu);
     }
     pthread_mutex_unlock(&nti->mu);
@@ -316,6 +321,11 @@
     // Wait for nti->writer to be non-null, indicating some of this data is wanted.
     pthread_mutex_lock(&nti->mu);
     while (nti->writer == nullptr) {
+      // End the receiver if we encounter an error when performing block image update.
+      if (!nti->receiver_available) {
+        pthread_mutex_unlock(&nti->mu);
+        return false;
+      }
       pthread_cond_wait(&nti->cv, &nti->mu);
     }
     pthread_mutex_unlock(&nti->mu);
@@ -1591,29 +1601,44 @@
     }
   }
 
-  if (params.canwrite) {
-    pthread_join(params.thread, nullptr);
-
-    LOG(INFO) << "wrote " << params.written << " blocks; expected " << total_blocks;
-    LOG(INFO) << "stashed " << params.stashed << " blocks";
-    LOG(INFO) << "max alloc needed was " << params.buffer.size();
-
-    const char* partition = strrchr(blockdev_filename->data.c_str(), '/');
-    if (partition != nullptr && *(partition + 1) != 0) {
-      fprintf(cmd_pipe, "log bytes_written_%s: %zu\n", partition + 1, params.written * BLOCKSIZE);
-      fprintf(cmd_pipe, "log bytes_stashed_%s: %zu\n", partition + 1, params.stashed * BLOCKSIZE);
-      fflush(cmd_pipe);
-    }
-    // Delete stash only after successfully completing the update, as it may contain blocks needed
-    // to complete the update later.
-    DeleteStash(params.stashbase);
-  } else {
-    LOG(INFO) << "verified partition contents; update may be resumed";
-  }
-
   rc = 0;
 
 pbiudone:
+  if (params.canwrite) {
+    pthread_mutex_lock(&params.nti.mu);
+    if (params.nti.receiver_available) {
+      LOG(WARNING) << "new data receiver is still available after executing all commands.";
+    }
+    params.nti.receiver_available = false;
+    pthread_cond_broadcast(&params.nti.cv);
+    pthread_mutex_unlock(&params.nti.mu);
+    int ret = pthread_join(params.thread, nullptr);
+    if (ret != 0) {
+      LOG(WARNING) << "pthread join returned with " << strerror(ret);
+    }
+
+    if (rc == 0) {
+      LOG(INFO) << "wrote " << params.written << " blocks; expected " << total_blocks;
+      LOG(INFO) << "stashed " << params.stashed << " blocks";
+      LOG(INFO) << "max alloc needed was " << params.buffer.size();
+
+      const char* partition = strrchr(blockdev_filename->data.c_str(), '/');
+      if (partition != nullptr && *(partition + 1) != 0) {
+        fprintf(cmd_pipe, "log bytes_written_%s: %zu\n", partition + 1, params.written * BLOCKSIZE);
+        fprintf(cmd_pipe, "log bytes_stashed_%s: %zu\n", partition + 1, params.stashed * BLOCKSIZE);
+        fflush(cmd_pipe);
+      }
+      // Delete stash only after successfully completing the update, as it may contain blocks needed
+      // to complete the update later.
+      DeleteStash(params.stashbase);
+    }
+
+    pthread_mutex_destroy(&params.nti.mu);
+    pthread_cond_destroy(&params.nti.cv);
+  } else if (rc == 0) {
+    LOG(INFO) << "verified partition contents; update may be resumed";
+  }
+
   if (ota_fsync(params.fd) == -1) {
     failure_type = kFsyncFailure;
     PLOG(ERROR) << "fsync failed";