Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for unknown_topic_cache_ttl option in brod_client #613

Merged
merged 7 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ start_client(BootstrapEndpoints, ClientId) ->
%% Producer configuration to use when auto_start_producers is true.
%% See {@link brod_producer:start_link/4} for details about producer config</li>
%%
%% <li>`unknown_topic_cache_ttl' (optional, default=120000)
%%
%% How long, in milliseconds, an unknown_topic_or_partition error is cached.</li>
%%
%% </ul>
%%
%% Connection options can be added to the same proplist. See
Expand Down Expand Up @@ -510,7 +514,7 @@ start_consumer(Client, TopicName, ConsumerConfig) ->
%% is not statically configured for them.
%% It is up to the callers how they want to distribute their data
%% (e.g. random, roundrobin or consistent-hashing) to the partitions.
%% NOTE: The partitions count is cached for 120 seconds.
%% NOTE: The partitions count is cached.
-spec get_partitions_count(client(), topic()) ->
{ok, pos_integer()} | {error, any()}.
get_partitions_count(Client, Topic) ->
Expand Down
42 changes: 27 additions & 15 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).

%% ClientId as ets table name.
-define(ETS(ClientId), ClientId).
Expand All @@ -85,8 +86,6 @@
-define(CONSUMER(Topic, Partition, Pid),
{?CONSUMER_KEY(Topic, Partition), Pid}).

-define(UNKNOWN_TOPIC_CACHE_EXPIRE_SECONDS, 120).

-type endpoint() :: brod:endpoint().
-type client() :: brod:client().
-type client_id() :: brod:client_id().
Expand Down Expand Up @@ -613,6 +612,7 @@ do_get_metadata(Topic, State) when not is_tuple(Topic) ->
do_get_metadata(FetchMetdataFor, Topic,
#state{ client_id = ClientId
, workers_tab = Ets
, config = Config
} = State0) ->
Topics = case FetchMetdataFor of
all -> all; %% in case no topic is given, get all
Expand All @@ -624,8 +624,11 @@ do_get_metadata(FetchMetdataFor, Topic,
case request_sync(State, Request) of
{ok, #kpro_rsp{api = metadata, msg = Metadata}} ->
TopicMetadataArray = kf(topics, Metadata),
ok = update_partitions_count_cache(Ets, TopicMetadataArray),
ok = maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray),
UnknownTopicCacheTtl = config(unknown_topic_cache_ttl, Config,
?DEFAULT_UNKNOWN_TOPIC_CACHE_TTL),
ok = update_partitions_count_cache(Ets, TopicMetadataArray, UnknownTopicCacheTtl),
ok = maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray,
UnknownTopicCacheTtl),
{{ok, Metadata}, State};
{error, Reason} ->
?BROD_LOG_ERROR("~p failed to fetch metadata for topics: ~p\n"
Expand Down Expand Up @@ -819,29 +822,30 @@ is_cooled_down(Ts, #state{config = Config}) ->

%% Call this function after fetching metadata for all topics
%% to cache the not-found status of the given topic.
%% Only an `{error, unknown_topic_or_partition}' result from
%% find_partition_count_in_topic_metadata_array/2 is written to Ets;
%% every other result leaves the table untouched.
maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray) ->
maybe_cache_unknown_topic_partition(Ets, Topic, TopicMetadataArray, UnknownTopicCacheTtl) ->
case find_partition_count_in_topic_metadata_array(TopicMetadataArray, Topic) of
{error, unknown_topic_or_partition} = Err ->
_ = ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, os:timestamp()}),
_ = ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err,
{expire, expire_ts(UnknownTopicCacheTtl)}}),
ok;
_ ->
%% do nothing when ok or any other error
ok
end.

%% Walk the list of topic metadata structs and refresh the matching
%% ?TOPIC_METADATA_KEY(Topic) entries in Ets:
%%   - a successful partition count is stored as the entry value;
%%   - an unknown_topic_or_partition error is stored as the entry value
%%     (stamped with `{expire, expire_ts(UnknownTopicCacheTtl)}' in the
%%     3-arity version);
%%   - any other error leaves the table untouched.
%% Always returns `ok' once the list is exhausted.
-spec update_partitions_count_cache(ets:tab(), [kpro:struct()]) -> ok.
update_partitions_count_cache(_Ets, []) -> ok;
update_partitions_count_cache(Ets, [TopicMetadata | Rest]) ->
-spec update_partitions_count_cache(ets:tab(), [kpro:struct()], non_neg_integer()) -> ok.
update_partitions_count_cache(_Ets, [], _UnknownTopicCacheTtl) -> ok;
update_partitions_count_cache(Ets, [TopicMetadata | Rest], UnknownTopicCacheTtl) ->
Topic = kf(name, TopicMetadata),
case get_partitions_count_in_metadata(TopicMetadata) of
{ok, Cnt} ->
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Cnt, os:timestamp()});
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Cnt, {added, now_ts()}});
{error, ?unknown_topic_or_partition} = Err ->
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, os:timestamp()});
ets:insert(Ets, {?TOPIC_METADATA_KEY(Topic), Err, {expire, expire_ts(UnknownTopicCacheTtl)}});
{error, _Reason} ->
%% other errors are not cached
ok
end,
update_partitions_count_cache(Ets, Rest).
update_partitions_count_cache(Ets, Rest, UnknownTopicCacheTtl).

