From 74140b05aae68ce13772afec02c0c2740b7af31c Mon Sep 17 00:00:00 2001
From: Jacob Quinn
Date: Sat, 30 Nov 2024 22:28:26 -0700
Subject: [PATCH] Support new view types when reading

---
 Project.toml                 |  1 +
 src/Arrow.jl                 |  3 +-
 src/arraytypes/arraytypes.jl |  1 +
 src/arraytypes/views.jl      | 62 ++++++++++++++++++++++++
 src/eltypes.jl               |  6 +--
 src/metadata/Message.jl      |  7 ++-
 src/metadata/Schema.jl       | 90 ++++++++++++++++++++++++++++++++++
 src/table.jl                 | 94 +++++++++++++++++++++++-------------
 8 files changed, 226 insertions(+), 38 deletions(-)
 create mode 100644 src/arraytypes/views.jl

diff --git a/Project.toml b/Project.toml
index 80e92e28..b18aef46 100644
--- a/Project.toml
+++ b/Project.toml
@@ -32,6 +32,7 @@ LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
 Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
 PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
 SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
+StringViews = "354b36f9-a18e-4713-926e-db85100087ba"
 Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
 TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
 TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
diff --git a/src/Arrow.jl b/src/Arrow.jl
index efef8266..97b7f91d 100644
--- a/src/Arrow.jl
+++ b/src/Arrow.jl
@@ -53,7 +53,8 @@ using DataAPI,
     CodecZstd,
     TimeZones,
     BitIntegers,
-    ConcurrentUtilities
+    ConcurrentUtilities,
+    StringViews
 
 export ArrowTypes
 
diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl
index b417abf6..599942db 100644
--- a/src/arraytypes/arraytypes.jl
+++ b/src/arraytypes/arraytypes.jl
@@ -271,3 +271,4 @@ include("map.jl")
 include("struct.jl")
 include("unions.jl")
 include("dictencoding.jl")
+include("views.jl")
diff --git a/src/arraytypes/views.jl b/src/arraytypes/views.jl
new file mode 100644
index 00000000..f13774b9
--- /dev/null
+++ b/src/arraytypes/views.jl
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# One 16-byte view record; values of at most 12 bytes are stored inline after `length`.
+struct ViewElement
+    length::Int32
+    prefix::Int32
+    bufindex::Int32
+    offset::Int32
+end
+
+"""
+    Arrow.View
+
+An `ArrowVector` for the `Utf8View`/`BinaryView` layouts: values of at most 12 bytes
+are read from their inline view record, longer values from one of `buffers`.
+"""
+struct View{T} <: ArrowVector{T}
+    arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
+    validity::ValidityBitmap
+    data::Vector{ViewElement}
+    inline::Vector{UInt8} # `data` field reinterpreted as a byte array
+    buffers::Vector{Vector{UInt8}} # holds non-inlined data
+    ℓ::Int
+    metadata::Union{Nothing,Base.ImmutableDict{String,String}}
+end
+
+Base.size(l::View) = (l.ℓ,)
+
+# Bytes of element `i` (whose view record is `v`) as a no-copy view.
+@propagate_inbounds function _viewbytes(l::View, v::ViewElement, i::Integer)
+    len = Int(v.length) # widen first: `offset + length` could overflow `Int32`
+    if len < 13
+        start = (Int(i) - 1) * 16 + 5 # skip earlier records and this record's length
+        return @view l.inline[start:(start + len - 1)]
+    end
+    offset = Int(v.offset)
+    return @view l.buffers[v.bufindex + 1][(offset + 1):(offset + len)]
+end
+
+@propagate_inbounds function Base.getindex(l::View{T}, i::Integer) where {T}
+    @boundscheck checkbounds(l, i)
+    @inbounds v = l.data[i]
+    l.validity[i] || return missing
+    s = StringView(_viewbytes(l, v, i))
+    # binary columns yield code units, string columns go through `fromarrow`
+    S = Base.nonmissingtype(T)
+    return S <: Base.CodeUnits ? Base.CodeUnits(s) : ArrowTypes.fromarrow(T, s)
+end
diff --git a/src/eltypes.jl b/src/eltypes.jl
index ffc53c03..40a1c622 100644
--- a/src/eltypes.jl
+++ b/src/eltypes.jl
@@ -129,12 +129,12 @@ function arrowtype(b, ::Type{T}) where {T<:AbstractFloat}
     return Meta.FloatingPoint, Meta.floatingPointEnd(b), nothing
 end
 
