Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RxJSPeer for the peer connection pushing/pulling #66

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions app/components/AudioStream.tsx
Original file line number Diff line number Diff line change
@@ -1,20 +1,87 @@
import type { FC } from 'react'
import { useEffect, useRef } from 'react'
import { useEffect, useMemo, useRef } from 'react'
import { of } from 'rxjs'
import { useSubscribedState } from '~/hooks/rxjsHooks'
import { useRoomContext } from '~/hooks/useRoomContext'

interface AudioStreamProps {
mediaStreamTrack: MediaStreamTrack
tracksToPull: string[]
onTrackAdded: (id: string, track: MediaStreamTrack) => void
onTrackRemoved: (id: string, track: MediaStreamTrack) => void
}

export const AudioStream: FC<AudioStreamProps> = ({ mediaStreamTrack }) => {
export const AudioStream: FC<AudioStreamProps> = ({
tracksToPull,
onTrackAdded,
onTrackRemoved,
}) => {
const mediaStreamRef = useRef(new MediaStream())
const ref = useRef<HTMLAudioElement>(null)

useEffect(() => {
const audio = ref.current
if (!audio) return
const mediaStream = new MediaStream()
mediaStream.addTrack(mediaStreamTrack)
const mediaStream = mediaStreamRef.current
audio.srcObject = mediaStream
}, [mediaStreamTrack])
}, [])

return <audio ref={ref} autoPlay />
return (
<>
<audio ref={ref} autoPlay />
{tracksToPull.map((track) => (
<AudioTrack
key={track}
track={track}
mediaStream={mediaStreamRef.current}
onTrackAdded={onTrackAdded}
onTrackRemoved={onTrackRemoved}
/>
))}
</>
)
}

function AudioTrack({
mediaStream,
track,
onTrackAdded,
onTrackRemoved,
}: {
mediaStream: MediaStream
track: string
onTrackAdded: (id: string, track: MediaStreamTrack) => void
onTrackRemoved: (id: string, track: MediaStreamTrack) => void
}) {
const onTrackAddedRef = useRef(onTrackAdded)
onTrackAddedRef.current = onTrackAdded
const onTrackRemovedRef = useRef(onTrackRemoved)
onTrackRemovedRef.current = onTrackRemoved

const { peer } = useRoomContext()
const trackObject = useMemo(() => {
const [sessionId, trackName] = track.split('/')
return {
sessionId,
trackName,
location: 'remote',
} as const
}, [track])

const pulledTrack$ = useMemo(() => {
return peer.pullTrack(of(trackObject))
}, [peer, trackObject])

const audioTrack = useSubscribedState(pulledTrack$)

useEffect(() => {
if (!audioTrack) return
mediaStream.addTrack(audioTrack)
onTrackAddedRef.current(track, audioTrack)
return () => {
mediaStream.removeTrack(audioTrack)
onTrackRemovedRef.current(track, audioTrack)
}
}, [audioTrack])

return null
}
36 changes: 16 additions & 20 deletions app/components/HighPacketLossWarningsToast.tsx
Original file line number Diff line number Diff line change
@@ -1,38 +1,34 @@
import { useEffect, useState } from 'react'
import { useMemo } from 'react'
import Toast, { Root } from '~/components/Toast'
import { useSubscribedState } from '~/hooks/rxjsHooks'
import { useConditionForAtLeast } from '~/hooks/useConditionForAtLeast'
import type Peer from '~/utils/Peer.client'
import { getPacketLossStats$ } from '~/utils/rxjs/getPacketLossStats$'
import { useRoomContext } from '../hooks/useRoomContext'
import { Icon } from './Icon/Icon'

