DynamoDB eventstore, work in progress #118

Draft: wants to merge 1 commit into base: master
127 changes: 127 additions & 0 deletions eventstore/dynamodb/native/pom.xml
@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2020 Johan Haleby
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>eventstore-dynamodb</artifactId>
        <groupId>org.occurrent</groupId>
        <version>0.14.4-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>eventstore-dynamodb-native</artifactId>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>9</source>
                    <target>9</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.occurrent</groupId>
            <artifactId>eventstore-api-blocking</artifactId>
            <version>0.14.4-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb</artifactId>
            <version>2.17.227</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>dynamodb-enhanced</artifactId>
            <version>2.17.227</version>
        </dependency>

        <!-- TEMP -->
        <dependency>
            <groupId>org.occurrent</groupId>
            <artifactId>eventstore-mongodb-common</artifactId>
            <version>0.14.4-SNAPSHOT</version>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.occurrent</groupId>
            <artifactId>test-support</artifactId>
            <version>0.14.4-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.vavr</groupId>
            <artifactId>vavr</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.github.artsok</groupId>
            <artifactId>rerunner-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
262 changes: 262 additions & 0 deletions DynamoDBEventStore.java
@@ -0,0 +1,262 @@
package org.occurrent.eventstore.dynamodb.nativedriver;

import io.cloudevents.CloudEvent;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.Document;
import org.occurrent.eventstore.api.LongConditionEvaluator;
import org.occurrent.eventstore.api.WriteCondition;
import org.occurrent.eventstore.api.WriteCondition.StreamVersionWriteCondition;
import org.occurrent.eventstore.api.WriteConditionNotFulfilledException;
import org.occurrent.eventstore.api.WriteResult;
import org.occurrent.eventstore.api.blocking.EventStore;
import org.occurrent.eventstore.api.blocking.EventStream;
import org.occurrent.eventstore.mongodb.internal.OccurrentCloudEventMongoDocumentMapper;
import org.occurrent.functionalsupport.internal.FunctionalSupport.Pair;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

public class DynamoDBEventStore implements EventStore {
    private static final String ATTRIBUTE_STREAM = "stream";
    private static final String ATTRIBUTE_VERSION = "version";
    private static final String ATTRIBUTE_EVENTS = "events";

    private final DynamoDbClient dynamoDB;
    private final String tableName;
    private final TimeRepresentation timeRepresentation;

    public DynamoDBEventStore(DynamoDbClient dynamoDB, String tableName, TimeRepresentation timeRepresentation) {
        this.dynamoDB = dynamoDB;
        this.tableName = tableName;
        this.timeRepresentation = timeRepresentation;
        TableDescription table = dynamoDB.describeTable(
                DescribeTableRequest.builder().tableName(tableName).build()).table();
        if (table.keySchema().stream().noneMatch(k -> ATTRIBUTE_STREAM.equals(k.attributeName()))) {
            throw new IllegalStateException("Incorrect table");
        }
        // TODO: add more validation
    }

    public static DynamoDBEventStore create(DynamoDbClient dynamoDB, String tableName, TimeRepresentation timeRepresentation, long readCapacity, long writeCapacity) {
        try {
            dynamoDB.createTable(CreateTableRequest.builder()
                    .tableName(tableName)
                    .keySchema(
                            KeySchemaElement.builder().attributeName(ATTRIBUTE_STREAM).keyType(KeyType.HASH).build(),
                            KeySchemaElement.builder().attributeName(ATTRIBUTE_VERSION).keyType(KeyType.RANGE).build())
                    .attributeDefinitions(
                            AttributeDefinition.builder().attributeName(ATTRIBUTE_STREAM)
                                    .attributeType(ScalarAttributeType.S).build(),
                            AttributeDefinition.builder().attributeName(ATTRIBUTE_VERSION)
                                    .attributeType(ScalarAttributeType.N).build())
                    .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(readCapacity)
                            .writeCapacityUnits(writeCapacity).build())
                    .build());
        } catch (ResourceInUseException e) {
            // ignore existing table
        }
        return new DynamoDBEventStore(dynamoDB, tableName, timeRepresentation);
    }
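    // Usage sketch (illustrative only; the endpoint, table name and capacities below are
    // placeholder values, and DynamoDB Local is just one way to run this):
    //   DynamoDbClient client = DynamoDbClient.builder()
    //           .endpointOverride(URI.create("http://localhost:8000"))
    //           .build();
    //   DynamoDBEventStore store =
    //           DynamoDBEventStore.create(client, "events", TimeRepresentation.RFC_3339, 10, 10);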

    public void deleteTable() {
        try {
            dynamoDB.deleteTable(DeleteTableRequest.builder().tableName(tableName).build());
        } catch (ResourceNotFoundException e) {
            // ignore
        }
    }

    @Override
    public WriteResult write(String streamId, Stream<CloudEvent> events) {
        return write(streamId, WriteCondition.anyStreamVersion(), events);
    }

    @Override
    public WriteResult write(String streamId, WriteCondition writeCondition, Stream<CloudEvent> events) {
        QueryResponse res = dynamoDB.query(QueryRequest.builder()
                .tableName(tableName)
                .attributesToGet(ATTRIBUTE_VERSION)
                .scanIndexForward(false)
                .limit(1)
                .keyConditions(Map.of(ATTRIBUTE_STREAM, Condition.builder()
                        .attributeValueList(AttributeValue.builder().s(streamId).build())
                        .comparisonOperator(ComparisonOperator.EQ)
                        .build()))
                .build());
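        // res holds at most one item: the highest existing version for this stream
        // (descending scan, limit 1).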
        AtomicLong version = res.items().isEmpty()
                ? new AtomicLong(0)
                : new AtomicLong(Long.parseLong(res.items().get(0).get(ATTRIBUTE_VERSION).n()));
        long oldStreamVersion = version.get();

        if (writeCondition instanceof StreamVersionWriteCondition) {
            org.occurrent.condition.Condition<Long> condition = ((StreamVersionWriteCondition) writeCondition).condition;
            if (condition != null && !LongConditionEvaluator.evaluate(condition, oldStreamVersion)) {
                throw new WriteConditionNotFulfilledException(streamId, oldStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, oldStreamVersion));
            }
        }
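        // Serialize each event to JSON via the MongoDB document mapper (temporary reuse of
        // eventstore-mongodb-common, see the TEMP dependency in the pom) and pair it with
        // the next stream version.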
        List<Pair<String, Long>> eventData = events
                .map(e -> {
                    long v = version.incrementAndGet();
                    return new Pair<>(OccurrentCloudEventMongoDocumentMapper.convertToDocument(timeRepresentation, streamId, v, e).toJson(), v);
                })
                .collect(Collectors.toList());

        // TODO: optimize
        if (eventData.isEmpty()) {
            return new WriteResult(streamId, oldStreamVersion, oldStreamVersion);
        }

        try {
            dynamoDB.transactWriteItems(TransactWriteItemsRequest.builder()
                    .transactItems(
                            eventData.stream().map(pair -> TransactWriteItem.builder().put(Put.builder()
                                    .tableName(tableName)
                                    .item(Map.of(
                                            ATTRIBUTE_STREAM, AttributeValue.builder().s(streamId).build(),
                                            ATTRIBUTE_VERSION, AttributeValue.builder().n(Long.toString(pair.t2)).build(),
                                            ATTRIBUTE_EVENTS, AttributeValue.builder().s(pair.t1).build()))
                                    .conditionExpression("attribute_not_exists(" + ATTRIBUTE_VERSION + ")")
                                    .build()).build()).collect(Collectors.toList()))
                    .build());
        } catch (TransactionCanceledException e) {
            // TODO: if WriteCondition.anyStreamVersion maybe we could automatically retry?
            if (e.cancellationReasons().stream().anyMatch(r -> r.code().equals("ConditionalCheckFailed"))) {
                throw new ConcurrentModificationException();
                // throw new WriteConditionNotFulfilledException(streamId, oldStreamVersion, writeCondition, String.format("%s was not fulfilled. Expected version %s but was %s.", WriteCondition.class.getSimpleName(), writeCondition, oldStreamVersion));
            } else {
                // transaction failed for some other reason
                throw e;
            }
        }

        return new WriteResult(streamId, oldStreamVersion, version.longValue());
    }

Review discussion (attached to the transactWriteItems call above):

**Reviewer:** Beware TWI (TransactWriteItems) - it costs double what a standard operation costs (obviously the Equinox.DynamoStore schema involves much more logic as a result of using UpdateItem). See jet/equinox#327 for my learnings from going down this road.

**PR author:** Yes, I'm aware of this, and the write strategy will most likely be a configuration option. Ideally I would like adding events to an event stream to be an atomic operation, and initially I had one row per transaction (i.e. several events), but I had to change this to conform to what seems to be the rule in Occurrent that every single event should increment the version by one.

@johanhaleby Related to this, I was confused by this: `EventStream read(String streamId, int skip, int limit);`

Should `skip` be the number of events to skip, or should it be the version number? If it's the version number, shouldn't this be a `long`? Does the version number have to equal the number of events?

When doing event sourcing I usually consider all the events generated by a command to be an atomic update of the event stream, i.e. having versions between the start and the end of the transaction does not necessarily make sense.

**Reviewer:**

> I had one row per transaction (i.e. several events), but I had to change this to conform to what seems to be the rule in Occurrent that every single event should increment the version by one.

Yeah, the problem/tradeoff is that the minute you try to fulfil the transactional correctness requirement, you run into a set of problems:

  • TransactWriteItems doubles the charges for everything, which is a massive loss
  • You need something useful to make the write contingent on (i.e. the expected version etc. - if you instead are checking that the previous item is present and the one you are writing is not, even the basic coding gets complex)

In Equinox, the schema resolves these forces by having the notion of a Tip per stream, which gates all writes going through:

  • one could keep an event counter in it, but I have an etag string (this allows one to roll out transactionally correct updates without having to write a new event every time). Where you are writing event 0, the condition is that the Tip does not exist
  • if you are persisting multiple events in one write, they can all get appended in a single Put/Update call

In addition to working for larger cases, it has the following key properties for normal use:

  • small streams are a single item that can be loaded via a single GetItem roundtrip
  • TransactWriteItems only becomes required when the tip overflows
  • minimum storage overhead

One thing it does complicate is that the DDB Streams output will emit a full copy of the Tip for every update, e.g. if you are adding 2 events to a Tip that already holds one event, the DDB Streams output will be an event carrying the full item (which hosts 3 events, but only 2 are new).

The other thing to bear in mind is that having >1 event per item means you need a good story for when you are writing 201 KB of events on top of 200 KB of existing events.

I would caution against having a mode switch in your implementation; testing, reasoning and talking about the code becomes a nightmare. Better to have a single implementation that can deal with your use cases efficiently, and test, tune and validate that. (The other reason I say that is that I fundamentally believe that an event-per-item schema is just worse than useless in terms of cost and efficiency.)

> Should `skip` be the number of events to skip, or should it be the version number? If it's the version number, shouldn't this be a `long`?

I use longs for event indexes; ESDB etc. do too. In practice the CUs and latency it costs to read more than 2M events make it irrelevant (and there are fixed limits to how much can be held in a logical partition (10 GB, is it?), so any design predicated on unlimited stream lengths is not even theoretically implementable).
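To make the Tip idea concrete, here is a minimal sketch of the kind of single-item, etag-guarded append the reviewer describes. It is not part of this PR; the tip's key layout (a fixed sort key of -1), the `etag`/`events` attribute names and the helper signature are all illustrative assumptions:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

public final class TipAppendSketch {

    // Appends already-serialized events to a single "tip" item for the stream,
    // guarded by an etag, in one UpdateItem call (no TransactWriteItems surcharge).
    // Throws ConditionalCheckFailedException if another writer got in first.
    public static String appendToTip(DynamoDbClient dynamoDB, String tableName, String streamId,
                                     String expectedEtag, List<AttributeValue> newEvents) {
        String newEtag = UUID.randomUUID().toString();
        dynamoDB.updateItem(UpdateItemRequest.builder()
                .tableName(tableName)
                .key(Map.of(
                        "stream", AttributeValue.builder().s(streamId).build(),
                        "version", AttributeValue.builder().n("-1").build())) // fixed sort key marking the tip
                .updateExpression("SET #etag = :new, #events = list_append(if_not_exists(#events, :empty), :evs)")
                // First write to the stream: the tip must not exist yet.
                // Subsequent writes: the etag must still match the one we read.
                .conditionExpression("attribute_not_exists(#etag) OR #etag = :expected")
                .expressionAttributeNames(Map.of("#etag", "etag", "#events", "events"))
                .expressionAttributeValues(Map.of(
                        ":new", AttributeValue.builder().s(newEtag).build(),
                        ":expected", AttributeValue.builder().s(expectedEtag == null ? "" : expectedEtag).build(),
                        ":empty", AttributeValue.builder().l(List.of()).build(),
                        ":evs", AttributeValue.builder().l(newEvents).build()))
                .build());
        return newEtag;
    }
}
```

On a concurrent write the conditional check fails with a `ConditionalCheckFailedException`, and the caller re-reads the tip and retries; only when the tip approaches DynamoDB's 400 KB item limit would events need to spill into separate items, which is where `TransactWriteItems` comes back into play.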

    private static class EventStreamImpl<T> implements EventStream<T> {
        private final String id;
        private final long version;
        private final Stream<T> events;

        EventStreamImpl(String id, long version, Stream<T> events) {
            this.id = id;
            this.version = version;
            this.events = events;
        }

        @Override
        public String id() {
            return id;
        }

        @Override
        public long version() {
            return version;
        }

        @Override
        public Stream<T> events() {
            return events;
        }
    }

    @Override
    public EventStream<CloudEvent> read(String streamId, int skip, int limit) {
        QueryResponse res = dynamoDB.query(QueryRequest.builder()
                .tableName(tableName)
                .attributesToGet(ATTRIBUTE_VERSION, ATTRIBUTE_EVENTS)
                .keyConditions(Map.of(
                        ATTRIBUTE_STREAM, Condition.builder()
                                .attributeValueList(AttributeValue.builder().s(streamId).build())
                                .comparisonOperator(ComparisonOperator.EQ)
                                .build(),
                        ATTRIBUTE_VERSION, Condition.builder()
                                .attributeValueList(AttributeValue.builder().n(Long.toString(skip)).build())
                                .comparisonOperator(ComparisonOperator.GT)
                                .build()))
                .build());
        if (res.items().isEmpty()) {
            return new EventStreamImpl<>(streamId, 0, Collections.<CloudEvent>emptyList().stream());
        }
        long latestVersion = res.items().stream().map(row -> Long.parseLong(row.get(ATTRIBUTE_VERSION).n())).max(Comparator.naturalOrder()).get();
        Stream<CloudEvent> events = res.items().stream()
                .limit(limit)
                .map(row -> row.get(ATTRIBUTE_EVENTS).s())
                .map(Document::parse)
                .map(bson -> OccurrentCloudEventMongoDocumentMapper.convertToCloudEvent(timeRepresentation, bson));
        return new EventStreamImpl<>(streamId, latestVersion, events);
    }
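    // NOTE (WIP): the Query in read() fetches a single page (capped at 1 MB by DynamoDB),
    // so long streams would need LastEvaluatedKey pagination. Also note that `skip` is
    // interpreted as a stream version here (items with version > skip are returned), not
    // as a number of events to skip - see the review discussion above.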

    // @Override
    public void deleteEventStream(String streamId) {
        while (true) {
            QueryResponse queryResponse = dynamoDB.query(QueryRequest.builder()
                    .tableName(tableName)
                    .attributesToGet(ATTRIBUTE_VERSION)
                    .limit(25) // BatchWriteItem accepts at most 25 write requests per call
                    .keyConditions(Map.of(
                            ATTRIBUTE_STREAM, Condition.builder()
                                    .attributeValueList(AttributeValue.builder().s(streamId).build())
                                    .comparisonOperator(ComparisonOperator.EQ)
                                    .build()))
                    .build());
            if (!queryResponse.hasItems() || queryResponse.items().isEmpty()) break;
            BatchWriteItemResponse deleteResponse = dynamoDB.batchWriteItem(
                    BatchWriteItemRequest.builder()
                            .requestItems(Map.of(tableName,
                                    queryResponse.items().stream().map(row -> WriteRequest.builder().deleteRequest(
                                            DeleteRequest.builder()
                                                    .key(Map.of(
                                                            ATTRIBUTE_STREAM, AttributeValue.builder().s(streamId).build(),
                                                            ATTRIBUTE_VERSION, row.get(ATTRIBUTE_VERSION)))
                                                    .build()).build()).collect(Collectors.toList())))
                            .build());
            if (deleteResponse.unprocessedItems().isEmpty() && !queryResponse.hasLastEvaluatedKey()) break;
            // TODO: add exponential backoff
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public boolean exists(String streamId) {
        return !read(streamId, 0, 1).isEmpty();
    }

}
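For reference, a complete write-then-read roundtrip against this store could look roughly like this. This is a sketch, assuming the CloudEvents Java SDK v2 builder and default AWS credentials/region; the class name, table name, stream id and payload are placeholders:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;
import java.util.stream.Stream;
import org.occurrent.eventstore.api.blocking.EventStream;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

public class RoundtripSketch { // assumes it lives alongside DynamoDBEventStore
    public static void main(String[] args) {
        // Table name and capacities are placeholders; create(...) also creates the table if absent.
        DynamoDBEventStore store = DynamoDBEventStore.create(
                DynamoDbClient.create(), "events", TimeRepresentation.RFC_3339, 10, 10);

        CloudEvent event = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("urn:occurrent:sample"))
                .withType("GameStarted")
                .withTime(OffsetDateTime.now(ZoneOffset.UTC))
                .withDataContentType("application/json")
                .withData("{\"player\":\"alice\"}".getBytes(StandardCharsets.UTF_8))
                .build();

        // write(streamId, events) applies WriteCondition.anyStreamVersion() (see above).
        store.write("game-1", Stream.of(event));

        // 'skip' is a version here, so 0 returns everything; limit caps the result.
        EventStream<CloudEvent> stream = store.read("game-1", 0, Integer.MAX_VALUE);
        stream.events().forEach(e -> System.out.println(e.getId()));
    }
}
```

Note that `write(streamId, events)` delegates to the `WriteCondition.anyStreamVersion()` overload above, so this append succeeds regardless of the stream's current version.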