Skip to content

Commit

Permalink
chore: refactor azure blob connector to use new erlazure without `g…
Browse files Browse the repository at this point in the history
…en_server`

Depends on dkataskin/erlazure#43 being merged and then synced to
our fork.
  • Loading branch information
thalesmg committed Jun 6, 2024
1 parent c0c5545 commit 4f486a9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@
on_batch_query/3
]).

%% `ecpool_worker' API
-export([
connect/1,
do_create_append_blob/3,
do_create_block_blob/3,
do_append_data/5,
do_put_block_list/4,
do_put_block_blob/4,
do_health_check/1,
do_list_blobs/2
]).

%% `emqx_connector_aggreg_delivery' API
-export([
init_transfer_state/2,
Expand Down Expand Up @@ -71,7 +59,7 @@
}.

-type connector_state() :: #{
pool_name := connector_resource_id(),
driver_state := driver_state(),
installed_actions := #{action_resource_id() => action_state()}
}.

Expand Down Expand Up @@ -124,7 +112,7 @@

-type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}.

-type pool_name() :: connector_resource_id().
-type driver_state() :: _.

-type transfer_opts() :: #{
upload_options := #{
Expand All @@ -133,7 +121,7 @@
container := string(),
min_block_size := pos_integer(),
max_block_size := pos_integer(),
pool := connector_resource_id()
driver_state := driver_state()
}
}.

Expand All @@ -148,7 +136,7 @@
min_block_size := pos_integer(),
next_block := queue:queue(iolist()),
num_blocks := non_neg_integer(),
pool := pool_name(),
driver_state := driver_state(),
started := boolean()
}.

Expand All @@ -162,38 +150,32 @@ callback_mode() ->

-spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, _Reason}.
on_start(ConnResId, ConnConfig) ->
on_start(_ConnResId, ConnConfig) ->
#{
account_name := AccountName,
account_key := AccountKey
} = ConnConfig,
Endpoint = maps:get(endpoint, ConnConfig, undefined),
ClientOpts = [
{account_name, AccountName},
{account_key, AccountKey},
{endpoint, Endpoint}
],
case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of
ok ->
State = #{
pool_name => ConnResId,
installed_actions => #{}
},
{ok, State};
{error, Reason} ->
{error, Reason}
end.
{ok, DriverState} = erlazure:new(#{
account => AccountName,
key => AccountKey,
endpoint => Endpoint
}),
State = #{
driver_state => DriverState,
installed_actions => #{}
},
{ok, State}.

-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnResId, _ConnState) ->
Res = emqx_resource_pool:stop(ConnResId),
?tp(azure_blob_storage_stop, #{instance_id => ConnResId}),
Res.
on_stop(_ConnResId, _ConnState) ->
?tp(azure_blob_storage_stop, #{instance_id => _ConnResId}),
ok.

-spec on_get_status(connector_resource_id(), connector_state()) ->
?status_connected | ?status_disconnected.
on_get_status(ConnResId, _ConnState) ->
health_check(ConnResId).
on_get_status(_ConnResId, _ConnState = #{driver_state := DriverState}) ->
health_check(DriverState).

-spec on_add_channel(
connector_resource_id(),
Expand Down Expand Up @@ -236,22 +218,22 @@ on_get_channels(ConnResId) ->
) ->
?status_connected | ?status_disconnected.
on_get_channel_status(
ConnResId,
_ConnResId,
ActionResId,
_ConnectorState = #{installed_actions := InstalledActions}
ConnectorState = #{installed_actions := InstalledActions}
) when is_map_key(ActionResId, InstalledActions) ->
#{ActionResId := ActionConfig} = InstalledActions,
channel_status(ActionConfig, ConnResId);
channel_status(ActionConfig, ConnectorState);
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
?status_disconnected.

-spec on_query(connector_resource_id(), query(), connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) ->
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) ->
case maps:get(Tag, InstalledActions, undefined) of
ChannelState = #{mode := direct} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}),
run_direct_transfer(Data, ConnResId, Tag, ChannelState);
run_direct_transfer(Data, ConnResId, Tag, ChannelState, ConnState);
ChannelState = #{mode := aggregated} ->
?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}),
run_aggregated_transfer([Data], ChannelState);
Expand All @@ -271,67 +253,27 @@ on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := Install
end.

