Skip to content

Instantly share code, notes, and snippets.

@jrosell
Last active January 27, 2026 20:57
Show Gist options
  • Select an option

  • Save jrosell/e4820231d3c234b2c9cd02bf41f17af1 to your computer and use it in GitHub Desktop.

Select an option

Save jrosell/e4820231d3c234b2c9cd02bf41f17af1 to your computer and use it in GitHub Desktop.
Example of an async JSON API in R using dbplyr, SQLite, S7, plumber2 and mirai.
# Dependencies -----
# `pkgs` is the install list for pak. Entries with a name are attached by
# that name (e.g. "posit-dev/plumber2" is a GitHub ref attached as plumber2);
# unnamed entries are attached by their value.
stopifnot(requireNamespace("rlang"))
rlang::check_installed("pak")
pkgs <- rlang::chr(
  "rlang" = "rlang",
  "plumber2" = "posit-dev/plumber2",
  "S7",
  "jsonlite",
  "httr2",
  "mirai",
  "uuid",
  "DBI",
  "dbplyr"
)
# pak::pak(pkgs)
libs <- ifelse(names(pkgs) == "", pkgs, names(pkgs))
# These are only used through `pkg::fun()` (here and on the mirai daemons),
# so they are not attached, but they must still be installed.
rlang::check_installed(c("purrr", "dplyr", "RSQLite"))
# `quietly` spelled out: `quiet` only worked through partial argument matching.
lapply(libs, library, quietly = TRUE, character.only = TRUE) |> invisible()
# Server side code -----
# S7 class describing one user record. S7 checks each property's type when
# an instance is constructed, so e.g. a character `age` is rejected.
User <- new_class(
  name = "User",
  properties = list(
    id = class_character,
    name = class_character,
    age = class_integer
  )
)
# Convert an S7 object into a data.frame with one column per property.
# A scalar-valued object gives one row. A default-constructed object gives
# zero rows, which is what `user_cols` relies on to define the table schema.
#
# Uses the public `S7::props()` accessor rather than reading the object's
# attributes and stripping the internal `S7_class` attribute by name: how S7
# stores properties is an implementation detail.
class_dump_df <- \(object) {
  object |>
    S7::props() |>
    as.data.frame()
}
# Seed the SQLite database -----
# A default User() has zero-length properties, so this is a zero-row
# data.frame whose columns (id, name, age) define the `users` table schema.
user_cols <- User() |> class_dump_df()
# Two fixed example users (ids "1" and "2") inserted below.
example_user <- User(id = "1", name = "example_user", age = 20L)
example_user_df <- class_dump_df(example_user)
example_user2 <- User(id = "2", name = "example_user2", age = 20L)
example_user2_df <- class_dump_df(example_user2)
# Database file path; the file outlives this setup connection so that each
# mirai daemon can open its own connection to it later.
file <- tempfile()
con <- dbConnect(RSQLite::SQLite(), file)
# Create the empty `users` table from the zero-row schema. No PRIMARY KEY /
# UNIQUE constraint is declared on `id`, so uniqueness relies on
# rows_insert(by = "id", conflict = "ignore").
# NOTE(review): consider a real key if concurrent inserts matter.
dbWriteTable(
con,
name = "users",
value = user_cols
)
# Inspect: list tables, then column names (n = 0 fetches no rows).
DBI::dbListTables(con)
DBI::dbGetQuery(con, "SELECT * FROM users", n = 0) |> names()
# Already attached by the setup code at the top; this call is redundant.
library(dbplyr)
users_tbl <- dplyr::tbl(con, "users")
# Creating some example data
# in_place = TRUE writes into the database table; conflict = "ignore" skips
# rows whose `id` is already present.
dplyr::rows_insert(
users_tbl,
copy_inline(con, example_user_df),
conflict = "ignore",
in_place = TRUE,
by = "id"
)
DBI::dbGetQuery(con, "SELECT * FROM users")
dplyr::rows_insert(
users_tbl,
copy_inline(con, example_user2_df),
conflict = "ignore",
in_place = TRUE,
by = "id"
)
# Read everything back as a list of per-row lists (the same shape the
# GET /users handler returns).
dplyr::tbl(con, "users") |>
dplyr::collect() |>
purrr::pmap(function(...) list(...))
# Close the setup connection; the daemons open their own connections.
DBI::dbDisconnect(con)
# Start 5 background R processes (mirai daemons) and give each one its own
# SQLite connection to the database file seeded above. `con` is assigned
# with `<<-` so it stays in the daemon for later mirai tasks, which refer to
# it as `con`.
mirai::daemons(5)
mirai::everywhere(file = file, {
  library(DBI)
  library(dbplyr)
  library(purrr)
  library(uuid)
  con <<- dbConnect(RSQLite::SQLite(), file)
  # Parallel POST requests make several daemons write the same SQLite file.
  # Without a busy timeout SQLite reports "database is locked" at once on
  # contention; wait up to 5 s for the lock instead.
  DBI::dbGetQuery(con, "PRAGMA busy_timeout = 5000")
})
# Stop a server left running by a previous run of this script. Only look in
# the current environment, not the whole search path.
if (exists("app", inherits = FALSE)) {
  app |> api_stop()
}
# JSON API:
# - GET  /users: every row of `users` as a list of per-row records.
# - POST /users: create a user from a JSON body that must not contain `id`;
#   the server assigns a UUID and returns the stored row. Validation and
#   database failures are returned in the JSON body as
#   list(error = TRUE, message = ...) with HTTP status 200.
# Both handlers run their database work on a mirai daemon and return the
# mirai (an async value) instead of blocking the main process.
app <- api() |>
  api_get(
    "/users",
    serializers = get_serializers("unboxedJSON"),
    handler = function(request, response, body) {
      mirai({
        dplyr::tbl(con, "users") |>
          dplyr::collect() |>
          purrr::pmap(function(...) list(...))
      })
    }
  ) |>
  api_post(
    path = "/users",
    parsers = get_parsers("json"),
    serializers = get_serializers("unboxedJSON"),
    handler = function(request, response, body) {
      mirai(
        {
          # Each outcome is the value of the if/else chain (no early
          # `return()` out of tryCatch), so the tryCatch() result is what
          # the task returns.
          tryCatch(
            {
              if ("id" %in% names(body)) {
                list(
                  error = TRUE,
                  message = "Don't send an ID."
                )
              } else {
                new_id <- uuid::UUIDgenerate()
                body$id <- new_id
                # S7 type-checks the properties here (e.g. `age` must be
                # an integer); a failure is caught below.
                user <- rlang::exec(User, !!!body)
                dplyr::rows_insert(
                  dplyr::tbl(con, "users"),
                  dbplyr::copy_inline(con, class_dump_df(user)),
                  conflict = "ignore",
                  in_place = TRUE,
                  by = "id"
                )
                # `!!` inlines the id value so dbplyr never has to decide
                # whether it is a local value or a column reference.
                inserted_df <- dplyr::tbl(con, "users") |>
                  dplyr::filter(id == !!new_id) |>
                  dplyr::collect()
                if (nrow(inserted_df) == 0) {
                  list(
                    error = TRUE,
                    message = "The new user is not found in the database after insertion."
                  )
                } else {
                  as.list(inserted_df)
                }
              }
            },
            error = \(e) {
              list(
                error = TRUE,
                message = conditionMessage(e)
              )
            }
          )
        },
        # Objects the daemon needs that are not part of its own setup.
        User = User,
        class_dump_df = class_dump_df,
        body = body
      )
    }
  ) |>
  api_run(showcase = FALSE, block = TRUE)
