From 5223b98930ebe86486791a225e8f6489460497c1 Mon Sep 17 00:00:00 2001 From: Jayanth Kumar Date: Thu, 9 Jan 2025 02:08:17 +0530 Subject: [PATCH] feat(visualization): Optimize loading and compression - data compression using pako - implement progressive loading with LTTB (Largest-Triangle-Three-Buckets) sampling - intersection observer for lazy loading visualizations - optimized chunk sizes and sampling - loading progress indicator for large datasets - zlib compression on server side for data transfer optimization --- frontend/package-lock.json | 23 ++ frontend/package.json | 2 + .../widgets/DataVisualizationWidget.jsx | 302 +++++++++--------- frontend/src/config/features.js | 19 +- frontend/src/utils/dataProcessing.js | 110 ++++++- preswald/server.py | 83 ++++- 6 files changed, 364 insertions(+), 175 deletions(-) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 0ed14d9..b4b2bb5 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -12,10 +12,12 @@ "@heroicons/react": "^2.2.0", "@tailwindcss/typography": "^0.5.10", "classnames": "^2.5.1", + "pako": "^2.1.0", "plotly.js": "^2.35.3", "react": "^18.3.1", "react-dom": "^18.3.1", "react-icons": "^5.4.0", + "react-intersection-observer": "^9.14.1", "react-markdown": "^9.0.1", "react-plotly.js": "^2.6.0", "react-router-dom": "^6.28.1", @@ -8173,6 +8175,12 @@ "integrity": "sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw==", "license": "BlueOak-1.0.0" }, + "node_modules/pako": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/pako/-/pako-2.1.0.tgz", + "integrity": "sha512-w+eufiZ1WuJYgPXbV/PO3NCMEc3xqylkKHzp8bxp1uW4qaSNQUkwmLLEc3kKsfz8lpV1F8Ht3U1Cm+9Srog2ug==", + "license": "(MIT AND Zlib)" + }, "node_modules/parent-module": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/parent-module/-/parent-module-1.0.1.tgz", @@ -8811,6 +8819,21 @@ "react": "*" } }, + "node_modules/react-intersection-observer": { + "version": "9.14.1", + "resolved": "https://registry.npmjs.org/react-intersection-observer/-/react-intersection-observer-9.14.1.tgz", + "integrity": "sha512-k1xIUn3sCQi3ugNeF64FJb3zwve5mcetvAUR9JazXeOmtap4IP2evN8rs+yf6SQ7F1QydsOGiqTmt+lySKZ9uA==", + "license": "MIT", + "peerDependencies": { + "react": "^17.0.0 || ^18.0.0 || ^19.0.0", + "react-dom": "^17.0.0 || ^18.0.0 || ^19.0.0" + }, + "peerDependenciesMeta": { + "react-dom": { + "optional": true + } + } + }, "node_modules/react-is": { "version": "16.13.1", "resolved": "https://registry.npmjs.org/react-is/-/react-is-16.13.1.tgz", diff --git a/frontend/package.json b/frontend/package.json index 8c4d1d6..4c2498e 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -14,10 +14,12 @@ "@heroicons/react": "^2.2.0", "@tailwindcss/typography": "^0.5.10", "classnames": "^2.5.1", + "pako": "^2.1.0", "plotly.js": "^2.35.3", "react": "^18.3.1", "react-dom": "^18.3.1", "react-icons": "^5.4.0", + "react-intersection-observer": "^9.14.1", "react-markdown": "^9.0.1", "react-plotly.js": "^2.6.0", "react-router-dom": "^6.28.1", diff --git a/frontend/src/components/widgets/DataVisualizationWidget.jsx b/frontend/src/components/widgets/DataVisualizationWidget.jsx index 9541819..d796791 100644 --- a/frontend/src/components/widgets/DataVisualizationWidget.jsx +++ b/frontend/src/components/widgets/DataVisualizationWidget.jsx @@ -1,115 +1,138 @@ import React, { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import { debounce, processDataInChunks, sampleData } from "../../utils/dataProcessing"; - +import { debounce, processDataInChunks, sampleData, decompressData } from "../../utils/dataProcessing"; import { FEATURES } from "../../config/features"; import Plot from "react-plotly.js"; +import { useInView } from "react-intersection-observer"; + +const INITIAL_POINTS_THRESHOLD = 1000; +const PROGRESSIVE_LOADING_CHUNK_SIZE = 500; -const DataVisualizationWidget = ({ id, data, content, error }) => { +const DataVisualizationWidget = ({ id, data: rawData, content, error }) => { const [isLoading, setIsLoading] = useState(true); const [plotError, setPlotError] = useState(null); - const [isVisible, setIsVisible] = useState(false); const [processedData, setProcessedData] = useState(null); const plotContainerRef = useRef(null); - const observerRef = useRef(null); + const [loadedDataPercentage, setLoadedDataPercentage] = useState(0); + + const { ref: inViewRef, inView } = useInView({ + threshold: 0.1, + triggerOnce: true, + }); + + // Set refs for both intersection observer and plot container + const setRefs = useCallback( + (node) => { + plotContainerRef.current = node; + inViewRef(node); + }, + [inViewRef] + ); - // Memoize the plot data processing - const processPlotData = useCallback((plotData) => { + // Decompress and process data if needed + const data = useMemo(() => { + if (!rawData) return null; + + try { + // Check if data is compressed + if (rawData.compressed) { + return decompressData(rawData.value); + } + return rawData; + } catch (error) { + console.error("Error processing data:", error); + setPlotError("Failed to process data"); + return null; + } + }, [rawData]); + + // Memoize the plot data processing with progressive loading + const processPlotData = useCallback((plotData, isInitialLoad = false) => { if (!plotData?.data || !Array.isArray(plotData.data)) return plotData; + const threshold = isInitialLoad ? INITIAL_POINTS_THRESHOLD : FEATURES.DATA_SAMPLING_THRESHOLD; + return { ...plotData, - data: plotData.data.map(trace => ({ - ...trace, - x: FEATURES.OPTIMIZED_VISUALIZATION - ? sampleData(trace.x, FEATURES.DATA_SAMPLING_THRESHOLD) - : trace.x, - y: FEATURES.OPTIMIZED_VISUALIZATION - ? sampleData(trace.y, FEATURES.DATA_SAMPLING_THRESHOLD) - : trace.y, - })) - }; - }, []); - - // Memoize the processed data - const memoizedData = useMemo(() => { - if (!data) return null; - return processPlotData(data); - }, [data, processPlotData]); - - // Setup Intersection Observer for lazy loading - useEffect(() => { - if (!FEATURES.OPTIMIZED_VISUALIZATION) { - setIsVisible(true); - return; - } - - const options = { - root: null, - rootMargin: '50px', - threshold: 0.1 - }; - - const observer = new IntersectionObserver((entries) => { - entries.forEach(entry => { - if (entry.isIntersecting) { - setIsVisible(true); - observer.disconnect(); + data: plotData.data.map(trace => { + // Deep clone the trace to avoid mutations + const processedTrace = { ...trace }; + + // Process numerical arrays for optimization + ['x', 'y', 'lat', 'lon'].forEach(key => { + if (Array.isArray(trace[key])) { + processedTrace[key] = sampleData(trace[key], threshold); + } + }); + + // Handle marker properties + if (trace.marker) { + processedTrace.marker = { ...trace.marker }; + ['size', 'color'].forEach(key => { + if (Array.isArray(trace.marker[key])) { + processedTrace.marker[key] = sampleData(trace.marker[key], threshold); + } + }); } - }); - }, options); - if (plotContainerRef.current) { - observer.observe(plotContainerRef.current); - } - - observerRef.current = observer; - return () => observer.disconnect(); + return processedTrace; + }) + }; }, []); // Handle data processing and loading useEffect(() => { - if (!isVisible) return; + if (!inView || !data) return; setIsLoading(true); setPlotError(null); + setLoadedDataPercentage(0); - if (typeof content === 'string' && content.includes('
')) { - try { - if (plotContainerRef.current) { - plotContainerRef.current.innerHTML = content; - const scripts = plotContainerRef.current.getElementsByTagName('script'); - Array.from(scripts).forEach(script => { - const newScript = document.createElement('script'); - Array.from(script.attributes).forEach(attr => { - newScript.setAttribute(attr.name, attr.value); - }); - newScript.appendChild(document.createTextNode(script.innerHTML)); - script.parentNode.replaceChild(newScript, script); - }); - } - } catch (err) { - setPlotError("Failed to render Plotly visualization"); - console.error("Plot rendering error:", err); - } - } else if (memoizedData?.data) { + try { + // Initial load with fewer points + const initialData = processPlotData(data, true); + setProcessedData(initialData); + setIsLoading(false); + + // Progressive load for full resolution if (FEATURES.OPTIMIZED_VISUALIZATION) { + const traces = data.data || []; + let totalPoints = 0; + let processedPoints = 0; + + traces.forEach(trace => { + ['x', 'y', 'lat', 'lon'].forEach(key => { + if (Array.isArray(trace[key])) { + totalPoints += trace[key].length; + } + }); + }); + processDataInChunks( - memoizedData.data, - FEATURES.PROGRESSIVE_LOADING_CHUNK_SIZE, - (chunk) => { + traces, + PROGRESSIVE_LOADING_CHUNK_SIZE, + (chunk, index, total) => { setProcessedData(prevData => ({ - ...memoizedData, - data: [...(prevData?.data || []), ...chunk] + ...data, + data: [ + ...prevData.data.slice(0, index), + ...chunk, + ...prevData.data.slice(index + chunk.length) + ] })); + + processedPoints += chunk.reduce((acc, trace) => { + return acc + (Array.isArray(trace.x) ? trace.x.length : 0); + }, 0); + + setLoadedDataPercentage((processedPoints / totalPoints) * 100); } ); - } else { - setProcessedData(memoizedData); } + } catch (err) { + console.error("Error processing plot data:", err); + setPlotError("Failed to process visualization data"); } - - setIsLoading(false); - }, [isVisible, content, memoizedData]); + }, [inView, data, processPlotData]); // Handle resize events with debouncing const debouncedResize = useMemo(() => @@ -117,15 +140,13 @@ const DataVisualizationWidget = ({ id, data, content, error }) => { if (plotContainerRef.current) { window.Plotly.Plots.resize(plotContainerRef.current); } - }, FEATURES.RESIZE_DEBOUNCE_MS), + }, 150), [] ); useEffect(() => { - if (FEATURES.OPTIMIZED_VISUALIZATION) { - window.addEventListener('resize', debouncedResize); - return () => window.removeEventListener('resize', debouncedResize); - } + window.addEventListener('resize', debouncedResize); + return () => window.removeEventListener('resize', debouncedResize); }, [debouncedResize]); if (error || plotError) { @@ -136,69 +157,56 @@ const DataVisualizationWidget = ({ id, data, content, error }) => { ); } - if (!isVisible || isLoading) { - return ( -
-
-
- ); - } - - if (processedData?.data) { - const { layout = {}, config = {} } = processedData; - - const defaultConfig = { - responsive: true, - displayModeBar: true, - modeBarButtonsToRemove: ['lasso2d', 'select2d'], - displaylogo: false, - ...config - }; - - const enhancedLayout = { - font: { family: 'Inter, system-ui, sans-serif' }, - paper_bgcolor: 'transparent', - plot_bgcolor: 'transparent', - margin: { t: 40, r: 10, l: 60, b: 40 }, - ...layout - }; - - try { - return ( -
-
- { - console.error("Plotly rendering error:", err); - setPlotError("Failed to render plot"); - }} - /> -
+ return ( +
+ {(!inView || isLoading) ? ( +
+
+

Loading visualization...

- ); - } catch (err) { - console.error("Error rendering Plotly component:", err); - return ( -
-

Error rendering plot: {err.message}

+ ) : processedData?.data ? ( +
+ {loadedDataPercentage > 0 && loadedDataPercentage < 100 && ( +
+
+
+
+
+ )} + { + console.error("Plotly rendering error:", err); + setPlotError("Failed to render plot"); + }} + />
- ); - } - } - - return ( -
-
+ ) : ( +
+ )}
); }; diff --git a/frontend/src/config/features.js b/frontend/src/config/features.js index 4e3364d..631562f 100644 --- a/frontend/src/config/features.js +++ b/frontend/src/config/features.js @@ -1,6 +1,17 @@ export const FEATURES = { - OPTIMIZED_VISUALIZATION: true, // Toggle for visualization optimizations - DATA_SAMPLING_THRESHOLD: 100, // Number of points above which to apply sampling - PROGRESSIVE_LOADING_CHUNK_SIZE: 100, // Number of points to load in each chunk - RESIZE_DEBOUNCE_MS: 10, // Debounce time for resize events in milliseconds + OPTIMIZED_VISUALIZATION: false, + DATA_SAMPLING_THRESHOLD: 5000, + INITIAL_POINTS_THRESHOLD: 1000, + PROGRESSIVE_LOADING_CHUNK_SIZE: 500, + ENABLE_COMPRESSION: true, + ENABLE_VIRTUALIZATION: false, + VIRTUALIZATION_THRESHOLD: 1000, + DEBOUNCE_MS: 150, + ANIMATION_FRAME_ENABLED: true, + GARBAGE_COLLECTION_INTERVAL: 60000, // 1 minute + MAX_CACHED_POINTS: 100000, + BATCH_UPDATES: true, + BATCH_INTERVAL_MS: 100, + MAX_RETRY_ATTEMPTS: 3, + RETRY_DELAY_MS: 1000, }; \ No newline at end of file diff --git a/frontend/src/utils/dataProcessing.js b/frontend/src/utils/dataProcessing.js index 0f292e5..63a7597 100644 --- a/frontend/src/utils/dataProcessing.js +++ b/frontend/src/utils/dataProcessing.js @@ -1,37 +1,123 @@ -// Utility function to sample data points +import pako from 'pako'; + +// Decompress data received from WebSocket +export const decompressData = (compressedData) => { + try { + const decompressed = pako.inflate(compressedData); + const textDecoder = new TextDecoder('utf-8'); + const jsonStr = textDecoder.decode(decompressed); + return JSON.parse(jsonStr); + } catch (error) { + console.error('Error decompressing data:', error); + return null; + } +}; + +// Utility function to sample data points with smart sampling export const sampleData = (data, threshold) => { if (!Array.isArray(data) || data.length <= threshold) return data; + // Use LTTB (Largest-Triangle-Three-Buckets) algorithm for time series + if (typeof data[0] === 'number') { + return lttbDownsample(data, threshold); + } + + // For categorical or non-numeric data, use regular sampling const samplingRate = Math.ceil(data.length / threshold); return data.filter((_, index) => index % samplingRate === 0); }; -// Process data in chunks +// LTTB (Largest-Triangle-Three-Buckets) downsampling +function lttbDownsample(data, threshold) { + if (data.length <= threshold) return data; + + const sampled = new Array(threshold); + let sampledIndex = 0; + sampled[sampledIndex++] = data[0]; // Always add the first point + + const bucketSize = (data.length - 2) / (threshold - 2); + + let lastSelectedX = 0; + let lastSelectedY = data[0]; + + for (let i = 0; i < threshold - 2; i++) { + const bucketStart = Math.floor((i + 0) * bucketSize) + 1; + const bucketEnd = Math.floor((i + 1) * bucketSize) + 1; + + const avgX = (bucketStart + bucketEnd) / 2; + let maxArea = -1; + let maxAreaPoint = data[bucketStart]; + + for (let j = bucketStart; j < bucketEnd; j++) { + // Calculate triangle area + const area = Math.abs( + (lastSelectedX - avgX) * (data[j] - lastSelectedY) - + (lastSelectedX - j) * (avgX - lastSelectedY) + ) * 0.5; + + if (area > maxArea) { + maxArea = area; + maxAreaPoint = data[j]; + } + } + + sampled[sampledIndex++] = maxAreaPoint; + lastSelectedX = bucketEnd; + lastSelectedY = maxAreaPoint; + } + + sampled[sampledIndex] = data[data.length - 1]; // Always add the last point + return sampled; +} + +// Process data in chunks with optimized chunk size export const processDataInChunks = (data, chunkSize, callback) => { + if (!Array.isArray(data)) return callback(data); + + const optimalChunkSize = Math.min( + chunkSize, + Math.max(100, Math.floor(data.length / 10)) + ); + let currentIndex = 0; + let isProcessing = false; const processNextChunk = () => { - const chunk = data.slice(currentIndex, currentIndex + chunkSize); - callback(chunk, currentIndex); - currentIndex += chunkSize; + if (isProcessing || currentIndex >= data.length) return; - if (currentIndex < data.length) { - requestAnimationFrame(processNextChunk); - } + isProcessing = true; + const chunk = data.slice(currentIndex, currentIndex + optimalChunkSize); + + requestAnimationFrame(() => { + callback(chunk, currentIndex, data.length); + currentIndex += optimalChunkSize; + isProcessing = false; + + if (currentIndex < data.length) { + processNextChunk(); + } + }); }; processNextChunk(); }; -// Debounce function -export const debounce = (func, wait) => { +// Enhanced debounce with immediate option +export const debounce = (func, wait, immediate = false) => { let timeout; + return function executedFunction(...args) { + const context = this; + const later = () => { - clearTimeout(timeout); - func(...args); + timeout = null; + if (!immediate) func.apply(context, args); }; + + const callNow = immediate && !timeout; clearTimeout(timeout); timeout = setTimeout(later, wait); + + if (callNow) func.apply(context, args); }; }; \ No newline at end of file diff --git a/preswald/server.py b/preswald/server.py index 9a13ed2..1bccaf5 100644 --- a/preswald/server.py +++ b/preswald/server.py @@ -7,7 +7,7 @@ import pkg_resources import logging import uvicorn -from typing import Dict, Any, Optional, Set +from typing import Dict, Any, Optional, Set, List, Union from preswald.scriptrunner import ScriptRunner from preswald.core import ( get_all_component_states, @@ -21,6 +21,8 @@ import signal import sys import time +import zlib +import numpy as np logger = logging.getLogger(__name__) @@ -424,20 +426,77 @@ async def send_error(websocket: WebSocket, message: str): except Exception as e: logger.error(f"Error sending error message: {e}") +def optimize_plotly_data(data: Dict[str, Any], max_points: int = 5000) -> Dict[str, Any]: + """Optimize Plotly data for large datasets.""" + if not isinstance(data, dict) or 'data' not in data: + return data + + optimized_data = {'data': [], 'layout': data.get('layout', {})} + + for trace in data['data']: + if not isinstance(trace, dict): + continue + + # Handle scatter/scattergeo traces + if trace.get('type') in ['scatter', 'scattergeo']: + points = len(trace.get('x', [])) if 'x' in trace else len(trace.get('lat', [])) + if points > max_points: + # Calculate sampling rate + sample_rate = max(1, points // max_points) + + # Sample the data + if 'x' in trace and 'y' in trace: + trace['x'] = trace['x'][::sample_rate] + trace['y'] = trace['y'][::sample_rate] + elif 'lat' in trace and 'lon' in trace: + trace['lat'] = trace['lat'][::sample_rate] + trace['lon'] = trace['lon'][::sample_rate] + + # Sample other array attributes + for key in ['text', 'marker.size', 'marker.color']: + if key in trace: + if isinstance(trace[key], list): + trace[key] = trace[key][::sample_rate] + + optimized_data['data'].append(trace) + + return optimized_data + +def compress_data(data: Union[Dict, List, str]) -> bytes: + """Compress data using zlib.""" + json_str = json_dumps(data) + return zlib.compress(json_str.encode('utf-8')) + +def decompress_data(compressed_data: bytes) -> Union[Dict, List, str]: + """Decompress zlib compressed data.""" + decompressed = zlib.decompress(compressed_data) + return json_loads(decompressed.decode('utf-8')) + async def broadcast_state_update(component_id: str, value: Any, exclude_client: WebSocket = None): - """Broadcast state update to all clients except the sender""" - update_msg = { - "type": "state_update", - "component_id": component_id, - "value": value + """Broadcast state updates to all connected clients with optimizations.""" + if not websocket_connections: + return + + # Optimize plotly data if it's a visualization component + if isinstance(value, dict) and 'data' in value and 'layout' in value: + value = optimize_plotly_data(value) + + # Compress the data + compressed_value = compress_data(value) + + message = { + 'type': 'state_update', + 'component_id': component_id, + 'value': compressed_value, + 'compressed': True } - try: - for client in websocket_connections.values(): - if client != exclude_client: - await client.send_json(update_msg) - except Exception as e: - logger.error(f"Error broadcasting state update: {e}") + for client_id, websocket in websocket_connections.items(): + if websocket != exclude_client: + try: + await websocket.send_bytes(compress_data(message)) + except Exception as e: + logger.error(f"Error sending message to client {client_id}: {e}") async def rerun_script(websocket: WebSocket, states: Dict[str, Any]): """Rerun the script with updated states"""