Skip to content

Commit

Permalink
Make new View array type for BinaryView/Utf8View
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Aug 6, 2024
1 parent 8a574f7 commit 43acd2a
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 51 deletions.
17 changes: 1 addition & 16 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name = "Arrow"
uuid = "69666777-d1a9-59fb-9406-91d4454c9d45"
authors = ["quinnj <[email protected]>"]
Expand All @@ -32,6 +16,7 @@ LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
StringViews = "354b36f9-a18e-4713-926e-db85100087ba"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
Expand Down
3 changes: 2 additions & 1 deletion src/Arrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ using DataAPI,
CodecZstd,
TimeZones,
BitIntegers,
ConcurrentUtilities
ConcurrentUtilities,
StringViews

export ArrowTypes

Expand Down
1 change: 1 addition & 0 deletions src/arraytypes/arraytypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,4 @@ include("map.jl")
include("struct.jl")
include("unions.jl")
include("dictencoding.jl")
include("views.jl")
62 changes: 62 additions & 0 deletions src/arraytypes/views.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Element of a `View`'s `data` vector: four `Int32` fields, i.e. 16 bytes per element
# (`getindex` on `View` steps through the raw `inline` bytes in 16-byte strides).
# `getindex` only consults `bufindex` and `offset` when `length` is 13 or more; for
# shorter values it reads the bytes that immediately follow `length` instead.
struct ViewElement
length::Int32 # number of bytes in the value
prefix::Int32 # presumably the first 4 bytes of a long value; not read in this file — TODO confirm
bufindex::Int32 # 0-based index into `View.buffers` (`getindex` adds 1)
offset::Int32 # 0-based byte offset within that buffer (`getindex` adds 1)
end

"""
    Arrow.View

An `ArrowVector` where each element is a variable-sized run of bytes (a string or binary
value) described by a fixed-width `ViewElement`. Values shorter than 13 bytes are read
directly out of `inline`; longer values are read from one of the vectors in `buffers`,
selected by the element's `bufindex` and `offset`.
"""
struct View{T} <: ArrowVector{T}
    arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
    validity::ValidityBitmap
    data::Vector{ViewElement}
    inline::Vector{UInt8} # `data` field reinterpreted as a byte array
    buffers::Vector{Vector{UInt8}} # holds non-inlined data
    ℓ::Int # number of elements; this is what `Base.size` reports
    metadata::Union{Nothing,Base.ImmutableDict{String,String}}
end

# A `View` is one-dimensional; its element count is stored in the `ℓ` field.
function Base.size(l::View)
    return (l.ℓ,)
end

# Return element `i` of the view array, or `missing` when its validity bit is unset.
#
# Values shorter than 13 bytes are read from `l.inline`, starting 4 bytes into the
# element's 16-byte slot (i.e. right after the `length` field). Longer values are read
# from `l.buffers`, using the element's 0-based `bufindex` and `offset`.
# When the non-missing element type is a `Base.CodeUnits` (BinaryView) the bytes are
# returned as code units; otherwise (Utf8View) they go through `ArrowTypes.fromarrow`.
@propagate_inbounds function Base.getindex(l::View{T}, i::Integer) where {T}
    @boundscheck checkbounds(l, i)
    @inbounds v = l.data[i]
    S = Base.nonmissingtype(T)
    l.validity[i] || return missing
    bytes = if v.length < 13
        # inlined value: skip the 4-byte length at the start of this element's slot
        lo = ((i - 1) * 16) + 5
        view(l.inline, lo:(lo + v.length - 1))
    else
        # out-of-line value stored in one of the variadic data buffers
        view(l.buffers[v.bufindex + 1], (v.offset + 1):(v.offset + v.length))
    end
    str = StringView(bytes)
    if S <: Base.CodeUnits
        # BinaryView
        return Base.CodeUnits(str)
    else
        # Utf8View
        return ArrowTypes.fromarrow(T, str)
    end
end

# @propagate_inbounds function Base.setindex!(l::List{T}, v, i::Integer) where {T}

# end
7 changes: 6 additions & 1 deletion src/metadata/Message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ struct RecordBatch <: FlatBuffers.Table
pos::Base.Int
end

Base.propertynames(x::RecordBatch) = (:length, :nodes, :buffers, :compression)
Base.propertynames(x::RecordBatch) = (:length, :nodes, :buffers, :compression, :variadicBufferCounts)