%% Get the partition count from the cache.
%% On a cache miss, send a metadata request to retrieve it.
Expand Down Expand Up @@ -893,9 +897,8 @@ lookup_partitions_count_cache(Ets, Topic) ->
try ets:lookup(Ets, ?TOPIC_METADATA_KEY(Topic)) of
[{_, Count, _Ts}] when is_integer(Count) ->
{ok, Count};
[{_, {error, Reason}, Ts}] ->
case timer:now_diff(os:timestamp(), Ts) =<
?UNKNOWN_TOPIC_CACHE_EXPIRE_SECONDS * 1000000 of
[{_, {error, Reason}, {expire, ExpireTs}}] when is_integer(ExpireTs) ->
case is_expired(ExpireTs) of
true -> {error, Reason};
false -> false
end;
Expand Down Expand Up @@ -987,6 +990,15 @@ safe_gen_call(Server, Call, Timeout) ->
%% Shorthand for kpro:find/2: fetch the value of field `Name' from the
%% kafka_protocol struct `KproStruct'.
-spec kf(kpro:field_name(), kpro:struct()) -> kpro:field_value().
kf(Name, KproStruct) ->
  kpro:find(Name, KproStruct).

%% Absolute expiry timestamp: the current now_ts/0 reading plus `Ttl'.
%% `Ttl' is in the same unit as now_ts/0 (milliseconds).
-spec expire_ts(integer()) -> integer().
expire_ts(Ttl) ->
  Now = now_ts(),
  Now + Ttl.

%% Returns `true' while `ExpireTs' still lies strictly in the future of
%% now_ts/0, and `false' once the current time has reached or passed it.
%% NOTE(review): despite the name, `true' therefore means "NOT yet expired".
%% The cache lookup in this module depends on this polarity (it returns the
%% cached error on `true'), so the name and the logic must only be changed
%% together with that caller.
-spec is_expired(integer()) -> boolean().
is_expired(ExpireTs) ->
  Remaining = ExpireTs - now_ts(),
  Remaining > 0.

%% Current Erlang monotonic time, in milliseconds.
-spec now_ts() -> integer().
now_ts() ->
  erlang:monotonic_time(millisecond).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
17 changes: 17 additions & 0 deletions test/brod_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
, t_sasl_plain_file_ssl/1
, t_sasl_callback/1
, t_magic_version/1
, t_get_partitions_count_configure_cache_ttl/1
, t_get_partitions_count_safe/1
, t_double_stop_consumer/1
]).
Expand Down Expand Up @@ -125,6 +126,22 @@ t_get_partitions_count_safe(Config) when is_list(Config) ->
?assertMatch({error, unknown_topic_or_partition}, Res2),
ok = brod:stop_client(Client).


%% Start a client with a 100 ms `unknown_topic_cache_ttl', look up a topic
%% that does not exist, and check that the cached unknown-topic error is
%% served straight from the cache at first and is gone once the TTL has
%% elapsed.
t_get_partitions_count_configure_cache_ttl(Config) when is_list(Config) ->
  Client = ?FUNCTION_NAME,
  Topic = <<"unknown-topic-001">>,
  TtlMs = 100,
  ok = start_client(?HOSTS, Client, [{unknown_topic_cache_ttl, TtlMs}]),
  %% the first lookup goes through the client and populates the cache
  ?assertMatch({error, unknown_topic_or_partition},
               brod:get_partitions_count_safe(Client, Topic)),
  %% within the TTL the error is answered directly from the cache
  ?assertMatch({error, unknown_topic_or_partition},
               brod_client:lookup_partitions_count_cache(Client, Topic)),
  %% sleep just past the TTL so the cached entry is no longer served
  timer:sleep(TtlMs + 1),
  ?assertMatch(false,
               brod_client:lookup_partitions_count_cache(Client, Topic)),
  ok = brod:stop_client(Client).


t_skip_unreachable_endpoint(Config) when is_list(Config) ->
Client = t_skip_unreachable_endpoint,
ok = start_client([{"localhost", 8192} | ?HOSTS], Client),
Expand Down