Skip to content

Commit

Permalink
[eclipse-ditto#903]: Make schema optional, allow alternate content type
Browse files Browse the repository at this point in the history
This change allows to make the schema optional and also allows to
configure the data content type.

Signed-off-by: Jens Reimann <[email protected]>
  • Loading branch information
ctron committed Dec 9, 2020
1 parent 606af03 commit fe7d03d
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ public abstract class AbstractRoute extends AllDirectives {

protected final ActorRef proxyActor;
protected final ActorSystem actorSystem;
protected final Set<String> mediaTypeJsonWithFallbacks;

private final HttpConfig httpConfig;
private final CommandConfig commandConfig;
private final HeaderTranslator headerTranslator;
private final HttpRequestActorPropsFactory httpRequestActorPropsFactory;
private final Attributes supervisionStrategy;
private final Set<String> mediaTypeJsonWithFallbacks;

/**
* Constructs the abstract route builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.services.gateway.endpoints.actors.AbstractHttpRequestActor;
import org.eclipse.ditto.services.gateway.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
Expand All @@ -46,6 +47,8 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Status;
import akka.http.javadsl.model.ContentType;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.RequestContext;
Expand Down Expand Up @@ -73,6 +76,8 @@ public final class CloudEventsRoute extends AbstractRoute {
private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final String DATA_SCHEMA_SCHEME = "ditto";

private final CloudEventsConfig cloudEventsConfig;

/**
* Constructs the cloud events route builder.
*
Expand All @@ -81,16 +86,19 @@ public final class CloudEventsRoute extends AbstractRoute {
* @param httpConfig the configuration settings of the Gateway service's HTTP endpoint.
* @param commandConfig the configuration settings for incoming commands (via HTTP requests) in the gateway.
* @param headerTranslator translates headers from external sources or to external sources.
* @param cloudEventsConfig the configuration settings for cloud events.
* @throws NullPointerException if any argument is {@code null}.
*/
public CloudEventsRoute(
final ActorRef proxyActor,
final ActorSystem actorSystem,
final HttpConfig httpConfig,
final CommandConfig commandConfig,
final HeaderTranslator headerTranslator
final HeaderTranslator headerTranslator,
final CloudEventsConfig cloudEventsConfig
) {
super(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator);
this.cloudEventsConfig = cloudEventsConfig;
}


Expand Down Expand Up @@ -251,28 +259,51 @@ private Optional<Signal<?>> jsonToDittoSignal(@Nullable final CloudEventData dat
final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject);
final Signal<?> signal = PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable);
final Signal<?> signalWithAdjustedHeaders = signal.setDittoHeaders(
signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build());
signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build());
return Optional.of(signalWithAdjustedHeaders);
}

private void ensureDataContentType(@Nullable final String dataContentType,
final RequestContext ctx,
final DittoHeaders dittoHeaders) {

if (dataContentType == null || !mediaTypeJsonWithFallbacks.contains(dataContentType)) {
if (!isCorrectDataType(dataContentType)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.withCorrelationId(dittoHeaders)
.info("CloudEvent request rejected: unsupported data-content-type: <{}> request: <{}>",
dataContentType, requestToLogString(ctx.getRequest()));
}
throw UnsupportedMediaTypeException
.withDetailedInformationBuilder(dataContentType != null ? dataContentType : "none",
mediaTypeJsonWithFallbacks)
cloudEventsConfig.getDataTypes())
.dittoHeaders(dittoHeaders)
.build();
}
}

/**
* Test if the data type is acceptable.
* <p>
* A missing, empty or malformed data type is not acceptable.
*
* @param dataContentType The content type to check.
* @return {@code true} if the content type is acceptable, {@code false} otherwise.
*/
private boolean isCorrectDataType(@Nullable final String dataContentType) {
if (dataContentType == null) {
// no content type
return false;
}

final ContentType type = ContentTypes.parse(dataContentType);
if (type == null) {
// failed to parse content type
return false;
}

return this.cloudEventsConfig.getDataTypes().contains(type.mediaType().toString());
}

