diff --git a/lumni/src/apps/api/error.rs b/lumni/src/apps/api/error.rs index b50e5305..b1a50170 100644 --- a/lumni/src/apps/api/error.rs +++ b/lumni/src/apps/api/error.rs @@ -1,3 +1,4 @@ +use std::error::Error; use std::fmt; use rusqlite::Error as SqliteError; @@ -38,6 +39,7 @@ pub enum ApplicationError { DatabaseError(String), NotImplemented(String), NotReady(String), + CustomError(Box), } #[derive(Debug, Clone)] @@ -132,11 +134,20 @@ impl fmt::Display for ApplicationError { write!(f, "NotImplemented: {}", s) } ApplicationError::NotReady(s) => write!(f, "NotReady: {}", s), + ApplicationError::CustomError(e) => write!(f, "{}", e), } } } -impl std::error::Error for ApplicationError {} +impl std::error::Error for ApplicationError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + ApplicationError::CustomError(e) => Some(e.as_ref()), + // For other variants, we use the default behavior (returning None) + _ => None, + } + } +} impl From for ApplicationError { fn from(error: HttpClientError) -> Self { @@ -192,4 +203,4 @@ impl From for LumniError { fn from(error: std::string::String) -> Self { LumniError::Any(error.to_owned()) } -} \ No newline at end of file +} diff --git a/lumni/src/apps/builtin/llm/prompt/mod.rs b/lumni/src/apps/builtin/llm/prompt/mod.rs index e5900af8..e6d9833e 100644 --- a/lumni/src/apps/builtin/llm/prompt/mod.rs +++ b/lumni/src/apps/builtin/llm/prompt/mod.rs @@ -3,6 +3,7 @@ pub mod src { mod app; mod chat; mod defaults; + mod error; mod handler; mod server; mod tui; diff --git a/lumni/src/apps/builtin/llm/prompt/src/app.rs b/lumni/src/apps/builtin/llm/prompt/src/app.rs index abb518ed..f156ac4d 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/app.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/app.rs @@ -197,8 +197,7 @@ async fn interactive_mode( prompt_instruction: PromptInstruction, db_conn: Arc, ) -> Result<(), ApplicationError> { - let app = - App::new(prompt_instruction, Arc::clone(&db_conn)).await?; + let app = App::new(prompt_instruction, Arc::clone(&db_conn)).await?; let mut stdout = io::stdout().lock(); // Enable raw mode and setup the screen diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/instruction.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/instruction.rs index 25663426..02f5434d 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/instruction.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/instruction.rs @@ -6,7 +6,8 @@ use super::db::{ }; use super::prepare::NewConversation; use super::{ - ChatCompletionOptions, ChatMessage, ColorScheme, PromptRole, TextLine, + ChatCompletionOptions, ChatMessage, ColorScheme, PromptError, PromptRole, + TextLine, }; pub use crate::external as lumni; @@ -352,8 +353,11 @@ impl PromptInstruction { &mut self, question: &str, max_token_length: usize, - ) -> Result, ApplicationError> { - let timestamp = Timestamp::from_system_time()?.as_millis(); + ) -> Result, PromptError> { + let timestamp = Timestamp::from_system_time() + .map_err(|e| PromptError::Runtime(e.to_string()))? + .as_millis(); + let message = Message { id: self.cache.new_message_id(), conversation_id: self.cache.get_conversation_id(), diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/mod.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/mod.rs index 0bfef94f..b67bd00d 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/mod.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/conversation/mod.rs @@ -8,8 +8,8 @@ pub use prepare::NewConversation; pub use super::db; use super::{ - ChatCompletionOptions, ChatMessage, ColorScheme, PromptRole, TextLine, - TextSegment, + ChatCompletionOptions, ChatMessage, ColorScheme, PromptError, PromptRole, + TextLine, TextSegment, }; #[derive(Debug, Clone, PartialEq, Copy)] diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/db/connector.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/db/connector.rs index 69c86ec8..1f91b9bc 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/db/connector.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/db/connector.rs @@ -29,7 +29,7 @@ impl DatabaseConnector { PRAGMA temp_store = MEMORY; PRAGMA busy_timeout = 5000; PRAGMA wal_autocheckpoint = 1000; - PRAGMA journal_size_limit = 67108864;" + PRAGMA journal_size_limit = 67108864;", )?; let operation_queue = Arc::new(Mutex::new(VecDeque::new())); diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/mod.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/mod.rs index 99643297..5bcaf8b7 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/mod.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/mod.rs @@ -12,6 +12,7 @@ pub use prompt::{AssistantManager, PromptRole}; pub use session::{prompt_app, App, ChatEvent, ThreadedChatSession}; pub use super::defaults::*; +pub use super::error::{PromptError, PromptNotReadyReason}; pub use super::server::{CompletionResponse, ModelServer, ServerManager}; use super::tui::{ draw_ui, AppUi, ColorScheme, ColorSchemeType, CommandLineAction, diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/session/chat_session_manager.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/session/chat_session_manager.rs index 8de035cd..39fe6ea1 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/session/chat_session_manager.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/session/chat_session_manager.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use uuid::Uuid; use lumni::api::error::ApplicationError; +use uuid::Uuid; use super::db::{ConversationDatabase, ConversationId}; use super::threaded_chat_session::ThreadedChatSession; @@ -41,32 +41,40 @@ impl ChatSessionManager { .model_server .as_ref() .map(|s| s.to_string()); - + let initial_session = ThreadedChatSession::new( initial_prompt_instruction, db_conn.clone(), ); - + let mut sessions = HashMap::new(); sessions.insert(session_id, initial_session); - + Self { sessions, - active_session_info: SessionInfo { - id: session_id, - conversation_id, - server_name + active_session_info: SessionInfo { + id: session_id, + conversation_id, + server_name, }, } } - pub fn get_active_session(&mut self) -> Result<&mut ThreadedChatSession, ApplicationError> { - self.sessions.get_mut(&self.active_session_info.id).ok_or_else(|| - ApplicationError::Runtime("Active session not found".to_string()) - ) + pub fn get_active_session( + &mut self, + ) -> Result<&mut ThreadedChatSession, ApplicationError> { + self.sessions + .get_mut(&self.active_session_info.id) + .ok_or_else(|| { + ApplicationError::Runtime( + "Active session not found".to_string(), + ) + }) } - pub fn get_conversation_id_for_active_session(&self) -> Option { + pub fn get_conversation_id_for_active_session( + &self, + ) -> Option { self.active_session_info.conversation_id } @@ -82,7 +90,9 @@ impl ChatSessionManager { session.stop(); Ok(()) } else { - Err(ApplicationError::InvalidInput("Session not found".to_string())) + Err(ApplicationError::InvalidInput( + "Session not found".to_string(), + )) } } @@ -115,7 +125,7 @@ impl ChatSessionManager { .model_server .as_ref() .map(|s| s.to_string()); - + self.active_session_info = SessionInfo { id, conversation_id, @@ -123,16 +133,22 @@ impl ChatSessionManager { }; Ok(()) } else { - Err(ApplicationError::InvalidInput("Session not found".to_string())) + Err(ApplicationError::InvalidInput( + "Session not found".to_string(), + )) } } pub fn stop_active_chat_session(&mut self) -> Result<(), ApplicationError> { - if let Some(session) = self.sessions.get_mut(&self.active_session_info.id) { + if let Some(session) = + self.sessions.get_mut(&self.active_session_info.id) + { session.stop(); Ok(()) } else { - Err(ApplicationError::Runtime("Active session not found".to_string())) + Err(ApplicationError::Runtime( + "Active session not found".to_string(), + )) } } } diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/session/conversation_loop.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/session/conversation_loop.rs index 0f756b16..e035f3ae 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/session/conversation_loop.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/session/conversation_loop.rs @@ -32,8 +32,7 @@ pub async fn prompt_app( let mut key_event_handler = KeyEventHandler::new(); let mut redraw_ui = true; let conversation_id = app.get_conversation_id_for_active_session(); - let mut db_handler = - db_conn.get_conversation_handler(conversation_id); + let mut db_handler = db_conn.get_conversation_handler(conversation_id); loop { tokio::select! { @@ -99,7 +98,11 @@ async fn handle_tick( ) .await?; } - Event::Mouse(mouse_event) => handle_mouse_event(app, mouse_event), + Event::Mouse(mouse_event) => { + if !handle_mouse_event(app, mouse_event) { + return Ok(()); // skip redraw_ui if mouse event was not handled + } + } _ => {} } *redraw_ui = true; @@ -150,14 +153,19 @@ async fn handle_key_event( Ok(()) } -fn handle_mouse_event(app: &mut App, mouse_event: MouseEvent) { +fn handle_mouse_event(app: &mut App, mouse_event: MouseEvent) -> bool { + // handle mouse events in response window + // return true if event was handled let window = &mut app.ui.response; match mouse_event.kind { MouseEventKind::ScrollUp => window.scroll_up(), MouseEventKind::ScrollDown => window.scroll_down(), - MouseEventKind::Down(_) => {} - _ => {} + _ => { + // ignore other mouse events + return false; + } } + true // handled mouse event } async fn handle_prompt_action( @@ -264,7 +272,6 @@ async fn send_prompt<'a>( .get_active_session()? .message(&formatted_prompt) .await; - match result { Ok(_) => { // clear prompt @@ -277,9 +284,9 @@ async fn send_prompt<'a>( app.ui.set_primary_window(WindowKind::ResponseWindow); app.ui.response.text_append("\n", Some(Style::reset()))?; } - Err(e) => { - log::error!("Error sending message: {:?}", e); - app.ui.command_line.set_alert(&e.to_string())?; + Err(prompt_error) => { + // show error in alert window + app.ui.command_line.set_alert(&prompt_error.to_string())?; } } Ok(()) diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/session/mod.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/session/mod.rs index ef4779ab..405a52c7 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/session/mod.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/session/mod.rs @@ -17,8 +17,9 @@ use super::db::{ConversationDatabase, ConversationId}; use super::{ db, draw_ui, AppUi, ColorScheme, ColorSchemeType, CommandLineAction, CompletionResponse, ConversationEvent, KeyEventHandler, ModalWindowType, - ModelServer, PromptAction, PromptInstruction, ServerManager, - TextWindowTrait, WindowEvent, WindowKind, + ModelServer, PromptAction, PromptError, PromptInstruction, + PromptNotReadyReason, ServerManager, TextWindowTrait, WindowEvent, + WindowKind, }; pub use crate::external as lumni; @@ -93,11 +94,15 @@ impl App<'_> { Ok(()) } - pub fn get_conversation_id_for_active_session(&self) -> Option { + pub fn get_conversation_id_for_active_session( + &self, + ) -> Option { self.chat_manager.get_conversation_id_for_active_session() } - pub async fn stop_active_chat_session(&mut self) -> Result<(), ApplicationError> { + pub async fn stop_active_chat_session( + &mut self, + ) -> Result<(), ApplicationError> { self.chat_manager.stop_active_chat_session() } diff --git a/lumni/src/apps/builtin/llm/prompt/src/chat/session/threaded_chat_session.rs b/lumni/src/apps/builtin/llm/prompt/src/chat/session/threaded_chat_session.rs index 0a202c61..be890da4 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/chat/session/threaded_chat_session.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/chat/session/threaded_chat_session.rs @@ -6,7 +6,8 @@ use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use super::chat_session_manager::ChatEvent; use super::db::{ConversationDatabase, ConversationDbHandler}; use super::{ - CompletionResponse, ModelServer, PromptInstruction, ServerManager, + CompletionResponse, ModelServer, PromptError, PromptInstruction, + PromptNotReadyReason, ServerManager, }; use crate::api::error::ApplicationError; @@ -73,7 +74,7 @@ struct ThreadedChatSessionInner { #[derive(Debug)] enum ThreadedChatSessionCommand { - Message(String), + Message(String, oneshot::Sender>), LoadInstruction(PromptInstruction), GetInstruction( oneshot::Sender>, @@ -126,10 +127,9 @@ impl ThreadedChatSession { Some(command) = command_receiver.recv() => { let mut locked_inner = inner.lock().await; match command { - ThreadedChatSessionCommand::Message(question) => { - if let Err(e) = locked_inner.handle_message(&question, &db_handler).await { - log::error!("Error processing message: {:?}", e); - } + ThreadedChatSessionCommand::Message(question, response_sender) => { + let result = locked_inner.handle_message(&question, &db_handler).await; + let _ = response_sender.send(result); } ThreadedChatSessionCommand::LoadInstruction(prompt_instruction) => { if let Err(e) = locked_inner.load_instruction(prompt_instruction).await { @@ -167,19 +167,25 @@ impl ThreadedChatSession { } } - pub async fn message( - &self, - question: &str, - ) -> Result<(), ApplicationError> { + pub async fn message(&self, question: &str) -> Result<(), PromptError> { + let (response_sender, response_receiver) = oneshot::channel(); + self.command_sender - .send(ThreadedChatSessionCommand::Message(question.to_string())) + .send(ThreadedChatSessionCommand::Message( + question.to_string(), + response_sender, + )) .await .map_err(|e| { - ApplicationError::Runtime(format!( - "Failed to send message: {}", - e - )) - }) + PromptError::Runtime(format!("Failed to send message: {}", e)) + })?; + + response_receiver.await.map_err(|e| { + PromptError::Runtime(format!( + "Failed to receive message response: {}", + e + )) + })? } pub fn stop(&self) { @@ -237,12 +243,16 @@ impl ThreadedChatSessionInner { &mut self, question: &str, db_handler: &ConversationDbHandler, - ) -> Result<(), ApplicationError> { + ) -> Result<(), PromptError> { if self.model_server_session.server.is_none() { self.model_server_session .initialize_model_server(&self.prompt_instruction, db_handler) .await - .map_err(|e| ApplicationError::NotReady(e.to_string()))?; + .map_err(|e| { + PromptError::NotReady(PromptNotReadyReason::Other( + e.to_string(), + )) + })?; } let model = @@ -250,26 +260,25 @@ impl ThreadedChatSessionInner { .get_model() .cloned() .ok_or_else(|| { - ApplicationError::NotReady( - "Model not available".to_string(), - ) + PromptError::NotReady(PromptNotReadyReason::NoModelSelected) })?; let user_question = self.initiate_new_exchange(question).await?; let server = self.model_server_session.server.as_mut().ok_or_else(|| { - ApplicationError::NotReady("Server not initialized".to_string()) + PromptError::NotReady(PromptNotReadyReason::Other( + "Server not initialized".to_string(), + )) })?; let max_token_length = server.get_max_context_size().await.map_err(|e| { - ApplicationError::ServerConfigurationError(e.to_string()) + PromptError::ServerConfigurationError(e.to_string()) })?; let messages = self .prompt_instruction - .new_question(&user_question, max_token_length) - .map_err(|e| ApplicationError::InvalidInput(e.to_string()))?; + .new_question(&user_question, max_token_length)?; let (cancel_tx, cancel_rx) = oneshot::channel(); self.model_server_session.cancel_tx = Some(cancel_tx); @@ -282,8 +291,7 @@ impl ThreadedChatSessionInner { Some(cancel_rx), ) .await - .map_err(|e| ApplicationError::Runtime(e.to_string()))?; - + .map_err(|e| PromptError::Runtime(e.to_string()))?; Ok(()) } @@ -299,7 +307,7 @@ impl ThreadedChatSessionInner { async fn initiate_new_exchange( &self, user_question: &str, - ) -> Result { + ) -> Result { let user_question = user_question.trim(); Ok(if user_question.is_empty() { "continue".to_string() @@ -345,11 +353,10 @@ impl ThreadedChatSessionInner { ) -> Result<(), ApplicationError> { if let Some(last_answer) = self.prompt_instruction.get_last_response() { let trimmed_answer = last_answer.trim(); - _ = self.prompt_instruction.put_last_response( - trimmed_answer, - tokens_predicted, - db_handler, - ).await; + _ = self + .prompt_instruction + .put_last_response(trimmed_answer, tokens_predicted, db_handler) + .await; } Ok(()) } diff --git a/lumni/src/apps/builtin/llm/prompt/src/error.rs b/lumni/src/apps/builtin/llm/prompt/src/error.rs new file mode 100644 index 00000000..607a232d --- /dev/null +++ b/lumni/src/apps/builtin/llm/prompt/src/error.rs @@ -0,0 +1,64 @@ +use std::error::Error; +use std::fmt; + +use lumni::api::error::ApplicationError; + +pub use crate::external as lumni; + +#[derive(Debug)] +pub enum PromptError { + NotReady(PromptNotReadyReason), + ServerConfigurationError(String), + Runtime(String), +} + +#[derive(Debug)] +pub enum PromptNotReadyReason { + NoModelSelected, + Other(String), +} + +impl fmt::Display for PromptNotReadyReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PromptNotReadyReason::NoModelSelected => { + write!(f, "NoModelSelected") + } + PromptNotReadyReason::Other(msg) => { + write!(f, "{}", msg) + } + } + } +} + +impl fmt::Display for PromptError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PromptError::NotReady(reason) => { + write!(f, "{}", reason) + } + PromptError::ServerConfigurationError(msg) => { + write!(f, "{}", msg) + } + PromptError::Runtime(msg) => { + write!(f, "{}", msg) + } + } + } +} + +impl Error for PromptError {} + +impl From for ApplicationError { + fn from(error: PromptError) -> Self { + match error { + PromptError::NotReady(msg) => { + ApplicationError::NotReady(msg.to_string()) + } + PromptError::ServerConfigurationError(msg) => { + ApplicationError::ServerConfigurationError(msg) + } + PromptError::Runtime(msg) => ApplicationError::Runtime(msg), + } + } +} diff --git a/lumni/src/apps/builtin/llm/prompt/src/server/send.rs b/lumni/src/apps/builtin/llm/prompt/src/server/send.rs index 2f839f1a..33b45d69 100644 --- a/lumni/src/apps/builtin/llm/prompt/src/server/send.rs +++ b/lumni/src/apps/builtin/llm/prompt/src/server/send.rs @@ -52,14 +52,16 @@ pub async fn http_get_with_response( "application/json".to_string(), )]); let (tx, mut rx) = mpsc::channel(1); - + // Spawn a task to handle the HTTP request let request_task = tokio::spawn(async move { - http_client.get(&url, Some(&header), None, Some(tx), None).await + http_client + .get(&url, Some(&header), None, Some(tx), None) + .await }); let mut response_bytes = BytesMut::new(); - + // Receive chunks from the channel while let Some(response) = rx.recv().await { response_bytes.extend_from_slice(&response); @@ -86,21 +88,23 @@ pub async fn http_post_with_response( )]); let (tx, mut rx) = mpsc::channel(1); let payload_bytes = Bytes::from(payload.into_bytes()); - + // Spawn a task to handle the HTTP request let request_task = tokio::spawn(async move { - http_client.post( - &url, - Some(&headers), - None, - Some(&payload_bytes), - Some(tx), - None, - ).await + http_client + .post( + &url, + Some(&headers), + None, + Some(&payload_bytes), + Some(tx), + None, + ) + .await }); let mut response_bytes = BytesMut::new(); - + // Receive chunks from the channel while let Some(response) = rx.recv().await { response_bytes.extend_from_slice(&response); diff --git a/lumni/src/http/client.rs b/lumni/src/http/client.rs index ef0ace50..2f0bdc0c 100644 --- a/lumni/src/http/client.rs +++ b/lumni/src/http/client.rs @@ -8,7 +8,6 @@ use std::time::Duration; use anyhow::{anyhow, Error as AnyhowError, Result}; use bytes::{Bytes, BytesMut}; -use tokio::time::timeout; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Empty, Full}; use hyper::header::{HeaderName, HeaderValue}; @@ -20,6 +19,7 @@ use hyper_util::rt::TokioExecutor; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use serde::de::DeserializeOwned; use tokio::sync::{mpsc, oneshot}; +use tokio::time::timeout; #[derive(Debug)] pub struct HttpClientResponse { @@ -195,8 +195,10 @@ impl HttpClient { status_code: response.status().as_u16(), headers: response.headers().clone(), }; - return Err(error_handler - .handle_error(http_client_response, canonical_reason)); + return Err(error_handler.handle_error( + http_client_response, + canonical_reason, + )); } return Err(HttpClientError::HttpError( response.status().as_u16(), @@ -255,7 +257,7 @@ impl HttpClient { status_code, headers, }) - }, + } Err(_) => Err(HttpClientError::Timeout), } } diff --git a/lumni/src/table/columns.rs b/lumni/src/table/columns.rs index e962253b..bd5193ff 100644 --- a/lumni/src/table/columns.rs +++ b/lumni/src/table/columns.rs @@ -115,7 +115,7 @@ impl TableColumnValue { // Match any None variant for Optional types _ => { log::error!("Unexpected TableColumnValue: {:?}", self); - "NULL".to_string() + "NULL".to_string() } } } diff --git a/lumni/src/table/file_object.rs b/lumni/src/table/file_object.rs index 66ba733a..c9c93194 100644 --- a/lumni/src/table/file_object.rs +++ b/lumni/src/table/file_object.rs @@ -2,11 +2,12 @@ use core::{fmt, panic}; use std::collections::HashMap; use std::sync::Arc; -use crate::table::{ - OptionalInt64Column, StringColumn, TableRow, Uint64Column, -}; +use crate::table::{OptionalInt64Column, StringColumn, TableRow, Uint64Column}; use crate::utils::formatters::{bytes_human_readable, time_human_readable}; -use crate::{FileObject, InternalError, Table, TableCallback, TableColumn, TableColumnValue}; +use crate::{ + FileObject, InternalError, Table, TableCallback, TableColumn, + TableColumnValue, +}; pub struct FileObjectTable { columns: Vec<(String, Box)>, // Store columns in order @@ -207,7 +208,6 @@ impl FileObjectTable { &mut self, rows: Vec>, ) -> Result<(), InternalError> { - for row_data in rows.iter() { let mut row_vec: Vec<(String, TableColumnValue)> = Vec::new(); // Iterate over self.columns to maintain the defined order