-
Notifications
You must be signed in to change notification settings - Fork 5
/
Saga.v2.js
187 lines (170 loc) · 7.58 KB
/
Saga.v2.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
var EventSourcedAggregate = require('./EventSourcedAggregate').EventSourcedAggregate;
var Event = require('./Event').Event;
var util = require('util');
var when = require('when');
// ### Error types ###
/**
 * Error used to reject event processing when more than one transition
 * claims the same event.
 * @constructor
 * @extends Error
 * @param {string} [message] Optional custom message. When omitted, the standard conflict message is used.
 */
function TransitionConflictError(message){
	this.name = 'TransitionConflictError';
	this.message = message || 'Transition conflict detected - will not proceed with state transition';
	// Error is not invoked as a super-constructor here, so the stack has to be captured explicitly (V8 only).
	// name/message are assigned first so that they appear in the stack header.
	if(typeof(Error.captureStackTrace) === 'function'){
		Error.captureStackTrace(this, TransitionConflictError);
	}
}
util.inherits(TransitionConflictError, Error);
/**
 * Error used to reject event processing when the current stage does not
 * list the event's type among its accepted event types.
 * @constructor
 * @extends Error
 * @param {string} [message] Optional custom message. When omitted, the standard message is used.
 */
function EventTypeNotAccepted(message){
	this.name = 'EventTypeNotAccepted';
	this.message = message || 'The saga does not accept this type of events at this stage - perhaps try processing at a later stage?';
	// Error is not invoked as a super-constructor here, so the stack has to be captured explicitly (V8 only).
	// name/message are assigned first so that they appear in the stack header.
	if(typeof(Error.captureStackTrace) === 'function'){
		Error.captureStackTrace(this, EventTypeNotAccepted);
	}
}
util.inherits(EventTypeNotAccepted, Error);
// ### Helper types ###
/**
 * A named stage of a saga. Holds the transitions registered on it (keyed by
 * transition name) and the list of event types it accepts.
 * @constructor
 * @param {string} name The stage name.
 * @param {Array} acceptedEventTypes Event types accepted at this stage; stored as given, without copying.
 */
function SagaStage(name, acceptedEventTypes){
	var stage = this;
	stage.name = name;
	// Starts out empty; populated through addTransition().
	stage.transitions = {};
	stage.acceptedEventTypes = acceptedEventTypes;
}
/**
 * Register a transition on this stage under the transition's own name.
 * A transition registered earlier under the same name is overwritten.
 * @param {Object} transition The transition to register; its `name` property is used as the key.
 * @returns {SagaStage} This stage, so that calls can be chained.
 */
SagaStage.prototype.addTransition = function addTransition(transition){
	var stage = this;
	var registry = stage.transitions;
	registry[transition.name] = transition;
	return stage;
};
/**
 * Dispatch an event to a handler method defined on this stage.
 * A method named 'on' + event.eventType is preferred; when no such method
 * exists, `defaultEventHandler` is used if it is a function. When neither
 * exists, the event is silently ignored.
 * All five arguments are forwarded to the chosen handler unchanged.
 * @param {Object} event The event to dispatch; its `eventType` selects the handler.
 * @param {Object} commit Forwarded to the handler.
 * @param {Array} queuedEvents Forwarded to the handler.
 * @param {Object} accumulator Forwarded to the handler.
 * @param {Object} sagaAccumulator Forwarded to the handler.
 */
