Skip to content

Commit

Permalink
feat: Importing large objects from client side (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
toppyy authored Feb 24, 2025
1 parent 930452a commit cea85e5
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 26 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresHasDefault)
export(postgresImportLargeObject)
export(postgresIsTransacting)
export(postgresWaitForNotify)
exportClasses(PqConnection)
Expand Down
33 changes: 33 additions & 0 deletions R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,36 @@ postgresWaitForNotify <- function(conn, timeout = 1) {
postgresIsTransacting <- function(conn) {
connection_is_transacting(conn@ptr)
}


#' Imports a large object from file
#'
#' Returns an object idenfier (Oid) for the imported large object
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param filepath a path to the large object to import
#' @param oid the oid to write to. Defaults to 0 which assigns an unused oid
#' @return the identifier of the large object, an integer
#' @examples
#' \dontrun{
#' con <- postgresDefault()
#' filepath <- 'your_image.png'
#' dbWithTransaction(con, {
#' oid <- postgresImportLargeObject(con, filepath)
#' })
#' }
postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) {

if (!postgresIsTransacting(conn)) {
stopc("Cannot import a large object outside of a transaction")
}

if (is.null(filepath)) stopc("'filepath' cannot be NULL")
if (is.null(oid)) stopc("'oid' cannot be NULL")
if (is.na(oid)) stopc("'oid' cannot be NA")
if (oid < 0) stopc("'oid' cannot be negative")

connection_import_lo_from_file(conn@ptr, filepath, oid)
}
4 changes: 4 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ connection_set_transacting <- function(con, transacting) {
invisible(.Call(`_RPostgres_connection_set_transacting`, con, transacting))
}

connection_import_lo_from_file <- function(con, filename, oid) {
.Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid)
}

connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}
Expand Down
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ reference:
- '`RPostgres-package`'
- postgresHasDefault
- postgresWaitForNotify
- postgresImportLargeObject

development:
mode: auto
Expand Down
31 changes: 31 additions & 0 deletions man/postgresImportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ bool DbConnection::has_query() {
return pCurrentResult_ != NULL;
}

Oid DbConnection::import_lo_from_file(std::string filename, Oid p_oid) {
Oid lo_oid = lo_import_with_oid(pConn_, filename.c_str(), p_oid);
if (lo_oid == InvalidOid) cpp11::stop(PQerrorMessage(pConn_));
return(lo_oid);
}

void DbConnection::copy_data(std::string sql, cpp11::list df) {
LOG_DEBUG << sql;

Expand Down
3 changes: 3 additions & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class DbConnection : boost::noncopyable {
bool has_query();

void copy_data(std::string sql, cpp11::list df);

Oid import_lo_from_file(std::string file_path, Oid p_oid);


void check_connection();
cpp11::list info();
Expand Down
4 changes: 4 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ void connection_set_transacting(DbConnection* con, bool transacting) {
}

// Specific functions
[[cpp11::register]]
Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid) {
return con->import_lo_from_file(filename, oid);
}

[[cpp11::register]]
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) {
Expand Down
60 changes: 34 additions & 26 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ extern "C" SEXP _RPostgres_connection_set_transacting(SEXP con, SEXP transacting
END_CPP11
}
// connection.cpp
Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid oid);
extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filename, SEXP oid) {
BEGIN_CPP11
return cpp11::as_sexp(connection_import_lo_from_file(cpp11::as_cpp<cpp11::decay_t<DbConnection*>>(con), cpp11::as_cpp<cpp11::decay_t<std::string>>(filename), cpp11::as_cpp<cpp11::decay_t<Oid>>(oid)));
END_CPP11
}
// connection.cpp
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df);
extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) {
BEGIN_CPP11
Expand Down Expand Up @@ -197,32 +204,33 @@ extern "C" SEXP _RPostgres_result_column_info(SEXP res) {

extern "C" {
static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
{"_RPostgres_connection_is_transacting", (DL_FUNC) &_RPostgres_connection_is_transacting, 1},
{"_RPostgres_connection_quote_identifier", (DL_FUNC) &_RPostgres_connection_quote_identifier, 2},
{"_RPostgres_connection_quote_string", (DL_FUNC) &_RPostgres_connection_quote_string, 2},
{"_RPostgres_connection_release", (DL_FUNC) &_RPostgres_connection_release, 1},
{"_RPostgres_connection_set_temp_schema", (DL_FUNC) &_RPostgres_connection_set_temp_schema, 2},
{"_RPostgres_connection_set_transacting", (DL_FUNC) &_RPostgres_connection_set_transacting, 2},
{"_RPostgres_connection_valid", (DL_FUNC) &_RPostgres_connection_valid, 1},
{"_RPostgres_connection_wait_for_notify", (DL_FUNC) &_RPostgres_connection_wait_for_notify, 2},
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encode_vector", (DL_FUNC) &_RPostgres_encode_vector, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_bind", (DL_FUNC) &_RPostgres_result_bind, 2},
{"_RPostgres_result_column_info", (DL_FUNC) &_RPostgres_result_column_info, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
{"_RPostgres_result_has_completed", (DL_FUNC) &_RPostgres_result_has_completed, 1},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_rows_affected", (DL_FUNC) &_RPostgres_result_rows_affected, 1},
{"_RPostgres_result_rows_fetched", (DL_FUNC) &_RPostgres_result_rows_fetched, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{NULL, NULL, 0}
};
}
Expand Down
1 change: 1 addition & 0 deletions src/pch.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <cpp11.hpp>
#include <libpq-fe.h>
#include <libpq/libpq-fs.h>

#include <plogr.h>

Expand Down
1 change: 1 addition & 0 deletions tests/testthat/data/large_object.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
postgres
37 changes: 37 additions & 0 deletions tests/testthat/test-ImportLargeObject.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

test_that("can import and read a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
expect_gt(oid,0)
lo_data <- unlist(dbGetQuery(con, "select lo_get($1) as lo_data", params=list(oid))$lo_data[1])
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres'
expect_equal(lo_data, large_object_txt)
})


test_that("importing to an existing oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object.txt')
oid <- 1234
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })

expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path, oid) })
)
dbExecute(con, "select lo_unlink($1) as lo_data", params=list(oid))
})


test_that("import from a non-existing path throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(),'/data/large_object_that_does_not_exist.txt')
expect_error(
dbWithTransaction(con, { oid <- postgresImportLargeObject(con, test_file_path) })
)
})


0 comments on commit cea85e5

Please sign in to comment.