Skip to content

Commit

Permalink
Convert DECIMAL type (#53)
Browse files Browse the repository at this point in the history
Author: edvardas <[email protected]>
  • Loading branch information
edvaone authored Jul 25, 2024
1 parent d8f2e72 commit eafce3d
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.exacaster.deltafetch.search.parquet.readsupport;

import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
Expand Down Expand Up @@ -51,7 +53,7 @@ public Optional<Converter> visit(
@Override
public Optional<Converter> visit(
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
return of(new SimplePrimitiveConverter(field.getName()));
return of(new DecimalConverter(field.getName(), decimalLogicalType.getScale()));
}
}).orElse(new SimplePrimitiveConverter(field.getName()));
}
Expand Down Expand Up @@ -149,4 +151,28 @@ public void addBinary(Binary value) {
array.add(value.toStringUsingUTF8());
}
}

private class DecimalConverter extends PrimitiveConverter {
private final String name;
private final int scale;

public DecimalConverter(String name, int scale) {
this.name = name;
this.scale = scale;
}

@Override
public void addBinary(Binary value) {
BigInteger unscaledValue = new BigInteger(value.getBytes());
BigDecimal decimalValue = new BigDecimal(unscaledValue, scale);
record.put(name, decimalValue);
}

@Override
public void addLong(long value) {
BigInteger unscaledValue = BigInteger.valueOf(value);
BigDecimal decimalValue = new BigDecimal(unscaledValue, scale);
record.put(name, decimalValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,28 @@ class SearchServiceTest extends Specification {
then: "returns empty"
result.isEmpty()
}
def worksWithDifferentTypes() {
given:
def cache = Mock(SyncCache) {
get(*_) >> Optional.empty()
}
def conf = new Configuration()
def statsReader = new DeltaMetaReader(conf, cache)
def svc = new SearchService(statsReader, new Configuration())
def path = getClass().getResource("/test_data_types").toString()

when:
def result = svc.find(path, [new ColumnValueFilter("user_id", "555")], true, 1).findFirst()

then:
result.isPresent()
result.get().getValue().get("trait_string") == "ACTIVE"
result.get().getValue().get("trait_decimal18_3") == 5.555
result.get().getValue().get("trait_decimal21_3") == 55.123
result.get().getValue().get("trait_double") == 12345.678
result.get().getValue().get("trait_float") == 12345.678F
result.get().getValue().get("trait_int") == 12345
result.get().getValue().get("trait_bigint") == 123456789012345
result.get().getValue().get("trait_boolean") == true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1721901554498,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"2873"},"engineInfo":"Apache-Spark/3.4.2 Delta-Lake/2.4.0","txnId":"b5285209-e1ef-4844-8c25-9353a33e2b75"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"17da5576-7e97-4e09-a01d-74f679b262e8","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"user_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_decimal18_3\",\"type\":\"decimal(18,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_decimal21_3\",\"type\":\"decimal(21,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_float\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_bigint\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"trait_boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1721901552277}}
{"add":{"path":"part-00000-d31af104-2315-4f5a-9a24-0e41fa1c58f2-c000.snappy.parquet","partitionValues":{},"size":2873,"modificationTime":1721901554000,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"user_id\":\"111\",\"trait_string\":\"ACTIVE\",\"trait_decimal18_3\":1.111,\"trait_decimal21_3\":11.123,\"trait_double\":3456.78,\"trait_float\":12345.678,\"trait_int\":12345,\"trait_bigint\":123456789012345},\"maxValues\":{\"user_id\":\"555\",\"trait_string\":\"DEACTIVATED\",\"trait_decimal18_3\":5.555,\"trait_decimal21_3\":55.123,\"trait_double\":12345.678,\"trait_float\":98765.43,\"trait_int\":98765,\"trait_bigint\":987654321098765},\"nullCount\":{\"user_id\":0,\"trait_string\":0,\"trait_decimal18_3\":0,\"trait_decimal21_3\":0,\"trait_double\":0,\"trait_float\":0,\"trait_int\":0,\"trait_bigint\":0,\"trait_boolean\":0}}"}}
Binary file not shown.

0 comments on commit eafce3d

Please sign in to comment.