-juliaeltype(f::Meta.Field, b::Union{Meta.Utf8,Meta.LargeUtf8}, convert) = String
+juliaeltype(f::Meta.Field, b::Union{Meta.Utf8,Meta.LargeUtf8,Meta.Utf8View}, convert) = String
 
 datasizeof(x) = sizeof(x)
 datasizeof(x::AbstractVector) = sum(datasizeof, x)
 
-juliaeltype(f::Meta.Field, b::Union{Meta.Binary,Meta.LargeBinary}, convert) = Base.CodeUnits
+juliaeltype(f::Meta.Field, b::Union{Meta.Binary,Meta.LargeBinary,Meta.BinaryView}, convert) = Base.CodeUnits
 
 juliaeltype(f::Meta.Field, x::Meta.FixedSizeBinary, convert) =
     NTuple{Int(x.byteWidth),UInt8}
@@ -428,7 +428,7 @@ ArrowTypes.JuliaType(::Val{PERIOD_SYMBOL}, ::Type{Duration{U}}) where {U} = peri
 ArrowTypes.fromarrow(::Type{P}, x::Duration{U}) where {P<:Dates.Period,U} = convert(P, x)
 
 # nested types; call juliaeltype recursively on nested children
-function juliaeltype(f::Meta.Field, list::Union{Meta.List,Meta.LargeList}, convert)
+function juliaeltype(f::Meta.Field, list::Union{Meta.List,Meta.LargeList,Meta.ListView,Meta.LargeListView}, convert)
     return Vector{juliaeltype(f.children[1], buildmetadata(f.children[1]), convert)}
 end
 
diff --git a/src/metadata/Message.jl b/src/metadata/Message.jl
index 139793db..b7883141 100644
--- a/src/metadata/Message.jl
+++ b/src/metadata/Message.jl
@@ -75,7 +75,7 @@ struct RecordBatch <: FlatBuffers.Table
     pos::Base.Int
 end
 
-Base.propertynames(x::RecordBatch) = (:length, :nodes, :buffers, :compression)
+Base.propertynames(x::RecordBatch) = (:length, :nodes, :buffers, :compression, :variadicBufferCounts)
 
 function Base.getproperty(x::RecordBatch, field::Symbol)
     if field === :length
@@ -97,6 +97,12 @@ function Base.getproperty(x::RecordBatch, field::Symbol)
             y = FlatBuffers.indirect(x, o + FlatBuffers.pos(x))
             return FlatBuffers.init(BodyCompression, FlatBuffers.bytes(x), y)
         end
+    elseif field === :variadicBufferCounts
+        # declared as `[long]` in Arrow's Message.fbs, so entries are 64-bit; absent => nothing
+        o = FlatBuffers.offset(x, 12)
+        if o != 0
+            return FlatBuffers.Array{Int64}(x, o)
+        end
     end
     return nothing
 end
diff --git a/src/metadata/Schema.jl b/src/metadata/Schema.jl
index d4cfc821..161bc908 100644
--- a/src/metadata/Schema.jl
+++ b/src/metadata/Schema.jl
@@ -401,6 +401,91 @@ durationAddUnit(b::FlatBuffers.Builder, unit::TimeUnit.T) =
     FlatBuffers.prependslot!(b, 0, unit, 1)
 durationEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
 
