1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "./arrow_types.h"
19
20 #if defined(ARROW_R_WITH_ARROW)
21 #include <arrow/ipc/reader.h>
22 #include <arrow/ipc/writer.h>
23
24 // [[arrow::export]]
ipc___Message__body_length(const std::unique_ptr<arrow::ipc::Message> & message)25 int64_t ipc___Message__body_length(const std::unique_ptr<arrow::ipc::Message>& message) {
26 return message->body_length();
27 }
28
29 // [[arrow::export]]
ipc___Message__metadata(const std::unique_ptr<arrow::ipc::Message> & message)30 std::shared_ptr<arrow::Buffer> ipc___Message__metadata(
31 const std::unique_ptr<arrow::ipc::Message>& message) {
32 return message->metadata();
33 }
34
35 // [[arrow::export]]
ipc___Message__body(const std::unique_ptr<arrow::ipc::Message> & message)36 std::shared_ptr<arrow::Buffer> ipc___Message__body(
37 const std::unique_ptr<arrow::ipc::Message>& message) {
38 return message->body();
39 }
40
41 // [[arrow::export]]
ipc___Message__Verify(const std::unique_ptr<arrow::ipc::Message> & message)42 int64_t ipc___Message__Verify(const std::unique_ptr<arrow::ipc::Message>& message) {
43 return message->Verify();
44 }
45
46 // [[arrow::export]]
ipc___Message__type(const std::unique_ptr<arrow::ipc::Message> & message)47 arrow::ipc::MessageType ipc___Message__type(
48 const std::unique_ptr<arrow::ipc::Message>& message) {
49 return message->type();
50 }
51
52 // [[arrow::export]]
ipc___Message__Equals(const std::unique_ptr<arrow::ipc::Message> & x,const std::unique_ptr<arrow::ipc::Message> & y)53 bool ipc___Message__Equals(const std::unique_ptr<arrow::ipc::Message>& x,
54 const std::unique_ptr<arrow::ipc::Message>& y) {
55 return x->Equals(*y);
56 }
57
58 // [[arrow::export]]
ipc___ReadRecordBatch__Message__Schema(const std::unique_ptr<arrow::ipc::Message> & message,const std::shared_ptr<arrow::Schema> & schema)59 std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__Message__Schema(
60 const std::unique_ptr<arrow::ipc::Message>& message,
61 const std::shared_ptr<arrow::Schema>& schema) {
62 // TODO: perhaps this should come from the R side
63 arrow::ipc::DictionaryMemo memo;
64 auto batch = ValueOrStop(arrow::ipc::ReadRecordBatch(
65 *message, schema, &memo, arrow::ipc::IpcReadOptions::Defaults()));
66 return batch;
67 }
68
69 // [[arrow::export]]
ipc___ReadSchema_InputStream(const std::shared_ptr<arrow::io::InputStream> & stream)70 std::shared_ptr<arrow::Schema> ipc___ReadSchema_InputStream(
71 const std::shared_ptr<arrow::io::InputStream>& stream) {
72 // TODO: promote to function argument
73 arrow::ipc::DictionaryMemo memo;
74 return ValueOrStop(arrow::ipc::ReadSchema(stream.get(), &memo));
75 }
76
77 // [[arrow::export]]
ipc___ReadSchema_Message(const std::unique_ptr<arrow::ipc::Message> & message)78 std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(
79 const std::unique_ptr<arrow::ipc::Message>& message) {
80 arrow::ipc::DictionaryMemo empty_memo;
81 return ValueOrStop(arrow::ipc::ReadSchema(*message, &empty_memo));
82 }
83
84 //--------- MessageReader
85
86 // [[arrow::export]]
ipc___MessageReader__Open(const std::shared_ptr<arrow::io::InputStream> & stream)87 std::shared_ptr<arrow::ipc::MessageReader> ipc___MessageReader__Open(
88 const std::shared_ptr<arrow::io::InputStream>& stream) {
89 return std::shared_ptr<arrow::ipc::MessageReader>(
90 arrow::ipc::MessageReader::Open(stream));
91 }
92
93 // [[arrow::export]]
ipc___MessageReader__ReadNextMessage(const std::unique_ptr<arrow::ipc::MessageReader> & reader)94 std::shared_ptr<arrow::ipc::Message> ipc___MessageReader__ReadNextMessage(
95 const std::unique_ptr<arrow::ipc::MessageReader>& reader) {
96 return ValueOrStop(reader->ReadNextMessage());
97 }
98
99 // [[arrow::export]]
ipc___ReadMessage(const std::shared_ptr<arrow::io::InputStream> & stream)100 std::shared_ptr<arrow::ipc::Message> ipc___ReadMessage(
101 const std::shared_ptr<arrow::io::InputStream>& stream) {
102 return ValueOrStop(arrow::ipc::ReadMessage(stream.get()));
103 }
104
105 #endif
106