# Client side code -----
# Run from a separate R session: api_run(block = TRUE) above blocks the
# server session. NOTE(review): the server never sets a port, so port 8080
# here presumably relies on plumber2's default; confirm.
library(httr2)
# GET /users: all stored users as a list of per-row lists.
res <- request("http://127.0.0.1:8080/users") |>
req_perform() |>
resp_body_json()
res
# [[1]]
# [[1]]$id
# [1] "1"
# [[1]]$name
# [1] "example_user"
# [[1]]$age
# [1] 20
# [[2]]
# [[2]]$id
# [1] "2"
# [[2]]$name
# [1] "example_user2"
# [[2]]$age
# [1] 20
str(res)
# List of 2
# $ :List of 3
# ..$ id : chr "1"
# ..$ name: chr "example_user"
# ..$ age : int 20
# $ :List of 3
# ..$ id : chr "2"
# ..$ name: chr "example_user2"
# ..$ age : int 20
dplyr::bind_rows(res)
# # A tibble: 2 × 3
# id name age
# <chr> <chr> <int>
# 1 1 example_user 20
# 2 2 example_user2 20
# POST with a client-supplied id: the handler rejects it. The error comes
# back in the JSON body (req_perform() does not raise).
request("http://127.0.0.1:8080/users") |>
req_body_json(list(
id = "3",
name = "Alice",
age = 30L
)) |>
req_perform() |>
resp_body_json()
# $error
# [1] TRUE
# $message
# [1] "Don't send an ID."
# `age` sent as the JSON string "20" fails the S7 integer check on User@age.
request("http://127.0.0.1:8080/users") |>
req_body_json(list(
name = "Alice2",
age = "20"
)) |>
req_perform() |>
resp_body_json()
# $error
# [1] TRUE
# $message
# [1] "<User> object properties are invalid:\n- @age must be <integer>, not <character>"
# A valid body: the server assigns a UUID id and returns the stored row.
request("http://127.0.0.1:8080/users") |>
req_body_json(list(
name = "Alice3",
age = 20L
)) |>
req_perform() |>
resp_body_json()
# $id
# [1] "173c3215-119d-40b7-8731-4925ec397baf"
# $name
# [1] "Alice3"
# $age
# [1] 20
# List users again. The UUID is generated per request, so the Alice3 id
# below differs from the one printed above (the two outputs presumably come
# from different runs).
request("http://127.0.0.1:8080/users") |>
req_perform() |>
resp_body_json() |>
dplyr::bind_rows()
# # A tibble: 3 × 3
# id name age
# <chr> <chr> <int>
# 1 1 example_user 20
# 2 2 example_user2 20
# 3 9ddf91ee-48ed-4d15-8232-d9e61a23db07 Alice3 20
# Benchmark: 10 sequential vs 10 parallel POST /users requests -----
# The POST handler reports failures as HTTP 200 with `error = TRUE` in the
# JSON body, so httr2 does not raise for them and a fast time alone does not
# show that the inserts succeeded. Failed requests are counted for both runs.
library(httr2)
# Number of responses whose JSON body has `error = TRUE`. `[[` avoids `$`
# partial matching on the parsed list.
count_api_errors <- \(responses) {
  is_error <- vapply(
    responses,
    \(resp) isTRUE(resp_body_json(resp)[["error"]]),
    logical(1)
  )
  sum(is_error)
}
seq_time <- system.time(
  seq_resps <-
    seq_len(10) |>
    lapply(
      \(x) {
        request("http://127.0.0.1:8080/users") |>
          req_body_json(list(
            name = "Multiple Secuential",
            age = 30L
          )) |>
          req_perform()
      }
    )
) |>
  purrr::pluck("elapsed")