SagaStage.prototype.handleEvent = function handleEvent(event, commit, queuedEvents, accumulator, sagaAccumulator){
	var specificName = 'on' + event.eventType;
	var chosenName = (typeof(this[specificName]) === 'function') ? specificName : 'defaultEventHandler';
	if(typeof(this[chosenName]) !== 'function'){
		// Neither a specific nor a default handler is available.
		return;
	}
	this[chosenName](event, commit, queuedEvents, accumulator, sagaAccumulator);
};
/**
 * A named transition leading out of a saga stage.
 * @constructor
 * @param {string} name The transition name. Stages key their transitions by it, and it is recorded in the TransitionCompleted event payload.
 * @param {function} eventAcceptorFunction Decision function; a truthy return value nominates this transition for the event being processed.
 * @param {function} [actionFunction] Action run when the transition is taken. Defaults to a no-op.
 * @param {string} [transitionEventType] When set, an event of this type is staged once the action has completed.
 * @param {function} [eventPayloadGenerator] Produces the payload for the transition event. Defaults to a function returning an empty object.
 */
function SagaTransition(name, eventAcceptorFunction, actionFunction, transitionEventType, eventPayloadGenerator){
	// The name must be kept: both the stage's transition map and the saga's state changes look the transition up by it.
	this.name = name;
	// Assigned later via setDestination().
	this.destination = null;
	this.eventEndsTransition = eventAcceptorFunction;
	this.performAction = actionFunction || function(event, commit, queuedEvents, stageAccumulator, sagaAccumulator, environment){};
	this.transitionEventType = transitionEventType;
	this.eventPayloadGenerator = (typeof(eventPayloadGenerator) === 'function') ? eventPayloadGenerator : function(event, commit, queuedEvents, accumulator, globalAccumulator, actionResult){
		return {};
	};
}
/**
 * Set the stage that this transition leads to.
 * @param {Object} destination The destination stage; stored as given.
 * @returns {SagaTransition} This transition, so that calls can be chained.
 */
SagaTransition.prototype.setDestination = function setDestination(destination){
	var transition = this;
	transition.destination = destination;
	return transition;
};
// ### Aggregate definition ###
/**
 * Saga aggregate type. Inherits from EventSourcedAggregate.
 * The constructor itself assigns nothing; per-instance saga state is
 * assigned in _init().
 * @constructor
 */
function Saga(){
	// Intentionally empty.
}
util.inherits(Saga, EventSourcedAggregate);
/**
 * Reset all saga state and place the saga at the given initial stage.
 * @param {Object} initialStage The stage to start from; its `name` becomes the initial stage path.
 */
Saga.prototype._init = function _init(initialStage){
	var saga = this;
	// Position within the stage graph.
	saga._currentStage = initialStage;
	saga._currentStagePath = initialStage.name;
	// Working data: the stage accumulator is also reset on every completed transition, the global one is not.
	saga._stageAccumulator = {};
	saga._globalAccumulator = {};
	saga._enqueuedEvents = [];
	// Map of event ID => truthy marker, consulted by processEvent() to skip duplicates.
	saga._seenEventIDs = {};
	saga._error = null;
	saga._allowMissingEventHandlers = true;
};
/**
 * Process an incoming event against the saga's current stage.
 *
 * Outcomes:
 *  - the event's ID is already marked as seen: resolves without staging anything;
 *  - the current stage does not accept the event type: rejects with EventTypeNotAccepted;
 *  - no transition claims the event: stages an EventEnqueued event and resolves;
 *  - more than one transition claims the event: stages a TransitionConflictDetected event and rejects with TransitionConflictError;
 *  - exactly one transition claims the event: runs its action, then stages the optional transition event followed by TransitionCompleted, and resolves with the action's result. If the action throws or rejects, nothing is staged and the returned promise rejects with the same reason.
 *
 * @param {Object} event The event to process; `eventID` and `eventType` are read from it.
 * @param {Object} commit The commit passed along to transition functions and recorded in the TransitionCompleted payload.
 * @param {*} environment Passed only to the chosen transition's action function.
 * @returns {Promise} A when.js promise, settled as described above.
 */
