Fix atomic.wait, get wasi_ctx exit code and thread mgr issues (#2024)

- Remove notify_stale_threads_on_exception and make atomic.wait
  interruptible by waiting in a loop and re-checking every second,
  like the implementation of poll_oneoff in libc-wasi
- Wait for all other threads to exit before getting the wasi exit_code,
  to avoid getting an invalid value
- Inherit the suspend_flags of the parent thread when creating a new thread,
  so that the terminated flag is not left unset for the new thread
- Fix wasi-threads test case update_shared_data_and_alloc_heap
- Add "Lib wasi-threads enabled" prompt for cmake
- Fix aot get exception, use aot_copy_exception instead
This commit is contained in:
Wenyong Huang 2023-03-15 07:47:36 +08:00 committed by GitHub
parent 2de24587a8
commit bab2402b6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 95 additions and 81 deletions

View File

@ -223,6 +223,9 @@ endif ()
if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1) if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1)
message (" Lib pthread semaphore enabled") message (" Lib pthread semaphore enabled")
endif () endif ()
if (WAMR_BUILD_LIB_WASI_THREADS EQUAL 1)
message (" Lib wasi-threads enabled")
endif ()
if (WAMR_BUILD_LIBC_EMCC EQUAL 1) if (WAMR_BUILD_LIBC_EMCC EQUAL 1)
message (" Libc emcc enabled") message (" Libc emcc enabled")
endif () endif ()

View File

@ -1321,8 +1321,9 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr,
uint16 result_count = func_type->result_count; uint16 result_count = func_type->result_count;
const uint8 *types = func_type->types; const uint8 *types = func_type->types;
#ifdef BH_PLATFORM_WINDOWS #ifdef BH_PLATFORM_WINDOWS
const char *exce;
int result; int result;
bool has_exception;
char exception[EXCEPTION_BUF_LEN];
#endif #endif
bool ret; bool ret;
@ -1356,14 +1357,14 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr,
void (*NativeFunc)(WASMExecEnv *, uint32) = void (*NativeFunc)(WASMExecEnv *, uint32) =
(void (*)(WASMExecEnv *, uint32))func_ptr; (void (*)(WASMExecEnv *, uint32))func_ptr;
NativeFunc(exec_env, argv[0]); NativeFunc(exec_env, argv[0]);
ret = aot_get_exception(module_inst) ? false : true; ret = aot_copy_exception(module_inst, NULL) ? false : true;
} }
else if (result_count == 1 else if (result_count == 1
&& types[param_count] == VALUE_TYPE_I32) { && types[param_count] == VALUE_TYPE_I32) {
uint32 (*NativeFunc)(WASMExecEnv *, uint32) = uint32 (*NativeFunc)(WASMExecEnv *, uint32) =
(uint32(*)(WASMExecEnv *, uint32))func_ptr; (uint32(*)(WASMExecEnv *, uint32))func_ptr;
argv_ret[0] = NativeFunc(exec_env, argv[0]); argv_ret[0] = NativeFunc(exec_env, argv[0]);
ret = aot_get_exception(module_inst) ? false : true; ret = aot_copy_exception(module_inst, NULL) ? false : true;
} }
else { else {
ret = wasm_runtime_invoke_native(exec_env, func_ptr, func_type, ret = wasm_runtime_invoke_native(exec_env, func_ptr, func_type,
@ -1377,8 +1378,8 @@ invoke_native_with_hw_bound_check(WASMExecEnv *exec_env, void *func_ptr,
argv_ret); argv_ret);
} }
#ifdef BH_PLATFORM_WINDOWS #ifdef BH_PLATFORM_WINDOWS
if ((exce = aot_get_exception(module_inst)) has_exception = aot_copy_exception(module_inst, exception);
&& strstr(exce, "native stack overflow")) { if (has_exception && strstr(exception, "native stack overflow")) {
/* After a stack overflow, the stack was left /* After a stack overflow, the stack was left
in a damaged state, let the CRT repair it */ in a damaged state, let the CRT repair it */
result = _resetstkoflw(); result = _resetstkoflw();
@ -1541,7 +1542,7 @@ aot_call_function(WASMExecEnv *exec_env, AOTFunctionInstance *function,
func_type, NULL, NULL, argv, argc, argv); func_type, NULL, NULL, argv, argc, argv);
#if WASM_ENABLE_DUMP_CALL_STACK != 0 #if WASM_ENABLE_DUMP_CALL_STACK != 0
if (aot_get_exception(module_inst)) { if (aot_copy_exception(module_inst, NULL)) {
if (aot_create_call_stack(exec_env)) { if (aot_create_call_stack(exec_env)) {
aot_dump_call_stack(exec_env, true, NULL, 0); aot_dump_call_stack(exec_env, true, NULL, 0);
} }
@ -1552,7 +1553,7 @@ aot_call_function(WASMExecEnv *exec_env, AOTFunctionInstance *function,
aot_free_frame(exec_env); aot_free_frame(exec_env);
#endif #endif
return ret && !aot_get_exception(module_inst) ? true : false; return ret && !aot_copy_exception(module_inst, NULL) ? true : false;
} }
} }
@ -1611,6 +1612,14 @@ aot_get_exception(AOTModuleInstance *module_inst)
return wasm_get_exception(module_inst); return wasm_get_exception(module_inst);
} }
/*
 * Copy the module instance's current exception into exception_buf and
 * report whether an exception is present.
 *
 * The work is delegated to wasm_copy_exception(): the cur_exception field
 * sits at the same offset in AOTModuleInstance and WASMModuleInstance, so
 * the AOT instance can be handed to the shared helper directly.
 *
 * Callers that only need the "is there an exception?" answer pass NULL
 * as exception_buf.
 */
