From 0b5ab47ec9be1a5c4a926750311222b4c368b6c3 Mon Sep 17 00:00:00 2001 From: William Date: Tue, 1 Feb 2022 17:11:27 +0800 Subject: [PATCH] Remove hookstream service --- cmd/hookstream.go | 9 -- cmd/internal/hookstream/cmd.go | 190 --------------------------------- internal/hookstream/hook.go | 75 ------------- 3 files changed, 274 deletions(-) delete mode 100644 cmd/hookstream.go delete mode 100644 cmd/internal/hookstream/cmd.go delete mode 100644 internal/hookstream/hook.go diff --git a/cmd/hookstream.go b/cmd/hookstream.go deleted file mode 100644 index 0e3e0ff..0000000 --- a/cmd/hookstream.go +++ /dev/null @@ -1,9 +0,0 @@ -//go:build hookstream - -package main - -import "github.com/SB-IM/charoite/cmd/internal/hookstream" - -func init() { - commands = append(commands, hookstream.Command()) -} diff --git a/cmd/internal/hookstream/cmd.go b/cmd/internal/hookstream/cmd.go deleted file mode 100644 index afce0df..0000000 --- a/cmd/internal/hookstream/cmd.go +++ /dev/null @@ -1,190 +0,0 @@ -package hookstream - -import ( - "context" - "time" - - "github.com/SB-IM/charoite/internal/hookstream" - "github.com/SB-IM/charoite/pkg/mqttclient" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - "github.com/urfave/cli/v2" - "github.com/urfave/cli/v2/altsrc" - "github.com/williamlsh/logging" -) - -const configFlagName = "config" - -// Command returns a livestream command. -func Command() *cli.Command { - ctx := context.Background() - - var ( - logger zerolog.Logger - - mc mqtt.Client - - uuid string - mqttConfigOptions mqttclient.ConfigOptions - mqttClientConfigOptions hookstream.MQTTClientConfigOptions - hookStreamConfigOptions hookstream.HookCommandLine - ) - - flags := func() (flags []cli.Flag) { - for _, v := range [][]cli.Flag{ - loadConfigFlag(), - uuidFlag(&uuid), - mqttFlags(&mqttConfigOptions), - mqttClientFlags(&mqttClientConfigOptions), - hookCommandLineFlags(&hookStreamConfigOptions), - } { - flags = append(flags, v...) - } - return - }() - - return &cli.Command{ - Name: "hookstream", - Usage: "Hookstream hooks a track source stream by executing hooking command line", - Flags: flags, - Before: func(c *cli.Context) error { - if err := altsrc.InitInputSourceWithContext( - flags, - altsrc.NewTomlSourceFromFlagFunc(configFlagName), - )(c); err != nil { - return err - } - - // Set up logger. - debug := c.Bool("debug") - logging.Debug(debug) - logger = log.With().Str("command", "hookstream").Logger() - ctx = logger.WithContext(ctx) - - // Initializes MQTT client. - mc = mqttclient.NewClient(ctx, mqttConfigOptions) - if err := mqttclient.CheckConnectivity(mc, 3*time.Second); err != nil { - return err - } - ctx = mqttclient.WithContext(ctx, mc) - - return nil - }, - Action: func(c *cli.Context) error { - errCh := hookstream.Exec(ctx, hookstream.ConfigOptions{ - UUID: uuid, - HookCommandLine: hookStreamConfigOptions, - MQTTClientConfigOptions: mqttClientConfigOptions, - }) - return <-errCh - }, - After: func(c *cli.Context) error { - logger.Info().Msg("exits") - return nil - }, - } -} - -// loadConfigFlag sets a config file path for app command. -// Note: you can't set any other flags' `Required` value to `true`, -// As it conflicts with this flag. You can set only either this flag or specifically the other flags but not both. -func loadConfigFlag() []cli.Flag { - return []cli.Flag{ - &cli.StringFlag{ - Name: configFlagName, - Aliases: []string{"c"}, - Usage: "Config file path", - Value: "config/config.toml", - DefaultText: "config/config.toml", - }, - } -} - -func uuidFlag(uuid *string) []cli.Flag { - return []cli.Flag{ - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "machine_id.uuid", - Usage: "UUID v4 for this Linux machine", - Value: "", - DefaultText: "", - Destination: uuid, - }), - } -} - -func mqttFlags(options *mqttclient.ConfigOptions) []cli.Flag { - return []cli.Flag{ - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "mqtt.server", - Usage: "MQTT server address", - Value: "tcp://mosquitto:1883", - DefaultText: "tcp://mosquitto:1883", - Destination: &options.Server, - }), - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "mqtt.clientID", - Usage: "MQTT client id", - Value: "mqtt_edge", - DefaultText: "mqtt_edge", - Destination: &options.ClientID, - }), - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "mqtt.username", - Usage: "MQTT broker username", - Value: "", - Destination: &options.Username, - }), - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "mqtt.password", - Usage: "MQTT broker password", - Value: "", - Destination: &options.Password, - }), - } -} - -func mqttClientFlags(options *hookstream.MQTTClientConfigOptions) []cli.Flag { - return []cli.Flag{ - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "mqtt_client.topic_hook_stream_prefix", - Usage: "MQTT topic prefix for hooking of seeding stream", - Value: "/edge/livestream/hook", - DefaultText: "/edge/livestream/hook", - Destination: &options.HookStreamTopicPrefix, - }), - altsrc.NewUintFlag(&cli.UintFlag{ - Name: "mqtt_client.qos", - Usage: "MQTT client qos for WebRTC SDP signaling", - Value: 0, - DefaultText: "0", - Destination: &options.Qos, - }), - altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "mqtt_client.retained", - Usage: "MQTT client setting retention for WebRTC SDP signaling", - Value: false, - DefaultText: "false", - Destination: &options.Retained, - }), - } -} - -func hookCommandLineFlags(options *hookstream.HookCommandLine) []cli.Flag { - return []cli.Flag{ - altsrc.NewStringFlag(&cli.StringFlag{ - Name: "hook_stream.service", - Usage: "Systemd service to be restarted of hooking stream", - Value: "sbcameradronemulti.service", - DefaultText: "sbcameradronemulti.service", - Destination: &options.Service, - }), - altsrc.NewDurationFlag(&cli.DurationFlag{ - Name: "hook_stream.wait_timeout", - Usage: "Waiting timeout to hook stream in seconds", - Value: 1 * time.Second, - DefaultText: "1", - Destination: &options.WaitTimeout, - }), - } -} diff --git a/internal/hookstream/hook.go b/internal/hookstream/hook.go deleted file mode 100644 index 7df026a..0000000 --- a/internal/hookstream/hook.go +++ /dev/null @@ -1,75 +0,0 @@ -package hookstream - -import ( - "context" - "os/exec" - "strconv" - "time" - - pb "github.com/SB-IM/charoite/internal/pb/signal" - "github.com/SB-IM/charoite/pkg/mqttclient" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/pion/webrtc/v3" - "github.com/rs/zerolog/log" -) - -type ConfigOptions struct { - UUID string - HookCommandLine - MQTTClientConfigOptions -} - -type MQTTClientConfigOptions struct { - HookStreamTopicPrefix string - Qos uint - Retained bool -} - -type HookCommandLine struct { - Service string - WaitTimeout time.Duration -} - -func Exec(ctx context.Context, config ConfigOptions) (errCh chan error) { - logger := log.Ctx(ctx) - client := mqttclient.FromContext(ctx) - - errCh = make(chan error, 1) - - topic := config.HookStreamTopicPrefix + "/" + config.UUID + "/" + strconv.Itoa(int(pb.TrackSource_DRONE)) - t := client.Subscribe(topic, byte(config.Qos), func(c mqtt.Client, m mqtt.Message) { - payload, err := strconv.Atoi(string(m.Payload())) - if err != nil { - logger.Err(err).Bytes("payload", m.Payload()).Msg("invalid message") - return - } - state := webrtc.ICEConnectionState(payload) - if state == webrtc.ICEConnectionStateDisconnected { - return - } - - time.Sleep(config.WaitTimeout) - logger.Info().Dur("wait", config.WaitTimeout).Msg("wait for a while") - - if err := exec.CommandContext( //nolint:gosec - ctx, - "systemctl", - "restart", - config.Service, - ).Run(); err != nil { - errCh <- err - return - } - logger.Info().Str("service", config.Service).Msg("restarted service") - }) - go func() { - <-t.Done() - if t.Error() != nil { - logger.Err(t.Error()).Msgf("could not subscribe to %s", topic) - } else { - logger.Info().Msgf("subscribed to %s", topic) - } - }() - - return -}