function Base.getproperty(x::RecordBatch, field::Symbol)
if field === :length
Expand All @@ -97,6 +97,11 @@ function Base.getproperty(x::RecordBatch, field::Symbol)
y = FlatBuffers.indirect(x, o + FlatBuffers.pos(x))
return FlatBuffers.init(BodyCompression, FlatBuffers.bytes(x), y)
end
elseif field === :variadicBufferCounts
o = FlatBuffers.offset(x, 12)
if o != 0
return FlatBuffers.Array{Int32}(x, o)
end
end
return nothing
end
Expand Down
92 changes: 59 additions & 33 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -621,32 +621,34 @@ buildmetadata(x::AbstractDict) = x

function Base.iterate(
x::VectorIterator,
(columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)),
(columnidx, nodeidx, bufferidx, varbufferidx)=(Int64(1), Int64(1), Int64(1), Int64(1)),
)
columnidx > length(x.schema.fields) && return nothing
field = x.schema.fields[columnidx]
@debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
A, nodeidx, bufferidx = build(
@debugv 2 "building top-level column: field = $(field), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx, varbufferidx = $varbufferidx"
A, nodeidx, bufferidx, varbufferidx = build(
field,
x.batch,
x.batch.msg.header,
x.dictencodings,
nodeidx,
bufferidx,
varbufferidx,
x.convert,
)
@debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx"
@debugv 2 "built top-level column: A = $(typeof(A)), columnidx = $columnidx, nodeidx = $nodeidx, bufferidx = $bufferidx, varbufferidx = $varbufferidx"
@debugv 3 A
return A, (columnidx + 1, nodeidx, bufferidx)
return A, (columnidx + 1, nodeidx, bufferidx, varbufferidx)
end

Base.length(x::VectorIterator) = length(x.schema.fields)

const ListTypes =
Union{Meta.Utf8,Meta.Utf8View,Meta.LargeUtf8,Meta.Binary,Meta.BinaryView,Meta.LargeBinary,Meta.List,Meta.ListView,Meta.LargeList,Meta.LargeListView}
Union{Meta.Utf8,Meta.LargeUtf8,Meta.Binary,Meta.LargeBinary,Meta.List,Meta.LargeList}
const LargeLists = Union{Meta.LargeUtf8,Meta.LargeBinary,Meta.LargeList,Meta.LargeListView}
const ViewTypes = Union{Meta.Utf8View,Meta.BinaryView,Meta.ListView,Meta.LargeListView}

function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
d = field.dictionary
if d !== nothing
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
Expand All @@ -665,10 +667,10 @@ function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
nodeidx += 1
bufferidx += 1
else
A, nodeidx, bufferidx =
build(field, field.type, batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx =
build(field, field.type, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
end
return A, nodeidx, bufferidx
return A, nodeidx, bufferidx, varbufferidx
end

function buildbitmap(batch, rb, nodeidx, bufferidx)
Expand Down Expand Up @@ -744,7 +746,7 @@ end

const SubVector{T,P} = SubArray{T,1,P,Tuple{UnitRange{Int64}},true}

function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
Expand All @@ -769,16 +771,38 @@ function build(f::Meta.Field, L::ListTypes, batch, rb, de, nodeidx, bufferidx, c
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx =
build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx =
build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
# juliaeltype returns Vector for List, translate to SubArray
S = Base.nonmissingtype(T)
if S <: Vector
ST = SubVector{eltype(A),typeof(A)}
T = S == T ? ST : Union{Missing,ST}
end
end
return List{T,OT,typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx
return List{T,OT,typeof(A)}(bytes, validity, offsets, A, len, meta), nodeidx, bufferidx, varbufferidx
end

# Build a `View` column for a `ViewTypes` field from record batch `rb`.
#
# Buffers are consumed in order starting at `bufferidx`: the validity bitmap, the
# fixed-width buffer of `ViewElement`s, then as many data buffers as
# `rb.variadicBufferCounts[varbufferidx]` says belong to this column. One node and one
# variadic-count entry are consumed.
#
# Returns `(column, nodeidx, bufferidx, varbufferidx)` with the indices advanced past
# everything this column consumed.
#
# Throws an `ArgumentError` if the record batch carries no `variadicBufferCounts`
# (the property is `nothing` when the field is absent from the message).
#
# NOTE(review): `ViewTypes` also contains `Meta.ListView`/`Meta.LargeListView`; the Arrow
# format gives list-views offsets/sizes buffers plus a child array rather than this
# views + variadic-buffers layout — confirm they should really dispatch here.
function build(
    f::Meta.Field,
    L::ViewTypes,
    batch,
    rb,
    de,
    nodeidx,
    bufferidx,
    varbufferidx,
    convert,
)
    @debugv 2 "building array: L = $L"
    validity = buildbitmap(batch, rb, nodeidx, bufferidx)
    bufferidx += 1
    # NOTE(review): `inline` is later indexed as if byte 1 were the first byte of `views`;
    # confirm that the first value returned by `reinterp` starts at this buffer.
    inline, views = reinterp(ViewElement, batch, rb.buffers[bufferidx], rb.compression)
    bufferidx += 1
    counts = rb.variadicBufferCounts
    if counts === nothing
        throw(
            ArgumentError(
                "record batch has no variadicBufferCounts; cannot read view column of type $L",
            ),
        )
    end
    nvariadic = counts[varbufferidx]
    varbufferidx += 1
    buffers = Vector{UInt8}[]
    for _ = 1:nvariadic
        _, A = reinterp(UInt8, batch, rb.buffers[bufferidx], rb.compression)
        push!(buffers, A)
        bufferidx += 1
    end
    len = rb.nodes[nodeidx].length
    nodeidx += 1
    meta = buildmetadata(f.custom_metadata)
    T = juliaeltype(f, meta, convert)
    A = View{T}(batch.bytes, validity, views, inline, buffers, len, meta)
    return A, nodeidx, bufferidx, varbufferidx
end

function build(
Expand All @@ -789,6 +813,7 @@ function build(
de,
nodeidx,
bufferidx,
varbufferidx,
convert,
)
@debugv 2 "building array: L = $L"
Expand All @@ -802,15 +827,15 @@ function build(
bufferidx += 1
else
bytes = UInt8[]
A, nodeidx, bufferidx =
build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx =
build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
end
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return FixedSizeList{T,typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx
return FixedSizeList{T,typeof(A)}(bytes, validity, A, len, meta), nodeidx, bufferidx, varbufferidx
end

function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
Expand All @@ -822,31 +847,31 @@ function build(f::Meta.Field, L::Meta.Map, batch, rb, de, nodeidx, bufferidx, co
bufferidx += 1
len = rb.nodes[nodeidx].length
nodeidx += 1
A, nodeidx, bufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx = build(f.children[1], batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return Map{T,OT,typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx
return Map{T,OT,typeof(A)}(validity, offsets, A, len, meta), nodeidx, bufferidx, varbufferidx
end

function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::Meta.Struct, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
len = rb.nodes[nodeidx].length
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx = build(child, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
fnames = ntuple(i -> Symbol(f.children[i].name), length(f.children))
return Struct{T,typeof(data),fnames}(validity, data, len, meta), nodeidx, bufferidx
return Struct{T,typeof(data),fnames}(validity, data, len, meta), nodeidx, bufferidx, varbufferidx
end

function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
buffer = rb.buffers[bufferidx]
bytes, typeIds = reinterp(UInt8, batch, buffer, rb.compression)
Expand All @@ -859,7 +884,7 @@ function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx,
vecs = []
nodeidx += 1
for child in f.children
A, nodeidx, bufferidx = build(child, batch, rb, de, nodeidx, bufferidx, convert)
A, nodeidx, bufferidx, varbufferidx = build(child, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
push!(vecs, A)
end
data = Tuple(vecs)
Expand All @@ -871,20 +896,21 @@ function build(f::Meta.Field, L::Meta.Union, batch, rb, de, nodeidx, bufferidx,
else
B = SparseUnion{T,UT,typeof(data)}(bytes, typeIds, data, meta)
end
return B, nodeidx, bufferidx
return B, nodeidx, bufferidx, varbufferidx
end

function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::Meta.Null, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
meta = buildmetadata(f.custom_metadata)
T = juliaeltype(f, meta, convert)
return NullVector{maybemissing(T)}(MissingVector(rb.nodes[nodeidx].length), meta),
nodeidx + 1,
bufferidx
nodeidx + 1,
bufferidx,
varbufferidx
end

# primitives
function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) where {L}
function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert) where {L}
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
Expand All @@ -897,10 +923,10 @@ function build(f::Meta.Field, ::L, batch, rb, de, nodeidx, bufferidx, convert) w
len = rb.nodes[nodeidx].length
T = juliaeltype(f, meta, convert)
@debugv 2 "final julia type for primitive: T = $T"
return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1
return Primitive(T, bytes, validity, A, len, meta), nodeidx + 1, bufferidx + 1, varbufferidx
end

function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, convert)
function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, varbufferidx, convert)
@debugv 2 "building array: L = $L"
validity = buildbitmap(batch, rb, nodeidx, bufferidx)
bufferidx += 1
Expand All @@ -925,5 +951,5 @@ function build(f::Meta.Field, L::Meta.Bool, batch, rb, de, nodeidx, bufferidx, c
end
len = rb.nodes[nodeidx].length
T = juliaeltype(f, meta, convert)
return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1
return BoolVector{T}(decodedbytes, pos, validity, len, meta), nodeidx + 1, bufferidx + 1, varbufferidx
end

0 comments on commit 43acd2a

Please sign in to comment.