# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

#' Create a (virtual) DuckDB table from an Arrow object
#'
#' This will do the necessary configuration to create a (virtual) table in DuckDB
#' that is backed by the Arrow object given. No data is copied or modified until
#' `collect()` or `compute()` are called or a query is run against the table.
#'
#' The result is a dbplyr-compatible object that can be used in d(b)plyr pipelines.
#' Any grouping present on `.data` (as reported by `dplyr::groups()`) is carried
#' over to the returned `tbl`.
#'
#' If `auto_disconnect = TRUE`, the DuckDB table that is created will be configured
#' to be unregistered when the `tbl` object is garbage collected. This is helpful
#' if you don't want to have extra table objects in DuckDB after you've finished
#' using them. Currently, however, this cleanup can sometimes lead to hangs if
#' tables are created and deleted in quick succession, hence the default value
#' of `FALSE`.
#'
#' @param .data the Arrow object (e.g. Dataset, Table) to use for the DuckDB table
#' @param con a DuckDB connection to use (default will create one and store it
#' in `options("arrow_duck_con")`)
#' @param table_name a name to use in DuckDB for this object. The default is a
#' unique string `"arrow_"` followed by numbers.
#' @param auto_disconnect should the table be automatically cleaned up when the
#' resulting object is removed (and garbage collected)? Default: `FALSE`
#'
#' @return A `tbl` of the new table in DuckDB
#'
#' @name to_duckdb
#' @export
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE))
to_duckdb <- function(.data,
                      con = arrow_duck_connection(),
                      table_name = unique_arrow_tablename(),
                      auto_disconnect = FALSE) {
  .data <- as_adq(.data)
  duckdb::duckdb_register_arrow(con, table_name, .data)

  tbl <- tbl(con, table_name)

  # Re-apply any grouping from the Arrow query. `dplyr::groups()` returns a
  # list of symbols, so it has to be spliced in with `!!!`. Passing the list
  # itself would group by the expression `groups` rather than by the
  # grouping columns.
  groups <- dplyr::groups(.data)
  if (length(groups) > 0) {
    tbl <- dplyr::group_by(tbl, !!!groups)
  }

  if (auto_disconnect) {
    # this will add the correct connection disconnection when the tbl is gced.
    # we should probably confirm that this use of src$disco is kosher.
    tbl$src$disco <- duckdb_disconnector(con, table_name)
  }

  tbl
}

# Return the DuckDB connection cached in `options("arrow_duck_con")`, creating
# (and caching) a new one if none is stored or the stored one is no longer
# valid. A new connection has its thread count set to arrow's `cpu_count()`.
arrow_duck_connection <- function() {
  con <- getOption("arrow_duck_con")
  if (is.null(con) || !DBI::dbIsValid(con)) {
    con <- DBI::dbConnect(duckdb::duckdb())
    # Use the same CPU count that the arrow library is set to
    DBI::dbExecute(con, paste0("PRAGMA threads=", cpu_count()))
    options(arrow_duck_con = con)
  }
  con
}

# helper function to determine if duckdb examples should run
# see: https://github.com/r-lib/roxygen2/issues/1242
run_duckdb_examples <- function() {
  arrow_with_dataset() &&
    requireNamespace("duckdb", quietly = TRUE) &&
    packageVersion("duckdb") > "0.2.7" &&
    requireNamespace("dplyr", quietly = TRUE) &&
    requireNamespace("dbplyr", quietly = TRUE)
}

# Adapted from dbplyr
# Generate a table name "arrow_001", "arrow_002", ... The running counter is
# kept in `options("arrow_table_name")` and incremented on every call.
unique_arrow_tablename <- function() {
  i <- getOption("arrow_table_name", 0) + 1
  options(arrow_table_name = i)
  sprintf("arrow_%03i", i)
}

# Creates an environment that disconnects the database when it's GC'd.
# The returned environment (this function's own frame) has a finalizer that
# unregisters `tbl_name` from `con` and, if no tables are left on `con`,
# shuts the connection down.
duckdb_disconnector <- function(con, tbl_name) {
  reg.finalizer(environment(), function(...) {
    # If the connection has already been closed there is nothing to clean up,
    # and calling into it (including dbListTables()) would error inside the
    # finalizer.
    if (!DBI::dbIsValid(con)) {
      return(invisible(NULL))
    }

    # remove the table we ephemerally created
    duckdb::duckdb_unregister_arrow(con, tbl_name)

    # and if there are no more tables, we can safely shut down
    if (length(DBI::dbListTables(con)) == 0) {
      DBI::dbDisconnect(con, shutdown = TRUE)
    }
  })
  environment()
}

#' Create an Arrow object from others
#'
#' This can be used in pipelines that pass data back and forth between Arrow and
#' other processes (like DuckDB).
#'
#' @param .data the object to be converted
#'
#' @return an `arrow_dplyr_query` object, to be used in dplyr pipelines.
#' @export
#'
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
#' library(dplyr)
#'
#' ds <- InMemoryDataset$create(mtcars)
#'
#' ds %>%
#'   filter(mpg < 30) %>%
#'   to_duckdb() %>%
#'   group_by(cyl) %>%
#'   summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#'   to_arrow() %>%
#'   collect()
to_arrow <- function(.data) {
  # If this is an Arrow object already, return quickly since we're already Arrow
  if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
    return(.data)
  }

  # For now, we only handle .data from duckdb, so check that it is that if we've
  # gotten this far
  con <- dbplyr::remote_con(.data)
  if (!inherits(con, "duckdb_connection")) {
    stop(
      "to_arrow() currently only supports Arrow tables, Arrow datasets, ",
      "Arrow queries, or dbplyr tbls from duckdb connections",
      call. = FALSE
    )
  }

  # Run the query
  res <- DBI::dbSendQuery(con, dbplyr::remote_query(.data), arrow = TRUE)

  # TODO: we shouldn't need $read_table(), but we get segfaults when we do.
  arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res)$read_table())
}