1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.arrow.dataset.jni;
19 
20 import java.io.IOException;
21 
22 import org.apache.arrow.dataset.source.DatasetFactory;
23 import org.apache.arrow.memory.BufferAllocator;
24 import org.apache.arrow.vector.types.pojo.Schema;
25 import org.apache.arrow.vector.util.SchemaUtility;
26 
27 /**
28  * Native implementation of {@link DatasetFactory}.
29  */
30 public class NativeDatasetFactory implements DatasetFactory {
31   private final long datasetFactoryId;
32   private final NativeMemoryPool memoryPool;
33   private final BufferAllocator allocator;
34 
35   private boolean closed = false;
36 
37   /**
38    * Constructor.
39    *
40    * @param allocator a context allocator associated with this factory. Any buffer that will be created natively will
41    *                  be then bound to this allocator.
42    * @param memoryPool the native memory pool associated with this factory. Any buffer created natively should request
43    *                   for memory spaces from this memory pool. This is a mapped instance of c++ arrow::MemoryPool.
44    * @param datasetFactoryId an ID, at the same time the native pointer of the underlying native instance of this
45    *                         factory. Make sure in c++ side  the pointer is pointing to the shared pointer wrapping
46    *                         the actual instance so we could successfully decrease the reference count once
47    *                         {@link #close} is called.
48    * @see #close()
49    */
NativeDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, long datasetFactoryId)50   public NativeDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, long datasetFactoryId) {
51     this.allocator = allocator;
52     this.memoryPool = memoryPool;
53     this.datasetFactoryId = datasetFactoryId;
54   }
55 
56   @Override
inspect()57   public Schema inspect() {
58     final byte[] buffer;
59     synchronized (this) {
60       if (closed) {
61         throw new NativeInstanceReleasedException();
62       }
63       buffer = JniWrapper.get().inspectSchema(datasetFactoryId);
64     }
65     try {
66       return SchemaUtility.deserialize(buffer, allocator);
67     } catch (IOException e) {
68       throw new RuntimeException(e);
69     }
70   }
71 
72   @Override
finish()73   public NativeDataset finish() {
74     return finish(inspect());
75   }
76 
77   @Override
finish(Schema schema)78   public NativeDataset finish(Schema schema) {
79     try {
80       byte[] serialized = SchemaUtility.serialize(schema);
81       synchronized (this) {
82         if (closed) {
83           throw new NativeInstanceReleasedException();
84         }
85         return new NativeDataset(new NativeContext(allocator, memoryPool),
86             JniWrapper.get().createDataset(datasetFactoryId, serialized));
87       }
88     } catch (IOException e) {
89       throw new RuntimeException(e);
90     }
91   }
92 
93   /**
94    * Close this factory by release the pointer of the native instance.
95    */
96   @Override
close()97   public synchronized void close() {
98     if (closed) {
99       return;
100     }
101     closed = true;
102     JniWrapper.get().closeDatasetFactory(datasetFactoryId);
103   }
104 }
105