1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/io/transform.h"
19
20 #include <algorithm>
21 #include <cstring>
22 #include <mutex>
23 #include <random>
24 #include <thread>
25 #include <utility>
26
27 #include "arrow/buffer.h"
28 #include "arrow/io/util_internal.h"
29 #include "arrow/result.h"
30 #include "arrow/status.h"
31 #include "arrow/util/logging.h"
32
33 namespace arrow {
34 namespace io {
35
36 struct TransformInputStream::Impl {
37 std::shared_ptr<InputStream> wrapped_;
38 TransformInputStream::TransformFunc transform_;
39 std::shared_ptr<Buffer> pending_;
40 int64_t pos_ = 0;
41 bool closed_ = false;
42
Implarrow::io::TransformInputStream::Impl43 Impl(std::shared_ptr<InputStream> wrapped,
44 TransformInputStream::TransformFunc transform)
45 : wrapped_(std::move(wrapped)), transform_(std::move(transform)) {}
46
Closearrow::io::TransformInputStream::Impl47 void Close() {
48 closed_ = true;
49 pending_.reset();
50 }
51
CheckClosedarrow::io::TransformInputStream::Impl52 Status CheckClosed() const {
53 if (closed_) {
54 return Status::Invalid("Operation on closed file");
55 }
56 return Status::OK();
57 }
58 };
59
TransformInputStream(std::shared_ptr<InputStream> wrapped,TransformInputStream::TransformFunc transform)60 TransformInputStream::TransformInputStream(std::shared_ptr<InputStream> wrapped,
61 TransformInputStream::TransformFunc transform)
62 : impl_(new Impl{std::move(wrapped), std::move(transform)}) {}
63
~TransformInputStream()64 TransformInputStream::~TransformInputStream() {}
65
Close()66 Status TransformInputStream::Close() {
67 impl_->Close();
68 return impl_->wrapped_->Close();
69 }
70
Abort()71 Status TransformInputStream::Abort() { return impl_->wrapped_->Abort(); }
72
closed() const73 bool TransformInputStream::closed() const { return impl_->closed_; }
74
Read(int64_t nbytes)75 Result<std::shared_ptr<Buffer>> TransformInputStream::Read(int64_t nbytes) {
76 RETURN_NOT_OK(impl_->CheckClosed());
77
78 ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
79 ARROW_ASSIGN_OR_RAISE(auto bytes_read, this->Read(nbytes, buf->mutable_data()));
80 if (bytes_read < nbytes) {
81 RETURN_NOT_OK(buf->Resize(bytes_read, /*shrink_to_fit=*/true));
82 }
83 return std::shared_ptr<Buffer>(std::move(buf));
84 }
85
Read(int64_t nbytes,void * out)86 Result<int64_t> TransformInputStream::Read(int64_t nbytes, void* out) {
87 RETURN_NOT_OK(impl_->CheckClosed());
88
89 if (nbytes == 0) {
90 return 0;
91 }
92
93 int64_t avail_size = 0;
94 std::vector<std::shared_ptr<Buffer>> avail;
95 if (impl_->pending_) {
96 avail.push_back(impl_->pending_);
97 avail_size += impl_->pending_->size();
98 }
99 // Accumulate enough transformed data to satisfy read
100 while (avail_size < nbytes) {
101 ARROW_ASSIGN_OR_RAISE(auto buf, impl_->wrapped_->Read(nbytes));
102 const bool have_eof = (buf->size() == 0);
103 // Even if EOF is met, let the transform function run a last time
104 // (for example to flush internal buffers)
105 ARROW_ASSIGN_OR_RAISE(buf, impl_->transform_(std::move(buf)));
106 avail_size += buf->size();
107 avail.push_back(std::move(buf));
108 if (have_eof) {
109 break;
110 }
111 }
112 DCHECK(!avail.empty());
113
114 // Coalesce buffer data
115 uint8_t* out_data = reinterpret_cast<uint8_t*>(out);
116 int64_t copied_bytes = 0;
117 for (size_t i = 0; i < avail.size() - 1; ++i) {
118 // All buffers except the last fit fully into `nbytes`
119 const auto buf = std::move(avail[i]);
120 DCHECK_LE(buf->size(), nbytes);
121 memcpy(out_data, buf->data(), static_cast<size_t>(buf->size()));
122 out_data += buf->size();
123 nbytes -= buf->size();
124 copied_bytes += buf->size();
125 }
126 {
127 // Last buffer: splice into `out` and `pending_`
128 const auto buf = std::move(avail.back());
129 const int64_t to_copy = std::min(buf->size(), nbytes);
130 memcpy(out_data, buf->data(), static_cast<size_t>(to_copy));
131 copied_bytes += to_copy;
132 if (buf->size() > to_copy) {
133 impl_->pending_ = SliceBuffer(buf, to_copy);
134 } else {
135 impl_->pending_.reset();
136 }
137 }
138 impl_->pos_ += copied_bytes;
139 return copied_bytes;
140 }
141
Tell() const142 Result<int64_t> TransformInputStream::Tell() const {
143 RETURN_NOT_OK(impl_->CheckClosed());
144
145 return impl_->pos_;
146 }
147
ReadMetadata()148 Result<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadata() {
149 RETURN_NOT_OK(impl_->CheckClosed());
150
151 return impl_->wrapped_->ReadMetadata();
152 }
153
ReadMetadataAsync(const IOContext & io_context)154 Future<std::shared_ptr<const KeyValueMetadata>> TransformInputStream::ReadMetadataAsync(
155 const IOContext& io_context) {
156 RETURN_NOT_OK(impl_->CheckClosed());
157
158 return impl_->wrapped_->ReadMetadataAsync(io_context);
159 }
160
161 } // namespace io
162 } // namespace arrow
163