diff --git a/apps/dev-playground/client/src/appKitTypes.d.ts b/apps/dev-playground/client/src/appKitTypes.d.ts index be9e1f61..bc7094a5 100644 --- a/apps/dev-playground/client/src/appKitTypes.d.ts +++ b/apps/dev-playground/client/src/appKitTypes.d.ts @@ -3,7 +3,7 @@ import "@databricks/app-kit-ui/react"; declare module "@databricks/app-kit-ui/react" { interface PluginRegistry { - reconnect: { + "reconnect": { "/": { message: string; }; @@ -15,7 +15,7 @@ declare module "@databricks/app-kit-ui/react" { content: string; }; } - analytics: { + "analytics": { "/users/me/query/:query_key": { chunk_index: number; row_offset: number; @@ -32,6 +32,7 @@ declare module "@databricks/app-kit-ui/react" { } interface QueryRegistry { + apps_list: { id: string; name: string; @@ -61,5 +62,4 @@ declare module "@databricks/app-kit-ui/react" { total_cost_usd: number; }[]; } - } diff --git a/apps/dev-playground/client/src/hooks/use-directory-listing.ts b/apps/dev-playground/client/src/hooks/use-directory-listing.ts new file mode 100644 index 00000000..aa1febda --- /dev/null +++ b/apps/dev-playground/client/src/hooks/use-directory-listing.ts @@ -0,0 +1,190 @@ +import { useState, useEffect, useCallback } from "react"; + +interface FileItem { + name: string; + path: string; + isDirectory: boolean; + size?: number; + mimeType?: string | null; +} + +interface DirectoryListing { + path: string; + files: FileItem[]; +} + +export function useDirectoryListing( + initialPath: string = "/", + onPathChange?: (path: string) => void, + batchSize: number = 50, +) { + const [currentPath, setCurrentPath] = useState(initialPath); + const [directoryListing, setDirectoryListing] = + useState(null); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const fetchPath = useCallback( + async (path: string) => { + setLoading(true); + setError(null); + setDirectoryListing(null); + + // Batch-related variables accessible to both try and catch blocks + const files: 
FileItem[] = []; + let dirPath = path; + let batchBuffer: FileItem[] = []; + const effectiveBatchSize = Math.max(1, batchSize); // Clamp to minimum 1 + + // Flush function - centralized batch update logic + const flushBatch = () => { + if (batchBuffer.length > 0) { + files.push(...batchBuffer); + setDirectoryListing({ path: dirPath, files: [...files] }); + batchBuffer = []; + } + }; + + try { + // Stream directory listing from NDJSON response + const response = await fetch(`/api/volume-serving${path}`); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + // Read the stream line by line + const reader = response.body?.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + if (reader) { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + flushBatch(); // Flush remaining files + break; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + + // Process all complete lines + for (let i = 0; i < lines.length - 1; i++) { + const line = lines[i].trim(); + if (line) { + const data = JSON.parse(line); + + if (data.type === "metadata") { + dirPath = data.path; + } else if (data.type === "file") { + const fileItem: FileItem = { + name: data.name, + path: data.path, + isDirectory: data.isDirectory, + size: data.size, + mimeType: data.mimeType, + }; + + batchBuffer.push(fileItem); + + // Flush when batch is full + if (batchBuffer.length >= effectiveBatchSize) { + flushBatch(); + } + } + } + } + + // Keep the last incomplete line in the buffer + buffer = lines[lines.length - 1]; + } + } + } catch (err) { + flushBatch(); // Flush partial batch before error state + setError(err instanceof Error ? err.message : "Failed to fetch path"); + } finally { + setLoading(false); + } + }, + [batchSize], + ); + + const navigateUp = () => { + // Remove trailing slash if present + const cleanPath = + currentPath.endsWith("/") && currentPath !== "/" + ? 
currentPath.slice(0, -1) + : currentPath; + + // Get parent directory + const parentPath = + cleanPath.substring(0, cleanPath.lastIndexOf("/")) || "/"; + const normalizedParent = parentPath === "/" ? "/" : `${parentPath}/`; + + setCurrentPath(normalizedParent); + onPathChange?.(normalizedParent); + fetchPath(normalizedParent); + }; + + const handleNavigate = (file: FileItem) => { + // Handle ".." navigation + if (file.name === "..") { + navigateUp(); + return; + } + + if (file.isDirectory) { + // Navigate to directory + const newPath = file.path; + setCurrentPath(newPath); + onPathChange?.(newPath); + fetchPath(newPath); + } else { + // Open file in new window + window.open(`/api/volume-serving${file.path}`, "_blank"); + } + }; + + // Load directory when initialPath changes (including from URL) + useEffect(() => { + setCurrentPath(initialPath); + fetchPath(initialPath); + }, [initialPath, fetchPath]); + + // Prepare files list with ".." entry if not in root, sorted with folders first + const filesWithNavigation = directoryListing + ? [ + ...(currentPath !== "/" + ? [ + { + name: "..", + path: "", + isDirectory: true, + size: undefined, + mimeType: null, + }, + ] + : []), + // Sort: directories first, then by name + ...directoryListing.files.sort((a, b) => { + if (a.isDirectory === b.isDirectory) { + return a.name.localeCompare(b.name); + } + return a.isDirectory ? -1 : 1; + }), + ] + : []; + + return { + currentPath, + directoryListing, + loading, + error, + filesWithNavigation, + fetchPath, + navigateUp, + handleNavigate, + }; +} diff --git a/apps/dev-playground/client/src/routeTree.gen.ts b/apps/dev-playground/client/src/routeTree.gen.ts index fefd80c8..687c477a 100644 --- a/apps/dev-playground/client/src/routeTree.gen.ts +++ b/apps/dev-playground/client/src/routeTree.gen.ts @@ -9,12 +9,18 @@ // Additionally, you should also exclude this file from your linter and/or formatter to prevent it from being checked or modified. 
import { Route as rootRouteImport } from "./routes/__root"; +import { Route as VolumeServingRouteRouteImport } from "./routes/volume-serving.route"; import { Route as TelemetryRouteRouteImport } from "./routes/telemetry.route"; import { Route as ReconnectRouteRouteImport } from "./routes/reconnect.route"; import { Route as DataVisualizationRouteRouteImport } from "./routes/data-visualization.route"; import { Route as AnalyticsRouteRouteImport } from "./routes/analytics.route"; import { Route as IndexRouteImport } from "./routes/index"; +const VolumeServingRouteRoute = VolumeServingRouteRouteImport.update({ + id: "/volume-serving", + path: "/volume-serving", + getParentRoute: () => rootRouteImport, +} as any); const TelemetryRouteRoute = TelemetryRouteRouteImport.update({ id: "/telemetry", path: "/telemetry", @@ -47,6 +53,7 @@ export interface FileRoutesByFullPath { "/data-visualization": typeof DataVisualizationRouteRoute; "/reconnect": typeof ReconnectRouteRoute; "/telemetry": typeof TelemetryRouteRoute; + "/volume-serving": typeof VolumeServingRouteRoute; } export interface FileRoutesByTo { "/": typeof IndexRoute; @@ -54,6 +61,7 @@ export interface FileRoutesByTo { "/data-visualization": typeof DataVisualizationRouteRoute; "/reconnect": typeof ReconnectRouteRoute; "/telemetry": typeof TelemetryRouteRoute; + "/volume-serving": typeof VolumeServingRouteRoute; } export interface FileRoutesById { __root__: typeof rootRouteImport; @@ -62,6 +70,7 @@ export interface FileRoutesById { "/data-visualization": typeof DataVisualizationRouteRoute; "/reconnect": typeof ReconnectRouteRoute; "/telemetry": typeof TelemetryRouteRoute; + "/volume-serving": typeof VolumeServingRouteRoute; } export interface FileRouteTypes { fileRoutesByFullPath: FileRoutesByFullPath; @@ -70,16 +79,24 @@ export interface FileRouteTypes { | "/analytics" | "/data-visualization" | "/reconnect" - | "/telemetry"; + | "/telemetry" + | "/volume-serving"; fileRoutesByTo: FileRoutesByTo; - to: "/" | 
"/analytics" | "/data-visualization" | "/reconnect" | "/telemetry"; + to: + | "/" + | "/analytics" + | "/data-visualization" + | "/reconnect" + | "/telemetry" + | "/volume-serving"; id: | "__root__" | "/" | "/analytics" | "/data-visualization" | "/reconnect" - | "/telemetry"; + | "/telemetry" + | "/volume-serving"; fileRoutesById: FileRoutesById; } export interface RootRouteChildren { @@ -88,10 +105,18 @@ export interface RootRouteChildren { DataVisualizationRouteRoute: typeof DataVisualizationRouteRoute; ReconnectRouteRoute: typeof ReconnectRouteRoute; TelemetryRouteRoute: typeof TelemetryRouteRoute; + VolumeServingRouteRoute: typeof VolumeServingRouteRoute; } declare module "@tanstack/react-router" { interface FileRoutesByPath { + "/volume-serving": { + id: "/volume-serving"; + path: "/volume-serving"; + fullPath: "/volume-serving"; + preLoaderRoute: typeof VolumeServingRouteRouteImport; + parentRoute: typeof rootRouteImport; + }; "/telemetry": { id: "/telemetry"; path: "/telemetry"; @@ -136,6 +161,7 @@ const rootRouteChildren: RootRouteChildren = { DataVisualizationRouteRoute: DataVisualizationRouteRoute, ReconnectRouteRoute: ReconnectRouteRoute, TelemetryRouteRoute: TelemetryRouteRoute, + VolumeServingRouteRoute: VolumeServingRouteRoute, }; export const routeTree = rootRouteImport ._addFileChildren(rootRouteChildren) diff --git a/apps/dev-playground/client/src/routes/__root.tsx b/apps/dev-playground/client/src/routes/__root.tsx index 8c83fe63..a1cbab27 100644 --- a/apps/dev-playground/client/src/routes/__root.tsx +++ b/apps/dev-playground/client/src/routes/__root.tsx @@ -56,6 +56,14 @@ function RootComponent() { Telemetry + + + diff --git a/apps/dev-playground/client/src/routes/index.tsx b/apps/dev-playground/client/src/routes/index.tsx index 6c632afa..a268445d 100644 --- a/apps/dev-playground/client/src/routes/index.tsx +++ b/apps/dev-playground/client/src/routes/index.tsx @@ -103,6 +103,24 @@ function IndexRoute() { + + +
+

+ Volume Serving +

+

+ Serve files and data directly from Databricks volumes with high + performance and scalability. +

+ +
+
diff --git a/apps/dev-playground/client/src/routes/telemetry.route.tsx b/apps/dev-playground/client/src/routes/telemetry.route.tsx index a9179172..2b4301c4 100644 --- a/apps/dev-playground/client/src/routes/telemetry.route.tsx +++ b/apps/dev-playground/client/src/routes/telemetry.route.tsx @@ -1,8 +1,8 @@ import { createFileRoute, retainSearchParams } from "@tanstack/react-router"; +import { Activity, Loader2 } from "lucide-react"; +import { useState } from "react"; import { Button } from "@/components/ui/button"; import { Card } from "@/components/ui/card"; -import { useState } from "react"; -import { Activity, Loader2 } from "lucide-react"; export const Route = createFileRoute("/telemetry")({ component: TelemetryRoute, diff --git a/apps/dev-playground/client/src/routes/volume-serving.route.tsx b/apps/dev-playground/client/src/routes/volume-serving.route.tsx new file mode 100644 index 00000000..5160b598 --- /dev/null +++ b/apps/dev-playground/client/src/routes/volume-serving.route.tsx @@ -0,0 +1,338 @@ +import { createFileRoute, useNavigate } from "@tanstack/react-router"; +import { Header } from "@/components/layout/header"; +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { + Table, + TableBody, + TableCell, + TableHead, + TableHeader, + TableRow, +} from "@/components/ui/table"; +import { Skeleton } from "@/components/ui/skeleton"; +import { useDirectoryListing } from "@/hooks/use-directory-listing"; + +export const Route = createFileRoute("/volume-serving")({ + component: VolumeServingRoute, + validateSearch: (search: Record) => { + return { + path: (search.path as string) || "/", + }; + }, +}); + +function formatFileSize(bytes?: number): string { + if (bytes === undefined || bytes === null) return "-"; + if (bytes === 0) return "0 B"; + + const units = ["B", "KB", "MB", "GB", "TB"]; + const k = 1024; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + + return `${(bytes / k ** 
i).toFixed(2)} ${units[i]}`; +} + +function VolumeServingRoute() { + const navigate = useNavigate({ from: Route.fullPath }); + const { path } = Route.useSearch(); + const { + currentPath, + directoryListing, + loading, + error, + filesWithNavigation, + handleNavigate, + } = useDirectoryListing(path, (newPath: string) => { + navigate({ search: { path: newPath } }); + }); + + // Check if error indicates plugin is not configured (404, JSON parse error, or connection refused) + const isPluginNotConfigured = + error && + (error.includes("404") || + error.includes("HTTP error! status: 404") || + error.includes("not valid JSON") || + error.includes("Unexpected token")); + + return ( +
+
+
+ +
+ + + Browse Volume Files + + {loading + ? "Loading..." + : "Click on folders to navigate, files to open"} + + + +
+ {/* Show info overlay when plugin is not configured */} + {isPluginNotConfigured && ( +
+
+
ℹī¸
+

+ Volume Serving Not Configured +

+

+ The volume serving plugin is not configured. To enable + file browsing, set the{" "} + + VOLUME_PATH + {" "} + environment variable. +

+
+

+ Example configuration: +

+
+                          {`VOLUME_PATH=/Volumes/catalog/schema/volume_name`}
+                        
+
+

+ Add this to your{" "} + + .env + {" "} + file and restart the server. +

+
+
+ )} + + {/* Show error message for other errors */} + {error && !isPluginNotConfigured && ( +
+

{error}

+
+ )} + + {/* File listing table (with or without overlay) */} +
+
+ {currentPath || "/"} +
+
+ + + + + Name + + Size + + Type + MIME Type + + + + {loading || isPluginNotConfigured ? ( + // Show skeleton rows while loading or when plugin not configured + Array.from({ length: 8 }).map((_, idx) => ( + // biome-ignore lint/suspicious/noArrayIndexKey: Skeleton rows are static placeholders + + + + + + + + + + + + + + + + + + )) + ) : directoryListing ? ( + // Show actual files when loaded + filesWithNavigation.map((file) => { + const isFile = + !file.isDirectory && file.name !== ".."; + const href = isFile + ? `/api/volume-serving${file.path}` + : "#"; + + return ( + + + + {file.name === ".." + ? "âŦ†ī¸" + : file.isDirectory + ? "📁" + : "📄"} + + + + {isFile ? ( + + {file.name} + + ) : ( + + )} + + + {formatFileSize(file.size)} + + + {file.name === ".." + ? "Parent" + : file.isDirectory + ? "Folder" + : "File"} + + + {file.mimeType || "-"} + + + ); + }) + ) : ( + + + No files or directories to display + + + )} + +
+
+
+
+
+
+ + + + About Volume Serving + + Serve files directly from Databricks Unity Catalog volumes + + + +
+

+ Volume serving allows you to serve files directly from + Databricks Unity Catalog volumes. This is ideal for serving + static assets, model artifacts, or any files stored in + volumes. +

+
+

+ How to Use +

+
    +
  • + The browser automatically loads the root directory on page + load +
  • +
  • + Click on folder names (📁) to navigate into subdirectories +
  • +
  • + Click on the{" "} + + .. + {" "} + entry (âŦ†ī¸) to go up to the parent directory +
  • +
  • Click on file names (📄) to open them in a new tab
  • +
  • Folders are always listed first, followed by files
  • +
+
+
+

+ Configuration +

+

+ Set the volume path in your server configuration: +

+
+                    {`volumeServing({
+  volumePath: process.env.VOLUME_PATH,
+  enableDirectoryListing: true
+})`}
+                  
+

+ In your{" "} + + .env + {" "} + file: +

+
+                    {`VOLUME_PATH=/Volumes/catalog/schema/volume_name`}
+                  
+
+
+

+ API Access +

+
    +
  • + Directories:{" "} + + /api/volume-serving/path/to/dir/ + {" "} + (with trailing{" "} + / + ) +
  • +
  • + Files:{" "} + + /api/volume-serving/path/to/file.ext + {" "} + (without trailing{" "} + / + ) +
  • +
+
+
+
+
+
+
+
+ ); +} diff --git a/apps/dev-playground/server/.env.dist b/apps/dev-playground/server/.env.dist index ae8f1a73..9ade7c5a 100644 --- a/apps/dev-playground/server/.env.dist +++ b/apps/dev-playground/server/.env.dist @@ -7,3 +7,5 @@ NODE_ENV='development' OTEL_EXPORTER_OTLP_ENDPOINT='http://localhost:4318' OTEL_RESOURCE_ATTRIBUTES='service.sample_attribute=dev' OTEL_SERVICE_NAME='sdk-playground' +# Volume Serving Plugin: Full path to the Databricks Unity Catalog Volume (optional) +# VOLUME_PATH=/Volumes/main/default/my_volume diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index d8b236c5..02eb4a3e 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -1,7 +1,57 @@ import { analytics, createApp, server } from "@databricks/app-kit"; +import { + volumeServing, + policy, + type VolumeConfigs, +} from "./plugins/volume-serving-plugin"; import { reconnect } from "./reconnect-plugin"; import { telemetryExamples } from "./telemetry-example-plugin"; -createApp({ - plugins: [server(), reconnect(), telemetryExamples(), analytics()], -}); +// Build plugins array conditionally based on environment +// Log warning if VOLUME_PATH is not configured +if (!process.env.VOLUME_PATH) { + console.warn( + "[Server] VOLUME_PATH not configured - volume-serving plugin will not be loaded", + ); +} + +const volumeConfigs: VolumeConfigs = { + images: { + volumePath: process.env.VOLUME_PATH_IMAGES || "/Volumes/default/images", + policy: policy.publicRead(), + }, + home: { + volumePath: process.env.VOLUME_PATH_HOME || "/Volumes/default/home", + pathPrefix: "users/", // Internal prefix - public URLs don't include this + policy: policy.any( + // Allow download/list of own files + (action, resource, user) => + ["download", "list"].includes(action) && + resource.path.startsWith(`/${user.id}/`), + // Allow upload to own uploads directory + (action, resource, user) => + ["upload", "upsert"].includes(action) && + resource.size 
< 10 * 1024 * 1024 && + (resource.mimeType.startsWith("image/") || resource.mimeType.startsWith("video/")) && + resource.path.startsWith(`/${user.id}/uploads/`), + ), + onAfterUpload: (req, res, resource, user) => { + console.log("File uploaded successfully", resource, user); + res.status(200).json({ message: "File uploaded successfully" }); + }, + }, +}; + +const plugins = [ + server(), + reconnect(), + telemetryExamples(), + analytics(), +]; + +// Only add volume serving plugin if VOLUME_PATH is configured +if (process.env.VOLUME_PATH) { + plugins.push(volumeServing({ volumeConfigs })); +} + +createApp({ plugins }); diff --git a/apps/dev-playground/server/plugins/volume-serving-plugin.ts b/apps/dev-playground/server/plugins/volume-serving-plugin.ts new file mode 100644 index 00000000..269ce9eb --- /dev/null +++ b/apps/dev-playground/server/plugins/volume-serving-plugin.ts @@ -0,0 +1,649 @@ +import { join, normalize } from "node:path"; +import { Readable } from "node:stream"; +import { pipeline } from "node:stream/promises"; +import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import type { IAppRouter, IAppRequest } from "shared"; +import { + type BasePluginConfig, + Plugin, + toPlugin, + getRequestContext, + type Response, +} from "@databricks/app-kit"; + +// Type definitions for volume serving +export type Action = "download" | "list" | "upload" | "upsert"; + +export interface Resource { + path: string; + volume: string; + isDirectory: boolean; + size: number; + mimeType: string; +} + +export type User = { + id: string; +}; + +export type Policy = (action: Action, resource: Resource, user: User) => boolean; + +// Policy combinators +export const policy = { + /** + * Combines multiple policies with AND logic - all policies must return true + */ + all: (...policies: Policy[]): Policy => { + return (action: Action, resource: Resource, user: User): boolean => { + for (const p of policies) { + if (!p(action, resource, user)) { + return false; + } + 
} + return true; + }; + }, + + /** + * Combines multiple policies with OR logic - at least one policy must return true + */ + any: (...policies: Policy[]): Policy => { + return (action: Action, resource: Resource, user: User): boolean => { + for (const p of policies) { + if (p(action, resource, user)) { + return true; + } + } + return false; + }; + }, + + /** + * Allow downloading files but deny listing directories. + * Useful for serving public files where you don't want directory browsing. + */ + publicRead: (): Policy => { + return (action: Action, resource: Resource, user: User): boolean => { + return action === "download"; + }; + }, + + /** + * Allow downloading files and listing directories (read-only access). + * Useful for public file browsing where users can explore and download. + */ + publicReadAndList: (): Policy => { + return (action: Action, resource: Resource, user: User): boolean => { + return action === "download" || action === "list"; + }; + }, +}; + +export interface VolumeConfig { + volumePath: string; + pathPrefix?: string; + policy: Policy; + onAfterUpload?: ( + req: IAppRequest, + res: Response, + resource: Resource, + user: User, + ) => void; +} + +export interface VolumeConfigs { + [key: string]: VolumeConfig; +} + +export interface VolumeServingConfig extends BasePluginConfig { + volumeConfigs: VolumeConfigs; +} + +export class VolumeServingPlugin extends Plugin { + static DEFAULT_CONFIG: Record = {}; + name = "volume-serving"; + envVars = []; + protected declare config: VolumeServingConfig; + + constructor(config: VolumeServingConfig) { + super(config); + this.config = config; + } + + async setup(): Promise { + console.log("[VolumeServing] Plugin initialized"); + + const configKeys = Object.keys(this.config.volumeConfigs); + if (configKeys.length === 0) { + console.warn("[VolumeServing] No volume configurations provided"); + return; + } + + console.log(`[VolumeServing] Configured ${configKeys.length} volume(s):`); + for (const [key, 
config] of Object.entries(this.config.volumeConfigs)) { + const prefixInfo = config.pathPrefix + ? ` (prefix: ${config.pathPrefix})` + : ""; + console.log(` - ${key}: ${config.volumePath}${prefixInfo}`); + } + } + + private parsePath(requestPath: string, configKey: string): string { + const prefix = `/api/${this.name}/${configKey}`; + const path = requestPath.startsWith(prefix) + ? requestPath.substring(prefix.length) + : requestPath; + return path.startsWith("/") ? path : `/${path}`; + } + + private normalizePath( + requestPath: string, + volumePath: string, + pathPrefix?: string, + ): string | null { + if (!volumePath) { + throw new Error("Volume path is not configured"); + } + + // Remove leading slash for path.join to work correctly + const cleanPath = requestPath.startsWith("/") + ? requestPath.slice(1) + : requestPath; + + // Normalize to resolve .. and . segments + const normalized = normalize(cleanPath); + + // Check for path traversal attempts + // After normalization, the path should not start with .. or contain .. + if (normalized.startsWith("..") || normalized.includes("/..")) { + console.warn( + `[VolumeServing] Path traversal attempt blocked: ${requestPath}`, + ); + return null; + } + + // Prepend pathPrefix if configured (internal prefix not visible in public URLs) + const pathWithPrefix = pathPrefix + ? join(pathPrefix, normalized) + : normalized; + + // Construct full path + const fullPath = join(volumePath, pathWithPrefix); + + // Final security check: ensure the path starts with the volume path + // If pathPrefix is set, also ensure path is under the prefix + const expectedBasePath = pathPrefix + ? 
join(volumePath, pathPrefix) + : volumePath; + + if (!fullPath.startsWith(expectedBasePath)) { + console.warn( + `[VolumeServing] Path outside allowed area blocked: ${requestPath}`, + ); + return null; + } + + return fullPath; + } + + private async validatePolicy( + action: Action, + resource: Resource, + user: User, + policy: Policy, + ): Promise { + try { + return policy(action, resource, user); + } catch (error) { + console.error("[VolumeServing] Policy execution error:", error); + return false; + } + } + + private async handleGetRequest( + req: IAppRequest, + res: Response, + configKey: string, + volumeConfig: VolumeConfig, + ): Promise { + try { + const requestContext = getRequestContext(); + const serviceClient: WorkspaceClient = + requestContext.serviceDatabricksClient; + const user: User = { id: requestContext.userId }; + + const filePath = this.parsePath(req.path, configKey); + const fullPath = this.normalizePath( + filePath, + volumeConfig.volumePath, + volumeConfig.pathPrefix, + ); + + if (!fullPath) { + res.status(403).json({ + error: "Invalid path", + message: "Path traversal attempts are not allowed", + }); + return; + } + + // Determine action based on path + const action: Action = filePath.endsWith("/") ? 
"list" : "download"; + + if (action === "list") { + // Directory listing + const resource: Resource = { + path: filePath, + volume: configKey, + isDirectory: true, + size: 0, + mimeType: "", + }; + + if ( + !(await this.validatePolicy( + action, + resource, + user, + volumeConfig.policy, + )) + ) { + res.status(403).json({ + error: "Forbidden", + message: "You do not have permission to list this directory", + }); + return; + } + + // Stream directory listing + res.setHeader("Content-Type", "application/x-ndjson"); + res.setHeader("Transfer-Encoding", "chunked"); + + try { + for await (const item of this.listDirectoryStream( + fullPath, + filePath, + )) { + res.write(`${JSON.stringify(item)}\n`); + } + res.end(); + } catch (streamError) { + console.error( + "[VolumeServing] Error streaming directory:", + streamError, + ); + if (!res.headersSent) { + res.status(500).json({ error: "Failed to list directory" }); + } + } + } else { + // File download + try { + const downloadResponse = await serviceClient.files.download({ + file_path: fullPath, + }); + + const resource: Resource = { + path: filePath, + volume: configKey, + isDirectory: false, + size: Number(downloadResponse["content-length"]) || 0, + mimeType: this.getContentType(filePath), + }; + + if ( + !(await this.validatePolicy( + action, + resource, + user, + volumeConfig.policy, + )) + ) { + res.status(403).json({ + error: "Forbidden", + message: "You do not have permission to download this file", + }); + return; + } + + // Stream the file + await this.streamFile(fullPath, filePath, res); + } catch (error: any) { + if ( + error.message?.includes("NOT_FOUND") || + error.statusCode === 404 + ) { + res.status(404).json({ error: "File not found" }); + } else { + throw error; + } + } + } + } catch (error) { + console.error("[VolumeServing] Error handling GET request:", error); + if (!res.headersSent) { + res.status(500).json({ error: "Internal server error" }); + } + } + } + + private async handlePostRequest( + req: 
IAppRequest, + res: Response, + configKey: string, + volumeConfig: VolumeConfig, + ): Promise { + try { + const requestContext = getRequestContext(); + const serviceClient: WorkspaceClient = + requestContext.serviceDatabricksClient; + const user: User = { id: requestContext.userId }; + + const filePath = this.parsePath(req.path, configKey); + const fullPath = this.normalizePath( + filePath, + volumeConfig.volumePath, + volumeConfig.pathPrefix, + ); + + if (!fullPath) { + res.status(403).json({ + error: "Invalid path", + message: "Path traversal attempts are not allowed", + }); + return; + } + + // Don't allow upload to directories + if (filePath.endsWith("/")) { + res.status(400).json({ + error: "Bad request", + message: "Cannot upload to a directory path. Specify a file name.", + }); + return; + } + + // Build resource for policy validation + const contentLength = req.headers["content-length"] + ? Number.parseInt(req.headers["content-length"]) + : 0; + const contentType = + (req.headers["content-type"] as string) || "application/octet-stream"; + + const resource: Resource = { + path: filePath, + volume: configKey, + isDirectory: false, + size: contentLength, + mimeType: contentType, + }; + + // Validate upload/upsert policies (both mean the same thing) + const canUpload = + (await this.validatePolicy( + "upload", + resource, + user, + volumeConfig.policy, + )) || + (await this.validatePolicy( + "upsert", + resource, + user, + volumeConfig.policy, + )); + + if (!canUpload) { + res.status(403).json({ + error: "Forbidden", + message: "You do not have permission to upload to this path", + }); + return; + } + + // File size limit check (100MB) + const MAX_FILE_SIZE = 100 * 1024 * 1024; + if (contentLength > MAX_FILE_SIZE) { + res.status(413).json({ + error: "Payload too large", + message: `File size exceeds maximum allowed size of ${MAX_FILE_SIZE} bytes`, + }); + return; + } + + // Collect request body + const chunks: Buffer[] = []; + + req.on("data", (chunk: Buffer) 
=> { + chunks.push(chunk); + }); + + req.on("end", async () => { + try { + const fileBuffer = Buffer.concat(chunks); + + // Convert Buffer to ReadableStream for SDK upload + const nodeStream = Readable.from(fileBuffer); + const webStream = Readable.toWeb(nodeStream); + + // Upload file to Databricks volume + await serviceClient.files.upload({ + file_path: fullPath, + contents: webStream as any, // Type cast due to ReadableStream version differences + overwrite: true, + }); + + // Call onAfterUpload callback if defined + if (volumeConfig.onAfterUpload) { + try { + await volumeConfig.onAfterUpload(req, res, resource, user); + } catch (callbackError) { + console.error( + "[VolumeServing] onAfterUpload callback error:", + callbackError, + ); + // If callback fails and response not sent, send error + if (!res.headersSent) { + res.status(500).json({ + error: "Callback failed", + message: "File uploaded but callback failed", + }); + } + } + } else { + // Send default success response if no callback + res.status(200).json({ + success: true, + message: "File uploaded successfully", + path: filePath, + size: fileBuffer.length, + }); + } + } catch (uploadError: any) { + console.error("[VolumeServing] Error uploading file:", uploadError); + if (!res.headersSent) { + res.status(500).json({ + error: "Upload failed", + message: + uploadError.message || "Failed to upload file to volume", + }); + } + } + }); + + req.on("error", (error) => { + console.error("[VolumeServing] Error reading request body:", error); + if (!res.headersSent) { + res.status(500).json({ + error: "Upload failed", + message: "Failed to read request body", + }); + } + }); + } catch (error) { + console.error("[VolumeServing] Error handling POST request:", error); + if (!res.headersSent) { + res.status(500).json({ error: "Internal server error" }); + } + } + } + + injectRoutes(router: IAppRouter): void { + // Register routes for each volume config + for (const [configKey, volumeConfig] of Object.entries( + 
this.config.volumeConfigs, + )) { + // GET route for download and list operations + router.get(`/${configKey}/*`, async (req, res) => { + await this.handleGetRequest(req, res, configKey, volumeConfig); + }); + + // POST route for upload operations + router.post(`/${configKey}/*`, async (req, res) => { + await this.handlePostRequest(req, res, configKey, volumeConfig); + }); + } + } + + private async streamFile( + fullPath: string, + displayPath: string, + res: Response, + ): Promise { + const requestContext = getRequestContext(); + const serviceClient: WorkspaceClient = + requestContext.serviceDatabricksClient; + + try { + const downloadResponse = await serviceClient.files.download({ + file_path: fullPath, + }); + + const webStream = downloadResponse.contents; + if (!webStream) { + res.status(404).json({ error: "File not found or cannot be read" }); + return; + } + + const nodeStream = Readable.fromWeb(webStream as any); + // const contentType = downloadResponse["content-type"] || this.getContentType(displayPath); + const contentType = this.getContentType(displayPath); + + res.setHeader("Content-Type", contentType); + if (downloadResponse["content-length"]) { + res.setHeader( + "Content-Length", + downloadResponse["content-length"].toString(), + ); + } + res.setHeader("Cache-Control", "public, max-age=3600"); + if (downloadResponse["last-modified"]) { + res.setHeader("Last-Modified", downloadResponse["last-modified"]); + const etag = `"${Buffer.from(downloadResponse["last-modified"]).toString("base64")}"`; + res.setHeader("ETag", etag); + } + res.setHeader("Accept-Ranges", "bytes"); + + await pipeline(nodeStream, res); + } catch (error: any) { + console.error("[VolumeServing] Error downloading file:", error); + if (!res.headersSent) { + if (error.message?.includes("NOT_FOUND") || error.statusCode === 404) { + res.status(404).json({ error: "File not found" }); + } else { + res.status(500).json({ error: "Failed to download file" }); + } + } + } + } + + private 
getContentType(filePath: string): string { + const ext = filePath.split(".").pop()?.toLowerCase(); + const mimeTypes: Record = { + // Text + txt: "text/plain", + html: "text/html", + css: "text/css", + js: "application/javascript", + json: "application/json", + xml: "application/xml", + csv: "text/csv", + + // Images + jpg: "image/jpeg", + jpeg: "image/jpeg", + png: "image/png", + gif: "image/gif", + svg: "image/svg+xml", + webp: "image/webp", + ico: "image/x-icon", + + // Documents + pdf: "application/pdf", + doc: "application/msword", + docx: "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + xls: "application/vnd.ms-excel", + xlsx: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + + // Archives + zip: "application/zip", + tar: "application/x-tar", + gz: "application/gzip", + + // Media + mp4: "video/mp4", + mp3: "audio/mpeg", + wav: "audio/wav", + + // Python/Data Science + py: "text/x-python", + ipynb: "application/x-ipynb+json", + pkl: "application/octet-stream", + h5: "application/octet-stream", + parquet: "application/octet-stream", + }; + + return mimeTypes[ext || ""] || "application/octet-stream"; + } + + private async *listDirectoryStream( + fullPath: string, + displayPath: string, + ): AsyncGenerator { + const requestContext = getRequestContext(); + const serviceClient: WorkspaceClient = + requestContext.serviceDatabricksClient; + + yield { + type: "metadata", + path: displayPath, + volumePath: this.config.volumePath, + }; + + const iterator = serviceClient.files.listDirectoryContents({ + directory_path: fullPath, + }); + + for await (const item of iterator) { + const itemPath = `${displayPath}${item.name}${item.is_directory ? "/" : ""}`; + const mimeType = item.is_directory + ? 
null + : this.getContentType(item.name || ""); + + yield { + type: "file", + name: item.name || "", + path: itemPath, + isDirectory: item.is_directory || false, + size: item.file_size, + mimeType, + }; + } + } +} + +export const volumeServing = toPlugin< + typeof VolumeServingPlugin, + VolumeServingConfig, + "volumeServing" +>(VolumeServingPlugin, "volumeServing"); diff --git a/packages/app-kit-ui/src/react/hooks/use-analytics-query.ts b/packages/app-kit-ui/src/react/hooks/use-analytics-query.ts index c3cc7bfe..c945f45e 100644 --- a/packages/app-kit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/app-kit-ui/src/react/hooks/use-analytics-query.ts @@ -1,5 +1,5 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import { connectSSE } from "@/js"; +import { connectSSE } from "../../js"; import type { InferResult, QueryKey, diff --git a/packages/app-kit/src/analytics/analytics.ts b/packages/app-kit/src/analytics/analytics.ts index f89ed022..95ade0af 100644 --- a/packages/app-kit/src/analytics/analytics.ts +++ b/packages/app-kit/src/analytics/analytics.ts @@ -7,11 +7,11 @@ import type { QuerySchemas, StreamExecutionSettings, } from "shared"; -import { generateQueryRegistryTypes } from "@/utils/type-generator"; import { SQLWarehouseConnector } from "../connectors"; import { Plugin, toPlugin } from "../plugin"; import type { Request, Response } from "../utils"; import { getRequestContext } from "../utils"; +import { generateQueryRegistryTypes } from "../utils/type-generator"; import { queryDefaults } from "./defaults"; import { QueryProcessor } from "./query"; import { diff --git a/packages/app-kit/src/index.ts b/packages/app-kit/src/index.ts index ce831415..12fccf07 100644 --- a/packages/app-kit/src/index.ts +++ b/packages/app-kit/src/index.ts @@ -1,3 +1,8 @@ +export type { + BasePluginConfig, + IAppRouter, + StreamExecutionSettings, +} from "shared"; export { analytics } from "./analytics"; export { CacheManager } from "./cache"; 
export { createApp } from "./core"; @@ -11,9 +16,4 @@ export { type Span, SpanStatusCode, } from "./telemetry"; -export type { - BasePluginConfig, - IAppRouter, - StreamExecutionSettings, -} from "shared"; export { getRequestContext } from "./utils"; diff --git a/packages/app-kit/src/server/index.ts b/packages/app-kit/src/server/index.ts index afc8ea48..e62b78bc 100644 --- a/packages/app-kit/src/server/index.ts +++ b/packages/app-kit/src/server/index.ts @@ -4,10 +4,10 @@ import path from "node:path"; import dotenv from "dotenv"; import express from "express"; import type { PluginPhase } from "shared"; -import { generatePluginRegistryTypes } from "@/utils/type-generator"; import { Plugin, toPlugin } from "../plugin"; import { instrumentations } from "../telemetry"; import { databricksClientMiddleware, isRemoteServerEnabled } from "../utils"; +import { generatePluginRegistryTypes } from "../utils/type-generator"; import { DevModeManager } from "./dev-mode"; import type { ServerConfig } from "./types"; import { getQueries, getRoutes } from "./utils"; diff --git a/packages/app-kit/src/server/types.ts b/packages/app-kit/src/server/types.ts index 51458151..8e7d6a13 100644 --- a/packages/app-kit/src/server/types.ts +++ b/packages/app-kit/src/server/types.ts @@ -1,5 +1,5 @@ import type { BasePluginConfig } from "shared"; -import type { Plugin } from "@/plugin"; +import type { Plugin } from "../plugin"; export interface ServerConfig extends BasePluginConfig { port?: number; diff --git a/packages/app-kit/src/utils/type-generator.ts b/packages/app-kit/src/utils/type-generator.ts index 0d633eac..53872678 100644 --- a/packages/app-kit/src/utils/type-generator.ts +++ b/packages/app-kit/src/utils/type-generator.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import type { QuerySchemas } from "shared"; import { createAuxiliaryTypeStore, printNode, zodToTs } from "zod-to-ts"; -import { type Plugin, routeSchemaRegistry } from "@/plugin"; +import { type 
Plugin, routeSchemaRegistry } from "../plugin"; interface AppKitRegistry { pluginRegistry: string;