diff --git a/core/iwasm/common/wasm_shared_memory.c b/core/iwasm/common/wasm_shared_memory.c index eb66f2f0..dc7cf1f9 100644 --- a/core/iwasm/common/wasm_shared_memory.c +++ b/core/iwasm/common/wasm_shared_memory.c @@ -5,6 +5,9 @@ #include "bh_log.h" #include "wasm_shared_memory.h" +#if WASM_ENABLE_THREAD_MGR != 0 +#include "../libraries/thread-mgr/thread_manager.h" +#endif static bh_list shared_memory_list_head; static bh_list *const shared_memory_list = &shared_memory_list_head; @@ -21,6 +24,8 @@ typedef struct AtomicWaitInfo { korp_mutex wait_list_lock; bh_list wait_list_head; bh_list *wait_list; + /* WARNING: insert to the list allowed only in acquire_wait_info + otherwise there will be data race as described in PR #2016 */ } AtomicWaitInfo; typedef struct AtomicWaitNode { @@ -298,7 +303,7 @@ notify_wait_list(bh_list *wait_list, uint32 count) } static AtomicWaitInfo * -acquire_wait_info(void *address, bool create) +acquire_wait_info(void *address, AtomicWaitNode *wait_node) { AtomicWaitInfo *wait_info = NULL; bh_list_status ret; @@ -308,7 +313,7 @@ acquire_wait_info(void *address, bool create) if (address) wait_info = (AtomicWaitInfo *)bh_hash_map_find(wait_map, address); - if (!create) { + if (!wait_node) { os_mutex_unlock(&wait_map_lock); return wait_info; } @@ -336,6 +341,12 @@ acquire_wait_info(void *address, bool create) } } + os_mutex_lock(&wait_info->wait_list_lock); + ret = bh_list_insert(wait_info->wait_list, wait_node); + os_mutex_unlock(&wait_info->wait_list_lock); + bh_assert(ret == BH_LIST_SUCCESS); + (void)ret; + os_mutex_unlock(&wait_map_lock); bh_assert(wait_info); @@ -376,16 +387,22 @@ destroy_wait_info(void *wait_info) } } -static bool -map_remove_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info, - void *address) +static void +map_try_release_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info, + void *address) { + os_mutex_lock(&wait_map_lock); + os_mutex_lock(&wait_info->wait_list_lock); if (wait_info->wait_list->len > 0) { - return false; + os_mutex_unlock(&wait_info->wait_list_lock); + os_mutex_unlock(&wait_map_lock); + return; } + os_mutex_unlock(&wait_info->wait_list_lock); bh_hash_map_remove(wait_map_, address, NULL, NULL); - return true; + os_mutex_unlock(&wait_map_lock); + destroy_wait_info(wait_info); } uint32 @@ -396,7 +413,8 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, AtomicWaitInfo *wait_info; AtomicWaitNode *wait_node; WASMSharedMemNode *node; - bool check_ret, is_timeout, no_wait, removed_from_map; + WASMExecEnv *exec_env; + bool check_ret, is_timeout, no_wait; bh_assert(module->module_type == Wasm_Module_Bytecode || module->module_type == Wasm_Module_AoT); @@ -418,14 +436,6 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, return -1; } - /* acquire the wait info, create new one if not exists */ - wait_info = acquire_wait_info(address, true); - - if (!wait_info) { - wasm_runtime_set_exception(module, "failed to acquire wait_info"); - return -1; - } - node = search_module((WASMModuleCommon *)module_inst->module); os_mutex_lock(&node->shared_mem_lock); no_wait = (!wait64 && *(uint32 *)address != (uint32)expect) @@ -435,40 +445,59 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, if (no_wait) { return 1; } - else { - bh_list_status ret; - if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) { - wasm_runtime_set_exception(module, "failed to create wait node"); - return -1; - } - memset(wait_node, 0, sizeof(AtomicWaitNode)); - - if (0 != os_mutex_init(&wait_node->wait_lock)) { - wasm_runtime_free(wait_node); - return -1; - } - - if (0 != os_cond_init(&wait_node->wait_cond)) { - os_mutex_destroy(&wait_node->wait_lock); - wasm_runtime_free(wait_node); - return -1; - } - - wait_node->status = S_WAITING; - os_mutex_lock(&wait_info->wait_list_lock); - ret = bh_list_insert(wait_info->wait_list, wait_node); - os_mutex_unlock(&wait_info->wait_list_lock); - bh_assert(ret == BH_LIST_SUCCESS); - (void)ret; + if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) { + wasm_runtime_set_exception(module, "failed to create wait node"); + return -1; } + memset(wait_node, 0, sizeof(AtomicWaitNode)); + + if (0 != os_mutex_init(&wait_node->wait_lock)) { + wasm_runtime_free(wait_node); + return -1; + } + + if (0 != os_cond_init(&wait_node->wait_cond)) { + os_mutex_destroy(&wait_node->wait_lock); + wasm_runtime_free(wait_node); + return -1; + } + + wait_node->status = S_WAITING; + + /* acquire the wait info, create new one if not exists */ + wait_info = acquire_wait_info(address, wait_node); + + if (!wait_info) { + os_mutex_destroy(&wait_node->wait_lock); + wasm_runtime_free(wait_node); + wasm_runtime_set_exception(module, "failed to acquire wait_info"); + return -1; + } + +#if WASM_ENABLE_THREAD_MGR != 0 + exec_env = + wasm_clusters_search_exec_env((WASMModuleInstanceCommon *)module_inst); + bh_assert(exec_env); +#endif + + os_mutex_lock(&node->shared_mem_lock); + no_wait = (!wait64 && *(uint32 *)address != (uint32)expect) + || (wait64 && *(uint64 *)address != expect); + os_mutex_unlock(&node->shared_mem_lock); /* condition wait start */ os_mutex_lock(&wait_node->wait_lock); - os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock, - timeout < 0 ? BHT_WAIT_FOREVER - : (uint64)timeout / 1000); + if (!no_wait +#if WASM_ENABLE_THREAD_MGR != 0 + && !wasm_cluster_is_thread_terminated(exec_env) +#endif + ) { + os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock, + timeout < 0 ? BHT_WAIT_FOREVER + : (uint64)timeout / 1000); + } is_timeout = wait_node->status == S_WAITING ? true : false; os_mutex_unlock(&wait_node->wait_lock); @@ -486,14 +515,12 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address, wasm_runtime_free(wait_node); /* Release wait info if no wait nodes attached */ - removed_from_map = map_remove_wait_info(wait_map, wait_info, address); os_mutex_unlock(&wait_info->wait_list_lock); - if (removed_from_map) - destroy_wait_info(wait_info); + map_try_release_wait_info(wait_map, wait_info, address); os_mutex_unlock(&node->shared_mem_lock); (void)check_ret; - return is_timeout ? 2 : 0; + return no_wait ? 1 : is_timeout ? 2 : 0; } uint32 @@ -523,7 +550,7 @@ wasm_runtime_atomic_notify(WASMModuleInstanceCommon *module, void *address, return -1; } - wait_info = acquire_wait_info(address, false); + wait_info = acquire_wait_info(address, NULL); /* Nobody wait on this address */ if (!wait_info) { diff --git a/core/iwasm/common/wasm_shared_memory.h b/core/iwasm/common/wasm_shared_memory.h index 98683f32..27019ff5 100644 --- a/core/iwasm/common/wasm_shared_memory.h +++ b/core/iwasm/common/wasm_shared_memory.h @@ -7,6 +7,7 @@ #define _WASM_SHARED_MEMORY_H #include "bh_common.h" +#include "wasm_exec_env.h" #if WASM_ENABLE_INTERP != 0 #include "wasm_runtime.h" #endif diff --git a/core/iwasm/libraries/lib-wasi-threads/test/common.h b/core/iwasm/libraries/lib-wasi-threads/test/common.h index a531e39d..d032f824 100644 --- a/core/iwasm/libraries/lib-wasi-threads/test/common.h +++ b/core/iwasm/libraries/lib-wasi-threads/test/common.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "wasi_thread_start.h" @@ -23,7 +24,6 @@ static bool termination_by_trap; static bool termination_in_main_thread; static blocking_task_type_t blocking_task_type; -#define TIMEOUT_SECONDS 10ll #define NUM_THREADS 3 static pthread_barrier_t barrier; @@ -36,15 +36,14 @@ void run_long_task() { if (blocking_task_type == BLOCKING_TASK_BUSY_WAIT) { - for (int i = 0; i < TIMEOUT_SECONDS; i++) - sleep(1); + for (;;) { + } } else if (blocking_task_type == BLOCKING_TASK_ATOMIC_WAIT) { - __builtin_wasm_memory_atomic_wait32( - 0, 0, TIMEOUT_SECONDS * 1000 * 1000 * 1000); + __builtin_wasm_memory_atomic_wait32(0, 0, -1); } else { - sleep(TIMEOUT_SECONDS); + sleep(UINT_MAX); } }