mtp: Update PosixAsyncIO

Signed-off-by: sekaiacg <sekaiacg@gmail.com>
Change-Id: I833f39fa7c1bfbc280c480b001c20f903aa8a47d
diff --git a/mtp/ffs/AsyncIO.cpp b/mtp/ffs/AsyncIO.cpp
deleted file mode 100644
index eb97a98..0000000
--- a/mtp/ffs/AsyncIO.cpp
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (C) 2016 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *		http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <android-base/logging.h>
-#include <condition_variable>
-#include <memory>
-#include <mutex>
-#include <queue>
-
-#include "AsyncIO.h"
-
-void read_func(struct aiocb *aiocbp) {
-	aiocbp->ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes,
-				aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
-}
-
-void write_func(struct aiocb *aiocbp) {
-	aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
-				aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
-}
-
-void splice_read_func(struct aiocb *aiocbp) {
-	loff_t long_offset = aiocbp->aio_offset;
-	aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes,
-				&long_offset, aiocbp->aio_sink,
-				NULL, aiocbp->aio_nbytes, 0));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
-}
-
-void splice_write_func(struct aiocb *aiocbp) {
-	loff_t long_offset = aiocbp->aio_offset;
-	aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes, NULL,
-				aiocbp->aio_sink, &long_offset,
-				aiocbp->aio_nbytes, 0));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
-}
-
-std::queue<std::unique_ptr<struct aiocb>> queue;
-std::mutex queue_lock;
-std::condition_variable queue_cond;
-std::condition_variable write_cond;
-int done = 1;
-void splice_write_pool_func(int) {
-	while(1) {
-		std::unique_lock<std::mutex> lk(queue_lock);
-		queue_cond.wait(lk, []{return !queue.empty() || done;});
-		if (queue.empty() && done) {
-			return;
-		}
-		std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front());
-		queue.pop();
-		lk.unlock();
-		write_cond.notify_one();
-		splice_write_func(aiocbp.get());
-		close(aiocbp->aio_fildes);
-	}
-}
-
-void write_pool_func(int) {
-	while(1) {
-		std::unique_lock<std::mutex> lk(queue_lock);
-		queue_cond.wait(lk, []{return !queue.empty() || done;});
-		if (queue.empty() && done) {
-			return;
-		}
-		std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front());
-		queue.pop();
-		lk.unlock();
-		write_cond.notify_one();
-		aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
-					aiocbp->aio_pool_buf.get(), aiocbp->aio_nbytes, aiocbp->aio_offset));
-		if (aiocbp->ret == -1) aiocbp->error = errno;
-	}
-}
-
-constexpr int NUM_THREADS = 1;
-constexpr int MAX_QUEUE_SIZE = 10;
-std::thread pool[NUM_THREADS];
-
-aiocb::~aiocb() {
-	CHECK(!thread.joinable());
-}
-
-void aio_pool_init(void(f)(int)) {
-	CHECK(done == 1);
-	done = 0;
-	for (int i = 0; i < NUM_THREADS; i++) {
-		pool[i] = std::thread(f, i);
-	}
-}
-
-void aio_pool_splice_init() {
-	aio_pool_init(splice_write_pool_func);
-}
-
-void aio_pool_write_init() {
-	aio_pool_init(write_pool_func);
-}
-
-void aio_pool_end() {
-	done = 1;
-	for (int i = 0; i < NUM_THREADS; i++) {
-		std::unique_lock<std::mutex> lk(queue_lock);
-		lk.unlock();
-		queue_cond.notify_one();
-	}
-
-	for (int i = 0; i < NUM_THREADS; i++) {
-		pool[i].join();
-	}
-}
-
-// used for both writes and splices depending on which init was used before.
-int aio_pool_write(struct aiocb *aiocbp) {
-	std::unique_lock<std::mutex> lk(queue_lock);
-	write_cond.wait(lk, []{return queue.size() < MAX_QUEUE_SIZE;});
-	queue.push(std::unique_ptr<struct aiocb>(aiocbp));
-	lk.unlock();
-	queue_cond.notify_one();
-	return 0;
-}
-
-int aio_read(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(read_func, aiocbp);
-	return 0;
-}
-
-int aio_write(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(write_func, aiocbp);
-	return 0;
-}
-
-int aio_splice_read(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(splice_read_func, aiocbp);
-	return 0;
-}
-
-int aio_splice_write(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(splice_write_func, aiocbp);
-	return 0;
-}
-
-int aio_error(const struct aiocb *aiocbp) {
-	return aiocbp->error;
-}
-
-ssize_t aio_return(struct aiocb *aiocbp) {
-	return aiocbp->ret;
-}
-
-int aio_suspend(struct aiocb *aiocbp[], int n,
-		const struct timespec *) {
-	for (int i = 0; i < n; i++) {
-		aiocbp[i]->thread.join();
-	}
-	return 0;
-}
-
-int aio_cancel(int, struct aiocb *) {
-	// Not implemented
-	return -1;
-}
-
diff --git a/mtp/ffs/AsyncIO.h b/mtp/ffs/AsyncIO.h
deleted file mode 100644
index 19e7617..0000000
--- a/mtp/ffs/AsyncIO.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (C) 2016 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *		http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _ASYNCIO_H
-#define _ASYNCIO_H
-
-#include <fcntl.h>
-#include <linux/aio_abi.h>
-#include <memory>
-#include <signal.h>
-#include <sys/cdefs.h>
-#include <sys/types.h>
-#include <time.h>
-#include <thread>
-#include <unistd.h>
-
-/**
- * Provides a subset of POSIX aio operations, as well
- * as similar operations with splice and threadpools.
- */
-
-struct aiocb {
-	int aio_fildes;		// Assumed to be the source for splices
-	void *aio_buf;		// Unused for splices
-
-	// Used for threadpool operations only, freed automatically
-	std::unique_ptr<char[]> aio_pool_buf;
-
-	off_t aio_offset;
-	size_t aio_nbytes;
-
-	int aio_sink;		// Unused for non splice r/w
-
-	// Used internally
-	std::thread thread;
-	ssize_t ret;
-	int error;
-
-	~aiocb();
-};
-
-// Submit a request for IO to be completed
-int aio_read(struct aiocb *);
-int aio_write(struct aiocb *);
-int aio_splice_read(struct aiocb *);
-int aio_splice_write(struct aiocb *);
-
-// Suspend current thread until given IO is complete, at which point
-// its return value and any errors can be accessed
-// All submitted requests must have a corresponding suspend.
-// aiocb->aio_buf must refer to valid memory until after the suspend call
-int aio_suspend(struct aiocb *[], int, const struct timespec *);
-int aio_error(const struct aiocb *);
-ssize_t aio_return(struct aiocb *);
-
-// (Currently unimplemented)
-int aio_cancel(int, struct aiocb *);
-
-// Initialize a threadpool to perform IO. Only one pool can be
-// running at a time.
-void aio_pool_write_init();
-void aio_pool_splice_init();
-// Suspend current thread until all queued work is complete, then ends the threadpool
-void aio_pool_end();
-// Submit IO work for the threadpool to complete. Memory associated with the work is
-// freed automatically when the work is complete.
-int aio_pool_write(struct aiocb *);
-
-#endif // ASYNCIO_H
-
diff --git a/mtp/ffs/PosixAsyncIO.cpp b/mtp/ffs/PosixAsyncIO.cpp
index 435000a..8205e3b 100644
--- a/mtp/ffs/PosixAsyncIO.cpp
+++ b/mtp/ffs/PosixAsyncIO.cpp
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *		http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,62 +15,132 @@
  */
 
 #include <android-base/logging.h>
