1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
#' Create a (virtual) DuckDB table from an Arrow object
#'
#' This will do the necessary configuration to create a (virtual) table in DuckDB
#' that is backed by the Arrow object given. No data is copied or modified until
#' `collect()` or `compute()` are called or a query is run against the table.
#'
#' The result is a dbplyr-compatible object that can be used in d(b)plyr pipelines.
#' Any grouping present on `.data` is carried over to the resulting `tbl`.
#'
#' If `auto_disconnect = TRUE`, the DuckDB table that is created will be configured
#' to be unregistered when the `tbl` object is garbage collected. This is helpful
#' if you don't want to have extra table objects in DuckDB after you've finished
#' using them. Currently, however, this cleanup can sometimes lead to hangs if
#' tables are created and deleted in quick succession, hence the default value
#' of `FALSE`.
#'
#' @param .data the Arrow object (e.g. Dataset, Table) to use for the DuckDB table
#' @param con a DuckDB connection to use (the default reuses the connection
#' stored in `options("arrow_duck_con")`, creating and storing a new one if none
#' exists or it is no longer valid)
#' @param table_name a name to use in DuckDB for this object. The default is a
#' unique string `"arrow_"` followed by numbers.
#' @param auto_disconnect should the table be automatically cleaned up when the
#' resulting object is removed (and garbage collected)? Default: `FALSE`
#'
#' @return A `tbl` of the new table in DuckDB
#'
#' @name to_duckdb
#' @export
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE))
to_duckdb <- function(.data,
                      con = arrow_duck_connection(),
                      table_name = unique_arrow_tablename(),
                      auto_disconnect = FALSE) {
  .data <- as_adq(.data)
  duckdb::duckdb_register_arrow(con, table_name, .data)

  # Namespace-qualified so this works even when dplyr is not attached.
  tbl <- dplyr::tbl(con, table_name)

  # Carry grouping over from the Arrow query. `dplyr::groups()` returns a list
  # of symbols, which must be spliced with `!!!`; passing the list itself would
  # be treated as a single grouping expression rather than the group columns.
  groups <- dplyr::groups(.data)
  if (length(groups) > 0) {
    tbl <- dplyr::group_by(tbl, !!!groups)
  }

  if (auto_disconnect) {
    # this will add the correct connection disconnection when the tbl is gced.
    # we should probably confirm that this use of src$disco is kosher.
    tbl$src$disco <- duckdb_disconnector(con, table_name)
  }

  tbl
}
76
# Return the shared DuckDB connection cached in `options("arrow_duck_con")`.
# When nothing is cached, or the cached connection is no longer valid, a new
# connection is opened, configured, stored in that option, and returned.
arrow_duck_connection <- function() {
  cached <- getOption("arrow_duck_con")
  if (!is.null(cached) && DBI::dbIsValid(cached)) {
    return(cached)
  }

  fresh <- DBI::dbConnect(duckdb::duckdb())
  # Give DuckDB the same number of threads the arrow library is configured for
  DBI::dbExecute(fresh, paste0("PRAGMA threads=", cpu_count()))
  options(arrow_duck_con = fresh)
  fresh
}
87
# Decide whether the duckdb examples can run (used by `@examplesIf`, see:
# https://github.com/r-lib/roxygen2/issues/1242). Requires arrow built with
# dataset support, duckdb newer than 0.2.7, and both dplyr and dbplyr.
# Checks are evaluated left to right and stop at the first failure.
run_duckdb_examples <- function() {
  has_recent_duckdb <- function() {
    requireNamespace("duckdb", quietly = TRUE) &&
      packageVersion("duckdb") > "0.2.7"
  }
  has_dplyr_stack <- function() {
    requireNamespace("dplyr", quietly = TRUE) &&
      requireNamespace("dbplyr", quietly = TRUE)
  }

  arrow_with_dataset() && has_recent_duckdb() && has_dplyr_stack()
}
97
# Produce a fresh table name of the form "arrow_NNN" (zero-padded to at least
# three digits). The running counter is kept in `options("arrow_table_name")`
# and advanced on every call. Adapted from dbplyr.
unique_arrow_tablename <- function() {
  next_id <- 1 + getOption("arrow_table_name", 0)
  options(arrow_table_name = next_id)
  paste0("arrow_", sprintf("%03i", next_id))
}
104
# Create an environment whose finalizer, when the environment is garbage
# collected, unregisters the Arrow table `tbl_name` from `con` and then shuts
# the connection down if no tables remain. Returns that environment so callers
# can tie its lifetime to another object.
duckdb_disconnector <- function(con, tbl_name) {
  reg.finalizer(environment(), function(...) {
    # If the connection has already been closed there is nothing to clean up;
    # calling DBI functions on an invalid connection would error inside the
    # finalizer.
    if (!DBI::dbIsValid(con)) {
      return(invisible(NULL))
    }

    # remove the table we ephemerally created
    duckdb::duckdb_unregister_arrow(con, tbl_name)

    # if there are no more tables, we can safely shut down
    if (length(DBI::dbListTables(con)) == 0) {
      DBI::dbDisconnect(con, shutdown = TRUE)
    }
  })
  environment()
}
121
#' Create an Arrow object from others
#'
#' Use this in pipelines that hand data back and forth between Arrow and other
#' engines (like DuckDB). Arrow objects (tables, datasets, and queries) are
#' passed through unchanged; a dbplyr `tbl` from a DuckDB connection has its
#' query executed and the result brought back into Arrow.
#'
#' @param .data the object to be converted: an Arrow object, or a dbplyr `tbl`
#' backed by a DuckDB connection
#'
#' @return an Arrow object to be used in dplyr pipelines: `.data` itself if it
#' is already an Arrow object, otherwise an `arrow_dplyr_query`.
#' @export
#'
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#'   to_arrow() %>%
#'   collect()
to_arrow <- function(.data) {
  # Arrow inputs need no conversion
  is_arrow <- inherits(.data, c("arrow_dplyr_query", "ArrowObject"))
  if (is_arrow) {
    return(.data)
  }

  # The only non-Arrow input handled for now is a dbplyr tbl on duckdb
  con <- dbplyr::remote_con(.data)
  if (!inherits(con, "duckdb_connection")) {
    stop(
      "to_arrow() currently only supports Arrow tables, Arrow datasets, ",
      "Arrow queries, or dbplyr tbls from duckdb connections",
      call. = FALSE
    )
  }

  # Execute the tbl's SQL in DuckDB, asking for an Arrow-format result
  sql <- dbplyr::remote_query(.data)
  res <- DBI::dbSendQuery(con, sql, arrow = TRUE)
  batches <- duckdb::duckdb_fetch_record_batch(res)

  # TODO: we shouldn't need $read_table(), but we get segfaults when we do.
  arrow_dplyr_query(batches$read_table())
}
166