export function usePacketLossInformation(): Partial<
ReturnType<typeof Peer.prototype.getDebugInfo>
> {
function useStats() {
const { peer } = useRoomContext()
const [debugInfo, setDebugInfo] = useState(peer?.getDebugInfo())

useEffect(() => {
const interval = setInterval(() => {
setDebugInfo(peer?.getDebugInfo())
}, 1000)

return () => {
clearInterval(interval)
}
}, [peer])
const stats$ = useMemo(
() => getPacketLossStats$(peer.peerConnection$),
[peer.peerConnection$]
)
const stats = useSubscribedState(stats$, {
inboundPacketLossPercentage: 0,
outboundPacketLossPercentage: 0,
})

return debugInfo ?? {}
return stats
}

export function HighPacketLossWarningsToast() {
const { inboundPacketLossPercentage, outboundPacketLossPercentage } =
usePacketLossInformation()
useStats()

const hasIssues = useConditionForAtLeast(
inboundPacketLossPercentage !== undefined &&
outboundPacketLossPercentage !== undefined &&
inboundPacketLossPercentage > 0.01 &&
outboundPacketLossPercentage > 0.01,
(inboundPacketLossPercentage > 0.01 ||
outboundPacketLossPercentage > 0.01),
3000
)

Expand Down
4 changes: 2 additions & 2 deletions app/components/Participant.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ export const Participant = forwardRef<
useDeadPulledTrackMonitor(
user.tracks.video,
user.transceiverSessionId,
user.tracks.videoEnabled,
!!user.tracks.video,
videoTrack,
user.name
)

useDeadPulledTrackMonitor(
user.tracks.audio,
user.transceiverSessionId,
user.tracks.audioEnabled,
!!user.tracks.audio,
audioTrack,
user.name
)
Expand Down
21 changes: 15 additions & 6 deletions app/components/PullAudioTracks.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { FC, ReactNode } from 'react'
import { createContext, useContext } from 'react'
import usePulledTracks from '~/hooks/usePulledTracks'
import { createContext, useContext, useState } from 'react'
import { AudioStream } from './AudioStream'

interface PullAudioTracksProps {
Expand All @@ -14,13 +13,23 @@ export const PullAudioTracks: FC<PullAudioTracksProps> = ({
audioTracks,
children,
}) => {
const audioTrackMap = usePulledTracks(audioTracks)
const [audioTrackMap, setAudioTrackMap] = useState<
Record<string, MediaStreamTrack>
>({})

return (
<AudioTrackContext.Provider value={audioTrackMap}>
{Object.entries(audioTrackMap).map(([trackKey, mediaStreamTrack]) => (
<AudioStream key={trackKey} mediaStreamTrack={mediaStreamTrack} />
))}
<AudioStream
tracksToPull={audioTracks}
onTrackAdded={(id, track) =>
setAudioTrackMap({ ...audioTrackMap, [id]: track })
}
onTrackRemoved={(id) => {
const update = { ...audioTrackMap }
delete update[id]
setAudioTrackMap(update)
}}
/>
{children}
</AudioTrackContext.Provider>
)
Expand Down
59 changes: 29 additions & 30 deletions app/components/PullVideoTrack.tsx
Original file line number Diff line number Diff line change
@@ -1,48 +1,47 @@
import type { ReactElement } from 'react'
import { useEffect, useState } from 'react'
import { useMemo } from 'react'
import { of, switchMap } from 'rxjs'
import { useStateObservable, useSubscribedState } from '~/hooks/rxjsHooks'
import { useRoomContext } from '~/hooks/useRoomContext'
import keepTrying from '~/utils/keepTrying'
import type { TrackObject } from '~/utils/callsTypes'
import { usePulledAudioTrack } from './PullAudioTracks'

interface PullTracksProps {
audio?: string
video?: string
audio?: string
children: (props: {
audioTrack?: MediaStreamTrack
videoTrack?: MediaStreamTrack
audioTrack?: MediaStreamTrack
}) => ReactElement
}

export const PullVideoTrack = ({ video, audio, children }: PullTracksProps) => {
const { peer } = useRoomContext()

const [videoTrack, setVideoTrack] = useState<MediaStreamTrack>()
const audioTrack = usePulledAudioTrack(audio)

useEffect(() => {
if (!video || !peer) return
let mounted = true
const cancel = keepTrying(() => {
const [sessionId, trackName] = video.split('/')
// backward compatibility: ResourceID -> TrackObject
return peer
.pullTrack({ location: 'remote', sessionId, trackName })
.then((track) => {
if (mounted) setVideoTrack(track)
})
})
return () => {
cancel()
mounted = false
}
}, [peer, video])

useEffect(() => {
if (videoTrack && peer)
return () => {
peer.closeTrack(videoTrack)
}
}, [videoTrack, peer])
const [sessionId, trackName] = video?.split('/') ?? []
const trackObject = useMemo(
() =>
sessionId && trackName
? ({
trackName,
sessionId,
location: 'remote',
} satisfies TrackObject)
: undefined,
[sessionId, trackName]
)

const trackObject$ = useStateObservable(trackObject)
const pulledTrack$ = useMemo(
() =>
trackObject$.pipe(
switchMap((track) =>
track ? peer.pullTrack(of(track)) : of(undefined)
)
),
[peer, trackObject$]
)
const videoTrack = useSubscribedState(pulledTrack$)
return children({ videoTrack, audioTrack })
}
26 changes: 23 additions & 3 deletions app/components/VideoInputSelector.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import type { FC } from 'react'
import { useMemo, type FC } from 'react'
import { useSubscribedState } from '~/hooks/rxjsHooks'
import useMediaDevices from '~/hooks/useMediaDevices'
import { useRoomContext } from '~/hooks/useRoomContext'
import { errorMessageMap } from '~/hooks/useUserMedia'
import { getSortedDeviceListObservable } from '~/utils/rxjs/getDeviceListObservable'
import { Option, Select } from './Select'

export const VideoInputSelector: FC<{ id?: string }> = ({ id }) => {
const videoInputDevices = useMediaDevices((d) => d.kind === 'videoinput')
const sortedDeviceListObservable$ = useMemo(
() => getSortedDeviceListObservable(),
[]
)
const sortedDeviceList = useSubscribedState(sortedDeviceListObservable$, [])

const {
userMedia: { videoUnavailableReason, videoDeviceId, setVideoDeviceId },
userMedia: {
videoUnavailableReason,
videoDeviceId,
setVideoDeviceId,
videoEnabled,
},
} = useRoomContext()

if (videoUnavailableReason) {
Expand All @@ -25,9 +37,17 @@ export const VideoInputSelector: FC<{ id?: string }> = ({ id }) => {
)
}

// we can only rely on videoDeviceId when the webcam is enabled because
// when it's not, the device id is being pulled from our black canvas track
// so we will instead fall back to show the user's preferred webcam that
// we would _try_ to acquire the next time they enable their webcam.
const shownDeviceId = videoEnabled
? videoDeviceId
: sortedDeviceList.find((d) => d.kind === 'videoinput')?.deviceId

return (
<div className="max-w-[40ch]">
<Select value={videoDeviceId} onValueChange={setVideoDeviceId} id={id}>
<Select value={shownDeviceId} onValueChange={setVideoDeviceId} id={id}>
{videoInputDevices.map((d) => (
<Option key={d.deviceId} value={d.deviceId}>
{d.label}
Expand Down
14 changes: 8 additions & 6 deletions app/hooks/useBroadcastStatus.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { useEffect } from 'react'
import { useUnmount } from 'react-use'
import type { ClientMessage, User } from '~/types/Messages'
import type Peer from '~/utils/Peer.client'

import type PartySocket from 'partysocket'
import type { RxjsPeer } from '~/utils/rxjs/RxjsPeer.client'
import { useSubscribedState } from './rxjsHooks'
import type { RoomContextType } from './useRoomContext'
import type { UserMedia } from './useUserMedia'

interface Config {
userMedia: UserMedia
peer: Peer | null
peer: RxjsPeer
identity?: User
websocket: PartySocket
pushedTracks: RoomContextType['pushedTracks']
Expand All @@ -28,18 +29,19 @@ export default function useBroadcastStatus({
}: Config) {
const { audioEnabled, videoEnabled, screenShareEnabled } = userMedia
const { audio, video, screenshare } = pushedTracks
const { sessionId } = useSubscribedState(peer.session$) ?? {}

const id = identity?.id
const name = identity?.name
useEffect(() => {
if (id && name) {
const user = {
const user: User = {
id,
name,
joined: true,
raisedHand,
speaking,
transceiverSessionId: peer?.sessionId,
transceiverSessionId: sessionId,
tracks: {
audioEnabled,
videoEnabled,
Expand Down Expand Up @@ -71,7 +73,7 @@ export default function useBroadcastStatus({
id,
name,
websocket,
peer?.sessionId,
sessionId,
audio,
video,
screenshare,
Expand All @@ -93,7 +95,7 @@ export default function useBroadcastStatus({
joined: false,
raisedHand,
speaking,
transceiverSessionId: peer?.sessionId,
transceiverSessionId: sessionId,
tracks: {},
},
} satisfies ClientMessage)
Expand Down
Loading