Saga.prototype.processEvent = function processEvent(event, commit, environment){
	var self = this;
	// Guard clause: do not process duplicate events.
	if(this._seenEventIDs[event.eventID]){
		return when.resolve();
	}
	var stage = this._currentStage;
	if(stage.acceptedEventTypes.indexOf(event.eventType) < 0){
		return when.reject(new EventTypeNotAccepted('The saga does not accept this type of events at this stage - perhaps try processing at a later stage?'));
	}
	// Gather all transitions that are to occur, using each transition's supplied decision function.
	// Object.keys() restricts the scan to the stage's own transitions, so inherited enumerable properties are never mistaken for transitions.
	var transitionIntents = Object.keys(stage.transitions).map(function(transitionKey){
		return stage.transitions[transitionKey];
	}).filter(function(candidate){
		return candidate.eventEndsTransition(event, commit, self._enqueuedEvents, self._stageAccumulator, self._globalAccumulator);
	});
	if(transitionIntents.length === 0){
		// No transitions - simply enqueue the event.
		this._stageEvent(new Event('EventEnqueued', {event: event}));
		return when.resolve();
	}
	if(transitionIntents.length > 1){
		//TODO: Define the event payload below in a better way.
		//TODO: Reconsider whether it makes sense at all to stage events if we are likely not going to commit them (as the processing yields a promise rejection).
		// Staged as an Event object, like every other staged event in this method.
		this._stageEvent(new Event('TransitionConflictDetected', {currentStage: stage.name, currentEventType: event.eventType}));
		return when.reject(new TransitionConflictError('Transition conflict detected - will not proceed with state transition'));
	}
	var transition = transitionIntents[0];
	var actionOutcome;
	try{
		actionOutcome = transition.performAction(event, commit, this._enqueuedEvents, this._stageAccumulator, this._globalAccumulator, environment);
	}
	catch(actionError){
		// A synchronous failure of the action is reported the same way as an asynchronous one: via rejection.
		return when.reject(actionError);
	}
	// No rejection handler is passed: a rejected action propagates its reason unchanged.
	return when(actionOutcome, function _finalizeTransition(actionResult){
		if(transition.transitionEventType){
			self._stageEvent(new Event(transition.transitionEventType, transition.eventPayloadGenerator(event, commit, self._enqueuedEvents, self._stageAccumulator, self._globalAccumulator, actionResult)));
		}
		self._stageEvent(new Event('TransitionCompleted', {transitionName: transition.name, event: event, commit: commit}));
		return actionResult;
	});
};
/**
 * State-change handler for EventEnqueued: records the wrapped event in the
 * current stage's queue and lets the current stage handle it.
 *
 * The event that was handed to the saga is carried in `eventPayload.event`;
 * it is that wrapped event - not the EventEnqueued envelope - which is queued
 * and dispatched, so that queue consumers see the original event types.
 *
 * @param {Object} event The EventEnqueued event; `eventPayload.event` holds the enqueued event.
 * @param {Object} commit The commit passed to this handler; forwarded to the stage handler as-is (the EventEnqueued payload carries no commit of its own).
 */
Saga.prototype.onEventEnqueued = function onEventEnqueued(event, commit){
	var enqueuedEvent = event.eventPayload.event;
	this._enqueuedEvents.push(enqueuedEvent);
	// Remember the ID so that a re-delivery of the same event is skipped by processEvent().
	// Events without an ID are not tracked - otherwise they would all collide on one key.
	if(enqueuedEvent.eventID !== undefined && enqueuedEvent.eventID !== null){
		this._seenEventIDs[enqueuedEvent.eventID] = true;
	}
	this._currentStage.handleEvent(enqueuedEvent, commit, this._enqueuedEvents, this._stageAccumulator, this._globalAccumulator);
};
/**
 * State-change handler for TransitionCompleted: lets the current stage handle
 * the event that triggered the transition, then moves the saga to the
 * transition's destination stage and resets the per-stage data.
 *
 * @param {Object} event The TransitionCompleted event. Its payload carries `transitionName`, the triggering `event` and that event's `commit`.
 * @param {Object} commit The commit passed to this handler (not used; the commit from the payload is forwarded to the stage instead).
 */
