diff --git a/platform/forward.go b/platform/forward.go index 1b485105..3cb82c4a 100644 --- a/platform/forward.go +++ b/platform/forward.go @@ -19,6 +19,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -334,6 +335,8 @@ func (v *ForwardWorker) Start(ctx context.Context) error { type ForwardConfigure struct { // The platform name, for example, wx Platform string `json:"platform"` + // the source stream in oryx. + Stream string `json:"stream"` // The RTMP server url, for example, rtmp://localhost/live Server string `json:"server"` // The RTMP stream and secret, for example, livestream @@ -347,13 +350,14 @@ type ForwardConfigure struct { } func (v *ForwardConfigure) String() string { - return fmt.Sprintf("platform=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v", - v.Platform, v.Server, v.Secret, v.Enabled, v.Customed, v.Label, + return fmt.Sprintf("platform=%v, stream=%v, server=%v, secret=%v, enabled=%v, customed=%v, label=%v", + v.Platform, v.Stream, v.Server, v.Secret, v.Enabled, v.Customed, v.Label, ) } func (v *ForwardConfigure) Update(u *ForwardConfigure) error { v.Platform = u.Platform + v.Stream = u.Stream v.Server = u.Server v.Secret = u.Secret v.Label = u.Label @@ -500,18 +504,28 @@ func (v *ForwardTask) Run(ctx context.Context) error { ctx = logger.WithContext(ctx) logger.Tf(ctx, "forward run task %v", v.String()) + // select active stream by stream name or random select one when stream name is empty. selectActiveStream := func() (*SrsStream, error) { streams, err := rdb.HGetAll(ctx, SRS_STREAM_ACTIVE).Result() if err != nil { return nil, errors.Wrapf(err, "hgetall %v", SRS_STREAM_ACTIVE) } + streamName := v.config.Stream + var best *SrsStream for _, v := range streams { var stream SrsStream if err := json.Unmarshal([]byte(v), &stream); err != nil { return nil, errors.Wrapf(err, "unmarshal %v", v) } + if streamName != "" { + if stream.Stream == streamName { + best = &stream + break + } + continue + } if best == nil { best = &stream diff --git a/ui/src/pages/ScenarioForward.js b/ui/src/pages/ScenarioForward.js index edb94fcf..33704fbe 100644 --- a/ui/src/pages/ScenarioForward.js +++ b/ui/src/pages/ScenarioForward.js @@ -162,7 +162,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { }, [configs, setConfigs]); // Update the forward config to server. - const updateSecrets = React.useCallback((e, action, platform, server, secret, enabled, custom, label, onSuccess) => { + const updateSecrets = React.useCallback((e, action, platform, stream, server, secret, enabled, custom, label, onSuccess) => { e.preventDefault(); if (!server) return alert(t('plat.com.addr')); if (custom && !label) return alert(t('plat.com.label')); @@ -171,7 +171,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { setSubmiting(true); axios.post('/terraform/v1/ffmpeg/forward/secret', { - action, platform, server, secret, enabled: !!enabled, custom: !!custom, label, + action, platform, stream, server, secret, enabled: !!enabled, custom: !!custom, label, }, { headers: Token.loadBearerHeader(), }).then(res => { @@ -235,6 +235,11 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { * {conf.custom ? `(${t('helper.required')})` : `(${t('helper.optional')})`} {t('plat.com.name2')} updateConfigObject({...conf, label: e.target.value})}/> + + {t('plat.com.source')} + {!conf.custom && * {t('plat.com.source')} check System-{'>'}Streams tab} + updateConfigObject({...conf, stream: e.target.value})}/> + {conf.custom ? t('plat.com.server') : t('plat.com.server2')} {!conf.custom && * {t('plat.com.server3')} {conf?.locale?.link2}, {t('plat.com.server4')}} @@ -259,7 +264,7 @@ function ScenarioForwardImpl({defaultActiveKey, defaultSecrets}) { type="submit" disabled={submiting} onClick={(e) => { - updateSecrets(e, 'update', conf.platform, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => { + updateSecrets(e, 'update', conf.platform, conf.stream, conf.server, conf.secret, !conf.enabled, conf.custom, conf.label, () => { updateConfigObject({...conf, enabled: !conf.enabled}); }); }}