bool
aot_copy_exception(AOTModuleInstance *module_inst, char *exception_buf)
{
    bool has_exception;

    has_exception = wasm_copy_exception(module_inst, exception_buf);
    return has_exception;
}
static bool static bool
execute_malloc_function(AOTModuleInstance *module_inst, execute_malloc_function(AOTModuleInstance *module_inst,
AOTFunctionInstance *malloc_func, AOTFunctionInstance *malloc_func,

View File

@ -415,6 +415,15 @@ aot_set_exception_with_id(AOTModuleInstance *module_inst, uint32 id);
const char * const char *
aot_get_exception(AOTModuleInstance *module_inst); aot_get_exception(AOTModuleInstance *module_inst);
/**
* @brief Copy the current exception into the buffer passed as parameter.
* Thread-safe version of `aot_get_exception()`
* @note Buffer size must be no smaller than EXCEPTION_BUF_LEN
* @return true if exception found, false otherwise
*/
bool
aot_copy_exception(AOTModuleInstance *module_inst, char *exception_buf);
uint32 uint32
aot_module_malloc(AOTModuleInstance *module_inst, uint32 size, aot_module_malloc(AOTModuleInstance *module_inst, uint32 size,
void **p_native_addr); void **p_native_addr);

View File

@ -2331,12 +2331,6 @@ wasm_set_exception(WASMModuleInstance *module_inst, const char *exception)
if (exec_env) { if (exec_env) {
wasm_cluster_spread_exception(exec_env, exception ? false : true); wasm_cluster_spread_exception(exec_env, exception ? false : true);
} }
#if WASM_ENABLE_SHARED_MEMORY
if (exception) {
notify_stale_threads_on_exception(
(WASMModuleInstanceCommon *)module_inst);
}
#endif
#else #else
(void)exec_env; (void)exec_env;
#endif #endif
@ -3144,6 +3138,21 @@ uint32_t
wasm_runtime_get_wasi_exit_code(WASMModuleInstanceCommon *module_inst) wasm_runtime_get_wasi_exit_code(WASMModuleInstanceCommon *module_inst)
{ {
WASIContext *wasi_ctx = wasm_runtime_get_wasi_ctx(module_inst); WASIContext *wasi_ctx = wasm_runtime_get_wasi_ctx(module_inst);
#if WASM_ENABLE_THREAD_MGR != 0
WASMCluster *cluster;
WASMExecEnv *exec_env;
exec_env = wasm_runtime_get_exec_env_singleton(module_inst);
if (exec_env && (cluster = wasm_exec_env_get_cluster(exec_env))) {
/**
* The main thread may exit earlier than other threads, and
* the exit_code of wasi_ctx may be changed by other thread
* when it runs into wasi_proc_exit, here we wait until all
* other threads exit to avoid getting invalid exit_code.
*/
wasm_cluster_wait_for_all_except_self(cluster, exec_env);
}
#endif
return wasi_ctx->exit_code; return wasi_ctx->exit_code;
} }

View File

@ -106,61 +106,6 @@ search_module(WASMModuleCommon *module)
return NULL; return NULL;
} }
static void
wait_map_address_count_callback(void *key, void *value,
void *p_total_elem_count)
{
*(uint32 *)p_total_elem_count = *(uint32 *)p_total_elem_count + 1;
}
/*
 * Hash-map traversal callback: append the visited entry's key to the
 * address list carried in user_data (an AtomicWaitAddressArgs) and
 * advance its write index. The entry's value is not used.
 */
