From 0a37738bfd493cbe28a4b4f0862b897f272803a9 Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 11:28:46 +0200 Subject: [PATCH 1/7] feat: protos with tags --- .../v1alpha/entities_v1alpha.proto | 15 ++++++++++---- scripts/pp/get-bare-policy.sh | 20 +++++++++++++++++++ scripts/pp/list-datapolicies.sh | 5 +++++ scripts/pp/list-groups.sh | 16 +++++++++++++++ scripts/pp/list-platforms.sh | 5 +++++ scripts/pp/list-tables.sh | 15 ++++++++++++++ scripts/pp/upsert-data-policy.sh | 11 ++++++++++ 7 files changed, 83 insertions(+), 4 deletions(-) create mode 100755 scripts/pp/get-bare-policy.sh create mode 100755 scripts/pp/list-datapolicies.sh create mode 100755 scripts/pp/list-groups.sh create mode 100755 scripts/pp/list-platforms.sh create mode 100755 scripts/pp/list-tables.sh create mode 100755 scripts/pp/upsert-data-policy.sh diff --git a/protos/getstrm/api/data_policies/v1alpha/entities_v1alpha.proto b/protos/getstrm/api/data_policies/v1alpha/entities_v1alpha.proto index cdbcc571..e589cb7c 100644 --- a/protos/getstrm/api/data_policies/v1alpha/entities_v1alpha.proto +++ b/protos/getstrm/api/data_policies/v1alpha/entities_v1alpha.proto @@ -18,17 +18,17 @@ message DataPolicy { string organization_id = 4; google.protobuf.Timestamp create_time = 5; google.protobuf.Timestamp update_time = 6; + repeated string tags = 7; } message RuleSet { Target target = 1; repeated FieldTransform field_transforms = 2; - repeated RowFilter row_filters = 3; + repeated Filter filters = 3; - message RowFilter { - Attribute attribute = 1; + message Filter { // Last condition in the list must have 0 principals, as this acts as the default / else condition. - repeated Condition conditions = 3; + repeated Condition conditions = 1; message Condition { repeated string principals = 1; string condition = 2; @@ -100,6 +100,7 @@ message DataPolicy { // attributes are leading. // Todo: rename to fields? Attribute is an overloaded term in this context. repeated Attribute attributes = 4; + repeated string tags = 5; // where did the Attribute list come from? enum Type { @@ -117,6 +118,8 @@ message DataPolicy { // This is the "native" type, originating from the source platform. string type = 2; bool required = 3; + repeated string tags = 4; + // TODO add repeated? 
} } @@ -126,6 +129,7 @@ message DataCatalog { string id = 1; Type type = 2; repeated DataBase databases = 3; + repeated string tags = 4; enum Type { TYPE_UNSPECIFIED = 0; COLLIBRA = 1; @@ -139,6 +143,7 @@ message DataCatalog { string display_name = 3; DataCatalog catalog = 4; repeated Schema schemas = 5; + repeated string tags = 6; } message Schema { @@ -146,11 +151,13 @@ message DataCatalog { string name = 2; DataBase database = 3; repeated Table tables = 4; + repeated string tags = 5; } message Table { string id = 1; string name = 2; Schema schema = 3; + repeated string tags = 4; } } diff --git a/scripts/pp/get-bare-policy.sh b/scripts/pp/get-bare-policy.sh new file mode 100755 index 00000000..57ebed16 --- /dev/null +++ b/scripts/pp/get-bare-policy.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +platform='databricks-pim@getstrm.com' +table='strm.poc.gddemo' +while getopts "p:t:" opt; do + case $opt in + p) + platform=$OPTARG + ;; + t) + table=$OPTARG + ;; + esac +done +query=$( jq -n -r --arg id $platform --arg table $table \ + '{"platform":{"id":$id},"table":$table}' ) + +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call GetProcessingPlatformBarePolicy diff --git a/scripts/pp/list-datapolicies.sh b/scripts/pp/list-datapolicies.sh new file mode 100755 index 00000000..f2623326 --- /dev/null +++ b/scripts/pp/list-datapolicies.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo '{}' | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListDataPolicies diff --git a/scripts/pp/list-groups.sh b/scripts/pp/list-groups.sh new file mode 100755 index 00000000..29a0fdd0 --- /dev/null +++ b/scripts/pp/list-groups.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +platform='databricks-pim@getstrm.com' + +while getopts "p:" opt; do + case $opt in + p) + platform=$OPTARG + ;; + esac +done + +query=$( jq -n -r --arg id $platform '{"platform":{"id":$id}}' ) +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListProcessingPlatformGroups diff --git a/scripts/pp/list-platforms.sh b/scripts/pp/list-platforms.sh new file mode 100755 index 00000000..6c725490 --- /dev/null +++ b/scripts/pp/list-platforms.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo '{}' | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListProcessingPlatforms diff --git a/scripts/pp/list-tables.sh b/scripts/pp/list-tables.sh new file mode 100755 index 00000000..13314388 --- /dev/null +++ b/scripts/pp/list-tables.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +platform='databricks-pim@getstrm.com' + +while getopts "p:" opt; do + case $opt in + p) + platform=$OPTARG + ;; + esac +done +query=$( jq -n -r --arg id $platform '{"platform":{"id":$id}}' ) +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListProcessingPlatformTables diff --git a/scripts/pp/upsert-data-policy.sh b/scripts/pp/upsert-data-policy.sh new file mode 100755 index 00000000..d1a7ac8d --- /dev/null +++ b/scripts/pp/upsert-data-policy.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +f=${1:-sample_data/cdc-diabetes.yaml} +data_policy=$(json-case $f) +request=$(echo '{}' | jq -r --argjson p "$data_policy" '{data_policy: $p}') + +echo $request | evans -r cli \ + --header "authorization=Bearer $(strm auth print-access-token)" \ + --host api.dev.getstrm.io --tls --port 443 \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call UpsertDataPolicy 
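To illustrate the reshaped rule set in patch 1, here is a hypothetical YAML fragment (all names invented) of the kind upsert-data-policy.sh feeds to UpsertDataPolicy: row_filters/RowFilter becomes filters/Filter, the per-filter attribute field is gone, and the final condition lists no principals, so it acts as the default / else branch per the proto comment above.

rule_sets:
  - target:
      fullname: strm.poc.gddemo_view       # invented target view name
    filters:
      - conditions:
          - principals: [ fraud-detection ] # this group sees every row
            condition: "true"
          - principals: []                  # no principals: the default / else condition
            condition: "age > 18"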
From 86c5eaca058b3cedaf88ed8ea2c06264cc591065 Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 11:57:20 +0200 Subject: [PATCH 2/7] feat: gradle props --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 376e4b9a..fcd2913c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -8,4 +8,4 @@ jooqVersion=3.18.7 # TODO verify the tag is correct dockertag = ghcr.io/getstrm/pace:latest -generatedBufDependencyVersion=00000000000000.0f57123e7845 +generatedBufDependencyVersion=00000000000000./home/bvdeen From 422df9fa6587730ddb1613995c624793e79d844e Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 14:44:41 +0200 Subject: [PATCH 3/7] feat(strm-2708): added RuleSetService and tests --- Makefile | 2 + .../table-with-columns-and-domain.graphql | 10 +- .../com/getstrm/pace/api/DataPolicyApi.kt | 35 ++- .../com/getstrm/pace/catalogs/Collibra.kt | 189 ++++++----- .../com/getstrm/pace/catalogs/Datahub.kt | 190 +++++------ .../pace/catalogs/OpenDataDiscovery.kt | 276 ++++++++-------- .../common/AbstractDynamicViewGenerator.kt | 49 +-- .../com/getstrm/pace/dao/RuleSetsDao.kt | 17 + .../com/getstrm/pace/domain/DataCatalog.kt | 43 ++- .../com/getstrm/pace/domain/Exceptions.kt | 15 + .../getstrm/pace/service/CatalogService.kt | 161 +++++----- .../getstrm/pace/service/DataPolicyService.kt | 26 +- .../getstrm/pace/service/RuleSetService.kt | 76 +++++ .../SnowflakeDynamicViewGenerator.kt | 2 +- .../main/kotlin/com/getstrm/pace/util/Util.kt | 44 ++- .../postgresql/V1__create_policies_tables.sql | 4 +- .../BigQueryDynamicViewGeneratorTest.kt | 11 +- .../DatabricksDynamicViewGeneratorTest.kt | 15 +- .../pace/service/DataPolicyServiceTest.kt | 122 +------- .../pace/service/RuleSetServiceTest.kt | 294 ++++++++++++++++++ .../SnowflakeDynamicViewGeneratorTest.kt | 11 +- .../kotlin/com/getstrm/pace/util/UtilTest.kt | 2 +- gradle.properties | 2 +- .../catalogs/collibra/get-ecommerce-policy.sh | 6 + .../datahub/get-bare-policy-cdc-diabetes.sh | 8 + .../catalogs/datahub/get-bare-policy-gdd.sh | 8 + scripts/catalogs/get-bare-policy.sh | 39 +++ scripts/catalogs/list-catalogs.sh | 5 + scripts/catalogs/list-databases.sh | 16 + scripts/catalogs/list-recursive.sh | 30 ++ scripts/catalogs/list-schemas.sh | 23 ++ scripts/catalogs/list-tables.sh | 29 ++ .../odd/get-bare-policy-CATALOG_RETURNS.sh | 5 + 33 files changed, 1162 insertions(+), 603 deletions(-) create mode 100644 app/src/main/kotlin/com/getstrm/pace/dao/RuleSetsDao.kt create mode 100644 app/src/main/kotlin/com/getstrm/pace/service/RuleSetService.kt create mode 100644 app/src/test/kotlin/com/getstrm/pace/service/RuleSetServiceTest.kt create mode 100755 scripts/catalogs/collibra/get-ecommerce-policy.sh create mode 100755 scripts/catalogs/datahub/get-bare-policy-cdc-diabetes.sh create mode 100755 scripts/catalogs/datahub/get-bare-policy-gdd.sh create mode 100755 scripts/catalogs/get-bare-policy.sh create mode 100755 scripts/catalogs/list-catalogs.sh create mode 100755 scripts/catalogs/list-databases.sh create mode 100755 scripts/catalogs/list-recursive.sh create mode 100755 scripts/catalogs/list-schemas.sh create mode 100755 scripts/catalogs/list-tables.sh create mode 100755 scripts/catalogs/odd/get-bare-policy-CATALOG_RETURNS.sh diff --git a/Makefile b/Makefile index d9770a0a..84c57b27 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ .PHONY: clean-common-protos +SHELL := /bin/bash + common_protos := ${CURDIR}/.common-protos grpc_version := 1.50.0 
diff --git a/app/src/main/graphql/collibra/table-with-columns-and-domain.graphql b/app/src/main/graphql/collibra/table-with-columns-and-domain.graphql
index 57a41b89..01e23263 100644
--- a/app/src/main/graphql/collibra/table-with-columns-and-domain.graphql
+++ b/app/src/main/graphql/collibra/table-with-columns-and-domain.graphql
@@ -1,4 +1,4 @@
-# Catalog -> Data Source (e.g. Database) -> Schema -> Table -> Column
+
 query TableWithColumns($id: UUID!) {
     tables: assets(
         where: {id: {eq: $id}}
@@ -22,6 +22,8 @@ query TableWithColumns($id: UUID!) {
            }
        }
        columns: incomingRelations(
+            # TODO we need to handle paging!
+            limit: 400,
            where: {type: {publicId: {eq: "ColumnIsPartOfTable"}}}
        ) {
            columnDetails: source {
@@ -31,6 +33,12 @@
                ) {
                    value: stringValue
                }
+                tags {
+                    name
+                }
+
+
+
            }
        }
    }
diff --git a/app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt b/app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt
index 0734dcfb..897d2bbf 100644
--- a/app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt
+++ b/app/src/main/kotlin/com/getstrm/pace/api/DataPolicyApi.kt
@@ -1,14 +1,16 @@
 package com.getstrm.pace.api
 
-import com.getstrm.pace.service.ProcessingPlatformsService
 import build.buf.gen.getstrm.api.data_policies.v1alpha.*
+import com.getstrm.pace.service.CatalogService
 import com.getstrm.pace.service.DataPolicyService
+import com.getstrm.pace.service.ProcessingPlatformsService
 import net.devh.boot.grpc.server.service.GrpcService
 
 @GrpcService
 class DataPolicyApi(
     private val dataPolicyService: DataPolicyService,
     private val processingPlatformsService: ProcessingPlatformsService,
+    private val catalogService: CatalogService,
 ) : DataPolicyServiceGrpcKt.DataPolicyServiceCoroutineImplBase() {
 
     override suspend fun listDataPolicies(request: ListDataPoliciesRequest): ListDataPoliciesResponse {
@@ -53,4 +55,35 @@ class DataPolicyApi(
         GetProcessingPlatformBarePolicyResponse.newBuilder()
             .setDataPolicy(processingPlatformsService.createBarePolicy(request.platform, request.table))
             .build()
+
+    override suspend fun listCatalogs(request: ListCatalogsRequest): ListCatalogsResponse =
+        ListCatalogsResponse.newBuilder()
+            .addAllCatalogs(catalogService.listCatalogs())
+            .build()
+
+    override suspend fun listDatabases(request: ListDatabasesRequest): ListDatabasesResponse {
+        val databases = catalogService.listDatabases(request.catalog)
+        return ListDatabasesResponse.newBuilder()
+            .addAllDatabases(databases)
+            .build()
+    }
+    override suspend fun listSchemas(request: ListSchemasRequest): ListSchemasResponse {
+        val schemas = catalogService.listSchemas(request.database)
+        return ListSchemasResponse.newBuilder()
+            .addAllSchemas(schemas)
+            .build()
+    }
+    override suspend fun listTables(request: ListTablesRequest): ListTablesResponse {
+        val tables = catalogService.listTables(request.schema)
+        return ListTablesResponse.newBuilder()
+            .addAllTables(tables)
+            .build()
+    }
+
+    override suspend fun getCatalogBarePolicy(request: GetCatalogBarePolicyRequest): GetCatalogBarePolicyResponse {
+        val dataPolicy: DataPolicy = catalogService.getBarePolicy(request.table)
+        return GetCatalogBarePolicyResponse.newBuilder()
+            .setDataPolicy(dataPolicy)
+            .build()
+    }
 }
diff --git a/app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt b/app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt
index 60666716..bd42a0e5 100644
--- a/app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt
+++ b/app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt
@@ -1,97 +1,92 @@
-//package
com.getstrm.pace.catalogs -// -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy -//import com.apollographql.apollo3.ApolloClient -//import com.collibra.generated.ListPhysicalDataAssetsQuery -//import com.collibra.generated.ListSchemaIdsQuery -//import com.collibra.generated.ListTablesInSchemaQuery -//import com.collibra.generated.TableWithColumnsQuery -//import com.getstrm.pace.config.CatalogConfiguration -//import com.getstrm.pace.domain.DataCatalog -//import java.util.* -// -//class CollibraCatalog(config: CatalogConfiguration) : DataCatalog() { -// private val client = config.apolloClient() -// override fun close() { -// client.close() -// } -// -// override suspend fun listDatabases(): List = listPhysicalAssets(AssetTypes.DATABASE).map { -// Database(this, it.id, it.getDataSourceType()) -// } -// -// class Database(private val catalog: CollibraCatalog, id: String, dbType: String) : -// DataCatalog.Database(id, dbType) { -// constructor(catalog: CollibraCatalog, id: Any, dbType: String) : this(catalog, id.toString(), dbType) -// -// override suspend fun getSchemas(): List { -// val assets = catalog.client.query(ListSchemaIdsQuery(id)).execute().data!!.assets!!.filterNotNull() -// .flatMap { schema -> -// schema.schemas -// } -// return assets.map { -// Schema(catalog, this, it.target.id.toString(), it.target.fullName) -// } -// } -// } -// -// class Schema(private val catalog: CollibraCatalog, database: DataCatalog.Database, id: String, name: String) : -// DataCatalog.Schema(database, id, name) { -// override suspend fun getTables(): List = -// catalog.client.query(ListTablesInSchemaQuery(id)).execute().data!!.assets!!.filterNotNull() -// .flatMap { table -> -// table.tables.map { Table(catalog, this, it.target.id.toString(), it.target.fullName) } -// } -// } -// -// class Table(private val catalog: CollibraCatalog, schema: DataCatalog.Schema, id: String, name: String) : -// DataCatalog.Table(schema, id, name) { -// override suspend fun getDataPolicy(): DataPolicy? 
{ -// val response = catalog.client.query(TableWithColumnsQuery(id = id)).execute() -// return response.data?.tables?.firstOrNull()?.let { table -> -// val systemName = -// table.schema.firstOrNull()?.schemaDetails?.database?.firstOrNull()?.databaseDetails?.domain?.name -// -// val builder = DataPolicy.newBuilder() -// builder.infoBuilder.title = table.displayName -// builder.infoBuilder.description = systemName -// builder.sourceBuilder.addAllAttributes(table.columns.map { it.toAttribute() }) -// builder.build() -// } -// } -// -// private fun TableWithColumnsQuery.Column.toAttribute(): DataPolicy.Attribute { -// return with(DataPolicy.Attribute.newBuilder()) { -// addPathComponents(columnDetails.displayName) -// val sourceType = columnDetails.dataType.firstOrNull()?.value ?: "unknown" -// // source type mapping -// type = sourceType -// build() -// } -// } -// } -// -// class Configuration( -// private val serverUrl: String = "https://test-drive.collibra.com/graphql/knowledgeGraph/v1", -// private val username: String = "test-drive-user-9b8o5m7l", -// private val password: String = "Egwrazg\$8q3j6i0b", -// ) : DataCatalog.Configuration() { -// fun apolloClient(): ApolloClient { -// val basicAuth = Base64.getEncoder().encodeToString("$username:$password".toByteArray()) -// -// return ApolloClient.Builder().serverUrl(serverUrl).addHttpHeader("Authorization", "Basic $basicAuth") -// .build() -// } -// } -// -// private suspend fun listPhysicalAssets(type: AssetTypes): List = -// client.query(ListPhysicalDataAssetsQuery(assetType = type.assetName)).execute().data!!.assets?.filterNotNull() -// ?: emptyList() -// -// private fun ListPhysicalDataAssetsQuery.Asset.getDataSourceType(): String = -// stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown" -//} -// -//enum class AssetTypes(val assetName: String) { -// DATABASE("Database"), SCHEMA("Schema"), TABLE("Table"), COLUMN("Column"), -//} +package com.getstrm.pace.catalogs +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.apollographql.apollo3.ApolloClient +import com.collibra.generated.ListPhysicalDataAssetsQuery +import com.collibra.generated.ListSchemaIdsQuery +import com.collibra.generated.ListTablesInSchemaQuery +import com.collibra.generated.TableWithColumnsQuery +import com.getstrm.pace.config.CatalogConfiguration +import com.getstrm.pace.domain.DataCatalog +import normalizeType +import java.util.* + +class CollibraCatalog(config: CatalogConfiguration) : DataCatalog(config) { + + val client = apolloClient() + override fun close() { + client.close() + } + + override suspend fun listDatabases(): List = + listPhysicalAssets(AssetTypes.DATABASE).map { + Database(this, it.id, it.getDataSourceType()) + } + + class Database(override val catalog: CollibraCatalog, id: String, dbType: String) : + DataCatalog.Database(catalog, id, dbType) { + constructor(catalog: CollibraCatalog, id: Any, dbType: String) : this(catalog, id.toString(), dbType) + + override suspend fun getSchemas(): List { + val assets = catalog.client.query(ListSchemaIdsQuery(id)).execute().data!!.assets!!.filterNotNull().flatMap { schema -> + schema.schemas + } + return assets.map { + Schema(catalog, this, it.target.id.toString(), it.target.fullName) + } + } + } + + class Schema(private val catalog: CollibraCatalog, database: DataCatalog.Database, id: String, name: String) : + DataCatalog.Schema(database, id, name) { + override suspend fun getTables(): List = + 
catalog.client.query(ListTablesInSchemaQuery(id)).execute().data!!.assets!!.filterNotNull().flatMap { table -> + table.tables.map { Table(catalog, this, it.target.id.toString(), it.target.fullName) } + } + } + + class Table(private val catalog: CollibraCatalog, schema: DataCatalog.Schema, id: String, name: String) : + DataCatalog.Table(schema, id, name) { + override suspend fun getDataPolicy(): DataPolicy? { + val response = catalog.client.query(TableWithColumnsQuery(id = id)).execute() + return response.data?.tables?.firstOrNull()?.let { table -> + val systemName = table.schema.firstOrNull()?.schemaDetails?.database?.firstOrNull()?.databaseDetails?.domain?.name + + val builder = DataPolicy.newBuilder() + builder.infoBuilder.title = table.displayName + builder.infoBuilder.description = systemName + builder.sourceBuilder.addAllAttributes(table.columns.map { it.toAttribute() }) + builder.build() + } + } + + private fun TableWithColumnsQuery.Column.toAttribute(): DataPolicy.Attribute = + with(DataPolicy.Attribute.newBuilder()) { + addPathComponents(columnDetails.displayName) + val sourceType = columnDetails.dataType.firstOrNull()?.value ?: "unknown" + // source type mapping + type = sourceType + addAllTags(columnDetails.tags.map { it.name }) + build().normalizeType() + } + } + + private fun apolloClient(): ApolloClient { + val basicAuth = Base64.getEncoder().encodeToString("${config.userName}:${config.password}".toByteArray()) + return ApolloClient.Builder() + .serverUrl(config.serverUrl) + .addHttpHeader("Authorization", "Basic $basicAuth") + .build() + } + private suspend fun listPhysicalAssets(type: AssetTypes): List = + client.query(ListPhysicalDataAssetsQuery(assetType = type.assetName)).execute().data!!.assets?.filterNotNull() ?: emptyList() + + private fun ListPhysicalDataAssetsQuery.Asset.getDataSourceType(): String = + stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown" +} + +enum class AssetTypes(val assetName: String) { + DATABASE("Database"), + SCHEMA("Schema"), + TABLE("Table"), + COLUMN("Column"), +} diff --git a/app/src/main/kotlin/com/getstrm/pace/catalogs/Datahub.kt b/app/src/main/kotlin/com/getstrm/pace/catalogs/Datahub.kt index 460e9f38..15ddded4 100644 --- a/app/src/main/kotlin/com/getstrm/pace/catalogs/Datahub.kt +++ b/app/src/main/kotlin/com/getstrm/pace/catalogs/Datahub.kt @@ -1,94 +1,96 @@ -//package com.getstrm.pace.catalogs -// -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy -//import com.apollographql.apollo3.ApolloClient -//import com.getstrm.pace.config.CatalogConfiguration -//import com.getstrm.pace.domain.DataCatalog -//import io.datahubproject.generated.GetDatasetDetailsQuery -//import io.datahubproject.generated.ListDatasetsQuery -// -//class DatahubCatalog(private val config: CatalogConfiguration = DatahubConfiguration()) : DataCatalog() { -// val client = config.apolloClient() -// -// override fun close() { -// client.close() -// } -// -// override suspend fun listDatabases(): List { -// return fetchAllDatasets().map { -// Database(this, it.entity.urn) -// } -// } -// -// private suspend fun fetchAllDatasets(start: Int = 0): List { -// val response = client.query(ListDatasetsQuery(start, config.fetchSize)).execute() -// -// if (response.hasErrors()) throw RuntimeException("Error fetching databases: ${response.errors}") -// -// val resultSize = response.data?.search?.searchResults?.size ?: 0 -// -// return if (resultSize == 0) { -// emptyList() -// } else if (resultSize < config.fetchSize) { -// 
response.data?.search?.searchResults ?: emptyList() -// } else { -// response.data?.search?.searchResults?.plus(fetchAllDatasets(start + config.fetchSize)) ?: emptyList() -// } -// } -// -// class Database(private val catalog: DatahubCatalog, id: String) : DataCatalog.Database(id) { -// override suspend fun getSchemas(): List { -// return catalog.client.query(GetDatasetDetailsQuery(id)).execute().data?.dataset?.let { dataset -> -// val schema = Schema(this, dataset) -// listOf(schema) -// } ?: emptyList() -// } -// } -// -// class Schema(database: Database, private val dataset: GetDatasetDetailsQuery.Dataset) : DataCatalog.Schema(database, dataset.urn, dataset.platform.properties?.displayName ?: dataset.urn) { -// override suspend fun getTables(): List = listOf(Table(this, dataset)) -// } -// -// class Table(schema: Schema, private val dataset: GetDatasetDetailsQuery.Dataset) : DataCatalog.Table(schema, dataset.urn, dataset.platform.properties?.displayName ?: dataset.urn) { -// override suspend fun getDataPolicy(): DataPolicy? { -// val policyBuilder = DataPolicy.newBuilder() -// -// policyBuilder.sourceBuilder.addAllAttributes( -// dataset.schemaMetadata?.fields?.map { -// DataPolicy.Attribute.newBuilder() -// .addAllPathComponents(it.fieldPath.extractPathComponents()) -// .setType(it.type.rawValue) -// .setRequired(!it.nullable) -// .build() -// } ?: emptyList()) -// -// policyBuilder.infoBuilder.title = dataset.platform.properties?.displayName ?: dataset.urn -// policyBuilder.infoBuilder.description = dataset.platform.name -// -// return policyBuilder.build() -// } -// -// /** -// * Some fieldPaths in the Datahub sample data (e.g. Kafka), contain field paths that match '[version=2.0].[type=boolean].field_bar', which is probably an export format of Kafka. We need to skip the version and type here, as those are not actual path components. 
-// */ -// private fun String.extractPathComponents() = usefulFieldPathsRegex.find(this)?.let { match -> -// match.groupValues[1].split(".") -// } ?: this.split(".") -// -// companion object { -// private val usefulFieldPathsRegex = "\\.([^\\]]+)\$".toRegex() -// } -// -// } -//} -// -//class DatahubConfiguration( -// private val serverUrl: String = "http://datahub-datahub-frontend.datahub:9002/api/graphql", -// private val token: String = "eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImp0aSI6IjE4YWExMjA3LWY2NTQtNDc4OS05MTU3LTkwYTMyMjExMWJkYyIsInN1YiI6ImRhdGFodWIiLCJpc3MiOiJkYXRhaHViLW1ldGFkYXRhLXNlcnZpY2UifQ.8-NksHdL4p3o9_Bryst2MOvH-bATl-avC8liB-E2_sM", -// val fetchSize: Int = 1 -//) : DataCatalog.Configuration() { -// fun apolloClient(): ApolloClient = ApolloClient.Builder() -// .serverUrl(serverUrl) -// .addHttpHeader("Authorization", "Bearer $token") -// .build() -//} +package com.getstrm.pace.catalogs + +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.apollographql.apollo3.ApolloClient +import com.getstrm.pace.config.CatalogConfiguration +import com.getstrm.pace.domain.DataCatalog +import io.datahubproject.generated.GetDatasetDetailsQuery +import io.datahubproject.generated.ListDatasetsQuery +import normalizeType + +class DatahubCatalog(config: CatalogConfiguration) : DataCatalog(config) { + val client = apolloClient() + + override fun close() { + client.close() + } + + override suspend fun listDatabases(): List { + return fetchAllDatasets().map { + Database(this, it.entity.urn) + } + } + + private suspend fun fetchAllDatasets(start: Int = 0): List { + val response = client.query(ListDatasetsQuery(start, config.fetchSize ?: 1)).execute() + + if (response.hasErrors()) throw RuntimeException("Error fetching databases: ${response.errors}") + + val resultSize = response.data?.search?.searchResults?.size ?: 0 + + return if (resultSize == 0) { + emptyList() + } else if (resultSize < (config.fetchSize ?: 1)) { + response.data?.search?.searchResults ?: emptyList() + } else { + response.data?.search?.searchResults?.plus(fetchAllDatasets(start + (config.fetchSize ?: 1))) ?: emptyList() + } + } + + class Database(override val catalog: DatahubCatalog, id: String) : DataCatalog.Database(catalog, id) { + override suspend fun getSchemas(): List { + return catalog.client.query(GetDatasetDetailsQuery(id)).execute().data?.dataset?.let { dataset -> + val schema = Schema(this, dataset) + listOf(schema) + } ?: emptyList() + } + } + + class Schema(database: Database, private val dataset: GetDatasetDetailsQuery.Dataset) : + DataCatalog.Schema(database, dataset.urn, dataset.platform.properties?.displayName ?: dataset.urn) { + override suspend fun getTables(): List = listOf(Table(this, dataset)) + } + + class Table(schema: Schema, private val dataset: GetDatasetDetailsQuery.Dataset) : DataCatalog.Table(schema, dataset.urn, dataset.platform.properties?.displayName ?: dataset.urn) { + override suspend fun getDataPolicy(): DataPolicy? { + val policyBuilder = DataPolicy.newBuilder() + + // tags don't exist in schemaMetadata but only in editableSchemaMetadata! 
+        val attributeTags = dataset.editableSchemaMetadata?.editableSchemaFieldInfo?.map {
+            it.fieldPath to it.tags?.tags?.map { it.tag.properties?.name.orEmpty() }.orEmpty()
+        }?.toMap() ?: emptyMap()
+
+        policyBuilder.sourceBuilder.addAllAttributes(
+            dataset.schemaMetadata?.fields?.map {
+                DataPolicy.Attribute.newBuilder()
+                    .addAllPathComponents(it.fieldPath.extractPathComponents())
+                    .addAllTags(attributeTags[it.fieldPath] ?: emptyList())
+                    .setType(it.type.rawValue)
+                    .setRequired(!it.nullable)
+                    .build().normalizeType()
+            } ?: emptyList(),
+        )
+
+        policyBuilder.infoBuilder.title = dataset.platform.properties?.displayName ?: dataset.urn
+        policyBuilder.infoBuilder.description = dataset.platform.name
+        policyBuilder.infoBuilder.addAllTags(
+            dataset.tags?.tags?.map { it.tag.properties?.name.orEmpty() }.orEmpty(),
+        )
+
+        return policyBuilder.build()
+    }
+
+    /**
+     * Some fieldPaths in the Datahub sample data (e.g. Kafka) match '[version=2.0].[type=boolean].field_bar', which is probably an export format of Kafka. We need to skip the version and type here, as those are not actual path components.
+     */
+    private fun String.extractPathComponents() = this.replace(stripKafkaPrefix, "").split(".")
+
+    companion object {
+        private val stripKafkaPrefix = """^(\[[^]]+\]\.)+""".toRegex()
+    }
+    }
+    private fun apolloClient(): ApolloClient = ApolloClient.Builder()
+        .serverUrl(config.serverUrl)
+        .addHttpHeader("Authorization", "Bearer ${config.token}")
+        .build()
+}
diff --git a/app/src/main/kotlin/com/getstrm/pace/catalogs/OpenDataDiscovery.kt b/app/src/main/kotlin/com/getstrm/pace/catalogs/OpenDataDiscovery.kt
index 00b4ed6f..a3629488 100644
--- a/app/src/main/kotlin/com/getstrm/pace/catalogs/OpenDataDiscovery.kt
+++ b/app/src/main/kotlin/com/getstrm/pace/catalogs/OpenDataDiscovery.kt
@@ -1,139 +1,137 @@
-//package com.getstrm.pace.catalogs
-//
-//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
-//import com.getstrm.pace.config.CatalogConfiguration
-//import com.getstrm.pace.domain.DataCatalog
-//import org.opendatadiscovery.generated.api.DataSetApi
-//import org.opendatadiscovery.generated.api.DataSourceApi
-//import org.opendatadiscovery.generated.api.SearchApi
-//import org.opendatadiscovery.generated.model.*
-//import java.util.*
-//
-///**
-// * Interface to ODD Data Catalogs. Can be optimized using a cache.
-// */ -//class OpenDataDiscoveryCatalog(configuration: CatalogConfiguration = ODDConfiguration()) : DataCatalog() { -// private val searchClient = SearchApi(configuration.serverUrl) -// private val datasetsClient = DataSetApi(configuration.serverUrl) -// private val dataSourceClient = DataSourceApi(configuration.serverUrl) -// -// private val dataSources = getAllDataSources().associateBy { it.id } -// -// override suspend fun listDatabases(): List { -// val searchRef = searchClient.search( -// SearchFormData( -// query = "", -// filters = SearchFormDataFilters( -// entityClasses = listOf( -// SearchFilterState( -// entityName = "DATA_SET", -// // selected means that this is included in the search results -// selected = true, -// // Not sure why the entity id is necessary, I think it -// // refers to the entity class id, which "should be" a static -// // value, in case of the entity type DATA_SET it is 1 -// entityId = 1 -// ) -// ) -// ) -// ) -// ) -// -// return getAllSearchResults(searchRef.searchId).map { -// val dataSource = dataSources[it.dataSource.id]?.name ?: "unknown" -// Database(this, it.id.toString(), dataSource, it.externalName) -// } -// } -// -// private fun getAllSearchResults(searchId: UUID, page: Int = 1, size: Int = 100): List { -// val searchResults = searchClient.getSearchResults(searchId, page, size) -// -// return if (searchResults.items.size == size) { -// searchResults.items + getAllSearchResults(searchId, page + 1, size) -// } else { -// searchResults.items -// } -// } -// -// private fun getAllDataSources(page: Int = 1, size: Int = 100): List { -// val dataSources = dataSourceClient.getDataSourceList(page, size) -// -// return if (dataSources.items.size == size) { -// dataSources.items + getAllDataSources(page + 1, size) -// } else { -// dataSources.items -// } -// } -// -// override fun close() { -// } -// -// class Database(private val catalog: OpenDataDiscoveryCatalog, id: String, dbType: String, displayName: String) : -// DataCatalog.Database(id, dbType, displayName) { -// /** Effectively a noop, but that's because ODD's data model isn't hierachical -// */ -// override suspend fun getSchemas(): List { -// return listOf(Schema(catalog, this, id, dbType!!)) -// } -// } -// -// class Schema( -// private val catalog: OpenDataDiscoveryCatalog, -// database: DataCatalog.Database, -// id: String, -// name: String -// ) : -// DataCatalog.Schema(database, id, name) { -// override suspend fun getTables(): List { -// return listOf(Table(catalog, this, id, name)) -// } -// } -// -// class Table(private val catalog: OpenDataDiscoveryCatalog, schema: DataCatalog.Schema, id: String, name: String) : -// DataCatalog.Table(schema, id, name) { -// -// private fun getAllParents(parentId: Long, fieldsById: Map): List { -// val parent = fieldsById[parentId]!! -// -// return if (parent.parentFieldId != null) { -// listOf(parent.id) + getAllParents(parent.parentFieldId, fieldsById) -// } else { -// listOf(parent.id) -// } -// } -// -// override suspend fun getDataPolicy(): DataPolicy? 
{ -// val datasetStructure = catalog.datasetsClient.getDataSetStructureLatest(schema.database.id.toLong()) -// val fields = datasetStructure.fieldList -// val fieldsById = fields.associateBy { it.id } -// -// val policyBuilder = DataPolicy.newBuilder() -// -// policyBuilder.sourceBuilder.addAllAttributes( -// fields.map { field -> -// val fieldPath = if (field.parentFieldId != null) { -// val parentIds = getAllParents(field.parentFieldId, fieldsById) -// parentIds.map { fieldsById[it]!!.name } -// } else { -// listOf(field.name) -// } -// -// DataPolicy.Attribute.newBuilder() -// .addAllPathComponents(fieldPath) -// .setType(field.type.logicalType) -// .setRequired(!field.type.isNullable) -// .build() -// } -// ) -// -// policyBuilder.infoBuilder.title = schema.database.displayName -// policyBuilder.infoBuilder.description = schema.database.dbType -// -// return policyBuilder.build() -// } -// } -// -// class ODDConfiguration( -// val serverUrl: String = "http://localhost:8080" -// ) : Configuration() -//} +package com.getstrm.pace.catalogs + +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.getstrm.pace.config.CatalogConfiguration +import com.getstrm.pace.domain.DataCatalog +import normalizeType +import org.opendatadiscovery.generated.api.DataSetApi +import org.opendatadiscovery.generated.api.DataSourceApi +import org.opendatadiscovery.generated.api.SearchApi +import org.opendatadiscovery.generated.model.* // ktlint-disable no-wildcard-imports +import java.util.* + +/** + * Interface to ODD Data Catalogs. + */ +class OpenDataDiscoveryCatalog(configuration: CatalogConfiguration) : DataCatalog(configuration) { + private val searchClient = SearchApi(configuration.serverUrl) + private val datasetsClient = DataSetApi(configuration.serverUrl) + private val dataSourceClient = DataSourceApi(configuration.serverUrl) + + private val dataSources = getAllDataSources().associateBy { it.id } + + override suspend fun listDatabases(): List { + val searchRef = searchClient.search( + SearchFormData( + query = "", + filters = SearchFormDataFilters( + entityClasses = listOf( + SearchFilterState( + entityName = "DATA_SET", + // selected means that this is included in the search results + selected = true, + // Not sure why the entity id is necessary, I think it + // refers to the entity class id, which "should be" a static + // value, in case of the entity type DATA_SET it is 1 + entityId = 1, + ), + ), + ), + ), + ) + + return getAllSearchResults(searchRef.searchId).map { + val dataSource = dataSources[it.dataSource.id]?.name ?: "unknown" + Database(this, it.id.toString(), dataSource, it.externalName) + } + } + + private fun getAllSearchResults(searchId: UUID, page: Int = 1, size: Int = 100): List { + val searchResults = searchClient.getSearchResults(searchId, page, size) + + return if (searchResults.items.size == size) { + searchResults.items + getAllSearchResults(searchId, page + 1, size) + } else { + searchResults.items + } + } + + private fun getAllDataSources(page: Int = 1, size: Int = 100): List { + val dataSources = dataSourceClient.getDataSourceList(page, size) + + return if (dataSources.items.size == size) { + dataSources.items + getAllDataSources(page + 1, size) + } else { + dataSources.items + } + } + + override fun close() { + } + + class Database(override val catalog: OpenDataDiscoveryCatalog, id: String, dbType: String, displayName: String) : + DataCatalog.Database(catalog, id, dbType, displayName) { + /** Effectively a noop, but that's because ODD's data model isn't 
hierarchical
+     */
+        override suspend fun getSchemas(): List {
+            return listOf(Schema(catalog, this, id, dbType!!))
+        }
+    }
+
+    class Schema(
+        private val catalog: OpenDataDiscoveryCatalog,
+        database: DataCatalog.Database,
+        id: String,
+        name: String,
+    ) :
+        DataCatalog.Schema(database, id, name) {
+        override suspend fun getTables(): List {
+            return listOf(Table(catalog, this, id, name))
+        }
+    }
+
+    class Table(private val catalog: OpenDataDiscoveryCatalog, schema: DataCatalog.Schema, id: String, name: String) :
+        DataCatalog.Table(schema, id, name) {
+
+        private fun getAllParents(parentId: Long, fieldsById: Map): List {
+            val parent = fieldsById[parentId]!!
+
+            return if (parent.parentFieldId != null) {
+                listOf(parent.id) + getAllParents(parent.parentFieldId, fieldsById)
+            } else {
+                listOf(parent.id)
+            }
+        }
+
+        override suspend fun getDataPolicy(): DataPolicy? {
+            val datasetStructure = catalog.datasetsClient.getDataSetStructureLatest(schema.database.id.toLong())
+            val fields = datasetStructure.fieldList
+            val fieldsById = fields.associateBy { it.id }
+
+            val policyBuilder = DataPolicy.newBuilder()
+
+            policyBuilder.sourceBuilder.addAllAttributes(
+                fields.map { field ->
+                    val fieldPath = if (field.parentFieldId != null) {
+                        val parentIds = getAllParents(field.parentFieldId, fieldsById)
+                        parentIds.map { fieldsById[it]!!.name }
+                    } else {
+                        listOf(field.name)
+                    }
+
+                    DataPolicy.Attribute.newBuilder()
+                        .addAllPathComponents(fieldPath)
+                        .setType(field.type.logicalType)
+                        .setRequired(!field.type.isNullable)
+                        .addAllTags(field.tags?.map { it.name }.orEmpty())
+                        .build().normalizeType()
+                },
+            )
+
+            policyBuilder.infoBuilder.title = schema.database.displayName
+            policyBuilder.infoBuilder.description = schema.database.dbType
+
+            return policyBuilder.build()
+        }
+    }
+}
diff --git a/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt b/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt
index b1c7b0d0..9a6f97e4 100644
--- a/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt
+++ b/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt
@@ -2,7 +2,7 @@ package com.getstrm.pace.common
 
 import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
 import headTailFold
-import org.jooq.*
+import org.jooq.* // ktlint-disable no-wildcard-imports
 import org.jooq.conf.ParseNameCase
 import org.jooq.conf.ParseUnknownFunctions
 import org.jooq.conf.RenderQuotedNames
@@ -41,16 +41,16 @@ abstract class AbstractDynamicViewGenerator(
                         attribute,
                         ruleSet.fieldTransformsList.firstOrNull {
                             it.attribute.fullName() == attribute.fullName()
-                        }
+                        },
                    )
-                }
+                },
            )
            .from(renderName(dataPolicy.source.ref))
            .where(
-                ruleSet.rowFiltersList.map { rowFilter ->
-                    toCondition(rowFilter)
-                }
-            )
+                ruleSet.filtersList.map { filter ->
+                    toCondition(filter)
+                },
+            ),
        )
    }
 
@@ -59,38 +59,38 @@ abstract class AbstractDynamicViewGenerator(
        return jooq.queries(allQueries).sql
    }
 
-    fun toCondition(rowFilter: DataPolicy.RuleSet.RowFilter): Condition {
-        if (rowFilter.conditionsList.size == 1) {
+    fun toCondition(filter: DataPolicy.RuleSet.Filter): Condition {
+        if (filter.conditionsList.size == 1) {
            // If there is only one filter it should be the only option
-            return parser.parseCondition(rowFilter.conditionsList.first().condition)
+            return parser.parseCondition(filter.conditionsList.first().condition)
        }
 
-        val whereCondition = rowFilter.conditionsList.headTailFold(
+        val whereCondition = filter.conditionsList.headTailFold(
            headOperation = {
condition -> parser.parseCondition(condition.condition) DSL.`when`( condition.principalsList.toPrincipalCondition(), - field(condition.condition, Boolean::class.java) + field(condition.condition, Boolean::class.java), ) }, bodyOperation = { conditionStep, condition -> parser.parseCondition(condition.condition) conditionStep.`when`( condition.principalsList.toPrincipalCondition(), - field(condition.condition, Boolean::class.java) + field(condition.condition, Boolean::class.java), ) }, tailOperation = { conditionStep, condition -> parser.parseCondition(condition.condition) conditionStep.otherwise(field(condition.condition, Boolean::class.java)) - } + }, ) return DSL.condition(whereCondition) } fun toField( attribute: DataPolicy.Attribute, - fieldTransform: DataPolicy.RuleSet.FieldTransform? + fieldTransform: DataPolicy.RuleSet.FieldTransform?, ): Field<*> { if (fieldTransform == null) { // If there is no transform, we return just the field path (joined by a dot for now) @@ -114,7 +114,7 @@ abstract class AbstractDynamicViewGenerator( tailOperation = { conditionStep, transform -> val (c, q) = toCase(transform, attribute) conditionStep.otherwise(q).`as`(attribute.fullName()) - } + }, ) return caseWhenStatement @@ -122,7 +122,7 @@ abstract class AbstractDynamicViewGenerator( fun toCase( transform: DataPolicy.RuleSet.FieldTransform.Transform?, - attribute: DataPolicy.Attribute + attribute: DataPolicy.Attribute, ): Pair> { val memberCheck = transform?.principalsList?.toPrincipalCondition() @@ -134,13 +134,13 @@ abstract class AbstractDynamicViewGenerator( "regexp_extract({0}, {1})", String::class.java, unquotedName(attribute.fullName()), - DSL.`val`(transform.regex.regex) + DSL.`val`(transform.regex.regex), ) } else { DSL.regexpReplaceAll( field(attribute.fullName(), String::class.java), transform.regex.regex, - transform.regex.replacement + transform.regex.replacement, ) } } @@ -154,7 +154,7 @@ abstract class AbstractDynamicViewGenerator( "hash({0}, {1})", Any::class.java, DSL.`val`(transform.hash.seed), - unquotedName(attribute.fullName()) + unquotedName(attribute.fullName()), ) } @@ -172,10 +172,11 @@ abstract class AbstractDynamicViewGenerator( DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.NULLIFY -> DSL.inline(null) - DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.TRANSFORM_NOT_SET, DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.IDENTITY, null -> field( - attribute.fullName() - ) - + DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.TRANSFORM_NOT_SET, DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.IDENTITY, null -> { + field( + attribute.fullName(), + ) + } } return memberCheck to (statement as Field) } diff --git a/app/src/main/kotlin/com/getstrm/pace/dao/RuleSetsDao.kt b/app/src/main/kotlin/com/getstrm/pace/dao/RuleSetsDao.kt new file mode 100644 index 00000000..cf267500 --- /dev/null +++ b/app/src/main/kotlin/com/getstrm/pace/dao/RuleSetsDao.kt @@ -0,0 +1,17 @@ +package com.getstrm.pace.dao + +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import org.springframework.stereotype.Component + +@Component +class RuleSetsDao { + suspend fun getFieldTransforms(tag: String): List { + // TODO needs DAO + TODO() + } + + suspend fun getFilters(tag: String): List { + // TODO needs DAO + TODO() + } +} diff --git a/app/src/main/kotlin/com/getstrm/pace/domain/DataCatalog.kt b/app/src/main/kotlin/com/getstrm/pace/domain/DataCatalog.kt index 3f3ab0b2..3ce6f6d6 100644 --- a/app/src/main/kotlin/com/getstrm/pace/domain/DataCatalog.kt 
+++ b/app/src/main/kotlin/com/getstrm/pace/domain/DataCatalog.kt @@ -1,12 +1,24 @@ package com.getstrm.pace.domain import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy - +import com.getstrm.pace.config.CatalogConfiguration +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog as ApiCatalog +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Schema as ApiSchema +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Table as ApiTable /** * Abstraction of the physical data concepts in a data catalog. */ -abstract class DataCatalog: AutoCloseable { +abstract class DataCatalog( + val config: CatalogConfiguration, +) : AutoCloseable { + val id: String + get() = config.id + val type: ApiCatalog.Type + get() = config.type + val apiCatalog: ApiCatalog + get() = ApiCatalog.newBuilder().setId(id).setType(config.type).build() + abstract suspend fun listDatabases(): List /** @@ -14,7 +26,14 @@ abstract class DataCatalog: AutoCloseable { */ abstract class Table(val schema: Schema, val id: String, val name: String) { abstract suspend fun getDataPolicy(): DataPolicy? + open suspend fun getTags(): List = emptyList() override fun toString(): String = "Table($id, $name)" + val apiTable: ApiTable + get() = ApiTable.newBuilder() + .setId(id) + .setSchema(schema.apiSchema) + .setName(name) + .build() } /** @@ -22,14 +41,28 @@ abstract class DataCatalog: AutoCloseable { */ abstract class Schema(val database: Database, val id: String, val name: String) { open suspend fun getTables(): List = emptyList() + open suspend fun getTags(): List = emptyList() override fun toString(): String = "Schema($id, $name)" + val apiSchema: ApiSchema + get() = ApiSchema.newBuilder() + .setId(id) + .setName(name) + .setDatabase(database.apiDatabase) + .build() } /** meta information database */ - abstract class Database(val id: String, val dbType: String? = null, val displayName: String? = null) { + abstract class Database(open val catalog: DataCatalog, val id: String, val dbType: String? = null, val displayName: String? 
= null) {
         open suspend fun getSchemas(): List = emptyList()
+        open suspend fun getTags(): List = emptyList()
         override fun toString() = dbType?.let { "Database($id, $dbType, $displayName)" } ?: "Database($id)"
-    }
 
-    abstract class Configuration
+        val apiDatabase: ApiCatalog.DataBase
+            get() = ApiCatalog.DataBase.newBuilder()
+                .setCatalog(catalog.apiCatalog)
+                .setDisplayName(displayName.orEmpty())
+                .setId(id)
+                .setType(dbType.orEmpty())
+                .build()
+    }
 }
diff --git a/app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt b/app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt
index be2f6a71..505c4a27 100644
--- a/app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt
+++ b/app/src/main/kotlin/com/getstrm/pace/domain/Exceptions.kt
@@ -4,14 +4,24 @@ import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy
 import io.grpc.Status
 import io.strmprivacy.grpc.common.server.NotFoundException
 import io.strmprivacy.grpc.common.server.StrmStatusException
+import org.jooq.impl.ParserException
 import toJson
 
 class ProcessingPlatformNotFoundException(id: String) : NotFoundException("Processing Platform", id)
+class CatalogNotFoundException(id: String) : NotFoundException("Data Catalog", id)
+class CatalogDatabaseNotFoundException(id: String) : NotFoundException("Data Catalog Database", id)
+class CatalogSchemaNotFoundException(id: String) : NotFoundException("Data Catalog Schema", id)
+class CatalogTableNotFoundException(id: String) : NotFoundException("Data Catalog Table", id)
+
 class ProcessingPlatformConfigurationError(description: String) :
     StrmStatusException(Status.INVALID_ARGUMENT, description)
 class ProcessingPlatformTableNotFound(id: String, type: DataPolicy.ProcessingPlatform.PlatformType, tableName: String) :
     StrmStatusException(Status.NOT_FOUND, "ProcessingPlatform $type:$id has no table named $tableName")
 class ProcessingPlatformExecuteException(id: String, message: String) :
     StrmStatusException(Status.INVALID_ARGUMENT, "Platform $id caused issue $message")
+class InvalidDataPolicyAbsentSourceRef() :
+    StrmStatusException(Status.INVALID_ARGUMENT, "DataPolicy has no source ref")
+class InvalidDataPolicyAbsentPlatformId() :
+    StrmStatusException(Status.INVALID_ARGUMENT, "DataPolicy has no platform.id")
 class InvalidDataPolicyUnknownGroup(principals: Set) :
     StrmStatusException(Status.INVALID_ARGUMENT, "Unknown groups: ${principals.joinToString()}")
 class InvalidDataPolicyNonEmptyLastFieldTransform(transform: DataPolicy.RuleSet.FieldTransform.Transform) :
@@ -24,3 +34,8 @@ class InvalidDataPolicyOverlappingPrincipals(transform: DataPolicy.RuleSet.Field
     StrmStatusException(Status.INVALID_ARGUMENT, "Attribute ${transform.toJson()} has overlapping principals")
 class InvalidDataPolicyOverlappingAttributes(ruleSet: DataPolicy.RuleSet) :
     StrmStatusException(Status.INVALID_ARGUMENT, "RuleSet ${ruleSet.toJson()} has overlapping attributes")
+
+class SqlParseException(statement: String, cause: ParserException) : StrmStatusException(
+    Status.INVALID_ARGUMENT,
+    "SQL Statement [$statement] is invalid, please verify its syntax.
Details: ${cause.sql()}" +) diff --git a/app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt b/app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt index f018239d..07214b8c 100644 --- a/app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt +++ b/app/src/main/kotlin/com/getstrm/pace/service/CatalogService.kt @@ -1,81 +1,80 @@ -//package com.getstrm.pace.service -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog as ApiCatalog -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Type.COLLIBRA -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Type.DATAHUB -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Type.ODD -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Type.TYPE_UNSPECIFIED -//import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Type.UNRECOGNIZED -//import com.getstrm.pace.catalogs.CollibraCatalog -//import com.getstrm.pace.catalogs.DatahubCatalog -//import com.getstrm.pace.catalogs.OpenDataDiscoveryCatalog -//import com.getstrm.pace.config.CatalogsConfiguration -//import com.getstrm.pace.domain.DataCatalog -//import org.slf4j.LoggerFactory -//import org.springframework.stereotype.Component -// -//@Component -//class CatalogService( -// config: CatalogsConfiguration, -//) { -// val log by lazy { LoggerFactory.getLogger(javaClass) } -// -// val catalogs: Map = config.catalogs.mapNotNull { config -> -// try { -// when (config.type) { -// TYPE_UNSPECIFIED -> null -// COLLIBRA -> CollibraCatalog(config) -// ODD -> OpenDataDiscoveryCatalog(config) -// DATAHUB -> DatahubCatalog(config) -// UNRECOGNIZED -> null -// } -// } catch (e: Exception) { -// log.warn("Can't instantiate DataCatalog ${config.id}/${config.type}: {}", e.message) -// null -// } -// }.associateBy { it.id } -// -// fun listCatalogs(): List = catalogs.map { (id, platform) -> -// ApiCatalog.newBuilder() -// .setId(id) -// .setType(platform.type) -// .build() -// } -// -// suspend fun listDatabases(apiCatalog: ApiCatalog): List = -// getCatalog(apiCatalog.id).listDatabases().map { it.apiDatabase } -// -// suspend fun listSchemas(apiDatabase: ApiDatabase): List { -// val catalog = getCatalog(apiDatabase.catalog.id) -// val database = catalog.listDatabases().find { it.id == apiDatabase.id } -// ?: throw CatalogDatabaseNotFoundException(apiDatabase.id) -// val schemas = database.getSchemas() -// return schemas.map { it.apiSchema } -// } -// -// suspend fun listTables(apiSchema: ApiSchema): List = -// getTablesInfo(apiSchema).map { it.apiTable } -// -// suspend fun getBarePolicy(apiTable: ApiTable): DataPolicy { -// val tables = getTablesInfo(apiTable.schema) -// val table = tables.find { it.id == apiTable.id } -// ?: throw CatalogTableNotFoundException(apiTable.id) -// return table.getDataPolicy()!! -// } -// -// /** -// * find all the tables in an apiSchema. 
-// * -// * @return dto object with all relevant info -// */ -// private suspend fun getTablesInfo(apiSchema: ApiSchema): List { -// val catalog = getCatalog(apiSchema.database.catalog.id) -// val database = catalog.listDatabases().find { it.id == apiSchema.database.id } -// ?: throw CatalogDatabaseNotFoundException(apiSchema.database.id) -// val schema = -// database.getSchemas().find { it.id == apiSchema.id } ?: throw CatalogSchemaNotFoundException(apiSchema.id) -// return schema.getTables() -// } -// -// private fun getCatalog(id: String): DataCatalog = -// catalogs[id] ?: throw CatalogNotFoundException(id) -//} +package com.getstrm.pace.service +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.getstrm.pace.catalogs.CollibraCatalog +import com.getstrm.pace.catalogs.DatahubCatalog +import com.getstrm.pace.catalogs.OpenDataDiscoveryCatalog +import com.getstrm.pace.config.CatalogsConfiguration +import com.getstrm.pace.domain.* +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog as ApiCatalog +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.DataBase as ApiDatabase +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Schema as ApiSchema +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataCatalog.Table as ApiTable + +@Component +class CatalogService( + private val appConfig: CatalogsConfiguration, +) { + val log by lazy { LoggerFactory.getLogger(javaClass) } + + val catalogs: Map = appConfig.catalogs.mapNotNull { config -> + try { + when (config.type) { + ApiCatalog.Type.TYPE_UNSPECIFIED -> null + ApiCatalog.Type.COLLIBRA -> CollibraCatalog(config) + ApiCatalog.Type.ODD -> OpenDataDiscoveryCatalog(config) + ApiCatalog.Type.DATAHUB -> DatahubCatalog(config) + ApiCatalog.Type.UNRECOGNIZED -> null + } + } catch (e: Exception) { + log.warn("Can't instantiate DataCatalog ${config.id}/${config.type}: {}", e.message) + null + } + }.associateBy { it.id } + + fun listCatalogs(): List = catalogs.map { (id, platform) -> + ApiCatalog.newBuilder() + .setId(id) + .setType(platform.type) + .build() + } + + suspend fun listDatabases(apiCatalog: ApiCatalog): List = + getCatalog(apiCatalog.id).listDatabases().map { it.apiDatabase } + + suspend fun listSchemas(apiDatabase: ApiDatabase): List { + val catalog = getCatalog(apiDatabase.catalog.id) + val database = catalog.listDatabases().find { it.id == apiDatabase.id } + ?: throw CatalogDatabaseNotFoundException(apiDatabase.id) + val schemas = database.getSchemas() + return schemas.map { it.apiSchema } + } + + suspend fun listTables(apiSchema: ApiSchema): List = + getTablesInfo(apiSchema).map { it.apiTable } + + suspend fun getBarePolicy(apiTable: ApiTable): DataPolicy { + val tables = getTablesInfo(apiTable.schema) + val table = tables.find { it.id == apiTable.id } + ?: throw CatalogTableNotFoundException(apiTable.id) + return table.getDataPolicy()!! + } + + /** + * find all the tables in an apiSchema. 
+ * + * @return dto object with all relevant info + */ + private suspend fun getTablesInfo(apiSchema: ApiSchema): List { + val catalog = getCatalog(apiSchema.database.catalog.id) + val database = catalog.listDatabases().find { it.id == apiSchema.database.id } + ?: throw CatalogDatabaseNotFoundException(apiSchema.database.id) + val schema = + database.getSchemas().find { it.id == apiSchema.id } ?: throw CatalogSchemaNotFoundException(apiSchema.id) + return schema.getTables() + } + + private fun getCatalog(id: String): DataCatalog = + catalogs[id] ?: throw CatalogNotFoundException(id) +} diff --git a/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt b/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt index 9b629806..c744fd7e 100644 --- a/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt +++ b/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt @@ -1,14 +1,9 @@ package com.getstrm.pace.service -import com.getstrm.pace.dao.DataPolicyDao -import com.getstrm.pace.domain.InvalidDataPolicyEmptyFieldTransforms -import com.getstrm.pace.domain.InvalidDataPolicyMissingAttribute -import com.getstrm.pace.domain.InvalidDataPolicyNonEmptyLastFieldTransform -import com.getstrm.pace.domain.InvalidDataPolicyOverlappingAttributes -import com.getstrm.pace.domain.InvalidDataPolicyOverlappingPrincipals -import com.getstrm.pace.domain.InvalidDataPolicyUnknownGroup import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy import coWithTransactionResult +import com.getstrm.pace.dao.DataPolicyDao +import com.getstrm.pace.domain.* import org.jooq.DSLContext import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component @@ -26,6 +21,7 @@ class DataPolicyService( suspend fun upsertDataPolicy(dataPolicy: DataPolicy): DataPolicy { validate(dataPolicy) return jooq.coWithTransactionResult { + // TODO should it remove old ruleset targets? 
val newDataPolicy = dataPolicyDao.upsertDataPolicy(dataPolicy, context, it) enforceStatement(newDataPolicy) newDataPolicy @@ -33,6 +29,12 @@ class DataPolicyService( } suspend fun validate(dataPolicy: DataPolicy) { + if (dataPolicy.source.ref.isNullOrEmpty()) { + throw InvalidDataPolicyAbsentSourceRef() + } + if (dataPolicy.platform.id.isNullOrEmpty()) { + throw InvalidDataPolicyAbsentPlatformId() + } val platform = processingPlatforms.getProcessingPlatform(dataPolicy) val validGroups = platform.listGroups().map { it.name }.toSet() val validAttributes = dataPolicy.source.attributesList.map(DataPolicy.Attribute::pathString).toSet() @@ -67,6 +69,11 @@ class DataPolicyService( throw InvalidDataPolicyNonEmptyLastFieldTransform(transform) } } + fieldTransform.transformsList.filter { it.principalsCount == 0 }.let { + if (it.size > 1) { + throw InvalidDataPolicyOverlappingPrincipals(fieldTransform) + } + } // check non-overlapping principals within one fieldTransform fieldTransform.transformsList.fold( emptySet(), @@ -81,9 +88,8 @@ class DataPolicyService( } // check for every row filter that the principals overlap with groups in the processing platform // and that the attributes exist in the DataPolicy - ruleSet.rowFiltersList.forEach { rowFilter -> - checkAttribute(rowFilter.attribute) - rowFilter.conditionsList.forEach { condition -> + ruleSet.filtersList.forEach { filter -> + filter.conditionsList.forEach { condition -> checkPrincipals(condition.principalsList) } } diff --git a/app/src/main/kotlin/com/getstrm/pace/service/RuleSetService.kt b/app/src/main/kotlin/com/getstrm/pace/service/RuleSetService.kt new file mode 100644 index 00000000..e032fa91 --- /dev/null +++ b/app/src/main/kotlin/com/getstrm/pace/service/RuleSetService.kt @@ -0,0 +1,76 @@ +package com.getstrm.pace.service + +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet +import com.getstrm.pace.dao.RuleSetsDao +import org.springframework.stereotype.Component +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet.FieldTransform as ApiFieldTransform +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform as ApiTransform +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet.Filter as ApiFilter + +@Component +class RuleSetService( + private val dataPolicyService: DataPolicyService, + private val ruleSetsDao: RuleSetsDao, +) { + + private suspend fun getFieldTransforms(tag: String): List = + ruleSetsDao.getFieldTransforms((tag)) + + suspend fun getFilters(tag: String): List = ruleSetsDao.getFilters(tag) + + /** + * add a rule set to a data policy based on tags. + * + * @param dataPolicy bare DataPolicy + * @return policy with embedded ruleset. + */ + suspend fun addRuleSet(dataPolicy: DataPolicy): DataPolicy { + val fieldTransforms = dataPolicy.source.attributesList.filter { it.tagsList.isNotEmpty() }.map { attribute -> + with(ApiFieldTransform.newBuilder()) { + this.attribute = attribute + this.addAllTransforms(attribute.tagsList.flatMap { getFieldTransforms(it) }.filterFieldTransforms()) + }.build() + } + val policyWithRuleSet = dataPolicy.toBuilder() + .addRuleSets(RuleSet.newBuilder().addAllFieldTransforms(fieldTransforms)) + .build() + dataPolicyService.validate(policyWithRuleSet) + return policyWithRuleSet + } +} + +/** enforce non-overlapping principals on the ApiTransforms in one FieldTransform. + * First one wins. 
+ * + * Since each tag can have a list of associated ApiTransforms, this makes the + * ORDER of tags important. Let's hope the catalogs present the tags in a deterministic order! + * + * TODO: think about whether this is a good idea, or whether we should enforce that for + * a certain attribute the tags define non-overlapping rules. The [DataPolicyService.validate] + * method already executes this check. + * + */ +fun List<ApiTransform>.filterFieldTransforms(): List<ApiTransform> { + val filtered: List<ApiTransform> = this.fold( + emptySet<String>() to listOf<ApiTransform>(), + ) { ( + /* the principals that we've already encountered while going through the list */ + alreadySeenPrincipals: Set<String>, + /* the cleaned-up list of ApiTransforms */ + acc: List<ApiTransform>, + ), + /* the original ApiTransform */ + transform: ApiTransform, -> + val principals = transform.principalsList.toSet() - alreadySeenPrincipals + val transformWithoutOverlappingPrincipals = transform.toBuilder() + .clearPrincipals() + .addAllPrincipals(principals) + .build() + alreadySeenPrincipals + principals to + acc + transformWithoutOverlappingPrincipals + }.second + // now remove duplicate defaults (transforms without principals) + val (defaults, withPrincipals) = filtered.partition { it.principalsCount == 0 } + return (withPrincipals + defaults.firstOrNull()).filterNotNull() } diff --git a/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGenerator.kt b/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGenerator.kt index 4cd7ccf2..9901d3e0 100644 --- a/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGenerator.kt +++ b/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGenerator.kt @@ -29,7 +29,7 @@ class SnowflakeDynamicViewGenerator( val grants = dataPolicy.ruleSetsList.flatMap { ruleSet -> val principals = ruleSet.fieldTransformsList.flatMap { it.transformsList }.flatMap { it.principalsList }.toSet() + - ruleSet.rowFiltersList.flatMap { it.conditionsList }.flatMap { it.principalsList }.toSet() + ruleSet.filtersList.flatMap { it.conditionsList }.flatMap { it.principalsList }.toSet() val viewName = ruleSet.target.fullname diff --git a/app/src/main/kotlin/com/getstrm/pace/util/Util.kt b/app/src/main/kotlin/com/getstrm/pace/util/Util.kt index d88c91c7..30d0face 100644 --- a/app/src/main/kotlin/com/getstrm/pace/util/Util.kt +++ b/app/src/main/kotlin/com/getstrm/pace/util/Util.kt @@ -15,19 +15,21 @@ import io.strmprivacy.grpc.common.authz.ZedTokenContext import io.strmprivacy.grpc.common.server.InvalidArgumentException import io.strmprivacy.grpc.common.server.StrmStatusException import org.apache.commons.codec.digest.MurmurHash3 -import org.jooq.Configuration -import org.jooq.DSLContext -import org.jooq.DataType -import org.jooq.JSONB +import org.jooq.* import org.jooq.exception.DataAccessException import org.jooq.impl.DSL import org.jooq.impl.SQLDataType +import org.slf4j.LoggerFactory import java.time.Instant import java.time.OffsetDateTime import java.time.ZoneId import java.time.ZoneOffset import java.util.* import kotlin.coroutines.coroutineContext +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform as ApiTransform +import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy.RuleSet.FieldTransform as ApiFieldTransform + +val log by lazy { LoggerFactory.getLogger("Util") } suspend fun <R> coUnwrapStatusException(block: suspend () -> R): R { try { @@ -103,6 +105,11 @@ fun GeneratedMessageV3.toJson(): String = JsonFormat.printer() .omittingInsignificantWhitespace() .print(this) +fun
GeneratedMessageV3.toJsonWithDefaults(): String = JsonFormat.printer() + .omittingInsignificantWhitespace() + .includingDefaultValueFields() + .print(this) + fun Collection<*>.toJsonb(): JSONB = JSONB.jsonb(mapper.writeValueAsString(this.toSet())) fun String?.toUUID(idFieldName: String): UUID = this?.let { @@ -130,7 +137,7 @@ fun String.yaml2json(): String { } fun GeneratedMessageV3.toYaml(): String = - ObjectMapper(YAMLFactory()).writeValueAsString(ObjectMapper().readTree(toJson())) + ObjectMapper(YAMLFactory()).writeValueAsString(ObjectMapper().readTree(toJsonWithDefaults())) fun String.parseDataPolicy(): DataPolicy = let { val builder = DataPolicy.newBuilder() @@ -138,6 +145,12 @@ fun String.parseDataPolicy(): DataPolicy = let { builder.build() } +fun String.parseTransforms(): List<ApiTransform> = let { + val builder = ApiFieldTransform.newBuilder() + JsonFormat.parser().merge(this.yaml2json(), builder) + builder.build().transformsList +} + fun Long.toTimestamp(): Timestamp { val offsetDateTime = OffsetDateTime.ofInstant(Instant.ofEpochMilli(this), ZoneId.systemDefault()) return Timestamp.newBuilder() @@ -184,10 +197,19 @@ fun List.headTailFold( return tailOperation(accumulator, this.last()) } -fun DataPolicy.Attribute.sqlDataType(): DataType<*> { - return when(type.lowercase(Locale.ROOT)) { - "string", "text" -> SQLDataType.VARCHAR - "int", "integer" -> SQLDataType.INTEGER - else -> throw NotImplementedError("Unsupported type: $type") +fun DataPolicy.Attribute.sqlDataType(): DataType<*> = + try { + if (type.lowercase() == "struct") { + SQLDataType.RECORD + } else { + sqlParser.parseField("a::$type").dataType.sqlDataType!! + } + } catch (e: Exception) { + log.warn("Can't parse {}, defaulting to VARCHAR", type) + SQLDataType.VARCHAR } -} + +fun DataPolicy.Attribute.normalizeType(): DataPolicy.Attribute = + toBuilder().setType(sqlDataType().typeName).build() + +val sqlParser = DSL.using(SQLDialect.DEFAULT).parser() diff --git a/app/src/main/resources/db/migration/postgresql/V1__create_policies_tables.sql b/app/src/main/resources/db/migration/postgresql/V1__create_policies_tables.sql index e8d50e22..9ec9b36d 100644 --- a/app/src/main/resources/db/migration/postgresql/V1__create_policies_tables.sql +++ b/app/src/main/resources/db/migration/postgresql/V1__create_policies_tables.sql @@ -1,10 +1,10 @@ -create schema pace; +create schema if not exists pace; set search_path to pg_catalog,public,pace; create extension if not exists "uuid-ossp" schema pace; -create table pace.data_policies +create table if not exists pace.data_policies ( id varchar not null, title varchar not null, diff --git a/app/src/test/kotlin/com/getstrm/pace/bigquery/BigQueryDynamicViewGeneratorTest.kt b/app/src/test/kotlin/com/getstrm/pace/bigquery/BigQueryDynamicViewGeneratorTest.kt index a9d9f2e1..e51bf7bc 100644 --- a/app/src/test/kotlin/com/getstrm/pace/bigquery/BigQueryDynamicViewGeneratorTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/bigquery/BigQueryDynamicViewGeneratorTest.kt @@ -101,19 +101,18 @@ class BigQueryDynamicViewGeneratorTest { @Test fun `row filters to condition`() { // Given - val filter = DataPolicy.RuleSet.RowFilter.newBuilder() - .setAttribute(DataPolicy.Attribute.newBuilder().addPathComponents("age").build()) + val filter = DataPolicy.RuleSet.Filter.newBuilder() .addAllConditions( listOf( - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("fraud-detection")) .setCondition("true") .build(), -
DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("analytics", "marketing")) .setCondition("age > 18") .build(), - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .setCondition("false") .build() ) @@ -272,7 +271,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: - age diff --git a/app/src/test/kotlin/com/getstrm/pace/databricks/DatabricksDynamicViewGeneratorTest.kt b/app/src/test/kotlin/com/getstrm/pace/databricks/DatabricksDynamicViewGeneratorTest.kt index 22fb7e06..a73de93c 100644 --- a/app/src/test/kotlin/com/getstrm/pace/databricks/DatabricksDynamicViewGeneratorTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/databricks/DatabricksDynamicViewGeneratorTest.kt @@ -250,19 +250,18 @@ class DatabricksDynamicViewGeneratorTest { @Test fun `row filter to condition`() { // Given - val filter = DataPolicy.RuleSet.RowFilter.newBuilder() - .setAttribute(DataPolicy.Attribute.newBuilder().addPathComponents("age").build()) + val filter = DataPolicy.RuleSet.Filter.newBuilder() .addAllConditions( listOf( - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("fraud-detection")) .setCondition("true") .build(), - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("analytics", "marketing")) .setCondition("age > 18") .build(), - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .setCondition("false") .build() ) @@ -327,8 +326,8 @@ where ( @Test fun `transform - no row filters`() { - val policyWithoutRowFilters = dataPolicy.toBuilder().apply { ruleSetsBuilderList.first().clearRowFilters() }.build() - underTest = DatabricksDynamicViewGenerator(policyWithoutRowFilters) { withRenderFormatted(true) } + val policyWithoutFilters = dataPolicy.toBuilder().apply { ruleSetsBuilderList.first().clearFilters() }.build() + underTest = DatabricksDynamicViewGenerator(policyWithoutFilters) { withRenderFormatted(true) } underTest.toDynamicViewSQL() .shouldBe( """create or replace view my_catalog.my_schema.gddemo_public @@ -452,7 +451,7 @@ from mycatalog.my_schema.gddemo;""" - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: - age diff --git a/app/src/test/kotlin/com/getstrm/pace/service/DataPolicyServiceTest.kt b/app/src/test/kotlin/com/getstrm/pace/service/DataPolicyServiceTest.kt index c1cd43ae..f74c89e0 100644 --- a/app/src/test/kotlin/com/getstrm/pace/service/DataPolicyServiceTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/service/DataPolicyServiceTest.kt @@ -79,7 +79,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -109,113 +109,6 @@ rule_sets: } } - @Test - fun `validate missing age attribute`() { - val dataPolicy = """ -platform: - platform_type: SNOWFLAKE - id: snowflake -source: - type: SQL_DDL - ref: mycatalog.my_schema.gddemo - attributes: - - path_components: [transactionId] - type: bigint - - path_components: [userId] - type: string - - path_components: [email] - type: string - - path_components: 
[age_missing] # renamed to trigger validation failure - type: bigint - - path_components: [size] - type: string - - path_components: [hairColor] - type: string - - path_components: [transactionAmount] - type: bigint - - path_components: [items] - type: string - - path_components: [itemCount] - type: bigint - - path_components: [date] - type: timestamp - - path_components: [purpose] - type: bigint - -rule_sets: -- target: - type: DYNAMIC_VIEW - fullname: 'my_catalog.my_schema.gddemo_public' - field_transforms: - - attribute: - path_components: [ email ] - transforms: - - principals: - - analytics - - marketing - regex: - regex: '^.*(@.*)${'$'}' - replacement: '****${'$'}1' - - principals: - - fraud-detection - - admin - identity: true - - principals: [] - fixed: - value: "'****'" - - attribute: - path_components: [ userId ] - transforms: - - principals: - - fraud-detection - identity: true - - principals: [] - hash: - seed: "1234" - - attribute: - path_components: [ items ] - transforms: - - principals: [] - fixed: - value: "'****'" - - attribute: - path_components: [ hairColor ] - transforms: - - principals: [] - sql_statement: - statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: - - attribute: - path_components: [ age ] - conditions: - - principals: - - fraud-detection - condition: "true" - - principals: [] - condition: "age > 18" - - attribute: - path_components: [ userId ] - conditions: - - principals: - - marketing - condition: "userId in ('1', '2', '3', '4')" - - principals: [] - condition: "true" - - attribute: - path_components: [ transactionAmount ] - conditions: - - principals: [] - condition: "transactionAmount < 10" - """.yaml2json().parseDataPolicy() - coEvery { platforms.getProcessingPlatform(dataPolicy) } returns platform - coEvery { platform.listGroups() } returns groups("analytics", "marketing", "fraud-detection", "admin") - runBlocking { - shouldThrow { - underTest.validate(dataPolicy) - } - } - } - @Test fun `validate processing platform`() { val dataPolicy = """ @@ -262,7 +155,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -340,7 +233,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -422,7 +315,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -503,7 +396,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -581,7 +474,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: [ age ] conditions: @@ -612,11 +505,10 @@ rule_sets: } } } - } // convenience for tests -private fun groups(vararg group: String) = group.map { Group(it, it, it) } +fun groups(vararg group: String) = group.map { Group(it, it, it) } /* * base yaml of policy with happy flow attributes diff --git a/app/src/test/kotlin/com/getstrm/pace/service/RuleSetServiceTest.kt b/app/src/test/kotlin/com/getstrm/pace/service/RuleSetServiceTest.kt new file mode 100644 index 
00000000..24283350 --- /dev/null +++ b/app/src/test/kotlin/com/getstrm/pace/service/RuleSetServiceTest.kt @@ -0,0 +1,294 @@ +package com.getstrm.pace.service + +import com.getstrm.pace.dao.DataPolicyDao +import com.getstrm.pace.dao.RuleSetsDao +import com.getstrm.pace.snowflake.SnowflakeClient +import io.kotest.matchers.shouldBe +import io.mockk.coEvery +import io.mockk.mockk +import io.mockk.slot +import kotlinx.coroutines.runBlocking +import org.intellij.lang.annotations.Language +import org.jooq.DSLContext +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import parseDataPolicy +import parseTransforms +import yaml2json + +class RuleSetServiceTest { + private lateinit var underTest: RuleSetService + private lateinit var dps: DataPolicyService + private val dao = mockk<DataPolicyDao>() + private val platforms = mockk<ProcessingPlatformsService>() + private val jooq = mockk<DSLContext>() + private val platform = mockk<SnowflakeClient>() + private val rulesetsDao = mockk<RuleSetsDao>() + private val defaultContext = "default" + + @BeforeEach + fun setUp() { + dps = DataPolicyService(defaultContext, dao, platforms, jooq) + underTest = RuleSetService(dps, rulesetsDao) + } + + @Test + fun addRuleSet() { + setupGlobalBusinessRules() + + @Language("Yaml") + val dataPolicy = """ + platform: + platform_type: SNOWFLAKE + id: snowflake + source: + ref: test1 + type: SNOWFLAKE + attributes: + - path_components: [name] + type: varchar + tags: [ pii, banaan ] + - path_components: [email] + type: varchar + tags: [ email ] + - path_components: [description] + type: varchar + """.yaml2json().parseDataPolicy() + + runBlocking { + val policyWithRulesets = underTest.addRuleSet(dataPolicy)!! + policyWithRulesets shouldBe """ + source: + ref: test1 + type: SNOWFLAKE + attributes: + - pathComponents: [ name ] + type: varchar + tags: [ pii, banaan ] + - pathComponents: [ email ] + type: varchar + tags: [ email ] + - pathComponents: [ description ] + type: varchar + platform: + platformType: SNOWFLAKE + id: snowflake + ruleSets: + - fieldTransforms: + - attribute: + pathComponents: [ name ] + type: varchar + tags: [ pii, banaan ] + transforms: + - principals: [ fraud-detection ] + identity: true + - hash: {seed: "1234"} + - attribute: + pathComponents: [ email ] + type: "varchar" + tags: [ "email" ] + transforms: + - principals: [ marketing ] + regex: {regex: "^.*(@.*)$", replacement: "\\\\1"} + - principals: [ fraud-detection ] + identity: true + - fixed: {value: "****"} + """.yaml2json().parseDataPolicy() + } + } + + @Test + fun `test overlapping rules`() { + setupGlobalBusinessRules() + + @Language("Yaml") + val dataPolicy = """ + platform: + platform_type: SNOWFLAKE + id: snowflake + source: + ref: test1 + type: SNOWFLAKE + attributes: + - path_components: [name] + type: varchar + tags: [ pii, banaan, overlap ] + - path_components: [email] + type: varchar + tags: [ email, overlap ] + - path_components: [description] + type: varchar + """.yaml2json().parseDataPolicy() + + runBlocking { + val policyWithRulesets = underTest.addRuleSet(dataPolicy)!!
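+ // The expected policy below assumes the global business rules from setupGlobalBusinessRules(): since the overlap tag comes last, its transforms only contribute principals not already claimed by the pii and email rules.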
+// println(policyWithRulesets.toYaml().strip()) + policyWithRulesets shouldBe """ +source: + ref: test1 + type: SNOWFLAKE + attributes: + - pathComponents: [ name ] + type: varchar + tags: [ pii, banaan, overlap ] + - pathComponents: [ email ] + type: varchar + tags: [ email, overlap ] + - pathComponents: [ description ] + type: varchar +platform: + platformType: SNOWFLAKE + id: snowflake +ruleSets: +- fieldTransforms: + - attribute: + pathComponents: [ name ] + type: "varchar" + tags: [ pii, banaan, overlap ] + transforms: + - principals: [ fraud-detection ] + identity: true + - principals: [marketing] + fixed: {value: bla} + - principals: [ "analytics" ] + hash: {seed: "3"} + - hash: {seed: "1234"} + - attribute: + pathComponents: [ email ] + type: varchar + tags: [ email, overlap ] + transforms: + - principals: [ marketing ] + regex: {regex: "^.*(@.*)$", replacement: "\\\\1"} + - principals: [ fraud-detection ] + identity: true + - principals: [ analytics ] + hash: {seed: "3" } + - fixed: {value: "****" } + """.yaml2json().parseDataPolicy() + } + } + + @Test + fun `test overlapping reversed tags`() { + setupGlobalBusinessRules() + + @Language("Yaml") + val dataPolicy = """ + platform: + platform_type: SNOWFLAKE + id: snowflake + source: + ref: test1 + type: SNOWFLAKE + attributes: + - path_components: [name] + type: varchar + tags: [ overlap, pii, banaan ] + - path_components: [email] + type: varchar + tags: [ overlap, email ] + - path_components: [description] + type: varchar + """.yaml2json().parseDataPolicy() + + runBlocking { + val policyWithRulesets = underTest.addRuleSet(dataPolicy) + policyWithRulesets shouldBe """ +source: + ref: test1 + type: SNOWFLAKE + attributes: + - pathComponents: [ name ] + type: varchar + tags: [ overlap, pii, banaan ] + - pathComponents: [ email ] + type: varchar + tags: [ overlap, email ] + - pathComponents: [ description ] + type: varchar +platform: + platformType: SNOWFLAKE + id: snowflake +ruleSets: +- fieldTransforms: + - attribute: + pathComponents: [ name ] + type: varchar + # checks ignoring unknown tags + tags: [ overlap, pii, banaan ] + transforms: + - principals: [marketing] + fixed: {value: bla} + - principals: [ analytics ] + hash: {seed: "3"} + - principals: [ fraud-detection ] + nullify: {} + - fixed: {value: jopie} + - attribute: + pathComponents: [ email ] + type: "varchar" + tags: [ overlap, email ] + transforms: + - principals: [ marketing ] + fixed: {value: bla} + - principals: [ analytics ] + hash: {seed: "3" } + - principals: [ fraud-detection ] + nullify: {} + - fixed: {value: "jopie" } + """.yaml2json().parseDataPolicy() + } + } + + private fun setupGlobalBusinessRules() { + coEvery { platforms.getProcessingPlatform(any()) } returns platform + coEvery { platform.listGroups() } returns groups("analytics", "marketing", "fraud-detection", "admin") + + // Global business rules + // a map of field level tags to a list of Api Transforms that define how the + // field value gets transformed for which group member + val businessRules = mapOf( + "email" to """ + transforms: + # marketeers get only the domain + - principals: [ marketing ] + regex: {regex: "^.*(@.*)$", replacement: "\\\\1"} + # security gets everything + - principals: [ fraud-detection ] + identity: true + # everyone else gets 4 stars + - fixed: {value: "****"} + """, + "pii" to """ + # transforms to be applied to fields classified as PII + transforms: + # fraud-detection sees the original value + - principals: [ fraud-detection ] + identity: true + # everyone else gets a 
hashed value + - hash: {seed: "1234"} + """, + "overlap" to """ + # a business policy that is deliberately overlapping with both others + transforms: + # marketeers get only the domain + - principals: [ marketing ] + fixed: {value: bla} + # analytics gets a hash + - principals: [ analytics] + hash: {seed: "3" } + # security gets null + - principals: [ fraud-detection ] + nullify: {} + # everyone else gets jopie + - fixed: {value: jopie} + """, + ).mapValues { it.value.parseTransforms() } + + val tagSlot = slot<String>() + coEvery { rulesetsDao.getFieldTransforms(capture(tagSlot)) } answers { + businessRules[tagSlot.captured] ?: emptyList() + } + } +} diff --git a/app/src/test/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGeneratorTest.kt b/app/src/test/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGeneratorTest.kt index 35b743e1..f07e2c2a 100644 --- a/app/src/test/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGeneratorTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/snowflake/SnowflakeDynamicViewGeneratorTest.kt @@ -99,19 +99,18 @@ class SnowflakeDynamicViewGeneratorTest { @Test fun `row filter to condition`() { // Given - val filter = DataPolicy.RuleSet.RowFilter.newBuilder() - .setAttribute(DataPolicy.Attribute.newBuilder().addPathComponents("age").build()) + val filter = DataPolicy.RuleSet.Filter.newBuilder() .addAllConditions( listOf( - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("fraud-detection")) .setCondition("true") .build(), - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .addAllPrincipals(listOf("analytics", "marketing")) .setCondition("age > 18") .build(), - DataPolicy.RuleSet.RowFilter.Condition.newBuilder() + DataPolicy.RuleSet.Filter.Condition.newBuilder() .setCondition("false") .build() ) @@ -268,7 +267,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: - age diff --git a/app/src/test/kotlin/com/getstrm/pace/util/UtilTest.kt b/app/src/test/kotlin/com/getstrm/pace/util/UtilTest.kt index 0a6f26ce..99f2f223 100644 --- a/app/src/test/kotlin/com/getstrm/pace/util/UtilTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/util/UtilTest.kt @@ -102,7 +102,7 @@ rule_sets: - principals: [] sql_statement: statement: "case when hairColor = 'blonde' then 'fair' else 'dark' end" - row_filters: + filters: - attribute: path_components: - age diff --git a/gradle.properties b/gradle.properties index fcd2913c..a0ab58a2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -8,4 +8,4 @@ jooqVersion=3.18.7 # TODO verify the tag is correct dockertag = ghcr.io/getstrm/pace:latest -generatedBufDependencyVersion=00000000000000./home/bvdeen +generatedBufDependencyVersion=00000000000000.609525407a2f diff --git a/scripts/catalogs/collibra/get-ecommerce-policy.sh b/scripts/catalogs/collibra/get-ecommerce-policy.sh new file mode 100755 index 00000000..da0ac45e --- /dev/null +++ b/scripts/catalogs/collibra/get-ecommerce-policy.sh @@ -0,0 +1,6 @@ +#!/bin/bash +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +exec $SCRIPT_DIR/../get-bare-policy.sh \ + -c COLLIBRA-testdrive \ + -d b6e043a7-88f1-42ee-8e81-0fdc1c96f471 \ + -s 10255be7-c2ac-43ae-be0a-a34d4e7c88b7 \ -t 37f0dec4-097f-42b1-8cb6-23b46927a6ef diff --git a/scripts/catalogs/datahub/get-bare-policy-cdc-diabetes.sh
b/scripts/catalogs/datahub/get-bare-policy-cdc-diabetes.sh new file mode 100755 index 00000000..5b90ebcb --- /dev/null +++ b/scripts/catalogs/datahub/get-bare-policy-cdc-diabetes.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +exec $SCRIPT_DIR/../get-bare-policy.sh \ + -d 'urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.cdc_diabetes,PROD)' \ + -s 'urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.cdc_diabetes,PROD)' \ + -t 'urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.cdc_diabetes,PROD)' + diff --git a/scripts/catalogs/datahub/get-bare-policy-gdd.sh b/scripts/catalogs/datahub/get-bare-policy-gdd.sh new file mode 100755 index 00000000..772a318a --- /dev/null +++ b/scripts/catalogs/datahub/get-bare-policy-gdd.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +exec $SCRIPT_DIR/../get-bare-policy.sh \ + -d "urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.gddemo,PROD)" \ + -s "urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.gddemo,PROD)" \ + -t "urn:li:dataset:(urn:li:dataPlatform:snowflake,strm.poc.gddemo,PROD)" + diff --git a/scripts/catalogs/get-bare-policy.sh b/scripts/catalogs/get-bare-policy.sh new file mode 100755 index 00000000..3b164921 --- /dev/null +++ b/scripts/catalogs/get-bare-policy.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +catalog=datahub-on-dev +database='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' +schema='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' +table='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' + +while getopts "c:d:s:t:" opt; do + case $opt in + c) + catalog=$OPTARG + ;; + d) + database=$OPTARG + ;; + s) + schema=$OPTARG + ;; + t) + table=$OPTARG + ;; + esac +done + +query=$( jq -n -r \ + --arg catalog $catalog \ + --arg database $database \ + --arg schema $schema \ + --arg table $table \ + '{ + "table": { + "id": $table, +"schema":{"id": $schema, "database": {"id":$database, "catalog":{"id":$catalog}}} + } + }') + +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call GetCatalogBarePolicy diff --git a/scripts/catalogs/list-catalogs.sh b/scripts/catalogs/list-catalogs.sh new file mode 100755 index 00000000..e5f711c3 --- /dev/null +++ b/scripts/catalogs/list-catalogs.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo '{}' | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListCatalogs diff --git a/scripts/catalogs/list-databases.sh b/scripts/catalogs/list-databases.sh new file mode 100755 index 00000000..dbe39b82 --- /dev/null +++ b/scripts/catalogs/list-databases.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +catalog=datahub-on-dev +while getopts "c:" opt; do + case $opt in + c) + catalog=$OPTARG + ;; + esac +done + +query=$( jq -n -r --arg id $catalog '{"catalog":{"id":$id}}' ) + +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListDatabases diff --git a/scripts/catalogs/list-recursive.sh b/scripts/catalogs/list-recursive.sh new file mode 100755 index 00000000..88499392 --- /dev/null +++ b/scripts/catalogs/list-recursive.sh @@ -0,0 +1,30 @@ +#!/bin/bash +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +re="^(.*)X_A_X(.*)$" + +$SCRIPT_DIR/list-catalogs.sh | jq -r '.catalogs[]|[.id,.type]|join("X_A_X")' | while read s ; do + [[ $s =~ $re ]] && 
catalog_id=${BASH_REMATCH[1]} && type=${BASH_REMATCH[2]} + echo "Catalog id=$catalog_id type=$type" + + $SCRIPT_DIR/list-databases.sh \ + -c $catalog_id \ + | jq -r '.databases[]|[.id,.displayName]|join("X_A_X")' | while read s ; do + [[ $s =~ $re ]] && db_id=${BASH_REMATCH[1]} && db_name=${BASH_REMATCH[2]} + + echo " Database id=$db_id $db_name" + + $SCRIPT_DIR/list-schemas.sh \ + -c $catalog_id -d $db_id \ + | jq -r '.schemas[]|[.id,.name]|join("X_A_X")' | while read s ; do + [[ $s =~ $re ]] && schema_id=${BASH_REMATCH[1]} && schema_name=${BASH_REMATCH[2]} + echo " Schema id=$schema_id $schema_name" + $SCRIPT_DIR/list-tables.sh \ + -c $catalog_id -d $db_id -s $schema_id \ + | jq -r '.tables[]|[.id,.name]|join("X_A_X")' | while read s ; do + [[ $s =~ $re ]] && table_id=${BASH_REMATCH[1]} && table_name=${BASH_REMATCH[2]} + echo " Table id=$table_id $table_name" + done + done + done +done diff --git a/scripts/catalogs/list-schemas.sh b/scripts/catalogs/list-schemas.sh new file mode 100755 index 00000000..1426b056 --- /dev/null +++ b/scripts/catalogs/list-schemas.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +catalog=datahub-on-dev +database='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' + +while getopts "c:d:" opt; do + case $opt in + c) + catalog=$OPTARG + ;; + d) + database=$OPTARG + ;; + esac +done +query=$( jq -n -r \ + --arg catalog $catalog \ + --arg database $database \ + '{"database": {"id":$database, "catalog":{"id":$catalog}}}' ) + +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListSchemas diff --git a/scripts/catalogs/list-tables.sh b/scripts/catalogs/list-tables.sh new file mode 100755 index 00000000..48a8da1a --- /dev/null +++ b/scripts/catalogs/list-tables.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +catalog=datahub-on-dev +database='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' +schema='urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)' + +while getopts "c:d:s:" opt; do + case $opt in + c) + catalog=$OPTARG + ;; + d) + database=$OPTARG + ;; + s) + schema=$OPTARG + ;; + esac +done + +query=$( jq -n -r \ + --arg catalog $catalog \ + --arg database $database \ + --arg schema $schema \ + '{"schema":{"id": $schema, "database": {"id":$database, "catalog":{"id":$catalog}}}}' ) + +echo $query | evans -r cli \ + --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ + call ListTables diff --git a/scripts/catalogs/odd/get-bare-policy-CATALOG_RETURNS.sh b/scripts/catalogs/odd/get-bare-policy-CATALOG_RETURNS.sh new file mode 100755 index 00000000..c2777f88 --- /dev/null +++ b/scripts/catalogs/odd/get-bare-policy-CATALOG_RETURNS.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +exec $SCRIPT_DIR/../get-bare-policy.sh -c odd -d 111 -s 111 -t 111 + From 123b0bc29c0f94878ffda58b9024cc6c2c280eda Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 15:51:11 +0200 Subject: [PATCH 4/7] refactor(strm-2708): minor stuff --- .../pace/common/AbstractDynamicViewGenerator.kt | 1 + .../main/kotlin/com/getstrm/pace/common/Exceptions.kt | 10 ---------- .../com/getstrm/pace/snowflake/SnowflakeClient.kt | 4 +++- .../pace/common/AbstractDynamicViewGeneratorTest.kt | 1 + scripts/pp/upsert-data-policy.sh | 2 -- 5 files changed, 5 insertions(+), 13 deletions(-) delete mode 100644 app/src/main/kotlin/com/getstrm/pace/common/Exceptions.kt diff --git 
a/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt b/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt index 9a6f97e4..c8a67552 100644 --- a/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt +++ b/app/src/main/kotlin/com/getstrm/pace/common/AbstractDynamicViewGenerator.kt @@ -1,6 +1,7 @@ package com.getstrm.pace.common import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.getstrm.pace.domain.SqlParseException import headTailFold import org.jooq.* // ktlint-disable no-wildcard-imports import org.jooq.conf.ParseNameCase diff --git a/app/src/main/kotlin/com/getstrm/pace/common/Exceptions.kt b/app/src/main/kotlin/com/getstrm/pace/common/Exceptions.kt deleted file mode 100644 index 98ec3e47..00000000 --- a/app/src/main/kotlin/com/getstrm/pace/common/Exceptions.kt +++ /dev/null @@ -1,10 +0,0 @@ -package com.getstrm.pace.common - -import io.grpc.Status -import io.strmprivacy.grpc.common.server.StrmStatusException -import org.jooq.impl.ParserException - -class SqlParseException(statement: String, cause: ParserException) : StrmStatusException( - Status.INVALID_ARGUMENT, - "SQL Statement [$statement] is invalid, please verify it's syntax. Details: ${cause.sql()}" -) diff --git a/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeClient.kt b/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeClient.kt index 0dd13f50..59441362 100644 --- a/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeClient.kt +++ b/app/src/main/kotlin/com/getstrm/pace/snowflake/SnowflakeClient.kt @@ -49,7 +49,9 @@ class SnowflakeClient( } override suspend fun applyPolicy(dataPolicy: DataPolicy) { - val statement = SnowflakeDynamicViewGenerator(dataPolicy).toDynamicViewSQL().replace("\\n".toRegex(), "\\\\n") + val statement = SnowflakeDynamicViewGenerator(dataPolicy).toDynamicViewSQL() + .replace("\\n".toRegex(), "\\\\n") + .replace("""(\\\d)""".toRegex(), """\\\\""" + "\$1") val statementCount = statement.mapNotNull { element -> element.takeIf { it == ';' } }.size.toString() val request = SnowflakeRequest( statement = statement, diff --git a/app/src/test/kotlin/com/getstrm/pace/common/AbstractDynamicViewGeneratorTest.kt b/app/src/test/kotlin/com/getstrm/pace/common/AbstractDynamicViewGeneratorTest.kt index 690e0ed6..b7870778 100644 --- a/app/src/test/kotlin/com/getstrm/pace/common/AbstractDynamicViewGeneratorTest.kt +++ b/app/src/test/kotlin/com/getstrm/pace/common/AbstractDynamicViewGeneratorTest.kt @@ -2,6 +2,7 @@ package com.getstrm.pace.common import com.getstrm.pace.util.TestDynamicViewGenerator import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy +import com.getstrm.pace.domain.SqlParseException import io.kotest.matchers.shouldBe import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows diff --git a/scripts/pp/upsert-data-policy.sh b/scripts/pp/upsert-data-policy.sh index d1a7ac8d..25428145 100755 --- a/scripts/pp/upsert-data-policy.sh +++ b/scripts/pp/upsert-data-policy.sh @@ -5,7 +5,5 @@ data_policy=$(json-case $f) request=$(echo '{}' | jq -r --argjson p "$data_policy" '{data_policy: $p}') echo $request | evans -r cli \ - --header "authorization=Bearer $(strm auth print-access-token)" \ - --host api.dev.getstrm.io --tls --port 443 \ --package getstrm.api.data_policies.v1alpha --service DataPolicyService \ call UpsertDataPolicy From 0a83ad9f035e3173c1ca8a5cad2ad4755bc20650 Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 15:58:26 +0200 Subject: [PATCH 
5/7] fix(strm-2708): removed jooq transaction --- .../getstrm/pace/service/DataPolicyService.kt | 12 +-- .../main/kotlin/com/getstrm/pace/util/Util.kt | 3 + sample_data/bigquery-cdc.yaml | 85 +++++++++++++++ sample_data/databricks-cdc.yaml | 85 +++++++++++++++ sample_data/datahub-snowflake-cdc.yaml | 69 ++++++++++++ sample_data/datahub-snowflake-gdd.yaml | 93 ++++++++++++++++ sample_data/snowflake-cdc.yaml | 102 ++++++++++++++++++ 7 files changed, 442 insertions(+), 7 deletions(-) create mode 100644 sample_data/bigquery-cdc.yaml create mode 100644 sample_data/databricks-cdc.yaml create mode 100644 sample_data/datahub-snowflake-cdc.yaml create mode 100644 sample_data/datahub-snowflake-gdd.yaml create mode 100644 sample_data/snowflake-cdc.yaml diff --git a/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt b/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt index c744fd7e..6454b0cc 100644 --- a/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt +++ b/app/src/main/kotlin/com/getstrm/pace/service/DataPolicyService.kt @@ -1,7 +1,6 @@ package com.getstrm.pace.service import build.buf.gen.getstrm.api.data_policies.v1alpha.DataPolicy -import coWithTransactionResult import com.getstrm.pace.dao.DataPolicyDao import com.getstrm.pace.domain.* import org.jooq.DSLContext @@ -20,12 +19,11 @@ class DataPolicyService( suspend fun upsertDataPolicy(dataPolicy: DataPolicy): DataPolicy { validate(dataPolicy) - return jooq.coWithTransactionResult { - // TODO should it remove old ruleset targets? - val newDataPolicy = dataPolicyDao.upsertDataPolicy(dataPolicy, context, it) - enforceStatement(newDataPolicy) - newDataPolicy - } + // TODO should it remove old ruleset targets? + // TODO the two statements below should be wrapped in a transaction + val newDataPolicy = dataPolicyDao.upsertDataPolicy(dataPolicy, context, jooq) + enforceStatement(newDataPolicy) + return newDataPolicy } suspend fun validate(dataPolicy: DataPolicy) { diff --git a/app/src/main/kotlin/com/getstrm/pace/util/Util.kt b/app/src/main/kotlin/com/getstrm/pace/util/Util.kt index 30d0face..9ba85138 100644 --- a/app/src/main/kotlin/com/getstrm/pace/util/Util.kt +++ b/app/src/main/kotlin/com/getstrm/pace/util/Util.kt @@ -49,6 +49,7 @@ private fun getFirstStrmStatusException(throwable: Throwable): StrmStatusExcepti } return null } +/* suspend fun DSLContext.coWithTransactionResult(transactionalBlock: suspend (DSLContext) -> R): R = coUnwrapStatusException { @@ -81,6 +82,8 @@ suspend fun DSLContext.transactionCoroutine(transactional: suspend (Configur // }.awaitFirstOrNull() as T } + */ + val mapper = jacksonObjectMapper() /** diff --git a/sample_data/bigquery-cdc.yaml b/sample_data/bigquery-cdc.yaml new file mode 100644 index 00000000..647bcdf7 --- /dev/null +++ b/sample_data/bigquery-cdc.yaml @@ -0,0 +1,85 @@ +info: + create_time: 2023-10-13T07:48:21.743Z + title: stream-machine-development.dynamic_views.cdc_diabetes + update_time: 2023-10-13T07:48:21.743Z +platform: + id: bigquery-dev + platform_type: BIGQUERY +rule_sets: + - target: + fullname: "stream-machine-development.dynamic_views.cdc-diabetes-view" + field_transforms: + - attribute: + path_components: [ HighChol ] + type: integer + transforms: + - principals: [] + fixed: + value: blabla +source: + attributes: + - path_components: + - HighBP + type: integer + - path_components: + - HighChol + type: integer + - path_components: + - CholCheck + type: integer + - path_components: + - BMI + type: integer + - path_components: + - Smoker + type: integer + 
- path_components: + - Stroke + type: integer + - path_components: + - HeartDiseaseorAttack + type: integer + - path_components: + - PhysActivity + type: integer + - path_components: + - Fruits + type: integer + - path_components: + - Veggies + type: integer + - path_components: + - HvyAlcoholConsump + type: integer + - path_components: + - AnyHealthcare + type: integer + - path_components: + - NoDocbcCost + type: integer + - path_components: + - GenHlth + type: integer + - path_components: + - MentHlth + type: integer + - path_components: + - PhysHlth + type: integer + - path_components: + - DiffWalk + type: integer + - path_components: + - Sex + type: integer + - path_components: + - Age + type: integer + - path_components: + - Education + type: integer + - path_components: + - Income + type: integer + ref: stream-machine-development.dynamic_views.cdc_diabetes + type: BIGQUERY diff --git a/sample_data/databricks-cdc.yaml b/sample_data/databricks-cdc.yaml new file mode 100644 index 00000000..0362e287 --- /dev/null +++ b/sample_data/databricks-cdc.yaml @@ -0,0 +1,85 @@ +info: + create_time: 2023-10-13T07:47:55.189Z + description: Created by the file upload UI + title: cdc_diabetes + update_time: 2023-10-13T07:47:55.189Z +platform: + id: databricks-pim@getstrm.com + platform_type: DATABRICKS +rule_sets: + - target: + fullname: "strm.poc.cdc_diabetes_view" + field_transforms: + - attribute: + path_components: [ HighChol ] + transforms: + - principals: [] + fixed: + value: bla bla bla +source: + attributes: + - path_components: + - HighBP + type: bigint + - path_components: + - HighChol + type: bigint + - path_components: + - CholCheck + type: bigint + - path_components: + - BMI + type: bigint + - path_components: + - Smoker + type: bigint + - path_components: + - Stroke + type: bigint + - path_components: + - HeartDiseaseorAttack + type: bigint + - path_components: + - PhysActivity + type: bigint + - path_components: + - Fruits + type: bigint + - path_components: + - Veggies + type: bigint + - path_components: + - HvyAlcoholConsump + type: bigint + - path_components: + - AnyHealthcare + type: bigint + - path_components: + - NoDocbcCost + type: bigint + - path_components: + - GenHlth + type: bigint + - path_components: + - MentHlth + type: bigint + - path_components: + - PhysHlth + type: bigint + - path_components: + - DiffWalk + type: bigint + - path_components: + - Sex + type: bigint + - path_components: + - Age + type: bigint + - path_components: + - Education + type: bigint + - path_components: + - Income + type: bigint + ref: strm.poc.cdc_diabetes + type: DATABRICKS diff --git a/sample_data/datahub-snowflake-cdc.yaml b/sample_data/datahub-snowflake-cdc.yaml new file mode 100644 index 00000000..9893d5d2 --- /dev/null +++ b/sample_data/datahub-snowflake-cdc.yaml @@ -0,0 +1,69 @@ +data_policy: + info: + description: snowflake + title: Snowflake + source: + attributes: + - path_components: + - highbp + type: numeric + - path_components: + - highchol + type: numeric + - path_components: + - cholcheck + type: numeric + - path_components: + - bmi + type: numeric + - path_components: + - smoker + type: numeric + - path_components: + - stroke + type: numeric + - path_components: + - heartdiseaseorattack + type: numeric + - path_components: + - physactivity + type: numeric + - path_components: + - fruits + type: numeric + - path_components: + - veggies + type: numeric + - path_components: + - hvyalcoholconsump + type: numeric + - path_components: + - anyhealthcare + type: numeric + - 
path_components: + - nodocbccost + type: numeric + - path_components: + - genhlth + type: numeric + - path_components: + - menthlth + type: numeric + - path_components: + - physhlth + type: numeric + - path_components: + - diffwalk + type: numeric + - path_components: + - sex + type: numeric + - path_components: + - age + type: numeric + - path_components: + - education + type: numeric + - path_components: + - income + type: numeric diff --git a/sample_data/datahub-snowflake-gdd.yaml b/sample_data/datahub-snowflake-gdd.yaml new file mode 100644 index 00000000..e804cda7 --- /dev/null +++ b/sample_data/datahub-snowflake-gdd.yaml @@ -0,0 +1,93 @@ +rule_sets: + - target: + fullname: STRM.POC.GDDDEMO_V3 + filters: + - conditions: + - principals: [ FRAUD_AND_RISK ] + condition: "true" + - principals: [] + condition: "age > 8" + - conditions: + - principals : [] + condition: "transactionamount < 10" + field_transforms: + - attribute: + path_components: [ userid ] + transforms: + - principals: [ "FRAUD_AND_RISK"] + identity: true + - principals: [] + hash: + seed: "1234" + - attribute: + path_components: [ email ] + transforms: + - principals: [ MARKETING ] + regex: + regex: "^.*(@.*)$" + replacement: "****\\\\1" + - principals: [ FRAUD_AND_RISK] + identity: true + - principals: [] + fixed: + value: "****" + - attribute: + path_components: [ haircolor ] + transforms: + - principals: [] + sql_statement: + statement: "CASE WHEN haircolor = 'blonde' THEN 'fair' ELSE 'dark' END" +info: + description: snowflake + tags: + - purpose:marketing + title: Snowflake +source: + ref: POC.GDDEMO + attributes: + - path_components: + - transactionid + type: numeric + - path_components: + - userid + tags: + - PII + - sensitive + type: varchar + - path_components: + - email + tags: + - PII + - sensitive + type: varchar + - path_components: + - age + tags: + - sensitive + type: numeric + - path_components: + - size + type: varchar + - path_components: + - haircolor + type: varchar + - path_components: + - transactionamount + tags: + - sensitive + type: numeric + - path_components: + - items + type: varchar + - path_components: + - itemcount + type: numeric + - path_components: + - date + type: time + - path_components: + - purpose + type: numeric +platform: + id: snowflake-demo + platform_type: SNOWFLAKE diff --git a/sample_data/snowflake-cdc.yaml b/sample_data/snowflake-cdc.yaml new file mode 100644 index 00000000..96955835 --- /dev/null +++ b/sample_data/snowflake-cdc.yaml @@ -0,0 +1,102 @@ +info: {} +platform: + id: snowflake-demo + platform_type: SNOWFLAKE +rule_sets: + - target: + fullname: "POC.CDC_DIABETES_VIEW" + field_transforms: + - attribute: + path_components: [ HIGHCHOL ] + transforms: + - principals: [] + fixed: + value: bla bla +source: + attributes: + - path_components: + - HIGHBP + required: true + type: numeric + - path_components: + - HIGHCHOL + required: true + type: numeric + - path_components: + - CHOLCHECK + required: true + type: numeric + - path_components: + - BMI + required: true + type: numeric + - path_components: + - SMOKER + required: true + type: numeric + - path_components: + - STROKE + required: true + type: numeric + - path_components: + - HEARTDISEASEORATTACK + required: true + type: numeric + - path_components: + - PHYSACTIVITY + required: true + type: numeric + - path_components: + - FRUITS + required: true + type: numeric + - path_components: + - VEGGIES + required: true + type: numeric + - path_components: + - HVYALCOHOLCONSUMP + required: true + type: numeric + - 
path_components: + - ANYHEALTHCARE + required: true + type: numeric + - path_components: + - NODOCBCCOST + required: true + type: numeric + - path_components: + - GENHLTH + required: true + type: numeric + - path_components: + - MENTHLTH + required: true + type: numeric + - path_components: + - PHYSHLTH + required: true + type: numeric + - path_components: + - DIFFWALK + required: true + type: numeric + - path_components: + - SEX + required: true + type: numeric + - path_components: + - AGE + required: true + type: numeric + - path_components: + - EDUCATION + required: true + type: numeric + - path_components: + - INCOME + required: true + type: numeric + ref: POC.CDC_DIABETES + type: SNOWFLAKE From b99b08e7b2566a3454b06b8d9951357908e625db Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 16:03:39 +0200 Subject: [PATCH 6/7] fix(strm-2708): postgres 15 --- .github/workflows/build-and-publish.yaml | 2 +- app/build.gradle.kts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index 654a2c9a..0313e89f 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -14,7 +14,7 @@ jobs: # based on: https://docs.github.com/en/actions/using-containerized-services/creating-postgresql-service-containers#running-jobs-directly-on-the-runner-machine services: postgres: - image: postgres + image: postgres:15 env: POSTGRES_PASSWORD: postgres # Set health checks to wait until postgres has started diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 7aac037d..a107f933 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -142,7 +142,7 @@ val createPostgresContainer = tasks.register("jooqPostgresCreate", DockerCreateContainer::class) { dependsOn(removePostgresContainer) group = "postgres" - targetImageId("postgres:12") + targetImageId("postgres:15") containerName.set("jooq-postgres") hostConfig.portBindings.set(listOf("$postgresPort:5432")) hostConfig.autoRemove.set(true) From 9f87d0f995c53c0669d96c8abce518d82fba1289 Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Tue, 24 Oct 2023 16:15:58 +0200 Subject: [PATCH 7/7] feat(strm-2708): unset postgres image version free --- .github/workflows/build-and-publish.yaml | 2 +- app/build.gradle.kts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index 0313e89f..654a2c9a 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -14,7 +14,7 @@ jobs: # based on: https://docs.github.com/en/actions/using-containerized-services/creating-postgresql-service-containers#running-jobs-directly-on-the-runner-machine services: postgres: - image: postgres:15 + image: postgres env: POSTGRES_PASSWORD: postgres # Set health checks to wait until postgres has started diff --git a/app/build.gradle.kts b/app/build.gradle.kts index a107f933..3094dddb 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -142,7 +142,7 @@ val createPostgresContainer = tasks.register("jooqPostgresCreate", DockerCreateContainer::class) { dependsOn(removePostgresContainer) group = "postgres" - targetImageId("postgres:15") + targetImageId("postgres") containerName.set("jooq-postgres") hostConfig.portBindings.set(listOf("$postgresPort:5432")) hostConfig.autoRemove.set(true)