This repository has been archived on 2023-11-05. You can view files and clone it, but cannot push or open issues or pull requests.
wasm-micro-runtime/core/shared-lib/utils/bh_queue.c
2019-05-07 10:18:18 +08:00

259 lines
5.9 KiB
C

/*
* Copyright (C) 2019 Intel Corporation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "bh_queue.h"
#include "bh_thread.h"
#include "bh_memory.h"
#include "bh_time.h"
#include "bh_common.h"
/*
 * One queued message. Messages are chained into a doubly linked list
 * owned by a bh_queue.
 */
typedef struct _bh_queue_node {
/* Next message towards the tail; NULL for the last message in the list. */
struct _bh_queue_node * next;
/* Previous message towards the head; NULL for the first message. */
struct _bh_queue_node * prev;
/* Caller-defined message type, reported by bh_message_type(). */
unsigned short tag;
/*
 * Payload length, reported by bh_message_payload_len(). A non-zero value
 * is also what bh_free_msg() uses to decide that body is an allocated
 * buffer it must free.
 */
unsigned long len;
/* Payload pointer; may carry a plain integer value when len is 0. */
void * body;
/*
 * Optional payload destructor. When set, bh_free_msg() calls it with body
 * instead of freeing body itself.
 */
bh_msg_cleaner msg_cleaner;
} bh_queue_node;
/*
 * A bounded FIFO of bh_queue_node messages guarded by a mutex, with a
 * condition variable that consumers wait on while the queue is empty.
 */
struct bh_queue {
/* Protects the list pointers and cnt while messages are added or removed. */
bh_queue_mutex queue_lock;
/*
 * Signalled when a message is posted to an empty queue and when the
 * message loop is asked to exit.
 */
bh_queue_cond queue_wait_cond;
/* Number of messages currently linked into the list. */
unsigned int cnt;
/* Capacity; initialised to DEFAULT_QUEUE_LENGTH. Posts fail once cnt >= max. */
unsigned int max;
/* Count of messages that could not be queued (queue full or allocation failure). */
unsigned int drops;
/* Oldest message, returned first by bh_get_msg(); NULL when empty. */
bh_queue_node * head;
/* Newest message, where posts are appended; NULL when empty. */
bh_queue_node * tail;
/* Set by bh_queue_exit_loop_run() to stop bh_queue_enter_loop_run(). */
bool exit_loop_run;
};
/*
 * Return the payload pointer stored in a message.
 *
 * The pointer is handed back as-is; no copy is made.
 */
char * bh_message_payload(bh_message_t message)
{
    char *payload = message->body;

    return payload;
}
/*
 * Return the payload length recorded in a message.
 *
 * The length is stored as unsigned long and narrowed to int on return.
 */
int bh_message_payload_len(bh_message_t message)
{
    return (int) message->len;
}
/*
 * Return the tag (message type) that the message was created with.
 */
int bh_message_type(bh_message_t message)
{
    int type = message->tag;

    return type;
}
bh_queue *
bh_queue_create()
{
int ret;
bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
if (queue) {
memset(queue, 0, sizeof(bh_queue));
queue->max = DEFAULT_QUEUE_LENGTH;
ret = bh_queue_mutex_init(&queue->queue_lock);
if (ret != 0) {
bh_queue_free(queue);
return NULL;
}
ret = bh_queue_cond_init(&queue->queue_wait_cond);
if (ret != 0) {
bh_queue_mutex_destroy(&queue->queue_lock);
bh_queue_free(queue);
return NULL;
}
}
return queue;
}
/*
 * Release a queue and every message still waiting in it.
 *
 * Pending messages are released with bh_free_msg() while the queue lock
 * is held; afterwards the condition variable, the mutex and the queue
 * object itself are destroyed. A NULL queue is ignored.
 */
void bh_queue_destroy(bh_queue *queue)
{
    bh_queue_node *cur;
    bh_queue_node *following;

    if (queue == NULL)
        return;

    bh_queue_mutex_lock(&queue->queue_lock);
    cur = queue->head;
    while (cur != NULL) {
        /* Unlink before freeing so head never points at freed memory. */
        following = cur->next;
        queue->head = following;
        bh_free_msg(cur);
        cur = following;
    }
    bh_queue_mutex_unlock(&queue->queue_lock);

    bh_queue_cond_destroy(&queue->queue_wait_cond);
    bh_queue_mutex_destroy(&queue->queue_lock);
    bh_queue_free(queue);
}
/*
 * Append an already-built message to the tail of the queue.
 *
 * Ownership of msg passes to this function: on success the message is
 * linked into the queue, on failure it is released with bh_free_msg().
 *
 * The capacity check, the drop counter and the list update all happen
 * under queue_lock, so concurrent posters cannot push the queue past max
 * or lose drop counts.
 *
 * Returns true when the message was queued, false when the queue was
 * full (cnt >= max) and the message was dropped.
 */
bool bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
{
    bool queue_full;

    bh_queue_mutex_lock(&queue->queue_lock);

    queue_full = (queue->cnt >= queue->max);
    if (queue_full) {
        queue->drops++;
    } else if (queue->cnt == 0) {
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);
        queue->head = queue->tail = msg;
        msg->next = msg->prev = NULL;
        queue->cnt = 1;
        /* The queue just became non-empty: wake a waiting consumer. */
        bh_queue_cond_signal(&queue->queue_wait_cond);
    } else {
        msg->next = NULL;
        msg->prev = queue->tail;
        queue->tail->next = msg;
        queue->tail = msg;
        queue->cnt++;
    }

    bh_queue_mutex_unlock(&queue->queue_lock);

    if (queue_full) {
        /*
         * Release the rejected message only after unlocking: bh_free_msg()
         * may invoke a caller-supplied cleaner, which should not run while
         * the queue lock is held.
         */
        bh_free_msg(msg);
        return false;
    }
    return true;
}
/*
 * Wrap a payload in a new message (without a cleaner) and post it.
 *
 * queue: destination queue.
 * tag:   message type stored in the message.
 * body:  payload pointer; ownership passes to the queue.
 * len:   payload length; a non-zero len marks body as an allocated buffer.
 *
 * Returns true when the message was queued. On failure the payload is
 * released (when len != 0 and body is non-NULL) and false is returned.
 */
