From c8630728cc91dbad10aa10b741952afd9ec96f8b Mon Sep 17 00:00:00 2001 From: Yevhen Vydolob Date: Wed, 1 Nov 2023 14:16:22 +0200 Subject: [PATCH] Implement state change event Signed-off-by: Yevhen Vydolob --- pkg/crc/api/events/cluster_load_stream.go | 2 +- pkg/crc/api/events/event_server.go | 11 ++-- pkg/crc/api/events/events.go | 5 +- pkg/crc/api/events/log_stream.go | 11 +++- pkg/crc/api/events/status_change_stream.go | 62 ++++++++++++++++++++++ pkg/crc/machine/sync.go | 28 +++++++++- pkg/events/emitter.go | 4 +- pkg/events/events.go | 14 +++++ 8 files changed, 124 insertions(+), 13 deletions(-) create mode 100644 pkg/crc/api/events/status_change_stream.go create mode 100644 pkg/events/events.go diff --git a/pkg/crc/api/events/cluster_load_stream.go b/pkg/crc/api/events/cluster_load_stream.go index be4b1ab12e..f128262953 100644 --- a/pkg/crc/api/events/cluster_load_stream.go +++ b/pkg/crc/api/events/cluster_load_stream.go @@ -20,7 +20,7 @@ type TickListener struct { } func newClusterLoadStream(server *EventServer) EventStream { - return newStream(newStatusListener(server.machine), newEventPublisher(CLUSTER_LOAD, server.sseServer)) + return newStream(newStatusListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer)) } func newStatusListener(machine crcMachine.Client) EventProducer { diff --git a/pkg/crc/api/events/event_server.go b/pkg/crc/api/events/event_server.go index 80ba0dc529..1a7e5155c8 100644 --- a/pkg/crc/api/events/event_server.go +++ b/pkg/crc/api/events/event_server.go @@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer { stream.RemoveSubscriber(sub) } - sseServer.CreateStream(LOGS) - sseServer.CreateStream(CLUSTER_LOAD) + sseServer.CreateStream(Logs) + sseServer.CreateStream(ClusterLoad) + sseServer.CreateStream(StatusChange) return eventServer } @@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func createEventStream(server *EventServer, streamID string) EventStream { switch streamID { - case LOGS: + case Logs: return newLogsStream(server) - case CLUSTER_LOAD: + case ClusterLoad: return newClusterLoadStream(server) + case StatusChange: + return newStatusChangeStream(server) } return nil } diff --git a/pkg/crc/api/events/events.go b/pkg/crc/api/events/events.go index 1c2e5af580..027d35d7da 100644 --- a/pkg/crc/api/events/events.go +++ b/pkg/crc/api/events/events.go @@ -3,8 +3,9 @@ package events import "github.com/r3labs/sse/v2" const ( - LOGS = "logs" // Logs event channel, contains daemon logs - CLUSTER_LOAD = "cluster_load" // status event channel, contains VM load info + Logs = "logs" // Logs event channel, contains daemon logs + ClusterLoad = "cluster_load" // status event channel, contains VM load info + StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc ) type EventPublisher interface { diff --git a/pkg/crc/api/events/log_stream.go b/pkg/crc/api/events/log_stream.go index 2601a38e87..3940210251 100644 --- a/pkg/crc/api/events/log_stream.go +++ b/pkg/crc/api/events/log_stream.go @@ -1,6 +1,8 @@ package events import ( + "bytes" + "github.com/crc-org/crc/v2/pkg/crc/logging" "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" @@ -23,7 +25,7 @@ func newSSEStreamHook(server *sse.Server) *streamHook { &logrus.JSONFormatter{ TimestampFormat: "", DisableTimestamp: false, - DisableHTMLEscape: false, + DisableHTMLEscape: true, DataKey: "", FieldMap: nil, CallerPrettyfier: nil, @@ -56,7 +58,12 @@ func (s *streamHook) Fire(entry *logrus.Entry) error { return err } - s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line}) + // remove "Line Feed"("\n") character which add was added by json.Encoder + if line[len(line)-1] == 10 { + line = bytes.TrimRight(line, "\n") + } + + s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line}) return nil } diff --git a/pkg/crc/api/events/status_change_stream.go b/pkg/crc/api/events/status_change_stream.go new file mode 100644 index 0000000000..3eda4e5d5b --- /dev/null +++ b/pkg/crc/api/events/status_change_stream.go @@ -0,0 +1,62 @@ +package events + +import ( + "encoding/json" + + "github.com/crc-org/crc/v2/pkg/crc/logging" + "github.com/crc-org/crc/v2/pkg/crc/machine" + "github.com/crc-org/crc/v2/pkg/crc/machine/state" + "github.com/crc-org/crc/v2/pkg/crc/machine/types" + "github.com/crc-org/crc/v2/pkg/events" + "github.com/r3labs/sse/v2" +) + +type statusChangeEvent struct { + Status *types.ClusterStatusResult `json:"status"` + Error string `json:"error,omitempty"` +} + +type statusChange struct { + listenerDisposable events.Disposable + machineClient machine.Client +} + +func newStatusChangeStream(server *EventServer) EventStream { + return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer)) +} + +func newStatusChangeListener(client machine.Client) EventProducer { + return &statusChange{ + machineClient: client, + } +} + +func (st *statusChange) Start(publisher EventPublisher) { + st.listenerDisposable = events.StatusChanged.AddListener(func(changedEvent events.StatusChangedEvent) { + logging.Debugf("State Changed Event %s", changedEvent) + var event statusChangeEvent + status, err := st.machineClient.Status() + // if we cannot receive actual state, send error state with error description + if err != nil { + event = statusChangeEvent{Status: &types.ClusterStatusResult{ + CrcStatus: state.Error, + }, Error: err.Error()} + } else { + status.CrcStatus = changedEvent.State // override with actual reported state + event = statusChangeEvent{Status: status} + if changedEvent.Error != nil { + event.Error = changedEvent.Error.Error() + } + + } + data, _ := json.Marshal(event) + publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data}) + }) +} + +func (st *statusChange) Stop() { + if st.listenerDisposable != nil { + st.listenerDisposable() + st.listenerDisposable = nil + } +} diff --git a/pkg/crc/machine/sync.go b/pkg/crc/machine/sync.go index 6da384b415..ef198ff0de 100644 --- a/pkg/crc/machine/sync.go +++ b/pkg/crc/machine/sync.go @@ -10,6 +10,7 @@ import ( "github.com/crc-org/crc/v2/pkg/crc/machine/state" "github.com/crc-org/crc/v2/pkg/crc/machine/types" crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset" + "github.com/crc-org/crc/v2/pkg/events" ) const startCancelTimeout = 15 * time.Second @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error { err := s.underlying.Delete() s.syncOperationDone <- Deleting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Novm}) + } return err } @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error { } s.startCancel = startCancel s.currentState = Starting + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting}) return nil } @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig) startResult, err := s.underlying.Start(ctx, startConfig) s.syncOperationDone <- Starting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + return startResult, err } @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) { if err := s.prepareStopDelete(Stopping); err != nil { return state.Error, err } + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping}) st, err := s.underlying.Stop() s.syncOperationDone <- Stopping + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: st}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } return st, err } @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) { } func (s *Synchronized) PowerOff() error { - return s.underlying.PowerOff() + err := s.underlying.PowerOff() + if err != nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + + return err } func (s *Synchronized) Status() (*types.ClusterStatusResult, error) { diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go index 185af7d5f3..a078868b6b 100644 --- a/pkg/events/emitter.go +++ b/pkg/events/emitter.go @@ -46,9 +46,7 @@ func (e *event[T]) AddListener(listener Listener[T]) Disposable { } func (e *event[T]) Fire(event T) { - e.eventMutex.Lock() - defer e.eventMutex.Unlock() for _, l := range e.listeners { - l.Listener(event) + go l.Listener(event) } } diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000..42a9865c7b --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,14 @@ +package events + +import ( + "github.com/crc-org/crc/v2/pkg/crc/machine/state" +) + +type StatusChangedEvent struct { + State state.State + Error error +} + +var ( + StatusChanged = NewEvent[StatusChangedEvent]() +)