Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.0: use Ra checkpoints in rabbit_fifo for sub-linear time recovery of QQs on boot #10487

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ rabbitmq_suite(
deps = [
"//deps/rabbit_common:erlang_app",
"@proper//:erlang_app",
"@meck//:erlang_app",
"@ra//:erlang_app",
],
)
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -418,6 +419,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -573,6 +575,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_dlx.hrl",
"src/rabbit_fifo_v0.hrl",
"src/rabbit_fifo_v1.hrl",
"src/rabbit_fifo_v3.hrl",
"src/rabbit_stream_coordinator.hrl",
"src/rabbit_stream_sac_coordinator.hrl",
],
Expand Down Expand Up @@ -705,6 +708,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_fifo_index.erl",
"src/rabbit_fifo_v0.erl",
"src/rabbit_fifo_v1.erl",
"src/rabbit_fifo_v3.erl",
"src/rabbit_file.erl",
"src/rabbit_global_counters.erl",
"src/rabbit_guid.erl",
Expand Down Expand Up @@ -1367,7 +1371,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
hdrs = ["src/rabbit_fifo.hrl"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/rabbit_common:erlang_app", "@proper//:erlang_app"],
)
erlang_bytecode(
name = "rabbit_fifo_dlx_SUITE_beam_files",
Expand Down
10 changes: 7 additions & 3 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,9 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
QName = rabbit_misc:r(Vhost, queue, QNameBin),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
%%TODO Consider adding a new rabbit_queue_type:remove_consumer API that - from the point of view of
%% the queue process - behaves as if our session process terminated: All messages checked out
%% TODO: Consider adding a new rabbit_queue_type:remove_consumer
%% API that - from the point of view of the queue process -
%% behaves as if our session process terminated: All messages checked out
%% to this consumer should be re-queued automatically instead of us requeueing them here after cancelling
%% consumption.
%% For AMQP legacy (and STOMP / MQTT) consumer cancellation not requeueing messages is a good approach as
Expand All @@ -1149,7 +1150,10 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
%% first detaching and then re-attaching to the same session with the same link handle (the handle
%% becomes available for re-use once a link is closed): This will result in the same consumer tag,
%% and we ideally disallow "updating" an AMQP consumer.
case rabbit_queue_type:cancel(Q, Ctag, undefined, Username, QStates0) of
Spec = #{consumer_tag => Ctag,
reason => remove,
user => Username},
case rabbit_queue_type:cancel(Q, Spec, QStates0) of
{ok, QStates1} ->
{Unsettled1, MsgIds} = remove_link_from_outgoing_unsettled_map(Ctag, Unsettled0),
case MsgIds of
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1772,8 +1772,10 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
rabbit_queue_type:state()) ->
{ok, rabbit_queue_type:state()} | {error, term()}.
basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->
rabbit_queue_type:cancel(Q, ConsumerTag,
OkMsg, ActingUser, QStates).
%% TODO: is this function used anywhere?
rabbit_queue_type:cancel(Q, #{consumer_tag => ConsumerTag,
ok_msg => OkMsg,
user => ActingUser}, QStates).

-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.

Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1454,8 +1454,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
fun () -> {error, not_found} end,
fun () ->
rabbit_queue_type:cancel(
Q, ConsumerTag, ok_msg(NoWait, OkMsg),
Username, QueueStates0)
Q, #{consumer_tag => ConsumerTag,
ok_msg => ok_msg(NoWait, OkMsg),
user => Username}, QueueStates0)
end) of
{ok, QueueStates} ->
rabbit_global_counters:consumer_deleted(amqp091),
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
close/1,
update/2,
consume/3,
cancel/5,
cancel/3,
handle_event/3,
deliver/3,
settle/5,
Expand Down Expand Up @@ -276,7 +276,9 @@ consume_backwards_compat({credited, credit_api_v1}, Args) ->
[{<<"x-credit">>, table, [{<<"credit">>, long, 0},
{<<"drain">>, bool, false}]} | Args]}.

cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
cancel(Q, #{consumer_tag := ConsumerTag,
user := ActingUser} = Spec, State) ->
OkMsg = maps:get(ok_msg, Spec, undefined),
QPid = amqqueue:get_pid(Q),
case delegate:invoke(QPid, {gen_server2, call,
[{basic_cancel, self(), ConsumerTag,
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,11 @@
#{desc => "Credit API v2 between queue clients and queue processes",
stability => stable
}}).

-rabbit_feature_flag(
{quorum_queues_v4,
#{desc => "Unlocks QQ v4 goodies",
stability => stable,
depends_on => [quorum_queue,
credit_api_v2]
}}).
Loading
Loading