static void
create_list_of_waiter_addresses(void *key, void *value, void *user_data)
{
    AtomicWaitAddressArgs *collected = (AtomicWaitAddressArgs *)user_data;

    collected->addr[collected->index] = key;
    collected->index++;
}
/*
 * Notify the waiters of every address currently stored in wait_map.
 *
 * The map is walked twice while wait_map_lock is held: once to count the
 * entries and once to copy their keys into a temporary list. The lock is
 * released before the notifications are issued, and the temporary list
 * is freed afterwards. Nothing is done when the map is empty; a failed
 * (or oversized) allocation is logged and the function returns.
 */
void
notify_stale_threads_on_exception(WASMModuleInstanceCommon *module_inst)
{
    AtomicWaitAddressArgs waiters = { 0 };
    uint32 waiter_count = 0, idx;
    uint64 list_bytes;

    /* Keep the lock across both traversals so they see the same map */
    os_mutex_lock(&wait_map_lock);

    /* First pass: how many addresses are there? */
    bh_hash_map_traverse(wait_map, wait_map_address_count_callback,
                         (void *)&waiter_count);
    if (waiter_count == 0) {
        os_mutex_unlock(&wait_map_lock);
        return;
    }

    /* Allocate the list; waiters.addr stays NULL if the size is too big */
    list_bytes = (uint64)sizeof(void *) * waiter_count;
    if (list_bytes < UINT32_MAX) {
        waiters.addr = wasm_runtime_malloc((uint32)list_bytes);
    }
    if (!waiters.addr) {
        LOG_ERROR(
            "failed to allocate memory for list of atomic wait addresses");
        os_mutex_unlock(&wait_map_lock);
        return;
    }

    /* Second pass: collect the addresses, then drop the lock */
    bh_hash_map_traverse(wait_map, create_list_of_waiter_addresses, &waiters);
    os_mutex_unlock(&wait_map_lock);

    /* Notify the waiters of each collected address */
    for (idx = 0; idx < waiters.index; idx++) {
        wasm_runtime_atomic_notify(module_inst, waiters.addr[idx],
                                   UINT32_MAX);
    }

    /* Release the temporary address list */
    wasm_runtime_free(waiters.addr);
}
WASMSharedMemNode * WASMSharedMemNode *
wasm_module_get_shared_memory(WASMModuleCommon *module) wasm_module_get_shared_memory(WASMModuleCommon *module)
{ {
@ -413,7 +358,9 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
AtomicWaitInfo *wait_info; AtomicWaitInfo *wait_info;
AtomicWaitNode *wait_node; AtomicWaitNode *wait_node;
WASMSharedMemNode *node; WASMSharedMemNode *node;
#if WASM_ENABLE_THREAD_MGR != 0
WASMExecEnv *exec_env; WASMExecEnv *exec_env;
#endif
bool check_ret, is_timeout, no_wait; bool check_ret, is_timeout, no_wait;
bh_assert(module->module_type == Wasm_Module_Bytecode bh_assert(module->module_type == Wasm_Module_Bytecode
@ -489,14 +436,47 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
/* condition wait start */ /* condition wait start */
os_mutex_lock(&wait_node->wait_lock); os_mutex_lock(&wait_node->wait_lock);
if (!no_wait if (!no_wait) {
/* unit of timeout is nsec, convert it to usec */
uint64 timeout_left = (uint64)timeout / 1000, timeout_wait;
uint64 timeout_1sec = 1e6;
while (1) {
if (timeout < 0) {
/* wait forever until it is notified or terminated
here we keep waiting and checking every second */
os_cond_reltimedwait(&wait_node->wait_cond,
&wait_node->wait_lock,
(uint64)timeout_1sec);
if (wait_node->status
== S_NOTIFIED /* notified by atomic.notify */
#if WASM_ENABLE_THREAD_MGR != 0 #if WASM_ENABLE_THREAD_MGR != 0
&& !wasm_cluster_is_thread_terminated(exec_env) /* terminated by other thread */
|| wasm_cluster_is_thread_terminated(exec_env)
#endif #endif
) { ) {
os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock, break;
timeout < 0 ? BHT_WAIT_FOREVER }
: (uint64)timeout / 1000); /* continue to wait */
}
else {
timeout_wait =
timeout_left < timeout_1sec ? timeout_left : timeout_1sec;
os_cond_reltimedwait(&wait_node->wait_cond,
&wait_node->wait_lock, timeout_wait);
if (wait_node->status
== S_NOTIFIED /* notified by atomic.notify */
|| timeout_left <= timeout_wait /* time out */
#if WASM_ENABLE_THREAD_MGR != 0
/* terminated by other thread */
|| wasm_cluster_is_thread_terminated(exec_env)
#endif
) {
break;
}
timeout_left -= timeout_wait;
}
}
} }
is_timeout = wait_node->status == S_WAITING ? true : false; is_timeout = wait_node->status == S_WAITING ? true : false;

View File

@ -7,7 +7,6 @@
#define _WASM_SHARED_MEMORY_H #define _WASM_SHARED_MEMORY_H
#include "bh_common.h" #include "bh_common.h"
#include "wasm_exec_env.h"
#if WASM_ENABLE_INTERP != 0 #if WASM_ENABLE_INTERP != 0
#include "wasm_runtime.h" #include "wasm_runtime.h"
#endif #endif
@ -40,9 +39,6 @@ wasm_shared_memory_init();
void void
wasm_shared_memory_destroy(); wasm_shared_memory_destroy();
void
notify_stale_threads_on_exception(WASMModuleInstanceCommon *module);
WASMSharedMemNode * WASMSharedMemNode *
wasm_module_get_shared_memory(WASMModuleCommon *module); wasm_module_get_shared_memory(WASMModuleCommon *module);

View File

@ -41,9 +41,6 @@ __wasi_thread_start_C(int thread_id, int *start_arg)
for (int i = 0; i < NUM_ITER; i++) for (int i = 0; i < NUM_ITER; i++)
__atomic_fetch_add(data->count, 1, __ATOMIC_SEQ_CST); __atomic_fetch_add(data->count, 1, __ATOMIC_SEQ_CST);
pthread_mutex_lock(&mutex); /* malloc is not thread-safe in wasi-libc */
vals[data->iteration] = malloc(sizeof(int));
pthread_mutex_unlock(&mutex);
*vals[data->iteration] = data->iteration; *vals[data->iteration] = data->iteration;
__atomic_store_n(&data->th_done, 1, __ATOMIC_SEQ_CST); __atomic_store_n(&data->th_done, 1, __ATOMIC_SEQ_CST);
@ -60,6 +57,11 @@ main(int argc, char **argv)
assert(count != NULL && "Failed to call calloc"); assert(count != NULL && "Failed to call calloc");
assert(pthread_mutex_init(&mutex, NULL) == 0 && "Failed to init mutex"); assert(pthread_mutex_init(&mutex, NULL) == 0 && "Failed to init mutex");
for (int i = 0; i < NUM_THREADS; i++) {
vals[i] = malloc(sizeof(int));
assert(vals[i] != NULL && "Failed to call calloc");
}
for (int i = 0; i < NUM_THREADS; i++) { for (int i = 0; i < NUM_THREADS; i++) {
assert(start_args_init(&data[i].base) assert(start_args_init(&data[i].base)
&& "Stack allocation for thread failed"); && "Stack allocation for thread failed");

View File

@ -525,6 +525,9 @@ wasm_cluster_spawn_exec_env(WASMExecEnv *exec_env)
goto fail4; goto fail4;
} }
/* Inherit suspend_flags of parent thread */
new_exec_env->suspend_flags.flags = exec_env->suspend_flags.flags;
if (!wasm_cluster_add_exec_env(cluster, new_exec_env)) if (!wasm_cluster_add_exec_env(cluster, new_exec_env))
goto fail4; goto fail4;
@ -674,6 +677,9 @@ wasm_cluster_create_thread(WASMExecEnv *exec_env,
new_exec_env->aux_stack_bottom.bottom = UINT32_MAX; new_exec_env->aux_stack_bottom.bottom = UINT32_MAX;
} }
/* Inherit suspend_flags of parent thread */
new_exec_env->suspend_flags.flags = exec_env->suspend_flags.flags;
if (!wasm_cluster_add_exec_env(cluster, new_exec_env)) if (!wasm_cluster_add_exec_env(cluster, new_exec_env))
goto fail3; goto fail3;