# Parallel
one_request <- request("http://127.0.0.1:8080/users") |>
  req_body_json(list(
    name = "Multiple Paralel",
    age = 30L
  )) |>
  req_throttle(capacity = 100, fill_time_s = 60)
reqs <- rep(list(one_request), 10)
par_time <-
  system.time(resps <- req_perform_parallel(reqs)) |>
  purrr::pluck("elapsed")
# Results
cat(paste0("Sequential: ", round(seq_time, 2), "s\n"))
cat(paste0("Parallel: ", round(par_time, 2), "s\n"))
cat(paste0(
  "Parallel is x",
  round(seq_time / par_time, 2),
  " faster than sequential\n"
))
cat(paste0(
  "Failed requests (sequential / parallel): ",
  count_api_errors(seq_resps), " / ", count_api_errors(resps), "\n"
))
# Sequential: 2.26s
# Parallel: 0.43s
# Parallel is x5.3 faster than sequential
@sounkou-bioinfo
Copy link

sounkou-bioinfo commented Jan 27, 2026

“Minimal” DuckDB + mirai + plumber2 example

Using the example provided by Jordi Rosell in
this gist, we give a stripped-down version of the pattern. We start by creating a DuckDB file into which we write mtcars, then create two plumber2 endpoints: one synchronous, and one asynchronous that uses mirai daemons.

