diff --git a/build-scripts/config_common.cmake b/build-scripts/config_common.cmake index 8d52efda..4b29ec86 100644 --- a/build-scripts/config_common.cmake +++ b/build-scripts/config_common.cmake @@ -223,6 +223,9 @@ endif () if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1) message (" Lib pthread semaphore enabled") endif () +if (WAMR_BUILD_LIB_WASI_THREADS EQUAL 1) + message (" Lib wasi-threads enabled") +endif () if (WAMR_BUILD_LIBC_EMCC EQUAL 1) message (" Libc emcc enabled") endif () diff --git a/core/iwasm/aot/aot_runtime.c b/core/iwasm/aot/aot_runtime.c index ca2ad4f7..c0fc2374 100644 --- a/core/iwasm/aot/aot_runtime.c +++ b/core/iwasm/aot/aot_runtime.c @@ -1321,8 +1321,9 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr, uint16 result_count = func_type->result_count; const uint8 *types = func_type->types; #ifdef BH_PLATFORM_WINDOWS - const char *exce; int result; + bool has_exception; + char exception[EXCEPTION_BUF_LEN]; #endif bool ret; @@ -1356,14 +1357,14 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr, void (*NativeFunc)(WASMExecEnv *, uint32) = (void (*)(WASMExecEnv *, uint32))func_ptr; NativeFunc(exec_env, argv[0]); - ret = aot_get_exception(module_inst) ? false : true; + ret = aot_copy_exception(module_inst, NULL) ? false : true; } else if (result_count == 1 && types[param_count] == VALUE_TYPE_I32) { uint32 (*NativeFunc)(WASMExecEnv *, uint32) = (uint32(*)(WASMExecEnv *, uint32))func_ptr; argv_ret[0] = NativeFunc(exec_env, argv[0]); - ret = aot_get_exception(module_inst) ? false : true; + ret = aot_copy_exception(module_inst, NULL) ? false : true; } else { ret = wasm_runtime_invoke_native(exec_env, func_ptr, func_type, @@ -1377,8 +1378,8 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr, argv_ret); } #ifdef BH_PLATFORM_WINDOWS - if ((exce = aot_get_exception(module_inst)) - && strstr(exce, "native stack overflow")) { + has_exception = aot_copy_exception(module_inst, exception); + if (has_exception && strstr(exception, "native stack overflow")) { /* After a stack overflow, the stack was left in a damaged state, let the CRT repair it */ result = _resetstkoflw(); @@ -1541,7 +1542,7 @@ aot_call_function(WASMExecEnv *exec_env, AOTFunctionInstance *function, func_type, NULL, NULL, argv, argc, argv); #if WASM_ENABLE_DUMP_CALL_STACK != 0 - if (aot_get_exception(module_inst)) { + if (aot_copy_exception(module_inst, NULL)) { if (aot_create_call_stack(exec_env)) { aot_dump_call_stack(exec_env, true, NULL, 0); } @@ -1552,7 +1553,7 @@ aot_call_function(WASMExecEnv *exec_env, AOTFunctionInstance *function, aot_free_frame(exec_env); #endif - return ret && !aot_get_exception(module_inst) ? true : false; + return ret && !aot_copy_exception(module_inst, NULL) ? true : false; } } @@ -1611,6 +1612,14 @@ aot_get_exception(AOTModuleInstance *module_inst) return wasm_get_exception(module_inst); } +bool +aot_copy_exception(AOTModuleInstance *module_inst, char *exception_buf) +{ + /* The field offsets of cur_exception in AOTModuleInstance and + WASMModuleInstance are the same */ + return wasm_copy_exception(module_inst, exception_buf); +} + static bool execute_malloc_function(AOTModuleInstance *module_inst, AOTFunctionInstance *malloc_func, diff --git a/core/iwasm/aot/aot_runtime.h b/core/iwasm/aot/aot_runtime.h index de12bdaa..35a1ccb4 100644 --- a/core/iwasm/aot/aot_runtime.h +++ b/core/iwasm/aot/aot_runtime.h @@ -415,6 +415,15 @@ aot_set_exception_with_id(AOTModuleInstance *module_inst, uint32 id); const char * aot_get_exception(AOTModuleInstance *module_inst); +/** + * @brief Copy exception in buffer passed as parameter. Thread-safe version of + * `aot_get_exception()` + * @note Buffer size must be no smaller than EXCEPTION_BUF_LEN + * @return true if exception found, false otherwise + */ +bool +aot_copy_exception(AOTModuleInstance *module_inst, char *exception_buf); + uint32 aot_module_malloc(AOTModuleInstance *module_inst, uint32 size, void **p_native_addr); diff --git a/core/iwasm/common/wasm_runtime_common.c b/core/iwasm/common/wasm_runtime_common.c index e92370c7..99e04ee9 100644 --- a/core/iwasm/common/wasm_runtime_common.c +++ b/core/iwasm/common/wasm_runtime_common.c @@ -2331,12 +2331,6 @@ wasm_set_exception(WASMModuleInstance *module_inst, const char *exception) if (exec_env) { wasm_cluster_spread_exception(exec_env, exception ? false : true); } -#if WASM_ENABLE_SHARED_MEMORY - if (exception) { - notify_stale_threads_on_exception( - (WASMModuleInstanceCommon *)module_inst); - } -#endif #else (void)exec_env; #endif @@ -3144,6 +3138,21 @@ uint32_t wasm_runtime_get_wasi_exit_code(WASMModuleInstanceCommon *module_inst) { WASIContext *wasi_ctx = wasm_runtime_get_wasi_ctx(module_inst); +#if WASM_ENABLE_THREAD_MGR != 0 + WASMCluster *cluster; + WASMExecEnv *exec_env; + + exec_env = wasm_runtime_get_exec_env_singleton(module_inst); + if (exec_env && (cluster = wasm_exec_env_get_cluster(exec_env))) { + /** + * The main thread may exit earlier than other threads, and + * the exit_code of wasi_ctx may be changed by other thread + * when it runs into wasi_proc_exit, here we wait until all + * other threads exit to avoid getting invalid exit_code. + */ + wasm_cluster_wait_for_all_except_self(cluster, exec_env); + } +#endif return wasi_ctx->exit_code; } diff --git a/core/iwasm/common/wasm_shared_memory.c b/core/iwasm/common/wasm_shared_memory.c index dc7cf1f9..3b896d9b 100644 --- a/core/iwasm/common/wasm_shared_memory.c +++ b/core/iwasm/common/wasm_shared_memory.c @@ -106,61 +106,6 @@ search_module(WASMModuleCommon *module) return NULL; } -static void -wait_map_address_count_callback(void *key, void *value, - void *p_total_elem_count) -{ - *(uint32 *)p_total_elem_count = *(uint32 *)p_total_elem_count + 1; -} - -static void -create_list_of_waiter_addresses(void *key, void *value, void *user_data) -{ - AtomicWaitAddressArgs *data = (AtomicWaitAddressArgs *)user_data; - data->addr[data->index++] = key; -} - -void -notify_stale_threads_on_exception(WASMModuleInstanceCommon *module_inst) -{ - AtomicWaitAddressArgs args = { 0 }; - uint32 i = 0, total_elem_count = 0; - uint64 total_elem_count_size = 0; - - os_mutex_lock(&wait_map_lock); /* Make the two traversals atomic */ - - /* count number of addresses in wait_map */ - bh_hash_map_traverse(wait_map, wait_map_address_count_callback, - (void *)&total_elem_count); - - if (!total_elem_count) { - os_mutex_unlock(&wait_map_lock); - return; - } - - /* allocate memory */ - total_elem_count_size = (uint64)sizeof(void *) * total_elem_count; - if (total_elem_count_size >= UINT32_MAX - || !(args.addr = wasm_runtime_malloc((uint32)total_elem_count_size))) { - LOG_ERROR( - "failed to allocate memory for list of atomic wait addresses"); - os_mutex_unlock(&wait_map_lock); - return; - } - - /* set values in list of addresses */ - bh_hash_map_traverse(wait_map, create_list_of_waiter_addresses, &args); - os_mutex_unlock(&wait_map_lock); - - /* notify */ - for (i = 0; i < args.index; i++) { - wasm_runtime_atomic_notify(module_inst, args.addr[i], UINT32_MAX); - } - - /* free memory allocated to args data */ - wasm_runtime_free(args.addr); -} - WASMSharedMemNode * wasm_module_get_shared_memory(WASMModuleCommon *module) { @@ -413,7 +358,9 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, AtomicWaitInfo *wait_info; AtomicWaitNode *wait_node; WASMSharedMemNode *node; +#if WASM_ENABLE_THREAD_MGR != 0 WASMExecEnv *exec_env; +#endif bool check_ret, is_timeout, no_wait; bh_assert(module->module_type == Wasm_Module_Bytecode @@ -489,14 +436,47 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, /* condition wait start */ os_mutex_lock(&wait_node->wait_lock); - if (!no_wait + if (!no_wait) { + /* unit of timeout is nsec, convert it to usec */ + uint64 timeout_left = (uint64)timeout / 1000, timeout_wait; + uint64 timeout_1sec = 1e6; + + while (1) { + if (timeout < 0) { + /* wait forever until it is notified or terminatied + here we keep waiting and checking every second */ + os_cond_reltimedwait(&wait_node->wait_cond, + &wait_node->wait_lock, + (uint64)timeout_1sec); + if (wait_node->status + == S_NOTIFIED /* notified by atomic.notify */ #if WASM_ENABLE_THREAD_MGR != 0 - && !wasm_cluster_is_thread_terminated(exec_env) + /* terminated by other thread */ + || wasm_cluster_is_thread_terminated(exec_env) #endif - ) { - os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock, - timeout < 0 ? BHT_WAIT_FOREVER - : (uint64)timeout / 1000); + ) { + break; + } + /* continue to wait */ + } + else { + timeout_wait = + timeout_left < timeout_1sec ? timeout_left : timeout_1sec; + os_cond_reltimedwait(&wait_node->wait_cond, + &wait_node->wait_lock, timeout_wait); + if (wait_node->status + == S_NOTIFIED /* notified by atomic.notify */ + || timeout_left <= timeout_wait /* time out */ +#if WASM_ENABLE_THREAD_MGR != 0 + /* terminated by other thread */ + || wasm_cluster_is_thread_terminated(exec_env) +#endif + ) { + break; + } + timeout_left -= timeout_wait; + } + } } is_timeout = wait_node->status == S_WAITING ? true : false; diff --git a/core/iwasm/common/wasm_shared_memory.h b/core/iwasm/common/wasm_shared_memory.h index 27019ff5..6c1c4921 100644 --- a/core/iwasm/common/wasm_shared_memory.h +++ b/core/iwasm/common/wasm_shared_memory.h @@ -7,7 +7,6 @@ #define _WASM_SHARED_MEMORY_H #include "bh_common.h" -#include "wasm_exec_env.h" #if WASM_ENABLE_INTERP != 0 #include "wasm_runtime.h" #endif @@ -40,9 +39,6 @@ wasm_shared_memory_init(); void wasm_shared_memory_destroy(); -void -notify_stale_threads_on_exception(WASMModuleInstanceCommon *module); - WASMSharedMemNode * wasm_module_get_shared_memory(WASMModuleCommon *module); diff --git a/core/iwasm/libraries/lib-wasi-threads/test/update_shared_data_and_alloc_heap.c b/core/iwasm/libraries/lib-wasi-threads/test/update_shared_data_and_alloc_heap.c index 2bfd196d..b7fb9afb 100644 --- a/core/iwasm/libraries/lib-wasi-threads/test/update_shared_data_and_alloc_heap.c +++ b/core/iwasm/libraries/lib-wasi-threads/test/update_shared_data_and_alloc_heap.c @@ -41,9 +41,6 @@ __wasi_thread_start_C(int thread_id, int *start_arg) for (int i = 0; i < NUM_ITER; i++) __atomic_fetch_add(data->count, 1, __ATOMIC_SEQ_CST); - pthread_mutex_lock(&mutex); /* malloc is not thread-safe in wasi-libc */ - vals[data->iteration] = malloc(sizeof(int)); - pthread_mutex_unlock(&mutex); *vals[data->iteration] = data->iteration; __atomic_store_n(&data->th_done, 1, __ATOMIC_SEQ_CST); @@ -60,6 +57,11 @@ main(int argc, char **argv) assert(count != NULL && "Failed to call calloc"); assert(pthread_mutex_init(&mutex, NULL) == 0 && "Failed to init mutex"); + for (int i = 0; i < NUM_THREADS; i++) { + vals[i] = malloc(sizeof(int)); + assert(vals[i] != NULL && "Failed to call calloc"); + } + for (int i = 0; i < NUM_THREADS; i++) { assert(start_args_init(&data[i].base) && "Stack allocation for thread failed"); diff --git a/core/iwasm/libraries/thread-mgr/thread_manager.c b/core/iwasm/libraries/thread-mgr/thread_manager.c index e61915d5..bfb7d0a4 100644 --- a/core/iwasm/libraries/thread-mgr/thread_manager.c +++ b/core/iwasm/libraries/thread-mgr/thread_manager.c @@ -525,6 +525,9 @@ wasm_cluster_spawn_exec_env(WASMExecEnv *exec_env) goto fail4; } + /* Inherit suspend_flags of parent thread */ + new_exec_env->suspend_flags.flags = exec_env->suspend_flags.flags; + if (!wasm_cluster_add_exec_env(cluster, new_exec_env)) goto fail4; @@ -674,6 +677,9 @@ wasm_cluster_create_thread(WASMExecEnv *exec_env, new_exec_env->aux_stack_bottom.bottom = UINT32_MAX; } + /* Inherit suspend_flags of parent thread */ + new_exec_env->suspend_flags.flags = exec_env->suspend_flags.flags; + if (!wasm_cluster_add_exec_env(cluster, new_exec_env)) goto fail3;