Skip to content

Commit

Permalink
sip-core: create TsxRegistration for every new request outside of a t…
Browse files Browse the repository at this point in the history
…ransaction
  • Loading branch information
kbalt committed Jul 1, 2024
1 parent 7abe14d commit cd1625d
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 74 deletions.
67 changes: 38 additions & 29 deletions crates/sip-core/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ impl Endpoint {

/// Create a [`ServerTsx`] from an [`IncomingRequest`]. The returned transaction
/// can be used to form and send responses to the request.
pub fn create_server_tsx(&self, request: &IncomingRequest) -> ServerTsx {
ServerTsx::new(self.clone(), request)
pub fn create_server_tsx(&self, request: &mut IncomingRequest) -> ServerTsx {
ServerTsx::new(request)
}

/// Create a [`ServerInvTsx`] from an INVITE [`IncomingRequest`]. The returned transaction
/// can be used to form and send responses to the request.
pub fn create_server_inv_tsx(&self, request: &IncomingRequest) -> ServerInvTsx {
ServerInvTsx::new(self.clone(), request)
pub fn create_server_inv_tsx(&self, request: &mut IncomingRequest) -> ServerInvTsx {
ServerInvTsx::new(request)
}

/// Returns all ALLOW headers this endpoint supports
Expand Down Expand Up @@ -360,32 +360,40 @@ impl Endpoint {
}
};

let mut tsx = None;

// Try to find a transaction that might be able to handle the message
if let Some(handler) = self.transactions().get_handler(&tsx_key) {
let tsx_message = TsxMessage {
tp_info: message.tp_info,
line: message.line,
base_headers,
headers: message.headers,
body: message.body,
};
match self.transactions().get_handler(&self, &tsx_key) {
Ok(handler) => {
let tsx_message = TsxMessage {
tp_info: message.tp_info,
line: message.line,
base_headers,
headers: message.headers,
body: message.body,
};

log::debug!("delegating message to transaction {}", tsx_key);
log::debug!("delegating message to transaction {}", tsx_key);

if let Some(rejected_tsx_message) = handler(tsx_message) {
log::trace!("transaction {} rejected message", tsx_key);
if let Some(rejected_tsx_message) = handler(tsx_message) {
log::trace!("transaction {} rejected message", tsx_key);

// TsxMessage was rejected, restore previous state
base_headers = rejected_tsx_message.base_headers;
message = ReceivedMessage {
tp_info: rejected_tsx_message.tp_info,
line: rejected_tsx_message.line,
headers: rejected_tsx_message.headers,
body: rejected_tsx_message.body,
};
} else {
// Handled
return;
// TsxMessage was rejected, restore previous state
base_headers = rejected_tsx_message.base_headers;
message = ReceivedMessage {
tp_info: rejected_tsx_message.tp_info,
line: rejected_tsx_message.line,
headers: rejected_tsx_message.headers,
body: rejected_tsx_message.body,
};
} else {
// Handled
return;
}
}
Err(registration) => {
log::debug!("no transaction for {tsx_key} found, created registration");
tsx = Some(registration);
}
}

Expand All @@ -401,6 +409,7 @@ impl Endpoint {

let incoming = IncomingRequest {
tp_info: message.tp_info,
tsx,
line,
base_headers,
headers: message.headers,
Expand Down Expand Up @@ -433,7 +442,7 @@ impl Endpoint {
}
}

async fn handle_unwanted_request(&self, request: IncomingRequest) -> Result<()> {
async fn handle_unwanted_request(&self, mut request: IncomingRequest) -> Result<()> {
if request.line.method == Method::ACK {
// Cannot respond to unhandled ACK requests
return Ok(());
Expand All @@ -443,11 +452,11 @@ impl Endpoint {
self.create_response(&request, Code::CALL_OR_TRANSACTION_DOES_NOT_EXIST, None);

if request.line.method == Method::INVITE {
let tsx = self.create_server_inv_tsx(&request);
let tsx = self.create_server_inv_tsx(&mut request);

tsx.respond_failure(response).await
} else {
let tsx = self.create_server_tsx(&request);
let tsx = self.create_server_tsx(&mut request);

tsx.respond(response).await
}
Expand Down
14 changes: 13 additions & 1 deletion crates/sip-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sip_types::print::AppendCtx;
use sip_types::uri::Uri;
use sip_types::{Headers, Method, Name};
use std::fmt;
use transaction::TsxKey;
use transaction::{TsxKey, TsxRegistration};
use transport::MessageTpInfo;

#[macro_use]
Expand Down Expand Up @@ -103,6 +103,7 @@ impl BaseHeaders {
pub struct IncomingRequest {
pub tp_info: MessageTpInfo,
pub tsx_key: TsxKey,
tsx: Option<TsxRegistration>,

pub line: RequestLine,
pub base_headers: BaseHeaders,
Expand All @@ -116,6 +117,17 @@ impl fmt::Display for IncomingRequest {
}
}

impl IncomingRequest {
#[track_caller]
fn take_tsx_registration(&mut self) -> TsxRegistration {
let Some(tsx) = self.tsx.take() else {
panic!("Tried to create transaction for {:?}, which is an already handled message or isn't a transaction creating request", self.tsx_key);
};

tsx
}
}

/// Layers are extensions to the endpoint.
///
/// They can be added to the endpoint in the building stage bay calling
Expand Down
52 changes: 37 additions & 15 deletions crates/sip-core/src/transaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::transport::MessageTpInfo;
use crate::BaseHeaders;
use crate::{BaseHeaders, Endpoint};
use bytes::Bytes;
use bytesstr::BytesStr;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use registration::TsxRegistration;
use parking_lot::lock_api::MutexGuard;
use parking_lot::{MappedMutexGuard, Mutex};
use sip_types::msg::{MessageLine, StatusLine};
use sip_types::Headers;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use tokio::sync::mpsc;

mod client;
mod client_inv;
Expand All @@ -32,34 +33,55 @@ pub use key::TsxKey;
pub use server::ServerTsx;
pub use server_inv::{Accepted, ServerInvTsx};

pub(crate) use registration::TsxRegistration;

pub(crate) type TsxHandler = Box<dyn Fn(TsxMessage) -> Option<TsxMessage> + Send + Sync>;

#[derive(Default)]
pub(crate) struct Transactions {
map: RwLock<HashMap<TsxKey, TsxHandler>>,
map: Mutex<HashMap<TsxKey, TsxHandler>>,
}

impl Transactions {
pub fn get_handler<'a: 'k, 'k>(
pub(crate) fn get_handler<'a: 'k, 'k>(
&'a self,
endoint: &Endpoint,
tsx_key: &TsxKey,
) -> Option<MappedRwLockReadGuard<'a, TsxHandler>> {
let map = self.map.read();
RwLockReadGuard::try_map(map, |map| map.get(tsx_key)).ok()
) -> Result<MappedMutexGuard<'a, TsxHandler>, TsxRegistration> {
let map = self.map.lock();

let mut map = match MutexGuard::try_map(map, |map| map.get_mut(tsx_key)) {
Ok(handler) => return Ok(handler),
Err(map) => map,
};

let (sender, receiver) = mpsc::unbounded_channel();

map.insert(
tsx_key.clone(),
Box::new(move |msg| sender.send(msg).map_err(|e| e.0).err()),
);

Err(TsxRegistration {
endpoint: endoint.clone(),
tsx_key: tsx_key.clone(),
receiver,
})
}

pub fn register_transaction(&self, key: TsxKey, handler: TsxHandler) {
let mut map = self.map.write();
pub(crate) fn register_transaction(&self, key: TsxKey, handler: TsxHandler) {
let mut map = self.map.lock();

match map.entry(key) {
// See https://github.com/kbalt/ezk/issues/16
Entry::Occupied(e) => panic!("Tried to create a second transaction for {:?}. This can happen if a retransmission of message is received before creating a transaction for the original one.", e.key()),
Entry::Vacant(e) => { e.insert(handler); },
Entry::Occupied(e) => panic!("Tried to create a second transaction for {:?}", e.key()),
Entry::Vacant(e) => {
e.insert(handler);
}
}
}

pub fn remove_transaction(&self, key: &TsxKey) {
self.map.write().remove(key);
pub(crate) fn remove_transaction(&self, key: &TsxKey) {
self.map.lock().remove(key);
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/sip-core/src/transaction/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use tokio::sync::mpsc;
/// transactional messages from it
#[derive(Debug)]
pub(crate) struct TsxRegistration {
pub endpoint: Endpoint,
pub tsx_key: TsxKey,
pub(crate) endpoint: Endpoint,
pub(crate) tsx_key: TsxKey,

receiver: mpsc::UnboundedReceiver<TsxMessage>,
pub(super) receiver: mpsc::UnboundedReceiver<TsxMessage>,
}

impl TsxRegistration {
Expand All @@ -41,7 +41,7 @@ impl TsxRegistration {
F: Fn(&TsxMessage) -> bool + Send + Sync + 'static,
{
let transactions = self.endpoint.transactions();
let mut tsx_map = transactions.map.write();
let mut tsx_map = transactions.map.lock();
let handler = tsx_map
.get_mut(&self.tsx_key)
.expect("registration is responsible of handler lifetime inside endpoint");
Expand Down
10 changes: 5 additions & 5 deletions crates/sip-core/src/transaction/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::consts::T1;
use super::TsxRegistration;
use crate::transport::OutgoingResponse;
use crate::{Endpoint, IncomingRequest, Result};
use crate::{IncomingRequest, Result};
use sip_types::{CodeKind, Method};
use std::time::Instant;
use tokio::time::timeout_at;
Expand All @@ -19,16 +19,16 @@ pub struct ServerTsx {

impl ServerTsx {
/// Internal: Used by [Endpoint::create_server_tsx]
pub(crate) fn new(endpoint: Endpoint, request: &IncomingRequest) -> Self {
pub(crate) fn new(request: &mut IncomingRequest) -> Self {
assert!(
!matches!(request.line.method, Method::INVITE | Method::ACK),
"tried to create server transaction from {} request",
request.line.method
);

let registration = TsxRegistration::create(endpoint, request.tsx_key.clone());

Self { registration }
Self {
registration: request.take_tsx_registration(),
}
}

/// Respond with a provisional response (1XX)
Expand Down
10 changes: 5 additions & 5 deletions crates/sip-core/src/transaction/server_inv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::Error;
use crate::transaction::consts::{T1, T2};
use crate::transaction::TsxRegistration;
use crate::transport::OutgoingResponse;
use crate::{Endpoint, IncomingRequest, Result};
use crate::{IncomingRequest, Result};
use sip_types::msg::MessageLine;
use sip_types::{CodeKind, Method};
use std::io;
Expand All @@ -23,17 +23,17 @@ pub struct ServerInvTsx {

impl ServerInvTsx {
/// Internal: Used by [Endpoint::create_server_inv_tsx]
pub(crate) fn new(endpoint: Endpoint, request: &IncomingRequest) -> Self {
pub(crate) fn new(request: &mut IncomingRequest) -> Self {
assert_eq!(
request.line.method,
Method::INVITE,
"tried to create invite transaction from {} request",
request.line.method
);

let registration = TsxRegistration::create(endpoint, request.tsx_key.clone());

Self { registration }
Self {
registration: request.take_tsx_registration(),
}
}

/// Respond with a provisional response (1XX)
Expand Down
6 changes: 3 additions & 3 deletions crates/sip-ua/src/dialog/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl DialogLayer {
async fn handle_unwanted_request(
&self,
endpoint: &Endpoint,
request: IncomingRequest,
mut request: IncomingRequest,
) -> Result<()> {
if request.line.method == Method::ACK {
// Cannot respond to ACK request
Expand All @@ -161,11 +161,11 @@ impl DialogLayer {
let response = endpoint.create_response(&request, Code::NOT_FOUND, None);

if request.line.method == Method::INVITE {
let tsx = endpoint.create_server_inv_tsx(&request);
let tsx = endpoint.create_server_inv_tsx(&mut request);

tsx.respond_failure(response).await
} else {
let tsx = endpoint.create_server_tsx(&request);
let tsx = endpoint.create_server_tsx(&mut request);

tsx.respond(response).await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/sip-ua/src/invite/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Acceptor {
pub fn new(
dialog: Dialog,
invite_layer: LayerKey<InviteLayer>,
invite: IncomingRequest,
mut invite: IncomingRequest,
) -> Result<Self> {
assert_eq!(
invite.line.method,
Expand Down Expand Up @@ -88,7 +88,7 @@ impl Acceptor {
let dialog_layer = dialog.dialog_layer;

// Create Inner shared state
let tsx = endpoint.create_server_inv_tsx(&invite);
let tsx = endpoint.create_server_inv_tsx(&mut invite);
let inner = Arc::new(Inner {
invite_layer,
state: Mutex::new(InviteSessionState::UasProvisional {
Expand Down
8 changes: 4 additions & 4 deletions crates/sip-ua/src/invite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ impl InviteLayer {
// Transaction found but completed: respond 200 to cancel
// No matching transaction: don't handle it, endpoint will respond accordingly
if let Some(inner) = inner {
let cancel = cancel.take();
let cancel_tsx = endpoint.create_server_tsx(&cancel);
let mut cancel = cancel.take();
let cancel_tsx = endpoint.create_server_tsx(&mut cancel);

if let Some((dialog, invite_tsx, invite)) = inner.state.lock().await.set_cancelled() {
let invite_response =
Expand Down Expand Up @@ -303,10 +303,10 @@ impl InviteUsage {
dialog: Dialog,
invite_tsx: ServerInvTsx,
invite: IncomingRequest,
bye: IncomingRequest,
mut bye: IncomingRequest,
) -> Result<()> {
let bye_response = dialog.create_response(&invite, Code::OK, None)?;
let bye_tsx = endpoint.create_server_tsx(&bye);
let bye_tsx = endpoint.create_server_tsx(&mut bye);

let invite_response = dialog.create_response(&invite, Code::REQUEST_TERMINATED, None)?;

Expand Down
Loading

0 comments on commit cd1625d

Please sign in to comment.