Check dependencies

# Fail early, listing every missing package at once instead of stopping at
# the first one found.
required_pkgs <- c("mirai", "duckdb", "DBI", "plumber2", "httr2", "promises")
is_installed <- vapply(
    required_pkgs,
    requireNamespace,
    logical(1),
    quietly = TRUE
)
if (!all(is_installed)) {
    stop(
        "Install the missing package(s): ",
        paste(required_pkgs[!is_installed], collapse = ", "),
        call. = FALSE
    )
}

API

The example writes mtcars to a DuckDB file, starts a few mirai
daemons, and defines handlers that either run synchronously or dispatch
their work to a worker. Both endpoints sleep for two seconds and then
return mtcars.

Server code

# show the server implementation
cat server.R
# launch the server in the background (timeout stops it after 10000 s at most)
timeout 10000s Rscript server.R > server.log 2>&1 &
# wait up to 60 s for a process to listen on port 8000, instead of a fixed
# 5 s sleep that races a slow startup and can leave server.pid empty
for i in $(seq 1 60); do
  if lsof -t -iTCP:8000 -sTCP:LISTEN > /dev/null 2>&1; then
    break
  fi
  sleep 1
done
# record the PID of the process listening on 8000 (Rscript, not the timeout
# wrapper) for cleanup; -sTCP:LISTEN skips client connections to the port
lsof -t -iTCP:8000 -sTCP:LISTEN | head -n 1 > server.pid
echo "[launcher] server started, PID: $(cat server.pid)"
echo "[launcher] server log (tail):"
tail -n 20 server.log || true
## #!/usr/bin/env Rscript
## ## Simple mirai + duckdb plumber2 server
## suppressPackageStartupMessages({
##     library(mirai)
##     library(DBI)
##     library(duckdb)
##     library(plumber2)
##     library(promises)
## })
## 
## message("[server] starting, PID:", Sys.getpid(), "\n")
## 
## db_path <- tempfile(fileext = ".duckdb")
## con_main <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path)
## DBI::dbWriteTable(con_main, "mtcars", mtcars, overwrite = TRUE)
## message("[server] wrote mtcars to", db_path, "\n")
## # shutdown and re-open in read-only mode for
## # multiprocess access
## DBI::dbDisconnect(con_main, shutdown = TRUE)
## con_main <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
## mirai::daemons(5)
## # creating a read only connection in each worker
## mirai::everywhere(db_path = db_path, {
##     library(DBI)
##     library(duckdb)
##     con <<- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
## })
## 
## # Cleanup on exit
## on.exit(
##     {
##         try(DBI::dbDisconnect(con_main, shutdown = TRUE), silent = TRUE)
##         try(mirai::stop_daemons(), silent = TRUE)
##         file.remove(db_path)
##     },
##     add = TRUE
## )
## 
## # -- API
## app <- api()
## 
## app <- app |>
##     api_get(
##         "/cars_sync",
##         handler = function(request, response, body) {
##             Sys.sleep(2)
##             DBI::dbGetQuery(con_main, "SELECT * FROM mtcars")
##         }
##     ) |>
##     api_get(
##         "/cars_async",
##         handler = function(request, response, body) {
##             # manually return a promise
##             mirai::mirai({
##                 Sys.sleep(2)
##                 DBI::dbGetQuery(con, "SELECT * FROM mtcars")
##             }) |> as.promise()
##         }
##     )
## 
## message("[server] launching plumber on http://127.0.0.1:8000\n")
## api_run(app, showcase = FALSE, port = 8000L, block = TRUE)
## [launcher] server started, PID: 954678
## [launcher] server log (tail):
## [server] starting, PID:954678
## 
## [server] wrote mtcars to/tmp/RtmpGaZiYm/filee913678900b12.duckdb
## 
## Creating default route in request router
## [server] launching plumber on http://127.0.0.1:8000
## 
## plumber2 server started at http://127.0.0.1:8000