%%------------------------------------------------------------------------------
%% `ecpool_worker' API
%% Driver calls
%%------------------------------------------------------------------------------

connect(Opts0) ->
#{
account_name := AccountName,
account_key := AccountKey,
endpoint := Endpoint
} = maps:from_list(Opts0),
erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}).

do_create_append_blob(Worker, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity).

create_block_blob(Pool, Container, Blob) ->
ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover).

do_create_block_blob(Worker, Container, Blob) ->
do_create_block_blob(DriverState, Container, Blob) ->
%% TODO: check container type before setting content type
Opts = [{content_type, "text/csv"}],
erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity).

append_data(Pool, Container, Blob, BlockId, IOData) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover
).
erlazure:put_block_blob(DriverState, Container, Blob, <<>>, Opts).

do_append_data(Worker, Container, Blob, BlockId, IOData) ->
erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity).
do_append_data(DriverState, Container, Blob, BlockId, IOData) ->
erlazure:put_block(DriverState, Container, Blob, BlockId, IOData, []).

put_block_list(Pool, Container, Blob, BlockRefs) ->
ecpool:pick_and_do(
Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover
).

do_put_block_list(Worker, Container, Blob, BlockRefs) ->
do_put_block_list(DriverState, Container, Blob, BlockRefs) ->
%% TODO: check container type before setting content type
Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}],
erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity).

put_block_blob(Pool, Container, Blob, IOData) ->
ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover).

do_put_block_blob(Worker, Container, Blob, IOData) ->
erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity).

do_health_check(Worker) ->
case erlazure:list_containers(Worker, [], infinity) of
{error, _} ->
error;
{L, _} when is_list(L) ->
ok
end.
erlazure:put_block_list(DriverState, Container, Blob, BlockRefs, Opts).

list_blobs(Pool, Container) ->
ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover).
do_put_block_blob(DriverState, Container, Blob, IOData) ->
erlazure:put_block_blob(DriverState, Container, Blob, IOData, []).

do_list_blobs(Worker, Container) ->
case erlazure:list_blobs(Worker, Container, [], infinity) of
do_list_blobs(DriverState, Container) ->
case erlazure:list_blobs(DriverState, Container, []) of
{error, _} ->
error;
{L, _} when is_list(L) ->
Expand All @@ -352,7 +294,7 @@ init_transfer_state(Buffer, Opts) ->
container := Container,
max_block_size := MaxBlockSize,
min_block_size := MinBlockSize,
pool := Pool
driver_state := DriverState
}
} = Opts,
Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate),
Expand All @@ -365,7 +307,7 @@ init_transfer_state(Buffer, Opts) ->
min_block_size => MinBlockSize,
next_block => queue:new(),
num_blocks => 0,
pool => Pool,
driver_state => DriverState,
started => false
}.

Expand Down Expand Up @@ -401,14 +343,14 @@ process_append(IOData, TransferState0) ->
{ok, transfer_state()} | {error, term()}.
process_write(TransferState0 = #{started := false}) ->
#{
pool := Pool,
driver_state := DriverState,
blob := Blob,
container := Container
} = TransferState0,
%% TODO
%% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob'
%% limit, we could upload the whole thing here.
case create_block_blob(Pool, Container, Blob) of
case do_create_block_blob(DriverState, Container, Blob) of
{ok, _} ->
TransferState = TransferState0#{started := true},
process_write(TransferState);
Expand All @@ -432,9 +374,9 @@ do_process_write(IOData, TransferState0 = #{started := true}) ->
blob := Blob,
container := Container,
num_blocks := NumBlocks,
pool := Pool
driver_state := DriverState
} = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
case do_append_data(DriverState, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} ->
TransferState = TransferState0#{num_blocks := NumBlocks + 1},
process_write(TransferState);
Expand All @@ -451,7 +393,7 @@ process_complete(TransferState) ->
buffer_size := BufferSize,
container := Container,
num_blocks := NumBlocks0,
pool := Pool
driver_state := DriverState
} = TransferState,
%% Flush any left-over data
NumBlocks =
Expand All @@ -463,7 +405,7 @@ process_complete(TransferState) ->
NumBlocks0
end,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of
case do_put_block_list(DriverState, Container, Blob, BlockRefs) of
{ok, _} ->
{ok, #{num_blocks => NumBlocks}};
{error, Reason} ->
Expand Down Expand Up @@ -524,7 +466,7 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) ->
max_block_size => MaxBlockSize
};
install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) ->
#{pool_name := Pool} = ConnState,
#{driver_state := DriverState} = ConnState,
#{
bridge_name := Name,
parameters := #{
Expand Down Expand Up @@ -554,7 +496,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
container => ContainerName,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
pool => Pool
driver_state => DriverState
},
DeliveryOpts = #{
callback_module => ?MODULE,
Expand Down Expand Up @@ -584,7 +526,8 @@ stop_action(#{on_stop := {M, F, A}}) ->
stop_action(_) ->
ok.

run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
run_direct_transfer(Data, ConnResId, ActionResId, ActionState, ConnState) ->
#{driver_state := DriverState} = ConnState,
#{
container := ContainerTemplate,
blob := BlobTemplate,
Expand All @@ -608,7 +551,7 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) ->
false ->
ok
end,
case put_block_blob(ConnResId, Container, Blob, Content) of
case do_put_block_blob(DriverState, Container, Blob, Content) of
{ok, created} ->
?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}),
ok;
Expand Down Expand Up @@ -681,40 +624,26 @@ render_content(Template, Data) ->
iolist_to_string(IOList) ->
unicode:characters_to_list(IOList).

channel_status(#{mode := direct}, _ConnResId) ->
channel_status(#{mode := direct}, _ConnState) ->
%% There's nothing in particular to check for in this mode; the connector health check
%% already verifies that we're able to use the client to list containers.
?status_connected;
channel_status(#{mode := aggregated} = ActionState, ConnResId) ->
channel_status(#{mode := aggregated} = ActionState, ConnState) ->
#{driver_state := DriverState} = ConnState,
#{container := Container, aggreg_id := AggregId} = ActionState,
%% NOTE: This will effectively trigger uploads of buffers yet to be uploaded.
Timestamp = erlang:system_time(second),
ok = emqx_connector_aggregator:tick(AggregId, Timestamp),
ok = check_container_accessible(ConnResId, Container),
ok = check_container_accessible(DriverState, Container),
ok = check_aggreg_upload_errors(AggregId),
?status_connected.

health_check(ConnResId) ->
case
emqx_resource_pool:health_check_workers(
ConnResId,
fun ?MODULE:do_health_check/1,
emqx_resource_pool:health_check_timeout(),
#{return_values => true}
)
of
{ok, []} ->
?status_disconnected;
{ok, Values} ->
AllOk = lists:all(fun(S) -> S =:= ok end, Values),
case AllOk of
true ->
?status_connected;
false ->
?status_disconnected
end;
health_check(DriverState) ->
case erlazure:list_containers(DriverState, []) of
{error, _} ->
?status_disconnected
?status_disconnected;
{L, _} when is_list(L) ->
?status_connected
end.

map_error({failed_connect, _} = Reason) ->
Expand All @@ -734,8 +663,8 @@ check_aggreg_upload_errors(AggregId) ->
ok
end.

check_container_accessible(Pool, Container) ->
list_blobs(Pool, Container).
check_container_accessible(DriverState, Container) ->
do_list_blobs(DriverState, Container).

block_id(N) ->
NumDigits = 32,
Expand Down
Loading

0 comments on commit 4f486a9

Please sign in to comment.