Saga.prototype.onTransitionCompleted = function onTransitionCompleted(event, commit){
	var payload = event.eventPayload;
	var transitionName = payload.transitionName;
	var triggeringEvent = payload.event;
	// Remember the ID so that a re-delivery of the same event is skipped by processEvent().
	// Events without an ID are not tracked - otherwise they would all collide on one key.
	if(triggeringEvent && triggeringEvent.eventID !== undefined && triggeringEvent.eventID !== null){
		this._seenEventIDs[triggeringEvent.eventID] = true;
	}
	// The stage being left gets to handle the triggering event before the switch.
	this._currentStage.handleEvent(triggeringEvent, payload.commit, this._enqueuedEvents, this._stageAccumulator, this._globalAccumulator);
	this._currentStage = this._currentStage.transitions[transitionName].destination;
	this._currentStagePath += '.' + transitionName;
	// The queue and the stage accumulator are per-stage; the global accumulator is kept.
	this._enqueuedEvents = [];
	this._stageAccumulator = {};
};
/**
 * Collect the saga state that goes into a snapshot.
 * The stage is represented by its path string, and the seen-event map is
 * flattened to the list of its keys. The queue and both accumulators are
 * included by reference, not copied.
 * @returns {Object} An object with stagePath, enqueuedEvents, stageAccumulator, globalAccumulator and seenEventIDs.
 */
Saga.prototype._getSnapshotData = function _getSnapshotData(){
	var snapshot = {};
	snapshot.stagePath = this._currentStagePath;
	snapshot.enqueuedEvents = this._enqueuedEvents;
	snapshot.stageAccumulator = this._stageAccumulator;
	snapshot.globalAccumulator = this._globalAccumulator;
	snapshot.seenEventIDs = Object.keys(this._seenEventIDs);
	return snapshot;
};
// Helper functions for Saga users.
/**
 * Build a transition decision function that yields true once every required
 * event type is present among the queued events plus the event currently
 * being processed.
 * @static
 * @param {Array} requiredEventTypes The event types that must all have been seen.
 * @returns {function} A function (event, commit, queuedEvents) returning a boolean.
 */
Saga.eventTypesSeen = function eventTypesSeen(requiredEventTypes){
	return function _eventTypesSeen(event, commit, queuedEvents){
		// Event types observed so far: everything queued, plus the current event.
		var observedTypes = queuedEvents.concat(event).reduce(function(types, candidate){
			types.push(candidate.eventType);
			return types;
		}, []);
		// The requirement holds unless at least one required type is missing.
		var somethingMissing = requiredEventTypes.some(function(requiredType){
			return observedTypes.indexOf(requiredType) < 0;
		});
		return !somethingMissing;
	};
};
/**
 * Get the event routing keys that this saga type wants to process.
 * The base implementation binds to nothing; every saga type that is meant to
 * receive events should override this static method.
 * @static
 * @abstract
 * @returns {Array.<string>} The routing keys of the events that should be delivered to instances of this saga class. A new, empty array on every call in this base implementation.
 */
Saga.getBinds = function getBinds(){
	var noBinds = [];
	return noBinds;
};
/**
 * Get the ID of the saga aggregate that the event should be routed to.
 * The base implementation always throws; saga types must override it.
 * @static
 * @abstract
 * @param {module:esdf/core/Event~Event} event The event to be routed to a saga for processing.
 * @param {module:esdf/core/Commit~Commit} commit The event's originating commit.
 * @returns {?string} The aggregate ID, or null if no saga should process this event. If undefined is returned, the event is not consumed and is expected to be re-processed in the future (this guards against programmer errors such as a missing return).
 * @throws {Error} Always, in this base implementation.
 */
Saga.route = function route(event, commit){
	var refusal = new Error('The routing function needs to be overloaded by child prototypes - refusing to act with the default routing');
	throw refusal;
};
module.exports.SagaStage = SagaStage;
module.exports.SagaTransition = SagaTransition;
module.exports.Saga = Saga;