RestRserve and async event loops/long running processes #216
Replies: 4 comments 11 replies
-
Hi Dereck. I experimented with the 'later' package and event-loop
events. I don't think it will work with the Rserve backend because of the
way it handles requests with fork. However, I do believe it will work if
someone implements an 'httpuv' backend. It shouldn't be difficult.
…On Mon, Aug 12, 2024, 07:38 Dereck Mezquita ***@***.***> wrote:
Hello,
I'm working on developing a trading bot in R and I'm encountering some
challenges integrating asynchronous timers with the RestRserve API. I'm
hoping to replicate functionality similar to what can be achieved in
Node.js, but I'm running into some roadblocks.
Desired Functionality
Here's an example of what I'm trying to achieve, demonstrated in Node.js:
import express from 'express';
class TradeBot {
  isRunning = false;
  timer = null;
  run() {
    if (!this.isRunning) {
      this.isRunning = true;
      // Fire the callback once per second until stop() clears the timer
      this.timer = setInterval(() => {
        console.log('running');
      }, 1000);
      console.log('Bot started');
    } else {
      console.log('Already running');
    }
  }
  stop() {
    if (this.isRunning) {
      clearInterval(this.timer);
      this.timer = null;
      this.isRunning = false;
      console.log('Bot stopped');
    } else {
      console.log('Bot is not running');
    }
  }
}
const bot = new TradeBot();
const app = express();
app.get('/start-bot', (req, res) => {
  bot.run();
  res.send('Bot started');
});
app.get('/stop-bot', (req, res) => {
  bot.stop();
  res.send('Bot stopped');
});
app.listen(3000, () => {
  console.log('Server is running on port 3000');
});
R Implementation Attempts
I've tried to implement this functionality in R using RestRserve along
with the async <https://github.com/gaborcsardi/async> package and later
with the later <https://github.com/r-lib/later> package. Here are my
attempts:
Using async package:
box::use(R6)
box::use(RestRserve[Application, BackendRserve])
box::use(async[async_timer, run_event_loop])
TradeBot <- R6$R6Class(
"TradeBot",
public = list(
is_running = FALSE,
timer = NULL,
initialize = function() {
self$is_running <- FALSE
self$timer <- NULL
},
run = function() {
if (!self$is_running) {
self$is_running <- TRUE
self$timer <- async_timer$new(1, function() {
cat("Running trade bot!\n")
})
cat("Bot started\n")
} else {
cat("Bot is already running\n")
}
},
stop = function() {
if (self$is_running) {
self$is_running <- FALSE
if (!is.null(self$timer)) {
self$timer$cancel()
self$timer <- NULL
}
cat("Bot stopped\n")
} else {
cat("Bot is not running\n")
}
}
)
)
bot <- TradeBot$new()
app <- Application$new()
app$add_get("/start-bot", function(request, response) {
bot$run()
response$set_body("Bot started")
})
app$add_get("/stop-bot", function(request, response) {
bot$stop()
response$set_body("Bot stopped")
})
# Start the server
backend <- BackendRserve$new()
# Run the event loop
run_event_loop({
backend$start(app, http_port = 8080, background = TRUE)
cat("Server is running on port 8080\n")
# this is to keep the main thread alive; I'm not sure if there's a better way
# I am running RestRserve in the background but I'm not sure if I should instead run it on the main thread?
while(TRUE) {
Sys.sleep(1)
}
})
Using later package:
box::use(R6)
box::use(RestRserve[Application, BackendRserve])
box::use(later)
create_interval <- function(callback, ms) {
cancelled <- FALSE
schedule_next <- function() {
if (!cancelled) {
callback()
later$later(schedule_next, ms / 1000)
}
}
later$later(schedule_next, ms / 1000)
function() {
cancelled <<- TRUE
}
}
TradeBot <- R6$R6Class(
"TradeBot",
public = list(
isRunning = FALSE,
timer = NULL,
run = function() {
if (!self$isRunning) {
self$isRunning <- TRUE
self$timer <- create_interval(function() {
cat("running\n")
}, 1000)
cat("Running\n")
} else {
cat("Already running\n")
}
},
stop = function() {
if (self$isRunning) {
self$timer()
self$timer <- NULL
self$isRunning <- FALSE
cat("Stopped\n")
} else {
cat("Bot is not running\n")
}
}
)
)
bot <- TradeBot$new()
app <- Application$new()
app$add_get("/start-bot", function(request, response) {
bot$run()
response$set_body("Bot started")
})
app$add_get("/stop-bot", function(request, response) {
bot$stop()
response$set_body("Bot stopped")
})
backend <- BackendRserve$new()
backend$start(app, http_port = 8080, background = TRUE)
# keep the thread open and sleep to avoid high CPU usage
while (TRUE) {
later$run_now()
Sys.sleep(0.1)
}
The Challenge
In both implementations, the asynchronous timers don't seem to execute
within the RestRserve environment. The API endpoints work, but the
scheduled tasks (printing "running" every second) never execute.
My understanding is that RestRserve utilises the parallel package and
forking for handling requests in parallel, which prevents blocking when
receiving multiple requests. However, this seems to be incompatible with
the event loops provided by packages like async or later.
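The effect of forking on in-memory state can be illustrated without RestRserve at all. The following is a minimal sketch, assuming a Unix-like system; `parallel::mcparallel()` is used here only as a stand-in for the fork that serves a request, and the `state` environment is a hypothetical substitute for the `bot` object:

```r
library(parallel)

# A mutable object, like the R6 bot above (hypothetical stand-in).
state <- new.env()
state$is_running <- FALSE

# Fork a child process (Unix only) and flip the flag there,
# much as a request handler running in a forked process would.
job <- mcparallel({
  state$is_running <- TRUE
  state$is_running
})
child_view <- mccollect(job)[[1]]

# The parent process still holds its own, unchanged copy.
parent_view <- state$is_running
```

The child sees its own change, but the parent does not, so anything a handler starts (including a timer) lives and dies with the forked process rather than with the server.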
Questions and Advice Needed
1. Is it possible to integrate asynchronous timers (like those
provided by async or later) with RestRserve in a way that allows for
continuous execution alongside the API?
2. If not, are there alternative approaches within the RestRserve
ecosystem that could achieve similar functionality?
3. I believe one potential solution might be to create two separate
programmes - one for the bot logic and another for the REST API - and have
them communicate via files or another method. However, I'd prefer to keep
everything in memory, as this is for a trading bot where performance is
crucial. Do you have any suggestions for achieving this within a single
programme?
4. Are there any best practices or design patterns within RestRserve
for managing long-running processes alongside API endpoints?
I'm open to any suggestions or advice on how to structure this system in a
way that would work efficiently within the RestRserve framework. Thank you
in advance for your help!
Additional Context
I recognise that the RestRserve community might not be intimately familiar
with the async <https://github.com/gaborcsardi/async> or later
<https://github.com/r-lib/later> packages. My primary goal is to
understand how RestRserve works and how we can leverage its capabilities to
achieve the desired functionality. I'm particularly interested in:
1. How RestRserve handles long-running processes or background tasks.
2. Whether there are built-in mechanisms in RestRserve for periodic
task execution (similar to the setInterval function in JavaScript).
3. If RestRserve has its own event loop or scheduling system that
could be used for this purpose.
Any insights into RestRserve's architecture and how it might accommodate
(or provide alternatives to) the kind of asynchronous, timer-based
operations I'm trying to implement would be greatly appreciated. I'm open
to restructuring my approach to better align with RestRserve's design
philosophy and capabilities.
I originally posted a similar discussion to the async
<gaborcsardi/async#83 (reply in thread)>
package discussion board, but I believe the core of the issue might be more
related to RestRserve's architecture.
-
Dereck, the Rserve back-end is a server with parallel processes. It is not just an R session, which is why it is scalable. The server does not use R to accept connections. However, this means that the moment you start it, only clients get to execute R code; the server is in a way not R anymore (*).
Unfortunately, from your post it is unclear what you are actually trying to achieve in your application. R is fully synchronous, so there is no way to implement real "async": the packages you mention are just a hack that hooks into what would normally be the user typing commands in R and uses that to execute code synchronously. However, with RestRserve there are potentially many different R sessions running at a given point, so it makes no sense to think of anything happening in one session. Also, each client request can be handled by a different process. If you are interested in some kind of synchronized state, then you can use
As Dmitry said, if you don't care about performance or scalability then you can use a single R process (e.g., via
(*) It is in principle possible to mesh the R event loop and the Rserve server loop, i.e. the loop that waits for new connections. This is typically not desired in deployment, as it makes the server more fragile and has performance implications (when anything is evaluated in the R loop, the server cannot accept new client connections). Also note that this won't affect existing connections by design (those have already detached from the server), but if that still works in your use-case, then it's something I have ready to go, so I could enable it if you want to test it.
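The point that scheduled callbacks are executed synchronously can be seen with `later` alone. This is a minimal sketch, assuming only the `later` package; the `demo` function and the `ticks` counter are hypothetical names for illustration:

```r
library(later)

demo <- function() {
  ticks <- 0L
  # Schedule a callback that is due immediately.
  later(function() ticks <<- ticks + 1L, delay = 0)

  # R is busy running this function, so the callback cannot interrupt it.
  for (i in 1:1e5) sqrt(i)
  before <- ticks

  # The callback runs only when the loop is explicitly given control.
  run_now(timeoutSecs = 1)
  list(before = before, after = ticks)
}

result <- demo()
```

Nothing runs until `run_now()` is called (or until R is idle at the top-level prompt), which is why a process that never returns control to that loop never fires its timers.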
-
@dselivanov and @s-u First and foremost, I want to express my gratitude for taking the time to address my questions and provide insightful guidance. Your explanations have clarified my understanding of RestRserve and the R language.
A contribution to RestRserve
I'm particularly intrigued by the suggestion to implement an httpuv backend for RestRserve. Could you provide more details on how this might be approached? Specifically:
Working demo of async bots + http server for communication (REST API?)
After looking into the suggestions made here about httpuv I was able to put together a minimal working demo to illustrate something similar to what I'm trying to achieve. I will give a more detailed explanation of what I'm trying to do afterwards.
box::use(R6)
box::use(httpuv)
box::use(later)
box::use(jsonlite)
# Trading Bot class
TradingBot <- R6$R6Class(
"TradingBot",
public = list(
is_running = FALSE,
timer = NULL,
run = function() {
if (!self$is_running) {
self$is_running <- TRUE
self$schedule_next()
cat("Bot started\n")
} else {
cat("Bot is already running\n")
}
},
stop = function() {
if (self$is_running) {
cat("Bot stopped\n")
self$is_running <- FALSE
if (!is.null(self$timer)) {
# later$later() returns a function; calling it cancels the pending callback
self$timer()
self$timer <- NULL
}
} else {
cat("Bot is not running\n")
}
},
schedule_next = function() {
if (self$is_running) {
cat("Trading bot is running...\n")
# Schedule the next execution
self$timer <- later$later(~self$schedule_next(), delay = 1)
}
}
)
)
# Create bot instance
bot <- TradingBot$new()
# Define the application
app <- list(
call = function(req) {
if (req$PATH_INFO == "/start-bot") {
bot$run()
response <- list(status = "Bot started")
} else if (req$PATH_INFO == "/stop-bot") {
bot$stop()
response <- list(status = "Bot stopped")
} else {
response <- list(status = "Unknown endpoint")
}
list(
status = 200L,
headers = list('Content-Type' = 'application/json'),
body = jsonlite$toJSON(response, auto_unbox = TRUE)
)
}
)
# Start the server
cat("Starting server on http://127.0.0.1:8080\n")
server <- httpuv$startServer("127.0.0.1", 8080, app)
# Process requests and scheduled callbacks until interrupted
httpuv$service(timeoutMs = Inf)
What am I trying to build? A trade bot
I hope not to bore you with too much information, but here are a few more details about what I am planning, to further clarify my goal. Please feel free to correct me, realign me, push me, or point out any misconceptions I might have. My goal is to get a working programme; as such my ego is disposable and I seek whatever guidance you could impart.
I want to build a programme in R that will function as a trading bot. What would this programme do? It would have 2 main parts.
Now for how I understand this could work, taking into consideration R's synchronous nature. I believe there could be two approaches to this, but one is clearer than the other.
Despite being simple, the example I gave above (like the Node.js example) demonstrates the concept I want to build on: a bot that can run continuously on an interval, plus a REST API responding to HTTP requests. It seems to work as expected, running on a single thread with an event loop, similar to how Node.js operates. I plan to run this bot for myself and for academic purposes, so it doesn't have to scale for public use. I think this works nicely for now because of the small scale of the programme.
httpuv + later == NodeJS event loop?
After some research I found that httpuv lists the
I am still learning how
So my conclusion is that if I use httpuv + later I am in fact creating an event loop in my R programme. If my understanding is correct, this is in a way equivalent to NodeJS. I have not faced any scaling issues yet, but I am starting to wonder whether the programme would face the same scaling issues as R with this event loop had I written it entirely in NodeJS. This next comment ventures into potential ignorance: in my mind the best would be to have the best of both worlds, i.e. forking + async event loops, so that when I create a bot it runs on its own thread with its own event loop.
Conclusion
Given my example, I'm curious about your thoughts on:
Thank you again for your time and expertise. Your insights have been invaluable in helping my understanding of these complex topics.
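For reference, the "run on an interval" idea discussed in this comment can be reduced to a small helper built on `later`. This is only a sketch under stated assumptions: `set_interval` is a hypothetical name (it is not part of `later`, httpuv or RestRserve), and the loop is driven by hand with `run_now()` where a server loop would normally do it:

```r
library(later)

# Hypothetical helper: call `callback` roughly every `secs` seconds.
# Returns a function that cancels the interval.
set_interval <- function(callback, secs) {
  active <- TRUE
  cancel_pending <- NULL
  tick <- function() {
    if (!active) return(invisible(NULL))
    callback()
    # Re-arm the timer; later() returns a cancel function for this callback.
    cancel_pending <<- later(tick, secs)
  }
  cancel_pending <- later(tick, secs)
  function() {
    active <<- FALSE
    cancel_pending()  # drop the callback that is still queued
    invisible(NULL)
  }
}

ticks <- 0L
cancel <- set_interval(function() ticks <<- ticks + 1L, secs = 0.05)

# Drive the event loop manually until three ticks have fired (5 s safety limit).
deadline <- Sys.time() + 5
while (ticks < 3L && Sys.time() < deadline) run_now(timeoutSecs = 0.1)

cancel()
ticks_at_cancel <- ticks
run_now(timeoutSecs = 0.2)  # nothing left to run after cancellation
```

Keeping the cancel function returned by `later()` means stopping the interval also removes the callback that is already queued, instead of leaving it to fire once more and exit.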
-
@s-u I think I got something semi-satisfactory working with your ideas and help. Thank you very much! It's not perfect: it throws a weird error (probably something I did wrong with my use of
I'm happy as this brings the conversation back to
I used:
I'll explain what I have below.
When I hit the
box::use(RestRserve[Application, BackendRserve])
box::use(R6)
box::use(later)
box::use(parallel)
box::use(uuid)
# Define the TradeBot class
TradeBot <- R6$R6Class(
"TradeBot",
public = list(
is_running = FALSE,
timer = NULL,
id = NULL,
bot_file = NULL,
initialize = function(id, bot_file) {
self$bot_file <- bot_file
self$is_running <- FALSE
self$timer <- NULL
self$id <- id
},
start = function() {
if (!self$is_running) {
self$is_running <- TRUE
private$run()
cat(sprintf("Bot %s started\n", self$id))
} else {
cat(sprintf("Bot %s is already running\n", self$id))
}
},
stop = function() {
if (self$is_running) {
self$is_running <- FALSE
if (!is.null(self$timer)) {
self$timer()
self$timer <- NULL
}
cat(sprintf("Bot %s stopped\n", self$id))
} else {
cat(sprintf("Bot %s is not running\n", self$id))
}
}
),
private = list(
check_status = function() {
tryCatch({
processes <- readRDS(self$bot_file)
if (is.null(processes[[self$id]])) {
self$stop()
return(FALSE)
}
return(TRUE)
}, error = function(e) {
cat(sprintf("Error checking status for Bot %s: %s\n", self$id, e$message))
return(FALSE)
})
},
run = function() {
if (!private$check_status()) {
return(NULL)
}
cat(sprintf("Bot %s is running\n", self$id))
self$timer <- later$later(function() {
tryCatch({
private$run()
}, error = function(e) {
cat(sprintf("Error in Bot %s: %s\n", self$id, e$message))
self$stop()
})
}, 1, loop = later$global_loop())
}
)
)
# Use file to store bot processes; simulates database
bot_file <- "bot_processes.Rds"
if (!file.exists(bot_file)) {
processes <- list()
saveRDS(processes, file = bot_file)
}
# Function to start a new bot process
start_bot_process <- function(bot_id) {
processes <- readRDS(bot_file)
if (!is.null(processes[[bot_id]])) {
return(FALSE) # Bot already exists
}
bot_process <- parallel$mcparallel({
tryCatch({
bot <- TradeBot$new(bot_id, bot_file)
bot$start()
while (bot$is_running) {
later$run_now(all = TRUE, loop = later$global_loop())
Sys.sleep(0.1) # Add a small delay to prevent excessive CPU usage
}
}, error = function(e) {
cat("Error in bot process:", e$message, "\n")
}, finally = {
cat(sprintf("Bot %s process exiting\n", bot_id))
})
})
processes[[bot_id]] <- list(pid = bot_process$pid)
saveRDS(processes, file = bot_file)
return(TRUE)
}
# Function to stop a bot process
stop_bot_process <- function(bot_id) {
processes <- readRDS(bot_file)
if (is.null(processes[[bot_id]])) {
return(FALSE) # Bot doesn't exist
}
# Signal the bot to stop
processes[[bot_id]] <- NULL
saveRDS(processes, file = bot_file)
# Wait for the process to finish
Sys.sleep(2)
return(TRUE)
}
# Create the REST API application
app <- Application$new()
app$add_get("/", function(request, response) {
response$set_body("Welcome to the Trade Bot API")
})
app$add_get("/start-bot", function(request, response) {
bot_id <- request$parameters_query[["id"]]
if (is.null(bot_id)) {
bot_id <- uuid$UUIDgenerate()
}
if (start_bot_process(bot_id)) {
response$set_body(sprintf("Bot %s started", bot_id))
} else {
response$set_status_code(400)
response$set_body(sprintf("Bot %s already running", bot_id))
}
})
app$add_get("/stop-bot", function(request, response) {
bot_id <- request$parameters_query[["id"]]
if (is.null(bot_id)) {
response$set_status_code(400)
response$set_body("Bot ID is required")
return(NULL)
}
if (stop_bot_process(bot_id)) {
response$set_body(sprintf("Bot %s stopped", bot_id))
} else {
response$set_status_code(400)
response$set_body(sprintf("Bot %s not found or could not be stopped", bot_id))
}
})
app$add_get("/list-bots", function(request, response) {
processes <- readRDS(bot_file)
bot_list <- names(processes)
response$set_body(paste("Running bots:", paste(bot_list, collapse = ", ")))
})
# Start the server
backend <- BackendRserve$new()
backend$start(app, http_port = 8080)
It works but I did something wrong; it throws this error from parallel:
Edit: I posted my minimal example to Stack Overflow for help debugging the segfault: https://stackoverflow.com/questions/78873423/r-segfault-parallel-process-with-restrserve-and-mcparallel
-
EDIT: the scope of the question changed over time. I found that instead of running everything on the same thread, I could spawn off a separate process for each long-running task so that it runs independently. One way to allow communication between RestRserve and these processes is to write to a separate file/database and periodically check it for status.
See: #216 (comment)
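Since several processes read and write the same status file in this approach, one thing worth guarding against is a reader seeing a half-written file. The following is a rough sketch, assuming a POSIX file system where a rename within one directory replaces the target in a single step; `write_registry` and `read_registry` are hypothetical helper names, not part of RestRserve:

```r
# Hypothetical helper: write the registry to a temporary file in the same
# directory, then rename it over the real path so readers never see a partial file.
write_registry <- function(registry, path) {
  tmp <- tempfile(pattern = "registry-", tmpdir = dirname(path))
  saveRDS(registry, tmp)
  if (!file.rename(tmp, path)) {
    unlink(tmp)
    stop("could not replace ", path)
  }
  invisible(path)
}

# Hypothetical helper: a missing file is treated as an empty registry.
read_registry <- function(path) {
  if (!file.exists(path)) return(list())
  readRDS(path)
}

path <- file.path(tempdir(), "bot_processes.Rds")
write_registry(list(), path)

# Register a bot, as a start-bot handler might do.
reg <- read_registry(path)
reg[["bot-1"]] <- list(pid = Sys.getpid())
write_registry(reg, path)

registered <- names(read_registry(path))
```

This only protects readers from partial files; two processes doing read-modify-write at the same time can still overwrite each other's changes, which is where a lock file or a real database would be the safer choice.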