diff --git a/README.md b/README.md index ddf3e58..86ff716 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,17 @@ client.Task.TranscribeSpeech( * **Response (stream):** ``` -{"start": 5.0, "end": 9.0, "text": " I'm just speaking some seconds to see if the translation is correct"} +event: system +data: [START] + +event: locale +data: "en" + +event: part +data: {"start": 5.0, "end": 9.0, "text": " I'm just speaking some seconds to see if the translation is correct"} + +event: system +data: [DONE] ``` #### ➡️ Answer Prompt @@ -206,11 +216,29 @@ client.Task.AnswerQuestion( * **Response (stream):** ``` +event: system +data: [START] + +event: model +data: "medium" + +event: answer {"index": 0, "chunk": "You can add the Crisp chatbox to"} + +event: answer {"index": 1, "chunk": " your website by following this guide:"} + +event: answer {"index": 2, "chunk": " https://help.crisp.chat/en/article/how-to-add-crisp-chatbox-to-your-website-dkrg1d/"} + +event: answer {"index": 3, "chunk": " :)"} + +event: answer {"index": 4, "chunk": ""} + +event: system +data: [DONE] ``` #### ➡️ Summarize Paragraphs diff --git a/examples/task_answer_question.js b/examples/task_answer_question.js index 9b3eec2..408e7be 100644 --- a/examples/task_answer_question.js +++ b/examples/task_answer_question.js @@ -58,14 +58,26 @@ client.Task.AnswerQuestion( ) .then(function(stream) { // Bind all event listeners on created stream - stream.on("data", function(data) { - console.log("Got partial data:", data); + stream.on("model", function(data) { + console.log("Got partial data (model):", data); + }); + + stream.on("source", function(data) { + console.log("Got partial data (source):", data); + }); + + stream.on("answer", function(data) { + console.log("Got partial data (answer):", data); }); stream.on("error", function(error) { console.error("Answering aborted:", error); }); + stream.on("start", function() { + console.info("Start receiving answer..."); + }); + stream.on("done", function() { console.info("Done receiving answer!"); }); diff --git a/lib/mirage.js b/lib/mirage.js index 4a201ac..edbc662 100644 --- a/lib/mirage.js +++ b/lib/mirage.js @@ -17,10 +17,19 @@ var DEFAULT_REST_HOST = "https://api.mirage-ai.com"; var DEFAULT_REST_BASE = "/v1"; var DEFAULT_TIMEOUT = 40000; +var STREAM_EVENT_PREFIX = "event:"; var STREAM_DATA_PREFIX = "data:"; +var STREAM_START_TAG = "[START]"; var STREAM_DONE_TAG = "[DONE]"; var STREAM_CHUNK_STALL_TIMEOUT = 10000; +var STREAM_RESERVED_EVENTS = [ + "start", + "done", + "error", + "end" +]; + var RESOURCES = [ "Task", "Data" @@ -178,7 +187,8 @@ Mirage.prototype.__doPostStream = function( request .on("response", function(response) { var emitter = new events.EventEmitter(), - drainBuffer = ""; + drainBuffer = "", + eventBlock = ""; // Response is not successful? if (response.statusCode >= 400) { @@ -238,43 +248,86 @@ Mirage.prototype.__doPostStream = function( drainBuffer += chunk.toString(); // Emit fully-assembled data from chunk - var dataParts = drainBuffer.split("\n"); + var lineParts = drainBuffer.split("\n"); // Terminated by new line? Process buffer now - if ((dataParts.length > 0) && - (dataParts[dataParts.length - 1] === "")) { + if ((lineParts.length > 0) && + (lineParts[lineParts.length - 1] === "")) { // Clear drain buffer immediately drainBuffer = ""; // Process each data part - for (var i = 0; i < dataParts.length; i++) { - var dataPart = dataParts[i]; - - if (dataPart.length > 0 && - dataPart.startsWith(STREAM_DATA_PREFIX) === true) { - // Clear out tag from data part - dataPart = dataPart.substring(STREAM_DATA_PREFIX.length).trim(); + for (var i = 0; i < lineParts.length; i++) { + var linePart = lineParts[i]; - if (dataPart === STREAM_DONE_TAG) { - // Clear previous stall timeout (as needed) - fnCancelNextChunkStall(); + // Line is empty? Skip it. + if (linePart.length === 0) { + // Abort parsing of line there. + continue; + } - // Process at next event loop tick, as the 'done' event might \ - // come out-of-order. - setImmediate(function() { - // Raise 'done' event - emitter.emit("done"); - }); + // Acquire line type ('event:' or 'data:') + if (linePart.startsWith(STREAM_DATA_PREFIX) === true) { + // Clear out tag from text part + var dataPart = ( + linePart.substring(STREAM_DATA_PREFIX.length).trim() + ); + + // System events are not broadcasted to the user, as those are \ + // special non-user level events. + if (eventBlock === "system") { + if (dataPart === STREAM_START_TAG) { + // Process at next event loop tick, as the 'start' event \ + // might come out-of-order. + setImmediate(function() { + // Raise 'start' event + emitter.emit("start"); + }); + } else if (dataPart === STREAM_DONE_TAG) { + // Clear previous stall timeout (as needed) + fnCancelNextChunkStall(); + + // Process at next event loop tick, as the 'done' event \ + // might come out-of-order. + setImmediate(function() { + // Raise 'done' event + emitter.emit("done"); + }); + } } else { var dataPartObject = JSON.parse(dataPart); // Process at next event loop tick, as the 'data' event might \ // come out-of-order. setImmediate(function() { - // Raise 'data' event - emitter.emit("data", dataPartObject); + // Raise event (fallback to 'data' if no event block) + // Important: add prefix if event streamed from Mirage is \ + // a reserved event (eg. 'error' becomes ':error'). + var eventName = (eventBlock || "data"); + + if (STREAM_RESERVED_EVENTS.includes(eventName) === true) { + eventName = (":" + eventName); + } + + emitter.emit(eventName, dataPartObject); }); } + + // Abort parsing of line there. + continue; + } + + if (linePart.startsWith(STREAM_EVENT_PREFIX) === true) { + // Clear out tag from text part + var eventPart = ( + linePart.substring(STREAM_EVENT_PREFIX.length).trim() + ); + + // Update current event block name (or use none) + eventBlock = (eventPart || ""); + + // Abort parsing of line there. + continue; } } }