-#include <condition_variable>
 #include <memory>
-#include <mutex>
+#include <pthread.h>
 #include <queue>
+#include <thread>
 #include <unistd.h>
 
 #include "PosixAsyncIO.h"
 
 namespace {
 
-void read_func(struct aiocb *aiocbp) {
-	aiocbp->ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes,
-				aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
+std::thread gWorkerThread;
+std::deque<struct aiocb*> gWorkQueue;
+bool gSuspended = true;
+int gAiocbRefcount = 0;
+std::mutex gLock;
+std::condition_variable gWait;
+
+void work_func(void *) {
+    pthread_setname_np(pthread_self(), "AsyncIO work");
+    while (true) {
+        struct aiocb *aiocbp;
+        {
+            std::unique_lock<std::mutex> lk(gLock);
+            gWait.wait(lk, []{return gWorkQueue.size() > 0 || gSuspended;});
+            if (gSuspended)
+                return;
+            aiocbp = gWorkQueue.back();
+            gWorkQueue.pop_back();
+        }
+        CHECK(aiocbp->queued);
+        int ret;
+        if (aiocbp->read) {
+            ret = TEMP_FAILURE_RETRY(pread64(aiocbp->aio_fildes,
+                    aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
+        } else {
+            ret = TEMP_FAILURE_RETRY(pwrite64(aiocbp->aio_fildes,
+               aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
+        }
+        {
+            std::unique_lock<std::mutex> lk(aiocbp->lock);
+            aiocbp->ret = ret;
+            if (aiocbp->ret == -1) {
+                aiocbp->error = errno;
+            }
+            aiocbp->queued = false;
+        }
+        aiocbp->cv.notify_all();
+    }
 }
 
-void write_func(struct aiocb *aiocbp) {
-	aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes,
-				aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset));
-	if (aiocbp->ret == -1) aiocbp->error = errno;
+int aio_add(struct aiocb *aiocbp) {
+    CHECK(!aiocbp->queued);
+    aiocbp->queued = true;
+    {
+        std::unique_lock<std::mutex> lk(gLock);
+        gWorkQueue.push_front(aiocbp);
+    }
+    gWait.notify_one();
+    return 0;
 }
 
 } // end anonymous namespace
 
+aiocb::aiocb() {
+    this->ret = 0;
+    this->queued = false;
+    {
+        std::unique_lock<std::mutex> lk(gLock);
+        if (gAiocbRefcount == 0) {
+            CHECK(gWorkQueue.size() == 0);
+            CHECK(gSuspended);
+            gSuspended = false;
+            gWorkerThread = std::thread(work_func, nullptr);
+        }
+        gAiocbRefcount++;
+    }
+}
+
 aiocb::~aiocb() {
-	CHECK(!thread.joinable());
+    CHECK(!this->queued);
+    {
+        std::unique_lock<std::mutex> lk(gLock);
+        CHECK(!gSuspended);
+        if (gAiocbRefcount == 1) {
+            CHECK(gWorkQueue.size() == 0);
+            gSuspended = true;
+            lk.unlock();
+            gWait.notify_one();
+            gWorkerThread.join();
+            lk.lock();
+        }
+        gAiocbRefcount--;
+    }
 }
 
 int aio_read(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(read_func, aiocbp);
-	return 0;
+    aiocbp->read = true;
+    return aio_add(aiocbp);
 }
 
 int aio_write(struct aiocb *aiocbp) {
-	aiocbp->thread = std::thread(write_func, aiocbp);
-	return 0;
+    aiocbp->read = false;
+    return aio_add(aiocbp);
 }
 
 int aio_error(const struct aiocb *aiocbp) {
-	return aiocbp->error;
+    return aiocbp->error;
 }
 
 ssize_t aio_return(struct aiocb *aiocbp) {
-	return aiocbp->ret;
+    return aiocbp->ret;
 }
 
 int aio_suspend(struct aiocb *aiocbp[], int n,
-		const struct timespec *) {
-	for (int i = 0; i < n; i++) {
-		aiocbp[i]->thread.join();
-	}
-	return 0;
+        const struct timespec *) {
+    for (int i = 0; i < n; i++) {
+        {
+            std::unique_lock<std::mutex> lk(aiocbp[i]->lock);
+            aiocbp[i]->cv.wait(lk, [aiocbp, i]{return !aiocbp[i]->queued;});
+        }
+    }
+    return 0;
 }
 
-void aio_prepare(struct aiocb *aiocbp, void* buf, size_t count, off_t offset) {
-	aiocbp->aio_buf = buf;
-	aiocbp->aio_offset = offset;
-	aiocbp->aio_nbytes = count;
+void aio_prepare(struct aiocb *aiocbp, void* buf, size_t count, off64_t offset) {
+    aiocbp->aio_buf = buf;
+    aiocbp->aio_offset = offset;
+    aiocbp->aio_nbytes = count;
 }
diff --git a/mtp/ffs/PosixAsyncIO.h b/mtp/ffs/PosixAsyncIO.h
index 69ab9a5..2bcae4c 100644
--- a/mtp/ffs/PosixAsyncIO.h
+++ b/mtp/ffs/PosixAsyncIO.h
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *		http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
 #ifndef _POSIXASYNCIO_H
 #define _POSIXASYNCIO_H
 
+#include <condition_variable>
+#include <mutex>
 #include <sys/cdefs.h>
 #include <sys/types.h>
 #include <time.h>
-#include <thread>
 #include <unistd.h>
 
 /**
@@ -28,18 +29,23 @@
  */
 
 struct aiocb {
-	int aio_fildes;
-	void *aio_buf;
+    int aio_fildes;
+    void *aio_buf;
 
-	off_t aio_offset;
-	size_t aio_nbytes;
+    off64_t aio_offset;
+    size_t aio_nbytes;
 
-	// Used internally
-	std::thread thread;
-	ssize_t ret;
-	int error;
+    // Used internally
+    bool read;
+    bool queued;
+    ssize_t ret;
+    int error;
 
-	~aiocb();
+    std::mutex lock;
+    std::condition_variable cv;
+
+    aiocb();
+    ~aiocb();
 };
 
 // Submit a request for IO to be completed
@@ -55,7 +61,7 @@
 ssize_t aio_return(struct aiocb *);
 
 // Helper method for setting aiocb members
-void aio_prepare(struct aiocb *, void*, size_t, off_t);
+void aio_prepare(struct aiocb *, void*, size_t, off64_t);
 
 #endif // POSIXASYNCIO_H