client code with timing

library(httr2)

# sleep to allow the server to finish starting
Sys.sleep(5)

base <- "http://127.0.0.1:8000"
n <- 10

## parallel requests to the sync endpoint (use req_perform_parallel)
# progress = FALSE: the progress bar's terminal escape codes otherwise end
# up as garbled text in the knitted document.
one_sync <- request(paste0(base, "/cars_sync"))
reqs_sync <- rep(list(one_sync), n)
sync_par_time <- system.time({
  res_sync <- req_perform_parallel(reqs_sync, progress = FALSE)
})["elapsed"]
## parallel requests to the async endpoint
# progress = FALSE keeps progress-bar escape codes out of the knitted output.
one_async <- request(paste0(base, "/cars_async"))
reqs_async <- rep(list(one_async), n)
async_par_time <- system.time({
  res_async <- req_perform_parallel(reqs_async, progress = FALSE)
})["elapsed"]
# cat() so the trailing "\n" becomes a line break instead of being shown
# escaped inside an autoprinted string.
# Roughly: the sync endpoint handles one 2 s request at a time (~n * 2 s);
# the async one spreads them over the 5 mirai daemons.
cat(sprintf("Parallel (sync endpoint) x%d: %.3fs\n", n, sync_par_time))
## Parallel (sync endpoint) x10: 20.249s
cat(sprintf("Parallel (async endpoint) x%d: %.3fs\n", n, async_par_time))
## Parallel (async endpoint) x10: 6.086s
# output: the list of httr2 responses from the sync endpoint
print(res_sync)
## [[1]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[2]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[3]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[4]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[5]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[6]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[7]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[8]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[9]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[10]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_sync
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
# the list of httr2 responses from the async endpoint
print(res_async)
## [[1]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[2]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[3]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[4]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[5]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[6]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[7]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[8]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[9]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)
## 
## [[10]]
## <httr2_response>
## GET http://127.0.0.1:8000/cars_async
## Status: 200 OK
## Content-Type: application/json
## Body: In memory (3445 bytes)

Shut down the server

# force-kill (SIGKILL) every PID listed in server.pid; R gets no chance to clean up
xargs -I {} kill -9 {} < server.pid

@sounkou-bioinfo
Copy link

server code

#!/usr/bin/env Rscript
## Simple mirai + duckdb plumber2 server
suppressPackageStartupMessages({
    library(mirai)
    library(DBI)
    library(duckdb)
    library(plumber2)
    library(promises)
})

# message() pastes its arguments with no separator and already ends with a
# newline, so spaces go inside the literals and no "\n" is appended.
message("[server] starting, PID: ", Sys.getpid())

db_path <- tempfile(fileext = ".duckdb")
con_main <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path)
DBI::dbWriteTable(con_main, "mtcars", mtcars, overwrite = TRUE)
message("[server] wrote mtcars to ", db_path)
# shutdown and re-open in read-only mode for
# multiprocess access
DBI::dbDisconnect(con_main, shutdown = TRUE)
con_main <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
mirai::daemons(5)
# creating a read only connection in each worker; `<<-` keeps `con` in the
# daemon so later mirai tasks can use it
mirai::everywhere(db_path = db_path, {
    library(DBI)
    library(duckdb)
    con <<- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = TRUE)
})

# Cleanup. It is run via tryCatch(finally = ) around api_run() at the end of
# this script, i.e. when the server returns, errors or is interrupted.
# - A top-level on.exit() in a script has no enclosing function, so it
#   cannot be relied on to run this.
# - mirai has no stop_daemons(); daemons(0) shuts the daemons down.
# - unlink() is silent for missing files; DuckDB may also leave a `.wal`
#   file next to the database.
# Nothing runs if the process is killed with SIGKILL (kill -9).
cleanup <- function() {
    try(DBI::dbDisconnect(con_main, shutdown = TRUE), silent = TRUE)
    try(mirai::daemons(0), silent = TRUE)
    unlink(c(db_path, paste0(db_path, ".wal")))
}

# -- API
app <- api()