+# /// Contains two child arrays, run_ends and values.
+# /// The run_ends child array must be a 16/32/64-bit integer array
+# /// which encodes the indices at which the run with the value in
+# /// each corresponding index in the values child array ends.
+# /// Like list/struct types, the value array can be of any type.
+# table RunEndEncoded {
+# }
+struct RunEndEncoded <: FlatBuffers.Table
+    bytes::Vector{UInt8}
+    pos::Base.Int
+end
+
+Base.propertynames(x::RunEndEncoded) = ()
+
+runEndEncodedStart(b::FlatBuffers.Builder) = FlatBuffers.startobject!(b, 0)
+runEndEncodedEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
+
+# /// Logically the same as Binary, but the internal representation uses a view
+# /// struct that contains the string length and either the string's entire data
+# /// inline (for small strings) or an inlined prefix, an index of another buffer,
+# /// and an offset pointing to a slice in that buffer (for non-small strings).
+# ///
+# /// Since it uses a variable number of data buffers, each Field with this type
+# /// must have a corresponding entry in `variadicBufferCounts`.
+# table BinaryView {
+# }
+struct BinaryView <: FlatBuffers.Table
+    bytes::Vector{UInt8}
+    pos::Base.Int
+end
+
+Base.propertynames(x::BinaryView) = ()
+
+binaryViewStart(b::FlatBuffers.Builder) = FlatBuffers.startobject!(b, 0)
+binaryViewEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
+
+# /// Logically the same as Utf8, but the internal representation uses a view
+# /// struct that contains the string length and either the string's entire data
+# /// inline (for small strings) or an inlined prefix, an index of another buffer,
+# /// and an offset pointing to a slice in that buffer (for non-small strings).
+# ///
+# /// Since it uses a variable number of data buffers, each Field with this type
+# /// must have a corresponding entry in `variadicBufferCounts`.
+# table Utf8View {
+# }
+struct Utf8View <: FlatBuffers.Table
+    bytes::Vector{UInt8}
+    pos::Base.Int
+end
+
+Base.propertynames(x::Utf8View) = ()
+
+utf8ViewStart(b::FlatBuffers.Builder) = FlatBuffers.startobject!(b, 0)
+utf8ViewEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
+
+# /// Represents the same logical types that List can, but contains offsets and
+# /// sizes allowing for writes in any order and sharing of child values among
+# /// list values.
+# table ListView {
+# }
+struct ListView <: FlatBuffers.Table
+    bytes::Vector{UInt8}
+    pos::Base.Int
+end
+
+Base.propertynames(x::ListView) = ()
+
+listViewStart(b::FlatBuffers.Builder) = FlatBuffers.startobject!(b, 0)
+listViewEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
+
+# /// Represents the same logical types that LargeList can, but contains offsets
+# /// and sizes allowing for writes in any order and sharing of child values among
+# /// list values.
+# table LargeListView {
+# }
+struct LargeListView <: FlatBuffers.Table
+    bytes::Vector{UInt8}
+    pos::Base.Int
+end
+
+Base.propertynames(x::LargeListView) = ()
+
+largeListViewStart(b::FlatBuffers.Builder) = FlatBuffers.startobject!(b, 0)
+largeListViewEnd(b::FlatBuffers.Builder) = FlatBuffers.endobject!(b)
+
 function Type(b::UInt8)
     b == 1 && return Null
     b == 2 && return Int
@@ -423,6 +508,11 @@ function Type(b::UInt8)
     b == 19 && return LargeBinary
     b == 20 && return LargeUtf8
     b == 21 && return LargeList
+    b == 22 && return RunEndEncoded
+    b == 23 && return BinaryView
+    b == 24 && return Utf8View
+    b == 25 && return ListView
+    b == 26 && return LargeListView
     return nothing
 end
 
diff --git a/src/table.jl b/src/table.jl
index ecd8b1d8..bf5b5839 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -621,32 +621,34 @@ buildmetadata(x::AbstractDict) = x
 
 function Base.iterate(
     x::VectorIterator,
-    (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)),
+    (columnidx, nodeidx, bufferidx, varbufferidx)=(Int64(1), Int64(1), Int64(1), Int64(1)),
 )
     columnidx > length(x.schema.fields) && return nothing
     field = x.schema.fields[columnidx]
