From 1eed3ef40a45f5b856c252aaa234acd302364901 Mon Sep 17 00:00:00 2001 From: staylightblow8 Date: Mon, 26 Feb 2024 20:02:19 +0800 Subject: [PATCH] add heartbeat to ws_thread Signed-off-by: staylightblow8 --- src/ws_thread.c | 55 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/src/ws_thread.c b/src/ws_thread.c index 7d88b47de..76dea7a98 100644 --- a/src/ws_thread.c +++ b/src/ws_thread.c @@ -32,6 +32,7 @@ static struct event_base *ws_base; static struct evdns_base *ws_dnsbase; +static struct event *ws_heartbeat_ev; static char *fixed_key = "dGhlIHNhbXBsZSBub25jZQ=="; static char *fixed_accept = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="; static bool upgraded = false; @@ -217,6 +218,14 @@ ws_request(struct bufferevent* b_ws) evbuffer_add_printf(out, "\r\n"); } +static void +ws_heartbeat_cb(evutil_socket_t fd, short event, void *arg) +{ + struct bufferevent *b_ws = (struct bufferevent *)arg; + struct evbuffer *out = bufferevent_get_output(b_ws); + ws_send(out, "ping", 4); +} + static void ws_read_cb(struct bufferevent *b_ws, void *ctx) { @@ -250,6 +259,16 @@ ws_read_cb(struct bufferevent *b_ws, void *ctx) config_get_config()->gw_id); ws_send(bufferevent_get_output(b_ws), jdata, strlen(jdata)); debug(LOG_DEBUG, "send connect data %s\n", jdata); + + // add timer to send heartbeat + if (ws_heartbeat_ev != NULL) { + event_free(ws_heartbeat_ev); + } + struct timeval tv; + tv.tv_sec = 60; + tv.tv_usec = 0; + ws_heartbeat_ev = event_new(ws_base, -1, EV_PERSIST, ws_heartbeat_cb, b_ws); + event_add(ev, &tv); } else { ws_receive(data, pos); } @@ -265,19 +284,26 @@ wsevent_connection_cb(struct bufferevent* b_ws, short events, void *ctx){ debug(LOG_ERR, "ws connection error: %s\n", strerror(errno)); sleep(1); // reconnect ws server - bufferevent_free(b_ws); + if (b_ws != NULL) { + bufferevent_free(b_ws); + } b_ws = bufferevent_socket_new(ws_base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS); bufferevent_setcb(b_ws, ws_read_cb, NULL, wsevent_connection_cb, NULL); bufferevent_enable(b_ws, EV_READ|EV_WRITE); t_auth_serv *auth_server = get_auth_server(); + int ret = 0; if (!auth_server->authserv_use_ssl) { - bufferevent_socket_connect_hostname(b_ws, ws_dnsbase, AF_INET, + ret = bufferevent_socket_connect_hostname(b_ws, ws_dnsbase, AF_INET, auth_server->authserv_hostname, auth_server->authserv_http_port); } else { - bufferevent_socket_connect_hostname(b_ws, ws_dnsbase, AF_INET, + ret = bufferevent_socket_connect_hostname(b_ws, ws_dnsbase, AF_INET, auth_server->authserv_hostname, auth_server->authserv_ssl_port); } upgraded = false; + if (ret < 0) { + debug(LOG_ERR, "ws connection error: %s\n", strerror(errno)); + bufferevent_free(b_ws); + } } } @@ -292,21 +318,34 @@ start_ws_thread(void *arg) ws_dnsbase = evdns_base_new(ws_base, 1); struct bufferevent *ws_bev = bufferevent_socket_new(ws_base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS); - + if (ws_bev == NULL) { + debug(LOG_ERR, "create bufferevent failed\n"); + goto ERR; + } bufferevent_setcb(ws_bev, ws_read_cb, NULL, wsevent_connection_cb, NULL); bufferevent_enable(ws_bev, EV_READ|EV_WRITE); + int ret = 0 if (!auth_server->authserv_use_ssl) { - bufferevent_socket_connect_hostname(ws_bev, ws_dnsbase, AF_INET, + ret = bufferevent_socket_connect_hostname(ws_bev, ws_dnsbase, AF_INET, auth_server->authserv_hostname, auth_server->authserv_http_port); } else { - bufferevent_socket_connect_hostname(ws_bev, ws_dnsbase, AF_INET, + ret = bufferevent_socket_connect_hostname(ws_bev, ws_dnsbase, AF_INET, auth_server->authserv_hostname, auth_server->authserv_ssl_port); } - + + if (ret < 0) { + debug(LOG_ERR, "ws connection error: %s\n", strerror(errno)); + bufferevent_free(ws_bev); + goto ERR; + } + event_base_dispatch(ws_base); - event_base_free(ws_base); +ERR: + if (ws_base) event_base_free(ws_base); + if (ws_dnsbase) evdns_base_free(ws_dnsbase, 0); + if (ws_bev) bufferevent_free(ws_bev); return; }