From ce91d8f01c6f230f7a8f232a0d33ae90bd7be6f5 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Fri, 8 Apr 2022 12:07:01 -0700 Subject: [PATCH 1/9] Skeleton of test thread --- picoquic.sln | 10 + thread_tester/thread_test.c | 209 ++++++++++++++++++++ thread_tester/thread_tester.vcxproj | 171 ++++++++++++++++ thread_tester/thread_tester.vcxproj.filters | 22 +++ 4 files changed, 412 insertions(+) create mode 100644 thread_tester/thread_test.c create mode 100644 thread_tester/thread_tester.vcxproj create mode 100644 thread_tester/thread_tester.vcxproj.filters diff --git a/picoquic.sln b/picoquic.sln index 96d335709..c7b57094d 100644 --- a/picoquic.sln +++ b/picoquic.sln @@ -71,6 +71,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "PerfAndStressTest", "PerfAn {998765EE-64DF-49C1-8471-A79E2DA7CD21} = {998765EE-64DF-49C1-8471-A79E2DA7CD21} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "thread_tester", "thread_tester\thread_tester.vcxproj", "{57341DB3-B498-4E7B-9CB2-E897A562C15F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = Debug|x64 @@ -175,6 +177,14 @@ Global {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x64.Build.0 = Release|x64 {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x86.ActiveCfg = Release|Win32 {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x86.Build.0 = Release|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x64.ActiveCfg = Debug|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x64.Build.0 = Debug|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x86.ActiveCfg = Debug|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x86.Build.0 = Debug|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x64.ActiveCfg = Release|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x64.Build.0 = Release|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x86.ActiveCfg = Release|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x86.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c new file mode 100644 index 000000000..59136e5d9 --- /dev/null +++ b/thread_tester/thread_test.c @@ -0,0 +1,209 @@ +#ifdef _WINDOWS +#define WIN32_LEAN_AND_MEAN +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef SOCKET_TYPE +#define SOCKET_TYPE SOCKET +#endif +#ifndef SOCKET_CLOSE +#define SOCKET_CLOSE(x) closesocket(x) +#endif +#ifndef WSA_LAST_ERROR +#define WSA_LAST_ERROR(x) WSAGetLastError() +#endif +#ifndef socklen_t +#define socklen_t int +#endif +#else +#include +#include +#include +#include +#include +#include +#include + +#ifndef __USE_XOPEN2K +#define __USE_XOPEN2K +#endif +#ifndef __USE_POSIX +#define __USE_POSIX +#endif +#include +#include +#include +#include +#include + +#ifndef SOCKET_TYPE +#define SOCKET_TYPE int +#endif +#ifndef INVALID_SOCKET +#define INVALID_SOCKET -1 +#endif +#ifndef SOCKET_CLOSE +#define SOCKET_CLOSE(x) close(x) +#endif +#ifndef WSA_LAST_ERROR +#define WSA_LAST_ERROR(x) ((long)(x)) +#endif +#endif + +#include "picosocks.h" +#include "picoquic.h" +#include "picoquic_internal.h" +/* #include "picoquic_packet_loop.h" */ +/* #include "picoquic_unified_log.h" */ + +/* Thread context, passed as parameter when starting network thread */ +#define NB_THREAD_TEST_MSG 100 +#define NB_THREAD_TEST_EVENT 100 +#define ERROR_THREAD_TEST_MESSAGE 1 +#define ERROR_THREAD_TEST_DUPLICATE 2 +typedef struct st_thread_test_context_t { + int should_stop; + struct sockaddr_in network_thread_addr; + int network_thread_addr_len; + uint64_t msg_sent_at[NB_THREAD_TEST_MSG]; + uint64_t msg_recv_at[NB_THREAD_TEST_MSG]; + uint64_t event_sent_at[NB_THREAD_TEST_EVENT]; + uint64_t event_recv_at[NB_THREAD_TEST_EVENT]; + int event_sent_count; + int event_seen_count; + int message_error; + uint64_t message_number_error; + uint64_t network_exit_time; + int message_loop_error; + uint64_t message_loop_error_index; + uint64_t message_loop_exit_time; + + +} thread_test_context_t; + +/* Wakeup function */ +void network_wake_up() +{ +} + +/* Network thread */ +DWORD WINAPI network_thread(LPVOID lpParam) +{ + thread_test_context_t* ctx = (thread_test_context_t*)lpParam; + SOCKET_TYPE n_socket; + uint64_t current_time = 0; + uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; + /* Create a socket */ + /* Create an event */ + + /* Loop on wait for socket or event */ + while (!ctx->should_stop) { + int receive_ready = 0; + uint64_t message_number = 0; + /* wait for socket or event */ + /* get time */ + /* if event received */ + while (ctx->event_seen_count < ctx->event_sent_count) { + ctx->event_recv_at[ctx->event_seen_count] = current_time; + ctx->event_seen_count++; + } + /* if receive ready */ + if (receive_ready) { + /* On windows, receive async */ + /* On linux, receive socket message */ + /* Get the new time */ + current_time = picoquic_current_time(); + /* Get the message number */ + message_number = PICOPARSE_64(buffer); + if (message_number > NB_THREAD_TEST_MSG) { + DBG_PRINTF("Unexpected message number: %" PRIx64, message_number); + ctx->message_error = ERROR_THREAD_TEST_MESSAGE; + ctx->message_number_error = message_number; + break; + } + else if (ctx->msg_recv_at[message_number] != 0) { + DBG_PRINTF("Unexpected message number: %" PRIx64 ", first: %" PRIu64 ", then: %" PRIu64, + message_number, ctx->msg_recv_at[message_number], current_time); + ctx->message_error = ERROR_THREAD_TEST_DUPLICATE; + ctx->message_number_error = message_number; + break; + } + else { + ctx->msg_recv_at[message_number] = current_time; + } + } + } + ctx->should_stop = 1; + ctx->network_exit_time = current_time; + /* Close the socket */ + /* Close the event handle if needed */ + return 0; +} + +/* Network loop thread -- socket only */ +DWORD WINAPI network_loop_thread(LPVOID lpParam) +{ + thread_test_context_t* ctx = (thread_test_context_t*)lpParam; + SOCKET_TYPE l_socket; + uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; + uint64_t current_time; + + memset(buffer, 0, sizeof(buffer)); + /* Create a socket */ + /* Loop on send to socket */ + for (uint64_t i = 0; i < NB_THREAD_TEST_MSG && !ctx->should_stop; i++) + { + int ret = 0; + /* Wait some amount of time */ + sleep(0); + current_time = picoquic_current_time(); + /* send the message */ + picoformat_64(i, buffer); + ret = (int)sendto(l_socket, buffer, PICOQUIC_MAX_PACKET_SIZE, + 0, (struct sockaddr*)&ctx->network_thread_addr, ctx->network_thread_addr_len); + if (ret != PICOQUIC_MAX_PACKET_SIZE) { + /* Error. Document and exit */ + DBG_PRINTF("Network loop returns %d (0x%x)", ret, ret); + ctx->message_loop_error = ret; + ctx->message_loop_error_index = i; + break; + } + else { + ctx->msg_sent_at[i] = current_time; + } + } + ctx->message_loop_exit_time = picoquic_current_time(); + /* Close the socket */ + /* exit the thread */ +} + +/* Event loop thread -- event only */ +DWORD WINAPI event_loop_thread(LPVOID lpParam) +{ + thread_test_context_t* ctx = (thread_test_context_t*)lpParam; + SOCKET_TYPE n_socket; + /* Create a socket */ + /* Loop on send to socket */ + /* Close the socket */ + /* exit the thread */ +} + +int main(int argc, char ** argv) +{ +#ifdef _WINDOWS + WSADATA wsaData = { 0 }; + (void)WSA_START(MAKEWORD(2, 2), &wsaData); +#endif + + printf("testing the thread execution\n"); + printf("thread1: backgroud thread, listens to a socket and to interrupts."); + printf("thread2: backgroud thread, Wakes up the main thread at random intervals."); + printf("thread3: network thread, sends at random intervals."); +} diff --git a/thread_tester/thread_tester.vcxproj b/thread_tester/thread_tester.vcxproj new file mode 100644 index 000000000..020c24dc6 --- /dev/null +++ b/thread_tester/thread_tester.vcxproj @@ -0,0 +1,171 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + + + + 16.0 + Win32Proj + {57341db3-b498-4e7b-9cb2-e897a562c15f} + threadtester + 10.0.18362.0 + + + + Application + true + v141 + Unicode + + + Application + false + v141 + true + Unicode + + + Application + true + v141 + Unicode + + + Application + false + v141 + true + Unicode + + + + + + + + + + + + + + + + + + + + + true + thread_test + + + false + thread_test + + + true + thread_test + + + false + thread_test + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;_WINDOWS;%(PreprocessorDefinitions) + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + + + + + Console + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;_WINDOWS;%(PreprocessorDefinitions) + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + + + + + Console + true + true + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) + + + + + Level3 + true + _DEBUG;_CONSOLE;_WINDOWS;_WINDOWS64;%(PreprocessorDefinitions) + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + + + + + Console + true + $(OutDir);$(SolutionDir)..\picotls\picotlsvs\$(Platform)\$(Configuration)\;$(OPENSSL64DIR);$(OPENSSL64DIR)\lib + picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;_WINDOWS;_WINDOWS64;%(PreprocessorDefinitions) + true + $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + + + + + Console + true + true + true + $(OutDir);$(SolutionDir)..\picotls\picotlsvs\$(Platform)\$(Configuration)\;$(OPENSSL64DIR);$(OPENSSL64DIR)\lib + picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) + + + + + + \ No newline at end of file diff --git a/thread_tester/thread_tester.vcxproj.filters b/thread_tester/thread_tester.vcxproj.filters new file mode 100644 index 000000000..dfc2123df --- /dev/null +++ b/thread_tester/thread_tester.vcxproj.filters @@ -0,0 +1,22 @@ + + + + + {4FC737F1-C7A5-4376-A066-2A32D752A2FF} + cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx + + + {93995380-89BD-4b04-88EB-625FBE52EBFB} + h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd + + + {67DA6AB6-F800-4c08-8B7A-83BB121AAD01} + rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms + + + + + Source Files + + + \ No newline at end of file From 4be6b6590cf8d0edf870885b0c1139cfb805aa7c Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Fri, 8 Apr 2022 18:58:40 -0700 Subject: [PATCH 2/9] More tests --- thread_tester/thread_test.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index 59136e5d9..d8c86746b 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -69,6 +69,10 @@ #define ERROR_THREAD_TEST_MESSAGE 1 #define ERROR_THREAD_TEST_DUPLICATE 2 typedef struct st_thread_test_context_t { +#ifdef _WINDOWS + +#else +#endif int should_stop; struct sockaddr_in network_thread_addr; int network_thread_addr_len; @@ -97,7 +101,11 @@ void network_wake_up() DWORD WINAPI network_thread(LPVOID lpParam) { thread_test_context_t* ctx = (thread_test_context_t*)lpParam; +#ifdef _WINDOWS + picoquic_recvmsg_async_ctx_t* recv_ctx; +#else SOCKET_TYPE n_socket; +#endif uint64_t current_time = 0; uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; /* Create a socket */ @@ -108,6 +116,10 @@ DWORD WINAPI network_thread(LPVOID lpParam) int receive_ready = 0; uint64_t message_number = 0; /* wait for socket or event */ +#ifdef _WINDOWS + DWORD ret_event = WSAWaitForMultipleEvents(nb_sockets, events, FALSE, delta_t_ms, TRUE); +#else +#endif /* get time */ /* if event received */ while (ctx->event_seen_count < ctx->event_sent_count) { From e2f20b943392b8b1f9d0e233bff6925cc6f6cbe7 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Thu, 14 Apr 2022 12:56:19 -0700 Subject: [PATCH 3/9] Windows version of threads works. --- picoquic/picoquic_utils.h | 1 + picoquic/util.c | 14 + thread_tester/thread_test.c | 410 ++++++++++++++++++++++++---- thread_tester/thread_tester.vcxproj | 2 +- 4 files changed, 379 insertions(+), 48 deletions(-) diff --git a/picoquic/picoquic_utils.h b/picoquic/picoquic_utils.h index 9777e0cb1..dc9fb461f 100644 --- a/picoquic/picoquic_utils.h +++ b/picoquic/picoquic_utils.h @@ -205,6 +205,7 @@ typedef struct st_picoquic_event_t { #endif int picoquic_create_thread(picoquic_thread_t* thread, picoquic_thread_fn thread_fn, void* arg); +int picoquic_wait_thread(picoquic_thread_t thread); void picoquic_delete_thread(picoquic_thread_t* thread); int picoquic_create_mutex(picoquic_mutex_t* mutex); diff --git a/picoquic/util.c b/picoquic/util.c index 39eeacad3..2dc261b1d 100644 --- a/picoquic/util.c +++ b/picoquic/util.c @@ -960,6 +960,19 @@ int picoquic_create_thread(picoquic_thread_t * thread, picoquic_thread_fn thread return ret; } +int picoquic_wait_thread(picoquic_thread_t thread) +{ + int ret = 0; +#ifdef _WINDOWS + if (WaitForSingleObject(thread, INFINITE) == WAIT_TIMEOUT) { + ret = -1; + } +#else + ret = pthread_join(thread, NULL); +#endif + return 0; +} + void picoquic_delete_thread(picoquic_thread_t * thread) { #ifdef _WINDOWS @@ -978,6 +991,7 @@ void picoquic_delete_thread(picoquic_thread_t * thread) #endif } + int picoquic_create_mutex(picoquic_mutex_t * mutex) { #ifdef _WINDOWS diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index d8c86746b..2423017f2 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -60,6 +60,7 @@ #include "picosocks.h" #include "picoquic.h" #include "picoquic_internal.h" +#include "picoquic_utils.h" /* #include "picoquic_packet_loop.h" */ /* #include "picoquic_unified_log.h" */ @@ -70,21 +71,28 @@ #define ERROR_THREAD_TEST_DUPLICATE 2 typedef struct st_thread_test_context_t { #ifdef _WINDOWS - + HANDLE network_thread; + HANDLE network_event; #else #endif - int should_stop; - struct sockaddr_in network_thread_addr; + int is_ready; + volatile int should_stop; + uint16_t server_port; + struct sockaddr_storage network_thread_addr; int network_thread_addr_len; + int msg_recv_count; uint64_t msg_sent_at[NB_THREAD_TEST_MSG]; uint64_t msg_recv_at[NB_THREAD_TEST_MSG]; uint64_t event_sent_at[NB_THREAD_TEST_EVENT]; uint64_t event_recv_at[NB_THREAD_TEST_EVENT]; - int event_sent_count; + int timeout_count; + int event_wake_count; + volatile int event_sent_count; int event_seen_count; int message_error; uint64_t message_number_error; uint64_t network_exit_time; + volatile int message_loop_sent; int message_loop_error; uint64_t message_loop_error_index; uint64_t message_loop_exit_time; @@ -93,34 +101,115 @@ typedef struct st_thread_test_context_t { } thread_test_context_t; /* Wakeup function */ -void network_wake_up() +int network_wake_up(thread_test_context_t* ctx) { + int ret = 0; +#ifdef _WINDOWS + if (SetEvent(ctx->network_event) == 0) { + DWORD err = WSAGetLastError(); + DBG_PRINTF("Set network event fails, error 0x%x", err); + ret = (int)err; + } +#else +#endif + return ret; } /* Network thread */ +#ifdef _WINDOWS DWORD WINAPI network_thread(LPVOID lpParam) +#else +int network_thread(void * lpParam) +#endif { + uint64_t current_time = 0; + uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; thread_test_context_t* ctx = (thread_test_context_t*)lpParam; + int server_af = ctx->network_thread_addr.ss_family; #ifdef _WINDOWS + DWORD ret = 0; picoquic_recvmsg_async_ctx_t* recv_ctx; + HANDLE events[2] = { NULL, NULL }; + /* Create an asynchronous socket */ + recv_ctx = picoquic_create_async_socket(server_af, 0, 0); + if (recv_ctx == NULL) { + ret = GetLastError(); + DBG_PRINTF("Cannot create async socket in AF = %d, err = 0x%x", server_af, ret); + } + if (ret == 0) { + /* Bind to specified port */ + ret = picoquic_bind_to_port(recv_ctx->fd, server_af, ctx->server_port); + if (ret != 0) { + DBG_PRINTF("Cannot bind socket to port %d, err = %d (0x%x)", ctx->server_port, ret, ret); + } + } + if (ret == 0) { + /* Create an event */ + ctx->network_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (ctx->network_event == NULL) { + ret = GetLastError(); + DBG_PRINTF("Cannot create event, err = %d (0x%x)", ret, ret); + } + else { + /* Set the event list */ + events[0] = ctx->network_event; + events[1] = recv_ctx->overlap.hEvent; + } + } + if (ret == 0) { + /* Start receiving */ + ret = picoquic_recvmsg_async_start(recv_ctx); + if (ret != 0) { + DBG_PRINTF("Cannot start recv on socket, err = %d (0x%x)", ret, ret); + } + } #else SOCKET_TYPE n_socket; -#endif - uint64_t current_time = 0; - uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; - /* Create a socket */ /* Create an event */ + /* Bind to specified port */ + ret = picoquic_bind_to_port(); +#endif + printf("Starting network thread, state=%x.\n", ret); + ctx->is_ready = 1; /* Loop on wait for socket or event */ - while (!ctx->should_stop) { + while (!ctx->should_stop && ret == 0) { + int event_rank = -1; int receive_ready = 0; uint64_t message_number = 0; /* wait for socket or event */ #ifdef _WINDOWS - DWORD ret_event = WSAWaitForMultipleEvents(nb_sockets, events, FALSE, delta_t_ms, TRUE); + DWORD ret_event = WSAWaitForMultipleEvents(2, events, FALSE, 1000, TRUE); + if (ret_event == WSA_WAIT_FAILED) { + ret = WSAGetLastError(); + DBG_PRINTF("WSAWaitForMultipleEvents fails, error 0x%x", ret); + break; + } + else if (ret_event == WSA_WAIT_TIMEOUT) { + ctx->timeout_count++; +#if 1 + receive_ready = 1; +#endif + } + else if (ret_event >= WSA_WAIT_EVENT_0) { + event_rank = ret_event - WSA_WAIT_EVENT_0; + if (event_rank > 0) { + ctx->msg_recv_count++; + receive_ready = 1; + } else { + /* Event number 0 is the wake signal */ + ctx->event_wake_count++; + if (ResetEvent(ctx->network_event) == 0) { + ret = GetLastError(); + DBG_PRINTF("Cannot reset network event, error 0x%x", ret); + break; + } + } + } #else #endif /* get time */ + current_time = picoquic_current_time(); /* if event received */ while (ctx->event_seen_count < ctx->event_sent_count) { ctx->event_recv_at[ctx->event_seen_count] = current_time; @@ -128,94 +217,321 @@ DWORD WINAPI network_thread(LPVOID lpParam) } /* if receive ready */ if (receive_ready) { + int received_length = 0; +#ifdef _WINDOWS /* On windows, receive async */ - /* On linux, receive socket message */ - /* Get the new time */ - current_time = picoquic_current_time(); - /* Get the message number */ - message_number = PICOPARSE_64(buffer); - if (message_number > NB_THREAD_TEST_MSG) { - DBG_PRINTF("Unexpected message number: %" PRIx64, message_number); - ctx->message_error = ERROR_THREAD_TEST_MESSAGE; - ctx->message_number_error = message_number; + ret = picoquic_recvmsg_async_finish(recv_ctx); + if (ret != 0) { + DBG_PRINTF("%s", "Cannot finish async recv"); break; } - else if (ctx->msg_recv_at[message_number] != 0) { - DBG_PRINTF("Unexpected message number: %" PRIx64 ", first: %" PRIu64 ", then: %" PRIu64, - message_number, ctx->msg_recv_at[message_number], current_time); - ctx->message_error = ERROR_THREAD_TEST_DUPLICATE; - ctx->message_number_error = message_number; + else if (ResetEvent(recv_ctx->overlap.hEvent) == 0) { + ret = GetLastError(); + DBG_PRINTF("Cannot reset socket event, error 0x%x", ret); break; } else { - ctx->msg_recv_at[message_number] = current_time; - } + received_length = (int)recv_ctx->bytes_recv; + } +#else + /* On linux, receive socket message */ +#endif + if (ret == 0) { + if (received_length >= 8) { + /* Get the message number */ + message_number = PICOPARSE_64(recv_ctx->recv_buffer); + if (message_number > NB_THREAD_TEST_MSG) { + DBG_PRINTF("Unexpected message number: %" PRIx64, message_number); + ctx->message_error = ERROR_THREAD_TEST_MESSAGE; + ctx->message_number_error = message_number; + break; + } + else if (ctx->msg_recv_at[message_number] != 0) { + DBG_PRINTF("Unexpected message number: %" PRIx64 ", first: %" PRIu64 ", then: %" PRIu64, + message_number, ctx->msg_recv_at[message_number], current_time); + ctx->message_error = ERROR_THREAD_TEST_DUPLICATE; + ctx->message_number_error = message_number; + break; + } + else { + ctx->msg_recv_at[message_number] = current_time; + } + } + else { + DBG_PRINTF("Message too sort, nb_bytes = %d", received_length); + } + } + if (ret == 0) { + /* Start receiving */ + ret = picoquic_recvmsg_async_start(recv_ctx); + if (ret != 0) { + DBG_PRINTF("Cannot start recv on socket, err = %d (0x%x)", ret, ret); + } + } } } - ctx->should_stop = 1; ctx->network_exit_time = current_time; + printf("Network thread exits.\n"); +#ifdef _WINDOWS + /* Close the socket */ + if (recv_ctx != NULL) { + picoquic_delete_async_socket(recv_ctx); + recv_ctx = NULL; + } + /* Close the event handle */ + if (ctx->network_event != NULL) { + CloseHandle(ctx->network_event); + ctx->network_event = NULL; + } +#else /* Close the socket */ - /* Close the event handle if needed */ - return 0; +#endif + return ret; } /* Network loop thread -- socket only */ -DWORD WINAPI network_loop_thread(LPVOID lpParam) +#ifdef _WINDOWS +DWORD WINAPI network_load_thread(LPVOID lpParam) +#else +int network_loop_thread(void* lpParam) +#endif { + DWORD ret = 0; thread_test_context_t* ctx = (thread_test_context_t*)lpParam; SOCKET_TYPE l_socket; uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; uint64_t current_time; + printf("Starting load thread.\n"); + memset(buffer, 0, sizeof(buffer)); /* Create a socket */ + if ((l_socket = socket(ctx->network_thread_addr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) == INVALID_SOCKET) + { + DBG_PRINTF("Cannot set socket (af=%d)\n", ctx->network_thread_addr.ss_family); + } /* Loop on send to socket */ - for (uint64_t i = 0; i < NB_THREAD_TEST_MSG && !ctx->should_stop; i++) + for (uint64_t i = 0; i < NB_THREAD_TEST_MSG && !ctx->should_stop && ret == 0; i++) { int ret = 0; /* Wait some amount of time */ - sleep(0); + Sleep(1 + ((int)(i%7))*3); current_time = picoquic_current_time(); /* send the message */ - picoformat_64(i, buffer); - ret = (int)sendto(l_socket, buffer, PICOQUIC_MAX_PACKET_SIZE, + picoformat_64(buffer, i); + ret = (int)sendto(l_socket, buffer, 256, 0, (struct sockaddr*)&ctx->network_thread_addr, ctx->network_thread_addr_len); - if (ret != PICOQUIC_MAX_PACKET_SIZE) { + if (ret != 256) { /* Error. Document and exit */ - DBG_PRINTF("Network loop returns %d (0x%x)", ret, ret); + int err_ret = 0; +#ifdef _WINDOWS + err_ret = (int)WSAGetLastError(); +#else + err_ret = (int)errno; +#endif + DBG_PRINTF("Network loop returns %d (0x%x), %d (0x%x)", ret, ret, err_ret, err_ret); ctx->message_loop_error = ret; ctx->message_loop_error_index = i; break; } else { ctx->msg_sent_at[i] = current_time; + ctx->message_loop_sent++; } } + + + printf("load thread ends.\n"); + ctx->message_loop_exit_time = picoquic_current_time(); /* Close the socket */ + if (l_socket != INVALID_SOCKET) { + SOCKET_CLOSE(l_socket); + l_socket = INVALID_SOCKET; + } /* exit the thread */ + return ret; } /* Event loop thread -- event only */ DWORD WINAPI event_loop_thread(LPVOID lpParam) { + int ret = 0; thread_test_context_t* ctx = (thread_test_context_t*)lpParam; - SOCKET_TYPE n_socket; - /* Create a socket */ - /* Loop on send to socket */ - /* Close the socket */ - /* exit the thread */ + + printf("Starting event thread.\n"); + + for (int i = 0; i < NB_THREAD_TEST_EVENT && !ctx->should_stop && ret == 0; i++) + { + Sleep(5); + ctx->event_sent_at[i] = picoquic_current_time(); + ctx->event_sent_count++; + if ((ret = network_wake_up(ctx)) != 0) { + DBG_PRINTF("Network wake up returns 0x%x", ret); + break; + } + } + + printf("End event thread after %d events.\n", ctx->event_sent_count); + + return (DWORD)ret; } -int main(int argc, char ** argv) +int main(int argc, char** argv) { + int ret = 0; + thread_test_context_t ctx; + int is_name = 0; + picoquic_thread_t t_net = NULL; + picoquic_thread_t t_load = NULL; + picoquic_thread_t t_wake = NULL; + #ifdef _WINDOWS WSADATA wsaData = { 0 }; (void)WSA_START(MAKEWORD(2, 2), &wsaData); #endif - printf("testing the thread execution\n"); - printf("thread1: backgroud thread, listens to a socket and to interrupts."); - printf("thread2: backgroud thread, Wakes up the main thread at random intervals."); - printf("thread3: network thread, sends at random intervals."); + + debug_set_stream(stdout); + + memset(&ctx, 0, sizeof(thread_test_context_t)); + ctx.server_port = 12345; + ret = picoquic_get_server_address("::1", ctx.server_port, + &ctx.network_thread_addr, &is_name); + ctx.network_thread_addr_len = sizeof(struct sockaddr_in6); + if (ret != 0) { + DBG_PRINTF("Cannot get server address, ret = %d (0x%x)", ret, ret); + } + + if (ret == 0) { + printf("thread1: backgroud thread, listens to a socket and to interrupts.\n"); + ret = picoquic_create_thread(&t_net, network_thread, &ctx); + if (ret != 0) { + DBG_PRINTF("Cannot create network thread, ret= 0x%x", ret); + } + else { + for (int i = 0; i < 2000 && !ctx.is_ready; i++) { + Sleep(1); + } + if (ctx.is_ready) { + printf("Network thread is ready.\n"); + } + else { + printf("Network thread not started in time.\n"); + } + } + } + + if (ret == 0) { + printf("thread2: backgroud thread, Wakes up the main thread at random intervals.\n"); + ret = picoquic_create_thread(&t_wake, event_loop_thread, &ctx); + if (ret != 0) { + DBG_PRINTF("Cannot create event loop thread, ret= 0x%x", ret); + } + } + if (ret == 0) { + printf("thread3: network thread, sends at random intervals.\n"); + ret = picoquic_create_thread(&t_load, network_load_thread, &ctx); + if (ret != 0) { + DBG_PRINTF("Cannot create event thread, ret= 0x%x", ret); + } + } + /* Wait first on the message load thread. */ + if (ret == 0) { + printf("Waiting for network load thread.\n"); + ret = picoquic_wait_thread(t_load); + if (ret != 0) { + DBG_PRINTF("Cannot close load thread, ret= 0x%x", ret); + } + else { + t_load = NULL; + } + } + /* Wait next on the message event thread. */ + if (ret == 0) { + printf("Waiting for wake thread.\n"); + ret = picoquic_wait_thread(t_wake); + if (ret != 0) { + DBG_PRINTF("Cannot close wake thread, ret= 0x%x", ret); + } + else { + t_wake = NULL; + } + } + printf("Load and wake thread are closed.\n"); + /* Wait explicitly for some time, to give the program a chance to receive data */ + Sleep(100); + /* Set the termination flag */ + ctx.should_stop = 1; + /* Wait for the network thread */ + if (ret == 0) { + printf("Waiting for net thread.\n"); + ret = picoquic_wait_thread(t_net); + if (ret != 0) { + DBG_PRINTF("Cannot close wake thread, ret= 0x%x", ret); + } + else { + t_net = NULL; + } + } + /* To do: statistics */ + if (ret == 0) { + uint64_t event_delay_min = UINT64_MAX; + uint64_t event_delay_max = 0; + printf("Timeouts: %d\n", ctx.timeout_count); + printf("Events sent: %d\n", ctx.event_sent_count); + printf("Events wake: %d\n", ctx.event_wake_count); + printf("Events received: %d\n", ctx.event_seen_count); + if (ctx.event_seen_count == ctx.event_sent_count && ctx.event_seen_count > 0) { + for (int i = 0; i < ctx.event_sent_count; i++) { + uint64_t delay = ctx.event_recv_at[i] - ctx.event_sent_at[i]; + if (delay < event_delay_min) { + event_delay_min = delay; + } + if (delay > event_delay_max) { + event_delay_max = delay; + } + } + printf("Events delay min: %" PRIu64 "us.\n", event_delay_min); + printf("Events delay max: %" PRIu64 "us.\n", event_delay_max); + } + } + + if (ret == 0) { + int msg_sent = 0; + int msg_recv = 0; + uint64_t msg_delay_min = UINT64_MAX; + uint64_t msg_delay_max = 0; + + for (int i = 0; i < NB_THREAD_TEST_MSG; i++) { + uint64_t delay; + if (ctx.msg_sent_at[i] == 0) { + continue; + } + msg_sent++; + if (ctx.msg_recv_at[i] == 0) { + continue; + } + msg_recv++; + delay = ctx.msg_recv_at[i] - ctx.msg_sent_at[i]; + if (delay < msg_delay_min) { + msg_delay_min = delay; + } + if (delay > msg_delay_max) { + msg_delay_max = delay; + } + } + + printf("Messages sent: %d\n", msg_sent); + printf("Messages received: %d\n", msg_recv); + printf("Losses: %d\n", msg_sent - msg_recv); + if (msg_recv > 0){ + printf("Message delay min: %" PRIu64 "us.\n", msg_delay_min); + printf("Message delay max: %" PRIu64 "us.\n", msg_delay_max); + } + } + + /* To do: clean up */ + + exit(ret); } diff --git a/thread_tester/thread_tester.vcxproj b/thread_tester/thread_tester.vcxproj index 020c24dc6..614ee8c09 100644 --- a/thread_tester/thread_tester.vcxproj +++ b/thread_tester/thread_tester.vcxproj @@ -141,7 +141,7 @@ Console true $(OutDir);$(SolutionDir)..\picotls\picotlsvs\$(Platform)\$(Configuration)\;$(OPENSSL64DIR);$(OPENSSL64DIR)\lib - picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) + picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;picotls-fusion.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) From df8bc8eac53b454e06b7b9641c0b25ce36c1c8ac Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Sun, 17 Apr 2022 10:55:50 -0700 Subject: [PATCH 4/9] Async restart is windows only. --- thread_tester/thread_test.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index 2423017f2..7810feb3f 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -177,6 +177,7 @@ int network_thread(void * lpParam) int event_rank = -1; int receive_ready = 0; uint64_t message_number = 0; + uint8_t* recv_buf = NULL; /* wait for socket or event */ #ifdef _WINDOWS DWORD ret_event = WSAWaitForMultipleEvents(2, events, FALSE, 1000, TRUE); @@ -261,6 +262,7 @@ int network_thread(void * lpParam) DBG_PRINTF("Message too sort, nb_bytes = %d", received_length); } } +#ifdef _WINDOWS if (ret == 0) { /* Start receiving */ ret = picoquic_recvmsg_async_start(recv_ctx); @@ -268,6 +270,7 @@ int network_thread(void * lpParam) DBG_PRINTF("Cannot start recv on socket, err = %d (0x%x)", ret, ret); } } +#endif } } ctx->network_exit_time = current_time; From ad46b93d08b555f3af0c77ee794a637043dc33bb Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Wed, 6 Dec 2023 15:31:51 -0800 Subject: [PATCH 5/9] Thread wake up trials --- picoquic.sln | 16 ++ thread_tester/thread_test.c | 307 ++++++++++++++++++++++++++---------- 2 files changed, 244 insertions(+), 79 deletions(-) diff --git a/picoquic.sln b/picoquic.sln index 93f5163ee..161fbc5bb 100644 --- a/picoquic.sln +++ b/picoquic.sln @@ -186,6 +186,22 @@ Global {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x64.Build.0 = Release|x64 {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x86.ActiveCfg = Release|Win32 {436B580F-B0E8-4EB8-A987-08218D863316}.Release|x86.Build.0 = Release|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x64.ActiveCfg = Debug|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x64.Build.0 = Debug|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x86.ActiveCfg = Debug|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Debug|x86.Build.0 = Debug|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x64.ActiveCfg = Release|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x64.Build.0 = Release|x64 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x86.ActiveCfg = Release|Win32 + {57341DB3-B498-4E7B-9CB2-E897A562C15F}.Release|x86.Build.0 = Release|Win32 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Debug|x64.ActiveCfg = Debug|x64 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Debug|x64.Build.0 = Debug|x64 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Debug|x86.ActiveCfg = Debug|Win32 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Debug|x86.Build.0 = Debug|Win32 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Release|x64.ActiveCfg = Release|x64 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Release|x64.Build.0 = Release|x64 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Release|x86.ActiveCfg = Release|Win32 + {C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}.Release|x86.Build.0 = Release|Win32 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index 7810feb3f..2202395ba 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -42,6 +42,7 @@ #include #include #include +#include #ifndef SOCKET_TYPE #define SOCKET_TYPE int @@ -73,7 +74,13 @@ typedef struct st_thread_test_context_t { #ifdef _WINDOWS HANDLE network_thread; HANDLE network_event; + HANDLE events[2]; + picoquic_recvmsg_async_ctx_t* recv_ctx; #else + int network_pipe_fd[2]; + SOCKET_TYPE n_socket; + uint8_t buffer[2048]; + size_t buffer_max; #endif int is_ready; volatile int should_stop; @@ -111,34 +118,34 @@ int network_wake_up(thread_test_context_t* ctx) ret = (int)err; } #else + /* TODO: write to network pipe */ + ssize_t written = 0; + if ((written = write(ctx->network_pipe_fd[1], &ret, 1)) != 1) { + if (written == 0) { + ret = EPIPE; + } + else { + ret = errno; + } + } #endif return ret; } -/* Network thread */ #ifdef _WINDOWS -DWORD WINAPI network_thread(LPVOID lpParam) -#else -int network_thread(void * lpParam) -#endif +int windows_events_init(thread_test_context_t* ctx) { - uint64_t current_time = 0; - uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; - thread_test_context_t* ctx = (thread_test_context_t*)lpParam; - int server_af = ctx->network_thread_addr.ss_family; -#ifdef _WINDOWS DWORD ret = 0; - picoquic_recvmsg_async_ctx_t* recv_ctx; - HANDLE events[2] = { NULL, NULL }; + int server_af = ctx->network_thread_addr.ss_family; /* Create an asynchronous socket */ - recv_ctx = picoquic_create_async_socket(server_af, 0, 0); - if (recv_ctx == NULL) { + ctx->recv_ctx = picoquic_create_async_socket(server_af, 0, 0); + if (ctx->recv_ctx == NULL) { ret = GetLastError(); DBG_PRINTF("Cannot create async socket in AF = %d, err = 0x%x", server_af, ret); } if (ret == 0) { /* Bind to specified port */ - ret = picoquic_bind_to_port(recv_ctx->fd, server_af, ctx->server_port); + ret = picoquic_bind_to_port(ctx->recv_ctx->fd, server_af, ctx->server_port); if (ret != 0) { DBG_PRINTF("Cannot bind socket to port %d, err = %d (0x%x)", ctx->server_port, ret, ret); } @@ -152,62 +159,223 @@ int network_thread(void * lpParam) } else { /* Set the event list */ - events[0] = ctx->network_event; - events[1] = recv_ctx->overlap.hEvent; + ctx->events[0] = ctx->network_event; + ctx->events[1] = ctx->recv_ctx->overlap.hEvent; } } if (ret == 0) { /* Start receiving */ - ret = picoquic_recvmsg_async_start(recv_ctx); + ret = picoquic_recvmsg_async_start(ctx->recv_ctx); if (ret != 0) { DBG_PRINTF("Cannot start recv on socket, err = %d (0x%x)", ret, ret); } } + return ret; +} + +int windows_wait_multiple(thread_test_context_t* ctx, int * receive_ready) +{ + int ret = 0; + int event_rank = -1; + DWORD ret_event = WSAWaitForMultipleEvents(2, ctx->events, FALSE, 1000, TRUE); + if (ret_event == WSA_WAIT_FAILED) { + ret = WSAGetLastError(); + DBG_PRINTF("WSAWaitForMultipleEvents fails, error 0x%x", ret); + } + else if (ret_event == WSA_WAIT_TIMEOUT) { + ctx->timeout_count++; + *receive_ready = 1; + } + else if (ret_event >= WSA_WAIT_EVENT_0) { + event_rank = ret_event - WSA_WAIT_EVENT_0; + if (event_rank > 0) { + ctx->msg_recv_count++; + *receive_ready = 1; + } + else { + /* Event number 0 is the wake signal */ + ctx->event_wake_count++; + if (ResetEvent(ctx->network_event) == 0) { + ret = GetLastError(); + DBG_PRINTF("Cannot reset network event, error 0x%x", ret); + } + } + } + return ret; +} + +int windows_receive_async(thread_test_context_t* ctx, int* received_length, uint8_t** p_recv_buf) +{ + /* On windows, receive async */ + int ret = picoquic_recvmsg_async_finish(ctx->recv_ctx); + if (ret != 0) { + DBG_PRINTF("%s", "Cannot finish async recv"); + } + else if (ResetEvent(ctx->recv_ctx->overlap.hEvent) == 0) { + ret = GetLastError(); + DBG_PRINTF("Cannot reset socket event, error 0x%x", ret); + } + else { + *received_length = (int)ctx->recv_ctx->bytes_recv; + *p_recv_buf = ctx->recv_ctx->recv_buffer; + } + return ret; +} + +void windows_close_socket(thread_test_context_t* ctx) +{ + /* Close the socket */ + if (ctx->recv_ctx != NULL) { + picoquic_delete_async_socket(ctx->recv_ctx); + ctx->recv_ctx = NULL; + } + /* Close the event handle */ + if (ctx->network_event != NULL) { + CloseHandle(ctx->network_event); + ctx->network_event = NULL; + } +} #else - SOCKET_TYPE n_socket; - /* Create an event */ +int unix_sockets_init(thread_test_context_t* ctx) +{ + int ret = 0; + ctx->buffer_max = sizeof(ctx->buffer); + ctx->n_socket = picoquic_open_client_socket(int af); + /* Bind to specified port */ - ret = picoquic_bind_to_port(); + if (ret == 0) { + /* Bind to specified port */ + ret = picoquic_bind_to_port(ctx->n_socket, server_af, ctx->server_port); + if (ret != 0) { + DBG_PRINTF("Cannot bind socket to port %d, err = %d (0x%x)", ctx->server_port, ret, ret); + } + } + if (ret == 0) { + /* Create the pipe for network wake up */ + ret = pipe(ctx->network_pipe_fd); + } + return ret; +} + +int unix_select_multiple(thread_test_context_t* ctx, int * receive_ready)) +{ + fd_set readfds; + int ret_select = 0; + int bytes_recv = 0; + int sockmax = 0; + + FD_ZERO(&readfds); + + FD_SET(ctx->n_socket); + sockmax = ctx->n_socket; + FD_SET(ctx->network_pipe_fd[0]); + if (sockmax < ctx->network_pipe_fd[0]) { + sockmax = ctx->network_pipe_fd[0]; + } + ret_select = select(sockmax + 1, &readfds, NULL, NULL, NULL); + + if (ret_select < 0) { + bytes_recv = -1; + ret = -1; + DBG_PRINTF("Error: select returns %d\n", ret_select); + } + else { + if (FD_ISSET(ctx->n_socket)) { + *receive_ready = 1; + } + if (FD_ISSET(ctx->network_pipe_fd[0])) { + /* Something was written on the "wakeup" pipe. Read it. */ + uint8_t eventbuf[8]; + if ((bytes_recv = read(ctx->network_pipe_fd[0], eventbuf, sizeof(eventbuf)) <= 0) { + if (bytes_recv == 0) { + ret = EPIPE; + } + else { + ret = errno; + } + } + else { + ctx->event_wake_count++; + } + } + } + return ret; +} + +int picoquic_recvmsg(SOCKET_TYPE fd, + struct sockaddr_storage* addr_from, + struct sockaddr_storage* addr_dest, + int* dest_if, + unsigned char* received_ecn, + uint8_t* buffer, int buffer_max); + +int unix_receive_from_socket(thread_test_context_t* ctx, int* received_length, uint8_t** p_recv_buf) +{ + int ret = 0; + struct sockaddr_storage addr_from; + struct sockaddr_storage addr_dest; + int dest_if; + unsigned char received_ecn; + int bytes_recv; + + bytes_recv = picoquic_recvmsg(ctx->n_socket, addr_from, + addr_dest, &dest_if, &received_ecn, + ctx->buffer, ctx->buffer_max); + + if (bytes_recv <= 0) { + ret = errno; + DBG_PRINTF("Could not receive packet on UDP socket[%d]= 0%x!\n", + (int)sockets[i], ret); + } + else { + *received_length = bytes_recv; + *p_recv_buf = ctx->buffer; + } + return ret; +} + +void unix_close_socket(thread_test_context_t* ctx) +{ + /* Close the socket */ + if (ctx->n_socket != INVALID_SOCKET) { + (void)close(n_socket); + } + /* Close the pipe */ + for (int i = 0; i < 2; i++) { + (void)close(ctx->network_pipe_fd[i]); + } +} #endif - printf("Starting network thread, state=%x.\n", ret); - ctx->is_ready = 1; + +/* Network thread */ +#ifdef _WINDOWS +DWORD WINAPI network_thread(LPVOID lpParam) +#else +int network_thread(void * lpParam) +#endif +{ + uint64_t current_time = 0; + thread_test_context_t* ctx = (thread_test_context_t*)lpParam; +#ifdef _WINDOWS + int ret = windows_events_init(ctx); +#else + int ret = unix_sockets_init(ctx); +#endif + if (ret == 0) { + printf("Starting network thread, state=%x.\n", ret); + ctx->is_ready = 1; + } /* Loop on wait for socket or event */ while (!ctx->should_stop && ret == 0) { - int event_rank = -1; int receive_ready = 0; uint64_t message_number = 0; uint8_t* recv_buf = NULL; /* wait for socket or event */ #ifdef _WINDOWS - DWORD ret_event = WSAWaitForMultipleEvents(2, events, FALSE, 1000, TRUE); - if (ret_event == WSA_WAIT_FAILED) { - ret = WSAGetLastError(); - DBG_PRINTF("WSAWaitForMultipleEvents fails, error 0x%x", ret); - break; - } - else if (ret_event == WSA_WAIT_TIMEOUT) { - ctx->timeout_count++; -#if 1 - receive_ready = 1; -#endif - } - else if (ret_event >= WSA_WAIT_EVENT_0) { - event_rank = ret_event - WSA_WAIT_EVENT_0; - if (event_rank > 0) { - ctx->msg_recv_count++; - receive_ready = 1; - } else { - /* Event number 0 is the wake signal */ - ctx->event_wake_count++; - if (ResetEvent(ctx->network_event) == 0) { - ret = GetLastError(); - DBG_PRINTF("Cannot reset network event, error 0x%x", ret); - break; - } - } - } + ret = windows_wait_multiple(ctx, &receive_ready); #else + ret = unix_select_multiple(ctx, &receive_ready); #endif /* get time */ current_time = picoquic_current_time(); @@ -221,26 +389,14 @@ int network_thread(void * lpParam) int received_length = 0; #ifdef _WINDOWS /* On windows, receive async */ - ret = picoquic_recvmsg_async_finish(recv_ctx); - if (ret != 0) { - DBG_PRINTF("%s", "Cannot finish async recv"); - break; - } - else if (ResetEvent(recv_ctx->overlap.hEvent) == 0) { - ret = GetLastError(); - DBG_PRINTF("Cannot reset socket event, error 0x%x", ret); - break; - } - else { - received_length = (int)recv_ctx->bytes_recv; - } + ret = windows_receive_async(ctx, &received_length, &recv_buf); #else - /* On linux, receive socket message */ + /* receive message on unix */ #endif if (ret == 0) { if (received_length >= 8) { /* Get the message number */ - message_number = PICOPARSE_64(recv_ctx->recv_buffer); + message_number = PICOPARSE_64(recv_buf); if (message_number > NB_THREAD_TEST_MSG) { DBG_PRINTF("Unexpected message number: %" PRIx64, message_number); ctx->message_error = ERROR_THREAD_TEST_MESSAGE; @@ -265,7 +421,7 @@ int network_thread(void * lpParam) #ifdef _WINDOWS if (ret == 0) { /* Start receiving */ - ret = picoquic_recvmsg_async_start(recv_ctx); + ret = picoquic_recvmsg_async_start(ctx->recv_ctx); if (ret != 0) { DBG_PRINTF("Cannot start recv on socket, err = %d (0x%x)", ret, ret); } @@ -274,21 +430,12 @@ int network_thread(void * lpParam) } } ctx->network_exit_time = current_time; - printf("Network thread exits.\n"); #ifdef _WINDOWS - /* Close the socket */ - if (recv_ctx != NULL) { - picoquic_delete_async_socket(recv_ctx); - recv_ctx = NULL; - } - /* Close the event handle */ - if (ctx->network_event != NULL) { - CloseHandle(ctx->network_event); - ctx->network_event = NULL; - } + windows_close_socket(ctx); #else - /* Close the socket */ + unix_close_socket(ctx); #endif + printf("Network thread exits.\n"); return ret; } @@ -356,6 +503,7 @@ int network_loop_thread(void* lpParam) return ret; } +#ifdef _WINDOWS /* Event loop thread -- event only */ DWORD WINAPI event_loop_thread(LPVOID lpParam) { @@ -379,6 +527,7 @@ DWORD WINAPI event_loop_thread(LPVOID lpParam) return (DWORD)ret; } +#endif int main(int argc, char** argv) { From 24e476dfe12121b0e5ab8792f7add93662eb0211 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Wed, 6 Dec 2023 15:57:40 -0800 Subject: [PATCH 6/9] Fix windows 32 bit builds --- picoquic.sln | 5 +++++ thread_tester/thread_tester.vcxproj | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/picoquic.sln b/picoquic.sln index 161fbc5bb..fb8d479cd 100644 --- a/picoquic.sln +++ b/picoquic.sln @@ -72,6 +72,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "PerfAndStressTest", "PerfAn EndProjectSection EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "thread_tester", "thread_tester\thread_tester.vcxproj", "{57341DB3-B498-4E7B-9CB2-E897A562C15F}" + ProjectSection(ProjectDependencies) = postProject + {B3DDD196-3D03-4396-97BD-E5DE733E9D24} = {B3DDD196-3D03-4396-97BD-E5DE733E9D24} + {63E1E6B7-DB5F-4EDC-8AC8-7E9F5990D11F} = {63E1E6B7-DB5F-4EDC-8AC8-7E9F5990D11F} + {998765EE-64DF-49C1-8471-A79E2DA7CD21} = {998765EE-64DF-49C1-8471-A79E2DA7CD21} + EndProjectSection EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "baton_app", "baton_app\baton_app.vcxproj", "{C0F21D3F-ECC3-4AB5-A3E3-E2D48965EBA5}" ProjectSection(ProjectDependencies) = postProject diff --git a/thread_tester/thread_tester.vcxproj b/thread_tester/thread_tester.vcxproj index 614ee8c09..120206165 100644 --- a/thread_tester/thread_tester.vcxproj +++ b/thread_tester/thread_tester.vcxproj @@ -102,7 +102,7 @@ Console true - $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + $(OutDir);$(SolutionDir)..\picotls\picotlsvs\$(Configuration)\;$(OPENSSLDIR);$(OPENSSLDIR)\lib;$(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) @@ -123,7 +123,7 @@ true true true - $(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) + $(OutDir);$(SolutionDir)..\picotls\picotlsvs\$(Configuration)\;$(OPENSSLDIR);$(OPENSSLDIR)\lib;$(SolutionDir)\picoquic;$(SolutionDir)\picohttp;$(SolutionDir)\loglib;%(AdditionalIncludeDirectories) picoquic.lib;loglib.lib;picotls-core.lib;picotls-minicrypto.lib;picotls-minicrypto-deps.lib;picotls-openssl.lib;ws2_32.lib;libcrypto.lib;bcrypt.lib;%(AdditionalDependencies) From 12474fcf0247c3f54fee3d67c1eecb195aeb7983 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Wed, 6 Dec 2023 17:02:56 -0800 Subject: [PATCH 7/9] fix unused variable warning --- picoquic/util.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/picoquic/util.c b/picoquic/util.c index 8f86b270d..32d2592aa 100644 --- a/picoquic/util.c +++ b/picoquic/util.c @@ -1025,7 +1025,7 @@ int picoquic_wait_thread(picoquic_thread_t thread) #else ret = pthread_join(thread, NULL); #endif - return 0; + return ret; } void picoquic_delete_thread(picoquic_thread_t * thread) From b3190e537900cde09f35579161df43ad239d1ca8 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Wed, 6 Dec 2023 21:28:13 -0800 Subject: [PATCH 8/9] Fix thread tester on Mac. --- CMakeLists.txt | 7 ++++ thread_tester/thread_test.c | 72 ++++++++++++++++++++++--------------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5f1ab039d..f27bd59a2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -404,6 +404,13 @@ if(BUILD_TESTING AND picoquic_BUILD_TESTS) COMMAND picoquic_ct -S ${PROJECT_SOURCE_DIR} -n -r) add_test(NAME picohttp_ct COMMAND picohttp_ct -S ${PROJECT_SOURCE_DIR} -n -r) + + add_executable(thread_test + thread_tester/thread_test.c) + target_link_libraries(thread_test PRIVATE picoquic-log picoquic-core) + target_include_directories(thread_test PRIVATE loglib picoquic) + set_picoquic_compile_settings(thread_test) + endif() # get all project files for formatting diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index 2202395ba..805c42b5f 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -30,7 +30,7 @@ #include #include #include - +#include #ifndef __USE_XOPEN2K #define __USE_XOPEN2K #endif @@ -239,8 +239,9 @@ void windows_close_socket(thread_test_context_t* ctx) int unix_sockets_init(thread_test_context_t* ctx) { int ret = 0; + int server_af = ctx->network_thread_addr.ss_family; ctx->buffer_max = sizeof(ctx->buffer); - ctx->n_socket = picoquic_open_client_socket(int af); + ctx->n_socket = picoquic_open_client_socket(server_af); /* Bind to specified port */ if (ret == 0) { @@ -257,18 +258,19 @@ int unix_sockets_init(thread_test_context_t* ctx) return ret; } -int unix_select_multiple(thread_test_context_t* ctx, int * receive_ready)) +int unix_select_multiple(thread_test_context_t* ctx, int * receive_ready) { fd_set readfds; int ret_select = 0; int bytes_recv = 0; int sockmax = 0; + int ret = 0; FD_ZERO(&readfds); - FD_SET(ctx->n_socket); + FD_SET(ctx->n_socket, &readfds); sockmax = ctx->n_socket; - FD_SET(ctx->network_pipe_fd[0]); + FD_SET(ctx->network_pipe_fd[0], &readfds); if (sockmax < ctx->network_pipe_fd[0]) { sockmax = ctx->network_pipe_fd[0]; } @@ -280,13 +282,13 @@ int unix_select_multiple(thread_test_context_t* ctx, int * receive_ready)) DBG_PRINTF("Error: select returns %d\n", ret_select); } else { - if (FD_ISSET(ctx->n_socket)) { + if (FD_ISSET(ctx->n_socket, &readfds)) { *receive_ready = 1; } - if (FD_ISSET(ctx->network_pipe_fd[0])) { + if (FD_ISSET(ctx->network_pipe_fd[0], &readfds)) { /* Something was written on the "wakeup" pipe. Read it. */ uint8_t eventbuf[8]; - if ((bytes_recv = read(ctx->network_pipe_fd[0], eventbuf, sizeof(eventbuf)) <= 0) { + if ((bytes_recv = read(ctx->network_pipe_fd[0], eventbuf, sizeof(eventbuf))) <= 0) { if (bytes_recv == 0) { ret = EPIPE; } @@ -318,14 +320,14 @@ int unix_receive_from_socket(thread_test_context_t* ctx, int* received_length, u unsigned char received_ecn; int bytes_recv; - bytes_recv = picoquic_recvmsg(ctx->n_socket, addr_from, - addr_dest, &dest_if, &received_ecn, + bytes_recv = picoquic_recvmsg(ctx->n_socket, &addr_from, + &addr_dest, &dest_if, &received_ecn, ctx->buffer, ctx->buffer_max); if (bytes_recv <= 0) { ret = errno; DBG_PRINTF("Could not receive packet on UDP socket[%d]= 0%x!\n", - (int)sockets[i], ret); + (int)ctx->n_socket, ret); } else { *received_length = bytes_recv; @@ -338,7 +340,8 @@ void unix_close_socket(thread_test_context_t* ctx) { /* Close the socket */ if (ctx->n_socket != INVALID_SOCKET) { - (void)close(n_socket); + (void)close(ctx->n_socket); + ctx->n_socket = INVALID_SOCKET; } /* Close the pipe */ for (int i = 0; i < 2; i++) { @@ -352,7 +355,7 @@ void unix_close_socket(thread_test_context_t* ctx) #ifdef _WINDOWS DWORD WINAPI network_thread(LPVOID lpParam) #else -int network_thread(void * lpParam) +void* network_thread(void * lpParam) #endif { uint64_t current_time = 0; @@ -392,6 +395,7 @@ int network_thread(void * lpParam) ret = windows_receive_async(ctx, &received_length, &recv_buf); #else /* receive message on unix */ + ret = unix_receive_from_socket(ctx, &received_length, &recv_buf); #endif if (ret == 0) { if (received_length >= 8) { @@ -430,23 +434,26 @@ int network_thread(void * lpParam) } } ctx->network_exit_time = current_time; + printf("Network thread exits.\n"); #ifdef _WINDOWS windows_close_socket(ctx); + return (DWORD)ret; #else unix_close_socket(ctx); + pthread_exit((void*)&ret); #endif - printf("Network thread exits.\n"); - return ret; } -/* Network loop thread -- socket only */ +/* Network load thread -- socket only */ #ifdef _WINDOWS +#define SLEEP(x) Sleep(x) DWORD WINAPI network_load_thread(LPVOID lpParam) #else -int network_loop_thread(void* lpParam) +#define SLEEP(x) usleep((x)*1000) +void* network_load_thread(void* lpParam) #endif { - DWORD ret = 0; + int ret = 0; thread_test_context_t* ctx = (thread_test_context_t*)lpParam; SOCKET_TYPE l_socket; uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; @@ -463,9 +470,8 @@ int network_loop_thread(void* lpParam) /* Loop on send to socket */ for (uint64_t i = 0; i < NB_THREAD_TEST_MSG && !ctx->should_stop && ret == 0; i++) { - int ret = 0; /* Wait some amount of time */ - Sleep(1 + ((int)(i%7))*3); + SLEEP(1 + ((int)(i%7))*3); current_time = picoquic_current_time(); /* send the message */ picoformat_64(buffer, i); @@ -479,7 +485,7 @@ int network_loop_thread(void* lpParam) #else err_ret = (int)errno; #endif - DBG_PRINTF("Network loop returns %d (0x%x), %d (0x%x)", ret, ret, err_ret, err_ret); + DBG_PRINTF("Network load loop returns %d (0x%x), %d (0x%x)", ret, ret, err_ret, err_ret); ctx->message_loop_error = ret; ctx->message_loop_error_index = i; break; @@ -500,12 +506,19 @@ int network_loop_thread(void* lpParam) l_socket = INVALID_SOCKET; } /* exit the thread */ - return ret; +#ifdef _WINDOWS + return (DWORD)ret; +#else + pthread_exit((void*)&ret); +#endif } #ifdef _WINDOWS /* Event loop thread -- event only */ DWORD WINAPI event_loop_thread(LPVOID lpParam) +#else +void* event_loop_thread(void* lpParam) +#endif { int ret = 0; thread_test_context_t* ctx = (thread_test_context_t*)lpParam; @@ -514,7 +527,7 @@ DWORD WINAPI event_loop_thread(LPVOID lpParam) for (int i = 0; i < NB_THREAD_TEST_EVENT && !ctx->should_stop && ret == 0; i++) { - Sleep(5); + SLEEP(5); ctx->event_sent_at[i] = picoquic_current_time(); ctx->event_sent_count++; if ((ret = network_wake_up(ctx)) != 0) { @@ -524,10 +537,12 @@ DWORD WINAPI event_loop_thread(LPVOID lpParam) } printf("End event thread after %d events.\n", ctx->event_sent_count); - +#ifdef _WINDOWS return (DWORD)ret; -} +#else + pthread_exit((void*)&ret); #endif +} int main(int argc, char** argv) { @@ -563,7 +578,7 @@ int main(int argc, char** argv) } else { for (int i = 0; i < 2000 && !ctx.is_ready; i++) { - Sleep(1); + SLEEP(1); } if (ctx.is_ready) { printf("Network thread is ready.\n"); @@ -585,7 +600,7 @@ int main(int argc, char** argv) printf("thread3: network thread, sends at random intervals.\n"); ret = picoquic_create_thread(&t_load, network_load_thread, &ctx); if (ret != 0) { - DBG_PRINTF("Cannot create event thread, ret= 0x%x", ret); + DBG_PRINTF("Cannot create network load thread, ret= 0x%x", ret); } } /* Wait first on the message load thread. */ @@ -612,12 +627,13 @@ int main(int argc, char** argv) } printf("Load and wake thread are closed.\n"); /* Wait explicitly for some time, to give the program a chance to receive data */ - Sleep(100); + SLEEP(100); /* Set the termination flag */ ctx.should_stop = 1; /* Wait for the network thread */ if (ret == 0) { printf("Waiting for net thread.\n"); + (void)network_wake_up(&ctx); ret = picoquic_wait_thread(t_net); if (ret != 0) { DBG_PRINTF("Cannot close wake thread, ret= 0x%x", ret); From dd27fa058f8c38782f6f3c618dd67396cb22c3a8 Mon Sep 17 00:00:00 2001 From: Christian Huitema Date: Wed, 6 Dec 2023 21:51:09 -0800 Subject: [PATCH 9/9] Fix network load thread --- thread_tester/thread_test.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/thread_tester/thread_test.c b/thread_tester/thread_test.c index 805c42b5f..06b708256 100644 --- a/thread_tester/thread_test.c +++ b/thread_tester/thread_test.c @@ -458,6 +458,7 @@ void* network_load_thread(void* lpParam) SOCKET_TYPE l_socket; uint8_t buffer[PICOQUIC_MAX_PACKET_SIZE]; uint64_t current_time; + int nb_sent = 0; printf("Starting load thread.\n"); @@ -470,14 +471,16 @@ void* network_load_thread(void* lpParam) /* Loop on send to socket */ for (uint64_t i = 0; i < NB_THREAD_TEST_MSG && !ctx->should_stop && ret == 0; i++) { + size_t bytes_sent = 0; /* Wait some amount of time */ SLEEP(1 + ((int)(i%7))*3); current_time = picoquic_current_time(); + nb_sent++; /* send the message */ picoformat_64(buffer, i); - ret = (int)sendto(l_socket, buffer, 256, + bytes_sent = (int)sendto(l_socket, buffer, 256, 0, (struct sockaddr*)&ctx->network_thread_addr, ctx->network_thread_addr_len); - if (ret != 256) { + if (bytes_sent != 256) { /* Error. Document and exit */ int err_ret = 0; #ifdef _WINDOWS @@ -496,8 +499,8 @@ void* network_load_thread(void* lpParam) } } - - printf("load thread ends.\n"); + printf("load thread ends after %d loops (%d, 0x%x).\n", nb_sent, + ctx->should_stop, ret); ctx->message_loop_exit_time = picoquic_current_time(); /* Close the socket */