From 77938286a13f7e429c57d54dc75475e8339aa90b Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 18:55:53 +0300 Subject: [PATCH 1/6] expand control-api --- e2e-demo/js/index.js | 12 +- e2e/tests/world/mod.rs | 2 + mock/control-api/src/api/member.rs | 12 ++ mock/control-api/src/callback/mod.rs | 149 ++++++++++++++++++ proto/control-api/src/callback.rs | 110 +++++++++++++ proto/control-api/src/control/member.rs | 12 ++ proto/control-api/src/grpc/api.proto | 18 ++- proto/control-api/src/grpc/api.rs | 22 ++- proto/control-api/src/grpc/callback.proto | 58 +++++++ proto/control-api/src/grpc/callback.rs | 144 ++++++++++++++++- proto/control-api/src/grpc/convert/api.rs | 18 +++ .../control-api/src/grpc/convert/callback.rs | 130 ++++++++++++++- 12 files changed, 668 insertions(+), 19 deletions(-) diff --git a/e2e-demo/js/index.js b/e2e-demo/js/index.js index c018835d8..5cb1910ea 100644 --- a/e2e-demo/js/index.js +++ b/e2e-demo/js/index.js @@ -70,7 +70,9 @@ async function createRoom(roomId, memberId) { credentials: { plain: 'test' }, pipeline: pipeline, on_join: 'grpc://127.0.0.1:9099', - on_leave: 'grpc://127.0.0.1:9099' + on_leave: 'grpc://127.0.0.1:9099', + on_start: 'grpc://127.0.0.1:9099', + on_stop: 'grpc://127.0.0.1:9099' } } } @@ -139,7 +141,9 @@ async function createMember(roomId, memberId) { credentials: { plain: 'test' }, pipeline: pipeline, on_join: 'grpc://127.0.0.1:9099', - on_leave: 'grpc://127.0.0.1:9099' + on_leave: 'grpc://127.0.0.1:9099', + on_start: 'grpc://127.0.0.1:9099', + on_stop: 'grpc://127.0.0.1:9099' } }); } else { @@ -154,7 +158,9 @@ async function createMember(roomId, memberId) { credentials: { plain: 'test' }, pipeline: pipeline, on_join: 'grpc://127.0.0.1:9099', - on_leave: 'grpc://127.0.0.1:9099' + on_leave: 'grpc://127.0.0.1:9099', + on_start: 'grpc://127.0.0.1:9099', + on_stop: 'grpc://127.0.0.1:9099' } } } diff --git a/e2e/tests/world/mod.rs b/e2e/tests/world/mod.rs index b7878f0e9..b370e5e1d 100644 --- a/e2e/tests/world/mod.rs +++ b/e2e/tests/world/mod.rs @@ -204,6 +204,8 @@ impl World { )), on_join: Some("grpc://127.0.0.1:9099".to_owned()), on_leave: Some("grpc://127.0.0.1:9099".to_owned()), + on_start: Some("grpc://127.0.0.1:9099".to_owned()), + on_stop: Some("grpc://127.0.0.1:9099".to_owned()), idle_timeout: None, reconnect_timeout: None, ping_interval: None, diff --git a/mock/control-api/src/api/member.rs b/mock/control-api/src/api/member.rs index fae9698f8..9df424aa4 100644 --- a/mock/control-api/src/api/member.rs +++ b/mock/control-api/src/api/member.rs @@ -34,6 +34,14 @@ pub struct Member { #[serde(skip_serializing_if = "Option::is_none")] pub on_leave: Option, + /// URL to which `OnStart` Control API callback will be sent. + #[serde(skip_serializing_if = "Option::is_none")] + pub on_start: Option, + + /// URL to which `OnStop` Control API callback will be sent. + #[serde(skip_serializing_if = "Option::is_none")] + pub on_stop: Option, + /// Timeout of receiving heartbeat messages from this [`Member`] via Client /// API. Once reached, the [`Member`] is considered being idle. #[serde(default, with = "humantime_serde")] @@ -69,6 +77,8 @@ impl Member { credentials: self.credentials.map(Into::into), on_join: self.on_join.unwrap_or_default(), on_leave: self.on_leave.unwrap_or_default(), + on_start: self.on_start.unwrap_or_default(), + on_stop: self.on_stop.unwrap_or_default(), idle_timeout: self.idle_timeout.map(|d| d.try_into().unwrap()), reconnect_timeout: self .reconnect_timeout @@ -100,6 +110,8 @@ impl From for Member { credentials: proto.credentials.map(Into::into), on_join: Some(proto.on_join).filter(|s| !s.is_empty()), on_leave: Some(proto.on_leave).filter(|s| !s.is_empty()), + on_start: Some(proto.on_start).filter(|s| !s.is_empty()), + on_stop: Some(proto.on_stop).filter(|s| !s.is_empty()), idle_timeout: proto.idle_timeout.map(|dur| dur.try_into().unwrap()), reconnect_timeout: proto .reconnect_timeout diff --git a/mock/control-api/src/callback/mod.rs b/mock/control-api/src/callback/mod.rs index 26ea87d48..586d62e7b 100644 --- a/mock/control-api/src/callback/mod.rs +++ b/mock/control-api/src/callback/mod.rs @@ -14,6 +14,12 @@ pub enum CallbackEvent { /// `OnLeave` callback of Control API. OnLeave(leave::OnLeave), + + /// `OnStart` callback of Control API. + OnStart(start::OnStart), + + /// `OnStop` callback of Control API. + OnStop(stop::OnStop), } impl From for CallbackEvent { @@ -25,6 +31,12 @@ impl From for CallbackEvent { proto::request::Event::OnJoin(on_join) => { Self::OnJoin(on_join.into()) } + proto::request::Event::OnStart(on_start) => { + Self::OnStart(on_start.into()) + } + proto::request::Event::OnStop(on_stop) => { + Self::OnStop(on_stop.into()) + } } } } @@ -121,3 +133,140 @@ mod leave { } } } + +/// Media type of the traffic which starts/stops flowing in some `Endpoint`. +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] +pub enum MediaType { + /// Started/stopped audio traffic. + Audio, + + /// Started/stopped video traffic. + Video, + + /// Started/stopped audio and video traffic. + Both, +} + +/// Media Endpoint for which OnStart or OnStop Control API callback +/// was received. +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] +pub enum MediaDirection { + /// Endpoint is a publisher. + Publish, + + /// Endpoint is a player. + Play, +} + +impl From for MediaDirection { + fn from(end: proto::MediaDirection) -> Self { + match end { + proto::MediaDirection::Publish => Self::Publish, + proto::MediaDirection::Play => Self::Play, + } + } +} + +impl From for MediaType { + fn from(kind: proto::MediaType) -> Self { + match kind { + proto::MediaType::Audio => Self::Audio, + proto::MediaType::Video => Self::Video, + proto::MediaType::Both => Self::Both, + } + } +} + +mod start { + use medea_control_api_proto::grpc::callback as proto; + use serde::{Deserialize, Serialize}; + + use super::{MediaDirection, MediaType}; + + /// `OnJoin` callback for Control API. + #[derive(Clone, Copy, Debug, Deserialize, Serialize)] + pub struct OnStart { + media_type: MediaType, + media_direction: MediaDirection, + } + + impl From for OnStart { + fn from(ev: proto::OnStart) -> Self { + Self { + media_type: proto::MediaType::try_from(ev.media_type) + .unwrap_or_default() + .into(), + media_direction: proto::MediaDirection::try_from( + ev.media_direction, + ) + .unwrap_or_default() + .into(), + } + } + } +} + +mod stop { + use derive_more::Display; + use medea_control_api_proto::grpc::callback as proto; + use serde::{Deserialize, Serialize}; + + use super::{MediaDirection, MediaType}; + + #[derive(Clone, Copy, Debug, Deserialize, Serialize)] + pub struct OnStop { + pub reason: OnStopReason, + pub media_type: MediaType, + pub media_direction: MediaDirection, + } + + impl From for OnStop { + fn from(proto: proto::OnStop) -> Self { + Self { + reason: proto::on_stop::Reason::try_from(proto.reason) + .unwrap_or_default() + .into(), + media_type: proto::MediaType::try_from(proto.media_type) + .unwrap_or_default() + .into(), + media_direction: proto::MediaDirection::try_from( + proto.media_direction, + ) + .unwrap_or_default() + .into(), + } + } + } + + #[derive(Clone, Copy, Debug, Deserialize, Display, Serialize)] + pub enum OnStopReason { + /// All traffic of some `Endpoint` was stopped flowing. + TrafficNotFlowing, + + /// `Endpoint` was muted. + Muted, + + /// Source `Endpoint` of a `Endpoint` for which received this `on_stop` + /// callback was muted. + SrcMuted, + + /// Some traffic flows within `Endpoint`, but incorrectly. + WrongTrafficFlowing, + + /// Traffic stopped because Endpoint was removed. + EndpointRemoved, + } + + impl From for OnStopReason { + fn from(proto: proto::on_stop::Reason) -> Self { + use proto::on_stop::Reason as R; + + match proto { + R::TrafficNotFlowing => Self::TrafficNotFlowing, + R::Muted => Self::Muted, + R::WrongTrafficFlowing => Self::WrongTrafficFlowing, + R::EndpointRemoved => Self::EndpointRemoved, + } + } + } +} diff --git a/proto/control-api/src/callback.rs b/proto/control-api/src/callback.rs index b7e563bff..43669fb32 100644 --- a/proto/control-api/src/callback.rs +++ b/proto/control-api/src/callback.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use derive_more::From; use time::OffsetDateTime as DateTime; +use derive_more::Display; use crate::Fid; @@ -54,6 +55,115 @@ pub enum Event { /// [`Member`]: crate::Member /// [`Room`]: crate::Room OnLeave(OnLeaveEvent), + + /// [`Member`] Started traffic. + /// + /// [`Member`]: crate::Member + OnStart(OnStartEvent), + + /// [`Member`] Stopped traffic. + /// + /// [`Member`]: crate::Member + OnStop(OnStopEvent), +} + +/// `OnStart` callback of Control API. +#[derive(Clone, Copy, Debug)] +pub struct OnStartEvent { + /// [`MediaDirection`] of the `Endpoint` for which + /// his callback was received. + pub media_direction: MediaDirection, + /// [`MediaType`] of the traffic which starts flowing in some `Endpoint`. + pub media_type: MediaType, +} + +/// Reason of why some `Endpoint` was stopped. +#[derive(Clone, Copy, Debug)] +pub enum OnStopReason { + /// All traffic of some `Endpoint` was stopped flowing. + TrafficNotFlowing, + + /// `Endpoint` was muted. + Muted, + + /// Some traffic flows within `Endpoint`, but incorrectly. + WrongTrafficFlowing, + + /// Traffic stopped because Endpoint was removed. + EndpointRemoved, +} + +/// Media type of the traffic which starts/stops flowing in some `Endpoint`. +#[derive(Clone, Copy, Debug, Display, Eq, PartialEq)] +#[repr(u8)] +pub enum MediaType { + /// Started/stopped audio traffic. + Audio = 0b1, + + /// Started/stopped video traffic. + Video = 0b10, + + /// Started/stopped video and audio traffic. + Both = 0b11, +} + +impl From for u8 { + #[allow(clippy::as_conversions)] // no other way + fn from(t: MediaType) -> Self { + t as Self + } +} + +impl MediaType { + /// Returns [`MediaType`] which was started based on the provided + /// [`MediaType`]s. + /// + /// This [`MediaType`] should be what was before `RTCStat` update and + /// as argument is [`MediaType`] which was got after `RTCStat` update. + #[must_use] + pub const fn get_started(self, after: Self) -> Option { + match self { + Self::Audio => match after { + Self::Video => Some(Self::Audio), + Self::Audio | Self::Both => None, + }, + Self::Video => match after { + Self::Audio => Some(Self::Video), + Self::Video | Self::Both => None, + }, + Self::Both => match after { + Self::Audio => Some(Self::Video), + Self::Video => Some(Self::Audio), + Self::Both => None, + }, + } + } +} + +/// Media Endpoint for which OnStart or OnStop Control API callback +/// was received. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub enum MediaDirection { + /// `Endpoint` is a publisher. + Publish, + + /// `Endpoint` is a player. + Play, +} + +/// `OnStop` callback of Control API. +#[derive(Clone, Copy, Debug)] +pub struct OnStopEvent { + /// [`MediaType`] of the traffic which stops flowing in some + /// `Endpoint`. + pub media_type: MediaType, + + /// [`MediaDirection`] of the `Endpoint` for which this callback was + /// received. + pub media_direction: MediaDirection, + + /// Reason of why `Endpoint` was stopped. + pub reason: OnStopReason, } /// [`Event`] notifying about a [`Member`] joining a [`Room`]. diff --git a/proto/control-api/src/control/member.rs b/proto/control-api/src/control/member.rs index 9724e745b..58f0200e3 100644 --- a/proto/control-api/src/control/member.rs +++ b/proto/control-api/src/control/member.rs @@ -58,6 +58,18 @@ pub struct Spec { /// [Client API]: https://tinyurl.com/266y74tf pub on_leave: Option, + /// [`Url`] of the callback to fire when this [`Member`] started traffic + /// with a media server via [Client API]. + /// + /// [Client API]: https://tinyurl.com/266y74tf + pub on_start: Option, + + /// [`Url`] of the callback to fire when this [`Member`] stopped traffic + /// with a media server via [Client API]. + /// + /// [Client API]: https://tinyurl.com/266y74tf + pub on_stop: Option, + /// Timeout of receiving heartbeat messages from this [`Member`] via /// [Client API]. /// diff --git a/proto/control-api/src/grpc/api.proto b/proto/control-api/src/grpc/api.proto index 88dc8f596..c81674e75 100644 --- a/proto/control-api/src/grpc/api.proto +++ b/proto/control-api/src/grpc/api.proto @@ -168,6 +168,12 @@ message Member { // URL of the callback to fire when this `Member` finishes a persistent // connection with a media server via Client API. string on_leave = 3; + /// URL of the callback to fire when this [`Member`] start traffic + /// with a media server via [Client API]. + string on_start = 4; + /// URL of the callback to fire when this [`Member`] stopped traffic + /// with a media server via [Client API]. + string on_stop = 5; // Credentials to authenticate this `Member` in Client API with. // // Plain and hashed credentials are supported. If no credentials provided, @@ -179,23 +185,23 @@ message Member { // are used, so it should be appended manually on a client side. oneof credentials { // Argon2 hash of credentials. - string hash = 4; + string hash = 6; // Plain text credentials. - string plain = 5; + string plain = 7; } // Timeout of receiving heartbeat messages from this `Member` via Client API. // Once reached, this `Member` is considered being idle. - google.protobuf.Duration idle_timeout = 6; + google.protobuf.Duration idle_timeout = 8; // Timeout of reconnecting this `Member` via Client API. // Once reached, this `Member` is considered disconnected. - google.protobuf.Duration reconnect_timeout = 7; + google.protobuf.Duration reconnect_timeout = 9; // Interval of pinging with heartbeat messages this `Member` via Client API // by a media server. // If empty then the default interval of a media server is used, if // configured. - google.protobuf.Duration ping_interval = 8; + google.protobuf.Duration ping_interval = 10; // Media pipeline representing this `Member`. - map pipeline = 9; + map pipeline = 11; // Elements which Member's pipeline can contain. message Element { diff --git a/proto/control-api/src/grpc/api.rs b/proto/control-api/src/grpc/api.rs index beddf8923..a2ffffb2b 100644 --- a/proto/control-api/src/grpc/api.rs +++ b/proto/control-api/src/grpc/api.rs @@ -209,22 +209,30 @@ pub struct Member { /// connection with a media server via Client API. #[prost(string, tag = "3")] pub on_leave: ::prost::alloc::string::String, + /// / URL of the callback to fire when this \[`Member`\] start traffic + /// / with a media server via \[Client API\]. + #[prost(string, tag = "4")] + pub on_start: ::prost::alloc::string::String, + /// / URL of the callback to fire when this \[`Member`\] stopped traffic + /// / with a media server via \[Client API\]. + #[prost(string, tag = "5")] + pub on_stop: ::prost::alloc::string::String, /// Timeout of receiving heartbeat messages from this `Member` via Client API. /// Once reached, this `Member` is considered being idle. - #[prost(message, optional, tag = "6")] + #[prost(message, optional, tag = "8")] pub idle_timeout: ::core::option::Option<::prost_types::Duration>, /// Timeout of reconnecting this `Member` via Client API. /// Once reached, this `Member` is considered disconnected. - #[prost(message, optional, tag = "7")] + #[prost(message, optional, tag = "9")] pub reconnect_timeout: ::core::option::Option<::prost_types::Duration>, /// Interval of pinging with heartbeat messages this `Member` via Client API /// by a media server. /// If empty then the default interval of a media server is used, if /// configured. - #[prost(message, optional, tag = "8")] + #[prost(message, optional, tag = "10")] pub ping_interval: ::core::option::Option<::prost_types::Duration>, /// Media pipeline representing this `Member`. - #[prost(map = "string, message", tag = "9")] + #[prost(map = "string, message", tag = "11")] pub pipeline: ::std::collections::HashMap< ::prost::alloc::string::String, member::Element, @@ -238,7 +246,7 @@ pub struct Member { /// Hashed variant only supports Argon2 hash at the moment. /// `Member` sid won't contain a `token` query parameter if hashed credentials /// are used, so it should be appended manually on a client side. - #[prost(oneof = "member::Credentials", tags = "4, 5")] + #[prost(oneof = "member::Credentials", tags = "6, 7")] pub credentials: ::core::option::Option, } /// Nested message and enum types in `Member`. @@ -274,10 +282,10 @@ pub mod member { #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Credentials { /// Argon2 hash of credentials. - #[prost(string, tag = "4")] + #[prost(string, tag = "6")] Hash(::prost::alloc::string::String), /// Plain text credentials. - #[prost(string, tag = "5")] + #[prost(string, tag = "7")] Plain(::prost::alloc::string::String), } } diff --git a/proto/control-api/src/grpc/callback.proto b/proto/control-api/src/grpc/callback.proto index 801d3a9dc..9db616218 100644 --- a/proto/control-api/src/grpc/callback.proto +++ b/proto/control-api/src/grpc/callback.proto @@ -21,6 +21,64 @@ message Request { oneof event { OnJoin on_join = 3; OnLeave on_leave = 4; + OnStart on_start = 5; + OnStop on_stop = 6; + } +} + +// Media type of the traffic which starts/stops flowing in some Endpoint. +enum MediaType { + /// Started/stopped audio traffic. + AUDIO = 0; + + /// Started/stopped video traffic. + VIDEO = 1; + + /// Started/stopped audio and video traffic. + BOTH = 2; +} + +// Media direction of the Endpoint for which OnStart or OnStop Control API callback +// was received. +enum MediaDirection { + // Endpoint is a publisher. + PUBLISH = 0; + + // Endpoint is a player. + PLAY = 1; +} + +// Event which fires when Endpoint starts sending/receiving media traffic. +message OnStart { + // Media type of the traffic which starts flowing in some Endpoint. + MediaType media_type = 1; + + // Media direction of the Endpoint for which this callback was received. + MediaDirection media_direction = 2; +} + +// Event which fires when Endpoint stops sending/receiving media traffic. +message OnStop { + /// Media type of the traffic which stops flowing in some Endpoint. + MediaType media_type = 1; + /// Media Endpoint for which this callback was received. + MediaDirection media_direction = 2; + // Reason of why Endpoint was stopped. + Reason reason = 3; + + // Reason of why some Endpoint was stopped. + enum Reason { + // All traffic of some Endpoint was stopped flowing. + TRAFFIC_NOT_FLOWING = 0; + + // Endpoint was muted. + MUTED = 1; + + // Some traffic flows within Endpoint, but incorrectly. + WRONG_TRAFFIC_FLOWING = 2; + + // Traffic stopped because Endpoint was removed. + ENDPOINT_REMOVED = 3; } } diff --git a/proto/control-api/src/grpc/callback.rs b/proto/control-api/src/grpc/callback.rs index c36d7afcc..23418e470 100644 --- a/proto/control-api/src/grpc/callback.rs +++ b/proto/control-api/src/grpc/callback.rs @@ -9,7 +9,7 @@ pub struct Request { #[prost(string, tag = "2")] pub at: ::prost::alloc::string::String, /// Occurred event. - #[prost(oneof = "request::Event", tags = "3, 4")] + #[prost(oneof = "request::Event", tags = "3, 4, 5, 6")] pub event: ::core::option::Option, } /// Nested message and enum types in `Request`. @@ -22,6 +22,85 @@ pub mod request { OnJoin(super::OnJoin), #[prost(message, tag = "4")] OnLeave(super::OnLeave), + #[prost(message, tag = "5")] + OnStart(super::OnStart), + #[prost(message, tag = "6")] + OnStop(super::OnStop), + } +} +/// Event which fires when Endpoint starts sending/receiving media traffic. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OnStart { + /// Media type of the traffic which starts flowing in some Endpoint. + #[prost(enumeration = "MediaType", tag = "1")] + pub media_type: i32, + /// Media direction of the Endpoint for which this callback was received. + #[prost(enumeration = "MediaDirection", tag = "2")] + pub media_direction: i32, +} +/// Event which fires when Endpoint stops sending/receiving media traffic. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OnStop { + /// / Media type of the traffic which stops flowing in some Endpoint. + #[prost(enumeration = "MediaType", tag = "1")] + pub media_type: i32, + /// / Media Endpoint for which this callback was received. + #[prost(enumeration = "MediaDirection", tag = "2")] + pub media_direction: i32, + /// Reason of why Endpoint was stopped. + #[prost(enumeration = "on_stop::Reason", tag = "3")] + pub reason: i32, +} +/// Nested message and enum types in `OnStop`. +pub mod on_stop { + /// Reason of why some Endpoint was stopped. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Reason { + /// All traffic of some Endpoint was stopped flowing. + TrafficNotFlowing = 0, + /// Endpoint was muted. + Muted = 1, + /// Some traffic flows within Endpoint, but incorrectly. + WrongTrafficFlowing = 2, + /// Traffic stopped because Endpoint was removed. + EndpointRemoved = 3, + } + impl Reason { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Reason::TrafficNotFlowing => "TRAFFIC_NOT_FLOWING", + Reason::Muted => "MUTED", + Reason::WrongTrafficFlowing => "WRONG_TRAFFIC_FLOWING", + Reason::EndpointRemoved => "ENDPOINT_REMOVED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "TRAFFIC_NOT_FLOWING" => Some(Self::TrafficNotFlowing), + "MUTED" => Some(Self::Muted), + "WRONG_TRAFFIC_FLOWING" => Some(Self::WrongTrafficFlowing), + "ENDPOINT_REMOVED" => Some(Self::EndpointRemoved), + _ => None, + } + } } } /// Empty response of the `Callback` service. @@ -93,6 +172,69 @@ pub mod on_leave { } } } +/// Media type of the traffic which starts/stops flowing in some Endpoint. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum MediaType { + /// / Started/stopped audio traffic. + Audio = 0, + /// / Started/stopped video traffic. + Video = 1, + /// / Started/stopped audio and video traffic. + Both = 2, +} +impl MediaType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + MediaType::Audio => "AUDIO", + MediaType::Video => "VIDEO", + MediaType::Both => "BOTH", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "AUDIO" => Some(Self::Audio), + "VIDEO" => Some(Self::Video), + "BOTH" => Some(Self::Both), + _ => None, + } + } +} +/// Media direction of the Endpoint for which OnStart or OnStop Control API callback +/// was received. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum MediaDirection { + /// Endpoint is a publisher. + Publish = 0, + /// Endpoint is a player. + Play = 1, +} +impl MediaDirection { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + MediaDirection::Publish => "PUBLISH", + MediaDirection::Play => "PLAY", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "PUBLISH" => Some(Self::Publish), + "PLAY" => Some(Self::Play), + _ => None, + } + } +} /// Generated client implementations. pub mod callback_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] diff --git a/proto/control-api/src/grpc/convert/api.rs b/proto/control-api/src/grpc/convert/api.rs index 68f540d38..75185a45c 100644 --- a/proto/control-api/src/grpc/convert/api.rs +++ b/proto/control-api/src/grpc/convert/api.rs @@ -412,6 +412,14 @@ impl TryFrom for Member { .then(|| member.on_leave.parse()) .transpose() .map_err(CallbackUrlParseError::from)?, + on_start: (!member.on_start.is_empty()) + .then(|| member.on_start.parse()) + .transpose() + .map_err(CallbackUrlParseError::from)?, + on_stop: (!member.on_stop.is_empty()) + .then(|| member.on_stop.parse()) + .transpose() + .map_err(CallbackUrlParseError::from)?, idle_timeout, reconnect_timeout, ping_interval, @@ -437,6 +445,16 @@ impl From for proto::Member { .on_leave .as_ref() .map_or_else(String::default, ToString::to_string), + on_start: member + .spec + .on_start + .as_ref() + .map_or_else(String::default, ToString::to_string), + on_stop: member + .spec + .on_stop + .as_ref() + .map_or_else(String::default, ToString::to_string), idle_timeout: member .spec .idle_timeout diff --git a/proto/control-api/src/grpc/convert/callback.rs b/proto/control-api/src/grpc/convert/callback.rs index 3b326ad4a..d1f6f0be7 100644 --- a/proto/control-api/src/grpc/convert/callback.rs +++ b/proto/control-api/src/grpc/convert/callback.rs @@ -9,8 +9,14 @@ use time::{ }; use crate::{ - callback::{Event, OnJoinEvent, OnLeaveEvent, OnLeaveReason, Request}, - grpc::{callback as proto, ProtobufError}, + callback::{ + Event, MediaDirection, MediaType, OnJoinEvent, OnLeaveEvent, + OnLeaveReason, OnStartEvent, OnStopEvent, OnStopReason, Request, + }, + grpc::{ + callback::{self as proto}, + ProtobufError, + }, }; impl TryFrom for Request { @@ -48,6 +54,8 @@ impl From for Event { match ev { Event::OnJoin(on_join) => Self::OnJoin(on_join.into()), Event::OnLeave(on_leave) => Self::OnLeave(on_leave.into()), + Event::OnStart(on_start) => Self::OnStart(on_start.into()), + Event::OnStop(on_stop) => Self::OnStop(on_stop.into()), } } } @@ -57,6 +65,8 @@ impl From for proto::request::Event { match ev { Event::OnJoin(on_join) => Self::OnJoin(on_join.into()), Event::OnLeave(on_leave) => Self::OnLeave(on_leave.into()), + Event::OnStart(on_start) => Self::OnStart(on_start.into()), + Event::OnStop(on_stop) => Self::OnStop(on_stop.into()), } } } @@ -91,6 +101,60 @@ impl From for proto::OnLeave { } } +impl From for proto::OnStart { + fn from(ev: OnStartEvent) -> Self { + Self { + media_type: proto::MediaType::from(ev.media_type).into(), + media_direction: proto::MediaDirection::from(ev.media_direction) + .into(), + } + } +} + +impl From for OnStartEvent { + fn from(ev: proto::OnStart) -> Self { + Self { + media_direction: proto::MediaDirection::try_from( + ev.media_direction, + ) + .unwrap_or_default() + .into(), + media_type: proto::MediaType::try_from(ev.media_type) + .unwrap_or_default() + .into(), + } + } +} + +impl From for proto::OnStop { + fn from(ev: OnStopEvent) -> Self { + Self { + reason: proto::on_stop::Reason::from(ev.reason).into(), + media_type: proto::MediaType::from(ev.media_type).into(), + media_direction: proto::MediaDirection::from(ev.media_direction) + .into(), + } + } +} + +impl From for OnStopEvent { + fn from(ev: proto::OnStop) -> Self { + Self { + reason: proto::on_stop::Reason::try_from(ev.reason) + .unwrap_or_default() + .into(), + media_type: proto::MediaType::try_from(ev.media_type) + .unwrap_or_default() + .into(), + media_direction: proto::MediaDirection::try_from( + ev.media_direction, + ) + .unwrap_or_default() + .into(), + } + } +} + impl From for OnLeaveReason { fn from(rsn: proto::on_leave::Reason) -> Self { use proto::on_leave::Reason; @@ -114,3 +178,65 @@ impl From for proto::on_leave::Reason { } } } + +impl From for OnStopReason { + fn from(rsn: proto::on_stop::Reason) -> Self { + use proto::on_stop::Reason; + + match rsn { + Reason::TrafficNotFlowing => Self::TrafficNotFlowing, + Reason::Muted => Self::Muted, + Reason::WrongTrafficFlowing => Self::WrongTrafficFlowing, + Reason::EndpointRemoved => Self::EndpointRemoved, + } + } +} + +impl From for proto::on_stop::Reason { + fn from(rsn: OnStopReason) -> Self { + match rsn { + OnStopReason::TrafficNotFlowing => Self::TrafficNotFlowing, + OnStopReason::Muted => Self::Muted, + OnStopReason::WrongTrafficFlowing => Self::WrongTrafficFlowing, + OnStopReason::EndpointRemoved => Self::EndpointRemoved, + } + } +} + +impl From for proto::MediaDirection { + fn from(end: MediaDirection) -> Self { + match end { + MediaDirection::Publish => Self::Publish, + MediaDirection::Play => Self::Play, + } + } +} + +impl From for proto::MediaType { + fn from(kind: MediaType) -> Self { + match kind { + MediaType::Audio => Self::Audio, + MediaType::Video => Self::Video, + MediaType::Both => Self::Both, + } + } +} + +impl From for MediaDirection { + fn from(end: proto::MediaDirection) -> Self { + match end { + proto::MediaDirection::Publish => Self::Publish, + proto::MediaDirection::Play => Self::Play, + } + } +} + +impl From for MediaType { + fn from(kind: proto::MediaType) -> Self { + match kind { + proto::MediaType::Audio => Self::Audio, + proto::MediaType::Video => Self::Video, + proto::MediaType::Both => Self::Both, + } + } +} From 5cbb8d6f2d671214d66ce8cf568a06e42186b247 Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 19:33:26 +0300 Subject: [PATCH 2/6] fix --- proto/control-api/src/callback.rs | 5 +- proto/control-api/src/control/mod.rs | 6 + proto/control-api/src/grpc/api.rs | 390 ------------------------- proto/control-api/src/grpc/callback.rs | 110 ------- 4 files changed, 8 insertions(+), 503 deletions(-) diff --git a/proto/control-api/src/callback.rs b/proto/control-api/src/callback.rs index 43669fb32..27c257d43 100644 --- a/proto/control-api/src/callback.rs +++ b/proto/control-api/src/callback.rs @@ -1,9 +1,8 @@ //! API for receiving callbacks from a media server. use async_trait::async_trait; -use derive_more::From; +use derive_more::{Display, From}; use time::OffsetDateTime as DateTime; -use derive_more::Display; use crate::Fid; @@ -140,7 +139,7 @@ impl MediaType { } } -/// Media Endpoint for which OnStart or OnStop Control API callback +/// Media Endpoint for which `OnStart` or `OnStop` Control API callback /// was received. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] pub enum MediaDirection { diff --git a/proto/control-api/src/control/mod.rs b/proto/control-api/src/control/mod.rs index 2fc768daf..dc4611279 100644 --- a/proto/control-api/src/control/mod.rs +++ b/proto/control-api/src/control/mod.rs @@ -367,6 +367,8 @@ spec: )), on_join: None, on_leave: None, + on_start: None, + on_stop: None, idle_timeout: None, reconnect_timeout: None, ping_interval: None, @@ -395,6 +397,8 @@ spec: )), on_join: None, on_leave: None, + on_start: None, + on_stop: None, idle_timeout: None, reconnect_timeout: None, ping_interval: None, @@ -437,6 +441,8 @@ spec: )), on_join: None, on_leave: None, + on_start: None, + on_stop: None, idle_timeout: None, reconnect_timeout: None, ping_interval: None, diff --git a/proto/control-api/src/grpc/api.rs b/proto/control-api/src/grpc/api.rs index a2ffffb2b..7040ef162 100644 --- a/proto/control-api/src/grpc/api.rs +++ b/proto/control-api/src/grpc/api.rs @@ -678,393 +678,3 @@ pub mod control_api_client { } } } -/// Generated server implementations. -pub mod control_api_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with ControlApiServer. - #[async_trait] - pub trait ControlApi: Send + Sync + 'static { - /// Creates a new `Element` on the media server. - /// - /// Non-idempotent. Errors if an `Element` with such ID already exists. - async fn create( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Removes `Element`s from the media server. - /// Allows referring multiple `Element`s on the last two levels of a FID. - /// - /// Idempotent. If no `Element`s with such FIDs exist, then succeeds. - async fn delete( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Lookups `Element`s by their FIDs on the media server. - /// If no FIDs are specified, then returns all the current `Element`s on the - /// media server. - async fn get( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Applies changes to an existing `Element` on the media server, or creates a - /// new one in case there is no `Element` with such ID. - /// - /// Idempotent. If no `Element` with such ID exists, then it will be created, - /// otherwise it will be reconfigured. `Element`s that exist on the same - /// hierarchy level, but are not specified in the provided spec, will be - /// removed. - async fn apply( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Checks healthiness of the media server. - /// Caller should assert that the returned `Pong` has the same nonce as the - /// sent `Ping`. - async fn healthz( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - } - /// Service allowing to control a media server dynamically, by creating, updating - /// and destroying pipelines of media `Element`s on it. - #[derive(Debug)] - pub struct ControlApiServer { - inner: _Inner, - accept_compression_encodings: EnabledCompressionEncodings, - send_compression_encodings: EnabledCompressionEncodings, - max_decoding_message_size: Option, - max_encoding_message_size: Option, - } - struct _Inner(Arc); - impl ControlApiServer { - pub fn new(inner: T) -> Self { - Self::from_arc(Arc::new(inner)) - } - pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - max_decoding_message_size: None, - max_encoding_message_size: None, - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - /// Enable decompressing requests with the given encoding. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.accept_compression_encodings.enable(encoding); - self - } - /// Compress responses with the given encoding, if the client supports it. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.send_compression_encodings.enable(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.max_decoding_message_size = Some(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.max_encoding_message_size = Some(limit); - self - } - } - impl tonic::codegen::Service> for ControlApiServer - where - T: ControlApi, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = std::convert::Infallible; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/api.ControlApi/Create" => { - #[allow(non_camel_case_types)] - struct CreateSvc(pub Arc); - impl tonic::server::UnaryService - for CreateSvc { - type Response = super::CreateResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::create(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = CreateSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Delete" => { - #[allow(non_camel_case_types)] - struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService - for DeleteSvc { - type Response = super::Response; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::delete(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DeleteSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Get" => { - #[allow(non_camel_case_types)] - struct GetSvc(pub Arc); - impl tonic::server::UnaryService - for GetSvc { - type Response = super::GetResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Apply" => { - #[allow(non_camel_case_types)] - struct ApplySvc(pub Arc); - impl tonic::server::UnaryService - for ApplySvc { - type Response = super::CreateResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::apply(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ApplySvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Healthz" => { - #[allow(non_camel_case_types)] - struct HealthzSvc(pub Arc); - impl tonic::server::UnaryService - for HealthzSvc { - type Response = super::Pong; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::healthz(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = HealthzSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } - } - } - } - impl Clone for ControlApiServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - max_decoding_message_size: self.max_decoding_message_size, - max_encoding_message_size: self.max_encoding_message_size, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for ControlApiServer { - const NAME: &'static str = "api.ControlApi"; - } -} diff --git a/proto/control-api/src/grpc/callback.rs b/proto/control-api/src/grpc/callback.rs index 23418e470..b4f2c5503 100644 --- a/proto/control-api/src/grpc/callback.rs +++ b/proto/control-api/src/grpc/callback.rs @@ -235,116 +235,6 @@ impl MediaDirection { } } } -/// Generated client implementations. -pub mod callback_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - use tonic::codegen::http::Uri; - /// Service for receiving callbacks from a media server. - #[derive(Debug, Clone)] - pub struct CallbackClient { - inner: tonic::client::Grpc, - } - impl CallbackClient { - /// Attempt to create a new client by connecting to a given endpoint. - pub async fn connect(dst: D) -> Result - where - D: TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl CallbackClient - where - T: tonic::client::GrpcService, - T::Error: Into, - T::ResponseBody: Body + Send + 'static, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_origin(inner: T, origin: Uri) -> Self { - let inner = tonic::client::Grpc::with_origin(inner, origin); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> CallbackClient> - where - F: tonic::service::Interceptor, - T::ResponseBody: Default, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - , - >>::Error: Into + Send + Sync, - { - CallbackClient::new(InterceptedService::new(inner, interceptor)) - } - /// Compress requests with the given encoding. - /// - /// This requires the server to support it otherwise it might respond with an - /// error. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.send_compressed(encoding); - self - } - /// Enable decompressing responses. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.inner = self.inner.accept_compressed(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_decoding_message_size(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.inner = self.inner.max_encoding_message_size(limit); - self - } - /// Fires when a certain callback event happens on a media server. - pub async fn on_event( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/callback.Callback/OnEvent", - ); - let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new("callback.Callback", "OnEvent")); - self.inner.unary(req, path, codec).await - } - } -} /// Generated server implementations. pub mod callback_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] From 7d6717e90c9ae84ebbdaa4ace53d3bec1d8f6005 Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 20:47:03 +0300 Subject: [PATCH 3/6] fix --- flutter/example/lib/call_route.dart | 2 +- .../lib/control_api/entities/member.dart | 10 +- .../lib/control_api/entities/member.g.dart | 4 + .../src/native/ffi/jason_api.g.freezed.dart | 12 +- flutter/test/e2e/api/member.dart | 11 +- flutter/test/e2e/world/custom_world.dart | 10 +- mock/control-api/src/callback/mod.rs | 19 +- proto/control-api/src/grpc/api.proto | 10 +- proto/control-api/src/grpc/api.rs | 400 +++++++++++++++++- proto/control-api/src/grpc/callback.proto | 4 +- proto/control-api/src/grpc/callback.rs | 114 ++++- 11 files changed, 569 insertions(+), 27 deletions(-) diff --git a/flutter/example/lib/call_route.dart b/flutter/example/lib/call_route.dart index 0598b63f0..665505baf 100644 --- a/flutter/example/lib/call_route.dart +++ b/flutter/example/lib/call_route.dart @@ -984,7 +984,7 @@ Future controlApiCreateMemberDialog(BuildContext context, Call call) { TextButton( onPressed: () async { var member = Member(memberId, {}, Plain(credentials), - 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099'); + 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099'); member.idle_timeout = idle; member.reconnect_timeout = reconnectTimeout; diff --git a/flutter/example/lib/control_api/entities/member.dart b/flutter/example/lib/control_api/entities/member.dart index a13917bff..17df49135 100644 --- a/flutter/example/lib/control_api/entities/member.dart +++ b/flutter/example/lib/control_api/entities/member.dart @@ -85,6 +85,14 @@ class Member { @JsonKey(includeIfNull: false) String? on_leave; + /// URL to which [OnStart] Control API callback will be sent. + @JsonKey(includeIfNull: false) + String? on_start; + + /// URL to which [OnStop] Control API callback will be sent. + @JsonKey(includeIfNull: false) + String? on_stop; + /// Timeout of receiving heartbeat messages from this [Member] via Client API. /// /// Once reached, this [Member] is considered being idle. @@ -99,7 +107,7 @@ class Member { /// API. String? ping_interval; - Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave); + Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave, this.on_start, this.on_stop); factory Member.fromJson(Map json) { json.remove('kind'); diff --git a/flutter/example/lib/control_api/entities/member.g.dart b/flutter/example/lib/control_api/entities/member.g.dart index a5f73e3ed..e48665880 100644 --- a/flutter/example/lib/control_api/entities/member.g.dart +++ b/flutter/example/lib/control_api/entities/member.g.dart @@ -37,6 +37,8 @@ Member _$MemberFromJson(Map json) => Member( : Credentials.fromJson(json['credentials'] as Map), json['on_join'] as String?, json['on_leave'] as String?, + json['on_start'] as String?, + json['on_stop'] as String?, ) ..idle_timeout = json['idle_timeout'] as String? ..reconnect_timeout = json['reconnect_timeout'] as String? @@ -57,6 +59,8 @@ Map _$MemberToJson(Member instance) { writeNotNull('on_join', instance.on_join); writeNotNull('on_leave', instance.on_leave); + writeNotNull('on_start', instance.on_start); + writeNotNull('on_stop', instance.on_stop); val['idle_timeout'] = instance.idle_timeout; val['reconnect_timeout'] = instance.reconnect_timeout; val['ping_interval'] = instance.ping_interval; diff --git a/flutter/lib/src/native/ffi/jason_api.g.freezed.dart b/flutter/lib/src/native/ffi/jason_api.g.freezed.dart index 677c87cff..d21c79cbc 100644 --- a/flutter/lib/src/native/ffi/jason_api.g.freezed.dart +++ b/flutter/lib/src/native/ffi/jason_api.g.freezed.dart @@ -12,7 +12,7 @@ part of 'jason_api.g.dart'; T _$identity(T value) => value; final _privateConstructorUsedError = UnsupportedError( - 'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#custom-getters-and-methods'); + 'It seems like you constructed your class using `MyClass._()`. This constructor is only meant to be used by freezed and you are not supposed to need it nor use it.\nPlease check the documentation here for more information: https://github.com/rrousselGit/freezed#adding-getters-and-methods-to-our-models'); /// @nodoc mixin _$ApiConstrainFacingMode { @@ -146,7 +146,7 @@ class _$ApiConstrainFacingMode_ExactImpl } @override - bool operator ==(dynamic other) { + bool operator ==(Object other) { return identical(this, other) || (other.runtimeType == runtimeType && other is _$ApiConstrainFacingMode_ExactImpl && @@ -291,7 +291,7 @@ class _$ApiConstrainFacingMode_IdealImpl } @override - bool operator ==(dynamic other) { + bool operator ==(Object other) { return identical(this, other) || (other.runtimeType == runtimeType && other is _$ApiConstrainFacingMode_IdealImpl && @@ -518,7 +518,7 @@ class _$ConstrainU32_ExactImpl implements ConstrainU32_Exact { } @override - bool operator ==(dynamic other) { + bool operator ==(Object other) { return identical(this, other) || (other.runtimeType == runtimeType && other is _$ConstrainU32_ExactImpl && @@ -662,7 +662,7 @@ class _$ConstrainU32_IdealImpl implements ConstrainU32_Ideal { } @override - bool operator ==(dynamic other) { + bool operator ==(Object other) { return identical(this, other) || (other.runtimeType == runtimeType && other is _$ConstrainU32_IdealImpl && @@ -813,7 +813,7 @@ class _$ConstrainU32_RangeImpl implements ConstrainU32_Range { } @override - bool operator ==(dynamic other) { + bool operator ==(Object other) { return identical(this, other) || (other.runtimeType == runtimeType && other is _$ConstrainU32_RangeImpl && diff --git a/flutter/test/e2e/api/member.dart b/flutter/test/e2e/api/member.dart index 68501faba..ebb3bb441 100644 --- a/flutter/test/e2e/api/member.dart +++ b/flutter/test/e2e/api/member.dart @@ -85,6 +85,14 @@ class Member { @JsonKey(includeIfNull: false) String? on_leave; + /// URL to which [OnStart] Control API callback will be sent. + @JsonKey(includeIfNull: false) + String? on_start; + + /// URL to which [OnStop] Control API callback will be sent. + @JsonKey(includeIfNull: false) + String? on_stop; + /// Timeout of receiving heartbeat messages from this [Member] via Client API. /// /// Once reached, this [Member] is considered being idle. @@ -99,7 +107,8 @@ class Member { /// API. String? ping_interval; - Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave); + Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave, + this.on_start, this.on_stop); factory Member.fromJson(Map json) { json.remove('kind'); diff --git a/flutter/test/e2e/world/custom_world.dart b/flutter/test/e2e/world/custom_world.dart index f1c4f9624..d9ffea629 100644 --- a/flutter/test/e2e/world/custom_world.dart +++ b/flutter/test/e2e/world/custom_world.dart @@ -100,8 +100,14 @@ class CustomWorld extends FlutterWidgetTesterWorld { }); } - var createMember = api.Member(builderId, pipeline, api.Plain('test'), - 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099'); + var createMember = api.Member( + builderId, + pipeline, + api.Plain('test'), + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099'); await controlClient.create('$roomId/$builderId', createMember); if (builder.isSend) { diff --git a/mock/control-api/src/callback/mod.rs b/mock/control-api/src/callback/mod.rs index 586d62e7b..d922fc677 100644 --- a/mock/control-api/src/callback/mod.rs +++ b/mock/control-api/src/callback/mod.rs @@ -147,7 +147,7 @@ pub enum MediaType { Both, } -/// Media Endpoint for which OnStart or OnStop Control API callback +/// Media Endpoint for which `OnStart` or `OnStop` Control API callback /// was received. #[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub enum MediaDirection { @@ -177,16 +177,21 @@ impl From for MediaType { } } +/// `on_start` callback's related entities and implementations. mod start { use medea_control_api_proto::grpc::callback as proto; use serde::{Deserialize, Serialize}; use super::{MediaDirection, MediaType}; - /// `OnJoin` callback for Control API. + /// `OnStart` callback for Control API. #[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct OnStart { + /// [`MediaType`] of the traffic which starts flowing in some + /// `Endpoint`. media_type: MediaType, + /// [`MediaDirection`] of the `Endpoint` for which this callback was + /// received. media_direction: MediaDirection, } @@ -206,6 +211,7 @@ mod start { } } +/// `on_stop` callback's related entities and implementations. mod stop { use derive_more::Display; use medea_control_api_proto::grpc::callback as proto; @@ -213,10 +219,18 @@ mod stop { use super::{MediaDirection, MediaType}; + /// `OnStop` callback of Control API. #[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct OnStop { + /// Reason of why `Endpoint` was stopped. pub reason: OnStopReason, + + /// [`MediaType`] of the traffic which starts flowing in some + /// `Endpoint`. pub media_type: MediaType, + + /// [`MediaDirection`] of the `Endpoint` for which this callback was + /// received. pub media_direction: MediaDirection, } @@ -238,6 +252,7 @@ mod stop { } } + /// Reason of why some `Endpoint` was stopped. #[derive(Clone, Copy, Debug, Deserialize, Display, Serialize)] pub enum OnStopReason { /// All traffic of some `Endpoint` was stopped flowing. diff --git a/proto/control-api/src/grpc/api.proto b/proto/control-api/src/grpc/api.proto index c81674e75..29d73d165 100644 --- a/proto/control-api/src/grpc/api.proto +++ b/proto/control-api/src/grpc/api.proto @@ -168,11 +168,11 @@ message Member { // URL of the callback to fire when this `Member` finishes a persistent // connection with a media server via Client API. string on_leave = 3; - /// URL of the callback to fire when this [`Member`] start traffic - /// with a media server via [Client API]. + // URL of the callback to fire when this [`Member`] start traffic + // with a media server via [Client API]. string on_start = 4; - /// URL of the callback to fire when this [`Member`] stopped traffic - /// with a media server via [Client API]. + // URL of the callback to fire when this [`Member`] stopped traffic + // with a media server via [Client API]. string on_stop = 5; // Credentials to authenticate this `Member` in Client API with. // @@ -283,7 +283,7 @@ message WebRtcPublishEndpoint { message WebRtcPlayEndpoint { // ID of this `WebRtcPlayEndpoint`. string id = 1; - /// Source to play media data from. + // Source to play media data from. string src = 2; // Callback firing when a client starts playing media data from the source. string on_start = 3; diff --git a/proto/control-api/src/grpc/api.rs b/proto/control-api/src/grpc/api.rs index 7040ef162..63ad290f6 100644 --- a/proto/control-api/src/grpc/api.rs +++ b/proto/control-api/src/grpc/api.rs @@ -209,12 +209,12 @@ pub struct Member { /// connection with a media server via Client API. #[prost(string, tag = "3")] pub on_leave: ::prost::alloc::string::String, - /// / URL of the callback to fire when this \[`Member`\] start traffic - /// / with a media server via \[Client API\]. + /// URL of the callback to fire when this \[`Member`\] start traffic + /// with a media server via \[Client API\]. #[prost(string, tag = "4")] pub on_start: ::prost::alloc::string::String, - /// / URL of the callback to fire when this \[`Member`\] stopped traffic - /// / with a media server via \[Client API\]. + /// URL of the callback to fire when this \[`Member`\] stopped traffic + /// with a media server via \[Client API\]. #[prost(string, tag = "5")] pub on_stop: ::prost::alloc::string::String, /// Timeout of receiving heartbeat messages from this `Member` via Client API. @@ -444,7 +444,7 @@ pub struct WebRtcPlayEndpoint { /// ID of this `WebRtcPlayEndpoint`. #[prost(string, tag = "1")] pub id: ::prost::alloc::string::String, - /// / Source to play media data from. + /// Source to play media data from. #[prost(string, tag = "2")] pub src: ::prost::alloc::string::String, /// Callback firing when a client starts playing media data from the source. @@ -678,3 +678,393 @@ pub mod control_api_client { } } } +/// Generated server implementations. +pub mod control_api_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with ControlApiServer. + #[async_trait] + pub trait ControlApi: Send + Sync + 'static { + /// Creates a new `Element` on the media server. + /// + /// Non-idempotent. Errors if an `Element` with such ID already exists. + async fn create( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Removes `Element`s from the media server. + /// Allows referring multiple `Element`s on the last two levels of a FID. + /// + /// Idempotent. If no `Element`s with such FIDs exist, then succeeds. + async fn delete( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Lookups `Element`s by their FIDs on the media server. + /// If no FIDs are specified, then returns all the current `Element`s on the + /// media server. + async fn get( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Applies changes to an existing `Element` on the media server, or creates a + /// new one in case there is no `Element` with such ID. + /// + /// Idempotent. If no `Element` with such ID exists, then it will be created, + /// otherwise it will be reconfigured. `Element`s that exist on the same + /// hierarchy level, but are not specified in the provided spec, will be + /// removed. + async fn apply( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Checks healthiness of the media server. + /// Caller should assert that the returned `Pong` has the same nonce as the + /// sent `Ping`. + async fn healthz( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + /// Service allowing to control a media server dynamically, by creating, updating + /// and destroying pipelines of media `Element`s on it. + #[derive(Debug)] + pub struct ControlApiServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl ControlApiServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for ControlApiServer + where + T: ControlApi, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/api.ControlApi/Create" => { + #[allow(non_camel_case_types)] + struct CreateSvc(pub Arc); + impl tonic::server::UnaryService + for CreateSvc { + type Response = super::CreateResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::create(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = CreateSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/api.ControlApi/Delete" => { + #[allow(non_camel_case_types)] + struct DeleteSvc(pub Arc); + impl tonic::server::UnaryService + for DeleteSvc { + type Response = super::Response; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::delete(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = DeleteSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/api.ControlApi/Get" => { + #[allow(non_camel_case_types)] + struct GetSvc(pub Arc); + impl tonic::server::UnaryService + for GetSvc { + type Response = super::GetResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/api.ControlApi/Apply" => { + #[allow(non_camel_case_types)] + struct ApplySvc(pub Arc); + impl tonic::server::UnaryService + for ApplySvc { + type Response = super::CreateResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::apply(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ApplySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/api.ControlApi/Healthz" => { + #[allow(non_camel_case_types)] + struct HealthzSvc(pub Arc); + impl tonic::server::UnaryService + for HealthzSvc { + type Response = super::Pong; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::healthz(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = HealthzSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for ControlApiServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for ControlApiServer { + const NAME: &'static str = "api.ControlApi"; + } +} diff --git a/proto/control-api/src/grpc/callback.proto b/proto/control-api/src/grpc/callback.proto index 9db616218..80baa9702 100644 --- a/proto/control-api/src/grpc/callback.proto +++ b/proto/control-api/src/grpc/callback.proto @@ -59,9 +59,9 @@ message OnStart { // Event which fires when Endpoint stops sending/receiving media traffic. message OnStop { - /// Media type of the traffic which stops flowing in some Endpoint. + // Media type of the traffic which stops flowing in some Endpoint. MediaType media_type = 1; - /// Media Endpoint for which this callback was received. + // Media Endpoint for which this callback was received. MediaDirection media_direction = 2; // Reason of why Endpoint was stopped. Reason reason = 3; diff --git a/proto/control-api/src/grpc/callback.rs b/proto/control-api/src/grpc/callback.rs index b4f2c5503..4a418695d 100644 --- a/proto/control-api/src/grpc/callback.rs +++ b/proto/control-api/src/grpc/callback.rs @@ -43,10 +43,10 @@ pub struct OnStart { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OnStop { - /// / Media type of the traffic which stops flowing in some Endpoint. + /// Media type of the traffic which stops flowing in some Endpoint. #[prost(enumeration = "MediaType", tag = "1")] pub media_type: i32, - /// / Media Endpoint for which this callback was received. + /// Media Endpoint for which this callback was received. #[prost(enumeration = "MediaDirection", tag = "2")] pub media_direction: i32, /// Reason of why Endpoint was stopped. @@ -235,6 +235,116 @@ impl MediaDirection { } } } +/// Generated client implementations. +pub mod callback_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// Service for receiving callbacks from a media server. + #[derive(Debug, Clone)] + pub struct CallbackClient { + inner: tonic::client::Grpc, + } + impl CallbackClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl CallbackClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> CallbackClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + CallbackClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Fires when a certain callback event happens on a media server. + pub async fn on_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/callback.Callback/OnEvent", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("callback.Callback", "OnEvent")); + self.inner.unary(req, path, codec).await + } + } +} /// Generated server implementations. pub mod callback_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] From 6881ba3ddf338d443b1fbd6421ddbef143fc17b4 Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 21:52:37 +0300 Subject: [PATCH 4/6] fix --- proto/control-api/src/grpc/api.proto | 24 +- proto/control-api/src/grpc/api.rs | 420 +-------------------------- 2 files changed, 27 insertions(+), 417 deletions(-) diff --git a/proto/control-api/src/grpc/api.proto b/proto/control-api/src/grpc/api.proto index 29d73d165..6ce333608 100644 --- a/proto/control-api/src/grpc/api.proto +++ b/proto/control-api/src/grpc/api.proto @@ -168,12 +168,6 @@ message Member { // URL of the callback to fire when this `Member` finishes a persistent // connection with a media server via Client API. string on_leave = 3; - // URL of the callback to fire when this [`Member`] start traffic - // with a media server via [Client API]. - string on_start = 4; - // URL of the callback to fire when this [`Member`] stopped traffic - // with a media server via [Client API]. - string on_stop = 5; // Credentials to authenticate this `Member` in Client API with. // // Plain and hashed credentials are supported. If no credentials provided, @@ -185,23 +179,29 @@ message Member { // are used, so it should be appended manually on a client side. oneof credentials { // Argon2 hash of credentials. - string hash = 6; + string hash = 4; // Plain text credentials. - string plain = 7; + string plain = 5; } // Timeout of receiving heartbeat messages from this `Member` via Client API. // Once reached, this `Member` is considered being idle. - google.protobuf.Duration idle_timeout = 8; + google.protobuf.Duration idle_timeout = 6; // Timeout of reconnecting this `Member` via Client API. // Once reached, this `Member` is considered disconnected. - google.protobuf.Duration reconnect_timeout = 9; + google.protobuf.Duration reconnect_timeout = 7; // Interval of pinging with heartbeat messages this `Member` via Client API // by a media server. // If empty then the default interval of a media server is used, if // configured. - google.protobuf.Duration ping_interval = 10; + google.protobuf.Duration ping_interval = 8; // Media pipeline representing this `Member`. - map pipeline = 11; + map pipeline = 9; + // URL of the callback to fire when this [`Member`] start traffic + // with a media server via [Client API]. + string on_start = 10; + // URL of the callback to fire when this [`Member`] stopped traffic + // with a media server via [Client API]. + string on_stop = 11; // Elements which Member's pipeline can contain. message Element { diff --git a/proto/control-api/src/grpc/api.rs b/proto/control-api/src/grpc/api.rs index 63ad290f6..0776d3eab 100644 --- a/proto/control-api/src/grpc/api.rs +++ b/proto/control-api/src/grpc/api.rs @@ -209,34 +209,34 @@ pub struct Member { /// connection with a media server via Client API. #[prost(string, tag = "3")] pub on_leave: ::prost::alloc::string::String, - /// URL of the callback to fire when this \[`Member`\] start traffic - /// with a media server via \[Client API\]. - #[prost(string, tag = "4")] - pub on_start: ::prost::alloc::string::String, - /// URL of the callback to fire when this \[`Member`\] stopped traffic - /// with a media server via \[Client API\]. - #[prost(string, tag = "5")] - pub on_stop: ::prost::alloc::string::String, /// Timeout of receiving heartbeat messages from this `Member` via Client API. /// Once reached, this `Member` is considered being idle. - #[prost(message, optional, tag = "8")] + #[prost(message, optional, tag = "6")] pub idle_timeout: ::core::option::Option<::prost_types::Duration>, /// Timeout of reconnecting this `Member` via Client API. /// Once reached, this `Member` is considered disconnected. - #[prost(message, optional, tag = "9")] + #[prost(message, optional, tag = "7")] pub reconnect_timeout: ::core::option::Option<::prost_types::Duration>, /// Interval of pinging with heartbeat messages this `Member` via Client API /// by a media server. /// If empty then the default interval of a media server is used, if /// configured. - #[prost(message, optional, tag = "10")] + #[prost(message, optional, tag = "8")] pub ping_interval: ::core::option::Option<::prost_types::Duration>, /// Media pipeline representing this `Member`. - #[prost(map = "string, message", tag = "11")] + #[prost(map = "string, message", tag = "9")] pub pipeline: ::std::collections::HashMap< ::prost::alloc::string::String, member::Element, >, + /// URL of the callback to fire when this \[`Member`\] start traffic + /// with a media server via \[Client API\]. + #[prost(string, tag = "10")] + pub on_start: ::prost::alloc::string::String, + /// URL of the callback to fire when this \[`Member`\] stopped traffic + /// with a media server via \[Client API\]. + #[prost(string, tag = "11")] + pub on_stop: ::prost::alloc::string::String, /// Credentials to authenticate this `Member` in Client API with. /// /// Plain and hashed credentials are supported. If no credentials provided, @@ -246,7 +246,7 @@ pub struct Member { /// Hashed variant only supports Argon2 hash at the moment. /// `Member` sid won't contain a `token` query parameter if hashed credentials /// are used, so it should be appended manually on a client side. - #[prost(oneof = "member::Credentials", tags = "6, 7")] + #[prost(oneof = "member::Credentials", tags = "4, 5")] pub credentials: ::core::option::Option, } /// Nested message and enum types in `Member`. @@ -282,10 +282,10 @@ pub mod member { #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum Credentials { /// Argon2 hash of credentials. - #[prost(string, tag = "6")] + #[prost(string, tag = "4")] Hash(::prost::alloc::string::String), /// Plain text credentials. - #[prost(string, tag = "7")] + #[prost(string, tag = "5")] Plain(::prost::alloc::string::String), } } @@ -678,393 +678,3 @@ pub mod control_api_client { } } } -/// Generated server implementations. -pub mod control_api_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with ControlApiServer. - #[async_trait] - pub trait ControlApi: Send + Sync + 'static { - /// Creates a new `Element` on the media server. - /// - /// Non-idempotent. Errors if an `Element` with such ID already exists. - async fn create( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Removes `Element`s from the media server. - /// Allows referring multiple `Element`s on the last two levels of a FID. - /// - /// Idempotent. If no `Element`s with such FIDs exist, then succeeds. - async fn delete( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Lookups `Element`s by their FIDs on the media server. - /// If no FIDs are specified, then returns all the current `Element`s on the - /// media server. - async fn get( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Applies changes to an existing `Element` on the media server, or creates a - /// new one in case there is no `Element` with such ID. - /// - /// Idempotent. If no `Element` with such ID exists, then it will be created, - /// otherwise it will be reconfigured. `Element`s that exist on the same - /// hierarchy level, but are not specified in the provided spec, will be - /// removed. - async fn apply( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - /// Checks healthiness of the media server. - /// Caller should assert that the returned `Pong` has the same nonce as the - /// sent `Ping`. - async fn healthz( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; - } - /// Service allowing to control a media server dynamically, by creating, updating - /// and destroying pipelines of media `Element`s on it. - #[derive(Debug)] - pub struct ControlApiServer { - inner: _Inner, - accept_compression_encodings: EnabledCompressionEncodings, - send_compression_encodings: EnabledCompressionEncodings, - max_decoding_message_size: Option, - max_encoding_message_size: Option, - } - struct _Inner(Arc); - impl ControlApiServer { - pub fn new(inner: T) -> Self { - Self::from_arc(Arc::new(inner)) - } - pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - max_decoding_message_size: None, - max_encoding_message_size: None, - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - /// Enable decompressing requests with the given encoding. - #[must_use] - pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.accept_compression_encodings.enable(encoding); - self - } - /// Compress responses with the given encoding, if the client supports it. - #[must_use] - pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { - self.send_compression_encodings.enable(encoding); - self - } - /// Limits the maximum size of a decoded message. - /// - /// Default: `4MB` - #[must_use] - pub fn max_decoding_message_size(mut self, limit: usize) -> Self { - self.max_decoding_message_size = Some(limit); - self - } - /// Limits the maximum size of an encoded message. - /// - /// Default: `usize::MAX` - #[must_use] - pub fn max_encoding_message_size(mut self, limit: usize) -> Self { - self.max_encoding_message_size = Some(limit); - self - } - } - impl tonic::codegen::Service> for ControlApiServer - where - T: ControlApi, - B: Body + Send + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = std::convert::Infallible; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/api.ControlApi/Create" => { - #[allow(non_camel_case_types)] - struct CreateSvc(pub Arc); - impl tonic::server::UnaryService - for CreateSvc { - type Response = super::CreateResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::create(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = CreateSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Delete" => { - #[allow(non_camel_case_types)] - struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService - for DeleteSvc { - type Response = super::Response; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::delete(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DeleteSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Get" => { - #[allow(non_camel_case_types)] - struct GetSvc(pub Arc); - impl tonic::server::UnaryService - for GetSvc { - type Response = super::GetResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Apply" => { - #[allow(non_camel_case_types)] - struct ApplySvc(pub Arc); - impl tonic::server::UnaryService - for ApplySvc { - type Response = super::CreateResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::apply(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ApplySvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/api.ControlApi/Healthz" => { - #[allow(non_camel_case_types)] - struct HealthzSvc(pub Arc); - impl tonic::server::UnaryService - for HealthzSvc { - type Response = super::Pong; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::healthz(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = HealthzSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } - } - } - } - impl Clone for ControlApiServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - max_decoding_message_size: self.max_decoding_message_size, - max_encoding_message_size: self.max_encoding_message_size, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::server::NamedService for ControlApiServer { - const NAME: &'static str = "api.ControlApi"; - } -} From fc186d410e077b71352bdeb211f5202f4f523e0a Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 21:55:29 +0300 Subject: [PATCH 5/6] fmt --- flutter/example/lib/call_route.dart | 10 ++++++++-- flutter/example/lib/control_api/entities/member.dart | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/flutter/example/lib/call_route.dart b/flutter/example/lib/call_route.dart index 665505baf..751f1a9c4 100644 --- a/flutter/example/lib/call_route.dart +++ b/flutter/example/lib/call_route.dart @@ -983,8 +983,14 @@ Future controlApiCreateMemberDialog(BuildContext context, Call call) { const SizedBox(height: 10), TextButton( onPressed: () async { - var member = Member(memberId, {}, Plain(credentials), - 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099'); + var member = Member( + memberId, + {}, + Plain(credentials), + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099'); member.idle_timeout = idle; member.reconnect_timeout = reconnectTimeout; diff --git a/flutter/example/lib/control_api/entities/member.dart b/flutter/example/lib/control_api/entities/member.dart index 17df49135..fe11844b1 100644 --- a/flutter/example/lib/control_api/entities/member.dart +++ b/flutter/example/lib/control_api/entities/member.dart @@ -107,7 +107,8 @@ class Member { /// API. String? ping_interval; - Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave, this.on_start, this.on_stop); + Member(this.id, this.pipeline, this.credentials, this.on_join, this.on_leave, + this.on_start, this.on_stop); factory Member.fromJson(Map json) { json.remove('kind'); From 2f341c154276c15251f525685b899776e58bac85 Mon Sep 17 00:00:00 2001 From: rogurotus Date: Fri, 15 Mar 2024 21:56:44 +0300 Subject: [PATCH 6/6] fix --- flutter/example/lib/call.dart | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/flutter/example/lib/call.dart b/flutter/example/lib/call.dart index 973e95ed1..9710b7687 100644 --- a/flutter/example/lib/call.dart +++ b/flutter/example/lib/call.dart @@ -240,8 +240,14 @@ class Call { var resp = await client.create( roomId, Room(roomId, { - memberId: Member(memberId, pipeline, Plain('test'), - 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099') + memberId: Member( + memberId, + pipeline, + Plain('test'), + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099') })); return jsonDecode(resp.body)['sids'][memberId]; } @@ -273,7 +279,13 @@ class Call { var resp = await client.create( '$roomId/$memberId', - Member(memberId, pipeline, Plain('test'), 'grpc://127.0.0.1:9099', + Member( + memberId, + pipeline, + Plain('test'), + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', + 'grpc://127.0.0.1:9099', 'grpc://127.0.0.1:9099')); if (isPublish) {