# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Determines the total number of bytes needed to store `n` bytes with padding,
# i.e. `n` rounded up to the next multiple of `alignment`.
# Note that the Arrow standard requires buffers to be aligned to 8-byte boundaries.
padding(n::Integer, alignment) = ((n + alignment - 1) ÷ alignment) * alignment

# Number of padding bytes that must follow `n` bytes to reach the next multiple of `alignment`.
paddinglength(n::Integer, alignment) = padding(n, alignment) - n

# Write `n` zero bytes to `io`, one byte at a time; returns the sum of the
# values returned by the individual `Base.write` calls.
function writezeros(io::IO, n::Integer)
    s = 0
    for _ ∈ 1:n
        s += Base.write(io, 0x00)
    end
    return s
end

# efficient writing of arrays
# The two-argument form derives the non-missing element type of `col` and forwards to
# the three-argument method below.
writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)

# Write the elements of `col` to `io` as values of type `T`; returns the value reported by
# the underlying write call(s).
#   * `Vector{T}`: written directly
#   * isbits `T` stored as `Vector{Union{T, Missing}}` or a `SentinelVector`: the first
#     `sizeof(T) * length(col)` bytes at `pointer(col)` are written as-is
#   * `ChainedVector`: each sub-array is written in turn and the results are summed
#   * anything else: elements are serialized into a temporary buffer, with `missing`
#     replaced by `ArrowTypes.default(T)`, and the buffer is written in one call
function writearray(io::IO, ::Type{T}, col) where {T}
    if col isa Vector{T}
        n = Base.write(io, col)
    elseif isbitstype(T) && (col isa Vector{Union{T, Missing}} || col isa SentinelVector{T, T, Missing, Vector{T}})
        # need to write the non-selector bytes of isbits Union Arrays
        # keep `col` rooted while its raw pointer is in use
        n = GC.@preserve col Base.unsafe_write(io, pointer(col), sizeof(T) * length(col))
    elseif col isa ChainedVector
        n = 0
        for A in col.arrays
            n += writearray(io, T, A)
        end
    else
        data = Vector{UInt8}(undef, sizeof(col))
        buf = IOBuffer(data; write=true)
        for x in col
            Base.write(buf, coalesce(x, ArrowTypes.default(T)))
        end
        # the reported count is that of the single write of the staged bytes
        n = Base.write(io, take!(buf))
    end
    return n
end

# Returns whether bit `n` of `v` is set; `n` is 1-based with bit 1 being the least significant.
getbit(v::UInt8, n::Integer) = Bool((v & 0x02^(n - 1)) >> (n - 1))

# Returns a copy of `v` with bit `n` (1-based, least significant first) set to `b`.
function setbit(v::UInt8, b::Bool, n::Integer)
    if b
        v | 0x02^(n - 1)
    else
        v & (0xff ⊻ 0x02^(n - 1))
    end
end

# Determines the number of bytes used by `n` bits, padded up to a multiple of `alignment`.
function bitpackedbytes(n::Integer, alignment)
    ℓ = cld(n, 8)
    return ℓ + paddinglength(ℓ, alignment)
end

# count # of missing elements in an iterable
nullcount(col) = count(ismissing, col)

# like startswith/endswith for strings, but on byte buffers
# Returns true when the bytes of `a` beginning at index `pos` equal `b`.
# An empty `b` always matches; a window that does not fit inside `a` never matches.
function _startswith(a::AbstractVector{UInt8}, pos::Integer, b::AbstractVector{UInt8})
    nb = length(b)
    nb == 0 && return true
    # the whole compared window must lie inside `a`, otherwise the @inbounds reads
    # below would access memory outside the buffer
    (pos >= firstindex(a) && pos + nb - 1 <= lastindex(a)) || return false
    for (k, byte) in enumerate(b)
        @inbounds check = a[pos + k - 1] == byte
        check || return false
    end
    return true
end

# Returns true when the bytes of `a` ending at index `endpos` (inclusive) equal `b`.
# An empty `b` always matches; a window that does not fit inside `a` never matches.
function _endswith(a::AbstractVector{UInt8}, endpos::Integer, b::AbstractVector{UInt8})
    nb = length(b)
    nb == 0 && return true
    aoff = endpos - nb + 1
    # same guard as in _startswith: never index outside `a` under @inbounds
    (aoff >= firstindex(a) && endpos <= lastindex(a)) || return false
    for byte in b
        @inbounds check = a[aoff] == byte
        check || return false
        aoff += 1
    end
    return true
end

# read a single element from a byte vector
# copied from read(::IOBuffer, T) in Base
# Loads a `T` from the `sizeof(T)` bytes of `t` starting at index `pos`.
# Throws a `BoundsError` if that byte window is not fully contained in `t`.
function readbuffer(t::AbstractVector{UInt8}, pos::Integer, ::Type{T}) where {T}
    # validate the window before touching raw memory
    checkbounds(t, pos:(pos + sizeof(T) - 1))
    return GC.@preserve t begin
        ptr::Ptr{T} = pointer(t, pos)
        unsafe_load(ptr)
    end
end

# given a number of unique values; what dict encoding _index_ type is most appropriate
# Picks the first of Int8/Int16/Int32 for which `n` is below half of the type's maximum,
# falling back to Int64.
encodingtype(n) = n < div(typemax(Int8), 2) ? Int8 : n < div(typemax(Int16), 2) ? Int16 : n < div(typemax(Int32), 2) ? Int32 : Int64

