Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE YET] GH-3103: Introduce CloudEvents transformers #3246

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ext {
assertjVersion = '3.15.0'
assertkVersion = '0.22'
awaitilityVersion = '4.0.2'
cloudEventsVersion = '1.3.0'
commonsDbcp2Version = '2.7.0'
commonsIoVersion = '2.6'
commonsNetVersion = '3.6'
Expand Down Expand Up @@ -419,6 +420,7 @@ project('spring-integration-core') {
optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion"
optionalApi "org.apache.avro:avro:$avroVersion"
optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
optionalApi "io.cloudevents:cloudevents-api:$cloudEventsVersion"

testImplementation ("org.aspectj:aspectjweaver:$aspectjVersion")
testImplementation ('com.fasterxml.jackson.datatype:jackson-datatype-jsr310')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

/**
* Message headers for basic cloud event attributes.
* These headers might be remapped to respective attributes/headers
* in the target protocol binder.
*
* @author Artem Bilan
*
* @since 5.3
*/
public final class CloudEventHeaders {

private CloudEventHeaders() {
}

/**
* Header prefix as a {@value PREFIX} for cloud event attributes.
*/
public static final String PREFIX = "ce_";

/**
* The header name for cloud event {@code id} attribute.
*/
public static final String ID = PREFIX + "id";

/**
* The header name for cloud event {@code source} attribute.
*/
public static final String SOURCE = PREFIX + "source";

/**
* The header name for cloud event {@code specversion} attribute.
*/
public static final String SPEC_VERSION = PREFIX + "specversion";

/**
* The header name for cloud event {@code type} attribute.
*/
public static final String TYPE = PREFIX + "type";

/**
* The header name for cloud event {@code datacontenttype} attribute.
*/
public static final String DATA_CONTENT_TYPE = PREFIX + "datacontenttype";

/**
* The header name for cloud event {@code dataschema} attribute.
*/
public static final String DATA_SCHEMA = PREFIX + "dataschema";

/**
* The header name for cloud event {@code subject} attribute.
*/
public static final String SUBJECT = PREFIX + "subject";

/**
* The header name for cloud event {@code time} attribute.
*/
public static final String TIME = PREFIX + "time";


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

import io.cloudevents.fun.DataMarshaller;
import io.cloudevents.json.Json;

/**
* A {@link DataMarshaller} implementation for delegating
* to the provided {@link Encoder}s according a {@link MessageHeaders#CONTENT_TYPE}
* header value.
*
* @author Artem Bilan
*
* @since 5.3
*/
public class ContentTypeDelegatingDataMarshaller implements DataMarshaller<byte[], Object, String> {

private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

private final List<Encoder<?>> encoders = new ArrayList<>();

public ContentTypeDelegatingDataMarshaller(Encoder<?>... encoders) {
this.encoders.add(CharSequenceEncoder.allMimeTypes());
setEncoders(encoders);
}

public final void setEncoders(Encoder<?>... encoders) {
Assert.notNull(encoders, "'encoders' must not be null");
Assert.noNullElements(encoders, "'encoders' must not contain null elements");
this.encoders.addAll(Arrays.asList(encoders));
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public byte[] marshal(Object data, Map<String, String> headers) throws RuntimeException {
String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
if (contentType == null) { // Assume JSON by default
return Json.binaryMarshal(data, headers);
}
else {
ResolvableType elementType = ResolvableType.forClass(data.getClass());
MimeType mimeType = MimeType.valueOf(contentType);
Encoder<Object> encoder = encoder(elementType, mimeType);
DataBuffer dataBuffer =
encoder.encodeValue(data, this.dataBufferFactory, elementType,
mimeType, (Map<String, Object>) (Map) headers);

ByteBuffer buf = dataBuffer.asByteBuffer();
byte[] result = new byte[buf.remaining()];
buf.get(result);
return result;
}
}

@SuppressWarnings("unchecked")
private Encoder<Object> encoder(ResolvableType elementType, MimeType mimeType) {
for (Encoder<?> encoder : this.encoders) {
if (encoder.canEncode(elementType, mimeType)) {
return (Encoder<Object>) encoder;
}
}
throw new IllegalArgumentException("No encoder for " + elementType);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

import java.util.AbstractMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

import io.cloudevents.v1.ContextAttributes;

/**
* A Cloud Event header mapper.
*
* @author Artem Bilan
*
* @since 5.3
*/
public final class HeaderMapper {

/**
* Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper}
* @param attributes The map of attributes
* @param extensions The map of extensions
* @return The map of headers
*/
public static Map<String, String> map(Map<String, String> attributes, Map<String, String> extensions) {
Assert.notNull(attributes, "'attributes' must not be null");
Assert.notNull(extensions, "'extensions' must not be null");

Map<String, String> result =
attributes.entrySet()
.stream()
.filter(attribute ->
attribute.getValue() != null
&& !ContextAttributes.datacontenttype.name().equals(attribute.getKey()))
.map(header ->
new AbstractMap.SimpleEntry<>(
CloudEventHeaders.PREFIX + header.getKey().toLowerCase(Locale.US),
header.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

result.putAll(
extensions.entrySet()
.stream()
.filter(extension -> extension.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);

Optional.ofNullable(attributes
.get(ContextAttributes.datacontenttype.name()))
.ifPresent((dataContentType) -> {
result.put(MessageHeaders.CONTENT_TYPE, dataContentType);
});

return result;
}

private HeaderMapper() {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.cloudevents;

import java.util.HashMap;
import java.util.Map;

import org.springframework.messaging.MessageHeaders;

import io.cloudevents.extensions.ExtensionFormat;
import io.cloudevents.format.BinaryMarshaller;
import io.cloudevents.format.StructuredMarshaller;
import io.cloudevents.format.Wire;
import io.cloudevents.format.builder.EventStep;
import io.cloudevents.fun.DataMarshaller;
import io.cloudevents.json.Json;
import io.cloudevents.v1.Accessor;
import io.cloudevents.v1.AttributesImpl;

/**
* A Cloud Events general purpose marshallers factory.
*
* @author Artem Bilan
*
* @since 5.3
*/
public final class Marshallers {

private static final Map<String, String> NO_HEADERS = new HashMap<>();

/**
* Builds a Binary Content Mode marshaller to marshal cloud events as JSON for
* any Transport Binding.
* @param <T> The data type
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
* @see BinaryMarshaller
*/
public static <T> EventStep<AttributesImpl, T, byte[], String> binary() {
return binary(Json::binaryMarshal);
}

/**
* Builds a Binary Content Mode marshaller to marshal cloud events as a {@code byte[]} for
* any Transport Binding.
* The data marshalling is based on the provided {@link DataMarshaller}.
* @param marshaller the {@link DataMarshaller} for cloud event payload.
* @param <T> The data type
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
* @see BinaryMarshaller
*/
public static <T> EventStep<AttributesImpl, T, byte[], String> binary(
DataMarshaller<byte[], T, String> marshaller) {

return BinaryMarshaller.<AttributesImpl, T, byte[], String>builder()
.map(AttributesImpl::marshal)
.map(Accessor::extensionsOf)
.map(ExtensionFormat::marshal)
.map(HeaderMapper::map)
.map(marshaller)
.builder(Wire::new);
}

/**
* Builds a Structured Content Mode marshaller to marshal cloud event as JSON for
* any Transport Binding.
* @param <T> The data type
* @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON
* @see StructuredMarshaller
*/
public static <T> EventStep<AttributesImpl, T, byte[], String> structured() {
return StructuredMarshaller.<AttributesImpl, T, byte[], String>
builder()
.mime(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json")
.map((event) -> Json.binaryMarshal(event, NO_HEADERS))
.skip();
}

private Marshallers() {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides classes to support for Cloud Events.
*/
package org.springframework.integration.support.cloudevents;
Loading