Skip to content

Commit

Permalink
Change cast to be used only when the option is set.
Browse files Browse the repository at this point in the history
  • Loading branch information
kinyoklion committed Jan 3, 2024
1 parent 9d7bde3 commit 6e155f9
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
33 changes: 31 additions & 2 deletions src/ldclient_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,19 @@
datasource => poll | stream | file | testdata | undefined,
http_options => http_options(),
stream_initial_retry_delay_ms => non_neg_integer(),
application => app_info()
application => app_info(),

%% This functionality is experimental and may be removed. It changes event
%% processing to be completely async. This removes the back pressure
%% experienced under heavy event load. If event processing cannot catch
%% up, then this option can result in unbounded growth of the mailbox
%% for the event processor. If this situation arises then the BEAM VM will
%% exhaust all available memory and eventually crash (not just the erlang
%% process experiencing unbounded growth, but the entire node).
%% This option should only be used if you are certain that your spikes
%% will not cause this situation, and you are okay with the VM crashing
%% when the situation is encountered.
experimental_async_add_event => boolean()
}.
% Settings stored for each running SDK instance

Expand Down Expand Up @@ -126,6 +138,7 @@
-define(DEFAULT_TESTDATA_TAG, default).
-define(DEFAULT_DATASOURCE, undefined).
-define(DEFAULT_STREAM_RETRY_DELAY, 1000).
-define(DEFAULT_EXPERIMENTAL_ASYNC_ADD_EVENT, false).

-define(HTTP_DEFAULT_TLS_OPTIONS, undefined).
-define(HTTP_DEFAULT_CONNECT_TIMEOUT, 2000).
Expand Down Expand Up @@ -188,6 +201,8 @@ parse_options(SdkKey, Options) when is_list(SdkKey), is_map(Options) ->
StreamInitialRetryDelayMs = maps:get(stream_initial_retry_delay_ms, Options, ?DEFAULT_STREAM_RETRY_DELAY),
HttpOptions = parse_http_options(maps:get(http_options, Options, undefined)),
AppInfo = parse_application_info(maps:get(application, Options, ?APPLICATION_DEFAULT_OPTIONS)),
ExperimentalAsyncAddEvent = parse_experimental_async_add_event(
maps:get(experimental_async_add_event, Options, ?DEFAULT_EXPERIMENTAL_ASYNC_ADD_EVENT)),
RedisTls = maps:get(redis_tls, Options, ?DEFAULT_REDIS_TLS),
#{
sdk_key => SdkKey,
Expand Down Expand Up @@ -222,7 +237,8 @@ parse_options(SdkKey, Options) when is_list(SdkKey), is_map(Options) ->
testdata_tag => TestDataTag,
datasource => DataSource,
stream_initial_retry_delay_ms => StreamInitialRetryDelayMs,
application => AppInfo
application => AppInfo,
experimental_async_add_event => ExperimentalAsyncAddEvent
}.

