1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "./arrow_types.h"
19 
20 #if defined(ARROW_R_WITH_ARROW)
21 #include <arrow/ipc/reader.h>
22 #include <arrow/ipc/writer.h>
23 
24 // [[arrow::export]]
ipc___Message__body_length(const std::unique_ptr<arrow::ipc::Message> & message)25 int64_t ipc___Message__body_length(const std::unique_ptr<arrow::ipc::Message>& message) {
26   return message->body_length();
27 }
28 
29 // [[arrow::export]]
ipc___Message__metadata(const std::unique_ptr<arrow::ipc::Message> & message)30 std::shared_ptr<arrow::Buffer> ipc___Message__metadata(
31     const std::unique_ptr<arrow::ipc::Message>& message) {
32   return message->metadata();
33 }
34 
35 // [[arrow::export]]
ipc___Message__body(const std::unique_ptr<arrow::ipc::Message> & message)36 std::shared_ptr<arrow::Buffer> ipc___Message__body(
37     const std::unique_ptr<arrow::ipc::Message>& message) {
38   return message->body();
39 }
40 
41 // [[arrow::export]]
ipc___Message__Verify(const std::unique_ptr<arrow::ipc::Message> & message)42 int64_t ipc___Message__Verify(const std::unique_ptr<arrow::ipc::Message>& message) {
43   return message->Verify();
44 }
45 
46 // [[arrow::export]]
ipc___Message__type(const std::unique_ptr<arrow::ipc::Message> & message)47 arrow::ipc::MessageType ipc___Message__type(
48     const std::unique_ptr<arrow::ipc::Message>& message) {
49   return message->type();
50 }
51 
52 // [[arrow::export]]
ipc___Message__Equals(const std::unique_ptr<arrow::ipc::Message> & x,const std::unique_ptr<arrow::ipc::Message> & y)53 bool ipc___Message__Equals(const std::unique_ptr<arrow::ipc::Message>& x,
54                            const std::unique_ptr<arrow::ipc::Message>& y) {
55   return x->Equals(*y);
56 }
57 
58 // [[arrow::export]]
ipc___ReadRecordBatch__Message__Schema(const std::unique_ptr<arrow::ipc::Message> & message,const std::shared_ptr<arrow::Schema> & schema)59 std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__Message__Schema(
60     const std::unique_ptr<arrow::ipc::Message>& message,
61     const std::shared_ptr<arrow::Schema>& schema) {
62   // TODO: perhaps this should come from the R side
63   arrow::ipc::DictionaryMemo memo;
64   auto batch = ValueOrStop(arrow::ipc::ReadRecordBatch(
65       *message, schema, &memo, arrow::ipc::IpcReadOptions::Defaults()));
66   return batch;
67 }
68 
69 // [[arrow::export]]
ipc___ReadSchema_InputStream(const std::shared_ptr<arrow::io::InputStream> & stream)70 std::shared_ptr<arrow::Schema> ipc___ReadSchema_InputStream(
71     const std::shared_ptr<arrow::io::InputStream>& stream) {
72   // TODO: promote to function argument
73   arrow::ipc::DictionaryMemo memo;
74   return ValueOrStop(arrow::ipc::ReadSchema(stream.get(), &memo));
75 }
76 
77 // [[arrow::export]]
ipc___ReadSchema_Message(const std::unique_ptr<arrow::ipc::Message> & message)78 std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(
79     const std::unique_ptr<arrow::ipc::Message>& message) {
80   arrow::ipc::DictionaryMemo empty_memo;
81   return ValueOrStop(arrow::ipc::ReadSchema(*message, &empty_memo));
82 }
83 
84 //--------- MessageReader
85 
86 // [[arrow::export]]
ipc___MessageReader__Open(const std::shared_ptr<arrow::io::InputStream> & stream)87 std::shared_ptr<arrow::ipc::MessageReader> ipc___MessageReader__Open(
88     const std::shared_ptr<arrow::io::InputStream>& stream) {
89   return std::shared_ptr<arrow::ipc::MessageReader>(
90       arrow::ipc::MessageReader::Open(stream));
91 }
92 
93 // [[arrow::export]]
ipc___MessageReader__ReadNextMessage(const std::unique_ptr<arrow::ipc::MessageReader> & reader)94 std::shared_ptr<arrow::ipc::Message> ipc___MessageReader__ReadNextMessage(
95     const std::unique_ptr<arrow::ipc::MessageReader>& reader) {
96   return ValueOrStop(reader->ReadNextMessage());
97 }
98 
99 // [[arrow::export]]
ipc___ReadMessage(const std::shared_ptr<arrow::io::InputStream> & stream)100 std::shared_ptr<arrow::ipc::Message> ipc___ReadMessage(
101     const std::shared_ptr<arrow::io::InputStream>& stream) {
102   return ValueOrStop(arrow::ipc::ReadMessage(stream.get()));
103 }
104 
105 #endif
106