1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // A command line executable that generates a bunch of valid IPC files
19 // containing example record batches.  Those are used as fuzzing seeds
20 // to make fuzzing more efficient.
21 
22 #include <cstdlib>
23 #include <iostream>
24 #include <memory>
25 #include <string>
26 #include <vector>
27 
28 #include "arrow/io/file.h"
29 #include "arrow/io/memory.h"
30 #include "arrow/ipc/json_simple.h"
31 #include "arrow/ipc/test_common.h"
32 #include "arrow/ipc/writer.h"
33 #include "arrow/record_batch.h"
34 #include "arrow/result.h"
35 #include "arrow/testing/extension_type.h"
36 #include "arrow/util/io_util.h"
37 #include "arrow/util/key_value_metadata.h"
38 
39 namespace arrow {
40 namespace ipc {
41 
42 using ::arrow::internal::CreateDir;
43 using ::arrow::internal::PlatformFilename;
44 using internal::json::ArrayFromJSON;
45 
MakeExtensionBatch()46 Result<std::shared_ptr<RecordBatch>> MakeExtensionBatch() {
47   auto array = ExampleUuid();
48   auto md = key_value_metadata({"key1", "key2"}, {"value1", ""});
49   auto schema = ::arrow::schema({field("f0", array->type())}, md);
50   return RecordBatch::Make(schema, array->length(), {array});
51 }
52 
MakeMapBatch()53 Result<std::shared_ptr<RecordBatch>> MakeMapBatch() {
54   std::shared_ptr<Array> array;
55   const char* json_input = R"(
56 [
57     [[0, 1], [1, 1], [2, 2], [3, 3], [4, 5], [5, 8]],
58     null,
59     [[0, null], [1, null], [2, 0], [3, 1], [4, null], [5, 2]],
60     []
61   ]
62 )";
63   RETURN_NOT_OK(ArrayFromJSON(map(int16(), int32()), json_input, &array));
64   auto schema = ::arrow::schema({field("f0", array->type())});
65   return RecordBatch::Make(schema, array->length(), {array});
66 }
67 
Batches()68 Result<std::vector<std::shared_ptr<RecordBatch>>> Batches() {
69   std::vector<std::shared_ptr<RecordBatch>> batches;
70   std::shared_ptr<RecordBatch> batch;
71   std::shared_ptr<Array> array;
72 
73   RETURN_NOT_OK(test::MakeNullRecordBatch(&batch));
74   batches.push_back(batch);
75   RETURN_NOT_OK(test::MakeListRecordBatch(&batch));
76   batches.push_back(batch);
77   RETURN_NOT_OK(test::MakeDictionary(&batch));
78   batches.push_back(batch);
79   RETURN_NOT_OK(test::MakeTimestamps(&batch));
80   batches.push_back(batch);
81   RETURN_NOT_OK(test::MakeFWBinary(&batch));
82   batches.push_back(batch);
83   RETURN_NOT_OK(test::MakeStruct(&batch));
84   batches.push_back(batch);
85   RETURN_NOT_OK(test::MakeUnion(&batch));
86   batches.push_back(batch);
87   RETURN_NOT_OK(test::MakeFixedSizeListRecordBatch(&batch));
88   batches.push_back(batch);
89   ARROW_ASSIGN_OR_RAISE(batch, MakeExtensionBatch());
90   batches.push_back(batch);
91   ARROW_ASSIGN_OR_RAISE(batch, MakeMapBatch());
92   batches.push_back(batch);
93 
94   return batches;
95 }
96 
SerializeRecordBatch(const std::shared_ptr<RecordBatch> & batch,bool is_stream_format)97 Result<std::shared_ptr<Buffer>> SerializeRecordBatch(
98     const std::shared_ptr<RecordBatch>& batch, bool is_stream_format) {
99   ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create(1024));
100   std::shared_ptr<RecordBatchWriter> writer;
101   if (is_stream_format) {
102     ARROW_ASSIGN_OR_RAISE(writer, MakeStreamWriter(sink, batch->schema()));
103   } else {
104     ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(sink, batch->schema()));
105   }
106   RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
107   RETURN_NOT_OK(writer->Close());
108   return sink->Finish();
109 }
110 
DoMain(bool is_stream_format,const std::string & out_dir)111 Status DoMain(bool is_stream_format, const std::string& out_dir) {
112   ARROW_ASSIGN_OR_RAISE(auto dir_fn, PlatformFilename::FromString(out_dir));
113   RETURN_NOT_OK(CreateDir(dir_fn));
114 
115   int sample_num = 1;
116   auto sample_name = [&]() -> std::string {
117     return "batch-" + std::to_string(sample_num++);
118   };
119 
120   ARROW_ASSIGN_OR_RAISE(auto batches, Batches());
121 
122   for (const auto& batch : batches) {
123     RETURN_NOT_OK(batch->ValidateFull());
124     ARROW_ASSIGN_OR_RAISE(auto buf, SerializeRecordBatch(batch, is_stream_format));
125     ARROW_ASSIGN_OR_RAISE(auto sample_fn, dir_fn.Join(sample_name()));
126     std::cerr << sample_fn.ToString() << std::endl;
127     ARROW_ASSIGN_OR_RAISE(auto file, io::FileOutputStream::Open(sample_fn.ToString()));
128     RETURN_NOT_OK(file->Write(buf));
129     RETURN_NOT_OK(file->Close());
130   }
131   return Status::OK();
132 }
133 
Usage()134 ARROW_NORETURN void Usage() {
135   std::cerr << "Usage: arrow-ipc-generate-fuzz-corpus "
136             << "[-stream|-file] <output directory>" << std::endl;
137   std::exit(2);
138 }
139 
Main(int argc,char ** argv)140 int Main(int argc, char** argv) {
141   if (argc != 3) {
142     Usage();
143   }
144   auto opt = std::string(argv[1]);
145   if (opt != "-stream" && opt != "-file") {
146     Usage();
147   }
148   auto out_dir = std::string(argv[2]);
149 
150   Status st = DoMain(opt == "-stream", out_dir);
151   if (!st.ok()) {
152     std::cerr << st.ToString() << std::endl;
153     return 1;
154   }
155   return 0;
156 }
157 
158 }  // namespace ipc
159 }  // namespace arrow
160 
main(int argc,char ** argv)161 int main(int argc, char** argv) { return arrow::ipc::Main(argc, argv); }
162