Last active
January 27, 2026 20:57
-
-
Save jrosell/e4820231d3c234b2c9cd02bf41f17af1 to your computer and use it in GitHub Desktop.
Example of an async JSON API in R using dbplyr, SQLite, S7, plumber2 and mirai.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
stopifnot(requireNamespace("rlang"))
rlang::check_installed("pak")
# Packages to attach. An element's name is the library name when its value
# (the install spec passed to pak) differs from it, e.g. a GitHub repo.
pkgs <- rlang::chr(
  "rlang" = "rlang",
  "plumber2" = "posit-dev/plumber2",
  "S7",
  "jsonlite",
  "httr2",
  "mirai",
  "uuid",
  "DBI",
  "dbplyr",
)
# pak::pak(pkgs)
# These are only used through `pkg::fun()` calls below, so they are not
# attached, but they still have to be installed.
rlang::check_installed(c("RSQLite", "dplyr", "purrr"))
libs <- ifelse(names(pkgs) == "", pkgs, names(pkgs))
# `quietly` spelled out instead of relying on partial matching of `quiet`.
lapply(libs, library, quietly = TRUE, character.only = TRUE) |> invisible()
# Server side code -----
# A user record. S7 checks property types on construction, so e.g. a
# character `age` is rejected.
User <- new_class(
  "User",
  properties = list(
    id = class_character,
    name = class_character,
    age = class_integer
  )
)

# Convert an S7 object into a data.frame with one column per property.
#
# Uses the public S7::props() accessor rather than reading the object's
# attributes, so it does not depend on how S7 stores properties internally.
#
# @param object An S7 object whose properties are equal-length vectors.
# @return A data.frame with one column per property, in definition order.
class_dump_df <- \(object) {
  as.data.frame(S7::props(object))
}

# Zero-row data.frame with the User columns, used to create the table.
user_cols <- User() |> class_dump_df()
example_user <- User(id = "1", name = "example_user", age = 20L)
example_user_df <- class_dump_df(example_user)
example_user2 <- User(id = "2", name = "example_user2", age = 20L)
example_user2_df <- class_dump_df(example_user2)
# Create a temporary SQLite database holding an empty `users` table whose
# columns come from the User class, then seed it with the two examples.
file <- tempfile()
con <- DBI::dbConnect(RSQLite::SQLite(), file)
DBI::dbWriteTable(con, "users", user_cols)

# Sanity checks: the table exists and has the User columns.
DBI::dbListTables(con)
names(DBI::dbGetQuery(con, "SELECT * FROM users", n = 0))

library(dbplyr)
users_tbl <- dplyr::tbl(con, "users")

# Seed rows; conflict = "ignore" skips rows whose `id` is already present.
dplyr::rows_insert(
  x = users_tbl,
  y = dbplyr::copy_inline(con, example_user_df),
  by = "id",
  conflict = "ignore",
  in_place = TRUE
)
DBI::dbGetQuery(con, "SELECT * FROM users")
dplyr::rows_insert(
  x = users_tbl,
  y = dbplyr::copy_inline(con, example_user2_df),
  by = "id",
  conflict = "ignore",
  in_place = TRUE
)

# Read everything back as a list of per-row lists (the GET /users shape).
users_tbl |>
  dplyr::collect() |>
  purrr::pmap(\(...) list(...))
DBI::dbDisconnect(con)
# Start 5 background R processes and give each one its own SQLite
# connection. `con` is assigned globally on every daemon so that later
# mirai() expressions can refer to it.
mirai::daemons(5)
mirai::everywhere(file = file, {
  library(DBI)
  library(dbplyr)
  library(purrr)
  library(uuid)
  con <<- dbConnect(RSQLite::SQLite(), file)
  # Several daemons may write to the same file at once; wait up to 5 s for
  # SQLite's lock instead of failing with "database is locked".
  RSQLite::sqliteSetBusyHandler(con, 5000L)
})
# Stop an instance left over from an earlier run in this session so it does
# not keep serving alongside the new one.
if (exists("app", inherits = FALSE)) {
  app |> api_stop()
}
# JSON API over the `users` table. Both handlers return a mirai, so the
# database work runs on a mirai daemon (each holding its own `con`) while
# the main process keeps accepting requests.
app <- api() |>
  # GET /users: every stored user as a JSON array of objects.
  api_get(
    "/users",
    serializers = get_serializers("unboxedJSON"),
    handler = function(request, response, body) {
      mirai(
        {
          dplyr::tbl(con, "users") |>
            dplyr::collect() |>
            purrr::pmap(function(...) list(...))
        }
      )
    }
  ) |>
  # POST /users: validate the JSON body as a User, store it under a
  # server-generated UUID and echo back the stored row. Failures are
  # reported as list(error = TRUE, message = ...) in the response body.
  api_post(
    path = "/users",
    parsers = get_parsers("json"),
    serializers = get_serializers("unboxedJSON"),
    handler = function(request, response, body) {
      mirai(
        {
          # Wrapped in a function so the early exits are ordinary function
          # returns instead of a return() from mirai's top-level eval.
          create_user <- function(body) {
            if ("id" %in% names(body)) {
              return(list(
                error = TRUE,
                message = "Don't send an ID."
              ))
            }
            new_id <- uuid::UUIDgenerate()
            body$id <- new_id
            # S7 checks the property types here; a bad body raises an error.
            user <- rlang::exec(User, !!!body)
            user_df <- class_dump_df(user)
            dplyr::rows_insert(
              dplyr::tbl(con, "users"),
              dbplyr::copy_inline(con, user_df),
              conflict = "ignore",
              in_place = TRUE,
              by = "id"
            )
            # `!!` inlines the id value into the SQL rather than leaving
            # dbplyr to decide how to evaluate `body$id`.
            inserted_df <- dplyr::tbl(con, "users") |>
              dplyr::filter(id == !!new_id) |>
              dplyr::collect()
            if (nrow(inserted_df) == 0) {
              return(list(
                error = TRUE,
                message = "The new user is not found in the database after insertion."
              ))
            }
            as.list(inserted_df)
          }
          tryCatch(
            create_user(body),
            error = \(e) {
              list(
                error = TRUE,
                message = conditionMessage(e)
              )
            }
          )
        },
        User = User,
        class_dump_df = class_dump_df,
        body = body
      )
    }
  ) |>
  # block = TRUE keeps this session busy serving requests, so the client
  # code is meant to be run from another R session.
  api_run(showcase = FALSE, block = TRUE)
# Client side code -----
# Assumes the API is being served on 127.0.0.1:8080. api_run(block = TRUE)
# occupies the server session, so run this from a separate R session.
library(httr2)

# GET /users: the JSON array becomes a list holding one named list per user.
res <- request("http://127.0.0.1:8080/users") |>
  req_perform() |>
  resp_body_json()
res
# [[1]]: $id "1", $name "example_user",  $age 20
# [[2]]: $id "2", $name "example_user2", $age 20
str(res)
# List of 2
#  $ :List of 3
#   ..$ id  : chr "1"
#   ..$ name: chr "example_user"
#   ..$ age : int 20
#  $ :List of 3
#   ..$ id  : chr "2"
#   ..$ name: chr "example_user2"
#   ..$ age : int 20
res |> dplyr::bind_rows()
# # A tibble: 2 × 3
#   id    name            age
#   <chr> <chr>         <int>
# 1 1     example_user     20
# 2 2     example_user2    20

# POST /users refuses a client-supplied id ...
request("http://127.0.0.1:8080/users") |>
  req_body_json(list(id = "3", name = "Alice", age = 30L)) |>
  req_perform() |>
  resp_body_json()
# $error   TRUE
# $message "Don't send an ID."

# ... and reports S7 validation failures (here `age` is sent as a string).
request("http://127.0.0.1:8080/users") |>
  req_body_json(list(name = "Alice2", age = "20")) |>
  req_perform() |>
  resp_body_json()
# $error   TRUE
# $message "<User> object properties are invalid:\n- @age must be <integer>, not <character>"

# A valid body is stored under a server-generated UUID and echoed back.
request("http://127.0.0.1:8080/users") |>
  req_body_json(list(name = "Alice3", age = 20L)) |>
  req_perform() |>
  resp_body_json()
# $id   "173c3215-119d-40b7-8731-4925ec397baf"   (random, differs per run)
# $name "Alice3"
# $age  20

# Listing again shows the new row. The UUID below comes from a different
# run than the sample above, which is why the two do not match.
request("http://127.0.0.1:8080/users") |>
  req_perform() |>
  resp_body_json() |>
  dplyr::bind_rows()
# # A tibble: 3 × 3
#   id                                   name            age
#   <chr>                                <chr>         <int>
# 1 1                                    example_user     20
# 2 2                                    example_user2    20
# 3 9ddf91ee-48ed-4d15-8232-d9e61a23db07 Alice3           20
library(httr2)

# Sequential: 10 POST requests, each waiting for the previous one ----
seq_time <- system.time(
  resps <- lapply(1:10, function(i) {
    req_perform(
      req_body_json(
        request("http://127.0.0.1:8080/users"),
        list(name = "Multiple Secuential", age = 30L)
      )
    )
  })
)[["elapsed"]]

# Parallel: the same 10 POST requests sent concurrently ----
one_request <- req_throttle(
  req_body_json(
    request("http://127.0.0.1:8080/users"),
    list(name = "Multiple Paralel", age = 30L)
  ),
  capacity = 100,
  fill_time_s = 60
)
reqs <- rep(list(one_request), 10)
par_time <- system.time(resps <- req_perform_parallel(reqs))[["elapsed"]]

# Results (wall-clock seconds) ----
cat(sprintf("Sequential: %ss\n", round(seq_time, 2)))
cat(sprintf("Parallel: %ss\n", round(par_time, 2)))
cat(sprintf(
  "Parallel is x%s faster than sequential\n",
  round(seq_time / par_time, 2)
))
# Sample run:
# Sequential: 2.26s
# Parallel: 0.43s
# Parallel is x5.3 faster than sequential
server code
#!/usr/bin/env Rscript
## Simple mirai + duckdb plumber2 server
##
## Writes mtcars to a temporary DuckDB file, opens it read-only in this
## process and in 5 mirai daemons, and serves two endpoints on port 8000:
##   /cars_sync  - sleeps 2 s on the main R process, then queries
##   /cars_async - sleeps 2 s and queries on a mirai daemon
suppressPackageStartupMessages({
  library(mirai)
  library(DBI)
  library(duckdb)
  library(plumber2)
  library(promises)
})

# The server runs inside a function because on.exit() only takes effect in
# a function body; at script top level the cleanup would never run.
main <- function() {
  message("[server] starting, PID: ", Sys.getpid())
  db_path <- tempfile(fileext = ".duckdb")
  con_main <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path)
  # Cleanup when main() exits (normally, on error, or on interrupt).
  # `con_main` is looked up at exit time, so the read-only connection
  # opened below is the one that gets closed.
  on.exit(
    {
      try(DBI::dbDisconnect(con_main, shutdown = TRUE), silent = TRUE)
      try(mirai::daemons(0), silent = TRUE)
      unlink(db_path)
    },
    add = TRUE
  )
  DBI::dbWriteTable(con_main, "mtcars", mtcars, overwrite = TRUE)
  message("[server] wrote mtcars to ", db_path)
  # Shut down and re-open in read-only mode for multiprocess access.
  DBI::dbDisconnect(con_main, shutdown = TRUE)
  con_main <- DBI::dbConnect(
    duckdb::duckdb(),
    dbdir = db_path,
    read_only = TRUE
  )

  mirai::daemons(5)
  # Create a read-only connection in each worker.
  mirai::everywhere(db_path = db_path, {
    library(DBI)
    library(duckdb)
    con <<- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
  })

  # -- API
  app <- api() |>
    api_get(
      "/cars_sync",
      handler = function(request, response, body) {
        # Blocks the main R process, so concurrent requests queue up.
        Sys.sleep(2)
        DBI::dbGetQuery(con_main, "SELECT * FROM mtcars")
      }
    ) |>
    api_get(
      "/cars_async",
      handler = function(request, response, body) {
        # The work runs on a daemon; the promise is returned immediately.
        mirai::mirai({
          Sys.sleep(2)
          DBI::dbGetQuery(con, "SELECT * FROM mtcars")
        }) |> as.promise()
      }
    )

  message("[server] launching plumber on http://127.0.0.1:8000")
  api_run(app, showcase = FALSE, port = 8000L, block = TRUE)
}

main()
output: github_document
"Minimal" DuckDB + mirai + plumber2 example
Using the example provided by Jordi Rosell in the gist, we give a stripped-down version of the pattern. We start by creating a DuckDB file into which we write `mtcars`, then create two plumber2 endpoints: one synchronous, and one asynchronous that uses mirai daemons.
Check dependencies
# Fail early, naming every required package that is not installed.
required_pkgs <- c("mirai", "duckdb", "DBI", "plumber2", "httr2", "promises")
is_installed <- vapply(
  required_pkgs,
  requireNamespace,
  logical(1),
  quietly = TRUE
)
if (!all(is_installed)) {
  stop(
    "Install the missing package(s): ",
    paste(required_pkgs[!is_installed], collapse = ", "),
    call. = FALSE
  )
}
API
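The example writes `mtcars` to a DuckDB file, starts a few mirai daemons, and has handlers that either run synchronously or dispatch work to a worker. Both endpoints sleep for two seconds and then return `mtcars`. The sync endpoint does this on the server's main R process. The async endpoint hands the work to a mirai daemon.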
Server code
# Show the server implementation.
cat server.R
# Launch the server in the background; all of its output goes to server.log.
timeout 10000s Rscript server.R > server.log 2>&1 &
sleep 5
# Save the PID of the process listening on TCP port 8000 (the R server
# itself, which owns the socket) for cleanup.
lsof -t -iTCP:8000 -sTCP:LISTEN | head -n 1 > server.pid
echo "[launcher] server started, PID: $(cat server.pid)"
echo "[launcher] server log (tail):"
tail -n 20 server.log || true
Client code with timing
library(httr2)
# Sleep to allow the background server to finish starting.
Sys.sleep(5)
base <- "http://127.0.0.1:8000"
n <- 10L

## n concurrent requests to the sync endpoint. The server handles them one
## at a time, so this takes roughly n * 2 seconds.
one_sync <- request(paste0(base, "/cars_sync"))
reqs_sync <- rep(list(one_sync), n)
sync_par_time <- system.time({
  res_sync <- req_perform_parallel(reqs_sync)
})["elapsed"]

## n concurrent requests to the async endpoint, served by the mirai daemons.
one_async <- request(paste0(base, "/cars_async"))
reqs_async <- rep(list(one_async), n)
async_par_time <- system.time({
  res_async <- req_perform_parallel(reqs_async)
})["elapsed"]

# cat() so the "\n" renders as a line break rather than printing as a
# quoted string with a literal escape.
cat(sprintf("Parallel (sync endpoint) x%d: %.3fs\n", n, sync_par_time))
cat(sprintf("Parallel (async endpoint) x%d: %.3fs\n", n, async_par_time))

# output
res_sync
res_async
Shut down the server
# Kill the server whose PID the launcher saved in server.pid.
xargs -I {} kill -9 {} < server.pid
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
“Minimal” DuckDB + mirai + plumber2 example
Using the example provided by Jordi Rosell in the gist, we give a stripped-down version of the pattern. We start by creating a DuckDB file into which we write mtcars, then create two plumber2 endpoints: one synchronous, and one asynchronous that uses mirai daemons.
Check dependencies
API
The example writes
mtcars to a DuckDB file, starts a few mirai daemons, and has handlers that either run synchronously or dispatch work
to a worker. The async and sync endpoints sleep two seconds and return
mtcars.
Server code
client code with timing
res_async
Shut down the server