diff --git a/flake.nix b/flake.nix index 479379f02..a9c07eac3 100644 --- a/flake.nix +++ b/flake.nix @@ -262,6 +262,8 @@ }; }; + webBridge = pkgs.callPackage ./web/bridge/image.nix {inherit buildImage pullImage pkgs;}; + nativelink-worker-init = pkgs.callPackage ./tools/nativelink-worker-init.nix {inherit buildImage self nativelink-image;}; rbe-autogen = pkgs.callPackage ./local-remote-execution/rbe-autogen.nix { @@ -373,6 +375,7 @@ nativelink-worker-init nativelink-x86_64-linux publish-ghcr + webBridge ; default = nativelink; diff --git a/nativelink-config/examples/basic_bes.json b/nativelink-config/examples/basic_bes.json new file mode 100644 index 000000000..cca412f12 --- /dev/null +++ b/nativelink-config/examples/basic_bes.json @@ -0,0 +1,177 @@ +{ + "stores": { + "AC_MAIN_STORE": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-ac", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-ac", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "BEP_STORE": { + "redis_store": { + "addresses": [ + "redis://@localhost:6379/0" + ], + "response_timeout_s": 5, + "connection_timeout_s": 5, + "experimental_pub_sub_channel": "build_event", + "key_prefix": "nativelink.", + "mode": "standard" + } + }, + "WORKER_FAST_SLOW_STORE": { + "fast_slow": { + "fast": { + "filesystem": { + "content_path": "/tmp/nativelink/data-worker-test/content_path-cas", + "temp_path": "/tmp/nativelink/data-worker-test/tmp_path-cas", + "eviction_policy": { + "max_bytes": 100000000000 + } + } + }, + "slow": { + "noop": {} + } + } + } + }, + "schedulers": { + "MAIN_SCHEDULER": { + "simple": { + "supported_platform_properties": { + "cpu_count": "minimum", + "memory_kb": "minimum", + "network_kbps": "minimum", + "disk_read_iops": "minimum", + "disk_read_bps": "minimum", + "disk_write_iops": "minimum", + "disk_write_bps": "minimum", + "shm_size": "minimum", + "gpu_count": "minimum", + "gpu_model": "exact", + "cpu_vendor": "exact", + "cpu_arch": "exact", + "cpu_model": "exact", + "kernel_version": "exact", + "OSFamily": "priority", + "container-image": "priority" + } + } + } + }, + "workers": [ + { + "local": { + "worker_api_endpoint": { + "uri": "grpc://127.0.0.1:50062" + }, + "cas_fast_slow_store": "WORKER_FAST_SLOW_STORE", + "upload_action_result": { + "ac_store": "AC_MAIN_STORE" + }, + "work_directory": "/tmp/nativelink/work", + "platform_properties": { + "cpu_count": { + "values": [ + "16" + ] + }, + "memory_kb": { + "values": [ + "500000" + ] + }, + "network_kbps": { + "values": [ + "100000" + ] + }, + "cpu_arch": { + "values": [ + "x86_64" + ] + }, + "OSFamily": { + "values": [ + "" + ] + }, + "container-image": { + "values": [ + "" + ] + } + } + } + } + ], + "servers": [ + { + "name": "public", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052" + } + }, + "services": { + "cas": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE" + } + }, + "ac": { + "main": { + "ac_store": "AC_MAIN_STORE" + } + }, + "execution": { + "main": { + "cas_store": "WORKER_FAST_SLOW_STORE", + "scheduler": "MAIN_SCHEDULER" + } + }, + "capabilities": { + "main": { + "remote_execution": { + "scheduler": "MAIN_SCHEDULER" + } + } + }, + "bytestream": { + "cas_stores": { + "main": "WORKER_FAST_SLOW_STORE" + } + } + } + }, + { + "name": "private_workers_servers", + "listener": { + "http": { + "socket_address": "0.0.0.0:50062" + } + }, + "services": { + "experimental_prometheus": { + "path": "/metrics" + }, + "experimental_bep": { + "store": "BEP_STORE" + }, + "worker_api": { + "scheduler": "MAIN_SCHEDULER" + }, + "admin": {}, + "health": { + "path": "/status" + } + } + } + ], + "global": { + "max_open_files": 512 + } +} diff --git a/tools/pre-commit-hooks.nix b/tools/pre-commit-hooks.nix index b0b8c90e1..8b93bafa7 100644 --- a/tools/pre-commit-hooks.nix +++ b/tools/pre-commit-hooks.nix @@ -65,6 +65,7 @@ in { # Bun binary lockfile "web/platform/bun.lockb" + "web/bridge/bun.lockb" ]; enable = true; types = ["binary"]; diff --git a/web/bridge/.gitignore b/web/bridge/.gitignore new file mode 100644 index 000000000..9b1ee42e8 --- /dev/null +++ b/web/bridge/.gitignore @@ -0,0 +1,175 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/web/bridge/README.md b/web/bridge/README.md new file mode 100644 index 000000000..3a7e740be --- /dev/null +++ b/web/bridge/README.md @@ -0,0 +1,74 @@ +# NativeLink Bridge + +Make sure you are running an instance of Redis or DragonflyDB in your local network. + +For DragonflyDB inside docker run: + +```bash +docker run -d --name some-dragonfly -p 6379:6379 --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly +``` + +For Redis inside docker run: + +```bash +docker run -d --name some-redis -p 6379:6379 redis +``` + +## You need 4 Shells to be open + +### 1. Shell (NativeLink) + +Start an instance of NativeLink and connect it with the basic_bes_conf.json to the redis/dragonflydb (default values): + +```bash +cd ../.. && ./result/bin/nativelink ./nativelink-config/examples/basic_bes.json +``` + +## 2. Shell (NativeLink Web Bridge) + +Install dependencies of the bridges: + +```bash +bun i +``` + +Run the Bridge: + +```bash +bun run index.ts +``` + +## 3. Shell (NativeLink Web App) + +Inside the web/app directory run: + +```bash +bun i & bun dev +``` + +Now you can open http://localhost:4321/app. + + +## 4. Shell (Bazel) + +Now you can run your Bazel build with NativeLink and see it in real-time going into the web app + +Include this in your .bazelrc +```bash +build --remote_instance_name=main +build --remote_cache=grpc://127.0.0.1:50051 +build --remote_executor=grpc://127.0.0.1:50051 +build --bes_backend=grpc://127.0.0.1:50061 +build --bes_results_url=http://127.0.0.1:50061/ +build --bes_upload_mode=fully_async +build --build_event_publish_all_actions=true +``` + +```bash +bazel build some-target +``` + + + + +This project was created using `bun init` in bun v1.1.25. [Bun](https://bun.sh) is a fast all-in-one JavaScript runtime. diff --git a/web/bridge/bun.lockb b/web/bridge/bun.lockb new file mode 100755 index 000000000..653336b55 Binary files /dev/null and b/web/bridge/bun.lockb differ diff --git a/web/bridge/image.nix b/web/bridge/image.nix new file mode 100644 index 000000000..8398c06d5 --- /dev/null +++ b/web/bridge/image.nix @@ -0,0 +1,62 @@ +{ + pkgs, + buildImage, + pullImage, + ... +}: let + imageParams = + if pkgs.stdenv.hostPlatform.system == "x86_64-linux" + then { + imageName = "oven/bun"; + imageDigest = "sha256:2ebe63bae78e24788a7c0be646475ebfa843167370bfae1e436383c15dd70cc7"; + sha256 = "sha256-ZSDLLWWVDmPhTEOu1DkCnhJoODpOTbWOumk38419MRE="; + arch = "amd64"; + description = "A simple Bun environment image for x86_64-linux."; + title = "Bun Environment x86_64-linux"; + } + else if pkgs.stdenv.hostPlatform.system == "aarch64-linux" || pkgs.stdenv.hostPlatform.system == "aarch64-darwin" + then { + imageName = "oven/bun"; + imageDigest = "sha256:f83947fb1646bee6f71d7aaf7914cd7ae7eedcb322b10b52aadfc1a3d56da84e"; + sha256 = "sha256-ZSDLLWWVDmPhTEOu1DkCnhJoODpOTbWOumk38419MRE="; + arch = "arm64"; + description = "A simple Bun environment image for aarch64-linux."; + title = "Bun Environment aarch64-linux"; + } + else throw "Unsupported architecture: ${pkgs.stdenv.hostPlatform.system}"; + + webBridge = pkgs.stdenv.mkDerivation { + name = "web-bridge-files"; + src = ./.; + buildInputs = [pkgs.bun]; + installPhase = '' + mkdir -p $out + cp -r * $out + cd $out + bun install + ''; + }; +in + buildImage { + name = "nativelink-web-bridge"; + + # Base image configuration + fromImage = pullImage { + inherit (imageParams) imageName imageDigest sha256 arch; + tlsVerify = true; + os = "linux"; + }; + + # Container configuration + config = { + WorkingDir = "/web/bridge"; + Entrypoint = ["bun" "run" "${webBridge}/index.ts"]; + ExposedPorts = { + "8080/tcp" = {}; + }; + Labels = { + "org.opencontainers.image.description" = imageParams.description; + "org.opencontainers.image.title" = imageParams.title; + }; + }; + } diff --git a/web/bridge/index.ts b/web/bridge/index.ts new file mode 100644 index 000000000..3a6ceb2d9 --- /dev/null +++ b/web/bridge/index.ts @@ -0,0 +1,68 @@ +import { initializeRedisClients } from './src/redis'; +import { initializeProtobuf } from './src/protobuf'; +import { handleEvent } from './src/eventHandler'; +import { startWebSocket } from './src/websocket'; + +async function main() { + //console.log("Hello via Bun!"); + + // Base URL + const github = "https://raw.githubusercontent.com" + + // NativeLink URL + const nativelinkRepo = "TraceMachina/nativelink/main" + const nativelinkBranch = "main" + const nativelinkProtoPath = `${github}/${nativelinkRepo}/${nativelinkBranch}/nativelink-proto/`; + + // Proto Remote Path + const protoRepo = "protocolbuffers/protobuf" + const protoBranch = "master" + const protoRepoPath = `${github}/${protoRepo}/${protoBranch}/main/src/google/protobuf`; + const protoDevToolsPath = `${github}/${protoRepo}/main/src/google/devtools/build/v1`; + + const googleProto = "googleapis/googleapis" + const googleProtoBranch = "master" + const googleProtoPath = `${github}/${googleProto}/${googleProtoBranch}/google/devtools/build/v1`; + + // Bazel Remote Path + const bazelRepo = "bazelbuild/bazel" + const bazelBranch = "master" + const bazelProtoPath = `${github}/${bazelRepo}/${bazelBranch}/src/main/java/com/google/devtools/build/lib/buildeventstream/proto`; + + // Buck2 Protos + // const buck2Repo = "facebook/buck2/main" + // const buck2Branch = "main" + // const buck2ProtoPath = `${github}/${buck2Repo}/${buck2Branch}/app/buck2_data/data.proto`; + + // Actual using Protos. + const PublishBuildEventProto =`${googleProtoPath}/publish_build_event.proto`; + const BazelBuildEventStreamProto = `${bazelProtoPath}/build_event_stream.proto`; + + const protos = [ PublishBuildEventProto, BazelBuildEventStreamProto ] + + console.info("Link to: \n") + + console.info("Google Publish Build Events Proto:\n", PublishBuildEventProto, "\n"); + console.info("Bazel Build Event Stream Proto:\n", BazelBuildEventStreamProto, "\n") + + // Load Remote Bazel Proto Files + const protoTypes = await initializeProtobuf(protos) + + const { redisClient, commandClient } = await initializeRedisClients(); + + // Subscribe to the build_event channel + await redisClient.subscribe('build_event', async (message: string) => { + await handleEvent(message, commandClient, protoTypes); + }); + + const websocketServer = startWebSocket() + + // Clean up on exit + process.on('SIGINT', async () => { + await redisClient.disconnect(); + await commandClient.disconnect(); + process.exit(); + }); +} + +main().catch(err => console.error(err)); diff --git a/web/bridge/package.json b/web/bridge/package.json new file mode 100644 index 000000000..83e075c7e --- /dev/null +++ b/web/bridge/package.json @@ -0,0 +1,15 @@ +{ + "name": "bridge", + "module": "index.ts", + "type": "module", + "devDependencies": { + "@types/bun": "^1.1.8" + }, + "peerDependencies": { + "typescript": "^5.0.0" + }, + "dependencies": { + "protobufjs": "^7.4.0", + "redis": "^4.7.0" + } +} diff --git a/web/bridge/src/eventHandler.ts b/web/bridge/src/eventHandler.ts new file mode 100644 index 000000000..34409f4be --- /dev/null +++ b/web/bridge/src/eventHandler.ts @@ -0,0 +1,152 @@ +import type protobuf from 'protobufjs'; +import { commandOptions, type RedisClientType } from 'redis'; +import { constructRedisKey, parseMessage } from './utils'; +import { broadcastProgress } from './websocket'; + +interface CustomBuildEvent extends protobuf.Message { + orderedBuildEvent?: { + event: { + eventTime: { + seconds: protobuf.Long; + nanos: number; + }; + // biome-ignore lint/suspicious/noExplicitAny: + bazelEvent?: any; + }; + }; +} + + + +// biome-ignore lint/suspicious/noExplicitAny: +export async function handleEvent(message: string, commandClient: any, types: { PublishBuildToolEventStreamRequest: protobuf.Type, PublishLifecycleEventRequest: protobuf.Type }) { +// console.log(`Received message from build_event channel: ${message}`); + + const parsedMessage = parseMessage(message); +// console.log('Parsed Message:', parsedMessage); + + const redisKey = constructRedisKey(parsedMessage); +// console.log('Constructed Redis Key:', redisKey); + + switch (parsedMessage.eventType) { + case 'LifecycleEvent': + // console.log(`Processing ${parsedMessage.eventType} with key ${redisKey}`); + await fetchAndDecodeBuildData(redisKey, commandClient, types.PublishLifecycleEventRequest); + break; + case 'BuildToolEventStream': + // console.log(`Processing ${parsedMessage.eventType} with key ${redisKey}`); + await fetchAndDecodeBuildData(redisKey, commandClient, types.PublishBuildToolEventStreamRequest); + break; + default: + console.log('Unknown event type:', parsedMessage.eventType); + } +} + +// biome-ignore lint/suspicious/noExplicitAny: +async function fetchAndDecodeBuildData(redisKey: string, commandClient: any, messageType: protobuf.Type) { + try { + const buildData = await commandClient.get(commandOptions({ returnBuffers: true }), redisKey); + if (buildData) { + // console.log(`Fetched build data for key ${redisKey}`); + const buffer = Buffer.from(buildData); + const decodedMessage = messageType.decode(buffer) as CustomBuildEvent; + + // Hier wird der `bazelEvent` dekodiert, falls er existiert + if(decodedMessage.orderedBuildEvent) { + + + const buildId = decodedMessage.orderedBuildEvent.streamId.buildId + const invocationId = decodedMessage.orderedBuildEvent.streamId.invocationId + // const sequenceNumber = decodedMessage.orderedBuildEvent + + console.log("Build ID: ", buildId) + console.log("Invocation ID: ", invocationId) + // console.log("Sequence Number: ", sequenceNumber) + + + const eventTime = decodedMessage.orderedBuildEvent.event.eventTime; + + // Convert seconds to milliseconds and add nanoseconds converted to milliseconds + const milliseconds = eventTime.seconds.low * 1000 + Math.floor(eventTime.nanos / 1000000); + + // Create a new Date object using the computed milliseconds + const eventDate = new Date(milliseconds); + + // const date = new Date(decodedMessage.orderedBuildEvent.event.eventTime.seconds*1000); + console.log("Event time nanos:", eventTime.nanos) + console.log("Event time seconds:", eventTime.seconds.low) + console.log("Event time:", eventDate.toISOString()); + const currentTime = new Date() + const elapsedTime = currentTime.getTime() - eventDate.getTime(); + console.log("Time Now: ", currentTime.toISOString()) + console.log(`Elapsed Time: ${elapsedTime} ms`); + } + if (decodedMessage?.orderedBuildEvent?.event?.bazelEvent) { + console.log("------------------") + // console.log("Got here.") + const decodedBazelEvent = decodeBazelEvent(decodedMessage.orderedBuildEvent.event.bazelEvent, messageType.root); + // console.log("Decoded Bazel Event:", decodedBazelEvent); + } else { + // console.log("No Bazel Event found."); + } + + // console.log("Decoded String:", decodedMessage.toJSON()); + } else { + // console.log(`No build data found for key ${redisKey}`); + } + } catch (err) { + // console.error(`Error fetching build data for key ${redisKey}:`, err); + } +} + +// biome-ignore lint/suspicious/noExplicitAny: +function decodeBazelEvent(bazelEvent: any, root: protobuf.Root): any { + if (!bazelEvent || !bazelEvent.value) return null; + + const decodedBinaryData = Buffer.from(bazelEvent.value, 'base64'); + const messageType = root.lookupType(bazelEvent.typeUrl.split('/').pop()); + const decodedMessage = messageType.decode(decodedBinaryData); + // In ein lesbares JSON-Objekt umwandeln + const decodedObject = messageType.toObject(decodedMessage, { + longs: String, + enums: String, + bytes: String, + }); + + // Progress Informationen verarbeiten + if (decodedObject.progress) { + console.log("Processing progress information...\n\n"); + processProgress(decodedObject.progress); + } + + return decodedObject; +} + +// biome-ignore lint/suspicious/noExplicitAny: +function processProgress(progress: any) { +// console.log(progress.stderr) + if (progress.stderr) { + // console.log(progress) + // const cleanStderr = stripAnsi(progress.stderr); + console.log(progress.stderr); + broadcastProgress(progress.stderr) + } + + if (progress.opaqueCount === 1) { + // console.log(`Progress Opaque Count: ${progress.opaqueCount}`); + // console.log(progress.stderr); + } + + if (progress.children) { + // biome-ignore lint/suspicious/noExplicitAny: + progress.children.forEach((child: any, index: number) => { + // console.log(`Child ${index + 1}:`); + if (child.progress && child.progress.opaqueCount ===2 ) { + // console.log(` Child Progress Opaque Count: ${child.progress.opaqueCount}`); + } + // if (child.configuration && child.configuration.id) { + // // console.log(` Child Configuration ID: ${child.configuration.id}`); + // } + }); + } +} diff --git a/web/bridge/src/protobuf.ts b/web/bridge/src/protobuf.ts new file mode 100644 index 000000000..861383ba4 --- /dev/null +++ b/web/bridge/src/protobuf.ts @@ -0,0 +1,107 @@ +import protobuf from 'protobufjs'; + +export async function initializeProtobuf(protos: string[]) { + console.log("Loading Remote Proto Files"); + + // Create a new Root instance + const combinedRoot = new protobuf.Root(); + + // Track loaded files to avoid circular dependencies + const loadedFiles: Record = {}; + + // Track processed imports to avoid duplicates + const processedImports = new Set(); + + // Load all initial proto files + for (const proto of protos) { + await loadProto(loadedFiles, combinedRoot, proto, processedImports); + } + + console.log("\nDone parsing all proto files.\n"); + + // Now combinedRoot contains your parsed .proto content + // Example: Look up specific message types + const BazelBuildEvent = combinedRoot.lookupType("build_event_stream.BuildEvent"); + const PublishBuildToolEventStreamRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishBuildToolEventStreamRequest"); + const PublishLifecycleEventRequest = combinedRoot.lookupType("google.devtools.build.v1.PublishLifecycleEventRequest"); + + console.log("Loaded Types:\n"); + console.log({ + PublishLifecycleEventRequest: PublishLifecycleEventRequest ? PublishLifecycleEventRequest.fullName : "Not found", + PublishBuildToolEventStreamRequest: PublishBuildToolEventStreamRequest ? PublishBuildToolEventStreamRequest.fullName : "Not found", + BazelBuildEvent: BazelBuildEvent ? BazelBuildEvent.fullName : "Not found" + }); + + return { + PublishLifecycleEventRequest, + PublishBuildToolEventStreamRequest, + BazelBuildEvent + }; +} + +function resolveImportPath(protoUrl: string, importPath: string): string { + // Handle googleapis imports + if (importPath.startsWith("google/api") || importPath.startsWith("google/devtools/build/v1")) { + return `https://raw.githubusercontent.com/googleapis/googleapis/master/${importPath}`; + } + + // Handle protocolbuffers imports + if (importPath.startsWith("google/protobuf")) { + return `https://raw.githubusercontent.com/protocolbuffers/protobuf/master/src/${importPath}`; + } + + // Handle specific case for bazel + if (importPath.includes("com/google/devtools/build/lib/packages/metrics") || importPath.startsWith("src/main/protobuf")) { + return `https://raw.githubusercontent.com/bazelbuild/bazel/master/${importPath}`; + } + + // Default behavior for other imports - resolve relative to protoUrl + return new URL(importPath, protoUrl).toString(); +} + +// Recursive function to fetch, parse, and handle imports +async function loadProto( + loadedFiles: Record, + root: protobuf.Root, + protoUrl: string, + processedImports: Set, + indentLevel = 0, +) { + if (loadedFiles[protoUrl]) { + // If already loaded, skip to prevent circular imports + return; + } + + const indent = ' '.repeat(indentLevel); + + // Fetch the .proto file content + const response = await fetch(protoUrl); + if (!response.ok) { + throw new Error(`Failed to fetch .proto file from ${protoUrl}: ${response.statusText}`); + } + + const protoContent = await response.text(); + + // Parse the proto content + const parsedProto = protobuf.parse(protoContent, root); + + // Mark this proto as loaded + loadedFiles[protoUrl] = true; + + // Log the imports necessary for this proto file + if (indentLevel < 1) { + console.log(`\n${indent} ${protoUrl}:`); + } + + if (parsedProto.imports && parsedProto.imports.length > 0) { + for (const importPath of parsedProto.imports) { + const resolvedImportUrl = resolveImportPath(protoUrl, importPath); + if (!processedImports.has(resolvedImportUrl)) { + console.log(`${indent} - ${importPath}`); + processedImports.add(resolvedImportUrl); + // Recursively handle the imports + await loadProto(loadedFiles, root, resolvedImportUrl, processedImports, indentLevel + 1,); + } + } + } +} diff --git a/web/bridge/src/redis.ts b/web/bridge/src/redis.ts new file mode 100644 index 000000000..fe571ab81 --- /dev/null +++ b/web/bridge/src/redis.ts @@ -0,0 +1,28 @@ +import { createClient } from 'redis'; + +export async function initializeRedisClients() { + try { + const redisClient = createClient({ + socket: { + host: "172.17.0.2", + port: 6379 + } + }); + const commandClient = redisClient.duplicate(); + + redisClient.on('error', (err) => { + console.error('Redis Client Error:', err); + throw new Error('Failed to connect to Redis.'); + }); + + await redisClient.connect(); + await commandClient.connect(); + + console.log('\nRedis clients successfully connected.\n'); + + return { redisClient, commandClient }; + } catch (error) { + console.error('Error during Redis client initialization:', error); + throw new Error('Unable to initialize Redis clients. Please check your connection.'); + } +} diff --git a/web/bridge/src/utils.ts b/web/bridge/src/utils.ts new file mode 100644 index 000000000..c312e3184 --- /dev/null +++ b/web/bridge/src/utils.ts @@ -0,0 +1,27 @@ +export function parseMessage(message: string) { + const parts = message.split(':'); + + const eventType = parts[0].replace('nativelink:', ''); + const eventID = parts.slice(1, 6).join(':'); + const subEventID = parts.slice(6, 11).join(':'); + const sequenceNumber = parts[11]; + + return { + eventType, + eventID, + subEventID, + sequenceNumber + }; +} + +// biome-ignore lint/suspicious/noExplicitAny: +export function constructRedisKey(parsedMessage: any) { + console.log("\nNew Published Event: ") + console.log(" EventID: ", parsedMessage.eventID) + console.log(" Sequence Number: ", parsedMessage.sequenceNumber) + console.log(" Invocation ID: ", parsedMessage.subEventID) + console.log("------------------") + + // console.log( `nativelink:${parsedMessage.eventType}:${parsedMessage.eventID}:${parsedMessage.subEventID}:${parsedMessage.sequenceNumber}`) + return `nativelink:${parsedMessage.eventType}:${parsedMessage.eventID}:${parsedMessage.subEventID}:${parsedMessage.sequenceNumber}`; +} diff --git a/web/bridge/src/websocket.ts b/web/bridge/src/websocket.ts new file mode 100644 index 000000000..f0b29c01e --- /dev/null +++ b/web/bridge/src/websocket.ts @@ -0,0 +1,46 @@ +import type { ServerWebSocket } from "bun"; + +const clients = new Set>(); + +export const startWebSocket = () => { + console.log('\nWebSocket server is running on ws://localhost:8080\n'); + Bun.serve({ + port: 8080, + fetch(req, server) { + // Upgrade the request to a WebSocket + // Here we can also do the websocket auth/token auth + if (server.upgrade(req)) { + return; + } + return new Response("Upgrade failed", { status: 500 }); + }, + websocket: { + open(ws) { + console.log('New client connected'); + clients.add(ws); + ws.send("Hello Web Client") + }, + message(ws, message) { + console.log('Received message from web client:', message); + }, + close(ws) { + console.log('Web Client disconnected'); + clients.delete(ws); + }, + drain(ws) { + console.log('Ready to receive more data'); + }, + }, +});} + +export function broadcastProgress(progress: string) { + // Convert the string to a Uint8Array + // const buffer = new TextEncoder().encode(progress); + // console.log(progress) + // console.log("----------------------------------------------") + const buffer = Buffer.from(progress) + + for (const ws of clients) { + ws.send(new Uint8Array(buffer)); // Send the ArrayBufferView + } +} diff --git a/web/bridge/tsconfig.json b/web/bridge/tsconfig.json new file mode 100644 index 000000000..238655f2c --- /dev/null +++ b/web/bridge/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + // Enable latest features + "lib": ["ESNext", "DOM"], + "target": "ESNext", + "module": "ESNext", + "moduleDetection": "force", + "jsx": "react-jsx", + "allowJs": true, + + // Bundler mode + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "verbatimModuleSyntax": true, + "noEmit": true, + + // Best practices + "strict": true, + "skipLibCheck": true, + "noFallthroughCasesInSwitch": true, + + // Some stricter flags (disabled by default) + "noUnusedLocals": false, + "noUnusedParameters": false, + "noPropertyAccessFromIndexSignature": false + } +}