Question: Websocket extra params #388
I need to pass some extra params to the handler: ctl and peers, where ctl is the application control and peers is an RwLock'ed HashMap of peers:
let ws_handler = warp::path("ws") // /ws
    .and(warp::ws()) // Handle as websocket
    .and(ctl) // extra param
    .and(warp::any().map(move || peers.clone())) // extra param: database of peers
    .map(|ws: warp::ws::Ws, ctl, peers| { // handler
        ws.on_upgrade(move |socket| {
            ws_attach(socket, ctl, peers) // this is a closure, so I can pass the socket and the extra params
        })
    });
Now it is possible to interact with that peer and all the others:
static NEXT_PEER_ID: AtomicUsize = AtomicUsize::new(1);
pub async fn ws_attach(ws: WebSocket, ctl: Ctl, peers: Peers) {
    let peer_id = NEXT_PEER_ID.fetch_add(1, Ordering::Relaxed);
    trace!("UID={:04X} PEER: ATTACHED", peer_id);
    let (peer_ws_tx, mut peer_ws_rx) = ws.split();
    let (tx, rx) = mpsc::unbounded_channel();
    let rx = UnboundedReceiverStream::new(rx);
    // Setup sender forwarding
    tokio::task::spawn(rx.forward(peer_ws_tx).map(|result| {
        if let Err(e) = result {
            error!("SEND ERROR: {}", e);
        }
    }));
    // Save peer in hashmap
    peers.storage
        .write()
        .await
        .insert(peer_id, Peer { ws: tx });
    // Send notification to all peers
    ctl.ws_peer_attached().await;
    // Handle peer
    while let Some(result) = peer_ws_rx.next().await {
        // omitted, exit loop on error
    }
    trace!("PEER DETACHED: UID={:04X}", peer_id);
    // Remove peer
    peers.storage
        .write()
        .await
        .remove(&peer_id);
}
But axum restricts the on_upgrade handler to an FnOnce(WebSocket) callback that returns a future, and tracking some common context becomes more complicated.
Maybe I missed something: what is the most correct way to implement a similar approach in axum?
P/S
Replies: 1 comment 3 replies
Does passing things like regular arguments not work? There is an example here. If not, then you'll have to share your code.
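To illustrate why an FnOnce(WebSocket) bound should not get in the way, here is a minimal std-only sketch. It is not axum code: WebSocket, WebSocketUpgrade, Ctl, Peers and block_on below are hypothetical stand-ins that only mirror the shape of the bound described in the question. The point is that a `move` closure still satisfies FnOnce(WebSocket) while capturing ctl and peers, so the handler can receive them as regular arguments and hand them on to ws_attach.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a connected socket (NOT the axum type).
struct WebSocket {
    incoming: Vec<String>,
}

// Hypothetical stand-in for the upgrade handle (NOT the axum type).
struct WebSocketUpgrade {
    socket: WebSocket,
}

impl WebSocketUpgrade {
    // Same shape as the bound from the question: an FnOnce(WebSocket)
    // that returns a future. A real server would spawn that future;
    // this sketch simply returns it to the caller.
    fn on_upgrade<F, Fut>(self, callback: F) -> Fut
    where
        F: FnOnce(WebSocket) -> Fut,
        Fut: Future<Output = ()>,
    {
        callback(self.socket)
    }
}

// Assumed shapes for the shared context; the real Ctl and Peers differ.
#[derive(Clone)]
struct Ctl {
    log: Arc<RwLock<Vec<String>>>,
}
type Peers = Arc<RwLock<HashMap<usize, String>>>;

static NEXT_PEER_ID: AtomicUsize = AtomicUsize::new(1);

async fn ws_attach(ws: WebSocket, ctl: Ctl, peers: Peers) {
    let peer_id = NEXT_PEER_ID.fetch_add(1, Ordering::Relaxed);
    // Save the peer in the shared map.
    peers.write().unwrap().insert(peer_id, format!("peer-{peer_id}"));
    // Handle the peer: here every incoming message is just recorded.
    for msg in ws.incoming {
        ctl.log.write().unwrap().push(msg);
    }
    // Remove the peer once its message stream ends.
    peers.write().unwrap().remove(&peer_id);
}

// The handler takes the extra values as regular arguments and moves
// them into the closure; the closure itself still only takes the socket.
fn ws_handler(ws: WebSocketUpgrade, ctl: Ctl, peers: Peers) -> impl Future<Output = ()> {
    ws.on_upgrade(move |socket| ws_attach(socket, ctl, peers))
}

// Tiny busy-polling executor so the sketch runs without an async runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Driving it with block_on(ws_handler(ws, ctl.clone(), peers.clone())) runs ws_attach with both captured values. In a real application the context would be cloned per connection in the same way, since Arc clones are cheap and each closure owns its own handle.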