refactor: worker struct and remove pipe

This commit is contained in:
2025-10-17 21:18:13 +02:00
parent 81bdd2acda
commit 01d29ff9f3
14 changed files with 139 additions and 93 deletions

View File

@@ -0,0 +1,7 @@
#include "utils/utils.h"
cws_server_ret cws_epoll_add(int epfd, int sockfd);
cws_server_ret cws_epoll_del(int epfd, int sockfd);
int cws_epoll_create_with_fd(int fd);

View File

@@ -5,7 +5,7 @@
#include <myclib/mysocket.h> #include <myclib/mysocket.h>
#include <signal.h> #include <signal.h>
#include "utils/config.h" #include "config/config.h"
#include "utils/utils.h" #include "utils/utils.h"
#include "worker.h" #include "worker.h"

View File

@@ -5,14 +5,13 @@
#include <pthread.h> #include <pthread.h>
#include <signal.h> #include <signal.h>
#include "../utils/config.h" #include "../config/config.h"
#include "../utils/utils.h" #include "../utils/utils.h"
extern volatile sig_atomic_t cws_server_run; extern volatile sig_atomic_t cws_server_run;
typedef struct cws_worker { typedef struct cws_worker {
int epfd; int epfd;
int pipefd[2];
size_t clients_num; size_t clients_num;
pthread_t thread; pthread_t thread;
cws_config_s *config; cws_config_s *config;
@@ -26,10 +25,6 @@ void *cws_worker_loop(void *arg);
void cws_server_close_client(int epfd, int client_fd); void cws_server_close_client(int epfd, int client_fd);
cws_server_ret cws_epoll_add(int epfd, int sockfd);
cws_server_ret cws_epoll_del(int epfd, int sockfd);
cws_server_ret cws_server_handle_client_data(int epfd, int client_fd); cws_server_ret cws_server_handle_client_data(int epfd, int client_fd);
#endif #endif

View File

@@ -1,6 +1,8 @@
#ifndef CWS_COLORS_H #ifndef CWS_COLORS_H
#define CWS_COLORS_H #define CWS_COLORS_H
#include <stdio.h>
/* ANSI color escape sequences */ /* ANSI color escape sequences */
#define RED "\033[31m" #define RED "\033[31m"
#define GREEN "\033[32m" #define GREEN "\033[32m"

View File

@@ -1,8 +1,8 @@
#ifndef CWS_UTILS_H #ifndef CWS_UTILS_H
#define CWS_UTILS_H #define CWS_UTILS_H
#include <myclib/mysocket.h>
#include <stdbool.h> #include <stdbool.h>
#include <sys/socket.h>
typedef enum cws_server_ret { typedef enum cws_server_ret {
CWS_SERVER_OK, CWS_SERVER_OK,

View File

@@ -1,4 +1,4 @@
#include "utils/config.h" #include "config/config.h"
#include <cyaml/cyaml.h> #include <cyaml/cyaml.h>
#include <stdio.h> #include <stdio.h>
@@ -52,6 +52,6 @@ cws_config_s *cws_config_init(void) {
void cws_config_free(cws_config_s *config) { void cws_config_free(cws_config_s *config) {
cyaml_err_t err = cyaml_free(&cyaml_config, &top_schema, config, 0); cyaml_err_t err = cyaml_free(&cyaml_config, &top_schema, config, 0);
if (err != CYAML_OK) { if (err != CYAML_OK) {
/* TODO: Handle */ return;
} }
} }

View File

@@ -134,8 +134,18 @@ static cws_server_ret http_send_resource(cws_http_s *request) {
size_t response_len = http_response_builder(&response, HTTP_OK, content_type, data, content_length); size_t response_len = http_response_builder(&response, HTTP_OK, content_type, data, content_length);
ssize_t sent = sock_writeall(request->sockfd, response, response_len); size_t total_sent = 0;
CWS_LOG_DEBUG("Sent %zu bytes", sent); while (total_sent < response_len) {
ssize_t sent = send(request->sockfd, response + total_sent, response_len - total_sent, MSG_NOSIGNAL);
if (sent < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
break;
}
total_sent += sent;
}
CWS_LOG_DEBUG("Sent %zd bytes", total_sent);
free(response); free(response);
free(data); free(data);
@@ -174,14 +184,20 @@ cws_http_s *cws_http_parse(string_s *request_str) {
return NULL; return NULL;
} }
char *request_copy = strdup(request_str->data);
if (request_copy == NULL) {
cws_http_free(request);
return NULL;
}
char *saveptr = NULL; char *saveptr = NULL;
char *pch = NULL; char *pch = NULL;
/* Parse HTTP method */ /* Parse HTTP method */
pch = strtok_r(request_str->data, " ", &saveptr); pch = strtok_r(request_copy, " ", &saveptr);
if (pch == NULL) { if (pch == NULL) {
cws_http_free(request); cws_http_free(request);
free(request_copy);
return NULL; return NULL;
} }
CWS_LOG_DEBUG("method: %s", pch); CWS_LOG_DEBUG("method: %s", pch);
@@ -189,7 +205,7 @@ cws_http_s *cws_http_parse(string_s *request_str) {
request->method = http_parse_method(pch); request->method = http_parse_method(pch);
if (request->method == HTTP_UNKNOWN) { if (request->method == HTTP_UNKNOWN) {
cws_http_free(request); cws_http_free(request);
free(request_copy);
return NULL; return NULL;
} }
@@ -197,17 +213,16 @@ cws_http_s *cws_http_parse(string_s *request_str) {
pch = strtok_r(NULL, " ", &saveptr); pch = strtok_r(NULL, " ", &saveptr);
if (pch == NULL) { if (pch == NULL) {
cws_http_free(request); cws_http_free(request);
free(request_copy);
return NULL; return NULL;
} }
string_append(request->location, pch); string_append(request->location, pch);
/* Adjust location path */ /* Adjust location path */
string_append(request->location_path, "www/"); string_append(request->location_path, "www");
CWS_LOG_DEBUG("location path: %s", request->location_path->data);
if (strcmp(request->location->data, "/") == 0) { if (strcmp(request->location->data, "/") == 0) {
string_append(request->location_path, "index.html"); string_append(request->location_path, "/index.html");
} else { } else {
string_append(request->location_path, request->location->data); string_append(request->location_path, request->location->data);
} }
@@ -216,6 +231,7 @@ cws_http_s *cws_http_parse(string_s *request_str) {
/* Parse HTTP version */ /* Parse HTTP version */
pch = strtok_r(NULL, " \r\n", &saveptr); pch = strtok_r(NULL, " \r\n", &saveptr);
if (pch == NULL) { if (pch == NULL) {
free(request_copy);
cws_http_free(request); cws_http_free(request);
return NULL; return NULL;
@@ -254,6 +270,8 @@ cws_http_s *cws_http_parse(string_s *request_str) {
hm_set(request->headers, hk, hv); hm_set(request->headers, hk, hv);
} }
free(request_copy);
/* TODO: Parse body */ /* TODO: Parse body */
return request; return request;
@@ -264,6 +282,7 @@ static size_t http_header_len(char *status_code, char *content_type, size_t body
"HTTP/1.1 %s\r\n" "HTTP/1.1 %s\r\n"
"Content-Type: %s\r\n" "Content-Type: %s\r\n"
"Content-Length: %zu\r\n" "Content-Length: %zu\r\n"
"Connection: close\r\n"
"\r\n", "\r\n",
status_code, content_type, body_len); status_code, content_type, body_len);
@@ -281,12 +300,15 @@ size_t http_response_builder(char **response, cws_http_status_e status, char *co
return 0; return 0;
} }
snprintf(*response, header_len + 1, "HTTP/1.1 %s\r\nContent-Type: %s\r\nContent-Length: %zu\r\n\r\n", status_code, content_type, body_len_bytes); snprintf(*response, header_len + 1, "HTTP/1.1 %s\r\nContent-Type: %s\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n", status_code, content_type,
body_len_bytes);
// CWS_LOG_DEBUG("response: %s", *response);
/* Only append body if we have it */ /* Only append body if we have it */
if (body && body_len_bytes > 0) { if (body && body_len_bytes > 0) {
memcpy(*response + header_len, body, body_len_bytes); memcpy(*response + header_len, body, body_len_bytes);
} }
// CWS_LOG_DEBUG("response: %s", *response);
(*response)[total_len] = '\0'; (*response)[total_len] = '\0';

View File

@@ -6,8 +6,8 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include "config/config.h"
#include "server/server.h" #include "server/server.h"
#include "utils/config.h"
#include "utils/debug.h" #include "utils/debug.h"
void cws_signal_handler(int) { cws_server_run = 0; } void cws_signal_handler(int) { cws_server_run = 0; }

View File

@@ -1,3 +1,9 @@
server = files('main.c', 'server/server.c', 'server/worker.c') server = files('main.c')
server += files('utils/utils.c', 'utils/config.c') server += files(
'server/epoll_utils.c',
'server/server.c',
'server/worker.c',
)
server += files('config/config.c')
server += files('utils/utils.c')
server += files('http/http.c') server += files('http/http.c')

43
src/server/epoll_utils.c Normal file
View File

@@ -0,0 +1,43 @@
#include "server/epoll_utils.h"
#include <sys/epoll.h>
#include "utils/debug.h"
#include "utils/utils.h"
cws_server_ret cws_epoll_add(int epfd, int sockfd) {
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sockfd;
const int status = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
if (status != 0) {
CWS_LOG_ERROR("epoll_ctl_add()");
return CWS_SERVER_EPOLL_ADD_ERROR;
}
return CWS_SERVER_OK;
}
cws_server_ret cws_epoll_del(int epfd, int sockfd) {
const int status = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
if (status != 0) {
CWS_LOG_ERROR("epoll_ctl_del()");
return CWS_SERVER_EPOLL_DEL_ERROR;
}
return CWS_SERVER_OK;
}
int cws_epoll_create_with_fd(int fd) {
int epfd = epoll_create1(0);
if (epfd == -1) {
return -1;
}
if (cws_epoll_add(epfd, fd) != CWS_SERVER_OK) {
return -1;
}
return epfd;
}

View File

@@ -7,6 +7,7 @@
#include <sys/epoll.h> #include <sys/epoll.h>
#include <unistd.h> #include <unistd.h>
#include "server/epoll_utils.h"
#include "server/worker.h" #include "server/worker.h"
#include "utils/debug.h" #include "utils/debug.h"
#include "utils/utils.h" #include "utils/utils.h"
@@ -131,8 +132,9 @@ cws_server_ret cws_server_start(cws_server_s *server) {
continue; continue;
} }
/* Add client to worker */ /* Add client to a worker */
write(server->workers[workers_index]->pipefd[1], &client_fd, sizeof(int)); cws_fd_set_nonblocking(client_fd);
cws_epoll_add(server->workers[workers_index]->epfd, client_fd);
workers_index = (workers_index + 1) % CWS_WORKERS_NUM; workers_index = (workers_index + 1) % CWS_WORKERS_NUM;
} }
} }

View File

@@ -1,5 +1,6 @@
#include "server/worker.h" #include "server/worker.h"
#include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@@ -8,42 +9,36 @@
#include <unistd.h> #include <unistd.h>
#include "http/http.h" #include "http/http.h"
#include "utils/debug.h" #include "server/epoll_utils.h"
#include "utils/utils.h"
static int cws_worker_setup_epoll(cws_worker_s *worker) { static cws_server_ret cws_worker_setup_epoll(cws_worker_s *worker) {
worker->epfd = epoll_create1(0); worker->epfd = epoll_create1(0);
if (worker->epfd == -1) { if (worker->epfd == -1) {
return -1; return CWS_SERVER_EPOLL_CREATE_ERROR;
} }
cws_server_ret ret; return CWS_SERVER_OK;
ret = cws_fd_set_nonblocking(worker->pipefd[0]);
if (ret != CWS_SERVER_OK) {
sock_close(worker->epfd);
return -1;
}
ret = cws_epoll_add(worker->epfd, worker->pipefd[0]);
if (ret != CWS_SERVER_OK) {
sock_close(worker->epfd);
sock_close(worker->pipefd[0]);
}
return 0;
} }
static int cws_read_data(int sockfd, string_s *str) { static ssize_t cws_read_data(int sockfd, string_s *str) {
char tmp[4096]; char tmp[4096] = {0};
memset(tmp, 0, sizeof tmp);
int bytes = sock_readall(sockfd, tmp, sizeof(tmp)); ssize_t n = recv(sockfd, tmp, sizeof tmp, MSG_PEEK);
if (bytes < 0) { if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return 0;
}
return -1; return -1;
} }
string_append(str, tmp);
return bytes; if (n == 0) {
/* Connection closed */
return -1;
}
string_append(str, tmp);
return n;
} }
cws_worker_s **cws_worker_new(size_t workers_num, cws_config_s *config) { cws_worker_s **cws_worker_new(size_t workers_num, cws_config_s *config) {
@@ -67,8 +62,7 @@ cws_worker_s **cws_worker_new(size_t workers_num, cws_config_s *config) {
workers[i]->config = config; workers[i]->config = config;
/* Communicate though threads */ /* Setup worker's epoll */
pipe(workers[i]->pipefd);
int ret = cws_worker_setup_epoll(workers[i]); int ret = cws_worker_setup_epoll(workers[i]);
if (ret == -1) { if (ret == -1) {
for (size_t j = 0; j < i; ++j) { for (size_t j = 0; j < i; ++j) {
@@ -118,58 +112,32 @@ void *cws_worker_loop(void *arg) {
} }
for (int i = 0; i < nfds; ++i) { for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == worker->pipefd[0]) { /* Handle client's data */
/* Handle new client */
int client_fd;
read(worker->pipefd[0], &client_fd, sizeof(int));
cws_fd_set_nonblocking(client_fd);
cws_epoll_add(worker->epfd, client_fd);
} else {
/* Handle client data */
int client_fd = events[i].data.fd; int client_fd = events[i].data.fd;
cws_server_handle_client_data(worker->epfd, client_fd); cws_server_handle_client_data(worker->epfd, client_fd);
} }
} }
}
return NULL; return NULL;
} }
void cws_server_close_client(int epfd, int client_fd) { void cws_server_close_client(int epfd, int client_fd) {
cws_epoll_del(epfd, client_fd); cws_epoll_del(epfd, client_fd);
sock_close(client_fd); close(client_fd);
}
cws_server_ret cws_epoll_add(int epfd, int sockfd) {
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sockfd;
const int status = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
if (status != 0) {
CWS_LOG_ERROR("epoll_ctl_add()");
return CWS_SERVER_EPOLL_ADD_ERROR;
}
return CWS_SERVER_OK;
}
cws_server_ret cws_epoll_del(int epfd, int sockfd) {
const int status = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
if (status != 0) {
CWS_LOG_ERROR("epoll_ctl_del()");
return CWS_SERVER_EPOLL_DEL_ERROR;
}
return CWS_SERVER_OK;
} }
cws_server_ret cws_server_handle_client_data(int epfd, int client_fd) { cws_server_ret cws_server_handle_client_data(int epfd, int client_fd) {
string_s *data = string_new("", 4096); string_s *data = string_new("", 4096);
size_t total_bytes = cws_read_data(client_fd, data); ssize_t total_bytes = cws_read_data(client_fd, data);
if (total_bytes == 0) {
/* Request not completed yet */
string_free(data);
return CWS_SERVER_OK;
}
if (total_bytes <= 0) { if (total_bytes <= 0) {
/* Something happened, close connection */
string_free(data); string_free(data);
cws_server_close_client(epfd, client_fd); cws_server_close_client(epfd, client_fd);
@@ -180,7 +148,6 @@ cws_server_ret cws_server_handle_client_data(int epfd, int client_fd) {
string_free(data); string_free(data);
if (request == NULL) { if (request == NULL) {
cws_server_close_client(epfd, client_fd); cws_server_close_client(epfd, client_fd);
return CWS_SERVER_HTTP_PARSE_ERROR; return CWS_SERVER_HTTP_PARSE_ERROR;
} }
request->sockfd = client_fd; request->sockfd = client_fd;

View File

@@ -1,6 +1,8 @@
#include "utils/utils.h" #include "utils/utils.h"
#include <arpa/inet.h>
#include <fcntl.h> #include <fcntl.h>
#include <netinet/in.h>
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@@ -69,6 +71,6 @@ bool my_int_equal_fn(const void *a, const void *b) {
void my_int_free_key_fn(void *key) { void my_int_free_key_fn(void *key) {
int fd = *(int *)key; int fd = *(int *)key;
sock_close(fd); close(fd);
free(key); free(key);
} }