Skip to content

Commit

Permalink
Merge pull request #7 from tthogho1/websocket_QA
Browse files Browse the repository at this point in the history
add_submodule_link
  • Loading branch information
tthogho1 authored Dec 22, 2024
2 parents e3080ae + 1446f87 commit 2ace58a
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "front"]
path = front
url = https://github.com/tthogho1/reactgooglemap.git
74 changes: 44 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.7.7", features = ["ws"] }
tokio = { version = "1.40.0", features = ["full"] }
axum = { version = "0.7.9", features = ["ws"] }
tokio = { version = "1.42.0", features = ["full"] }
futures = "0.3.31"
uuid = { version = "1.0", features = ["v4"] }
uuid = { version = "1.11.0", features = ["v4"] }
askama = "0.12"
tower-http = { version = "0.6.1", features = ["fs"] }
tower-http = { version = "0.6.2", features = ["fs"] }
dotenv = "0.15.0"
redis = { version = "0.27.3", features = ["tokio-comp"] }
bb8 = "0.8"
bb8-redis = "0.17.0"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
redis = { version = "0.27.6", features = ["tokio-comp"] }
bb8 = "0.9.0"
bb8-redis = "0.18.0"
serde = { version = "1.0.216", features = ["derive"] }
serde_json = "1.0.133"
once_cell = "1.20.2"
mockall = "0.13.0"
mockall = "0.13.1"
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70-slim-buster AS builder
FROM rust:1.75-slim-buster AS builder

WORKDIR /app

Expand Down Expand Up @@ -29,4 +29,5 @@ COPY --from=builder /app/static /usr/local/bin/static

EXPOSE 8000

WORKDIR /usr/local/bin
CMD ["websocket_rust"]
1 change: 1 addition & 0 deletions front
Submodule front added at e7a6e8
73 changes: 70 additions & 3 deletions src/handlers/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,76 @@ use axum::{
use futures::{sink::SinkExt, stream::StreamExt};
use serde::Deserialize;
use std::sync::Arc;
use std::env;

use websocket_rust::AppState;
use websocket_rust::ChatMessage;
use websocket_rust::get_app_state;
use websocket_rust::MessageContent;

use redis::{Client,AsyncCommands};
use crate::handlers::position::delete_user;

use std::time::Duration;
use tokio::time::sleep; //


#[derive(Deserialize)]
pub struct WsParams {
name: String,
}

fn connect_with_retry(client: &redis::Client, max_retries: u32) -> redis::RedisResult<redis::Connection> {
for attempt in 1..=max_retries {
match client.get_connection_with_timeout(Duration::from_secs(10)) {
Ok(con) => return Ok(con),
Err(e) => {
eprintln!("attempt {} faile: {}", attempt, e);
if attempt < max_retries {
let _ = sleep(Duration::from_secs(5));
}
}
}
}
Err(redis::RedisError::from((redis::ErrorKind::IoError, "Over max retry count for redis connection")))
}


pub async fn redis_listener() {
let redis_url = env::var("REDIS_URL").unwrap().to_string();
let state = Arc::clone(get_app_state());

let client = Client::open(redis_url).unwrap();
let mut con = connect_with_retry(&client, 10).unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("my_channel").unwrap();

println!("Listening for messages on channel 'my_channel'");
loop {
match pubsub.get_message() {
Ok(msg) => {
if let Ok(payload) = msg.get_payload::<String>() {
println!("get message from channel '{}': {}", msg.get_channel_name(), payload);

match serde_json::from_str::<ChatMessage>(&payload) {
Ok(chat_message) => {
println!("send message to tx.send {}", payload);
if let Err(e) = state.tx.send(chat_message) {
eprintln!("Failed to send message: {}", e);
}
},
Err(e) => eprintln!("Failed to parse message: {}", e),
}
}
},
Err(e) => {
eprintln!("Failed to get message from channel: {}", e);
// wait to retry
let _ = tokio::time::sleep(Duration::from_secs(3)).await;
},
}
}
}

pub async fn ws_handler(
ws: WebSocketUpgrade,
Query(params): Query<WsParams>,
Expand All @@ -38,12 +94,17 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, name: String) {
let user_id = name;
let user_id_clone = user_id.clone();
let delete_id = user_id.clone();

let app_state = Arc::clone(get_app_state());

let pool = app_state.pool.clone();

// ブロードキャストチャンネルの受信機を取得
let mut rx = state.tx.subscribe();

let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
println!("receive from rx {}", msg.message);

if !msg.to_id.is_empty(){
if msg.to_id == user_id_clone {
println!("{}: {}", msg.user_id, msg.message);
Expand All @@ -54,6 +115,7 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, name: String) {
}
}else{
let json_string = serde_json::to_string(&msg).unwrap();
println!(" send to {}: {}", msg.user_id, msg.message);
let _ = sender
.send(Message::Text(json_string))
.await;
Expand All @@ -68,6 +130,12 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, name: String) {
println!("{}: {}", user_id.clone(), jsontext);
let chat_message: ChatMessage = serde_json::from_str(&jsontext).unwrap();

// let mut con = pool.get().await.unwrap();
// let json_string = serde_json::to_string(&chat_message).unwrap();
// let json_string_clone = json_string.clone();

// let _: () = con.publish("my_channel", json_string).await.unwrap();
// println!("publish message to redis {}", json_string_clone);
let _ = state.tx.send(chat_message).unwrap();
}
});
Expand All @@ -82,7 +150,6 @@ async fn handle_socket(socket: WebSocket, state: Arc<AppState>, name: String) {
println!("WebSocket connection closed {}", delete_id_clone);

// delete user for redis if exists
let app_state = Arc::clone(get_app_state());
let _ = delete_user(State(app_state), delete_id).await.unwrap();

}
Loading

0 comments on commit 2ace58a

Please sign in to comment.