bool bh_post_msg(bh_queue *queue, unsigned short tag, void *body,
                 unsigned int len)
{
    bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);

    if (msg == NULL) {
        /* The drop counter is shared queue state, so update it under the lock. */
        bh_queue_mutex_lock(&queue->queue_lock);
        queue->drops++;
        bh_queue_mutex_unlock(&queue->queue_lock);

        /*
         * Free the payload the same way bh_free_msg() would have, had the
         * message node been allocated.
         */
        if (len != 0 && body)
            bh_queue_free(body);
        return false;
    }

    /* On failure bh_post_msg2() has already freed the message and its payload. */
    return bh_post_msg2(queue, msg);
}
/*
 * Allocate a message node and fill it in.
 *
 * tag:     message type.
 * body:    payload pointer, stored as-is (not copied).
 * len:     payload length.
 * handler: optional payload cleaner, stored as a bh_msg_cleaner; may be NULL.
 *
 * Returns the new, unlinked message, or NULL when allocation fails.
 */
bh_queue_node * bh_new_msg(unsigned short tag, void *body, unsigned int len,
                           void * handler)
{
    bh_queue_node *node = (bh_queue_node*) bh_queue_malloc(
            sizeof(bh_queue_node));

    if (node != NULL) {
        /* Zeroing leaves the next/prev links NULL until the node is queued. */
        memset(node, 0, sizeof(bh_queue_node));
        node->tag = tag;
        node->body = body;
        node->len = len;
        node->msg_cleaner = (bh_msg_cleaner) handler;
    }
    return node;
}
/*
 * Release a message and, where appropriate, its payload.
 *
 * When the message has a cleaner, the cleaner is called with the payload
 * and is responsible for it. Otherwise the payload is freed only when
 * both body and len are non-zero. A NULL msg is ignored, so the result of
 * bh_get_msg() can be passed in without checking.
 */
void bh_free_msg(bh_queue_node *msg)
{
    if (msg == NULL)
        return;

    if (msg->msg_cleaner) {
        msg->msg_cleaner(msg->body);
    } else if (msg->body && msg->len) {
        /*
         * Note: sometimes the payload pointer just carries an integer
         * value; len != 0 is the only indicator that body is an
         * allocated buffer.
         */
        bh_queue_free(msg->body);
    }

    bh_queue_free(msg);
}
/*
 * Remove and return the oldest message in the queue.
 *
 * queue:   queue to read from.
 * timeout: 0 returns immediately when the queue is empty; any other value
 *          is passed to bh_queue_cond_timedwait() for a single wait.
 *
 * Returns the dequeued message, or NULL when the queue is empty (either
 * immediately with timeout 0, or still empty after the wait returns).
 */
bh_message_t bh_get_msg(bh_queue *queue, int timeout)
{
    bh_queue_node *first = NULL;

    bh_queue_mutex_lock(&queue->queue_lock);

    if (queue->cnt == 0) {
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);

        if (timeout == 0)
            goto done;

        /* Waits once; the count is re-checked below after waking up. */
        bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
                                timeout);
    }

    switch (queue->cnt) {
    case 0:
        /* Still nothing to hand out. */
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);
        break;

    case 1:
        /* Taking the only message empties the list. */
        bh_assert(queue->head == queue->tail);
        first = queue->head;
        queue->head = queue->tail = NULL;
        queue->cnt = 0;
        break;

    default:
        /* Detach the head and promote its successor. */
        first = queue->head;
        queue->head = queue->head->next;
        queue->head->prev = NULL;
        queue->cnt--;
        break;
    }

done:
    bh_queue_mutex_unlock(&queue->queue_lock);
    return first;
}
/*
 * Return the number of messages currently queued, or 0 for a NULL queue.
 *
 * The counter is read without taking the queue lock.
 */
unsigned bh_queue_get_message_count(bh_queue *queue)
{
    return queue ? queue->cnt : 0;
}
/*
 * Run the message loop: repeatedly take a message from the queue, pass it
 * to handle_cb, then release it with bh_free_msg().
 *
 * The loop ends once exit_loop_run has been set (see
 * bh_queue_exit_loop_run()). A NULL queue returns immediately.
 */
void bh_queue_enter_loop_run(bh_queue *queue,
                             bh_queue_handle_msg_callback handle_cb)
{
    bh_queue_node *msg;

    if (queue == NULL)
        return;

    while (!queue->exit_loop_run) {
        msg = bh_get_msg(queue, BH_WAIT_FOREVER);

        /* bh_get_msg() may come back empty-handed; re-check the exit flag. */
        if (msg == NULL)
            continue;

        handle_cb(msg);
        bh_free_msg(msg);
    }
}
/*
 * Ask the message loop started by bh_queue_enter_loop_run() to stop.
 *
 * Sets the exit flag and signals the wait condition so a consumer blocked
 * in bh_get_msg() wakes up and notices it. A NULL queue is ignored.
 *
 * NOTE(review): the flag is written and the condition signalled without
 * holding queue_lock - confirm a waiter cannot miss the wake-up.
 */
void bh_queue_exit_loop_run(bh_queue *queue)
{
    if (queue == NULL)
        return;

    queue->exit_loop_run = true;
    bh_queue_cond_signal(&queue->queue_wait_cond);
}