# lazily call ArrowTypes.arrowconvert(T, x) on getindex for each x in data
struct Converter{T, A} <: AbstractVector{T}
    data::A
end

# Wrap `x` in a `Converter`; the element type becomes `Union{T, Missing}` when the
# element type of `x` admits `missing`, and `T` otherwise.
converter(::Type{T}, x::A) where {T, A} = Converter{eltype(A) >: Missing ? Union{T, Missing} : T, A}(x)
# For a `ChainedVector`, wrap each underlying array separately and re-chain the results.
converter(::Type{T}, x::ChainedVector{A}) where {T, A} = ChainedVector([converter(T, arr) for arr in x.arrays])

Base.IndexStyle(::Type{<:Converter}) = Base.IndexLinear()
Base.size(x::Converter) = (length(x.data),)
Base.eltype(x::Converter{T, A}) where {T, A} = T
Base.getindex(x::Converter{T}, i::Int) where {T} = ArrowTypes.arrowconvert(T, getindex(x.data, i))

# Strip `Missing` from a type, except that `Missing` itself is returned unchanged.
maybemissing(::Type{T}) where {T} = T === Missing ? Missing : Base.nonmissingtype(T)

# Parse a `Meta.Footer` out of the raw bytes of a file: an Int32 length is read at byte
# index `length(filebytes) - 9`, and the `len` bytes ending at `end - 10` are handed to
# FlatBuffers.
# NOTE(review): presumably the trailing bytes after the length are the file magic — confirm.
function getfooter(filebytes)
    len = readbuffer(filebytes, length(filebytes) - 9, Int32)
    FlatBuffers.getrootas(Meta.Footer, filebytes[end-(9 + len):end-10], 0)
end

# Return a copy of the bytes located by the first record batch entry of the footer.
# NOTE(review): the slice spans `metaDataLength + 1` bytes; confirm whether the extra
# trailing byte is intended.
function getrb(filebytes)
    f = getfooter(filebytes)
    rb = f.recordBatches[1]
    return filebytes[rb.offset+1:(rb.offset+1+rb.metaDataLength)]
    # FlatBuffers.getrootas(Meta.Message, filebytes, rb.offset)
end

# Parse a `Meta.Message` from `filebytes`: asserts that the UInt32 at index `off` is
# 0xFFFFFFFF, then hands `off + 8` to FlatBuffers as the position of the message.
function readmessage(filebytes, off=9)
    @assert readbuffer(filebytes, off, UInt32) === 0xFFFFFFFF
    # the Int32 at `off + 4` is read but not used further in this function
    len = readbuffer(filebytes, off + 4, Int32)

    FlatBuffers.getrootas(Meta.Message, filebytes, off + 8)
end

# a custom Channel type that only allows put!-ing objects in a specific, monotonically increasing order
struct OrderedChannel{T}
    chan::Channel{T}          # underlying channel holding the objects
    cond::Threads.Condition   # used to make out-of-turn producers wait
    i::Ref{Int}               # ordering counter, starts at 1
end

OrderedChannel{T}(sz) where {T} = OrderedChannel{T}(Channel{T}(sz), Threads.Condition(), Ref(1))
# Iterating an OrderedChannel iterates the wrapped channel.
Base.iterate(ch::OrderedChannel, st...) = iterate(ch.chan, st...)
# Run `expr` while holding `obj`'s lock; the lock is released in a `finally` clause, so it
# is released even if `expr` throws. Note that `obj` is spliced in twice (once for `lock`,
# once for `unlock`).
macro lock(obj, expr)
    guarded = quote
        lock($obj)
        try
            $expr
        finally
            unlock($obj)
        end
    end
    return esc(guarded)
end

# when put!-ing an object, operation may have to wait until other tasks have put their
# objects to ensure the channel is ordered correctly
# `i` is the position this object is meant to take; the call waits while the channel's
# counter is below `i`. With `incr=true` the counter is advanced after the object is stored.
function Base.put!(ch::OrderedChannel{T}, x::T, i::Integer, incr::Bool=false) where {T}
    cond = ch.cond
    lock(cond)
    try
        # channel index too early: wait for other tasks to put their objects first
        while ch.i[] < i
            wait(cond)
        end
        # now it's our turn
        put!(ch.chan, x)
        incr && (ch.i[] += 1)
        # wake up tasks that may be waiting to put their objects
        notify(cond)
    finally
        unlock(cond)
    end
    return nothing
end

# Close the underlying channel, after first waiting on the condition for as long as it
# still reports waiters, so pending put! calls get a chance to run.
function Base.close(ch::OrderedChannel)
    cond = ch.cond
    lock(cond)
    try
        while Base.n_waiters(cond) > 0
            wait(cond)
        end
        close(ch.chan)
    finally
        unlock(cond)
    end
    return nothing
end

# A value paired with a ReentrantLock; `lock`/`unlock` on the wrapper forward to that lock.
struct Lockable{T}
    x::T
    lock::ReentrantLock
end

# Wrap `x` together with a freshly created lock.
function Lockable(x::T) where {T}
    return Lockable{T}(x, ReentrantLock())
end

function Base.lock(x::Lockable)
    return lock(x.lock)
end

function Base.unlock(x::Lockable)
    return unlock(x.lock)
end