diff --git a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java index d41129542fd..1339d9ece0a 100755 --- a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java +++ b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java @@ -91,13 +91,13 @@ public abstract class AbstractRoute extends AllDirectives { protected final ActorRef proxyActor; protected final ActorSystem actorSystem; - protected final Set mediaTypeJsonWithFallbacks; private final HttpConfig httpConfig; private final CommandConfig commandConfig; private final HeaderTranslator headerTranslator; private final HttpRequestActorPropsFactory httpRequestActorPropsFactory; private final Attributes supervisionStrategy; + private final Set mediaTypeJsonWithFallbacks; /** * Constructs the abstract route builder. diff --git a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java index e496c0ede22..b97ac11e04d 100755 --- a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java +++ b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java @@ -20,6 +20,7 @@ import javax.annotation.Nullable; +import akka.http.javadsl.model.ContentTypes; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.model.base.acks.AcknowledgementRequest; import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel; @@ -36,6 +37,7 @@ import org.eclipse.ditto.protocoladapter.ProtocolFactory; import org.eclipse.ditto.services.gateway.endpoints.actors.AbstractHttpRequestActor; import org.eclipse.ditto.services.gateway.endpoints.routes.AbstractRoute; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig; import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory; @@ -46,6 +48,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Status; +import akka.http.javadsl.model.ContentType; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.RequestContext; @@ -73,6 +76,8 @@ public final class CloudEventsRoute extends AbstractRoute { private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private static final String DATA_SCHEMA_SCHEME = "ditto"; + private final CloudEventsConfig cloudEventsConfig; + /** * Constructs the cloud events route builder. * @@ -81,6 +86,7 @@ public final class CloudEventsRoute extends AbstractRoute { * @param httpConfig the configuration settings of the Gateway service's HTTP endpoint. * @param commandConfig the configuration settings for incoming commands (via HTTP requests) in the gateway. * @param headerTranslator translates headers from external sources or to external sources. + * @param cloudEventsConfig the configuration settings for cloud events. * @throws NullPointerException if any argument is {@code null}. */ public CloudEventsRoute( @@ -88,9 +94,11 @@ public CloudEventsRoute( final ActorSystem actorSystem, final HttpConfig httpConfig, final CommandConfig commandConfig, - final HeaderTranslator headerTranslator + final HeaderTranslator headerTranslator, + final CloudEventsConfig cloudEventsConfig ) { super(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator); + this.cloudEventsConfig = cloudEventsConfig; } @@ -251,7 +259,7 @@ private Optional> jsonToDittoSignal(@Nullable final CloudEventData dat final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject); final Signal signal = PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable); final Signal signalWithAdjustedHeaders = signal.setDittoHeaders( - signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build()); + signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build()); return Optional.of(signalWithAdjustedHeaders); } @@ -259,7 +267,7 @@ private void ensureDataContentType(@Nullable final String dataContentType, final RequestContext ctx, final DittoHeaders dittoHeaders) { - if (dataContentType == null || !mediaTypeJsonWithFallbacks.contains(dataContentType)) { + if (!isCorrectDataType(dataContentType)) { if (LOGGER.isInfoEnabled()) { LOGGER.withCorrelationId(dittoHeaders) .info("CloudEvent request rejected: unsupported data-content-type: <{}> request: <{}>", @@ -267,12 +275,35 @@ private void ensureDataContentType(@Nullable final String dataContentType, } throw UnsupportedMediaTypeException .withDetailedInformationBuilder(dataContentType != null ? dataContentType : "none", - mediaTypeJsonWithFallbacks) + cloudEventsConfig.getDataTypes()) .dittoHeaders(dittoHeaders) .build(); } } + /** + * Test if the data type is acceptable. + *

+ * A missing, empty or malformed data type is not acceptable. + * + * @param dataContentType The content type to check. + * @return {@code true} if the content type is acceptable, {@code false} otherwise. + */ + private boolean isCorrectDataType(@Nullable final String dataContentType) { + if (dataContentType == null) { + // no content type + return false; + } + + final ContentType type = ContentTypes.parse(dataContentType); + if (type == null) { + // failed to parse content type + return false; + } + + return this.cloudEventsConfig.getDataTypes().contains(type.mediaType().toString()); + } + /** * Ensure that the data schema starts with {@code ditto:}. * @@ -284,6 +315,11 @@ private void ensureDataSchema(@Nullable final URI dataSchema, final RequestContext ctx, final DittoHeaders dittoHeaders) { + if (dataSchema == null && cloudEventsConfig.isEmptySchemaAllowed()) { + // early return, no schema, but no requirement to have one + return; + } + if (dataSchema == null || !dataSchema.getScheme().equals(DATA_SCHEMA_SCHEME)) { if (LOGGER.isInfoEnabled()) { LOGGER.withCorrelationId(dittoHeaders) diff --git a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java index 66408aa1349..2c07b8e4a5b 100755 --- a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java +++ b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java @@ -38,8 +38,10 @@ import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthorizationSubjectsProviderFactory; import org.eclipse.ditto.services.gateway.security.utils.DefaultHttpClientFacade; import org.eclipse.ditto.services.gateway.security.utils.HttpClientFacade; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig; @@ -98,6 +100,7 @@ public abstract class EndpointTestBase extends JUnitRouteTest { protected static StreamingConfig streamingConfig; protected static PublicHealthConfig publicHealthConfig; protected static ProtocolConfig protocolConfig; + protected static CloudEventsConfig cloudEventsConfig; protected static JwtAuthenticationFactory jwtAuthenticationFactory; protected static HttpClientFacade httpClientFacade; protected static JwtAuthorizationSubjectsProviderFactory authorizationSubjectsProviderFactory; @@ -117,6 +120,7 @@ public static void initTestFixture() { streamingConfig = DefaultStreamingConfig.of(gatewayScopedConfig); publicHealthConfig = DefaultPublicHealthConfig.of(gatewayScopedConfig); protocolConfig = DefaultProtocolConfig.of(dittoScopedConfig); + cloudEventsConfig = DefaultCloudEventsConfig.of(gatewayScopedConfig); httpClientFacade = DefaultHttpClientFacade.getInstance(ActorSystem.create(EndpointTestBase.class.getSimpleName()), DefaultHttpProxyConfig.ofProxy(DefaultScopedConfig.empty("/"))); authorizationSubjectsProviderFactory = DittoJwtAuthorizationSubjectsProvider::of; diff --git a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java index bb2db357fd3..05f4aa64f75 100755 --- a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java +++ b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java @@ -168,7 +168,7 @@ public void setUp() { .thingSearchRoute( new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) .whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) - .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) + .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator, cloudEventsConfig)) .websocketRoute(WebSocketRoute.getInstance(proxyActor, streamingConfig, materializer)) .supportedSchemaVersions(httpConfig.getSupportedSchemaVersions()) .protocolAdapterProvider(protocolAdapterProvider) diff --git a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsTest.java b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsTest.java new file mode 100644 index 00000000000..c7ac08e0b5d --- /dev/null +++ b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsTest.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ + +package org.eclipse.ditto.services.gateway.endpoints.routes.cloudevents; + +import org.eclipse.ditto.services.gateway.endpoints.EndpointTestBase; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; +import org.junit.Test; + +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public final class CloudEventsTest extends EndpointTestBase { + + @Test + public void testDefaults() { + final Object defaultValue = CloudEventsConfig.CloudEventsConfigValue.DATA_TYPES.getDefaultValue(); + assertThat(defaultValue).isEqualTo(Set.of("application/json", "application/vnd.eclipse.ditto+json")); + } + +} diff --git a/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java b/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java index 96f944fcdaf..14de6444a4d 100755 --- a/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java +++ b/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java @@ -283,7 +283,8 @@ private static Route createRoute(final ActorSystem actorSystem, .thingSearchRoute( new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) .whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) - .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) + .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, + headerTranslator, gatewayConfig.getCloudEventsConfig())) .websocketRoute(WebSocketRoute.getInstance(streamingActor, streamingConfig, materializer) .withSignalEnrichmentProvider(signalEnrichmentProvider) .withHeaderTranslator(headerTranslator)) diff --git a/services/gateway/starter/src/main/resources/gateway.conf b/services/gateway/starter/src/main/resources/gateway.conf index 021d164497c..cdb85032a9f 100755 --- a/services/gateway/starter/src/main/resources/gateway.conf +++ b/services/gateway/starter/src/main/resources/gateway.conf @@ -255,6 +255,14 @@ ditto { cache-timeout = ${?GATEWAY_STATUS_HEALTH_EXTERNAL_TIMEOUT} } + cloud-events { + empty-schema-allowed = true + data-types = [ + "application/json" + "application/vnd.eclipse.ditto+json" + ] + } + cache { publickeys { maxentries = 32 diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java index 2eb3ca60733..422ba82a5b2 100644 --- a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java @@ -16,8 +16,10 @@ import org.eclipse.ditto.services.base.config.DittoServiceConfig; import org.eclipse.ditto.services.base.config.limits.LimitsConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig; @@ -59,6 +61,7 @@ public final class DittoGatewayConfig implements GatewayConfig, WithConfigPath { private final AuthenticationConfig authenticationConfig; private final StreamingConfig streamingConfig; private final PublicHealthConfig publicHealthConfig; + private final DefaultCloudEventsConfig cloudEventsConfig; private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) { @@ -73,6 +76,7 @@ private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) { authenticationConfig = DefaultAuthenticationConfig.of(dittoServiceConfig); streamingConfig = DefaultStreamingConfig.of(dittoServiceConfig); publicHealthConfig = DefaultPublicHealthConfig.of(dittoServiceConfig); + cloudEventsConfig = DefaultCloudEventsConfig.of(dittoServiceConfig); } /** @@ -152,6 +156,11 @@ public ProtocolConfig getProtocolConfig() { return protocolConfig; } + @Override + public CloudEventsConfig getCloudEventsConfig() { + return cloudEventsConfig; + } + /** * @return always {@value #CONFIG_PATH}. */ diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java index 95075f72b18..71cc3ca594f 100644 --- a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.services.gateway.util.config; import org.eclipse.ditto.services.base.config.ServiceSpecificConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.MessageConfig; @@ -88,4 +89,10 @@ public interface GatewayConfig extends ServiceSpecificConfig, WithProtocolConfig */ PublicHealthConfig getPublicHealthConfig(); + /** + * Returns the configuration for the cloud events endpoint. + * + * @return the config. + */ + CloudEventsConfig getCloudEventsConfig(); } diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java new file mode 100644 index 00000000000..1f5522cd922 --- /dev/null +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.gateway.util.config.endpoints; + +import java.util.Set; + +import javax.annotation.concurrent.Immutable; + +import akka.http.javadsl.model.MediaTypes; +import org.eclipse.ditto.model.base.common.DittoConstants; +import org.eclipse.ditto.services.utils.config.KnownConfigValue; + +import static org.eclipse.ditto.model.base.common.DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE; + +/** + * Provides configuration settings for the cloud events endpoint of the Ditto Gateway service. + */ +@Immutable +public interface CloudEventsConfig { + + /** + * Returns if an empty data schema is allowed. + * + * @return {@code true} if an empty data schema is allowed {@code false} otherwise. + */ + boolean isEmptySchemaAllowed(); + + /** + * Returns the allowed data types. + * + * @return The set of allowed data types. + */ + Set getDataTypes(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code CloudEventsConfig}. + */ + enum CloudEventsConfigValue implements KnownConfigValue { + + /** + * Flag if an empty data schema is allowed. + */ + EMPTY_SCHEMA_ALLOWED("empty-schema-allowed", true), + + /** + * Set of allowed data types + */ + DATA_TYPES("data-types", Set.of(MediaTypes.APPLICATION_JSON.toString(), DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE)); + + private final String path; + private final Object defaultValue; + + private CloudEventsConfigValue(final String thePath, final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + + } +} diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java new file mode 100644 index 00000000000..d3c1a94d23f --- /dev/null +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.gateway.util.config.endpoints; + +import java.util.Objects; +import java.util.Set; + +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.services.utils.config.ConfigWithFallback; +import org.eclipse.ditto.services.utils.config.ScopedConfig; + +import com.typesafe.config.Config; + +/** + * This class is the default implementation of the cloud events endpoint config. + */ +@Immutable +public final class DefaultCloudEventsConfig implements CloudEventsConfig { + + private static final String CONFIG_PATH = "cloud-events"; + + private final boolean emptySchemaAllowed; + + private final Set dataTypes; + + private DefaultCloudEventsConfig(final ScopedConfig scopedConfig) { + emptySchemaAllowed = scopedConfig.getBoolean(CloudEventsConfig.CloudEventsConfigValue.EMPTY_SCHEMA_ALLOWED.getConfigPath()); + dataTypes = Set.copyOf(scopedConfig.getStringList(CloudEventsConfigValue.DATA_TYPES.getConfigPath())); + } + + /** + * Returns an instance of {@code DefaultCloudEventsConfig} based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the public health config at {@value #CONFIG_PATH}. + * @return the instance. + * @throws org.eclipse.ditto.services.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultCloudEventsConfig of(final Config config) { + return new DefaultCloudEventsConfig( + ConfigWithFallback.newInstance(config, CONFIG_PATH, CloudEventsConfig.CloudEventsConfigValue.values())); + } + + @Override + public boolean isEmptySchemaAllowed() { + return emptySchemaAllowed; + } + + @Override + public Set getDataTypes() { + return dataTypes; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DefaultCloudEventsConfig that = (DefaultCloudEventsConfig) o; + return emptySchemaAllowed == that.emptySchemaAllowed && dataTypes.equals(that.dataTypes); + } + + @Override + public int hashCode() { + return Objects.hash(emptySchemaAllowed, dataTypes); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + + "emptySchemaAllowed=" + emptySchemaAllowed + + ", dataTypes=" + dataTypes + + "]"; + } + +}