%% @doc Get all registered tags
Expand Down Expand Up @@ -435,3 +451,16 @@ parse_private_attributes(Attributes) -> lists:map(fun ensure_attribute_reference
ldclient_attribute_reference:attribute_reference().
ensure_attribute_reference(Attribute) when is_binary(Attribute) -> ldclient_attribute_reference:new(Attribute);
ensure_attribute_reference(Attribute) -> Attribute.

-spec parse_experimental_async_add_event(Enabled :: boolean()) -> boolean().
parse_experimental_async_add_event(false) -> false;
parse_experimental_async_add_event(true) ->
error_logger:warning_msg("Using the experimental_async_add_event option.
This functionality is experimental and may be removed.
It changes event processing to be completely async. This removes the back pressure experienced under heavy
event load. If event processing cannot catch up, then this option can result in unbounded growth of the
mailbox for the event processor. If this situation arises then the BEAM VM will exhaust all available memory
and eventually crash (not just the erlang process experiencing unbounded growth, but the entire node). This
option should only be used if you are certain that your spikes will not cause this situation and you are
okay with the VM crashing when the situation is encountered"),
true.
8 changes: 7 additions & 1 deletion src/ldclient_event_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@
ok.
add_event(Tag, Event, Options) when is_atom(Tag) ->
ServerName = get_local_reg_name(Tag),
gen_server:cast(ServerName, {add_event, Event, Tag, Options}).
case ldclient_config:get_value(Tag, experimental_async_add_event) of
true -> gen_server:cast(ServerName, {add_event, Event, Tag, Options});
false -> gen_server:call(ServerName, {add_event, Event, Tag, Options})
end.

%% @doc Flush buffered events
%%
Expand Down Expand Up @@ -131,6 +134,9 @@ handle_call(_Request, _From, #{offline := true} = State) ->
{reply, ok, State};
handle_call(_Request, _From, #{send_events := false} = State) ->
{reply, ok, State};
handle_call({add_event, Event, Tag, Options}, _From, #{events := Events, summary_event := SummaryEvent, capacity := Capacity} = State) ->
{NewEvents, NewSummaryEvent} = add_event(Tag, Event, Options, Events, SummaryEvent, Capacity),
{reply, ok, State#{events := NewEvents, summary_event := NewSummaryEvent}};
handle_call({flush, Tag}, _From, #{events := Events, summary_event := SummaryEvent, flush_interval := FlushInterval, timer_ref := TimerRef} = State) ->
_ = erlang:cancel_timer(TimerRef),
ok = ldclient_event_process_server:send_events(Tag, Events, SummaryEvent),
Expand Down
34 changes: 31 additions & 3 deletions test/ldclient_events_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
fail_and_retry/1,
payload_id_differs/1,
add_flag_eval_events_flush_in_experiment_fallthrough/1,
add_flag_eval_events_flush_in_experiment_rule_match/1
add_flag_eval_events_flush_in_experiment_rule_match/1,
auto_flush_with_cast/1
]).

%%====================================================================
Expand All @@ -47,7 +48,8 @@ all() ->
fail_and_retry,
payload_id_differs,
add_flag_eval_events_flush_in_experiment_fallthrough,
add_flag_eval_events_flush_in_experiment_rule_match
add_flag_eval_events_flush_in_experiment_rule_match,
auto_flush_with_cast
].

init_per_suite(Config) ->
Expand All @@ -62,7 +64,12 @@ init_per_suite(Config) ->
events_capacity => 2,
events_flush_interval => 1000
},
AsyncEventsOptions = Options#{
experimental_async_add_event => true,
events_flush_interval => 1000
},
ldclient:start_instance("", another1, Another1Options),
ldclient:start_instance("", async_events_client, AsyncEventsOptions),
ldclient:start_instance("sdk-key-events-fail", failer, Another1Options),
Config.

Expand Down Expand Up @@ -257,7 +264,8 @@ send_await_events(Events, Options, Tag) ->
register_event_forwarding_process(),
IncludeReasons = maps:get(include_reasons, Options, false),
Flush = maps:get(flush, Options, false),
[ok = ldclient_event_server:add_event(Tag, E, #{include_reasons => IncludeReasons}) || E <- Events],
[ok = ldclient_event_server:add_event(Tag, E,
#{include_reasons => IncludeReasons}) || E <- Events],
ok = if Flush -> ldclient_event_server:flush(Tag); true -> ok end,
receive_events().

Expand Down Expand Up @@ -813,6 +821,26 @@ auto_flush(_) ->
}
] = ActualEvents.

auto_flush_with_cast(_) ->
Event1 = ldclient_event:new_identify(
ldclient_context:new_from_user(#{key => <<"12345">>, first_name => <<"Tester">>, last_name => <<"Testerson">>})),
Event2 = ldclient_event:new_identify(
ldclient_context:new_from_user(#{key => <<"abcde">>, first_name => <<"Tester">>, last_name => <<"Testerson">>})),
Events = [Event1, Event2],
{ActualEvents, _} = send_await_events(Events, #{flush => false}, async_events_client),
[
#{
<<"kind">> := <<"identify">>,
<<"context">> := #{<<"key">> := <<"12345">>, <<"kind">> := <<"user">>, <<"firstName">> := <<"Tester">>, <<"lastName">> := <<"Testerson">>},
<<"creationDate">> := _
},
#{
<<"kind">> := <<"identify">>,
<<"context">> := #{<<"key">> := <<"abcde">>, <<"kind">> := <<"user">>, <<"firstName">> := <<"Tester">>, <<"lastName">> := <<"Testerson">>},
<<"creationDate">> := _
}
] = ActualEvents.

exceed_capacity(_) ->
Event1 = ldclient_event:new_identify(ldclient_context:new_from_user(#{key => <<"foo">>, first_name => <<"Tester">>, last_name => <<"Testerson">>})),
Event2 = ldclient_event:new_identify(ldclient_context:new_from_user(#{key => <<"bar">>, first_name => <<"Tester">>, last_name => <<"Testerson">>})),
Expand Down

0 comments on commit 6e155f9

Please sign in to comment.