From 2934de12ddaa4e696438e3933b8699834961e8cc Mon Sep 17 00:00:00 2001 From: Francesco Date: Mon, 4 Aug 2025 00:39:23 +0200 Subject: [PATCH] improve server and fix memory leaks --- README.md | 4 +- include/http/http.h | 8 +- include/meson.build | 1 + include/server/server.h | 30 +---- include/server/threadpool.h | 27 +++-- include/utils/colors.h | 2 +- src/http/http.c | 18 ++- src/meson.build | 2 +- src/server/server.c | 232 +++++++++++++++++++----------------- src/server/threadpool.c | 101 ++++++++++++++-- 10 files changed, 251 insertions(+), 174 deletions(-) diff --git a/README.md b/README.md index c5304f5..49318e4 100644 --- a/README.md +++ b/README.md @@ -33,11 +33,11 @@ And then open the `docs/html/index.html`. ## Roadmap - Implement Keep-Alive -- Custom web pages (404) -- Support for virtual hosts - HTTPS support with TLS ## Future +- Support for virtual hosts +- Custom web pages (404) - CLI args - IPv6 compatible - Multithreading to handle concurrent requests diff --git a/include/http/http.h b/include/http/http.h index 01848ce..49d9bfe 100644 --- a/include/http/http.h +++ b/include/http/http.h @@ -7,12 +7,12 @@ #include "myclib/string/mystring.h" #include "utils/config.h" -#define CWS_HTTP_HEADER_MAX 64 -#define CWS_HTTP_HEADER_CONTENT_MAX 512 +#define CWS_HTTP_HEADER_MAX 512 +#define CWS_HTTP_HEADER_CONTENT_MAX 1024 typedef enum cws_http_method_t { - CWS_HTTP_GET, /**< GET method */ - CWS_HTTP_POST, /**< POST method */ + CWS_HTTP_GET, + CWS_HTTP_POST, CWS_HTTP_PUT, CWS_HTTP_DELETE, CWS_HTTP_HEAD, diff --git a/include/meson.build b/include/meson.build index 7fc76e1..215ba66 100644 --- a/include/meson.build +++ b/include/meson.build @@ -1,2 +1,3 @@ myclib = files('myclib/hashmap/myhashmap.c') myclib += files('myclib/string/mystring.c') +myclib += files('myclib/queue/myqueue.c') diff --git a/include/server/server.h b/include/server/server.h index 986c3fb..7483813 100644 --- a/include/server/server.h +++ b/include/server/server.h @@ -4,7 +4,6 @@ #include #include #include -#include #include "myclib/hashmap/myhashmap.h" #include "utils/config.h" @@ -42,29 +41,10 @@ typedef enum cws_server_ret_t { CWS_SERVER_HASHMAP_INIT, CWS_SERVER_MALLOC_ERROR, CWS_SERVER_REQUEST_TOO_LARGE, + CWS_SERVER_THREADPOOL_ERROR, + CWS_SERVER_EPOLL_CREATE_ERROR, } cws_server_ret; -/* TODO: use last_activity as keep-alive */ -typedef struct cws_client_t { - struct sockaddr_storage addr; - time_t last_activity; -} cws_client; - -typedef struct cws_pthread_data_t { - int client_fd; - int epfd; - cws_config *config; - mcl_hashmap *clients; -} cws_pthread_data; - -/** - * @brief Setups hints object - * - * @param[out] hints The hints addrinfo - * @param[in] hostname The hostname (could be NULL) - */ -void cws_server_setup_hints(struct addrinfo *hints, const char *hostname); - /** * @brief Runs the server * @@ -77,7 +57,7 @@ cws_server_ret cws_server_start(cws_config *config); * * @param[in,out] sockfd Socket of the commincation endpoint */ -cws_server_ret cws_server_loop(int sockfd, cws_config *config); +cws_server_ret cws_server_loop(int server_fd, cws_config *config); /** * @brief Adds a file descriptor to the interest list @@ -110,7 +90,7 @@ cws_server_ret cws_fd_set_nonblocking(int sockfd); * @param[out] their_sa Populates the struct with client's information * @param[in] theirsa_size Size of the struct */ -int cws_server_accept_client(int sockfd, struct sockaddr_storage *their_sa, socklen_t *theirsa_size); +int cws_server_accept_client(int server_fd, struct sockaddr_storage *their_sa, socklen_t *theirsa_size); /** * @brief Disconnect a client @@ -121,7 +101,7 @@ int cws_server_accept_client(int sockfd, struct sockaddr_storage *their_sa, sock */ void cws_server_close_client(int epfd, int client_fd, mcl_hashmap *hashmap); -cws_server_ret cws_server_handle_new_client(int sockfd, int epfd, mcl_hashmap *clients); +int cws_server_handle_new_client(int server_fd, int epfd, mcl_hashmap *clients); void *cws_server_handle_client_data(void *arg); #endif diff --git a/include/server/threadpool.h b/include/server/threadpool.h index b23b51c..f909887 100644 --- a/include/server/threadpool.h +++ b/include/server/threadpool.h @@ -1,7 +1,7 @@ #ifndef CWS_THREADPOOL_H #define CWS_THREADPOOL_H -#include "myclib/hashmap/myhashmap.h" +#include "myclib/queue/myqueue.h" #include "server.h" #include "utils/config.h" @@ -10,25 +10,26 @@ typedef struct cws_task_t { int client_fd; - int epfd; - mcl_hashmap *clients; cws_config *config; } cws_task; +typedef struct cws_thread_task_t { + void (*function)(void *); + void *arg; +} cws_thread_task; + typedef struct cws_threadpool_t { - cws_task queue[CWS_MAX_QUEUE_TASKS]; - int front, rear; - + mcl_queue *queue; pthread_mutex_t lock; - pthread_cond_t cond; - pthread_t threads[CWS_MAX_THREADS]; - + pthread_cond_t notify; + size_t threads_num; + pthread_t *threads; int shutdown; } cws_threadpool; -cws_threadpool *cws_threadpool_init(); -cws_server_ret cws_threadpool_add_task(cws_threadpool *pool, cws_task *task); -cws_server_ret cws_threadpool_worker(cws_threadpool *pool); -cws_server_ret cws_threadpool_destroy(cws_threadpool *pool); +cws_threadpool *cws_threadpool_init(size_t threads_num, size_t queue_size); +cws_server_ret cws_threadpool_submit(cws_threadpool *pool, cws_thread_task *task); +void *cws_threadpool_worker(void *arg); +void cws_threadpool_destroy(cws_threadpool *pool); #endif diff --git a/include/utils/colors.h b/include/utils/colors.h index 5373b3d..7524119 100644 --- a/include/utils/colors.h +++ b/include/utils/colors.h @@ -27,7 +27,7 @@ #define CWS_LOG_DEBUG(msg, ...) #endif -#define CWS_LOG_ERROR(msg, ...) fprintf(stderr, _ERR " " msg "\n", ##__VA_ARGS__) +#define CWS_LOG_ERROR(msg, ...) fprintf(stderr, _ERR " [%s:%d] " msg "\n", __FILE__, __LINE__, ##__VA_ARGS__) #define CWS_LOG_WARNING(msg, ...) fprintf(stdout, _WARNING " " msg "\n", ##__VA_ARGS__) #define CWS_LOG_INFO(msg, ...) fprintf(stdout, _INFO " " msg "\n", ##__VA_ARGS__) diff --git a/src/http/http.c b/src/http/http.c index f60b516..2cb47d2 100644 --- a/src/http/http.c +++ b/src/http/http.c @@ -126,18 +126,23 @@ cws_http *cws_http_parse(mcl_string *request_str, int sockfd, cws_config *config break; } + char hk[CWS_HTTP_HEADER_MAX]; + char hv[CWS_HTTP_HEADER_CONTENT_MAX]; + /* Header key */ - char *hkey = pch; - char *hkey_dup = strdup(hkey); + strncpy(hk, pch, sizeof(hk)); /* Header value (starting from ": ") */ char *hvalue = header_colon + 2; - char *hvalue_dup = strdup(hvalue); + strncpy(hv, hvalue, sizeof(hv)); - mcl_hm_set(request->headers, hkey_dup, hvalue_dup); + // CWS_LOG_DEBUG("hkey: %s -> %s", hk, hv); + mcl_hm_set(request->headers, hk, hv); } /* TODO: Parse body */ + free(request_str_cpy); + return request; } @@ -240,6 +245,7 @@ int cws_http_send_resource(cws_http *request) { if (read_bytes != content_length) { free(file_data); CWS_LOG_ERROR("Partial read from file"); + return 0; } @@ -250,6 +256,7 @@ int cws_http_send_resource(cws_http *request) { strcpy(conn, "keep-alive"); keepalive = 1; } + mcl_hm_free_bucket(connection); char *response = NULL; size_t response_len = cws_http_response_builder(&response, "HTTP/1.1", CWS_HTTP_OK, content_type, conn, file_data, content_length); @@ -338,5 +345,8 @@ void cws_http_send_simple_html(cws_http *request, cws_http_status status, char * void cws_http_free(cws_http *request) { mcl_hm_free(request->headers); + mcl_string_free(request->http_version); + mcl_string_free(request->location); + mcl_string_free(request->location_path); free(request); } diff --git a/src/meson.build b/src/meson.build index cda4eaf..a3dfa9e 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,3 +1,3 @@ -server = files('main.c', 'server/server.c') +server = files('main.c', 'server/server.c', 'server/threadpool.c') server += files('utils/utils.c', 'utils/config.c') server += files('http/http.c') diff --git a/src/server/server.c b/src/server/server.c index 0fa6abc..f980394 100644 --- a/src/server/server.c +++ b/src/server/server.c @@ -12,12 +12,13 @@ #include #include "http/http.h" +#include "server/threadpool.h" #include "utils/colors.h" #include "utils/utils.h" volatile sig_atomic_t cws_server_run = 1; -void cws_server_setup_hints(struct addrinfo *hints, const char *hostname) { +static void cws_server_setup_hints(struct addrinfo *hints, const char *hostname) { memset(hints, 0, sizeof(struct addrinfo)); /* IPv4 or IPv6 */ @@ -48,53 +49,52 @@ cws_server_ret cws_server_start(cws_config *config) { return CWS_SERVER_GETADDRINFO_ERROR; } - int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (sockfd < 0) { + int server_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (server_fd < 0) { CWS_LOG_ERROR("socket(): %s", strerror(errno)); return CWS_SERVER_SOCKET_ERROR; } const int opt = 1; - status = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt); + status = setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt); if (status != 0) { CWS_LOG_ERROR("setsockopt(): %s", strerror(errno)); return CWS_SERVER_SETSOCKOPT_ERROR; } - status = bind(sockfd, res->ai_addr, res->ai_addrlen); + status = bind(server_fd, res->ai_addr, res->ai_addrlen); if (status != 0) { CWS_LOG_ERROR("bind(): %s", strerror(errno)); return CWS_SERVER_BIND_ERROR; } - status = listen(sockfd, CWS_SERVER_BACKLOG); + status = listen(server_fd, CWS_SERVER_BACKLOG); if (status != 0) { CWS_LOG_ERROR("listen(): %s", strerror(errno)); return CWS_SERVER_LISTEN_ERROR; } - cws_server_ret ret = cws_server_loop(sockfd, config); + cws_server_ret ret = cws_server_loop(server_fd, config); CWS_LOG_DEBUG("cws_server_loop ret: %d", ret); freeaddrinfo(res); - close(sockfd); + close(server_fd); return CWS_SERVER_OK; } -cws_server_ret cws_server_loop(int sockfd, cws_config *config) { - /* Make the server loop multi-thread */ - mcl_hashmap *clients = mcl_hm_init(my_int_hash_fn, my_int_equal_fn, my_int_free_key_fn, my_str_free_fn, sizeof(int), sizeof(cws_client)); - if (!clients) { - return CWS_SERVER_HASHMAP_INIT; +static cws_server_ret cws_server_setup_epoll(int sockfd, int *epfd_out) { + int epfd = epoll_create1(0); + if (epfd == -1) { + CWS_LOG_ERROR("epoll_create(): %s", strerror(errno)); + + return CWS_SERVER_EPOLL_CREATE_ERROR; } cws_server_ret ret; - int epfd = epoll_create1(0); ret = cws_fd_set_nonblocking(sockfd); if (ret != CWS_SERVER_OK) { - mcl_hm_free(clients); close(epfd); return ret; @@ -102,12 +102,30 @@ cws_server_ret cws_server_loop(int sockfd, cws_config *config) { ret = cws_epoll_add(epfd, sockfd, EPOLLIN | EPOLLET); if (ret != CWS_SERVER_OK) { - mcl_hm_free(clients); close(epfd); return ret; } + *epfd_out = epfd; + + return CWS_SERVER_OK; +} + +cws_server_ret cws_server_loop(int server_fd, cws_config *config) { + cws_server_ret ret; + int epfd; + + ret = cws_server_setup_epoll(server_fd, &epfd); + if (ret != CWS_SERVER_OK) { + return ret; + } + + mcl_hashmap *clients = mcl_hm_init(my_int_hash_fn, my_int_equal_fn, my_int_free_key_fn, my_str_free_fn, sizeof(int), sizeof(char) * INET_ADDRSTRLEN); + if (!clients) { + return CWS_SERVER_HASHMAP_INIT; + } + struct epoll_event *revents = malloc(CWS_SERVER_EPOLL_MAXEVENTS * sizeof(struct epoll_event)); if (!revents) { mcl_hm_free(clients); @@ -116,6 +134,14 @@ cws_server_ret cws_server_loop(int sockfd, cws_config *config) { return CWS_SERVER_MALLOC_ERROR; } + cws_threadpool *pool = cws_threadpool_init(4, 16); + if (pool == NULL) { + mcl_hm_free(clients); + close(epfd); + + return CWS_SERVER_THREADPOOL_ERROR; + } + while (cws_server_run) { int nfds = epoll_wait(epfd, revents, CWS_SERVER_EPOLL_MAXEVENTS, CWS_SERVER_EPOLL_TIMEOUT); @@ -125,33 +151,38 @@ cws_server_ret cws_server_loop(int sockfd, cws_config *config) { } for (int i = 0; i < nfds; ++i) { - if (revents[i].data.fd == sockfd) { - ret = cws_server_handle_new_client(sockfd, epfd, clients); - if (ret != CWS_SERVER_OK) { - CWS_LOG_DEBUG("Handle new client: %d", ret); + if (revents[i].data.fd == server_fd) { + int client_fd = cws_server_handle_new_client(server_fd, epfd, clients); + if (client_fd < 0) { + continue; } + cws_epoll_add(epfd, client_fd, EPOLLIN | EPOLLET); + cws_fd_set_nonblocking(client_fd); } else { - cws_pthread_data *thread_data = malloc(sizeof(cws_pthread_data)); - if (!thread_data) { - continue; - } - pthread_t client_thread; - int client_fd = revents[i].data.fd; - thread_data->client_fd = client_fd; - thread_data->clients = clients; - thread_data->config = config; - thread_data->epfd = epfd; - ret = pthread_create(&client_thread, NULL, cws_server_handle_client_data, thread_data); - CWS_LOG_DEBUG("Started client thread for fd: %d", client_fd); - if (ret != 0) { - free(thread_data); + cws_task *task = malloc(sizeof(cws_task)); + if (!task) { cws_server_close_client(epfd, client_fd, clients); + continue; } - pthread_detach(client_thread); + task->client_fd = client_fd; + task->config = config; + + cws_thread_task *ttask = malloc(sizeof(cws_thread_task)); + if (!ttask) { + free(task); + + continue; + } + + ttask->arg = task; + ttask->function = (void *)cws_server_handle_client_data; + + ret = cws_threadpool_submit(pool, ttask); + free(ttask); } } } @@ -160,121 +191,99 @@ cws_server_ret cws_server_loop(int sockfd, cws_config *config) { free(revents); close(epfd); mcl_hm_free(clients); + cws_threadpool_destroy(pool); return CWS_SERVER_OK; } -cws_server_ret cws_server_handle_new_client(int sockfd, int epfd, mcl_hashmap *clients) { +int cws_server_handle_new_client(int server_fd, int epfd, mcl_hashmap *clients) { struct sockaddr_storage their_sa; socklen_t theirsa_size = sizeof their_sa; char ip[INET_ADDRSTRLEN]; - int client_fd = cws_server_accept_client(sockfd, &their_sa, &theirsa_size); + int client_fd = cws_server_accept_client(server_fd, &their_sa, &theirsa_size); if (client_fd < 0) { - return CWS_SERVER_FD_ERROR; + return client_fd; } cws_utils_get_client_ip(&their_sa, ip); CWS_LOG_INFO("Client (%s) (fd: %d) connected", ip, client_fd); - cws_fd_set_nonblocking(client_fd); - cws_epoll_add(epfd, client_fd, EPOLLIN); + return client_fd; +} - cws_client client = { - .addr = their_sa, - .last_activity = time(NULL), - }; - mcl_hm_set(clients, &client_fd, &client); +static size_t cws_read_data(int sockfd, mcl_string **str) { + size_t total_bytes = 0; + ssize_t bytes_read; - return CWS_SERVER_OK; + if (*str == NULL) { + *str = mcl_string_new("", 4096); + } + + char tmp[4096]; + memset(tmp, 0, sizeof(tmp)); + + while (1) { + bytes_read = recv(sockfd, tmp, sizeof(tmp), 0); + + if (bytes_read == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + break; + } + + CWS_LOG_ERROR("recv(): %s", strerror(errno)); + + return -1; + } else if (bytes_read == 0) { + return -1; + } + + total_bytes += bytes_read; + mcl_string_append(*str, tmp); + } + + return total_bytes; } void *cws_server_handle_client_data(void *arg) { - cws_pthread_data *thread_data = (cws_pthread_data *)arg; + cws_task *task = (cws_task *)arg; - int client_fd = thread_data->client_fd; - int epfd = thread_data->epfd; - mcl_hashmap *clients = thread_data->clients; - cws_config *config = thread_data->config; + int client_fd = task->client_fd; + cws_config *config = task->config; - char tmp_data[1024]; - memset(tmp_data, 0, sizeof(tmp_data)); - char ip[INET_ADDRSTRLEN] = {0}; - mcl_string *data = mcl_string_new("", 1024); - - /* Incoming data */ - ssize_t total_bytes = 0; - ssize_t bytes_read; - while ((bytes_read = recv(client_fd, tmp_data, sizeof(tmp_data), 0)) > 0) { - total_bytes += bytes_read; - if (total_bytes > CWS_SERVER_MAX_REQUEST_SIZE) { - mcl_string_free(data); - cws_server_close_client(epfd, client_fd, clients); - free(thread_data); - - return NULL; - } - mcl_string_append(data, tmp_data); - memset(tmp_data, 0, sizeof(tmp_data)); - } - - if (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - /* Error during read, handle it (close client) */ - CWS_LOG_ERROR("recv(): %s", strerror(errno)); + /* Read data from socket */ + mcl_string *data = NULL; + size_t total_bytes = cws_read_data(client_fd, &data); + if (total_bytes < 0) { mcl_string_free(data); - cws_server_close_client(epfd, client_fd, clients); - free(thread_data); + free(task); + + /* TODO: Here we should close the client_fd, same as total == 0 */ return NULL; } - /* Retrieve client ip */ - mcl_bucket *client = mcl_hm_get(clients, &client_fd); - if (!client) { - CWS_LOG_ERROR("Client fd %d not found in hashmap", client_fd); + if (total_bytes == 0) { mcl_string_free(data); - cws_epoll_del(epfd, client_fd); - close(client_fd); - free(thread_data); - - return NULL; - } - cws_client *client_info = (cws_client *)client->value; - cws_utils_get_client_ip(&client_info->addr, ip); - client_info->last_activity = time(NULL); - - if (bytes_read == 0) { - /* Client disconnected */ - CWS_LOG_INFO("Client (%s) disconnected", ip); - mcl_string_free(data); - cws_server_close_client(epfd, client_fd, clients); - free(thread_data); return NULL; } /* Parse HTTP request */ + // CWS_LOG_DEBUG("Raw request: %s", mcl_string_cstr(data)); cws_http *request = cws_http_parse(data, client_fd, config); mcl_string_free(data); if (request == NULL) { - CWS_LOG_INFO("Client (%s) disconnected (request NULL)", ip); - cws_server_close_client(epfd, client_fd, clients); - free(thread_data); - return NULL; } - int keepalive = cws_http_send_resource(request); + /* TODO: fix keep-alive */ + int _keepalive = cws_http_send_resource(request); cws_http_free(request); - /* Only close connection if not keep-alive */ - /* TODO: fix */ - if (keepalive <= 0) { - cws_server_close_client(epfd, client_fd, clients); - } - - free(thread_data); + /* Free the task */ + free(task); return NULL; } @@ -315,8 +324,8 @@ cws_server_ret cws_fd_set_nonblocking(int sockfd) { return CWS_SERVER_OK; } -int cws_server_accept_client(int sockfd, struct sockaddr_storage *their_sa, socklen_t *theirsa_size) { - const int client_fd = accept(sockfd, (struct sockaddr *)their_sa, theirsa_size); +int cws_server_accept_client(int server_fd, struct sockaddr_storage *their_sa, socklen_t *theirsa_size) { + const int client_fd = accept(server_fd, (struct sockaddr *)their_sa, theirsa_size); if (client_fd == -1) { if (errno != EWOULDBLOCK) { @@ -330,6 +339,7 @@ int cws_server_accept_client(int sockfd, struct sockaddr_storage *their_sa, sock void cws_server_close_client(int epfd, int client_fd, mcl_hashmap *hashmap) { if (fcntl(client_fd, F_GETFD) != -1) { + /* TODO: race condition here */ cws_epoll_del(epfd, client_fd); mcl_hm_remove(hashmap, &client_fd); } diff --git a/src/server/threadpool.c b/src/server/threadpool.c index f80e7ef..7473cd3 100644 --- a/src/server/threadpool.c +++ b/src/server/threadpool.c @@ -1,49 +1,124 @@ #include "server/threadpool.h" #include +#include -cws_threadpool *cws_threadpool_init() { +cws_threadpool *cws_threadpool_init(size_t threads_num, size_t queue_size) { + /* Allocate threadpool */ cws_threadpool *pool = malloc(sizeof(cws_threadpool)); if (pool == NULL) { return NULL; } + memset(pool, 0, sizeof(cws_threadpool)); int ret; - ret = pthread_mutex_init(&pool->lock, NULL); if (ret != 0) { free(pool); + return NULL; } - ret = pthread_cond_init(&pool->cond, NULL); + ret = pthread_cond_init(&pool->notify, NULL); if (ret != 0) { + pthread_mutex_destroy(&pool->lock); free(pool); + return NULL; } - pool->front = 0; - pool->rear = 0; - pool->shutdown = 0; + pool->queue = mcl_queue_init(queue_size, sizeof(cws_thread_task)); + if (pool->queue == NULL) { + pthread_mutex_destroy(&pool->lock); + pthread_cond_destroy(&pool->notify); + free(pool); + + return NULL; + } + + pool->threads_num = threads_num; + pool->threads = malloc(sizeof(pthread_t) * threads_num); + if (pool->threads == NULL) { + mcl_queue_free(pool->queue); + pthread_cond_destroy(&pool->notify); + pthread_mutex_destroy(&pool->lock); + free(pool); + + return NULL; + } + + for (size_t i = 0; i < threads_num; ++i) { + pthread_create(&pool->threads[i], NULL, cws_threadpool_worker, pool); + } return pool; } -cws_server_ret cws_threadpool_add_task(cws_threadpool *pool, cws_task *task) { +cws_server_ret cws_threadpool_submit(cws_threadpool *pool, cws_thread_task *task) { + cws_thread_task tt = { + .function = task->function, + .arg = task->arg, + }; + pthread_mutex_lock(&pool->lock); - /* ? */ + int res = mcl_queue_push(pool->queue, &tt); + if (res == 0) { + pthread_cond_signal(&pool->notify); + } pthread_mutex_unlock(&pool->lock); - return CWS_SERVER_OK; + return res; } -cws_server_ret cws_threadpool_worker(cws_threadpool *pool) { return CWS_SERVER_OK; } +void *cws_threadpool_worker(void *arg) { + cws_threadpool *pool = (cws_threadpool *)arg; + cws_thread_task task = { + .arg = NULL, + .function = NULL, + }; + int ret; + + while (1) { + /* Loop until notify */ + ret = pthread_mutex_lock(&pool->lock); + if (ret != 0) { + continue; + } + + while (!pool->shutdown && pool->queue->size == 0) { + pthread_cond_wait(&pool->notify, &pool->lock); + } + + if (pool->shutdown && pool->queue->size == 0) { + pthread_mutex_unlock(&pool->lock); + break; + } + + mcl_queue_pop(pool->queue, &task); + pthread_mutex_unlock(&pool->lock); + + task.function(task.arg); + } + + return NULL; +} + +void cws_threadpool_destroy(cws_threadpool *pool) { + pthread_mutex_lock(&pool->lock); + pool->shutdown = 1; + pthread_cond_broadcast(&pool->notify); + pthread_mutex_unlock(&pool->lock); + + for (size_t i = 0; i < pool->threads_num; ++i) { + pthread_join(pool->threads[i], NULL); + } -cws_server_ret cws_threadpool_destroy(cws_threadpool *pool) { pthread_mutex_destroy(&pool->lock); - pthread_cond_destroy(&pool->cond); + pthread_cond_destroy(&pool->notify); - return CWS_SERVER_OK; + mcl_queue_free(pool->queue); + free(pool->threads); + free(pool); }