From ccfcc5162acc89fb89fb980075b32874eed95b04 Mon Sep 17 00:00:00 2001 From: chenrenhui Date: Tue, 6 May 2025 10:49:07 +0800 Subject: [PATCH] fuse3: support fastpath --- example/passthrough_hp.cc | 10 +- include/fuse_ipc.h | 4 + include/fuse_kernel.h | 15 +- include/fuse_lowlevel.h | 2 + include/meson.build | 3 +- lib/fuse_i.h | 20 +++ lib/fuse_ipc.c | 359 ++++++++++++++++++++++++++++++++++++ lib/fuse_loop.c | 45 +++++ lib/fuse_loop_mt.c | 3 + lib/fuse_lowlevel.c | 369 +++++++++++++++++++++++++++++++++++--- lib/fuse_versionscript | 1 + lib/meson.build | 2 +- lib/mount.c | 4 + 13 files changed, 808 insertions(+), 29 deletions(-) create mode 100644 include/fuse_ipc.h create mode 100644 lib/fuse_ipc.c diff --git a/example/passthrough_hp.cc b/example/passthrough_hp.cc index 872fc73..1f96820 100644 --- a/example/passthrough_hp.cc +++ b/example/passthrough_hp.cc @@ -1146,7 +1146,10 @@ static cxxopts::ParseResult parse_options(int argc, char **argv) { ("help", "Print help") ("nocache", "Disable all caching") ("nosplice", "Do not use splice(2) to transfer data") - ("single", "Run single-threaded"); + ("single", "Run single-threaded") + ("nointerrupt", "Do not process interrupt request") + ("noforget", "Do not process forget request") + ("usefastpath", "use fastpath"); // FIXME: Find a better way to limit the try clause to just // opt_parser.parse() (cf. 
/*
 * fuse_ipc.h - entry points of the libfuse IPC ("fastpath") transport.
 *
 * The three *_call helpers are thin wrappers around ioctls on the FUSE
 * device fd; *arg* is passed to the ioctl unchanged (the library passes
 * the address of a struct fuse_ipc_io).
 */
#ifndef FUSE_IPC_H_
#define FUSE_IPC_H_

#ifdef __cplusplus
extern "C" {
#endif

/* Only used through a pointer here, so a forward declaration is enough
 * and keeps this header usable on its own. */
struct fuse_session;

/**
 * Start the fastpath worker threads for *se*, block until the session
 * has exited, then cancel and join the workers.
 *
 * @param se the session to serve
 */
void fuse_ipc_create_server_mt(struct fuse_session *se);

/**
 * Issue FUSE_DEV_IOC_WAIT_RET_CALL on *fd*.
 *
 * @param fd  FUSE device file descriptor
 * @param arg ioctl argument, passed through unchanged
 * @return the ioctl result on success, -1 on failure
 */
int fuse_ipc_wait_and_ret_call(int fd, unsigned long arg);

/**
 * Issue FUSE_DEV_IOC_WAIT_CALL on *fd*.
 *
 * @param fd  FUSE device file descriptor
 * @param arg ioctl argument, passed through unchanged
 * @return the ioctl result on success, -1 on failure
 */
int fuse_ipc_wait_call(int fd, unsigned long arg);

/**
 * Issue FUSE_DEV_IOC_RET_CALL on *fd*.
 *
 * @param fd  FUSE device file descriptor
 * @param arg ioctl argument, passed through unchanged
 * @return the ioctl result on success, -1 on failure
 */
int fuse_ipc_ret_call(int fd, unsigned long arg);

#ifdef __cplusplus
}
#endif

#endif /* FUSE_IPC_H_ */
/*
 * Per-thread state of the IPC fastpath.  Instances are allocated on
 * demand by fuse_ll_get_ipc_data() and stored as thread-specific data
 * under fuse_session.ipc_data_key.
 */
struct fuse_ipc_data {
	/* Argument block whose address is passed to the
	 * FUSE_DEV_IOC_WAIT_CALL / FUSE_DEV_IOC_RET_CALL ioctls */
	struct fuse_ipc_io ipc_io;
	/* NOTE(review): not referenced by any code visible in this patch */
	int ipc_ret;
	/* Page mmap()ed from the session fd by the worker thread; the
	 * request header is read from it and the reply header is copied
	 * into it */
	void *data_page;
};
fuse_chan *ch); struct fuse *fuse_new_31(struct fuse_args *args, const struct fuse_operations *op, size_t op_size, void *private_data); int fuse_loop_mt_32(struct fuse *f, struct fuse_loop_config *config); int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config); +struct fuse_ipc_data *fuse_ll_get_ipc_data(struct fuse_session *se); +void fuse_ll_clear_ipc_data(struct fuse_session *se); + #define FUSE_MAX_MAX_PAGES 256 #define FUSE_DEFAULT_MAX_PAGES_PER_REQ 32 diff --git a/lib/fuse_ipc.c b/lib/fuse_ipc.c new file mode 100644 index 0000000..6528098 --- /dev/null +++ b/lib/fuse_ipc.c @@ -0,0 +1,359 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fuse_i.h" +#include "fuse_ipc.h" +#include "fuse_lowlevel.h" + +#define FUSE_DATA_PAGE_SIZE 4096 + +struct fuse_ipc_thread_args { + struct fuse_session *se; + + int wait_flag; + pthread_mutex_t mutex; + pthread_cond_t cond; + + int create_result; + + int cpu_id; +}; + +pthread_t *fuse_ipc_workers; + +static int fuse_ipc_cond_init(struct fuse_ipc_thread_args *args) +{ + int ret; + + args->wait_flag = 0; + + ret = pthread_mutex_init(&args->mutex, NULL); + if (ret != 0) { + fuse_log(FUSE_LOG_ERR, "mutex init failed, errno = %d\n", errno); + return -1; + } + + ret = pthread_cond_init(&args->cond, NULL); + if (ret != 0) { + fuse_log(FUSE_LOG_ERR, "cond init failed, errno = %d\n", errno); + return -1; + } + + return 0; +} + +static void fuse_ipc_cond_wait(struct fuse_ipc_thread_args *args) +{ + (void)pthread_mutex_lock(&args->mutex); + while (args->wait_flag == 0) + (void)pthread_cond_wait(&args->cond, &args->mutex); + (void)pthread_mutex_unlock(&args->mutex); +} + +static void fuse_ipc_cond_signal(struct fuse_ipc_thread_args *args, int result) +{ + args->create_result = result; + + (void)pthread_mutex_lock(&args->mutex); + if (args->wait_flag == 0) { + args->wait_flag = 1; + 
(void)pthread_cond_signal(&args->cond); + } + (void)pthread_mutex_unlock(&args->mutex); +} + +static int fuse_ipc_server_thread_bind(int fd) +{ + int ret; + + ret = ioctl(fd, FUSE_DEV_IOC_IPC_BIND); + if (ret < 0) { + fuse_log(FUSE_LOG_ERR, "fuse_ipc_server_thread bind failed, errno = %d\n", errno); + return -1; + } + + return 0; +} + +static int +fuse_mmap_data_page(struct fuse_session* se, struct fuse_ipc_data* ipc_data) +{ + void* addr; + + addr = mmap(NULL, FUSE_DATA_PAGE_SIZE, + PROT_READ | PROT_WRITE, MAP_SHARED, se->fd, 0); + if (addr == MAP_FAILED) { + fuse_log(FUSE_LOG_ERR, "mmap data page failed, errno = %d\n", errno); + return -1; + } + + ipc_data->data_page = addr; + + return 0; +} + +static int fuse_munmap_data_page(struct fuse_ipc_data *ipc_data) +{ + int ret; + + ret = munmap(ipc_data->data_page, FUSE_DATA_PAGE_SIZE); + if (ret) { + fuse_log(FUSE_LOG_ERR, "munmap data page failed, errno = %d\n", errno); + return ret; + } + + ipc_data->data_page = NULL; + + return ret; +} + +int fuse_ipc_wait_and_ret_call(int fd, unsigned long arg) +{ + int ret; + + ret = ioctl(fd, FUSE_DEV_IOC_WAIT_RET_CALL, arg); + if (ret < 0) { + if (errno != 4) + fuse_log(FUSE_LOG_ERR, "fuse_ipc_wait_and_ret call failed, errno = %d\n", errno); + return -1; + } + + return ret; +} + +int fuse_ipc_wait_call(int fd, unsigned long arg) +{ + int ret; + + ret = ioctl(fd, FUSE_DEV_IOC_WAIT_CALL, arg); + if (ret < 0) { + if (errno != 4 && errno != 19) + fuse_log(FUSE_LOG_ERR, "fuse_ipc_wait_call failed, errno = %d\n", errno); + return -1; + } + + return ret; +} + +int fuse_ipc_ret_call(int fd, unsigned long arg) +{ + int ret; + + ret = ioctl(fd, FUSE_DEV_IOC_RET_CALL, arg); + if (ret < 0) { + fuse_log(FUSE_LOG_ERR, "fuse_ipc_ret_call failed, errno = %d\n", errno); + return -1; + } + return ret; +} + +static int fuse_ipc_server_thread_unbind(struct fuse_session *se) +{ + int ret; + + ret = ioctl(se->fd, FUSE_DEV_IOC_IPC_UNBIND); + if (ret < 0) { + fuse_log(FUSE_LOG_ERR, 
"fuse_ipc_server_thread_unbind failed, errno = %d\n", errno); + return -1; + } + + return 0; +} + +static void *fuse_ipc_server_thread(void *args) +{ + int ret = 0; + int volatile err = -1; + struct fuse_ipc_thread_args *thread_args = (struct fuse_ipc_thread_args *)args; + struct fuse_session *se = thread_args->se; + int cpu_id = thread_args->cpu_id; + struct fuse_ipc_io *ipc_io; + struct fuse_ipc_data *ipc_data; + struct fuse_buf fbuf = { + .mem = NULL, + }; + cpu_set_t mask; + + CPU_ZERO(&mask); + CPU_SET(cpu_id, &mask); + + if (-1 == pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask)) { + fuse_log(FUSE_LOG_ERR, "error: pthread_setaffinity_np(%d) fiailed\n", cpu_id); + goto out; + } + + ipc_data = fuse_ll_get_ipc_data(se); + if (ipc_data == NULL) + goto out; + + ret = fuse_mmap_data_page(se, ipc_data); + if (ret) + goto clear; + + ret = fuse_ipc_server_thread_bind(se->fd); + if (ret) { + goto unmap; + } + + ipc_io = &(ipc_data->ipc_io); + ipc_io->buf = malloc(se->bufsize); + if (ipc_io->buf == NULL) + goto unbind; + ipc_io->buf_len = se->bufsize; + + fuse_ipc_cond_signal(thread_args, 0); + + err = 0; + fbuf.mem = ipc_io->buf; + + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + pthread_cleanup_push((void *)fuse_ll_clear_ipc_data, (void *)se); + pthread_cleanup_push((void *)fuse_munmap_data_page, (void *)ipc_data); + pthread_cleanup_push((void *)fuse_ipc_server_thread_unbind, (void *)se); + pthread_cleanup_push(free, ipc_io->buf); + for (;;) { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + ret = fuse_ipc_wait_call(se->fd, (unsigned long)ipc_io); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + if (ret <= 0) { + fuse_session_exit(se); + break; + } + if (fuse_session_exited(se)) + break; + fbuf.size = ret; + fuse_session_process_buf_int_fast(se, &fbuf, NULL); + } + + pthread_cleanup_pop(0); + pthread_cleanup_pop(0); + pthread_cleanup_pop(0); + pthread_cleanup_pop(0); + + free(ipc_io->buf); +unbind: + 
fuse_ipc_server_thread_unbind(se); +unmap: + fuse_munmap_data_page(ipc_data); +clear: + fuse_ll_clear_ipc_data(se); +out: + if (err) + fuse_ipc_cond_signal(thread_args, -1); + + sem_post(&se->finish); + + return NULL; +} + +static int fuse_ipc_server_thread_create(struct fuse_session *se, int cpu_id) +{ + struct fuse_ipc_thread_args *args; + int ret; + pthread_t tid; + pthread_attr_t attr; + unsigned int stack_size = 8 * 1024 * 1024; + sigset_t oldset; + sigset_t newset; + + args = (struct fuse_ipc_thread_args *)malloc(sizeof(struct fuse_ipc_thread_args)); + if (!args) { + fuse_log(FUSE_LOG_ERR, "malloc fuse_ipc_thread_args failed\n"); + return -1; + } + + ret = fuse_ipc_cond_init(args); + if (ret != 0) { + goto failed; + } + + ret = pthread_attr_init(&attr); + if (ret != 0) { + fuse_log(FUSE_LOG_ERR, "attr init failed, errno = %d\n", errno); + goto failed; + } + + ret = pthread_attr_setstacksize(&attr, stack_size); + if (ret != 0) { + fuse_log(FUSE_LOG_ERR, "set stack size failed, errno = %d\n", errno); + goto failed; + } + + args->se = se; + args->cpu_id = cpu_id; + + sigemptyset(&newset); + sigaddset(&newset, SIGTERM); + sigaddset(&newset, SIGINT); + sigaddset(&newset, SIGHUP); + sigaddset(&newset, SIGQUIT); + pthread_sigmask(SIG_BLOCK, &newset, &oldset); + ret = pthread_create(&tid, &attr, fuse_ipc_server_thread, (void *)args); + pthread_sigmask(SIG_SETMASK, &oldset, NULL); + pthread_attr_destroy(&attr); + if (ret != 0) { + fuse_log(FUSE_LOG_ERR, "pthread create failed, errno = %d\n", errno); + goto failed; + } + + fuse_ipc_cond_wait(args); + if (args->create_result) { + fuse_log(FUSE_LOG_ERR, "server thread create failed\n"); + ret = -1; + } else { + fuse_ipc_workers[cpu_id] = tid; + } + +failed: + free(args); + return ret; +} + +void fuse_ipc_create_server_mt(struct fuse_session *se) +{ + int ret = 0; + int thread_num = get_nprocs_conf(); + int created_num = 0; + int i = 0; + + if (thread_num != 0) { + fuse_ipc_workers = (pthread_t *)malloc(sizeof(pthread_t) 
* thread_num); + if (fuse_ipc_workers == NULL) { + fuse_log(FUSE_LOG_ERR, "fuse_ipc_create_server_mt: out of memory\n"); + goto out; + } + } + + sem_init(&se->finish, 0, 0); + for (i = 0; i < thread_num; i++) { + ret = fuse_ipc_server_thread_create(se, i); + if (ret) { + fuse_session_exit(se); + break; + } + } + + created_num = i; + while (!fuse_session_exited(se)) { + sem_wait(&se->finish); + } + + for (i = 0; i < created_num; i++) { + pthread_cancel(fuse_ipc_workers[i]); + } + for (i = 0; i < created_num; i++) { + pthread_join(fuse_ipc_workers[i], NULL); + } + +out: + free(fuse_ipc_workers); +} diff --git a/lib/fuse_loop.c b/lib/fuse_loop.c index e6560aa..4f86484 100644 --- a/lib/fuse_loop.c +++ b/lib/fuse_loop.c @@ -11,11 +11,50 @@ #include "config.h" #include "fuse_lowlevel.h" #include "fuse_i.h" +#include "fuse_ipc.h" #include #include #include +int fuse_session_loop_ipc(struct fuse_session *se) +{ + int res = 0; + struct fuse_buf fbuf = { + .mem = NULL, + }; + + while (!fuse_session_exited(se)) { + res = fuse_session_receive_buf_int(se, &fbuf, NULL); + + if (res == -EINTR) + continue; + if (res <= 0) + break; + + fuse_session_process_buf_int_fast(se, &fbuf, NULL); + + if (se->got_init) { + if (se->debug) + fuse_log(FUSE_LOG_DEBUG, "FUSE_INIT request is got\n"); + break; + } + } + + if (res > 0 && se->got_init) + fuse_ipc_create_server_mt(se); + + free(fbuf.mem); + if(res > 0) + /* No error, just the length of the most recently read + request */ + res = 0; + if(se->error != 0) + res = se->error; + fuse_session_reset(se); + return res; +} + int fuse_session_loop(struct fuse_session *se) { int res = 0; @@ -23,6 +62,12 @@ int fuse_session_loop(struct fuse_session *se) .mem = NULL, }; + if (se->use_fastpath) { + fuse_log(FUSE_LOG_ERR, + "single thread is not supported in fuse fastpath mode\n"); + return -1; + } + while (!fuse_session_exited(se)) { res = fuse_session_receive_buf_int(se, &fbuf, NULL); diff --git a/lib/fuse_loop_mt.c b/lib/fuse_loop_mt.c index 
8fcc46c..ba98212 100644 --- a/lib/fuse_loop_mt.c +++ b/lib/fuse_loop_mt.c @@ -310,6 +310,9 @@ int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *co struct fuse_mt mt; struct fuse_worker *w; + if (se->use_fastpath) + return fuse_session_loop_ipc(se); + memset(&mt, 0, sizeof(struct fuse_mt)); mt.se = se; mt.clone_fd = config->clone_fd; diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c index 8e8fcb4..c6a5b6f 100644 --- a/lib/fuse_lowlevel.c +++ b/lib/fuse_lowlevel.c @@ -17,6 +17,7 @@ #include "fuse_opt.h" #include "fuse_misc.h" #include "mount_util.h" +#include "fuse_ipc.h" #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #ifndef F_LINUX_SPECIFIC_BASE #define F_LINUX_SPECIFIC_BASE 1024 @@ -133,14 +135,20 @@ void fuse_free_req(fuse_req_t req) int ctr; struct fuse_session *se = req->se; - pthread_mutex_lock(&se->lock); - req->u.ni.func = NULL; - req->u.ni.data = NULL; - list_del_req(req); - ctr = --req->ctr; - fuse_chan_put(req->ch); - req->ch = NULL; - pthread_mutex_unlock(&se->lock); + if (se->no_interrupt) { + ctr = --req->ctr; + fuse_chan_put(req->ch); + req->ch = NULL; + } else { + pthread_mutex_lock(&se->lock); + req->u.ni.func = NULL; + req->u.ni.data = NULL; + list_del_req(req); + ctr = --req->ctr; + fuse_chan_put(req->ch); + req->ch = NULL; + pthread_mutex_unlock(&se->lock); + } if (!ctr) destroy_req(req); } @@ -162,12 +170,18 @@ static struct fuse_req *fuse_ll_alloc_req(struct fuse_session *se) return req; } +static int fuse_send_msg_fast(struct fuse_session *se, struct fuse_chan *ch, + struct iovec *iov, int count); + /* Send data. 
If *ch* is NULL, send via session master fd */ static int fuse_send_msg(struct fuse_session *se, struct fuse_chan *ch, struct iovec *iov, int count) { struct fuse_out_header *out = iov[0].iov_base; + if (se->use_fastpath) + return fuse_send_msg_fast(se, ch, iov, count); + assert(se != NULL); out->len = iov_length(iov, count); if (se->debug) { @@ -200,6 +214,60 @@ static int fuse_send_msg(struct fuse_session *se, struct fuse_chan *ch, return 0; } +static int fuse_send_msg_fast(struct fuse_session *se, struct fuse_chan *ch, + struct iovec *iov, int count) +{ + struct fuse_out_header *out = iov[0].iov_base; + struct fuse_ipc_data *ipc_data; + struct fuse_ipc_io *ipc_io; + ssize_t res; + + assert(se != NULL); + out->len = iov_length(iov, count); + if (se->debug) { + if (out->unique == 0) { + fuse_log(FUSE_LOG_DEBUG, "NOTIFY: code=%d length=%u\n", + out->error, out->len); + } else if (out->error) { + fuse_log(FUSE_LOG_DEBUG, + " unique: %llu, error: %i (%s), outsize: %i\n", + (unsigned long long) out->unique, out->error, + strerror(-out->error), out->len); + } else { + fuse_log(FUSE_LOG_DEBUG, + " unique: %llu, success, outsize: %i\n", + (unsigned long long) out->unique, out->len); + } + + } + + if (!se->not_first_msg) { + res = writev(ch ? 
ch->fd : se->fd, + iov, count); + + int err = errno; + + if (res == -1) { + /* ENOENT means the operation was interrupted */ + if (!fuse_session_exited(se) && err != ENOENT) + perror("fuse: writing device"); + return -err; + } + se->not_first_msg = 1; + } else { + ipc_data = fuse_ll_get_ipc_data(se); + memcpy(ipc_data->data_page, iov[0].iov_base, iov[0].iov_len); + ipc_io = &(ipc_data->ipc_io); + ipc_io->iov = &iov[1]; + ipc_io->count = count - 1; + res = fuse_ipc_ret_call(se->fd, (unsigned long)ipc_io); + if (res == -1) { + return -errno; + } + } + + return 0; +} int fuse_send_reply_iov_nofree(fuse_req_t req, int error, struct iovec *iov, int count) @@ -585,6 +653,29 @@ static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_session *se) } #endif +struct fuse_ipc_data *fuse_ll_get_ipc_data(struct fuse_session *se) +{ + struct fuse_ipc_data *ipc_data = pthread_getspecific(se->ipc_data_key); + if (ipc_data == NULL) { + ipc_data = malloc(sizeof(struct fuse_ipc_data)); + if (ipc_data == NULL) + return NULL; + + pthread_setspecific(se->ipc_data_key, ipc_data); + } + + return ipc_data; +} + +void fuse_ll_clear_ipc_data(struct fuse_session *se) +{ + struct fuse_ipc_data *ipc_data = pthread_getspecific(se->ipc_data_key); + if (ipc_data) { + free(ipc_data); + pthread_setspecific(se->ipc_data_key, NULL); + } +} + static void fuse_ll_clear_pipe(struct fuse_session *se) { struct fuse_ll_pipe *llp = pthread_getspecific(se->pipe_key); @@ -1374,6 +1465,31 @@ static void do_write(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) fuse_reply_err(req, ENOSYS); } +static void do_write_fast(fuse_req_t req, fuse_ino_t nodeid, const void *inarg, + const struct fuse_buf *ibuf) +{ + struct fuse_write_in *arg = (struct fuse_write_in *) inarg; + struct fuse_file_info fi; + char *param = ibuf->mem; + + memset(&fi, 0, sizeof(fi)); + fi.fh = arg->fh; + fi.writepage = (arg->write_flags & FUSE_WRITE_CACHE) != 0; + + if (req->se->conn.proto_minor < 9) { + param = ((char *) arg) + 
FUSE_COMPAT_WRITE_IN_SIZE; + } else { + fi.lock_owner = arg->lock_owner; + fi.flags = arg->flags; + } + + if (req->se->op.write) + req->se->op.write(req, nodeid, param, arg->size, + arg->offset, &fi); + else + fuse_reply_err(req, ENOSYS); +} + static void do_write_buf(fuse_req_t req, fuse_ino_t nodeid, const void *inarg, const struct fuse_buf *ibuf) { @@ -1397,8 +1513,10 @@ static void do_write_buf(fuse_req_t req, fuse_ino_t nodeid, const void *inarg, } else { fi.lock_owner = arg->lock_owner; fi.flags = arg->flags; - if (!(bufv.buf[0].flags & FUSE_BUF_IS_FD)) - bufv.buf[0].mem = PARAM(arg); + if (!se->use_fastpath) { + if (!(bufv.buf[0].flags & FUSE_BUF_IS_FD)) + bufv.buf[0].mem = PARAM(arg); + } bufv.buf[0].size -= sizeof(struct fuse_in_header) + sizeof(struct fuse_write_in); @@ -1785,6 +1903,36 @@ static void do_bmap(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) fuse_reply_err(req, ENOSYS); } +static void do_ioctl_fast(fuse_req_t req, fuse_ino_t nodeid, const void *inarg, + const struct fuse_buf *ibuf) +{ + struct fuse_ioctl_in *arg = (struct fuse_ioctl_in *) inarg; + unsigned int flags = arg->flags; + void *in_buf = arg->in_size ? 
ibuf->mem : NULL; + struct fuse_file_info fi; + + if (flags & FUSE_IOCTL_DIR && + !(req->se->conn.want & FUSE_CAP_IOCTL_DIR)) { + fuse_reply_err(req, ENOTTY); + return; + } + + memset(&fi, 0, sizeof(fi)); + fi.fh = arg->fh; + + if (sizeof(void *) == 4 && req->se->conn.proto_minor >= 16 && + !(flags & FUSE_IOCTL_32BIT)) { + req->ioctl_64bit = 1; + } + + if (req->se->op.ioctl) + req->se->op.ioctl(req, nodeid, arg->cmd, + (void *)(uintptr_t)arg->arg, &fi, flags, + in_buf, arg->in_size, arg->out_size); + else + fuse_reply_err(req, ENOSYS); +} + static void do_ioctl(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) { struct fuse_ioctl_in *arg = (struct fuse_ioctl_in *) inarg; @@ -1941,8 +2089,14 @@ void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) if (arg->minor >= 6) { if (arg->max_readahead < se->conn.max_readahead) se->conn.max_readahead = arg->max_readahead; - if (arg->flags & FUSE_ASYNC_READ) - se->conn.capable |= FUSE_CAP_ASYNC_READ; + if (!se->use_fastpath) { + if (arg->flags & FUSE_ASYNC_READ) + se->conn.capable |= FUSE_CAP_ASYNC_READ; + if (arg->flags & FUSE_ASYNC_DIO) + se->conn.capable |= FUSE_CAP_ASYNC_DIO; + if (arg->flags & FUSE_WRITEBACK_CACHE) + se->conn.capable |= FUSE_CAP_WRITEBACK_CACHE; + } if (arg->flags & FUSE_POSIX_LOCKS) se->conn.capable |= FUSE_CAP_POSIX_LOCKS; if (arg->flags & FUSE_ATOMIC_O_TRUNC) @@ -1959,10 +2113,6 @@ void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) se->conn.capable |= FUSE_CAP_READDIRPLUS; if (arg->flags & FUSE_READDIRPLUS_AUTO) se->conn.capable |= FUSE_CAP_READDIRPLUS_AUTO; - if (arg->flags & FUSE_ASYNC_DIO) - se->conn.capable |= FUSE_CAP_ASYNC_DIO; - if (arg->flags & FUSE_WRITEBACK_CACHE) - se->conn.capable |= FUSE_CAP_WRITEBACK_CACHE; if (arg->flags & FUSE_NO_OPEN_SUPPORT) se->conn.capable |= FUSE_CAP_NO_OPEN_SUPPORT; if (arg->flags & FUSE_PARALLEL_DIROPS) @@ -1990,12 +2140,14 @@ void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) } if (se->conn.proto_minor >= 14) { + 
if (!se->use_fastpath) { #ifdef HAVE_SPLICE #ifdef HAVE_VMSPLICE - se->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE; + se->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE; #endif - se->conn.capable |= FUSE_CAP_SPLICE_READ; + se->conn.capable |= FUSE_CAP_SPLICE_READ; #endif + } } if (se->conn.proto_minor >= 18) se->conn.capable |= FUSE_CAP_IOCTL_DIR; @@ -2073,8 +2225,15 @@ void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) by the max_write option */ outarg.flags |= FUSE_BIG_WRITES; - if (se->conn.want & FUSE_CAP_ASYNC_READ) - outarg.flags |= FUSE_ASYNC_READ; + if (!se->use_fastpath) { + if (se->conn.want & FUSE_CAP_ASYNC_READ) + outarg.flags |= FUSE_ASYNC_READ; + if (se->conn.want & FUSE_CAP_ASYNC_DIO) + outarg.flags |= FUSE_ASYNC_DIO; + if (se->conn.want & FUSE_CAP_WRITEBACK_CACHE) + outarg.flags |= FUSE_WRITEBACK_CACHE; + } + if (se->conn.want & FUSE_CAP_POSIX_LOCKS) outarg.flags |= FUSE_POSIX_LOCKS; if (se->conn.want & FUSE_CAP_ATOMIC_O_TRUNC) @@ -2091,10 +2250,6 @@ void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg) outarg.flags |= FUSE_DO_READDIRPLUS; if (se->conn.want & FUSE_CAP_READDIRPLUS_AUTO) outarg.flags |= FUSE_READDIRPLUS_AUTO; - if (se->conn.want & FUSE_CAP_ASYNC_DIO) - outarg.flags |= FUSE_ASYNC_DIO; - if (se->conn.want & FUSE_CAP_WRITEBACK_CACHE) - outarg.flags |= FUSE_WRITEBACK_CACHE; if (se->conn.want & FUSE_CAP_POSIX_ACL) outarg.flags |= FUSE_POSIX_ACL; if (se->conn.want & FUSE_CAP_CACHE_SYMLINKS) @@ -2207,6 +2362,9 @@ static int send_notify_iov(struct fuse_session *se, int notify_code, if (!se->got_init) return -ENOTCONN; + if (se->use_fastpath) + return -ENOSYS; + out.unique = 0; out.error = notify_code; iov[0].iov_base = &out; @@ -2266,6 +2424,9 @@ int fuse_lowlevel_notify_inval_entry(struct fuse_session *se, fuse_ino_t parent, if (se->conn.proto_minor < 12) return -ENOSYS; + if (se->use_fastpath) + return -ENOSYS; + outarg.parent = parent; outarg.namelen = namelen; outarg.padding = 0; 
@@ -2291,6 +2452,9 @@ int fuse_lowlevel_notify_delete(struct fuse_session *se, if (se->conn.proto_minor < 18) return -ENOSYS; + if (se->use_fastpath) + return -ENOSYS; + outarg.parent = parent; outarg.child = child; outarg.namelen = namelen; @@ -2320,6 +2484,9 @@ int fuse_lowlevel_notify_store(struct fuse_session *se, fuse_ino_t ino, if (se->conn.proto_minor < 15) return -ENOSYS; + if (se->use_fastpath) + return -ENOSYS; + out.unique = 0; out.error = FUSE_NOTIFY_STORE; @@ -2398,6 +2565,9 @@ int fuse_lowlevel_notify_retrieve(struct fuse_session *se, fuse_ino_t ino, if (se->conn.proto_minor < 15) return -ENOSYS; + if (se->use_fastpath) + return -ENOSYS; + rreq = malloc(sizeof(*rreq)); if (rreq == NULL) return -ENOMEM; @@ -2685,6 +2855,116 @@ clear_pipe: goto out_free; } +void fuse_session_process_buf_int_fast(struct fuse_session *se, + const struct fuse_buf *buf, struct fuse_chan *ch) +{ + struct fuse_in_header *in; + const void *inarg; + struct fuse_req *req; + void *mbuf = NULL; + int err; + struct fuse_ipc_data *ipc_data; + + if (se->got_init){ + ipc_data = fuse_ll_get_ipc_data(se); + in = ipc_data->data_page;} + else { + in = buf->mem; + } + + if (se->debug) { + fuse_log(FUSE_LOG_DEBUG, + "unique: %llu, opcode: %s (%i), nodeid: %llu, insize: %zu, pid: %u\n", + (unsigned long long) in->unique, + opname((enum fuse_opcode) in->opcode), in->opcode, + (unsigned long long) in->nodeid, buf->size, in->pid); + } + + req = fuse_ll_alloc_req(se); + if (req == NULL) { + struct fuse_out_header out = { + .unique = in->unique, + .error = -ENOMEM, + }; + struct iovec iov = { + .iov_base = &out, + .iov_len = sizeof(struct fuse_out_header), + }; + + fuse_send_msg(se, ch, &iov, 1); + goto clear_pipe; + } + + req->unique = in->unique; + req->ctx.uid = in->uid; + req->ctx.gid = in->gid; + req->ctx.pid = in->pid; + req->ch = ch ? fuse_chan_get(ch) : NULL; + + err = EIO; + if (!se->got_init) { + enum fuse_opcode expected; + + expected = se->cuse_data ? 
CUSE_INIT : FUSE_INIT; + if (in->opcode != expected) + goto reply_err; + } else if (in->opcode == FUSE_INIT || in->opcode == CUSE_INIT) + goto reply_err; + + err = EACCES; + /* Implement -o allow_root */ + if (se->deny_others && in->uid != se->owner && in->uid != 0 && + in->opcode != FUSE_INIT && in->opcode != FUSE_READ && + in->opcode != FUSE_WRITE && in->opcode != FUSE_FSYNC && + in->opcode != FUSE_RELEASE && in->opcode != FUSE_READDIR && + in->opcode != FUSE_FSYNCDIR && in->opcode != FUSE_RELEASEDIR && + in->opcode != FUSE_NOTIFY_REPLY && + in->opcode != FUSE_READDIRPLUS) + goto reply_err; + + err = ENOSYS; + if (in->opcode >= FUSE_MAXOP || !fuse_ll_ops[in->opcode].func) + goto reply_err; + /* Do not process interrupt request */ + if (se->no_interrupt && in->opcode == FUSE_INTERRUPT) { + if (se->debug) + fuse_log(FUSE_LOG_DEBUG, "FUSE_INTERRUPT: reply to kernel to disable interrupt\n"); + goto reply_err; + } + if (!se->no_interrupt && in->opcode != FUSE_INTERRUPT) { + struct fuse_req *intr; + pthread_mutex_lock(&se->lock); + intr = check_interrupt(se, req); + list_add_req(req, &se->list); + pthread_mutex_unlock(&se->lock); + if (intr) + fuse_reply_err(intr, EAGAIN); + } + + inarg = (void *) &in[1]; + if (in->opcode == FUSE_WRITE && se->op.write_buf) + do_write_buf(req, in->nodeid, inarg, buf); + else if (in->opcode == FUSE_NOTIFY_REPLY) + do_notify_reply(req, in->nodeid, inarg, buf); + else if (in->opcode == FUSE_IOCTL) + do_ioctl_fast(req, in->nodeid, inarg, buf); + else if (in->opcode == FUSE_WRITE && !se->op.write_buf) + do_write_fast(req, in->nodeid, inarg, buf); + else + fuse_ll_ops[in->opcode].func(req, in->nodeid, inarg); + +out_free: + free(mbuf); + return; + +reply_err: + fuse_reply_err(req, err); +clear_pipe: + if (buf->flags & FUSE_BUF_IS_FD) + fuse_ll_clear_pipe(se); + goto out_free; +} + #define LL_OPTION(n,o,v) \ { n, offsetof(struct fuse_session, o), v } @@ -2693,6 +2973,16 @@ static const struct fuse_opt fuse_ll_opts[] = { LL_OPTION("-d", debug, 
/*
 * Destructor passed to pthread_key_create() for se->ipc_data_key:
 * releases the fuse_ipc_data stored for a thread.
 */
static void fuse_ll_ipc_data_destructor(void *data)
{
	free(data);
}
pthread_key_create(&se->pipe_key, fuse_ll_pipe_destructor); if (err) { fuse_log(FUSE_LOG_ERR, "fuse: failed to create thread specific key: %s\n", diff --git a/lib/fuse_versionscript b/lib/fuse_versionscript index a06f768..9deef71 100644 --- a/lib/fuse_versionscript +++ b/lib/fuse_versionscript @@ -17,6 +17,7 @@ FUSE_3.0 { fuse_session_exit; fuse_session_exited; fuse_session_loop; + fuse_session_loop_ipc; fuse_session_loop_mt; fuse_session_reset; fuse_session_fd; diff --git a/lib/meson.build b/lib/meson.build index 98461d8..89cebbf 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -2,7 +2,7 @@ libfuse_sources = ['fuse.c', 'fuse_i.h', 'fuse_loop.c', 'fuse_loop_mt.c', 'fuse_lowlevel.c', 'fuse_misc.h', 'fuse_opt.c', 'fuse_signals.c', 'buffer.c', 'cuse_lowlevel.c', 'helper.c', 'modules/subdir.c', 'mount_util.c', - 'fuse_log.c' ] + 'fuse_log.c', 'fuse_ipc.c'] if host_machine.system().startswith('linux') libfuse_sources += [ 'mount.c' ] diff --git a/lib/mount.c b/lib/mount.c index 979f8d9..91b531e 100644 --- a/lib/mount.c +++ b/lib/mount.c @@ -111,6 +111,10 @@ static const struct fuse_opt fuse_mount_opts[] = { FUSE_OPT_KEY("dirsync", KEY_KERN_FLAG), FUSE_OPT_KEY("atime", KEY_KERN_FLAG), FUSE_OPT_KEY("noatime", KEY_KERN_FLAG), +#ifdef __aarch64__ + FUSE_OPT_KEY("no_forget", KEY_KERN_OPT), + FUSE_OPT_KEY("use_fastpath", KEY_KERN_OPT), +#endif FUSE_OPT_END }; -- 2.45.1.windows.1