Skip to content

Commit

Permalink
add network tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
benStre committed Nov 11, 2023
1 parent e898879 commit 8a83a84
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 23 deletions.
2 changes: 1 addition & 1 deletion compiler/compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class DatexResponse<T> {


export const ProtocolDataTypesMap = [
"REQUEST", "RESPONSE", "DATA", "TMP_SCOPE", "LOCAL", "HELLO", "DEBUGGER", "SOURCE_MAP", "UPDATE", "GOODBYE"
"REQUEST", "RESPONSE", "DATA", "TMP_SCOPE", "LOCAL", "HELLO", "DEBUGGER", "SOURCE_MAP", "UPDATE", "GOODBYE", "TRACE", "TRACE_BACK"
]


Expand Down
2 changes: 2 additions & 0 deletions compiler/protocol_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ export enum ProtocolDataType {
SOURCE_MAP = 7, // send a source map for a scope
UPDATE = 8, // like normal request, but don't propgate updated pointer values back to sender (prevent recursive loop)
GOODBYE = 9, // info message that endpoint is offline
TRACE = 10, // record endpoint hops
TRACE_BACK = 11 // record endpoint hops, backtrip
}
12 changes: 12 additions & 0 deletions datex_short.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ export function f<T extends endpoint_name>(name:[T]|T):endpoint_by_endpoint_name
return <any>Target.get((typeof name == "string" ? name : name[0]));
}

export function printTrace(endpoint: string|Endpoint) {
endpoint = typeof endpoint == "string" ? Target.get(endpoint) as Endpoint : endpoint;
return endpoint.printTrace()
}

type printTraceT = typeof printTrace;
declare global {
const printTrace: printTraceT;
}


export function syncedValue(parent:any|Pointer, key?:any):PointerProperty {
return PointerProperty.get(parent, key);
Expand Down Expand Up @@ -576,4 +586,6 @@ globalThis.static_pointer = static_pointer;
// @ts-ignore
globalThis.f = f;
// @ts-ignore
globalThis.printTrace = printTrace;
// @ts-ignore
globalThis.props = props;
16 changes: 16 additions & 0 deletions docs/manual/06 Supranet Networking.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@ await Datex.Supranet.connectTemporary()
```


## Debugging

### Network traces

The `printTrace` function can be used to create a network trace report for debugging purposes.
`printTrace` also collects and shows additional relevant data like round-trip time and endpoint interface types.
To create a network trace, DATEX `TRACE` and `TRACE_BACK` messages are routed through the network to the destination endpoint and back.

![Network Trace Report](./assets/network_trace.png)

The raw trace output can be retrieved by calling the `.trace()` method on an endpoint:

```ts
const trace = await f('@+unyt2').trace()
```

## Creating custom DATEX channels with the ComInterface

[TODO]
Binary file added docs/manual/assets/network_trace.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 8 additions & 1 deletion network/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import { logger } from "../utils/global_values.ts";
// general interface for all "datex interfaces" (client or server/router)
export interface ComInterface {
type: string
description?: string
persistent?: boolean // can be disconnected?
endpoint?: Endpoint // connected directly to a single endpoint
endpoints?: Set<Endpoint> // multiple endpoints
is_bidirectional_hub?: boolean, // allow the same block to go in and out eg a -> this interface -> this runtime -> this interface again -> b
in: boolean // can receive data
out: boolean // can send data
global?: boolean // has a connection to the global network, use as a default interface if possible
Expand Down Expand Up @@ -253,6 +255,7 @@ export abstract class CommonInterface<Args extends unknown[] = []> implements Co


/** HTTP interface */
// @deprecated
class HttpClientInterface extends CommonInterface {

override type = "http"
Expand Down Expand Up @@ -509,6 +512,10 @@ class WebsocketClientInterface extends CommonInterface {
override out = true
override type = "websocket"

get description() {
return `${this.protocol}://${this.host}`
}

private protocol:'ws'|'wss' = 'wss'; // use wss or ws
private is_first_try = true

