From 426d9be447a4e9a103cdac1994891e72c50f16c7 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:47:53 +0900 Subject: [PATCH 1/6] refactor: extract global executor from scalar_function.c into executor.{c,h} MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the global executor thread implementation (~230 lines) from scalar_function.c into a new shared module executor.{c,h}. This makes the executor reusable by other C files (e.g., table_function.c) that also need to dispatch callbacks from non-Ruby threads. The executor API is generalized with a callback function pointer (rbduckdb_callback_fn) instead of the scalar-specific callback_arg. scalar_function.c adds a thin wrapper (scalar_execute_via_executor) to adapt the generic signature. No behavior change — all existing tests pass. --- ext/duckdb/duckdb.c | 1 + ext/duckdb/executor.c | 224 +++++++++++++++++++++++++++++++++ ext/duckdb/executor.h | 29 +++++ ext/duckdb/ruby-duckdb.h | 1 + ext/duckdb/scalar_function.c | 232 +---------------------------------- 5 files changed, 261 insertions(+), 226 deletions(-) create mode 100644 ext/duckdb/executor.c create mode 100644 ext/duckdb/executor.h diff --git a/ext/duckdb/duckdb.c b/ext/duckdb/duckdb.c index cdb9b492..d5c835ba 100644 --- a/ext/duckdb/duckdb.c +++ b/ext/duckdb/duckdb.c @@ -68,4 +68,5 @@ Init_duckdb_native(void) { rbduckdb_init_duckdb_table_function_bind_info(); rbduckdb_init_duckdb_table_function_init_info(); rbduckdb_init_duckdb_table_function_function_info(); + rbduckdb_init_executor(); } diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c new file mode 100644 index 00000000..b632c647 --- /dev/null +++ b/ext/duckdb/executor.c @@ -0,0 +1,224 @@ +#include "ruby-duckdb.h" + +/* + * Cross-platform threading primitives. + * MSVC (mswin) does not provide <pthread.h>. + * MinGW-w64 (mingw, ucrt) provides <pthread.h> via winpthreads. 
+ */ +#ifdef _MSC_VER +#include <windows.h> +#else +#include <pthread.h> +#endif + +#include "executor.h" + +/* ============================================================================ + * Global Executor Thread + * ============================================================================ + * + * A single Ruby thread that processes callback requests from non-Ruby threads. + * DuckDB worker threads enqueue requests and block until completion. + * + * Modeled after FFI gem's async callback dispatcher: + * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c + */ + +/* Per-callback request, stack-allocated on the DuckDB worker thread */ +struct executor_request { + rbduckdb_callback_fn callback_func; + void *callback_data; + int done; +#ifdef _MSC_VER + CRITICAL_SECTION done_lock; + CONDITION_VARIABLE done_cond; +#else + pthread_mutex_t done_mutex; + pthread_cond_t done_cond; +#endif + struct executor_request *next; +}; + +/* Global executor state */ +#ifdef _MSC_VER +static CRITICAL_SECTION g_executor_lock; +static CONDITION_VARIABLE g_executor_cond; +static int g_sync_initialized = 0; +#else +static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; +#endif +static struct executor_request *g_request_list = NULL; +static VALUE g_executor_thread = Qnil; +static int g_executor_started = 0; + +/* Data passed to the executor wait function */ +struct executor_wait_data { + struct executor_request *request; + int stop; +}; + +/* Runs without GVL: blocks on condvar waiting for a callback request */ +static void *executor_wait_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + + w->request = NULL; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + while (!w->stop && g_request_list == NULL) { + SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = 
g_request_list->next; + } + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + while (!w->stop && g_request_list == NULL) { + pthread_cond_wait(&g_executor_cond, &g_executor_mutex); + } + if (g_request_list != NULL) { + w->request = g_request_list; + g_request_list = g_request_list->next; + } + pthread_mutex_unlock(&g_executor_mutex); +#endif + + return NULL; +} + +/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ +static void executor_stop_func(void *data) { + struct executor_wait_data *w = (struct executor_wait_data *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&g_executor_lock); + w->stop = 1; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); +#else + pthread_mutex_lock(&g_executor_mutex); + w->stop = 1; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); +#endif +} + +/* The executor thread main loop (Ruby thread) */ +static VALUE executor_thread_func(void *data) { + struct executor_wait_data w; + w.stop = 0; + + while (!w.stop) { + /* Release GVL and wait for a callback request */ + rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); + + if (w.request != NULL) { + struct executor_request *req = w.request; + + /* Execute the callback with the GVL */ + req->callback_func(req->callback_data); + + /* Signal the DuckDB worker thread that the callback is done */ +#ifdef _MSC_VER + EnterCriticalSection(&req->done_lock); + req->done = 1; + WakeConditionVariable(&req->done_cond); + LeaveCriticalSection(&req->done_lock); +#else + pthread_mutex_lock(&req->done_mutex); + req->done = 1; + pthread_cond_signal(&req->done_cond); + pthread_mutex_unlock(&req->done_mutex); +#endif + } + } + + return Qnil; +} + +/* + * Start the global executor thread (must be called with GVL held). + * + * Thread safety: This function is called from Ruby methods that always run + * with the GVL held. 
The GVL serializes all calls, so the g_executor_started + * check-then-set is safe without an extra mutex. + */ +void rbduckdb_executor_ensure_started(void) { + if (g_executor_started) return; + +#ifdef _MSC_VER + if (!g_sync_initialized) { + InitializeCriticalSection(&g_executor_lock); + InitializeConditionVariable(&g_executor_cond); + g_sync_initialized = 1; + } +#endif + + g_executor_thread = rb_thread_create(executor_thread_func, NULL); + rb_global_variable(&g_executor_thread); + g_executor_started = 1; +} + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. + */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data) { + struct executor_request req; + + req.callback_func = callback_func; + req.callback_data = callback_data; + req.done = 0; + req.next = NULL; + +#ifdef _MSC_VER + InitializeCriticalSection(&req.done_lock); + InitializeConditionVariable(&req.done_cond); + + /* Enqueue the request */ + EnterCriticalSection(&g_executor_lock); + req.next = g_request_list; + g_request_list = &req; + WakeConditionVariable(&g_executor_cond); + LeaveCriticalSection(&g_executor_lock); + + /* Wait for the executor to process our callback */ + EnterCriticalSection(&req.done_lock); + while (!req.done) { + SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); + } + LeaveCriticalSection(&req.done_lock); + + DeleteCriticalSection(&req.done_lock); +#else + pthread_mutex_init(&req.done_mutex, NULL); + pthread_cond_init(&req.done_cond, NULL); + + /* Enqueue the request */ + pthread_mutex_lock(&g_executor_mutex); + req.next = g_request_list; + g_request_list = &req; + pthread_cond_signal(&g_executor_cond); + pthread_mutex_unlock(&g_executor_mutex); + + /* Wait for the executor to process our callback */ + pthread_mutex_lock(&req.done_mutex); + while (!req.done) { + pthread_cond_wait(&req.done_cond, &req.done_mutex); + } + 
pthread_mutex_unlock(&req.done_mutex); + + pthread_cond_destroy(&req.done_cond); + pthread_mutex_destroy(&req.done_mutex); +#endif +} + +/* + * Initialize the executor subsystem. + * Called once from Init_duckdb_native. + */ +void rbduckdb_init_executor(void) { +} diff --git a/ext/duckdb/executor.h b/ext/duckdb/executor.h new file mode 100644 index 00000000..1ebeab6b --- /dev/null +++ b/ext/duckdb/executor.h @@ -0,0 +1,29 @@ +#ifndef RUBY_DUCKDB_EXECUTOR_H +#define RUBY_DUCKDB_EXECUTOR_H + +/* + * Shared executor infrastructure for dispatching callbacks from non-Ruby + * threads (DuckDB worker threads) to Ruby threads that can safely hold + * the GVL. + * + * A global executor thread waits for callback requests from a shared queue. + * Used as a generic mechanism for any C file that needs to dispatch callbacks + * from non-Ruby threads. + */ + +/* Generic callback function signature */ +typedef void (*rbduckdb_callback_fn)(void *data); + +/* Initialize the executor subsystem (call from Init_duckdb_native) */ +void rbduckdb_init_executor(void); + +/* Ensure the global executor thread is running (call with GVL held) */ +void rbduckdb_executor_ensure_started(void); + +/* + * Dispatch a callback to the global executor thread. + * Called from a non-Ruby thread. Blocks until the callback completes. 
+ */ +void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data); + +#endif diff --git a/ext/duckdb/ruby-duckdb.h b/ext/duckdb/ruby-duckdb.h index 76594592..703ae70d 100644 --- a/ext/duckdb/ruby-duckdb.h +++ b/ext/duckdb/ruby-duckdb.h @@ -41,6 +41,7 @@ #include "./data_chunk.h" #include "./memory_helper.h" #include "./table_function.h" +#include "./executor.h" extern VALUE mDuckDB; extern VALUE cDuckDBDatabase; diff --git a/ext/duckdb/scalar_function.c b/ext/duckdb/scalar_function.c index 4725d1a0..6c65eea2 100644 --- a/ext/duckdb/scalar_function.c +++ b/ext/duckdb/scalar_function.c @@ -1,19 +1,5 @@ #include "ruby-duckdb.h" -/* - * Cross-platform threading primitives. - * MSVC (mswin) does not provide <pthread.h>. - * MinGW-w64 (mingw, ucrt) provides <pthread.h> via winpthreads. - * - * See also: FFI gem's approach in ext/ffi_c/Function.c - * https://github.com/ffi/ffi/blob/master/ext/ffi_c/Function.c - */ -#ifdef _MSC_VER -#include <windows.h> -#else -#include <pthread.h> -#endif - /* * Thread detection functions (available since Ruby 2.3). * Used to determine the correct dispatch path for scalar function callbacks. @@ -57,105 +43,6 @@ static VALUE process_rows(VALUE arg); static VALUE process_no_param_rows(VALUE arg); static VALUE cleanup_callback(VALUE arg); -/* - * ============================================================================ - * Global Executor Thread - * ============================================================================ - * - * DuckDB calls scalar function callbacks from its own worker threads, which - * are NOT Ruby threads. Ruby's GVL (Global VM Lock) cannot be acquired from - * non-Ruby threads (rb_thread_call_with_gvl crashes with rb_bug). - * - * Solution (modeled after FFI gem's async callback dispatcher): - * - A global Ruby "executor" thread waits for callback requests. - * - DuckDB worker threads enqueue requests via pthread mutex/condvar and block. - * - The executor thread processes callbacks with the GVL, then signals completion. 
- * - * When the callback is invoked from a Ruby thread (e.g., threads=1 where DuckDB - * uses the calling thread), we use rb_thread_call_with_gvl directly, avoiding - * the executor overhead. - */ - -/* Per-callback request, stack-allocated on the DuckDB worker thread */ -struct callback_request { - struct callback_arg *cb_arg; - int done; -#ifdef _MSC_VER - CRITICAL_SECTION done_lock; - CONDITION_VARIABLE done_cond; -#else - pthread_mutex_t done_mutex; - pthread_cond_t done_cond; -#endif - struct callback_request *next; -}; - -/* Global executor state */ -#ifdef _MSC_VER -static CRITICAL_SECTION g_executor_lock; -static CONDITION_VARIABLE g_executor_cond; -static int g_sync_initialized = 0; -#else -static pthread_mutex_t g_executor_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t g_executor_cond = PTHREAD_COND_INITIALIZER; -#endif -static struct callback_request *g_request_list = NULL; -static VALUE g_executor_thread = Qnil; -static int g_executor_started = 0; - -/* Data passed to the executor wait function */ -struct executor_wait_data { - struct callback_request *request; - int stop; -}; - -/* Runs without GVL: blocks on condvar waiting for a callback request */ -static void *executor_wait_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - - w->request = NULL; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - while (!w->stop && g_request_list == NULL) { - SleepConditionVariableCS(&g_executor_cond, &g_executor_lock, INFINITE); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - while (!w->stop && g_request_list == NULL) { - pthread_cond_wait(&g_executor_cond, &g_executor_mutex); - } - if (g_request_list != NULL) { - w->request = g_request_list; - g_request_list = g_request_list->next; - } - pthread_mutex_unlock(&g_executor_mutex); -#endif - - return 
NULL; -} - -/* Unblock function: called by Ruby to interrupt the executor (e.g., VM shutdown) */ -static void executor_stop_func(void *data) { - struct executor_wait_data *w = (struct executor_wait_data *)data; - -#ifdef _MSC_VER - EnterCriticalSection(&g_executor_lock); - w->stop = 1; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); -#else - pthread_mutex_lock(&g_executor_mutex); - w->stop = 1; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); -#endif -} /* Execute a callback (called with GVL held) */ static VALUE execute_callback(VALUE varg) { @@ -188,115 +75,12 @@ static void execute_callback_protected(struct callback_arg *arg) { } } -/* The executor thread main loop (Ruby thread) */ -static VALUE executor_thread_func(void *data) { - struct executor_wait_data w; - w.stop = 0; - - while (!w.stop) { - /* Release GVL and wait for a callback request */ - rb_thread_call_without_gvl(executor_wait_func, &w, executor_stop_func, &w); - - if (w.request != NULL) { - struct callback_request *req = w.request; - - /* Execute the Ruby callback with the GVL */ - execute_callback_protected(req->cb_arg); - - /* Signal the DuckDB worker thread that the callback is done */ -#ifdef _MSC_VER - EnterCriticalSection(&req->done_lock); - req->done = 1; - WakeConditionVariable(&req->done_cond); - LeaveCriticalSection(&req->done_lock); -#else - pthread_mutex_lock(&req->done_mutex); - req->done = 1; - pthread_cond_signal(&req->done_cond); - pthread_mutex_unlock(&req->done_mutex); -#endif - } - } - - return Qnil; -} - /* - * Start the global executor thread (must be called from a Ruby thread). - * - * Thread safety: This function is only called from - * rbduckdb_scalar_function_set_function(), which is a Ruby method and - * always runs with the GVL held. The GVL serializes all calls, so the - * g_executor_started check-then-set is safe without an extra mutex. + * Wrapper for dispatching through the shared executor. 
+ * Adapts the generic callback signature to scalar function's callback_arg. */ -static void ensure_executor_started(void) { - if (g_executor_started) return; - -#ifdef _MSC_VER - if (!g_sync_initialized) { - InitializeCriticalSection(&g_executor_lock); - InitializeConditionVariable(&g_executor_cond); - g_sync_initialized = 1; - } -#endif - - g_executor_thread = rb_thread_create(executor_thread_func, NULL); - rb_global_variable(&g_executor_thread); - g_executor_started = 1; -} - -/* - * Dispatch a callback to the global executor thread. - * Called from a DuckDB worker thread (non-Ruby thread). - * The caller blocks until the callback is processed. - */ -static void dispatch_callback_to_executor(struct callback_arg *arg) { - struct callback_request req; - - req.cb_arg = arg; - req.done = 0; - req.next = NULL; - -#ifdef _MSC_VER - InitializeCriticalSection(&req.done_lock); - InitializeConditionVariable(&req.done_cond); - - /* Enqueue the request */ - EnterCriticalSection(&g_executor_lock); - req.next = g_request_list; - g_request_list = &req; - WakeConditionVariable(&g_executor_cond); - LeaveCriticalSection(&g_executor_lock); - - /* Wait for the executor to process our callback */ - EnterCriticalSection(&req.done_lock); - while (!req.done) { - SleepConditionVariableCS(&req.done_cond, &req.done_lock, INFINITE); - } - LeaveCriticalSection(&req.done_lock); - - DeleteCriticalSection(&req.done_lock); -#else - pthread_mutex_init(&req.done_mutex, NULL); - pthread_cond_init(&req.done_cond, NULL); - - /* Enqueue the request */ - pthread_mutex_lock(&g_executor_mutex); - req.next = g_request_list; - g_request_list = &req; - pthread_cond_signal(&g_executor_cond); - pthread_mutex_unlock(&g_executor_mutex); - - /* Wait for the executor to process our callback */ - pthread_mutex_lock(&req.done_mutex); - while (!req.done) { - pthread_cond_wait(&req.done_cond, &req.done_mutex); - } - pthread_mutex_unlock(&req.done_mutex); - - pthread_cond_destroy(&req.done_cond); - 
pthread_mutex_destroy(&req.done_mutex); -#endif +static void scalar_execute_via_executor(void *data) { + execute_callback_protected((struct callback_arg *)data); } /* @@ -308,10 +92,6 @@ static void *callback_with_gvl(void *data) { return NULL; } -/* ============================================================================ - * End of Executor Thread - * ============================================================================ */ - static const rb_data_type_t scalar_function_data_type = { "DuckDB/ScalarFunction", {mark, deallocate, memsize, compact}, @@ -468,7 +248,7 @@ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chun } } else { /* Case 3: Non-Ruby thread - dispatch to executor */ - dispatch_callback_to_executor(&arg); + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); } } @@ -720,7 +500,7 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_volatile(p->scalar_function); /* Ensure the global executor thread is running for multi-thread dispatch */ - ensure_executor_started(); + rbduckdb_executor_ensure_started(); return self; } From 02fc9be491cb8e06f3d278ab07717c0c4c56f75a Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:49:47 +0900 Subject: [PATCH 2/6] feat: add per-worker proxy thread infrastructure to executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add per-worker proxy threads to the shared executor module. Each DuckDB worker thread can be assigned a dedicated Ruby proxy thread that waits on its own condvar, acquires the GVL independently, and executes callbacks without going through the global executor queue. This eliminates the global executor bottleneck by distributing GVL acquisition across multiple Ruby threads — the Ruby equivalent of Python's PyGILState_Ensure() approach. 
Key components: - struct worker_proxy with dedicated condvar per proxy - rbduckdb_worker_proxy_create(): spawns proxy Ruby thread - rbduckdb_worker_proxy_dispatch(): sends callback, blocks until done - rbduckdb_worker_proxy_destroy(): safe cleanup from non-Ruby threads - g_proxy_threads GC protection array Not yet wired up to any callback — integration follows in the next commit. --- ext/duckdb/executor.c | 263 ++++++++++++++++++++++++++++++++++++++++++ ext/duckdb/executor.h | 40 ++++++- 2 files changed, 300 insertions(+), 3 deletions(-) diff --git a/ext/duckdb/executor.c b/ext/duckdb/executor.c index b632c647..c0db1f9c 100644 --- a/ext/duckdb/executor.c +++ b/ext/duckdb/executor.c @@ -216,9 +216,272 @@ void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callba #endif } +/* ============================================================================ + * Per-Worker Proxy Threads + * ============================================================================ + * + * Each DuckDB worker thread can be assigned a dedicated Ruby proxy thread. + * The proxy waits for callback requests via OS condvar, acquires the GVL, + * executes the callback, and signals completion. + * + * This eliminates the global executor bottleneck — each worker has its own + * condvar and its own Ruby thread, so GVL acquisition is distributed. 
+ */ + +/* GC protection array for proxy Ruby threads */ +static VALUE g_proxy_threads = Qnil; + +struct worker_proxy { + VALUE ruby_thread; + volatile int stop_requested; + rbduckdb_callback_fn callback_func; + void *callback_data; + volatile int has_request; + volatile int request_done; + volatile int thread_exited; +#ifdef _MSC_VER + CRITICAL_SECTION lock; + CONDITION_VARIABLE request_cond; + CONDITION_VARIABLE request_done_cond; + CONDITION_VARIABLE thread_exit_cond; +#else + pthread_mutex_t lock; + pthread_cond_t request_cond; + pthread_cond_t request_done_cond; + pthread_cond_t thread_exit_cond; +#endif +}; + +/* Runs without GVL: proxy waits for a callback request */ +static void *proxy_wait_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + SleepConditionVariableCS(&proxy->request_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + while (!proxy->stop_requested && !proxy->has_request) { + pthread_cond_wait(&proxy->request_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif + + return NULL; +} + +/* Unblock function for proxy thread (VM shutdown or Thread#kill) */ +static void proxy_stop_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* The proxy thread main loop (Ruby thread) */ +static VALUE proxy_thread_func(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + + while (!proxy->stop_requested) { + /* Release GVL and wait for a request */ + 
rb_thread_call_without_gvl(proxy_wait_func, proxy, proxy_stop_func, proxy); + + if (proxy->stop_requested) break; + + if (proxy->has_request) { + /* Execute the callback with the GVL held */ + proxy->callback_func(proxy->callback_data); + + /* Signal completion to the DuckDB worker thread */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + WakeConditionVariable(&proxy->request_done_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->has_request = 0; + proxy->request_done = 1; + pthread_cond_signal(&proxy->request_done_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + } + } + + /* Remove ourselves from the GC protection array */ + if (g_proxy_threads != Qnil) { + rb_ary_delete(g_proxy_threads, proxy->ruby_thread); + } + + /* + * Signal that this thread has finished and no longer references + * the proxy struct. After this signal, rbduckdb_worker_proxy_destroy + * may free the struct. + */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->thread_exited = 1; + WakeConditionVariable(&proxy->thread_exit_cond); + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->thread_exited = 1; + pthread_cond_signal(&proxy->thread_exit_cond); + pthread_mutex_unlock(&proxy->lock); +#endif + + return Qnil; +} + +/* + * Create a per-worker proxy thread. + * Must be called with GVL held (e.g., from the global executor callback). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void) { + /* + * Use calloc (not xcalloc) because rbduckdb_worker_proxy_destroy + * frees the struct from a non-Ruby thread where xfree is unsafe. 
+ */ + struct worker_proxy *proxy = calloc(1, sizeof(struct worker_proxy)); + if (proxy == NULL) { + rb_raise(rb_eNoMemError, "failed to allocate worker_proxy"); + } + + proxy->stop_requested = 0; + proxy->has_request = 0; + proxy->request_done = 0; + proxy->thread_exited = 0; + +#ifdef _MSC_VER + InitializeCriticalSection(&proxy->lock); + InitializeConditionVariable(&proxy->request_cond); + InitializeConditionVariable(&proxy->request_done_cond); + InitializeConditionVariable(&proxy->thread_exit_cond); +#else + pthread_mutex_init(&proxy->lock, NULL); + pthread_cond_init(&proxy->request_cond, NULL); + pthread_cond_init(&proxy->request_done_cond, NULL); + pthread_cond_init(&proxy->thread_exit_cond, NULL); +#endif + + proxy->ruby_thread = rb_thread_create(proxy_thread_func, proxy); + + /* Protect from GC */ + if (g_proxy_threads != Qnil) { + rb_ary_push(g_proxy_threads, proxy->ruby_thread); + } + + return proxy; +} + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread (non-Ruby thread). + * Blocks until the proxy completes the callback. 
+ */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn callback_func, + void *callback_data) { +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; + proxy->has_request = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + /* Wait for completion */ + EnterCriticalSection(&proxy->lock); + while (!proxy->request_done) { + SleepConditionVariableCS(&proxy->request_done_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->callback_func = callback_func; + proxy->callback_data = callback_data; + proxy->request_done = 0; + proxy->has_request = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + /* Wait for completion */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->request_done) { + pthread_cond_wait(&proxy->request_done_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); +#endif +} + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t: void (*)(void *). + * Safe to call from non-Ruby threads — uses only OS primitives. + * + * Blocks until the proxy thread has exited and no longer references the + * struct, then destroys OS synchronization primitives and frees memory. 
+ */ +void rbduckdb_worker_proxy_destroy(void *data) { + struct worker_proxy *proxy = (struct worker_proxy *)data; + if (proxy == NULL) return; + + /* Signal the proxy thread to stop */ +#ifdef _MSC_VER + EnterCriticalSection(&proxy->lock); + proxy->stop_requested = 1; + WakeConditionVariable(&proxy->request_cond); + LeaveCriticalSection(&proxy->lock); + + /* Wait for the proxy thread to finish */ + EnterCriticalSection(&proxy->lock); + while (!proxy->thread_exited) { + SleepConditionVariableCS(&proxy->thread_exit_cond, &proxy->lock, INFINITE); + } + LeaveCriticalSection(&proxy->lock); + + DeleteCriticalSection(&proxy->lock); +#else + pthread_mutex_lock(&proxy->lock); + proxy->stop_requested = 1; + pthread_cond_signal(&proxy->request_cond); + pthread_mutex_unlock(&proxy->lock); + + /* Wait for the proxy thread to finish */ + pthread_mutex_lock(&proxy->lock); + while (!proxy->thread_exited) { + pthread_cond_wait(&proxy->thread_exit_cond, &proxy->lock); + } + pthread_mutex_unlock(&proxy->lock); + + pthread_cond_destroy(&proxy->thread_exit_cond); + pthread_cond_destroy(&proxy->request_done_cond); + pthread_cond_destroy(&proxy->request_cond); + pthread_mutex_destroy(&proxy->lock); +#endif + + free(proxy); +} + /* * Initialize the executor subsystem. * Called once from Init_duckdb_native. */ void rbduckdb_init_executor(void) { + g_proxy_threads = rb_ary_new(); + rb_global_variable(&g_proxy_threads); } diff --git a/ext/duckdb/executor.h b/ext/duckdb/executor.h index 1ebeab6b..50fa15d3 100644 --- a/ext/duckdb/executor.h +++ b/ext/duckdb/executor.h @@ -6,9 +6,14 @@ * threads (DuckDB worker threads) to Ruby threads that can safely hold * the GVL. * - * A global executor thread waits for callback requests from a shared queue. - * Used as a generic mechanism for any C file that needs to dispatch callbacks - * from non-Ruby threads. + * Two mechanisms are provided: + * + * 1. Global executor thread — a single Ruby thread that processes callbacks + * from a shared queue. 
Used as bootstrap and fallback. + * + * 2. Per-worker proxy threads — each DuckDB worker thread gets a dedicated + * Ruby thread. Created via init_local_state and stored in DuckDB's + * per-thread local state. Eliminates the global executor bottleneck. */ /* Generic callback function signature */ @@ -26,4 +31,33 @@ void rbduckdb_executor_ensure_started(void); */ void rbduckdb_executor_dispatch(rbduckdb_callback_fn callback_func, void *callback_data); +/* + * Per-worker proxy thread. + * Opaque structure — callers use the functions below. + */ +struct worker_proxy; + +/* + * Create a per-worker proxy thread. + * Must be called from a context where GVL can be acquired (typically + * dispatched through the global executor from init_local_state). + */ +struct worker_proxy *rbduckdb_worker_proxy_create(void); + +/* + * Dispatch a callback through a per-worker proxy. + * Called from the DuckDB worker thread that owns this proxy. + * Blocks until the callback completes. + */ +void rbduckdb_worker_proxy_dispatch(struct worker_proxy *proxy, + rbduckdb_callback_fn callback_func, + void *callback_data); + +/* + * Destroy a per-worker proxy. + * Compatible with duckdb_delete_callback_t signature (void (*)(void *)). + * Safe to call from non-Ruby threads — OS primitives only; blocks until the proxy thread exits, then frees it. 
+ */ +void rbduckdb_worker_proxy_destroy(void *proxy); + #endif From a13e51702cf364fcbb00affee99f0a0f4a7820fd Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:51:44 +0900 Subject: [PATCH 3/6] feat: integrate per-worker proxy in scalar function callbacks Wire up the per-worker proxy infrastructure in scalar_function.c: - Add scalar_function_init_local_state() callback that creates a per-worker proxy for each non-Ruby DuckDB worker thread via duckdb_scalar_function_set_init (DuckDB >= 1.5.0) - Update Case 3 dispatch to use per-worker proxy when available, falling back to global executor otherwise - Register init_local_state in set_function - Add version gate to multithread scalar function test All code is guarded by HAVE_DUCKDB_H_GE_V1_5_0. On older DuckDB versions, the global executor fallback path is unchanged. --- ext/duckdb/scalar_function.c | 51 +++++++++++++++++++++++- test/duckdb_test/scalar_function_test.rb | 6 ++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/ext/duckdb/scalar_function.c b/ext/duckdb/scalar_function.c index 6c65eea2..af16d14b 100644 --- a/ext/duckdb/scalar_function.c +++ b/ext/duckdb/scalar_function.c @@ -92,6 +92,42 @@ static void *callback_with_gvl(void *data) { return NULL; } +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* + * init_local_state callback for scalar functions. + * + * Called by DuckDB once per worker thread before the first UDF invocation. + * For non-Ruby threads, creates a per-worker proxy thread so that callbacks + * can be dispatched without going through the global executor bottleneck. + * + * Requires DuckDB >= 1.5.0 (duckdb_scalar_function_set_init). 
+ */ +struct proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void create_proxy_callback(void *data) { + struct proxy_create_arg *arg = (struct proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void scalar_function_init_local_state(duckdb_init_info info) { + if (ruby_native_thread_p()) { + /* Ruby thread — no proxy needed (Case 1/2 handles it) */ + return; + } + + /* Non-Ruby thread — create a proxy via the global executor */ + struct proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_scalar_function_init_set_state(info, arg.proxy, rbduckdb_worker_proxy_destroy); + } +} +#endif + static const rb_data_type_t scalar_function_data_type = { "DuckDB/ScalarFunction", {mark, deallocate, memsize, compact}, @@ -247,8 +283,18 @@ static void scalar_function_callback(duckdb_function_info info, duckdb_data_chun rb_thread_call_with_gvl(callback_with_gvl, &arg); } } else { - /* Case 3: Non-Ruby thread - dispatch to executor */ + /* Case 3: Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_scalar_function_get_state(info); + if (proxy) { + rbduckdb_worker_proxy_dispatch(proxy, scalar_execute_via_executor, &arg); + } else { + rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); + } +#else rbduckdb_executor_dispatch(scalar_execute_via_executor, &arg); +#endif } } @@ -492,6 +538,9 @@ static VALUE rbduckdb_scalar_function_set_function(VALUE self) { duckdb_scalar_function_set_extra_info(p->scalar_function, p, NULL); duckdb_scalar_function_set_function(p->scalar_function, scalar_function_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + duckdb_scalar_function_set_init(p->scalar_function, scalar_function_init_local_state); +#endif /* * Mark as volatile to prevent constant folding during query optimization. 
diff --git a/test/duckdb_test/scalar_function_test.rb b/test/duckdb_test/scalar_function_test.rb index dffb67cc..f885fdbd 100644 --- a/test/duckdb_test/scalar_function_test.rb +++ b/test/duckdb_test/scalar_function_test.rb @@ -674,7 +674,11 @@ def test_create_with_different_types # rubocop:disable Metrics/MethodLength assert_equal 'Hello - World', rows[0][0] end - def test_scalar_function_with_multithread + def test_scalar_function_with_multithread # rubocop:disable Metrics/MethodLength + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end + @con.execute('SET threads=4') @con.execute('CREATE TABLE large_test AS SELECT range::INTEGER AS value FROM range(10000)') From c70fdae8dbafb1f47d1a2c49d6638b0f61ea8909 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:53:44 +0900 Subject: [PATCH 4/6] refactor: add three-path dispatch to table function callbacks Refactor table_function.c bind/init/execute callbacks to use the same three-path dispatch pattern as scalar_function.c: 1. Ruby thread WITH GVL -> call directly 2. Ruby thread WITHOUT GVL -> rb_thread_call_with_gvl 3. Non-Ruby thread -> dispatch to global executor Each callback's core logic is extracted into a *_with_gvl() function (table_bind_with_gvl, table_init_with_gvl, table_execute_with_gvl) that the dispatch paths invoke. The global executor is started when the execute callback is registered. This makes table function callbacks thread-safe when invoked from non-Ruby threads. The SET threads=1 restriction is not yet removed (done in a subsequent commit). 
--- ext/duckdb/table_function.c | 144 ++++++++++++++++++++++++++++-------- 1 file changed, 112 insertions(+), 32 deletions(-) diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index 5391481b..93c58684 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -1,5 +1,11 @@ #include "ruby-duckdb.h" +/* + * Thread detection functions (available since Ruby 2.3). + */ +extern int ruby_thread_has_gvl_p(void); +extern int ruby_native_thread_p(void); + VALUE cDuckDBTableFunction; extern VALUE cDuckDBTableFunctionBindInfo; extern VALUE cDuckDBTableFunctionInitInfo; @@ -217,32 +223,55 @@ static VALUE call_bind_proc(VALUE arg) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_bind_callback(duckdb_bind_info info) { +/* Bind callback core logic — must be called with GVL held */ +struct table_bind_arg { + duckdb_bind_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_bind_with_gvl(void *data) { + struct table_bind_arg *arg = (struct table_bind_arg *)data; rubyDuckDBBindInfo *bind_info_ctx; VALUE bind_info_obj; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); - if (!ctx || ctx->bind_proc == Qnil) { + if (!arg->ctx || arg->ctx->bind_proc == Qnil) { return; } - // Create BindInfo wrapper bind_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionBindInfo); bind_info_ctx = get_struct_bind_info(bind_info_obj); - bind_info_ctx->bind_info = info; + bind_info_ctx->bind_info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->bind_proc, bind_info_obj }; + VALUE call_args[2] = { arg->ctx->bind_proc, bind_info_obj }; rb_protect(call_bind_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_bind_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + 
duckdb_bind_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_bind_gvl_wrapper(void *data) { + table_bind_with_gvl(data); + return NULL; +} + +static void table_function_bind_callback(duckdb_bind_info info) { + struct table_bind_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_bind_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_bind_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_bind_with_gvl, &arg); } } @@ -281,32 +310,55 @@ static VALUE call_init_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 1, args[1]); } -static void table_function_init_callback(duckdb_init_info info) { +/* Init callback core logic — must be called with GVL held */ +struct table_init_arg { + duckdb_init_info info; rubyDuckDBTableFunction *ctx; +}; + +static void table_init_with_gvl(void *data) { + struct table_init_arg *arg = (struct table_init_arg *)data; VALUE init_info_obj; rubyDuckDBInitInfo *init_info_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); - if (!ctx || ctx->init_proc == Qnil) { + if (!arg->ctx || arg->ctx->init_proc == Qnil) { return; } - // Create InitInfo wrapper init_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionInitInfo); init_info_ctx = get_struct_init_info(init_info_obj); - init_info_ctx->info = info; + init_info_ctx->info = arg->info; - // Call Ruby block with exception protection - VALUE call_args[2] = { ctx->init_proc, init_info_obj }; + VALUE call_args[2] = { arg->ctx->init_proc, init_info_obj }; rb_protect(call_init_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_init_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the 
error + duckdb_init_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_init_gvl_wrapper(void *data) { + table_init_with_gvl(data); + return NULL; +} + +static void table_function_init_callback(duckdb_init_info info) { + struct table_init_arg arg; + arg.info = info; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_init_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_init_gvl_wrapper, &arg); + } + } else { + rbduckdb_executor_dispatch(table_init_with_gvl, &arg); } } @@ -335,6 +387,9 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); + /* Ensure the global executor thread is running for multi-thread dispatch */ + rbduckdb_executor_ensure_started(); + return self; } @@ -343,39 +398,64 @@ static VALUE call_execute_proc(VALUE args_val) { return rb_funcall(args[0], rb_intern("call"), 2, args[1], args[2]); } -static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { +/* Execute callback core logic — must be called with GVL held */ +struct table_execute_arg { + duckdb_function_info info; + duckdb_data_chunk output; rubyDuckDBTableFunction *ctx; +}; + +static void table_execute_with_gvl(void *data) { + struct table_execute_arg *arg = (struct table_execute_arg *)data; VALUE func_info_obj; VALUE data_chunk_obj; rubyDuckDBFunctionInfo *func_info_ctx; rubyDuckDBDataChunk *data_chunk_ctx; int state = 0; - // Get the C struct pointer (safe with GC compaction) - ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); - if (!ctx || ctx->execute_proc == Qnil) { + if (!arg->ctx || arg->ctx->execute_proc == Qnil) { return; } - // Create FunctionInfo wrapper func_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionFunctionInfo); 
func_info_ctx = get_struct_function_info(func_info_obj); - func_info_ctx->info = info; + func_info_ctx->info = arg->info; - // Create DataChunk wrapper data_chunk_obj = rb_class_new_instance(0, NULL, cDuckDBDataChunk); data_chunk_ctx = get_struct_data_chunk(data_chunk_obj); - data_chunk_ctx->data_chunk = output; + data_chunk_ctx->data_chunk = arg->output; - // Call Ruby block with exception protection - VALUE call_args[3] = { ctx->execute_proc, func_info_obj, data_chunk_obj }; + VALUE call_args[3] = { arg->ctx->execute_proc, func_info_obj, data_chunk_obj }; rb_protect(call_execute_proc, (VALUE)call_args, &state); if (state) { VALUE err = rb_errinfo(); VALUE msg = rb_funcall(err, rb_intern("message"), 0); - duckdb_function_set_error(info, StringValueCStr(msg)); - rb_set_errinfo(Qnil); // Clear the error + duckdb_function_set_error(arg->info, StringValueCStr(msg)); + rb_set_errinfo(Qnil); + } +} + +static void *table_execute_gvl_wrapper(void *data) { + table_execute_with_gvl(data); + return NULL; +} + +static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) { + struct table_execute_arg arg; + arg.info = info; + arg.output = output; + arg.ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info); + + if (ruby_native_thread_p()) { + if (ruby_thread_has_gvl_p()) { + table_execute_with_gvl(&arg); + } else { + rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg); + } + } else { + /* Non-Ruby thread — dispatch to global executor */ + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); } } From 5327b981d55c0d913aef407f9fab6cfc735cfeb7 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:55:27 +0900 Subject: [PATCH 5/6] feat: add per-worker proxy to table function execute callback Add per-worker proxy support to the table function execute callback for DuckDB >= 1.5.0: - Add table_function_local_init_callback() that creates a per-worker proxy for each non-Ruby DuckDB worker thread - Register via 
duckdb_table_function_set_local_init in set_execute - Update execute callback Case 3 to use proxy when available, falling back to global executor otherwise All code is guarded by HAVE_DUCKDB_H_GE_V1_5_0. The SET threads=1 restriction is not yet removed (done in the next commit). --- ext/duckdb/table_function.c | 48 ++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/ext/duckdb/table_function.c b/ext/duckdb/table_function.c index 93c58684..32ceaed8 100644 --- a/ext/duckdb/table_function.c +++ b/ext/duckdb/table_function.c @@ -27,6 +27,9 @@ static VALUE rbduckdb_table_function_set_init(VALUE self); static void table_function_init_callback(duckdb_init_info info); static VALUE rbduckdb_table_function_set_execute(VALUE self); static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +static void table_function_local_init_callback(duckdb_init_info info); +#endif static const rb_data_type_t table_function_data_type = { "DuckDB/TableFunction", @@ -386,6 +389,9 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) { ctx->execute_proc = rb_block_proc(); duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback); +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback); +#endif /* Ensure the global executor thread is running for multi-thread dispatch */ rbduckdb_executor_ensure_started(); @@ -454,10 +460,50 @@ static void table_function_execute_callback(duckdb_function_info info, duckdb_da rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg); } } else { - /* Non-Ruby thread — dispatch to global executor */ + /* Non-Ruby thread */ +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 + /* Use per-worker proxy if available (DuckDB >= 1.5.0) */ + struct worker_proxy *proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info); + if (proxy) { + 
rbduckdb_worker_proxy_dispatch(proxy, table_execute_with_gvl, &arg); + } else { + rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); + } +#else rbduckdb_executor_dispatch(table_execute_with_gvl, &arg); +#endif + } +} + +#ifdef HAVE_DUCKDB_H_GE_V1_5_0 +/* + * local_init callback for table functions. + * Creates a per-worker proxy for non-Ruby threads. + * Requires DuckDB >= 1.5.0 (duckdb_table_function_set_local_init). + */ +struct table_proxy_create_arg { + struct worker_proxy *proxy; +}; + +static void table_create_proxy_callback(void *data) { + struct table_proxy_create_arg *arg = (struct table_proxy_create_arg *)data; + arg->proxy = rbduckdb_worker_proxy_create(); +} + +static void table_function_local_init_callback(duckdb_init_info info) { + if (ruby_native_thread_p()) { + return; + } + + struct table_proxy_create_arg arg; + arg.proxy = NULL; + rbduckdb_executor_dispatch(table_create_proxy_callback, &arg); + + if (arg.proxy != NULL) { + duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy); } } +#endif rubyDuckDBTableFunction *get_struct_table_function(VALUE self) { rubyDuckDBTableFunction *ctx; From 8b1d2f04e472f85e879b640100b27e03781f2a45 Mon Sep 17 00:00:00 2001 From: otegami Date: Thu, 2 Apr 2026 19:58:42 +0900 Subject: [PATCH 6/6] feat: remove SET threads=1 requirement for table functions on DuckDB >= 1.5.0 --- lib/duckdb/connection.rb | 15 ++++--- test/duckdb_test/data_chunk_test.rb | 20 ++++----- test/duckdb_test/gc_stress_test.rb | 4 +- test/duckdb_test/table_function_csv_test.rb | 4 +- .../table_function_integration_test.rb | 4 +- test/duckdb_test/table_function_test.rb | 42 +++++++++++++++++-- 6 files changed, 67 insertions(+), 22 deletions(-) diff --git a/lib/duckdb/connection.rb b/lib/duckdb/connection.rb index a29f2bc0..cd8b135a 100644 --- a/lib/duckdb/connection.rb +++ b/lib/duckdb/connection.rb @@ -236,12 +236,11 @@ def register_table_function(table_function) # @param columns [Hash{String => DuckDB::LogicalType}, nil] 
optional column schema override; # if omitted, the adapter determines the columns (e.g. from headers or inference) # @raise [ArgumentError] if no adapter is registered for the object's class - # @raise [DuckDB::Error] if threads setting is not 1 + # @raise [DuckDB::Error] if threads > 1 on DuckDB < 1.5.0 # @return [void] # # @example Expose a CSV as a table # require 'csv' - # con.execute('SET threads=1') # DuckDB::TableFunction.add_table_adapter(CSV, CSVTableAdapter.new) # csv = CSV.new(File.read('data.csv'), headers: true) # con.expose_as_table(csv, 'csv_table') @@ -263,16 +262,22 @@ def expose_as_table(object, name, columns: nil) private + # On DuckDB >= 1.5.0 each non-Ruby DuckDB worker thread gets its own proxy thread, + # created in the table function's local_init callback, so Ruby callbacks can run + # with multiple DuckDB threads. On older versions, the global executor serializes + # all callbacks and can deadlock under concurrent workloads, so we enforce threads=1. def check_threads + return if Gem::Version.new(LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + result = execute("SELECT current_setting('threads')") thread_count = result.first.first.to_i return unless thread_count > 1 raise DuckDB::Error, - 'Functions with Ruby callbacks require single-threaded execution. ' \ - "Current threads setting: #{thread_count}. " \ - "Execute 'SET threads=1' before registering functions." + 'Table functions with Ruby callbacks require single-threaded execution ' \ + "on DuckDB < 1.5.0. Current threads setting: #{thread_count}. " \ + "Execute 'SET threads=1' before registering table functions."
end def run_appender_block(appender, &) diff --git a/test/duckdb_test/data_chunk_test.rb b/test/duckdb_test/data_chunk_test.rb index 026a0bdd..f408ab7e 100644 --- a/test/duckdb_test/data_chunk_test.rb +++ b/test/duckdb_test/data_chunk_test.rb @@ -56,7 +56,7 @@ def test_data_chunk_get_vector # Test 4: Vector#logical_type returns LogicalType def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') table_function = DuckDB::TableFunction.new table_function.name = 'test_vector_type' @@ -98,7 +98,7 @@ def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLe # Test 5: DataChunk#set_value with INTEGER def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -133,7 +133,7 @@ def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics # Test 6: DataChunk#set_value with BIGINT def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -165,7 +165,7 @@ def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/ # Test 7: DataChunk#set_value with VARCHAR def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -199,7 +199,7 @@ def 
test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics # Test 8: DataChunk#set_value with DOUBLE def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -233,7 +233,7 @@ def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/ # Test 9: DataChunk#set_value with NULL def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -269,7 +269,7 @@ def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 10: DataChunk#set_value with BLOB def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -306,7 +306,7 @@ def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/Me # Test 11: DataChunk#set_value with multiple columns # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions def test_data_chunk_set_value_multiple_columns - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -361,7 +361,7 @@ def test_data_chunk_set_value_multiple_columns # Test 12: DataChunk#set_value with TIMESTAMP def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metrics/MethodLength - 
@conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new @@ -398,7 +398,7 @@ def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metri # Test 13: DataChunk#set_value with TIMESTAMP_TZ def test_data_chunk_set_value_timestamp_tz # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions - @conn.execute('SET threads=1') + @conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') done = false table_function = DuckDB::TableFunction.new diff --git a/test/duckdb_test/gc_stress_test.rb b/test/duckdb_test/gc_stress_test.rb index 00013716..2e482b4a 100644 --- a/test/duckdb_test/gc_stress_test.rb +++ b/test/duckdb_test/gc_stress_test.rb @@ -83,7 +83,7 @@ def test_table_function_with_gc_compaction skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? - @con.execute('SET threads=1') # Table functions still require single-threaded execution + @con.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') # Capture local variables multiplier = 3 @@ -136,7 +136,7 @@ def test_mixed_functions_gc_stress skip 'GC.compact not available' unless GC.respond_to?(:compact) skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform? 
- @con.execute('SET threads=1') # Table functions still require single-threaded execution + @con.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') # Register both scalar and table functions @con.register_scalar_function(DuckDB::ScalarFunction.new.tap do |sf| diff --git a/test/duckdb_test/table_function_csv_test.rb b/test/duckdb_test/table_function_csv_test.rb index a93ece42..7d6b932b 100644 --- a/test/duckdb_test/table_function_csv_test.rb +++ b/test/duckdb_test/table_function_csv_test.rb @@ -50,7 +50,9 @@ def csv_to_duckdb_data(csv, output) def setup @db = DuckDB::Database.open @con = @db.connect - @con.execute('SET threads=1') # Required for Ruby callbacks + return unless Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + + @con.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_integration_test.rb b/test/duckdb_test/table_function_integration_test.rb index 88eb0825..4452c5a7 100644 --- a/test/duckdb_test/table_function_integration_test.rb +++ b/test/duckdb_test/table_function_integration_test.rb @@ -7,7 +7,9 @@ class TableFunctionIntegrationTest < Minitest::Test def setup @database = DuckDB::Database.open @connection = @database.connect - @connection.execute('SET threads=1') # Required for Ruby callbacks + return unless Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') + + @connection.execute('SET threads=1') end def teardown diff --git a/test/duckdb_test/table_function_test.rb b/test/duckdb_test/table_function_test.rb index 249c0fa2..9fe909f9 100644 --- a/test/duckdb_test/table_function_test.rb +++ b/test/duckdb_test/table_function_test.rb @@ -16,7 +16,7 @@ def test_new def test_create_with_set_value db = DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') + conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') called = 0 @@ -92,7 +92,7 @@ def test_gc_compaction_safety db = 
DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') + conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') # Capture local variable in callbacks row_multiplier = 2 @@ -158,7 +158,7 @@ def test_gc_compaction_safety def test_symbol_columns db = DuckDB::Database.open conn = db.connect - conn.query('SET threads=1') + conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0') # Capture local variable in callbacks row_multiplier = 2 @@ -206,6 +206,42 @@ def test_symbol_columns end # rubocop:enable Metrics/AbcSize, Metrics/MethodLength + def test_table_function_with_multithread # rubocop:disable Metrics/AbcSize, Metrics/MethodLength + unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0') + skip 'per-worker proxy requires duckdb >= 1.5.0' + end + + db = DuckDB::Database.open + conn = db.connect + conn.execute('SET threads=4') + + called = 0 + tf = DuckDB::TableFunction.new + tf.name = 'mt_generate' + tf.bind do |bind_info| + bind_info.add_result_column('value', DuckDB::LogicalType::INTEGER) + end + tf.init { |_init_info| } # rubocop:disable Lint/EmptyBlock + tf.execute do |_func_info, output| + if called.zero? + 100.times { |i| output.set_value(0, i, i * 2) } + output.size = 100 + called += 1 + else + output.size = 0 + end + end + + conn.register_table_function(tf) + result = conn.execute('SELECT SUM(value) FROM mt_generate()') + + # sum(0, 2, 4, ..., 198) = 2 * sum(0..99) = 2 * 4950 = 9900 + assert_equal 9900, result.first.first + + conn.disconnect + db.close + end + private def setup_incomplete_function