/**
* Ensure that the data schema starts with {@code ditto:}.
*
Expand All @@ -284,6 +315,11 @@ private void ensureDataSchema(@Nullable final URI dataSchema,
final RequestContext ctx,
final DittoHeaders dittoHeaders) {

if (dataSchema == null && cloudEventsConfig.isEmptySchemaAllowed()) {
// early return, no schema, but no requirement to have one
return;
}

if (dataSchema == null || !dataSchema.getScheme().equals(DATA_SCHEMA_SCHEME)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.withCorrelationId(dittoHeaders)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthorizationSubjectsProviderFactory;
import org.eclipse.ditto.services.gateway.security.utils.DefaultHttpClientFacade;
import org.eclipse.ditto.services.gateway.security.utils.HttpClientFacade;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig;
Expand Down Expand Up @@ -98,6 +100,7 @@ public abstract class EndpointTestBase extends JUnitRouteTest {
protected static StreamingConfig streamingConfig;
protected static PublicHealthConfig publicHealthConfig;
protected static ProtocolConfig protocolConfig;
protected static CloudEventsConfig cloudEventsConfig;
protected static JwtAuthenticationFactory jwtAuthenticationFactory;
protected static HttpClientFacade httpClientFacade;
protected static JwtAuthorizationSubjectsProviderFactory authorizationSubjectsProviderFactory;
Expand All @@ -117,6 +120,7 @@ public static void initTestFixture() {
streamingConfig = DefaultStreamingConfig.of(gatewayScopedConfig);
publicHealthConfig = DefaultPublicHealthConfig.of(gatewayScopedConfig);
protocolConfig = DefaultProtocolConfig.of(dittoScopedConfig);
cloudEventsConfig = DefaultCloudEventsConfig.of(gatewayScopedConfig);
httpClientFacade = DefaultHttpClientFacade.getInstance(ActorSystem.create(EndpointTestBase.class.getSimpleName()),
DefaultHttpProxyConfig.ofProxy(DefaultScopedConfig.empty("/")));
authorizationSubjectsProviderFactory = DittoJwtAuthorizationSubjectsProvider::of;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void setUp() {
.thingSearchRoute(
new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator, cloudEventsConfig))
.websocketRoute(WebSocketRoute.getInstance(proxyActor, streamingConfig, materializer))
.supportedSchemaVersions(httpConfig.getSupportedSchemaVersions())
.protocolAdapterProvider(protocolAdapterProvider)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.services.gateway.endpoints.routes.cloudevents;

import org.eclipse.ditto.services.gateway.endpoints.EndpointTestBase;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig;
import org.junit.Test;

import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;

public final class CloudEventsTest extends EndpointTestBase {

@Test
public void testDefaults() {
final Object defaultValue = CloudEventsConfig.CloudEventsConfigValue.DATA_TYPES.getDefaultValue();
assertThat(defaultValue).isEqualTo(Set.of("application/json", "application/vnd.eclipse.ditto+json"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private static Route createRoute(final ActorSystem actorSystem,
.thingSearchRoute(
new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator))
.cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig,
headerTranslator, gatewayConfig.getCloudEventsConfig()))
.websocketRoute(WebSocketRoute.getInstance(streamingActor, streamingConfig, materializer)
.withSignalEnrichmentProvider(signalEnrichmentProvider)
.withHeaderTranslator(headerTranslator))
Expand Down
8 changes: 8 additions & 0 deletions services/gateway/starter/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ ditto {
cache-timeout = ${?GATEWAY_STATUS_HEALTH_EXTERNAL_TIMEOUT}
}

cloud-events {
empty-schema-allowed = true
data-types = [
"application/json"
"application/vnd.eclipse.ditto+json"
]
}

cache {
publickeys {
maxentries = 32
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import org.eclipse.ditto.services.base.config.DittoServiceConfig;
import org.eclipse.ditto.services.base.config.limits.LimitsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig;
Expand Down Expand Up @@ -59,6 +61,7 @@ public final class DittoGatewayConfig implements GatewayConfig, WithConfigPath {
private final AuthenticationConfig authenticationConfig;
private final StreamingConfig streamingConfig;
private final PublicHealthConfig publicHealthConfig;
private final DefaultCloudEventsConfig cloudEventsConfig;

private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) {

Expand All @@ -73,6 +76,7 @@ private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) {
authenticationConfig = DefaultAuthenticationConfig.of(dittoServiceConfig);
streamingConfig = DefaultStreamingConfig.of(dittoServiceConfig);
publicHealthConfig = DefaultPublicHealthConfig.of(dittoServiceConfig);
cloudEventsConfig = DefaultCloudEventsConfig.of(dittoServiceConfig);
}

/**
Expand Down Expand Up @@ -152,6 +156,11 @@ public ProtocolConfig getProtocolConfig() {
return protocolConfig;
}

@Override
public CloudEventsConfig getCloudEventsConfig() {
return cloudEventsConfig;
}

/**
* @return always {@value #CONFIG_PATH}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.services.gateway.util.config;

import org.eclipse.ditto.services.base.config.ServiceSpecificConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.MessageConfig;
Expand Down Expand Up @@ -88,4 +89,10 @@ public interface GatewayConfig extends ServiceSpecificConfig, WithProtocolConfig
*/
PublicHealthConfig getPublicHealthConfig();

/**
* Returns the configuration for the cloud events endpoint.
*
* @return the config.
*/
CloudEventsConfig getCloudEventsConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.util.config.endpoints;

import java.util.Set;

import javax.annotation.concurrent.Immutable;

import akka.http.javadsl.model.MediaTypes;
import org.eclipse.ditto.model.base.common.DittoConstants;
import org.eclipse.ditto.services.utils.config.KnownConfigValue;

import static org.eclipse.ditto.model.base.common.DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE;

/**
* Provides configuration settings for the cloud events endpoint of the Ditto Gateway service.
*/
@Immutable
public interface CloudEventsConfig {

/**
* Returns if an empty data schema is allowed.
*
* @return {@code true} if an empty data schema is allowed {@code false} otherwise.
*/
boolean isEmptySchemaAllowed();

/**
* Returns the allowed data types.
*
* @return The set of allowed data types.
*/
Set<String> getDataTypes();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code CloudEventsConfig}.
*/
enum CloudEventsConfigValue implements KnownConfigValue {

/**
* Flag if an empty data schema is allowed.
*/
EMPTY_SCHEMA_ALLOWED("empty-schema-allowed", true),

/**
* Set of allowed data types
*/
DATA_TYPES("data-types", Set.of(MediaTypes.APPLICATION_JSON.toString(), DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE));

private final String path;
private final Object defaultValue;

private CloudEventsConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}
}
Loading

0 comments on commit fe7d03d

Please sign in to comment.