-    @debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
-    A, nodeidx, bufferidx = build(
+    @debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx, varbufferidx = $varbufferidx"
+    A, nodeidx, bufferidx, varbufferidx = build(
         field,
         x.batch,
         x.batch.msg.header,
         x.dictencodings,
         nodeidx,
         bufferidx,
+        varbufferidx,
         x.convert,
     )
-    @debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
+    @debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx, varbufferidx = $varbufferidx"
     @debugv 3 A
-    return A, (columnidx + 1, nodeidx, bufferidx)
+    return A, (columnidx + 1, nodeidx, bufferidx, varbufferidx)
 end
 
 Base.length(x::VectorIterator) = length(x.schema.fields)
 
 const ListTypes =
     Union{Meta.Utf8,Meta.LargeUtf8,Meta.Binary,Meta.LargeBinary,Meta.List,Meta.LargeList}
-const LargeLists = Union{Meta.LargeUtf8,Meta.LargeBinary,Meta.LargeList}
+const LargeLists = Union{Meta.LargeUtf8,Meta.LargeBinary,Meta.LargeList,Meta.LargeListView}
+const ViewTypes = Union{Meta.Utf8View,Meta.BinaryView}
 
-function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
+function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     d = field.dictionary
     if d !== nothing
         validity = buildbitmap(batch, rb, nodeidx, bufferidx)
@@ -665,10 +667,10 @@ function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
         nodeidx += 1
         bufferidx += 1
     else
-        A, nodeidx, bufferidx =
-            build(field, field.type, batch, rb, de, nodeidx, bufferidx, convert)
+        A, nodeidx, bufferidx, varbufferidx =
+            build(field, field.type, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     end
-    return A, nodeidx, bufferidx
+    return A, nodeidx, bufferidx, varbufferidx
 end
 
 function buildbitmap(batch, rb, nodeidx, bufferidx)
@@ -744,7 +746,7 @@ end
 
 const SubVector{T,P} = SubArray{T,1,P,Tuple{UnitRange{Int64}},true}
 
-function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     validity = buildbitmap(batch, rb, nodeidx, bufferidx)
     bufferidx += 1
@@ -759,16 +761,16 @@ function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, c
     meta = buildmetadata(f.custom_metadata)
     T = juliaeltype(f, meta, convert)
     if L isa Meta.Utf8 ||
        L isa Meta.LargeUtf8 ||
        L isa Meta.Binary ||
        L isa Meta.LargeBinary
         buffer = rb.buffers[bufferidx]
         bytes, A = reinterp(UInt8, batch, buffer, rb.compression)
         bufferidx += 1
     else
         bytes = UInt8[]
-        A, nodeidx, bufferidx =
-            build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
+        A, nodeidx, bufferidx, varbufferidx =
+            build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
         # juliaeltype returns Vector for List, translate to SubArray
         S = Base.nonmissingtype(T)
         if S <: Vector
@@ -776,7 +778,46 @@ function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, c
             T = S == T ? ST : Union{Missing,ST}
         end
     end
-    return List{T,OT,typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx
+    return List{T,OT,typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx, varbufferidx
+end
+
+# Build a `View` column (Utf8View/BinaryView). Buffers are consumed in this order: validity
+# bitmap, the 16-byte view records, then `variadicBufferCounts[varbufferidx]` data buffers.
+function build(f::Meta.Field, L::ViewTypes, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
+    @debugv 2 "building array: L = $L"
+    validity = buildbitmap(batch, rb, nodeidx, bufferidx)
+    bufferidx += 1
+    buffer = rb.buffers[bufferidx]
+    owner, rawviews = reinterp(ViewElement, batch, buffer, rb.compression)
+    # NOTE(review): the other builders keep `reinterp`'s first return value only as the
+    # owning memory blob, so it is not assumed to hold exactly the view records here;
+    # copy the records and derive `inline` from the copy so both index the same bytes.
+    views = GC.@preserve owner copy(rawviews)
+    inline = collect(reinterpret(UInt8, views))
+    bufferidx += 1
+    counts = rb.variadicBufferCounts
+    if counts === nothing || varbufferidx > length(counts)
+        throw(ArgumentError("missing variadicBufferCounts entry for field $(f.name)"))
+    end
+    buffers = Vector{UInt8}[]
+    for _ in 1:counts[varbufferidx]
+        buffer = rb.buffers[bufferidx]
+        _, A = reinterp(UInt8, batch, buffer, rb.compression)
+        push!(buffers, A)
+        bufferidx += 1
+    end
+    varbufferidx += 1
+    len = rb.nodes[nodeidx].length
+    nodeidx += 1
+    meta = buildmetadata(f.custom_metadata)
+    T = juliaeltype(f, meta, convert)
+    return View{T}(batch.bytes, validity, views, inline, buffers, len, meta), nodeidx, bufferidx, varbufferidx
+end
+
+# ListView/LargeListView carry offsets and sizes plus a child array, which `View` cannot
+# represent; fail loudly instead of misreading their buffers as binary view records.
+function build(f::Meta.Field, L::Union{Meta.ListView,Meta.LargeListView}, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
+    throw(ArgumentError("reading $(typeof(L)) columns is not supported (field $(f.name))"))
 end
 
 function build(
@@ -787,6 +828,7 @@ function build(
     de,
     nodeidx,
     bufferidx,
+    varbufferidx,
     convert,
 )
     @debugv 2 "building array: L = $L"
@@ -800,15 +842,15 @@ function build(
         bufferidx += 1
     else
         bytes = UInt8[]
-        A, nodeidx, bufferidx =
-            build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
+        A, nodeidx, bufferidx, varbufferidx =
+            build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     end
     meta = buildmetadata(f.custom_metadata)
     T = juliaeltype(f, meta, convert)
-    return FixedSizeList{T,typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx
+    return FixedSizeList{T,typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx, varbufferidx
 end
 
-function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     validity = buildbitmap(batch, rb, nodeidx, bufferidx)
     bufferidx += 1
@@ -820,13 +862,13 @@ function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, co
     bufferidx += 1
     len = rb.nodes[nodeidx].length
     nodeidx += 1
-    A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
+    A, nodeidx, bufferidx, varbufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     meta = buildmetadata(f.custom_metadata)
     T = juliaeltype(f, meta, convert)
-    return Map{T,OT,typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx
+    return Map{T,OT,typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx, varbufferidx
 end
 
-function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     validity = buildbitmap(batch, rb, nodeidx, bufferidx)
     bufferidx += 1
@@ -834,17 +876,17 @@ function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx,
     vecs = []
     nodeidx += 1
     for child in f.children
-        A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
+        A, nodeidx, bufferidx, varbufferidx = build(child, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
         push!(vecs, A)
     end
     data = Tuple(vecs)
     meta = buildmetadata(f.custom_metadata)
     T = juliaeltype(f, meta, convert)
     fnames = ntuple(i -> Symbol(f.children[i].name), length(f.children))
-    return Struct{T,typeof(data),fnames}(validity, data, len, meta), nodeidx, bufferidx
+    return Struct{T,typeof(data),fnames}(validity, data, len, meta), nodeidx, bufferidx, varbufferidx
 end
 
-function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     buffer = rb.buffers[bufferidx]
     bytes, typeIds = reinterp(UInt8, batch, buffer, rb.compression)
@@ -857,7 +899,7 @@ function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx,
     vecs = []
     nodeidx += 1
     for child in f.children
-        A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
+        A, nodeidx, bufferidx, varbufferidx = build(child, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
         push!(vecs, A)
     end
     data = Tuple(vecs)
@@ -869,20 +911,21 @@ function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx,
     else
         B = SparseUnion{T,UT,typeof(data)}(bytes, typeIds, data, meta)
     end
-    return B, nodeidx, bufferidx
+    return B, nodeidx, bufferidx, varbufferidx
 end
 
-function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     meta = buildmetadata(f.custom_metadata)
     T = juliaeltype(f, meta, convert)
     return NullVector{maybemissing(T)}(MissingVector(rb.nodes[nodeidx].length), meta),
-    nodeidx + 1,
-    bufferidx
+        nodeidx + 1,
+        bufferidx,
+        varbufferidx
 end
 
 # primitives
-function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) where {L}
+function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert) where {L}
     @debugv 2 "building array: L = $L"
     validity = buildbitmap(batch, rb, nodeidx, bufferidx)
     bufferidx += 1
@@ -895,10 +938,10 @@ function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) w
     len = rb.nodes[nodeidx].length
     T = juliaeltype(f, meta, convert)
     @debugv 2 "final julia type for primitive: T = $T"
-    return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1
+    return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1, varbufferidx
 end
 
-function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, convert)
+function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
     @debugv 2 "building array: L = $L"
     validity = buildbitmap(batch, rb, nodeidx, bufferidx)
     bufferidx += 1
@@ -923,5 +966,5 @@ function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, c
     end
     len = rb.nodes[nodeidx].length
     T = juliaeltype(f, meta, convert)
-    return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1
+    return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1, varbufferidx
 end