From 9be3111fb5523ee2530cd76c1f29002c45652634 Mon Sep 17 00:00:00 2001 From: Ovidiu Dobroiu Date: Sat, 10 Jan 2015 17:35:19 +0200 Subject: [PATCH] protocol v2 --- Makefile | 3 + README.md | 13 +- doc/README.md | 3 +- doc/seestar_batch.md | 90 ++++++++++++ doc/seestar_cqltypes.md | 2 +- doc/seestar_error.md | 44 ++---- doc/seestar_event.md | 24 +--- doc/seestar_password_auth.md | 36 +++++ doc/seestar_result.md | 72 +++++----- doc/seestar_session.md | 123 +++++++++++++--- include/seestar.hrl | 6 + src/seestar_batch.erl | 31 ++++ src/seestar_cqltypes.erl | 30 +++- src/seestar_frame.erl | 7 +- src/seestar_messages.erl | 222 ++++++++++++++++++++-------- src/seestar_messages.hrl | 93 +++++++++--- src/seestar_password_auth.erl | 35 +++++ src/seestar_result.erl | 35 +++-- src/seestar_session.erl | 254 +++++++++++++++++++++++++-------- src/seestar_types.erl | 24 +++- test/seestar_auth_tests.erl | 51 +++++++ test/seestar_session_tests.erl | 238 +++++++++++++++++++++++++----- test/test_utils.erl | 35 +++++ 23 files changed, 1159 insertions(+), 312 deletions(-) create mode 100644 doc/seestar_batch.md create mode 100644 doc/seestar_password_auth.md create mode 100644 include/seestar.hrl create mode 100644 src/seestar_batch.erl create mode 100644 src/seestar_password_auth.erl create mode 100644 test/seestar_auth_tests.erl create mode 100644 test/test_utils.erl diff --git a/Makefile b/Makefile index 936883e..26c2821 100644 --- a/Makefile +++ b/Makefile @@ -35,3 +35,6 @@ xref: test: @./rebar skip_deps=true eunit + +quick_test: + @./rebar skip_deps=true eunit suites=seestar_session_tests diff --git a/README.md b/README.md index 3e4b94e..42e3b5a 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,11 @@ where CASSANDRA_DIR is the path to the compiled Cassandra directory. - - - - - -
seestar
seestar_cqltypes
seestar_error
seestar_event
seestar_result
seestar_session
+seestar +seestar_batch +seestar_cqltypes +seestar_error +seestar_event +seestar_password_auth +seestar_session diff --git a/doc/README.md b/doc/README.md index ecbcc04..fd85a75 100644 --- a/doc/README.md +++ b/doc/README.md @@ -32,9 +32,10 @@ where CASSANDRA_DIR is the path to the compiled Cassandra directory. + - +
seestar
seestar_batch
seestar_cqltypes
seestar_error
seestar_event
seestar_result
seestar_password_auth
seestar_session
diff --git a/doc/seestar_batch.md b/doc/seestar_batch.md new file mode 100644 index 0000000..cc1dd9e --- /dev/null +++ b/doc/seestar_batch.md @@ -0,0 +1,90 @@ + + +# Module seestar_batch # +* [Data Types](#types) +* [Function Index](#index) +* [Function Details](#functions) + + + + + +## Data Types ## + + + + +### batch_query() ### + + + +

+batch_query() = #batch_query{}
+
+ + + + + +### batch_request() ### + + + +

+batch_request() = #batch{}
+
+ + + + +## Function Index ## + + +
batch_request/3Return a batch request that can be sent to cassandra +Use seestar_session module to send the request.
normal_query/2Return a normal query that can be added to batch request.
prepared_query/2Return a prepared query that can be added to batch request.
+ + + + +## Function Details ## + + + +### batch_request/3 ### + + +

+batch_request(Type::logged | unlogged | counter, Consistency::one | atom(), Queries::[batch_query()]) -> batch_request()
+
+
+ +Return a batch request that can be sent to cassandra +Use [`seestar_session`](seestar_session.md) module to send the request + +__See also:__ [seestar_session:batch/2](seestar_session.md#batch-2), [seestar_session:batch_async/2](seestar_session.md#batch_async-2). + + +### normal_query/2 ### + + +

+normal_query(Query::binary(), Values::[seestar_cqltypes:value()]) -> batch_query()
+
+
+ +Return a normal query that can be added to batch request + +__See also:__ [batch_request/3](#batch_request-3). + + +### prepared_query/2 ### + + +

+prepared_query(Prepared_query::seestar_result:prepared_query(), Values::[seestar_cqltypes:value()]) -> batch_query()
+
+
+ +Return a prepared query that can be added to batch request + +__See also:__ [batch_request/3](#batch_request-3). diff --git a/doc/seestar_cqltypes.md b/doc/seestar_cqltypes.md index 8d90d02..7a8749a 100644 --- a/doc/seestar_cqltypes.md +++ b/doc/seestar_cqltypes.md @@ -53,7 +53,7 @@ type() = native() | {list | set, inet:ip_address() | decimal() | list() | dict() | set() +value() = null | integer() | binary() | boolean() | float() | inet:ip_address() | decimal() | list() | dict_t() | set_t() diff --git a/doc/seestar_error.md b/doc/seestar_error.md index a0b80b7..dccede4 100644 --- a/doc/seestar_error.md +++ b/doc/seestar_error.md @@ -51,9 +51,7 @@ write_type() = simple | batch | unlogged_batch | counter | batch_log

 alive(Error::error()) -> integer()
 
- -

- +
@@ -64,9 +62,7 @@ alive(Error::error()) -> integer()

 code(Error::error()) -> integer()
 
- -

- +
@@ -77,9 +73,7 @@ code(Error::error()) -> integer()

 consistency(Error::error()) -> seestar:consistency()
 
- -

- +
@@ -90,9 +84,7 @@ consistency(Error::error()) -> error()) -> boolean() - -

- +
@@ -103,9 +95,7 @@ data_present(Error::error()) -> boolean()

 keyspace(Error::error()) -> binary()
 
- -

- +
@@ -116,9 +106,7 @@ keyspace(Error::error()) -> binary()

 message(Error::error()) -> binary()
 
- -

- +
@@ -129,9 +117,7 @@ message(Error::error()) -> binary()

 query_id(Error::error()) -> binary()
 
- -

- +
@@ -142,9 +128,7 @@ query_id(Error::error()) -> binary()

 received(Error::error()) -> integer()
 
- -

- +
@@ -155,9 +139,7 @@ received(Error::error()) -> integer()

 required(Error::error()) -> integer()
 
- -

- +
@@ -168,9 +150,7 @@ required(Error::error()) -> integer()

 table(Error::error()) -> binary() | undefined
 
- -

- +
@@ -181,8 +161,6 @@ table(Error::error()) -> binary() | undefined

 write_type(Error::error()) -> write_type()
 
- -

- +
diff --git a/doc/seestar_event.md b/doc/seestar_event.md index b19f146..b9051f7 100644 --- a/doc/seestar_event.md +++ b/doc/seestar_event.md @@ -123,9 +123,7 @@ type() = topology_change | status_change | schema_change

 change(Event::event()) -> topology_change() | status_change() | schema_change()
 
- -

- +
@@ -136,9 +134,7 @@ change(Event::event()) -> topology_change_event() | status_change_event()) -> inet:ip_address() - -

- +
@@ -149,9 +145,7 @@ ip(Event::topology_change_event() | keyspace(Event::schema_change_event()) -> binary() - -

- +
@@ -162,9 +156,7 @@ keyspace(Event::schema_change_event()) -

 port(Event::topology_change_event() | status_change_event()) -> inet:port_number()
 
- -

- +
@@ -175,9 +167,7 @@ port(Event::topology_change_event() |

 table(Event::schema_change_event()) -> binary() | undefined
 
- -

- +
@@ -188,8 +178,6 @@ table(Event::schema_change_event()) ->

 type(Event::event()) -> type()
 
- -

- +
diff --git a/doc/seestar_password_auth.md b/doc/seestar_password_auth.md new file mode 100644 index 0000000..b78263b --- /dev/null +++ b/doc/seestar_password_auth.md @@ -0,0 +1,36 @@ + + +# Module seestar_password_auth # +* [Function Index](#index) +* [Function Details](#functions) + + + + +## Function Index ## + + +
perform_auth/2 +Method that needs to be implemented by any password authentication module +The sendFun accepts a #auth_response{} and returns #error{} | #auth_challenge{} | #auth_success{} +{Username, Password} is the data passed to the seestar_session module.
+ + + + +## Function Details ## + + + +### perform_auth/2 ### + + +

+perform_auth(SendFun::function(), X2::{Username::binary(), Password::binary()}) -> Success::boolean()
+
+
+ + +Method that needs to be implemented by any password authentication module +The sendFun accepts a #auth_response{} and returns #error{} | #auth_challenge{} | #auth_success{} +{Username, Password} is the data passed to the seestar_session module diff --git a/doc/seestar_result.md b/doc/seestar_result.md index 3f69b03..a250b6c 100644 --- a/doc/seestar_result.md +++ b/doc/seestar_result.md @@ -26,6 +26,18 @@ change() = created | updated | dropped +### prepared_query() ### + + + +

+prepared_query() = #prepared_query{}
+
+ + + + + ### prepared_result() ### @@ -96,7 +108,7 @@ type() = void | rows | set_keyspace | prepared | schema_change ## Function Index ## -
change/1
keyspace/1
names/1
query_id/1
rows/1
table/1
type/1
type/2
types/1
+
change/1
has_more_rows/1
keyspace/1
names/1
prepared_query/1
rows/1
table/1
type/1
type/2
@@ -111,9 +123,18 @@ type() = void | rows | set_keyspace | prepared | schema_change

 change(Result::schema_change_result()) -> change()
 
+
-

+ + +### has_more_rows/1 ### + + +

+has_more_rows(Result::rows_result()) -> boolean()
+
+
@@ -124,9 +145,7 @@ change(Result::schema_change_result())

 keyspace(Result::set_keyspace_result() | schema_change_result()) -> binary()
 
- -

- +
@@ -137,22 +156,18 @@ keyspace(Result::set_keyspace_result() |

 names(Result::rows_result() | prepared_result()) -> [binary()]
 
- -

- +
- + -### query_id/1 ### +### prepared_query/1 ###

-query_id(Result::prepared_result()) -> binary()
+prepared_query(Result::prepared_result()) -> prepared_query()
 
- -

- +
@@ -163,9 +178,7 @@ query_id(Result::prepared_result()) -> bi

 rows(Rows::rows_result()) -> [[seestar_cqltypes:value()]]
 
- -

- +
@@ -176,9 +189,7 @@ rows(Rows::rows_result()) -> [[schema_change_result()) -> binary() | undefined - -

- +
@@ -189,9 +200,7 @@ table(Result::schema_change_result()) -

 type(Result::result()) -> type()
 
- -

- +
@@ -202,21 +211,6 @@ type(Result::result()) -> typ

 type(Result::rows_result() | prepared_result(), Name::binary()) -> seestar_cqltypes:type()
 
- -

- - - - - -### types/1 ### - - -

-types(Result::rows_result() | prepared_result()) -> [seestar_cqltypes:type()]
-
- -

- +
diff --git a/doc/seestar_session.md b/doc/seestar_session.md index 36da19e..c43b9d6 100644 --- a/doc/seestar_session.md +++ b/doc/seestar_session.md @@ -71,38 +71,58 @@ query() = binary() | string() + +## Function Index ## -### query_id() ### +
batch/2Synchronously execute a batch query +Use seestar_batch module functions to create the request.
batch_async/2Asynchronously execute a batch query +Use seestar_batch module functions to create the request.
execute/3
execute/4
execute/5Synchronously execute a prepared query using the specified consistency level.
execute_async/3
execute_async/4
execute_async/5Asynchronously execute a prepared query using the specified consistency level.
next_page/2Synchronously returns the next page for a previous paginated result.
next_page_async/2Asynchronously returns the next page for a previous paginated result.
perform/3
perform/4
perform/5Synchoronously perform a CQL query using the specified consistency level.
perform_async/3
perform_async/4
perform_async/5Asynchronously perform a CQL query using the specified consistency level.
prepare/2Prepare a query for later execution.
start_link/2Equivalent to start_link(Host, Post, []).
start_link/3Equivalent to start_link(Host, Post, ClientOptions, []).
start_link/4 +Starts a new connection to a cassandra node on Host:Port.
stop/1Stop the client.
+ -

-query_id() = binary()
-
+## Function Details ## + - +### batch/2 ### -## Function Index ## +`batch(Client, Req) -> any()` +Synchronously execute a batch query +Use [`seestar_batch`](seestar_batch.md) module functions to create the request. + -
execute/5Synchronously execute a prepared query using the specified consistency level.
execute_async/5
perform/3Synchoronously perform a CQL query using the specified consistency level.
perform_async/3Asynchronously perform a CQL query using the specified consistency level.
prepare/2Prepare a query for later execution.
start_link/2Equivalent to start_link(Host, Post, []).
start_link/3Equivalent to start_link(Host, Post, ClientOptions, []).
start_link/4 -Starts a new connection to a cassandra node on Host:Port.
stop/1Stop the client.
+### batch_async/2 ### +`batch_async(Client, Req) -> any()` - +Asynchronously execute a batch query +Use [`seestar_batch`](seestar_batch.md) module functions to create the request. + -## Function Details ## +### execute/3 ### + +`execute(Client, Query, Consistency) -> any()` + +__See also:__ [execute/5](#execute-5). + +### execute/4 ### + +`execute(Client, Query, Consistency, PageSize) -> any()` + +__See also:__ [execute/5](#execute-5). ### execute/5 ###

-execute(Client::pid(), QueryID::query_id(), Types::[seestar_cqltypes:type()], Values::[seestar_cqltypes:value()], Consistency::seestar:consistency()) -> {ok, Result::seestar_result:result()} | {error, Error::seestar_error:error()}
+execute(Client::pid(), Prepared_query::seestar_result:prepared_query(), Values::[seestar_cqltypes:value()], Consistency::seestar:consistency(), PageSize::non_neg_integer() | undefined) -> {ok, Result::seestar_result:result()} | {error, Error::seestar_error:error()}
 

@@ -110,22 +130,67 @@ Synchronously execute a prepared query using the specified consistency level. Use [`seestar_result`](seestar_result.md) module functions to work with the result. __See also:__ [perform/3](#perform-3), [prepare/2](#prepare-2). + + +### execute_async/3 ### + +`execute_async(Client, Query, Consistency) -> any()` + +__See also:__ [execute_async/5](#execute_async-5). + + +### execute_async/4 ### + +`execute_async(Client, Query, Consistency, PageSize) -> any()` + +__See also:__ [execute_async/5](#execute_async-5). ### execute_async/5 ### -`execute_async(Client, QueryID, Types, Values, Consistency) -> any()` +

+execute_async(Client::pid(), Prepared_query::seestar_result:prepared_query(), Values::[seestar_cqltypes:value()], Consistency::seestar:consistency(), PageSize::non_neg_integer() | undefined) -> {ok, Result::seestar_result:result()} | {error, Error::seestar_error:error()}
+
+
+ +Asynchronously execute a prepared query using the specified consistency level. +Use [`seestar_result`](seestar_result.md) module functions to work with the result. + +__See also:__ [prepare/2](#prepare-2). + +### next_page/2 ### + +`next_page(Client, Rows) -> any()` + +Synchronously returns the next page for a previous paginated result + + +### next_page_async/2 ### + +`next_page_async(Client, Rows) -> any()` + +Asynchronously returns the next page for a previous paginated result ### perform/3 ### +`perform(Client, Query, Consistency) -> any()` -

-perform(Client::pid(), Query::query(), Consistency::seestar:consistency()) -> {ok, Result::seestar_result:result()} | {error, Error::seestar_error:error()}
-
-
+__See also:__ [perform/5](#perform-5). + + +### perform/4 ### + +`perform(Client, Query, Consistency, Values) -> any()` + +__See also:__ [perform/5](#perform-5). + + +### perform/5 ### + +`perform(Client, Query, Consistency, Values, PageSize) -> any()` Synchoronously perform a CQL query using the specified consistency level. Returns a result of an appropriate type (void, rows, set_keyspace, schema_change). @@ -136,7 +201,29 @@ Use [`seestar_result`](seestar_result.md) module functions to work with the resu

-perform_async(Client::pid(), Query::query(), Consistency::seestar:consistency()) -> ok
+perform_async(Client::pid(), Query::query(), Consistency::seestar:consistency()) -> any()
+
+
+ +__See also:__ [perform_async/5](#perform_async-5). + + +### perform_async/4 ### + + +

+perform_async(Client::pid(), Query::query(), Consistency::seestar:consistency(), Values::[seestar_cqltypes:value()] | non_neg_integer()) -> any()
+
+
+ +__See also:__ [perform_async/5](#perform_async-5). + + +### perform_async/5 ### + + +

+perform_async(Client::pid(), Query::query(), Consistency::seestar:consistency(), Values::[seestar_cqltypes:value()], PageSize::undefined | non_neg_integer()) -> any()
 

@@ -152,7 +239,7 @@ prepare(Client::pid(), Query::query()) -> {ok, Resu
Prepare a query for later execution. The response will contain the prepared -query id and column metadata for all the variables (if any). +query that you will need to pass to the execute methods __See also:__ [execute/3](#execute-3), [execute/4](#execute-4). diff --git a/include/seestar.hrl b/include/seestar.hrl new file mode 100644 index 0000000..c2afff8 --- /dev/null +++ b/include/seestar.hrl @@ -0,0 +1,6 @@ +-record(prepared_query, +{ + id :: binary(), + request_types :: [seestar_cqltypes:type()], + cached_result_meta :: seestar_messages:metadata() +}). diff --git a/src/seestar_batch.erl b/src/seestar_batch.erl new file mode 100644 index 0000000..a46270f --- /dev/null +++ b/src/seestar_batch.erl @@ -0,0 +1,31 @@ +-module(seestar_batch). + +%% API +-export([prepared_query/2, normal_query/2, batch_request/3]). + +-include("seestar.hrl"). +-include("seestar_messages.hrl"). + +-type batch_query() :: #batch_query{}. +-type batch_request() :: #batch{}. +-export_type([batch_query/0, batch_request/0]). + +%% @doc Return a prepared query that can be added to batch request +%% @see batch_request/3. +-spec prepared_query(seestar_result:prepared_query(), [seestar_cqltypes:value()]) -> batch_query(). +prepared_query(#prepared_query{id = ID, request_types = Types}, Values) -> + #batch_query{kind = prepared, string_or_id = ID, values = #query_values{values = Values, types = Types}}. + +%% @doc Return a normal query that can be added to batch request +%% @see batch_request/3. +-spec normal_query(binary(), list(seestar_cqltypes:value())) -> batch_query(). +normal_query(Query, Values) -> + #batch_query{kind = not_prepared, string_or_id = Query, values = #query_values{values = Values}}. + +%% @doc Return a batch request that can be sent to cassandra +%% Use {@link seestar_session} module to send the request +%% @see seestar_session:batch/2. +%% @see seestar_session:batch_async/2. +-spec batch_request(logged | unlogged | counter, one | atom(), list(batch_query())) -> batch_request(). +batch_request(Type, Consistency, Queries) -> + #batch{type = Type, consistency = Consistency, queries = Queries}. diff --git a/src/seestar_cqltypes.erl b/src/seestar_cqltypes.erl index 8d3278d..ac85b1b 100644 --- a/src/seestar_cqltypes.erl +++ b/src/seestar_cqltypes.erl @@ -16,7 +16,11 @@ -include("builtin_types.hrl"). --export([decode_value_with_size/2, encode_value_with_size/2, decode_type/1]). +-export([ + decode_value_with_size/2, + decode_type/1, + encode_value_with_size/2, + encode_value_with_size/1]). -type type() :: native() | {list | set, native()} | {map, native(), native()} | {custom, string()}. -type native() :: ascii | bigint | blob | boolean | counter | decimal | double | @@ -137,6 +141,30 @@ decode_set(Type, Size, Data, Set) -> %% encoding values %% ------------------------------------------------------------------------- +%% TODO -> Think more about this +%% Try to encode the value. This is used for passing values to queries that +%% are not prepared +%% Two possible solutions: if a tuple is provided, then the type from the tuple +%% is used; otherwise the type is going to be guessed. TODO better guesses +%% Java driver also guesses the type +%% Third solution would be to get the types from cassandra, eg describe schema when +%% connects, schema change events... etc.. + +%% @private +-spec encode_value_with_size({type(), value()} | value()) -> binary(). +encode_value_with_size({Type, Value}) -> + encode_value_with_size(Type, Value); +encode_value_with_size(Value) when is_boolean(Value) -> + encode_value_with_size(boolean, Value); +encode_value_with_size(Value) when is_integer(Value), (Value > -2147483648), (Value < 2147483647) -> + encode_value_with_size(int, Value); +encode_value_with_size(Value) when is_integer(Value) -> + encode_value_with_size(bigint, Value); +encode_value_with_size(Value) when is_binary(Value) -> + encode_value_with_size(varchar, Value); +encode_value_with_size(Value)-> + encode_value_with_size(undefined, Value). + %% @private -spec encode_value_with_size(type(), value()) -> binary(). encode_value_with_size(_, null) -> diff --git a/src/seestar_frame.erl b/src/seestar_frame.erl index 55220fa..0a1905a 100644 --- a/src/seestar_frame.erl +++ b/src/seestar_frame.erl @@ -22,7 +22,6 @@ -type flag() :: compression | tracing. -type opcode() :: 16#00..16#0C. -export_type([stream_id/0, flag/0, opcode/0, frame/0]). - -define(COMPRESSION, 16#01). -define(TRACING, 16#02). @@ -62,7 +61,7 @@ body(Frame) -> -spec encode(frame()) -> binary(). encode(#frame{id = ID, flags = Flags, opcode = Op, body = Body}) -> - <<16#01, (encode_flags(Flags)), ID/signed, Op, (size(Body)):32, Body/binary>>. + <<16#02, (encode_flags(Flags)), ID/signed, Op, (size(Body)):32, Body/binary>>. encode_flags(Flags) -> lists:foldl(fun(Flag, Byte) -> encode_flag(Flag) bor Byte end, 0, Flags). @@ -71,7 +70,7 @@ encode_flag(compression) -> ?COMPRESSION; encode_flag(tracing) -> ?TRACING. -spec pending_size(binary()) -> pos_integer(). -pending_size(<<16#81, _Flags, _ID/signed, _Op, Size:32, _/binary>>) -> +pending_size(<<16#82, _Flags, _ID/signed, _Op, Size:32, _/binary>>) -> Size + 8; pending_size(_) -> undefined. @@ -79,7 +78,7 @@ pending_size(_) -> -spec decode(binary()) -> {[frame()], binary()}. decode(Stream) -> decode(Stream, []). -decode(<<16#81, Flags, ID/signed, Op, Size:32, Body:Size/binary, Rest/binary>>, Acc) -> +decode(<<16#82, Flags, ID/signed, Op, Size:32, Body:Size/binary, Rest/binary>>, Acc) -> Frame = #frame{id = ID, flags = decode_flags(Flags), opcode = Op, body = Body}, decode(Rest, [Frame|Acc]); decode(Stream, Acc) -> diff --git a/src/seestar_messages.erl b/src/seestar_messages.erl index 89e8b57..037bb21 100644 --- a/src/seestar_messages.erl +++ b/src/seestar_messages.erl @@ -15,30 +15,13 @@ %%% @private -module(seestar_messages). --export([encode/1, decode/2]). +-export([encode/1, decode/3]). -include("constants.hrl"). -include("seestar_messages.hrl"). -%% requests. --define(STARTUP, 16#01). --define(CREDENTIALS, 16#04). --define(OPTIONS, 16#05). --define(QUERY, 16#07). --define(PREPARE, 16#09). --define(EXECUTE, 16#0A). --define(REGISTER, 16#0B). -%% responses. --define(ERROR, 16#00). --define(READY, 16#02). --define(AUTHENTICATE, 16#03). --define(SUPPORTED, 16#06). --define(RESULT, 16#08). -%% event. --define(EVENT, 16#0C). - -type outgoing() :: #startup{} - | #credentials{} + | #auth_response{} | #options{} | #'query'{} | #prepare{} @@ -50,7 +33,9 @@ | #authenticate{} | #supported{} | #result{} - | #event{}. + | #event{} + | #auth_success{} + | #auth_challenge{}. -define(VERSION, <<"CQL_VERSION">>). -define(COMPRESSION, <<"COMPRESSION">>). @@ -70,26 +55,34 @@ encode(#startup{version = Version, compression = Compression}) -> end, {?STARTUP, seestar_types:encode_string_map(KVPairs)}; -encode(#credentials{credentials = KVPairs}) -> - {?CREDENTIALS, seestar_types:encode_string_map(KVPairs)}; +encode(#auth_response{body = Body}) -> + {?AUTH_RESPONSE, Body}; encode(#options{}) -> {?OPTIONS, <<>>}; -encode(#'query'{'query' = Query, consistency = Consistency}) -> - {?QUERY, <<(seestar_types:encode_long_string(Query))/binary, - (seestar_types:encode_consistency(Consistency))/binary>>}; +encode(#'query'{'query' = Query, params = QueryParams}) -> + {?QUERY, << + (seestar_types:encode_long_string(Query))/binary, + (encode_query_flags(QueryParams))/binary + >>}; encode(#prepare{'query' = Query}) -> {?PREPARE, seestar_types:encode_long_string(Query)}; -encode(#execute{id = ID, types = Types, values = Values, consistency = Consistency}) -> - Variables = [ seestar_cqltypes:encode_value_with_size(Type, Value) || - {Type, Value} <- lists:zip(Types, Values) ], - {?EXECUTE, list_to_binary([seestar_types:encode_short_bytes(ID), - seestar_types:encode_short(length(Variables)), - Variables, - seestar_types:encode_consistency(Consistency)])}; +encode(#execute{id = ID, params = QueryParams}) -> + {?EXECUTE, << + (seestar_types:encode_short_bytes(ID))/binary, + (encode_query_flags(QueryParams))/binary + >>}; + +encode(#batch{type = BatchType, consistency = Consistency, queries = QueriesList}) -> + {?BATCH, << + (seestar_types:encode_batch_type(BatchType))/binary, + (seestar_types:encode_short(length(QueriesList)))/binary, + (encode_batch_queries(QueriesList))/binary, + (seestar_types:encode_consistency(Consistency))/binary + >>}; encode(#register{event_types = Types}) -> % assert validity of event types. @@ -102,8 +95,8 @@ encode(#register{event_types = Types}) -> %% decoding functions %% ------------------------------------------------------------------------- --spec decode(seestar_frame:opcode(), binary()) -> incoming(). -decode(?ERROR, Body) -> +-spec decode(seestar_frame:opcode(), binary(), any()) -> incoming(). +decode(?ERROR, Body, _CachedDecodeData) -> {Code, Rest0} = seestar_types:decode_int(Body), {Message, Rest1} = seestar_types:decode_string(Rest0), #error{code = Code, @@ -117,27 +110,31 @@ decode(?ERROR, Body) -> _ -> undefined end}; -decode(?READY, _Body) -> +decode(?READY, _Body, _CachedDecodeData) -> #ready{}; -decode(?AUTHENTICATE, Body) -> +decode(?AUTHENTICATE, Body, _CachedDecodeData) -> {Class, _} = seestar_types:decode_string(Body), #authenticate{class = Class}; -decode(?SUPPORTED, Body) -> +decode(?SUPPORTED, Body, _CachedDecodeData) -> {KVPairs, _} = seestar_types:decode_string_multimap(Body), #supported{versions = proplists:get_value(?VERSION, KVPairs), compression = proplists:get_value(?COMPRESSION, KVPairs)}; -decode(?RESULT, Body) -> +decode(?RESULT, Body, CachedDecodeData) -> {Kind, Rest} = seestar_types:decode_int(Body), #result{result = case Kind of 16#01 -> void; - 16#02 -> decode_rows(Rest); + 16#02 -> decode_rows(Rest, CachedDecodeData); 16#03 -> decode_set_keyspace(Rest); 16#04 -> decode_prepared(Rest); 16#05 -> decode_schema_change(Rest) - end}. + end}; +decode(?AUTH_SUCCESS, _Body, _CachedDecodeData) -> + #auth_success{}; +decode(?AUTH_CHALLENGE, Body, _CachedDecodeData) -> + #auth_challenge{body = Body}. %% ------------------------------------------------------------------------- %% error details @@ -185,37 +182,64 @@ decode_unprepared(Data) -> %% different result types %% ------------------------------------------------------------------------- -decode_rows(Body) -> +decode_rows(Body, undefined) -> + {Meta, Rest0} = decode_metadata(Body), + {Count, Rest1} = seestar_types:decode_int(Rest0), + MetaMetadataColumns = Meta#metadata.columns, + Rows = decode_rows(MetaMetadataColumns, Rest1, Count), + #rows{metadata = Meta, rows = Rows}; +decode_rows(Body, #metadata{columns = Columns}) -> {Meta, Rest0} = decode_metadata(Body), {Count, Rest1} = seestar_types:decode_int(Rest0), - #rows{metadata = Meta, rows = decode_rows(Meta, Rest1, Count)}. + #rows{metadata = Meta, rows = decode_rows(Columns, Rest1, Count)}. -decode_rows(Meta, Data, Count) -> - decode_rows(Meta, Data, Count, []). +decode_rows(Columns, Data, Count) -> + decode_rows(Columns, Data, Count, []). decode_rows(_, _, 0, Acc) -> lists:reverse(Acc); -decode_rows(Meta, Data, Count, Acc) -> - {Row, Rest} = decode_row(Meta, Data), - decode_rows(Meta, Rest, Count - 1, [Row|Acc]). +decode_rows(Columns, Data, Count, Acc) -> + {Row, Rest} = decode_row(Columns, Data), + decode_rows(Columns, Rest, Count - 1, [Row|Acc]). -decode_row(Meta, Data) -> - decode_row(Meta, Data, []). +decode_row(Columns, Data) -> + decode_row(Columns, Data, []). decode_row([], Data, Row) -> {lists:reverse(Row), Data}; -decode_row([#column{type = Type}|Meta], Data, Row) -> +decode_row([#column{type = Type}| Columns], Data, Row) -> {Value, Rest} = seestar_cqltypes:decode_value_with_size(Type, Data), - decode_row(Meta, Rest, [Value|Row]). + decode_row(Columns, Rest, [Value|Row]). decode_metadata(Data) -> {Flags, Rest0} = seestar_types:decode_int(Data), {Count, Rest1} = seestar_types:decode_int(Rest0), - {TableSpec, Rest2} = case Flags of - 16#00 -> {undefined, Rest1}; - 16#01 -> decode_table_spec(Rest1) - end, - decode_column_specs(TableSpec, Rest2, Count). + {HasMorePages, PagingState, Rest2} = decode_paging_state(<>, Rest1), + {Columns, Rest3} = maybe_decode_columns(<>, Count, Rest2), + {#metadata{has_more_results = HasMorePages, paging_state = PagingState, columns = Columns}, Rest3}. + +maybe_decode_columns(<<_Other:5, 1:1, _Any:2>>, _Count, Rest2) -> + {[], Rest2}; +maybe_decode_columns(<<_Other:5, 0:1, _Any:2>> = Flags, Count, Rest2) -> + {TableSpec, Rest3} = decode_table_spec(Flags, Rest2), + {Columns, Rest4} = decode_column_specs(TableSpec, Rest3, Count), + {Columns, Rest4}. + +decode_paging_state(<<_Other:6, 0:1, _Any:1>>, Rest1) -> + {false, undefined, Rest1}; +decode_paging_state(<<_Other:6, 1:1, _Any:1>>, Rest1) -> + {PagingState, Rest2} = seestar_types:decode_bytes(Rest1), + {true, PagingState, Rest2}. + +decode_table_spec(<<_Other:7, 0:1>>, Data) -> + {undefined, Data}; +decode_table_spec(<<_Other:7, 1:1>>, Data) -> + decode_table_spec(Data). + +decode_table_spec(Data) -> + {Keyspace, Rest0} = seestar_types:decode_string(Data), + {Table, Rest1} = seestar_types:decode_string(Rest0), + {{Keyspace, Table}, Rest1}. decode_column_specs(TableSpec, Data, Count) -> decode_column_specs(TableSpec, Data, Count, []). @@ -234,19 +258,15 @@ decode_column_spec({Keyspace, Table}, Data) -> {Type, Rest1} = seestar_cqltypes:decode_type(Rest0), {#column{keyspace = Keyspace, table = Table, name = Name, type = Type}, Rest1}. -decode_table_spec(Data) -> - {Keyspace, Rest0} = seestar_types:decode_string(Data), - {Table, Rest1} = seestar_types:decode_string(Rest0), - {{Keyspace, Table}, Rest1}. - decode_set_keyspace(Body) -> {Keyspace, _} = seestar_types:decode_string(Body), #set_keyspace{keyspace = Keyspace}. decode_prepared(Body) -> {ID, Rest} = seestar_types:decode_short_bytes(Body), - {Meta, _} = decode_metadata(Rest), - #prepared{id = ID, metadata = Meta}. + {RequestMetadata, Rest1} = decode_metadata(Rest), + {ResultMetadata, _Rest2} = decode_metadata(Rest1), + #prepared{id = ID, result_metadata = ResultMetadata, request_metadata = RequestMetadata}. decode_schema_change(Body) -> {Change, Rest} = seestar_types:decode_string(Body), @@ -257,3 +277,79 @@ decode_schema_change(Body) -> <<>> -> undefined; _ -> Table end}. + +%% ------------------------------------------------------------------------- +%% Internal +%% ------------------------------------------------------------------------- +encode_batch_queries(QueriesList) -> + << <<(encode_batch_query(Query))/binary>> || Query <- QueriesList >>. + +encode_batch_query(#batch_query{kind = prepared, string_or_id = ID, values = Values}) -> + {_Flag, EncodedValues} = values(Values), + << + (seestar_types:encode_byte(1))/binary, + (seestar_types:encode_short_bytes(ID))/binary, + EncodedValues/binary + >>; +encode_batch_query(#batch_query{kind = not_prepared, string_or_id = QueryString, values = Values}) -> + {_Flag, EncodedValues} = values(Values), + << + (seestar_types:encode_byte(0))/binary, + (seestar_types:encode_long_string(QueryString))/binary, + EncodedValues/binary + >>. + +encode_query_flags(QueryParams) -> + {ValueFlag, Values} = case values(QueryParams#query_params.values) of + {0, _Any} -> + {0, <<>>}; + {1, Vals} -> + {1, Vals} + end, + SkipMetadataFlag = skip_meta(QueryParams), + {PageSizeFlag, ResultPageSize} = page_size(QueryParams), + {PagingStateFlag, PagingState} = paging_state(QueryParams), + {SerialConsistencyFlag, SerialConsistency} = serial_consistency(QueryParams), + Flags = << 0:3, SerialConsistencyFlag:1, PagingStateFlag:1, PageSizeFlag:1, SkipMetadataFlag:1, ValueFlag:1 >>, + << + (seestar_types:encode_consistency(QueryParams#query_params.consistency))/binary, + Flags/binary, + Values/binary, ResultPageSize/binary, PagingState/binary, SerialConsistency/binary + >>. + +serial_consistency(#query_params{serial_consistency = serial}) -> + {0, <<>>}; + +serial_consistency(#query_params{serial_consistency = local_serial}) -> + {1, seestar_types:encode_consistency(local_serial)}. + +paging_state(#query_params{paging_state = undefined}) -> + {0, <<>>}; +paging_state(#query_params{paging_state = PagingState}) -> + {1, seestar_types:encode_bytes(PagingState)}. + +page_size(#query_params{page_size = undefined}) -> + {0, <<>>}; +page_size(#query_params{page_size = PageSize}) when is_integer(PageSize) -> + {1, seestar_types:encode_int(PageSize)}. + +skip_meta(#query_params{cached_result_meta = undefined}) -> + 0; +skip_meta(#query_params{cached_result_meta = Metadata}) when is_record(Metadata, metadata) -> + 1. + +values(#query_values{values = []})-> + {0, <<>>}; +values(#query_values{values = Values, types = Types}) when length(Types) == length(Values) -> + Variables = << <<(seestar_cqltypes:encode_value_with_size(Type, Value))/binary>> + || {Type, Value} <- lists:zip(Types, Values) >>, + {1, << + (seestar_types:encode_short(length(Values)))/binary , + Variables/binary>>}; +values(#query_values{values = Values, types = []}) when is_list(Values) -> + %% This happens in the case of the unprepared query. Types need to be 'guessed' + %% TODO -> Could be a little clearer on the whole process + Variables = << <<(seestar_cqltypes:encode_value_with_size(Value))/binary>> || Value <- Values >>, + {1, << + (seestar_types:encode_short(length(Values)))/binary , + Variables/binary>>}. \ No newline at end of file diff --git a/src/seestar_messages.hrl b/src/seestar_messages.hrl index 9725581..898f772 100644 --- a/src/seestar_messages.hrl +++ b/src/seestar_messages.hrl @@ -12,30 +12,83 @@ %%% See the License for the specific language governing permissions and %%% limitations under the License. +%% Op Codes +%% requests. +-define(STARTUP, 16#01). +-define(AUTH_RESPONSE, 16#0F). +-define(OPTIONS, 16#05). +-define(QUERY, 16#07). +-define(PREPARE, 16#09). +-define(EXECUTE, 16#0A). +-define(BATCH, 16#0D). +-define(REGISTER, 16#0B). +%% responses. +-define(ERROR, 16#00). +-define(READY, 16#02). +-define(AUTHENTICATE, 16#03). +-define(SUPPORTED, 16#06). +-define(RESULT, 16#08). +-define(AUTH_CHALLENGE, 16#0E). +-define(AUTH_SUCCESS, 16#10). + +%% event. +-define(EVENT, 16#0C). + +%% Used in requests and responses +-record(column, + {keyspace :: binary(), + table :: binary(), + name :: binary(), + type :: seestar_cqltypes:type()}). + +-record(metadata, + {has_more_results = false :: boolean(), + paging_state = undefined :: undefined | binary(), + columns :: [#column{}]}). + %% requests. -record(startup, {version = <<"3.0.0">> :: binary(), compression :: undefined | binary()}). --record(credentials, - {credentials :: [{binary(), binary()}]}). +-record(auth_response, + {body :: binary()}). -record(options, {}). +-record(query_values, + {types = [] :: [seestar_cqltypes:type()], + values = [] :: [seestar_cqltypes:value()]}). + +-record(query_params, + {consistency = one :: atom(), + values = #query_values{} :: #query_values{}, + cached_result_meta = undefined :: undefined | #metadata{}, + page_size = undefined :: undefined | non_neg_integer(), + paging_state = undefined :: undefined | binary(), + serial_consistency = serial :: serial | local_serial}). + -record('query', {'query' :: binary(), - consistency = one :: atom()}). + params :: #query_params{}}). -record(prepare, {'query' :: binary()}). -record(execute, {id :: binary(), - types = [] :: [seestar_cqltypes:type()], - values = [] :: [seestar_cqltypes:value()], - variables :: [binary()], - consistency = one :: atom()}). + params :: #query_params{}}). + +-record(batch_query, + {kind :: prepared | not_prepared, + string_or_id :: binary(), + values = #query_values{} :: #query_values{} + }). +-record(batch, + {type = logged :: logged | unlogged | counter, + queries = [] :: list(#batch_query{}), + consistency = one :: atom()}). -record(register, {event_types = [] :: [topology_change | status_change | schema_change]}). @@ -44,6 +97,12 @@ -record(ready, {}). +-record(auth_success, + {}). + +-record(auth_challenge, + {body :: binary()}). + -record(authenticate, {class :: binary()}). @@ -87,22 +146,19 @@ | #unprepared{}}). %% result and various result sub-types. --record(column, - {keyspace :: binary(), - table :: binary(), - name :: binary(), - type :: seestar_cqltypes:type()}). - -record(rows, - {metadata :: [#column{}], - rows :: [[seestar_cqltypes:value()]]}). + {metadata :: metadata(), + rows :: [[seestar_cqltypes:value()]], + initial_query :: #execute{} | #'query'{} %% used for fetching the next page + }). -record(set_keyspace, {keyspace :: binary()}). -record(prepared, {id :: binary(), - metadata :: [#column{}]}). + request_metadata :: #metadata{}, + result_metadata :: #metadata{}}). %% also an event. -record(schema_change, @@ -132,3 +188,8 @@ {event :: #topology_change{} | #status_change{} | #schema_change{}}). + +-type metadata() :: #metadata{}. +-type column() :: #column{}. +-export_type([metadata/0, column/0]). + diff --git a/src/seestar_password_auth.erl b/src/seestar_password_auth.erl new file mode 100644 index 0000000..80c45b8 --- /dev/null +++ b/src/seestar_password_auth.erl @@ -0,0 +1,35 @@ +-module(seestar_password_auth). + +-include("seestar_messages.hrl"). + +%% API +-export([perform_auth/2]). + +%% ------------------------------------------------------------------------- +%% API +%% ------------------------------------------------------------------------- + +%% @doc +%% Method that needs to be implemented by any password authentication module +%% The sendFun accepts a #auth_response{} and returns #error{} | #auth_challenge{} | #auth_success{} +%% {Username, Password} is the data passed to the seestar_session module +%% @end +-spec perform_auth(SendFun :: function(), {Username :: binary(), Password :: binary()}) -> Success :: boolean(). +perform_auth(SendFun, {Username, Password}) when is_function(SendFun), is_binary(Username), is_binary(Password) -> + case SendFun(encode_credentials(Username, Password)) of + #auth_challenge{} -> + %% We do not expect any auth_challenge, so probably auth module is wrong + false; + #auth_success{} -> + true; + #error{} -> + false + end. + +%% ------------------------------------------------------------------------- +%% Internal +%% ------------------------------------------------------------------------- + +encode_credentials(Username, Password) when is_binary(Username), is_binary(Password)-> + Auth = << 0, Username/binary, 0, Password/binary >>, + #auth_response{body= seestar_types:encode_bytes(Auth)}. diff --git a/src/seestar_result.erl b/src/seestar_result.erl index 3ea72ae..0925673 100644 --- a/src/seestar_result.erl +++ b/src/seestar_result.erl @@ -14,21 +14,23 @@ -module(seestar_result). +-include("seestar.hrl"). -include("seestar_messages.hrl"). --export([type/1, rows/1, names/1, types/1, type/2, keyspace/1, table/1, query_id/1, - change/1]). +-export([type/1, rows/1, names/1, types/1, type/2, keyspace/1, table/1, prepared_query/1, + change/1, has_more_rows/1]). -type rows_result() :: #rows{}. -type set_keyspace_result() :: #set_keyspace{}. -type prepared_result() :: #prepared{}. -type schema_change_result() :: #schema_change{}. +-type prepared_query() :: #prepared_query{}. -opaque result() :: void | rows_result() | set_keyspace_result() | prepared_result() | schema_change_result(). --export_type([result/0, prepared_result/0]). +-export_type([result/0, prepared_result/0, prepared_query/0]). -type type() :: void | rows | set_keyspace | prepared | schema_change. -type change() :: created | updated | dropped. @@ -54,21 +56,21 @@ rows(#rows{rows = Rows}) -> Rows. -spec names(Result :: rows_result() | prepared_result()) -> [binary()]. -names(#rows{metadata = Columns}) -> +names(#rows{metadata = #metadata{columns = Columns}}) -> [ C#column.name || C <- Columns ]; -names(#prepared{metadata = Columns}) -> +names(#prepared{request_metadata = #metadata{columns = Columns}}) -> [ C#column.name || C <- Columns ]. -spec types(Result :: rows_result() | prepared_result()) -> [seestar_cqltypes:type()]. -types(#rows{metadata = Columns}) -> +types(#rows{metadata = #metadata{columns = Columns}}) -> [ C#column.type || C <- Columns ]; -types(#prepared{metadata = Columns}) -> +types(#prepared{request_metadata = #metadata{columns = Columns}}) -> [ C#column.type || C <- Columns ]. -spec type(Result :: rows_result() | prepared_result(), Name :: binary()) -> seestar_cqltypes:type(). -type(#rows{metadata = Columns}, Name) -> +type(#rows{metadata = #metadata{columns = Columns}}, Name) -> hd([ C#column.type || C <- Columns, C#column.name =:= Name ]); -type(#prepared{metadata = Columns}, Name) -> +type(#prepared{request_metadata = #metadata{columns = Columns}}, Name) -> hd([ C#column.type || C <- Columns, C#column.name =:= Name ]). -spec keyspace(Result :: set_keyspace_result() | schema_change_result()) -> binary(). @@ -81,10 +83,19 @@ keyspace(#schema_change{keyspace = Keyspace}) -> table(#schema_change{table = Table}) -> Table. --spec query_id(Result :: prepared_result()) -> binary(). -query_id(#prepared{id = ID}) -> - ID. +%% @doc Returns a prepared query from the result. The returned query can be passed to +%% the {@link seestar_session:execute} or {@link seestar_session:execute_asyn} functions, +%% or can be used a part of a batch query using the {@link seestar_batch} module. +-spec prepared_query(Result :: prepared_result()) -> prepared_query(). +prepared_query(#prepared{id = ID, result_metadata = ResultMetadata} = Result) -> + #prepared_query{id = ID, cached_result_meta = ResultMetadata, request_types = types(Result)}. -spec change(Result :: schema_change_result()) -> change(). change(#schema_change{change = Change}) -> Change. + +%% @doc Returns true if the current result is paginated and not all records have been +%% retrieved so far +-spec has_more_rows(Result :: rows_result()) -> boolean(). +has_more_rows(#rows{metadata = #metadata{has_more_results = HasMoreRows}}) -> + HasMoreRows. \ No newline at end of file diff --git a/src/seestar_session.erl b/src/seestar_session.erl index f861e2f..6644d79 100644 --- a/src/seestar_session.erl +++ b/src/seestar_session.erl @@ -18,11 +18,16 @@ -include("seestar_messages.hrl"). -include("builtin_types.hrl"). - +-include("seestar.hrl"). %% API exports. -export([start_link/2, start_link/3, start_link/4, stop/1]). --export([perform/3, perform_async/3]). --export([prepare/2, execute/5, execute_async/5]). +-export([perform/3, perform/4, perform/5]). +-export([perform_async/3, perform_async/4, perform_async/5]). +-export([prepare/2]). +-export([execute/3, execute/4, execute/5]). +-export([execute_async/3, execute_async/4, execute_async/5]). +-export([next_page/2, next_page_async/2]). +-export([batch/2, batch_async/2]). %% gen_server exports. -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, @@ -38,7 +43,6 @@ | {ssl, [ssl:connect_option()]}. -type 'query'() :: binary() | string(). --type query_id() :: binary(). -define(b2l(Term), case is_binary(Term) of true -> binary_to_list(Term); false -> Term end). -define(l2b(Term), case is_list(Term) of true -> list_to_binary(Term); false -> Term end). @@ -47,7 +51,9 @@ {op :: seestar_frame:opcode(), body :: binary(), from :: {pid(), reference()}, - sync = true :: boolean()}). + sync = true :: boolean(), + result_data = undefined :: any(), + decode_data = undefined :: any()}). -record(st, {host :: inet:hostname(), @@ -109,18 +115,18 @@ setup(Pid, Options) -> end. authenticate(Pid, Options) -> - Credentials = proplists:get_value(credentials, Options), + Authentication = proplists:get_value(auth, Options), case request(Pid, #startup{}, true) of #ready{} -> true; - #authenticate{} when Credentials =:= undefined -> + #authenticate{} when Authentication =:= undefined -> false; #authenticate{} -> - KVPairs = [ {?l2b(K), ?l2b(V)} || {K, V} <- Credentials ], - case request(Pid, #credentials{credentials = KVPairs}, true) of - #ready{} -> true; - #error{} -> false - end + {AuthModule, AuthModuleParams} = Authentication, + SendFunction = fun(#auth_response{} = Request) -> + request(Pid, Request, true) + end, + AuthModule:perform_auth(SendFunction, AuthModuleParams) end. set_keyspace(Pid, Options) -> @@ -151,71 +157,134 @@ subscribe(Pid, Options) -> stop(Client) -> gen_server:cast(Client, stop). +%% @see perform/5 +perform(Client, Query, Consistency) -> + perform(Client, Query, Consistency, []). + +%% @see perform/5 +perform(Client, Query, Consistency, Values) when is_list(Values) -> + perform(Client, Query, Consistency, Values, undefined); +perform(Client, Query, Consistency, PageSize) -> + perform(Client, Query, Consistency, [], PageSize). + %% @doc Synchoronously perform a CQL query using the specified consistency level. %% Returns a result of an appropriate type (void, rows, set_keyspace, schema_change). %% Use {@link seestar_result} module functions to work with the result. --spec perform(pid(), 'query'(), seestar:consistency()) -> - {ok, Result :: seestar_result:result()} | {error, Error :: seestar_error:error()}. -perform(Client, Query, Consistency) -> - case request(Client, #'query'{'query' = ?l2b(Query), consistency = Consistency}, true) of - #result{result = Result} -> - {ok, Result}; - #error{} = Error -> - {error, Error} - end. +perform(Client, Query, Consistency, Values, PageSize) -> + Req = query_record(Query, Values, Consistency, PageSize), + wrap_response(Req, request(Client, Req, true)). -%% TODO doc -%% @doc Asynchronously perform a CQL query using the specified consistency level. --spec perform_async(pid(), 'query'(), seestar:consistency()) -> ok. +%% @see perform_async/5 +-spec perform_async(pid(), 'query'(), seestar:consistency()) -> any(). perform_async(Client, Query, Consistency) -> - Req = #'query'{'query' = ?l2b(Query), consistency = Consistency}, - request(Client, Req, false). + perform_async(Client, Query, Consistency, []). + +%% @see perform_async/5 +-spec perform_async(pid(), 'query'(), seestar:consistency(), [seestar_cqltypes:value()] | non_neg_integer()) -> any(). +perform_async(Client, Query, Consistency, Values) when is_list(Values) -> + perform_async(Client, Query, Consistency, Values, undefined); +perform_async(Client, Query, Consistency, PageSize) -> + perform_async(Client, Query, Consistency, [], PageSize). + +%% @doc Asynchronously perform a CQL query using the specified consistency level. +-spec perform_async(pid(), 'query'(), seestar:consistency(), [seestar_cqltypes:value()], undefined | non_neg_integer()) + -> any(). +perform_async(Client, Query, Consistency, Values, PageSize) -> + Req = query_record(Query, Values, Consistency, PageSize), + if + is_number(PageSize)-> + request(Client, Req, undefined, Req, false); + true -> + request(Client, Req, false) + end. %% @doc Prepare a query for later execution. The response will contain the prepared -%% query id and column metadata for all the variables (if any). +%% query that you will need to pass to the execute methods %% @see execute/3. %% @see execute/4. -spec prepare(pid(), 'query'()) -> {ok, Result :: seestar_result:prepared_result()} | {error, Error :: seestar_error:error()}. prepare(Client, Query) -> - case request(Client, #prepare{'query' = ?l2b(Query)}, true) of - #result{result = Result} -> - {ok, Result}; - #error{} = Error -> - {error, Error} - end. + Req = #prepare{'query' = ?l2b(Query)}, + wrap_response(Req, request(Client, Req, true)). + +%% @see execute/5 +execute(Client, Query, Consistency) -> + execute(Client, Query, Consistency, undefined). + +%% @see execute/5 +execute(Client, Query, Consistency, PageSize) when is_atom(Consistency) -> + execute(Client, Query, [], Consistency, PageSize); + +%% @see execute/5 +execute(Client, QueryID, Values, Consistency)-> + execute(Client, QueryID, Values, Consistency, undefined). + %% @doc Synchronously execute a prepared query using the specified consistency level. %% Use {@link seestar_result} module functions to work with the result. %% @see prepare/2. %% @see perform/3. -spec execute(pid(), - query_id(), - [seestar_cqltypes:type()], [seestar_cqltypes:value()], - seestar:consistency()) -> + seestar_result:prepared_query(), + [seestar_cqltypes:value()], + seestar:consistency(), + non_neg_integer() | undefined) -> {ok, Result :: seestar_result:result()} | {error, Error :: seestar_error:error()}. -execute(Client, QueryID, Types, Values, Consistency) -> - Req = #execute{id = QueryID, types = Types, values = Values, consistency = Consistency}, - case request(Client, Req, true) of - #result{result = Result} -> - {ok, Result}; - #error{} = Error -> - {error, Error} +execute(Client, #prepared_query{id = QueryID, request_types = Types, cached_result_meta = ResultMeta}, Values, Consistency, PageSize) -> + Req = execute_record(QueryID, Consistency, Values, Types, PageSize, ResultMeta), + wrap_response(Req, request(Client, Req, undefined, ResultMeta, true)). + +%% +%% @see execute_async/5 +execute_async(Client, Query, Consistency) -> + execute_async(Client, Query, Consistency, undefined). + +%% @see execute_async/5 +execute_async(Client, Query, Consistency, PageSize) when is_atom(Consistency)-> + execute_async(Client, Query, [], Consistency, PageSize); +%% @see execute_async/5 +execute_async(Client, QueryID, Values, Consistency)-> + execute_async(Client, QueryID, Values, Consistency, undefined). + + +%% @doc Asynchronously execute a prepared query using the specified consistency level. +%% Use {@link seestar_result} module functions to work with the result. +%% @see prepare/2. +-spec execute_async(pid(), + seestar_result:prepared_query(), + [seestar_cqltypes:value()], + seestar:consistency(), + non_neg_integer() | undefined) -> + {ok, Result :: seestar_result:result()} | {error, Error :: seestar_error:error()}. +execute_async(Client, #prepared_query{id = QueryID, cached_result_meta = CachedResultMeta, request_types = Types}, Values, Consistency, PageSize) -> + Req = execute_record(QueryID, Consistency, Values, Types, PageSize, CachedResultMeta), + if + is_number(PageSize)-> + request(Client, Req, CachedResultMeta, Req, false); + true -> + request(Client, Req, undefiend, Req, false) end. -execute_async(Client, QueryID, Types, Values, Consistency) -> - Req = #execute{id = QueryID, types = Types, values = Values, consistency = Consistency}, +%% @doc Synchronously execute a batch query +%% Use {@link seestar_batch} module functions to create the request. +batch(Client, Req) -> + wrap_response(Req, request(Client, Req, true)). + +%% @doc Asynchronously execute a batch query +%% Use {@link seestar_batch} module functions to create the request. +batch_async(Client, Req) -> request(Client, Req, false). -request(Client, Request, Sync) -> - {ReqOp, ReqBody} = seestar_messages:encode(Request), - case gen_server:call(Client, {request, ReqOp, ReqBody, Sync}, infinity) of - {RespOp, RespBody} -> - seestar_messages:decode(RespOp, RespBody); - Ref -> - Ref - end. +%% @doc Synchronously returns the next page for a previous paginated result +next_page(Client, #rows{initial_query = Req0, metadata = #metadata{paging_state = PagingState}}) -> + {Req, CachedDecodeData} = next_page_request(Req0, PagingState), + wrap_response(Req, request(Client, Req, undefined, CachedDecodeData, true)). +%% @doc Asynchronously returns the next page for a previous paginated result +next_page_async(Client, #rows{initial_query = Req0, metadata = #metadata{paging_state = PagingState}}) -> + {Req, CachedDecodeData} = next_page_request(Req0, PagingState), + request(Client, Req, CachedDecodeData, Req, false). %% ------------------------------------------------------------------------- %% gen_server callback functions %% ------------------------------------------------------------------------- @@ -270,16 +339,17 @@ terminate(_Reason, _St) -> ok. %% @private -handle_call({request, Op, Body, Sync}, From, #st{free_ids = []} = St) -> - Req = #req{op = Op, body = Body, from = From, sync = Sync}, +handle_call({request, Op, Body, Sync, ResultData, DecodeData}, From, #st{free_ids = []} = St) -> + Req = #req{op = Op, body = Body, from = From, sync = Sync, result_data = ResultData, decode_data = DecodeData}, {noreply, St#st{backlog = queue:in(Req, St#st.backlog)}}; -handle_call({request, Op, Body, Sync}, {_Pid, Ref} = From, St) -> +handle_call({request, Op, Body, Sync, ResultData, DecodeData}, {_Pid, Ref} = From, St) -> case Sync of true -> ok; false -> gen_server:reply(From, Ref) end, - case send_request(#req{op = Op, body = Body, from = From, sync = Sync}, St) of + case send_request( + #req{op = Op, body = Body, from = From, sync = Sync, result_data = ResultData, decode_data = DecodeData}, St) of {ok, St1} -> {noreply, St1}; {error, Reason} -> {stop, {socket_error, Reason}, St} end; @@ -287,12 +357,14 @@ handle_call({request, Op, Body, Sync}, {_Pid, Ref} = From, St) -> handle_call(Request, _From, St) -> {stop, {unexpected_call, Request}, St}. -send_request(#req{op = Op, body = Body, from = From, sync = Sync}, #st{sock = Sock, transport = Transport} = St) -> +send_request( + #req{op = Op, body = Body, from = From, sync = Sync, result_data = ResultData, decode_data = DecodeData}, + #st{sock = Sock, transport = Transport} = St) -> ID = hd(St#st.free_ids), Frame = seestar_frame:new(ID, [], Op, Body), case send_on_wire(Sock, Transport, seestar_frame:encode(Frame)) of ok -> - ets:insert(St#st.reqs, {ID, From, Sync}), + ets:insert(St#st.reqs, {ID, From, Sync, ResultData, DecodeData}), {ok, St#st{free_ids = tl(St#st.free_ids)}}; {error, _Reason} = Error -> Error @@ -335,6 +407,60 @@ handle_info({ssl_error, Sock, Reason}, #st{sock = Sock, transport = ssl} = St) - handle_info(Info, St) -> {stop, {unexpected_info, Info}, St}. +%% ------------------------------------------------------------------------- +%% Internal +%% ------------------------------------------------------------------------- + +next_page_request(#'query'{} = Req0, PagingState) -> + QueryParams = Req0#'query'.params#query_params{paging_state = PagingState}, + {Req0#query{params = QueryParams}, undefined}; + +next_page_request(#execute{} = Req0, PagingState) -> + %% For paging execute queries, since we use skip_meta, we need to also return the + %% ResultMetadata so that we can use it when decoding. + QueryParams = Req0#execute.params#query_params{paging_state = PagingState}, + {Req0#execute{params = QueryParams}, QueryParams#query_params.cached_result_meta}. + +query_record(Query, Values, Consistency, PageSize) -> + QueryParams = #query_params{consistency = Consistency, page_size = PageSize, + values = #query_values{values = Values}}, + #'query'{'query' = ?l2b(Query), params = QueryParams}. + +execute_record(QueryID, Consistency, Values, Types, PageSize, ResultMeta) -> + QueryParams = #query_params{consistency = Consistency, page_size = PageSize, cached_result_meta = ResultMeta, + values = #query_values{values = Values, types = Types}}, + #execute{id = QueryID, params = QueryParams}. + +-spec request(pid(), seestar_messages:outgoing(), boolean()) -> any() | seestar_messages:incoming(). +request(Client, Request, Sync) -> + request(Client, Request, undefined, undefined, Sync). + +%% @private +%% @doc Sends the request to the process handling the connection. The 2 new parameters are +%% CachedDecodeData -> will be passed to the decode function. This is useful when a part +%% of the response from cassandra is known( eg skip_meta), and that part is needed for the decoding +%% CachedResultData -> will end up being passed to reply_async. This is currently used for keeping +%% the initial query and returning it to the caller in case of a paginated resultSet +-spec request(pid(), seestar_messages:outgoing(), any(), any(), boolean()) -> any() | seestar_messages:incoming(). +request(Client, Request, CachedDecodeData, CachedResultData, Sync) -> + {ReqOp, ReqBody} = seestar_messages:encode(Request), + case gen_server:call(Client, {request, ReqOp, ReqBody, Sync, CachedResultData, CachedDecodeData}, infinity) of + {RespOp, RespBody} -> + seestar_messages:decode(RespOp, RespBody, CachedDecodeData); + Ref -> + Ref + end. + +wrap_response(Req, Respone) -> + case Respone of + #result{result = #rows{metadata = #metadata{has_more_results = true}} = Result} -> + {ok, Result#rows{initial_query = Req}}; + #result{result = Result} -> + {ok, Result}; + #error{} = Error -> + {error, Error} + end. + process_frames([Frame|Frames], St) -> process_frames(Frames, case seestar_frame:id(Frame) of @@ -349,19 +475,21 @@ handle_event(_Frame, St) -> handle_response(Frame, St) -> ID = seestar_frame:id(Frame), - [{ID, From, Sync}] = ets:lookup(St#st.reqs, ID), + [{ID, From, Sync, ResultData, DecodeData}] = ets:lookup(St#st.reqs, ID), ets:delete(St#st.reqs, ID), Op = seestar_frame:opcode(Frame), Body = seestar_frame:body(Frame), case Sync of true -> gen_server:reply(From, {Op, Body}); - false -> reply_async(From, Op, Body) + false -> reply_async(From, Op, Body, ResultData, DecodeData) end, St#st{free_ids = [ID|St#st.free_ids]}. -reply_async({Pid, Ref}, Op, Body) -> +reply_async({Pid, Ref}, Op, Body, ResultMeta, DecodeMeta) -> F = fun() -> - case seestar_messages:decode(Op, Body) of + case seestar_messages:decode(Op, Body, DecodeMeta) of + #result{result = #rows{} = Result} -> + {ok, Result#rows{initial_query = ResultMeta}}; #result{result = Result} -> {ok, Result}; #error{} = Error -> diff --git a/src/seestar_types.erl b/src/seestar_types.erl index 538dae7..5fba2e5 100644 --- a/src/seestar_types.erl +++ b/src/seestar_types.erl @@ -15,8 +15,9 @@ %%% @private -module(seestar_types). --export([encode_short/1, encode_long_string/1, encode_string_list/1, encode_bytes/1, - encode_short_bytes/1, encode_consistency/1, encode_string_map/1]). +-export([encode_byte/1, encode_short/1, encode_int/1, encode_long_string/1, encode_string_list/1, + encode_bytes/1, encode_short_bytes/1, encode_consistency/1, encode_string_map/1, + encode_batch_type/1]). -export([decode_int/1, decode_short/1, decode_string/1, decode_uuid/1, decode_bytes/1, decode_short_bytes/1, decode_consistency/1, decode_string_multimap/1]). @@ -24,12 +25,15 @@ %% encoding functions %% ------------------------------------------------------------------------- -encode_int(Value) -> - <>. +encode_byte(Value) -> + <>. encode_short(Value) -> <>. +encode_int(Value) -> + <>. + encode_string(Value) -> <<(encode_short(size(Value)))/binary, Value/binary>>. @@ -55,9 +59,19 @@ encode_consistency(Value) -> quorum -> 16#04; all -> 16#05; local_quorum -> 16#06; - each_quorum -> 16#07 + each_quorum -> 16#07; + serial -> 16#08; + local_serial -> 16#09; + local_one -> 16#10 end). +encode_batch_type(Value) -> + encode_byte(case Value of + logged -> 16#00; + unlogged -> 16#01; + counter -> 16#02 + end). + encode_string_map(KVPairs) -> encode_string_map(KVPairs, []). diff --git a/test/seestar_auth_tests.erl b/test/seestar_auth_tests.erl new file mode 100644 index 0000000..4e42acb --- /dev/null +++ b/test/seestar_auth_tests.erl @@ -0,0 +1,51 @@ +-module(seestar_auth_tests). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("seestar/include/constants.hrl"). + +auth_test_() -> + {foreach, + fun() -> + seestar_ccm:create(), + seestar_ccm:update_config(["authenticator:PasswordAuthenticator"]), + seestar_ccm:start(), + wait_for_cassandra_to_accept_auth_requests(1000) + end, + fun(_) -> + seestar_ccm:remove() + end, + [ + fun single_test_function/0 + ]}. + +%% Single test function so that we do not have to wait for the cluster to initialize +%% given that it seems that setting up password auth takes a while +single_test_function() -> + %% Fail when no credentials provided + {error, invalid_credentials} = seestar_session:start_link("localhost", 9042), + + %% Fail when bad credentials provided + {error, invalid_credentials} = seestar_session:start_link("localhost", 9042, + [{auth , {seestar_password_auth, {<<"bad">>, <<"credentials">>}}}]), + + %% Succeed when good credentials provided + {ok, Pid} = seestar_session:start_link("localhost", 9042, + [{auth , {seestar_password_auth, {<<"cassandra">>, <<"cassandra">>}}}]), + unlink(Pid), + seestar_session:stop(Pid). + +%% ------------------------------------------------------------------------- +%% Internal +%% ------------------------------------------------------------------------- + +wait_for_cassandra_to_accept_auth_requests(SleepTime) -> + CurrentStatus = os:cmd("cqlsh -u cassandra -p cassandra"), + case re:run(CurrentStatus, "AuthenticationException", [{capture, first, list}]) of + nomatch -> + ?debugMsg("Cassandra PasswordAuth ready ... "), + ok; + {match,["AuthenticationException"]} -> + ?debugFmt("Waiting for cass password auth to be ready. Sleeping ~p ..." , [SleepTime]), + timer:sleep(SleepTime), + wait_for_cassandra_to_accept_auth_requests(SleepTime) + end. diff --git a/test/seestar_session_tests.erl b/test/seestar_session_tests.erl index b243dc8..3280115 100644 --- a/test/seestar_session_tests.erl +++ b/test/seestar_session_tests.erl @@ -3,24 +3,38 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("seestar/include/constants.hrl"). -session_test_() -> +schema_test_() -> {foreach, - fun() -> - seestar_ccm:create(), - seestar_ccm:start(), - timer:sleep(500), - {ok, Pid} = seestar_session:start_link("localhost", 9042), - unlink(Pid), - Pid - end, - fun(Pid) -> - seestar_session:stop(Pid), - seestar_ccm:remove() - end, - [fun(Pid) -> {with, Pid, [fun test_schema_queries/1]} end, - fun(Pid) -> {with, Pid, [fun test_native_types/1]} end, - fun(Pid) -> {with, Pid, [fun test_collection_types/1]} end, - fun(Pid) -> {with, Pid, [fun test_counter_type/1]} end]}. + fun test_utils:connect/0, + fun test_utils:close/1, + [ fun(Pid) -> {with, Pid, [fun test_schema_queries/1]} end]}. + +session_test_() -> + {setup, + fun test_utils:connect/0, + fun test_utils:close/1, + fun (ConnectionPid) -> + [ + {foreach, fun()-> connect_to_keyspace(ConnectionPid) end, fun drop_keyspace/1, + [ + fun(Pid) -> {with, Pid, [fun test_native_types/1]} end, + fun(Pid) -> {with, Pid, [fun test_collection_types/1]} end, + fun(Pid) -> {with, Pid, [fun test_counter_type/1]} end , + fun(Pid) -> {with, Pid, [fun result_paging_query_sync/1]} end, + fun(Pid) -> {with, Pid, [fun result_paging_query_async/1]} end, + fun(Pid) -> {with, Pid, [fun result_paging_execute_sync/1]} end, + fun(Pid) -> {with, Pid, [fun result_paging_execute_async/1]} end, + fun(Pid) -> {with, Pid, [fun perform_insert_update_delete/1]} end, + fun(Pid) -> {with, Pid, [fun batch_tests/1]} end, + fun(Pid) -> {with, Pid, [fun multiple_inserts/1]} end + ]} + ] + end + }. + +%% ------------------------------------------------------------------------- +%% Test Cases +%% ------------------------------------------------------------------------- test_schema_queries(Pid) -> Qry0 = "CREATE KEYSPACE seestar " @@ -52,7 +66,6 @@ test_schema_queries(Pid) -> ?assertEqual(<<"seestar_test_table">>, seestar_error:table(Err1)). test_native_types(Pid) -> - create_keyspace(Pid, "seestar", 1), Qry0 = "CREATE TABLE seestar.has_all_types ( asciicol ascii, bigintcol bigint, @@ -78,7 +91,7 @@ test_native_types(Pid) -> inetcol, intcol, textcol, timestampcol, timeuuidcol, uuidcol, varcharcol, varintcol) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", {ok, Res1} = seestar_session:prepare(Pid, Qry1), - QryID = seestar_result:query_id(Res1), + PreparedQuery = seestar_result:prepared_query(Res1), Types = seestar_result:types(Res1), ?assertEqual([ascii, bigint, blob, boolean, decimal, double, float, inet, int, varchar, timestamp, timeuuid, uuid, varchar, varint], @@ -93,8 +106,8 @@ test_native_types(Pid) -> <<135,99,103,104,40,81,17,187,181,58,96,197,71,12,191,14>>, <<148,125,144,228,220,27,68,12,148,158,178,154,25,169,42,113>>, <<>>, 100000000000000000000000000], - {ok, _} = seestar_session:execute(Pid, QryID, Types, Row0, one), - {ok, _} = seestar_session:execute(Pid, QryID, Types, Row1, one), + {ok, _} = seestar_session:execute(Pid, PreparedQuery, Row0, one), + {ok, _} = seestar_session:execute(Pid, PreparedQuery, Row1, one), % test deserialization. Qry2 = "SELECT asciicol, bigintcol, blobcol, booleancol, decimalcol, doublecol, floatcol, inetcol, intcol, textcol, timestampcol, timeuuidcol, uuidcol, varcharcol, varintcol @@ -104,20 +117,17 @@ test_native_types(Pid) -> ?assertEqual([Row0, Row1], seestar_result:rows(Res2)). test_counter_type(Pid) -> - create_keyspace(Pid, "seestar", 1), Qry0 = "CREATE TABLE seestar.has_counter_type (id int PRIMARY KEY, counter counter)", {ok, _} = seestar_session:perform(Pid, Qry0, one), Qry1 = "UPDATE seestar.has_counter_type SET counter = counter + ? WHERE id = ?", {ok, Res1} = seestar_session:prepare(Pid, Qry1), - QryID = seestar_result:query_id(Res1), - Types = seestar_result:types(Res1), - [ {ok, _} = seestar_session:execute(Pid, QryID, Types, [C, 0], one) || C <- [ 1, -2, 3 ] ], + PreparedQuery = seestar_result:prepared_query(Res1), + [ {ok, _} = seestar_session:execute(Pid, PreparedQuery, [C, 0], one) || C <- [ 1, -2, 3 ] ], Qry2 = "SELECT id, counter FROM seestar.has_counter_type WHERE id = 0", {ok, Res2} = seestar_session:perform(Pid, Qry2, one), ?assertEqual([[0, 2]], seestar_result:rows(Res2)). test_collection_types(Pid) -> - create_keyspace(Pid, "seestar", 1), Qry0 = "CREATE TABLE seestar.has_collection_types ( id int, mapcol map, @@ -128,16 +138,180 @@ test_collection_types(Pid) -> {ok, _} = seestar_session:perform(Pid, Qry0, one), Qry1 = "INSERT INTO seestar.has_collection_types (id, mapcol, setcol, listcol) VALUES (?, ?, ?, ?)", {ok, Res1} = seestar_session:prepare(Pid, Qry1), - QryID = seestar_result:query_id(Res1), - Types = seestar_result:types(Res1), + PreparedQuery = seestar_result:prepared_query(Res1), Row0 = [0, null, null, null], Row1 = [1, dict:from_list([{<<"k1">>, <<"v1">>}]), sets:from_list([1]), [true]], Row2 = [2, dict:from_list([{<<"k1">>, <<"v1">>}, {<<"k2">>, <<"v2">>}]), sets:from_list([1,2]), [true, false]], - [ {ok, _} = seestar_session:execute(Pid, QryID, Types, R, one) || R <- [Row0, Row1, Row2] ], + [ {ok, _} = seestar_session:execute(Pid, PreparedQuery, R, one) || R <- [Row0, Row1, Row2] ], Qry2 = "SELECT id, mapcol, setcol, listcol FROM seestar.has_collection_types", {ok, Res2} = seestar_session:perform(Pid, Qry2, one), ?assertEqual([Row1, Row0, Row2], seestar_result:rows(Res2)). -create_keyspace(Pid, Name, RF) -> - Qry = "CREATE KEYSPACE ~s WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': ~w}", - {ok, _} = seestar_session:perform(Pid, lists:flatten(io_lib:format(Qry, [Name, RF])), one). +result_paging_query_sync(Pid) -> + insert_data_for_paging(Pid), + %% Check if updated + {ok, PagedSelectResult} = seestar_session:perform(Pid, "SELECT * FROM seestar_test_table", one, 100), + NumberOfRows = count_rows(Pid, PagedSelectResult), + ?assertEqual(2000, NumberOfRows). + +result_paging_query_async(Pid) -> + insert_data_for_paging(Pid), + %% Check if updated + Ref = seestar_session:perform_async(Pid, "SELECT * FROM seestar_test_table", one, 100), + receive + {seestar_response, Ref, F} -> + {ok, PagedSelectResult} = F(), + NumberOfRows = count_rows(Pid, PagedSelectResult), + ?assertEqual(2000, NumberOfRows) + end. + +result_paging_execute_sync(Pid) -> + insert_data_for_paging(Pid), + %% Check if updated + + SelectQuery = "SELECT * FROM seestar_test_table", + {ok, PreparedResult} = seestar_session:prepare(Pid, SelectQuery), + PreparedQuery = seestar_result:prepared_query(PreparedResult), + + {ok, PagedSelectResult} = seestar_session:execute(Pid, PreparedQuery, one, 100), + NumberOfRows = count_rows(Pid, PagedSelectResult), + ?assertEqual(2000, NumberOfRows). + +result_paging_execute_async(Pid) -> + insert_data_for_paging(Pid), + %% Check if updated + SelectQuery = "SELECT * FROM seestar_test_table", + {ok, PreparedResult} = seestar_session:prepare(Pid, SelectQuery), + QueryID = seestar_result:prepared_query(PreparedResult), + + Ref = seestar_session:execute_async(Pid, QueryID, one, 100), + receive + {seestar_response, Ref, F} -> + {ok, PagedSelectResult} = F(), + NumberOfRows = count_rows_async(Pid, PagedSelectResult), + ?assertEqual(2000, NumberOfRows) + end. + +perform_insert_update_delete(Pid) -> + CreateTable = "CREATE TABLE seestar_test_table (id int primary key, value text)", + {ok, _Res2} = seestar_session:perform(Pid, CreateTable, one), + + %% Insert a row + {ok, void} = seestar_session:perform(Pid, "INSERT INTO seestar_test_table(id, value) values (?, ?)", one, [1, <<"The quick brown fox">>]), + + %% Check if row exists + {ok, SelectResult} = seestar_session:perform(Pid, "SELECT * FROM seestar_test_table where id = ?", one, [1]), + ?assertEqual([[1, <<"The quick brown fox">>]], seestar_result:rows(SelectResult)), + + %% Update row + {ok, void} = seestar_session:perform(Pid, "UPDATE seestar_test_table set value = ? where id = ?", one, [<<"UpdatedText">>, 1]), + + %% Check if updated + {ok, SelectResult2} = seestar_session:perform(Pid, "SELECT * FROM seestar_test_table where id = ?", one, [1]), + ?assertEqual([[1, <<"UpdatedText">>]], seestar_result:rows(SelectResult2)), + + %% Delete Row + {ok, void} = seestar_session:perform(Pid, "DELETE FROM seestar_test_table where id = ?", one, [1]), + + %% Check if row no longer exists + {ok, SelectResult3} = seestar_session:perform(Pid, "SELECT * FROM seestar_test_table where id = ?", one, [1]), + ?assertEqual([], seestar_result:rows(SelectResult3)). + +batch_tests(Pid) -> + CreateTable = <<"CREATE TABLE seestar_test_table (id int primary key, value text)">>, + {ok, _Res2} = seestar_session:perform(Pid, CreateTable, one), + + InsertQuery = <<"INSERT INTO seestar_test_table(id, value) values (?, ?)">>, + {ok, PreparedResult} = seestar_session:prepare(Pid, InsertQuery), + PreparedQuery = seestar_result:prepared_query(PreparedResult), + + NormalQueriesList = [ + seestar_batch:normal_query(<<"INSERT INTO seestar_test_table(id, value) values (?, ?)">>, [I, <<"The fox">>] ) + || I <- lists:seq(1,100) + ], + + PreparedQueriesList = [ + seestar_batch:prepared_query(PreparedQuery, [I, <<"The fox">>]) + || I <- lists:seq(101,200) + ], + + Batch = seestar_batch:batch_request(logged, one, NormalQueriesList ++ PreparedQueriesList), + {ok, void} = seestar_session:batch(Pid, Batch), + %% Check if updated + {ok, SelectResult} = seestar_session:perform(Pid, "SELECT * FROM seestar_test_table", one), + ?assertEqual(200, length(seestar_result:rows(SelectResult))). + +multiple_inserts(Pid) -> + CreateTable = "CREATE TABLE seestar_test_table (id int primary key, value text)", + {ok, _Res2} = seestar_session:perform(Pid, CreateTable, one), + + %% Prepare Query + Query = "INSERT INTO seestar_test_table(id, value) values (?, ?)", + {ok, Res1} = seestar_session:prepare(Pid, Query), + PreparedQuery = seestar_result:prepared_query(Res1), + + N = 10000, + [seestar_session:execute_async(Pid, PreparedQuery, [ID, <<"The fox">>], one) || ID <- lists:seq(1, N)], + + wait_for_results(N). + +%% ------------------------------------------------------------------------- +%% Internal +%% ------------------------------------------------------------------------- +connect_to_keyspace(Pid)-> + test_utils:create_keyspace(Pid, "seestar", 1), + {ok, _Res1} = seestar_session:perform(Pid, "USE seestar", one), + Pid. + +drop_keyspace(Pid)-> + test_utils:drop_keyspace(Pid, "seestar"). + +insert_data_for_paging(Pid) -> + CreateTable = <<"CREATE TABLE seestar_test_table (id int primary key, value text)">>, + {ok, _Res2} = seestar_session:perform(Pid, CreateTable, one), + InsertQuery = <<"INSERT INTO seestar_test_table(id, value) values (?, ?)">>, + {ok, PreparedResult} = seestar_session:prepare(Pid, InsertQuery), + QueryID = seestar_result:prepared_query(PreparedResult), + PreparedQueriesList = [ + seestar_batch:prepared_query(QueryID, [I, <<"The fox">>]) + || I <- lists:seq(1, 2000) + ], + Batch = seestar_batch:batch_request(logged, one, PreparedQueriesList), + {ok, void} = seestar_session:batch(Pid, Batch). + +count_rows(Pid, PagedSelectResult) -> + count_rows(Pid, PagedSelectResult, 0). + +count_rows(Pid, PagedSelectResult, N) -> + case seestar_result:has_more_rows(PagedSelectResult) of + false -> + N + length(seestar_result:rows(PagedSelectResult)); + true -> + {ok, NextPage} = seestar_session:next_page(Pid, PagedSelectResult), + count_rows(Pid, NextPage, N + length(seestar_result:rows(PagedSelectResult))) + end. + +count_rows_async(Pid, PagedSelectResult) -> + count_rows_async(Pid, PagedSelectResult, 0). + +count_rows_async(Pid, PagedSelectResult, N) -> + case seestar_result:has_more_rows(PagedSelectResult) of + false -> + N + length(seestar_result:rows(PagedSelectResult)); + true -> + Ref = seestar_session:next_page_async(Pid, PagedSelectResult), + receive + {seestar_response, Ref, F} -> + {ok, NewPagedSelectResult} = F(), + count_rows_async(Pid, NewPagedSelectResult, N + length(seestar_result:rows(PagedSelectResult))) + end + end. + +wait_for_results(0) -> + ok; +wait_for_results(N) -> + receive + {seestar_response, _Ref, F} -> + {ok, _SelectResult} = F(), + wait_for_results(N-1) + end. diff --git a/test/test_utils.erl b/test/test_utils.erl new file mode 100644 index 0000000..a7236a0 --- /dev/null +++ b/test/test_utils.erl @@ -0,0 +1,35 @@ +-module(test_utils). +-author("odobroiu"). + +%% API +-export([connect/0, close/1, create_keyspace/3, drop_keyspace/2]). + +connect() -> + seestar_ccm:create(), + seestar_ccm:start(), + wait_for_cassandra_to_accept_connections(10, 50), + {ok, Pid} = seestar_session:start_link("localhost", 9042), + unlink(Pid), + Pid. + +close(Pid) -> + seestar_session:stop(Pid), + seestar_ccm:remove(). + +create_keyspace(Pid, Name, RF) -> + Qry = "CREATE KEYSPACE ~s WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': ~w}", + {ok, _} = seestar_session:perform(Pid, lists:flatten(io_lib:format(Qry, [Name, RF])), one). + +drop_keyspace(Pid, Name) -> + Qry = "DROP KEYSPACE ~s", + {ok, _} = seestar_session:perform(Pid, lists:flatten(io_lib:format(Qry, [Name])), one). + +wait_for_cassandra_to_accept_connections(Retries, SleepTime) -> + CurrentStatus = os:cmd("cqlsh"), + case re:run(CurrentStatus, "error", [{capture, first, list}]) of + nomatch -> + ok; + {match,["error"]} -> + timer:sleep(SleepTime), + wait_for_cassandra_to_accept_connections(Retries-1, SleepTime) + end. \ No newline at end of file