Expand Down Expand Up @@ -780,7 +787,7 @@ export class InterfaceManager {
return InterfaceManager.handleNoRedirectFound(to);
}
// error: loopback
else if (source == comInterface) {
else if (!source?.is_bidirectional_hub && source == comInterface) {
return InterfaceManager.handleNoRedirectFound(to);
}
// send
Expand Down
67 changes: 51 additions & 16 deletions runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,14 +811,16 @@ export class Runtime {
* Executes a DATEX Binary locally and returns the result
* @param dxb DATEX Binary
* @param context_location context in which the script should be executed (URL)
* @param overrideMeta custom override header metadata
* @param forceLocalExecution execute block even if receiver is external (default false)
* @returns evaluated DATEX result
*/
public static async executeDXBLocally(dxb:ArrayBuffer, context_location?:URL, overrideMeta?: Partial<datex_meta>):Promise<any> {
public static async executeDXBLocally(dxb:ArrayBuffer, context_location?:URL, overrideMeta?: Partial<datex_meta>, forceLocalExecution = false):Promise<any> {
// generate new header using executor scope header
let header:dxb_header;
let dxb_body:ArrayBuffer;

const res = await this.parseHeader(dxb)
const res = await this.parseHeader(dxb, undefined, false, forceLocalExecution)
if (res instanceof Array) {
header = res[0];
dxb_body = res[1].buffer;
Expand Down Expand Up @@ -1078,14 +1080,14 @@ export class Runtime {


// default labels:
Pointer.createLabel({
REQUEST:ProtocolDataType.REQUEST,
RESPONSE:ProtocolDataType.RESPONSE,
DATA:ProtocolDataType.DATA,
HELLO:ProtocolDataType.HELLO,
LOCAL:ProtocolDataType.LOCAL,
UPDATE:ProtocolDataType.UPDATE,
}, "TYPE");
// Pointer.createLabel({
// REQUEST:ProtocolDataType.REQUEST,
// RESPONSE:ProtocolDataType.RESPONSE,
// DATA:ProtocolDataType.DATA,
// HELLO:ProtocolDataType.HELLO,
// LOCAL:ProtocolDataType.LOCAL,
// UPDATE:ProtocolDataType.UPDATE,
// }, "TYPE");

// create std static scope
this.STD_STATIC_SCOPE = {};
Expand Down Expand Up @@ -1391,7 +1393,7 @@ export class Runtime {
}

// returns header info and dxb body, or routing information if not directed to own endpoint
static async parseHeader(dxb:ArrayBuffer, force_sym_enc_key?:CryptoKey, force_only_header_info = false):Promise<[dxb_header, Uint8Array, Uint8Array, Uint8Array]|dxb_header> {
static async parseHeader(dxb:ArrayBuffer, force_sym_enc_key?:CryptoKey, force_only_header_info = false, force_no_redirect = false):Promise<[dxb_header, Uint8Array, Uint8Array, Uint8Array]|dxb_header> {

const res = this.parseHeaderSynchronousPart(dxb);

Expand All @@ -1402,7 +1404,8 @@ export class Runtime {
iv: Uint8Array,
encrypted_key: ArrayBuffer;

if (!res[0].redirect && !force_only_header_info) {
// no redirect
if ((!res[0].redirect && !force_only_header_info) || force_no_redirect) {
[header, data_buffer, header_buffer, signature_start, iv, encrypted_key] = res;

// save encrypted key?
Expand Down Expand Up @@ -1443,7 +1446,7 @@ export class Runtime {

}

// only return header
// only return header (for redirect)
else return res[0];

}
Expand Down Expand Up @@ -1605,7 +1608,7 @@ export class Runtime {
* @param header_callback callback method returning information for the evaluated header before executing the dxb
* @returns header information (after executing the dxb)
*/
private static async handleDatexIn(dxb:ArrayBuffer, last_endpoint:Endpoint, full_scope_callback?:(sid:number, scope:any, error?:boolean)=>void, _?:any, header_callback?:(header:dxb_header)=>void, source: Source): Promise<dxb_header> {
private static async handleDatexIn(dxb:ArrayBuffer, last_endpoint:Endpoint, full_scope_callback?:(sid:number, scope:any, error?:boolean)=>void, _?:any, header_callback?:(header:dxb_header)=>void, source?: Source): Promise<dxb_header> {

let header:dxb_header, data_uint8:Uint8Array;

Expand Down Expand Up @@ -1649,7 +1652,23 @@ export class Runtime {
console.log("TODO: handle proxy sign for " + res.sender)
}

// propagate TRACE message
try {
const to = [...(res.routing?.receivers??[])][0];
if (res.type == ProtocolDataType.TRACE || res.type == ProtocolDataType.TRACE_BACK) {
const trace = await this.executeDXBLocally(dxb, undefined, undefined, true);
if (to instanceof Endpoint && trace instanceof Array) {
try {
await to.trace({header: res, source, trace})
}
catch {}
return {};
}
else {
logger.error("Invalid TRACE message")
}
}

await this.redirectDatex(dxb, res, false, source);
}
catch (e) {
Expand Down Expand Up @@ -1683,6 +1702,7 @@ export class Runtime {

// get existing scope or create new
let scope = scope_map?.scope ?? this.createNewInitialScope(header);
scope.source = source;

// those values can change later in the while loop
let _header = header;
Expand Down Expand Up @@ -1812,6 +1832,7 @@ export class Runtime {
else if (
header.type == ProtocolDataType.RESPONSE ||
header.type == ProtocolDataType.DATA ||
header.type == ProtocolDataType.TRACE_BACK ||
header.type == ProtocolDataType.LOCAL)
{
const unique_sid = header.sid+"-"+header.return_index;
Expand Down Expand Up @@ -1839,7 +1860,7 @@ export class Runtime {

}

private static async handleScopeResult(header:dxb_header, scope: datex_scope, return_value:any){
private static async handleScopeResult(header:dxb_header, scope: datex_scope, return_value:any, source?: Source){

const unique_sid = header.sid+"-"+header.return_index;

Expand All @@ -1854,8 +1875,14 @@ export class Runtime {
// handle response
else if (header.type == ProtocolDataType.RESPONSE ||
header.type == ProtocolDataType.DATA ||
header.type == ProtocolDataType.TRACE_BACK ||
header.type == ProtocolDataType.LOCAL)
{

if (header.type == ProtocolDataType.TRACE_BACK) {
return_value.push({endpoint:Runtime.endpoint, interface: {type: scope.source?.type, description: scope.source?.description}, timestamp: new Date()});
}

// handle result
if (this.callbacks_by_sid.has(unique_sid)) {
this.callbacks_by_sid.get(unique_sid)[0](return_value);
Expand Down Expand Up @@ -1909,7 +1936,15 @@ export class Runtime {
logger.error("ignoring unsigned GOODBYE message")
}
}


else if (header.type == ProtocolDataType.TRACE) {
const sender = return_value[0].endpoint;

console.log("TRACE request from " + sender);
return_value.push({endpoint:Runtime.endpoint, interface: {type: scope.source?.type, description: scope.source?.description}, timestamp: new Date()});

this.datexOut(["?", [return_value], {type:ProtocolDataType.TRACE_BACK, to:sender, return_index:header.return_index, encrypt:header.encrypted, sign:header.signed}], sender, header.sid, false);
}

else if (header.type == ProtocolDataType.DEBUGGER) {
logger.success("DEBUGGER ?", return_value)
Expand Down
67 changes: 63 additions & 4 deletions types/addressing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import { BinaryCode } from "../compiler/binary_codes.ts";
import { Pointer } from "../runtime/pointers.ts";
import { ValueConsumer } from "./abstract_types.ts";
import { ValueError } from "./errors.ts";
import { Compiler } from "../compiler/compiler.ts";
import type { datex_scope } from "../utils/global_types.ts";
import { Compiler, ProtocolDataTypesMap } from "../compiler/compiler.ts";
import type { datex_scope, dxb_header } from "../utils/global_types.ts";
import { buffer2hex, hex2buffer } from "../utils/utils.ts";
import { clause, Disjunction } from "./logic.ts";
import { Runtime, StaticScope } from "../runtime/runtime.ts";
import { logger } from "../utils/global_values.ts";
import { Datex } from "../mod.ts";
import { ProtocolDataType } from "../compiler/protocol_types.ts";
import { ESCAPE_SEQUENCES } from "../utils/logger.ts";

type target_prefix_person = "@";
type target_prefix_id = "@@";
Expand Down Expand Up @@ -290,11 +291,69 @@ export class Endpoint extends Target {
* Generates a network trace
* (for routing debugging)
*/
public async trace() {
const res = await Runtime.datexOut(['[]', [], {type:ProtocolDataType.REQUEST, sign:false}], this, undefined, true);
public async trace(previous?: {header: dxb_header, trace: any[], source?: any}):Promise<{endpoint: Endpoint, timestamp: Date, interface: {type?: string, description?:string}}[]> {
if (previous) {
console.log(ProtocolDataTypesMap[previous.header.type??ProtocolDataType.TRACE]+" from " + previous.header.sender + " to " + this);
}
const trace = previous?.trace ?? [];
trace.push({endpoint:Runtime.endpoint, interface: {type: previous?.source?.type, description: previous?.source?.description}, timestamp: new Date()});

const res = await Runtime.datexOut(['?', [trace], {type:previous?.header?.type ?? ProtocolDataType.TRACE, sign:false}], this, previous?.header?.sid, true, false, undefined, false, undefined, 60_000);
return res;
}

public async printTrace() {
const format = (val:any) => Runtime.valueToDatexStringExperimental(val, true, true);

let trace: {endpoint: Endpoint, timestamp: Date, interface: {type?: string, description?:string}}[]
try {
trace = await this.trace();
}
catch {
let title = `${ESCAPE_SEQUENCES.BOLD}DATEX Network Trace\n${ESCAPE_SEQUENCES.RESET}`;
title += `${format(Runtime.endpoint)}${ESCAPE_SEQUENCES.RESET} -> ${format(this)}${ESCAPE_SEQUENCES.RESET}\n\n`
title += `${ESCAPE_SEQUENCES.RED}Error: Endpoint not reachable`
console.log(title);
return;
}

if (!trace) throw new Error("Invalid trace");

const resolvedEndpointData = trace.find((data) => trace.indexOf(data)!=0 && (data.endpoint == this || data.endpoint.main == this || (data.endpoint == Runtime.endpoint && (this as any)==Datex.LOCAL_ENDPOINT)))!;
const resolveEndpointIndex = trace.indexOf(resolvedEndpointData);
const resolvedEndpoint = resolvedEndpointData.endpoint;
const hopsToDest = resolveEndpointIndex;
const hopsFromDest = trace.length - resolveEndpointIndex - 1;

let title = `${ESCAPE_SEQUENCES.BOLD}DATEX Network Trace\n${ESCAPE_SEQUENCES.RESET}`;
title += `${format(Runtime.endpoint)}${ESCAPE_SEQUENCES.RESET} -> ${format(resolvedEndpoint)}${ESCAPE_SEQUENCES.RESET}\n\n`
let pre = ''
let logs = ''
const rtt = trace.at(-1).timestamp.getTime() - trace.at(0).timestamp.getTime();

pre += `-----------------------------\n`
pre += `${ESCAPE_SEQUENCES.BOLD}Round-Trip Time: ${ESCAPE_SEQUENCES.RESET}${rtt}ms\n`
pre += `${ESCAPE_SEQUENCES.BOLD}Hops to Destination: ${ESCAPE_SEQUENCES.RESET}${hopsToDest}\n`
pre += `${ESCAPE_SEQUENCES.BOLD}Hops from Destination: ${ESCAPE_SEQUENCES.RESET}${hopsFromDest}\n`
pre += `-----------------------------\n\n`

pre += `\n${ESCAPE_SEQUENCES.BOLD}Hops:${ESCAPE_SEQUENCES.RESET}\n\n`;

for (let i = 0; i<trace.length; i++) {
const current = trace[i];
const next = trace[i+1];
if (!next) break;

if (i == hopsToDest) pre += `\n${ESCAPE_SEQUENCES.BOLD}Return Trip:${ESCAPE_SEQUENCES.RESET}\n\n`;

pre += `${ESCAPE_SEQUENCES.BOLD} #${(i%hopsToDest)+1} ${ESCAPE_SEQUENCES.RESET}(${next.interface.type??'unknown'}${next.interface.description ? ' ' + next.interface.description : ''})${ESCAPE_SEQUENCES.RESET}:\n ${format(current.endpoint)}${ESCAPE_SEQUENCES.RESET} -> ${format(next.endpoint)}${ESCAPE_SEQUENCES.RESET}\n\n`
}


console.log(title+pre+logs)
}


public async getCertifier(){
// resolve alias from Blockchain
return this.#certifier = <Endpoint | undefined> Runtime.Blockchain.getEndpointCertifier(this);
Expand Down
1 change: 0 additions & 1 deletion utils/blobify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { client_type } from "./constants.ts";
*/
export async function blobifyFile(path: string|URL) {
const [script] = await Datex.Runtime.getURLContent(path.toString(), true, true) as [string, string];
console.log("script", script)
return blobifyScript(script);
}

Expand Down
2 changes: 2 additions & 0 deletions utils/global_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ export type datex_scope = {
sender: Endpoint, // sender of the scope
origin: Endpoint, // origin to use for pointers / casting (default is sender)

source?: any // original source (com interface) from which this scope was received

current_index: number,
start_index: number, // keep track of index to jump back to
index_offset: number, // current_index + index_offset = actual index, everything left of the index_offset is no longer cached in the buffer
Expand Down

0 comments on commit 8a83a84

Please sign in to comment.