app <- app |>
    api_get(
        "/cars_sync",
        # runs in the main process: blocks it for the whole request
        handler = function(request, response, body) {
            Sys.sleep(2)
            DBI::dbGetQuery(con_main, "SELECT * FROM mtcars")
        }
    ) |>
    api_get(
        "/cars_async",
        # runs on a mirai daemon, using that daemon's own `con`
        handler = function(request, response, body) {
            # manually return a promise
            mirai::mirai({
                Sys.sleep(2)
                DBI::dbGetQuery(con, "SELECT * FROM mtcars")
            }) |> as.promise()
        }
    )

message("[server] launching plumber on http://127.0.0.1:8000")
tryCatch(
    api_run(app, showcase = FALSE, port = 8000L, block = TRUE),
    finally = cleanup()
)

@sounkou-bioinfo
Copy link

sounkou-bioinfo commented Jan 27, 2026


output: github_document

"Minimal" DuckDB + mirai + plumber2 example

Using the example provided by Jordi Rosell in this gist, we give a stripped-down version of the pattern. We start by creating a DuckDB file into which we write mtcars, then create two plumber2 endpoints: one synchronous, and one asynchronous that uses mirai daemons.

Check dependencies

# Fail early, listing every missing package at once instead of stopping at
# the first one found.
required_pkgs <- c("mirai", "duckdb", "DBI", "plumber2", "httr2", "promises")
is_installed <- vapply(
    required_pkgs,
    requireNamespace,
    logical(1),
    quietly = TRUE
)
if (!all(is_installed)) {
    stop(
        "Install the missing package(s): ",
        paste(required_pkgs[!is_installed], collapse = ", "),
        call. = FALSE
    )
}

API

The example writes mtcars to a DuckDB file, starts a few mirai daemons, and defines handlers that either run synchronously or dispatch their work to a worker. Both endpoints sleep for two seconds and then return mtcars.

Server code

# show the server implementation
cat server.R
# launch the server in the background (timeout stops it after 10000 s at most)
timeout 10000s Rscript server.R > server.log 2>&1 &
# wait up to 60 s for a process to listen on port 8000, instead of a fixed
# 5 s sleep that races a slow startup and can leave server.pid empty
for i in $(seq 1 60); do
  if lsof -t -iTCP:8000 -sTCP:LISTEN > /dev/null 2>&1; then
    break
  fi
  sleep 1
done
# record the PID of the process listening on 8000 (Rscript, not the timeout
# wrapper) for cleanup; -sTCP:LISTEN skips client connections to the port
lsof -t -iTCP:8000 -sTCP:LISTEN | head -n 1 > server.pid
echo "[launcher] server started, PID: $(cat server.pid)"
echo "[launcher] server log (tail):"
tail -n 20 server.log || true

client code with timing

library(httr2)

# sleep to allow the server to finish starting
Sys.sleep(5)

base <- "http://127.0.0.1:8000"
n <- 10

## parallel requests to the sync endpoint (use req_perform_parallel)
# progress = FALSE: the progress bar's terminal escape codes otherwise end
# up as garbled text in the knitted document.
one_sync <- request(paste0(base, "/cars_sync"))
reqs_sync <- rep(list(one_sync), n)
sync_par_time <- system.time({
  res_sync <- req_perform_parallel(reqs_sync, progress = FALSE)
})["elapsed"]

## parallel requests to the async endpoint
# progress = FALSE keeps progress-bar escape codes out of the knitted output.
one_async <- request(paste0(base, "/cars_async"))
reqs_async <- rep(list(one_async), n)
async_par_time <- system.time({
  res_async <- req_perform_parallel(reqs_async, progress = FALSE)
})["elapsed"]

# cat() so the trailing "\n" becomes a line break instead of being shown
# escaped inside an autoprinted string.
cat(sprintf("Parallel (sync endpoint) x%d: %.3fs\n", n, sync_par_time))
cat(sprintf("Parallel (async endpoint) x%d: %.3fs\n", n, async_par_time))
# output: the httr2 responses collected from both endpoints
print(res_sync)
print(res_async)

Shut down the server

# force-kill (SIGKILL) every PID listed in server.pid; R gets no chance to clean up
xargs -I {} kill -9 {} < server.pid

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment