Skip to content

Commit

Permalink
Event stream rpc full stack (#48)
Browse files Browse the repository at this point in the history
* event-stream-rpc implementation and tests. Added aws-c-io dependency.
  • Loading branch information
JonathanHenson authored Nov 11, 2020
1 parent 3462b68 commit 8a98dec
Show file tree
Hide file tree
Showing 19 changed files with 7,103 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ include(CheckCCompilerFlag)
include(AwsFindPackage)
include(AwsCheckHeaders)


if(NOT MSVC)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
Expand Down Expand Up @@ -73,6 +72,7 @@ target_include_directories(${PROJECT_NAME} PUBLIC

set_target_properties(${PROJECT_NAME} PROPERTIES VERSION 1.0.0)

aws_use_package(aws-c-io)
aws_use_package(aws-c-common)
aws_use_package(aws-checksums)

Expand Down
2 changes: 1 addition & 1 deletion builder.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "aws-c-event-stream",
"upstream": [
{ "name": "aws-c-common" },
{ "name": "aws-c-io" },
{ "name": "aws-checksums" }
]
}
2 changes: 1 addition & 1 deletion cmake/aws-c-event-stream-config.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
include(CMakeFindDependencyMacro)
find_dependency(aws-c-common)
find_dependency(aws-c-io)
find_dependency(aws-checksums)

if (BUILD_SHARED_LIBS)
Expand Down
48 changes: 47 additions & 1 deletion include/aws/event-stream/event_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,42 @@

#include <aws/common/array_list.h>
#include <aws/common/byte_buf.h>
#include <aws/common/logging.h>
#include <aws/event-stream/event_stream_exports.h>

#include <stdio.h>

#define AWS_C_EVENT_STREAM_PACKAGE_ID 4
/* max message size is 16MB */
#define AWS_EVENT_STREAM_MAX_MESSAGE_SIZE (16 * 1024 * 1024)

/* max header size is 128kb */
#define AWS_EVENT_STREAM_MAX_HEADERS_SIZE (128 * 1024)

enum aws_event_stream_errors {
AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH = 0x1000,
AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH = AWS_ERROR_ENUM_BEGIN_RANGE(AWS_C_EVENT_STREAM_PACKAGE_ID),
AWS_ERROR_EVENT_STREAM_INSUFFICIENT_BUFFER_LEN,
AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE,
AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE,
AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN,
AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE,
AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE,
AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED,
AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR,
AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED,
AWS_ERROR_EVENT_STREAM_RPC_STREAM_NOT_ACTIVATED,

AWS_ERROR_EVENT_STREAM_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_EVENT_STREAM_PACKAGE_ID),
};

enum aws_event_stream_log_subject {
AWS_LS_EVENT_STREAM_GENERAL = AWS_LOG_SUBJECT_BEGIN_RANGE(AWS_C_EVENT_STREAM_PACKAGE_ID),
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
AWS_LS_EVENT_STREAM_RPC_SERVER,
AWS_LS_EVENT_STREAM_RPC_CLIENT,

AWS_LS_EVENT_STREAM_LAST = AWS_LOG_SUBJECT_END_RANGE(AWS_C_EVENT_STREAM_PACKAGE_ID),
};

struct aws_event_stream_message_prelude {
Expand Down Expand Up @@ -218,6 +241,21 @@ AWS_EVENT_STREAM_API uint32_t aws_event_stream_message_message_crc(const struct
*/
AWS_EVENT_STREAM_API const uint8_t *aws_event_stream_message_buffer(const struct aws_event_stream_message *message);

AWS_EVENT_STREAM_API uint32_t
aws_event_stream_compute_headers_required_buffer_len(const struct aws_array_list *headers);

AWS_EVENT_STREAM_API size_t
aws_event_stream_write_headers_to_buffer(const struct aws_array_list *headers, uint8_t *buffer);

/** Get the headers from the buffer, store them in the headers list.
* the user's responsibility to cleanup the list when they are finished with it.
* no buffer copies happen here, the lifetime of the buffer, must outlive the usage of the headers.
* returns error codes defined in the public interface.
*/
AWS_EVENT_STREAM_API int aws_event_stream_read_headers_from_buffer(
struct aws_array_list *headers,
const uint8_t *buffer,
size_t headers_len);
/**
* Initialize a streaming decoder for messages with callbacks for usage and an optional user context pointer.
*/
Expand Down Expand Up @@ -261,6 +299,14 @@ AWS_EVENT_STREAM_API int aws_event_stream_add_string_header(
uint16_t value_len,
int8_t copy);

AWS_EVENT_STREAM_API struct aws_event_stream_header_value_pair aws_event_stream_create_string_header(
struct aws_byte_cursor name,
struct aws_byte_cursor value);

AWS_EVENT_STREAM_API struct aws_event_stream_header_value_pair aws_event_stream_create_int32_header(
struct aws_byte_cursor name,
int32_t value);

/**
* Adds a byte header to the list of headers.
*/
Expand Down
82 changes: 82 additions & 0 deletions include/aws/event-stream/event_stream_channel_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef AWS_EVENT_STREAM_CHANNEL_HANDLER_H
#define AWS_EVENT_STREAM_CHANNEL_HANDLER_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/event-stream/event_stream.h>

struct aws_event_stream_channel_handler;
struct aws_channel_handler;

/**
* Invoked when an aws_event_stream_message is encountered. If the message
* parsed successfully, message will be non-null and error_code will be AWS_ERROR_SUCCESS.
* Otherwise message will be null and error_code will represent the error that was encountered.
* Note that any case that error_code was not AWS_OP_SUCCESS, the channel also shuts down.
*/
typedef void(aws_event_stream_channel_handler_on_message_received_fn)(
struct aws_event_stream_message *message,
int error_code,
void *user_data);

/**
* Invoked when an aws_event_stream_message is flushed to the IO interface. When error_code is AWS_ERROR_SUCCESS the
* write happened successfuly. Regardless, message is held from the aws_event_stream_channel_handler_write_message()
* call and should likely be freed in this callback. If error_code is non-zero, the channel will be shutdown immediately
* after this callback returns.
*/
typedef void(aws_event_stream_channel_handler_on_message_written_fn)(
struct aws_event_stream_message *message,
int error_code,
void *user_data);

struct aws_event_stream_channel_handler_options {
/** Callback for when messages are received. Can not be null. */
aws_event_stream_channel_handler_on_message_received_fn *on_message_received;
/** user data passed to message callback. Optional */
void *user_data;
/** initial window size to use for the channel. If automatic window management is set to true, this value is
* ignored. */
size_t initial_window_size;
/**
* if set to false (the default), windowing will be managed automatically for the user.
* Otherwise, after any on_message_received, the user must invoke
* aws_event_stream_channel_handler_increment_read_window()
*/
bool manual_window_management;
};

AWS_EXTERN_C_BEGIN
/**
* Allocates and initializes a new channel handler for processing aws_event_stream_message() events. Handler options
* must not be null.
*/
AWS_EVENT_STREAM_API struct aws_channel_handler *aws_event_stream_channel_handler_new(
struct aws_allocator *allocator,
const struct aws_event_stream_channel_handler_options *handler_options);

/**
* Writes an aws_event_stream_message() to the channel. Once the channel flushes or an error occurs, on_message_written
* will be invoked. message should stay valid until the callback is invoked. If an error an occurs, the channel will
* automatically be shutdown.
*/
AWS_EVENT_STREAM_API int aws_event_stream_channel_handler_write_message(
struct aws_channel_handler *handler,
struct aws_event_stream_message *message,
aws_event_stream_channel_handler_on_message_written_fn *on_message_written,
void *user_data);

/**
* Updates the read window for the channel if automatic_window_managemanet was set to false.
*/
AWS_EVENT_STREAM_API void aws_event_stream_channel_handler_increment_read_window(
struct aws_channel_handler *handler,
size_t window_update_size);

AWS_EVENT_STREAM_API void *aws_event_stream_channel_handler_get_user_data(struct aws_channel_handler *handler);

AWS_EXTERN_C_END

#endif /* AWS_EVENT_STREAM_CHANNEL_HANDLER_H */
62 changes: 62 additions & 0 deletions include/aws/event-stream/event_stream_rpc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef AWS_EVENT_STREAM_RPC_H
#define AWS_EVENT_STREAM_RPC_H
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <aws/event-stream/event_stream.h>

/**
* :message-type header name
*/
extern AWS_EVENT_STREAM_API const struct aws_byte_cursor aws_event_stream_rpc_message_type_name;
/**
* :message-flags header name
*/
extern AWS_EVENT_STREAM_API const struct aws_byte_cursor aws_event_stream_rpc_message_flags_name;
/**
* :stream-id header name
*/
extern AWS_EVENT_STREAM_API const struct aws_byte_cursor aws_event_stream_rpc_stream_id_name;
/**
* operation header name.
*/
extern AWS_EVENT_STREAM_API const struct aws_byte_cursor aws_event_stream_rpc_operation_name;

enum aws_event_stream_rpc_message_type {
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING_RESPONSE,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR,

AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT,
};

enum aws_event_stream_rpc_message_flag {
AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED = 1,
AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM = 2,
};

struct aws_event_stream_rpc_message_args {
/** array of headers for an event-stream message. */
struct aws_event_stream_header_value_pair *headers;
/** number of headers in the headers array.
* headers are copied in aws_event_stream_rpc_*_send_message()
* so you can free the memory immediately after calling it if you need to.*/
size_t headers_count;
/** payload buffer for the message, payload is copied in aws_event_stream_rpc_*_send_message()
* so you can free the memory immediately after calling it if you need to. */
struct aws_byte_buf *payload;
/** message type for the message. This will be added to the headers array
* and the ":message-type" header should not be included in headers */
enum aws_event_stream_rpc_message_type message_type;
/** message flags for the message. This will be added to the headers array
* and the ":message-flags" header should not be included in headers */
uint32_t message_flags;
};

#endif /* AWS_EVENT_STREAM_RPC_SERVER_H */
Loading

0 comments on commit 8a98dec

Please sign in to comment.