diff --git a/.gitignore b/.gitignore index 323adf4..ef88921 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,22 @@ .env .DS_Store + +# Python __pycache__/ *.py[cod] *.pyo *.pyd .Python build/ -dist/ *.egg-info/ env/ venv/ .venv/ .pytest_cache/ .coverage + +# TypeScript / Node +node_modules/ +dist/ +*.tsbuildinfo +package-lock.json diff --git a/README.md b/README.md index 0beb6ec..d7dfa95 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,9 @@ A curated collection of production-ready workflow examples demonstrating various ## Overview -These examples demonstrate how to build robust, scalable workflows using Render's Python SDK. All examples follow best practices for production deployments and include comprehensive documentation. +These examples demonstrate how to build robust, scalable workflows using Render's SDK. Each example is available in both **Python** and **TypeScript**, with implementations side by side in `python/` and `typescript/` subdirectories. All examples follow best practices for production deployments and include comprehensive documentation. -**Important Notes:** -- **Python-only**: Render Workflows are currently only supported in Python via `render-sdk` -- **Service Type**: All workflow services must be deployed as Workflow services on Render +**Service type**: All workflow services must be deployed as Workflow services on Render. 
## Examples diff --git a/data-pipeline/README.md b/data-pipeline/README.md index 8c348bb..a5bf6b5 100644 --- a/data-pipeline/README.md +++ b/data-pipeline/README.md @@ -45,67 +45,63 @@ run_data_pipeline (orchestrator) └── aggregate_insights ``` -## Local Development +## Local development ### Prerequisites -- Python 3.10+ -### Setup and Run +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) -```bash -# Navigate to example directory -cd data-pipeline +### Python -# Install dependencies +```bash +cd data-pipeline/python pip install -r requirements.txt - -# Run the workflow service python main.py ``` -## Deploying to Render - -### Service Configuration - -**Service Type**: Workflow +### TypeScript -**Build Command**: ```bash -cd data-pipeline && pip install -r requirements.txt +cd data-pipeline/typescript +npm install +npm run dev ``` -**Start Command**: -```bash -cd data-pipeline && python main.py -``` +## Deploying to Render + +### Service configuration + +**Service type**: Workflow -### Environment Variables +**Python:** -Required: -- `RENDER_API_KEY` - Your Render API key (from Render dashboard) +| Setting | Value | +|---|---| +| Build command | `cd data-pipeline/python && pip install -r requirements.txt` | +| Start command | `cd data-pipeline/python && python main.py` | -Optional (if using real APIs): -- Any API keys for external services you integrate +**TypeScript:** -### Deployment Steps +| Setting | Value | +|---|---| +| Build command | `cd data-pipeline/typescript && npm install && npm run build` | +| Start command | `cd data-pipeline/typescript && npm start` | -1. **Create Workflow Service** - - Go to Render Dashboard - - Click "New +" → "Workflow" - - Connect your repository - - Name: `data-pipeline-workflows` +### Environment variables -2. 
**Configure Build Settings** - - Build Command: `cd data-pipeline && pip install -r requirements.txt` - - Start Command: `cd data-pipeline && python main.py` +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Render Dashboard) | -3. **Set Environment Variables** - - Add `RENDER_API_KEY` in the Environment section - - Get API key from: Render Dashboard → Account Settings → API Keys +### Deployment steps -4. **Deploy** - - Click "Create Workflow" - - Render will build and start your workflow service +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository. +1. Set the build and start commands for your chosen language. +1. Add `RENDER_API_KEY` in the Environment section. +1. Click **Create Workflow**. ## Testing in Render Dashboard @@ -373,11 +369,9 @@ async def send_pipeline_notification(result: dict) -> dict: 4. **Timeout Settings**: HTTP client configured with 30s timeout 5. **Error Isolation**: One source failure doesn't block others -## Important Notes +## Important notes -- **Python-only**: Workflows are only supported in Python via render-sdk -- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration -- **Mock Data**: Example uses simulated data; replace with real API calls in production -- **Idempotency**: Design pipeline to be safely re-runnable -- **Monitoring**: Add logging and metrics for production deployments -- **Cost**: Consider API rate limits and costs for external services +- **Mock data**: This example uses simulated data. Replace with real API calls in production. +- **Idempotency**: Design pipeline to be safely re-runnable. +- **Monitoring**: Add logging and metrics for production deployments. +- **Cost**: Consider API rate limits and costs for external services. 
diff --git a/data-pipeline/main.py b/data-pipeline/python/main.py similarity index 100% rename from data-pipeline/main.py rename to data-pipeline/python/main.py diff --git a/data-pipeline/requirements.txt b/data-pipeline/python/requirements.txt similarity index 100% rename from data-pipeline/requirements.txt rename to data-pipeline/python/requirements.txt diff --git a/data-pipeline/typescript/package.json b/data-pipeline/typescript/package.json new file mode 100644 index 0000000..65ca0e2 --- /dev/null +++ b/data-pipeline/typescript/package.json @@ -0,0 +1,19 @@ +{ + "name": "data-pipeline-workflow", + "version": "1.0.0", + "description": "Data Pipeline - Multi-source customer analytics with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7" + }, + "devDependencies": { + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/data-pipeline/typescript/src/main.ts b/data-pipeline/typescript/src/main.ts new file mode 100644 index 0000000..8403555 --- /dev/null +++ b/data-pipeline/typescript/src/main.ts @@ -0,0 +1,332 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; + +interface User { + id: string; + name: string; + email: string; + plan: string; +} + +interface Transaction { + id: string; + user_id: string; + amount: number; + type: string; + date: string; +} + +interface Engagement { + user_id: string; + page_views: number; + sessions: number; + last_active: string; + feature_usage: { search: number; export: number; share: number }; +} + +interface EnrichedUser { + user_id: string; + name: string; + email: string; + plan: string; + transaction_count: number; + total_spent: number; + total_refunded: number; + net_revenue: number; + engagement_score: number; + segment: string; + page_views: number; + sessions: number; + geo: { country: string; timezone: string; 
language: string }; +} + +const retry = { + maxRetries: 3, + waitDurationMs: 2000, + factor: 1.5, +}; + +function simpleHash(str: string): number { + let hash = 0; + for (let i = 0; i < str.length; i++) { + hash = (hash * 31 + str.charCodeAt(i)) | 0; + } + return Math.abs(hash); +} + +// ---- Data Source Tasks ---- + +const fetchUserData = task( + { name: "fetchUserData", retry }, + async function fetchUserData(userIds: string[]) { + console.log(`[SOURCE] Fetching user data for ${userIds.length} users`); + + const mockUsers: { [key: string]: User } = { + user_1: { id: "user_1", name: "Alice Johnson", email: "alice@example.com", plan: "premium" }, + user_2: { id: "user_2", name: "Bob Smith", email: "bob@example.com", plan: "basic" }, + user_3: { id: "user_3", name: "Charlie Brown", email: "charlie@example.com", plan: "premium" }, + user_4: { id: "user_4", name: "Diana Prince", email: "diana@example.com", plan: "basic" }, + }; + + const users = userIds.map( + (uid) => mockUsers[uid] ?? { id: uid, name: "Unknown", email: `${uid}@example.com`, plan: "none" }, + ); + + console.log(`[SOURCE] Fetched ${users.length} user records`); + return { success: true, source: "user_service", count: users.length, data: users }; + }, +); + +const fetchTransactionData = task( + { name: "fetchTransactionData", retry }, + async function fetchTransactionData(userIds: string[], days: number = 30) { + console.log(`[SOURCE] Fetching transactions for ${userIds.length} users (${days} days)`); + + const transactions: Transaction[] = []; + const types = ["purchase", "refund", "subscription"]; + + for (const userId of userIds) { + const numTxns = (simpleHash(userId) % 10) + 1; + for (let i = 0; i < numTxns; i++) { + const key = `${userId}_${i}`; + const daysAgo = simpleHash(key) % days; + const date = new Date(Date.now() - daysAgo * 86400000); + transactions.push({ + id: `txn_${key}`, + user_id: userId, + amount: (simpleHash(key) % 10000) / 100, + type: types[simpleHash(key) % 3], + date: 
date.toISOString(), + }); + } + } + + console.log(`[SOURCE] Fetched ${transactions.length} transactions`); + return { success: true, source: "transaction_service", count: transactions.length, data: transactions }; + }, +); + +const fetchEngagementData = task( + { name: "fetchEngagementData", retry }, + async function fetchEngagementData(userIds: string[]) { + console.log(`[SOURCE] Fetching engagement data for ${userIds.length} users`); + + const engagement: Engagement[] = userIds.map((userId) => { + const daysAgo = simpleHash(userId) % 30; + return { + user_id: userId, + page_views: simpleHash(`pv_${userId}`) % 1000, + sessions: simpleHash(`sess_${userId}`) % 100, + last_active: new Date(Date.now() - daysAgo * 86400000).toISOString(), + feature_usage: { + search: simpleHash(`search_${userId}`) % 50, + export: simpleHash(`export_${userId}`) % 20, + share: simpleHash(`share_${userId}`) % 30, + }, + }; + }); + + console.log(`[SOURCE] Fetched engagement for ${engagement.length} users`); + return { success: true, source: "analytics_service", count: engagement.length, data: engagement }; + }, +); + +// ---- Enrichment Tasks ---- + +const enrichWithGeoData = task( + { name: "enrichWithGeoData", retry }, + async function enrichWithGeoData(userEmail: string) { + console.log(`[ENRICH] Enriching geo data for ${userEmail}`); + const idx = simpleHash(userEmail) % 4; + return { + country: ["USA", "Canada", "UK", "Germany"][idx], + timezone: ["America/New_York", "America/Toronto", "Europe/London", "Europe/Berlin"][idx], + language: ["en-US", "en-CA", "en-GB", "de-DE"][idx], + }; + }, +); + +const calculateUserMetrics = task( + { name: "calculateUserMetrics", retry }, + async function calculateUserMetrics( + user: User, + transactions: Transaction[], + engagement: Engagement, + ) { + console.log(`[METRICS] Calculating metrics for user ${user.id}`); + + const userTxns = transactions.filter((t) => t.user_id === user.id); + const totalSpent = userTxns.filter((t) => t.type === 
"purchase").reduce((s, t) => s + t.amount, 0); + const totalRefunded = userTxns.filter((t) => t.type === "refund").reduce((s, t) => s + t.amount, 0); + const netRevenue = totalSpent - totalRefunded; + + const featureSum = Object.values(engagement.feature_usage ?? {}).reduce((s, v) => s + v, 0); + const engagementScore = Math.min( + 100, + (engagement.page_views ?? 0) / 10 + (engagement.sessions ?? 0) / 2 + featureSum, + ); + + let segment: string; + if (user.plan === "premium" && netRevenue > 100) segment = "high_value"; + else if (user.plan === "premium") segment = "premium"; + else if (engagementScore > 50) segment = "engaged"; + else segment = "standard"; + + const metrics = { + user_id: user.id, + name: user.name, + email: user.email, + plan: user.plan, + transaction_count: userTxns.length, + total_spent: Math.round(totalSpent * 100) / 100, + total_refunded: Math.round(totalRefunded * 100) / 100, + net_revenue: Math.round(netRevenue * 100) / 100, + engagement_score: Math.round(engagementScore * 100) / 100, + segment, + page_views: engagement.page_views ?? 0, + sessions: engagement.sessions ?? 0, + }; + + console.log(`[METRICS] User ${user.id} - Segment: ${segment}, Revenue: $${netRevenue.toFixed(2)}`); + return metrics; + }, +); + +// ---- Transformation Tasks ---- + +const transformUserData = task( + { name: "transformUserData", retry }, + async function transformUserData( + userData: { data: User[] }, + transactionData: { data: Transaction[] }, + engagementData: { data: Engagement[] }, + ) { + console.log("[TRANSFORM] Combining data from multiple sources"); + + const users = userData.data ?? []; + const transactions = transactionData.data ?? []; + const engagementList = engagementData.data ?? 
[]; + + const engagementMap = new Map(engagementList.map((e) => [e.user_id, e])); + + console.log(`[TRANSFORM] Processing ${users.length} users with enrichment`); + + const enrichedUsers = []; + for (const user of users) { + const userEngagement = engagementMap.get(user.id) ?? ({} as Engagement); + const userMetrics = await calculateUserMetrics(user, transactions, userEngagement); + const geoData = await enrichWithGeoData(user.email); + enrichedUsers.push({ ...userMetrics, geo: geoData }); + } + + console.log(`[TRANSFORM] Enriched ${enrichedUsers.length} user profiles`); + return { success: true, count: enrichedUsers.length, data: enrichedUsers }; + }, +); + +// ---- Aggregation Tasks ---- + +const aggregateInsights = task( + { name: "aggregateInsights", retry }, + function aggregateInsights(enrichedData: { data: EnrichedUser[] }) { + console.log("[AGGREGATE] Generating insights from enriched data"); + + const users = enrichedData.data ?? []; + if (users.length === 0) return { success: false, error: "No data to aggregate" }; + + const segments: { [key: string]: number } = {}; + const countries: { [key: string]: number } = {}; + + for (const user of users) { + segments[user.segment] = (segments[user.segment] ?? 0) + 1; + const country = user.geo?.country ?? "Unknown"; + countries[country] = (countries[country] ?? 0) + 1; + } + + const totalRevenue = users.reduce((s, u) => s + u.net_revenue, 0); + const avgRevenue = users.length > 0 ? totalRevenue / users.length : 0; + const avgEngagement = users.length > 0 + ? 
users.reduce((s, u) => s + u.engagement_score, 0) / users.length + : 0; + + const topUsers = [...users] + .sort((a, b) => b.net_revenue - a.net_revenue) + .slice(0, 5) + .map((u) => ({ name: u.name, revenue: u.net_revenue, segment: u.segment })); + + const insights = { + total_users: users.length, + segment_distribution: segments, + revenue: { + total: Math.round(totalRevenue * 100) / 100, + average_per_user: Math.round(avgRevenue * 100) / 100, + top_users: topUsers, + }, + engagement: { + average_score: Math.round(avgEngagement * 100) / 100, + total_page_views: users.reduce((s, u) => s + u.page_views, 0), + total_sessions: users.reduce((s, u) => s + u.sessions, 0), + }, + geographic_distribution: countries, + generated_at: new Date().toISOString(), + }; + + console.log(`[AGGREGATE] Insights generated: ${users.length} users, $${totalRevenue.toFixed(2)} revenue`); + return insights; + }, +); + +// Root task: full pipeline orchestrator +task( + { name: "runDataPipeline", retry, timeoutSeconds: 300 }, + async function runDataPipeline(userIds: string[]) { + console.log("=".repeat(80)); + console.log("[PIPELINE] Starting Data Pipeline"); + console.log(`[PIPELINE] Processing ${userIds.length} users`); + console.log("=".repeat(80)); + + try { + // Stage 1: Parallel extraction + console.log("[PIPELINE] Stage 1/3: EXTRACT (parallel)"); + const [userData, transactionData, engagementData] = await Promise.all([ + fetchUserData(userIds), + fetchTransactionData(userIds), + fetchEngagementData(userIds), + ]); + + console.log( + `[PIPELINE] Extracted: ${userData.count} users, ${transactionData.count} transactions, ${engagementData.count} engagement records`, + ); + + // Stage 2: Transform + console.log("[PIPELINE] Stage 2/3: TRANSFORM"); + const enrichedData = await transformUserData(userData, transactionData, engagementData); + console.log(`[PIPELINE] Enriched ${enrichedData.count} user profiles`); + + // Stage 3: Aggregate + console.log("[PIPELINE] Stage 3/3: AGGREGATE"); + 
const insights = await aggregateInsights(enrichedData as { data: EnrichedUser[] }); + + const pipelineResult = { + status: "success", + user_count: userIds.length, + stages: { + extract: { users: userData.count, transactions: transactionData.count, engagement: engagementData.count }, + transform: { enriched_users: enrichedData.count }, + aggregate: { insights }, + }, + insights, + completed_at: new Date().toISOString(), + }; + + console.log("=".repeat(80)); + console.log("[PIPELINE] Data Pipeline Complete!"); + console.log("=".repeat(80)); + + return pipelineResult; + } catch (error) { + console.error(`[PIPELINE] Pipeline failed: ${error}`); + return { status: "failed", error: String(error), failed_at: new Date().toISOString() }; + } + }, +); diff --git a/data-pipeline/typescript/tsconfig.json b/data-pipeline/typescript/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/data-pipeline/typescript/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/etl-job/README.md b/etl-job/README.md index f265807..f06043f 100644 --- a/etl-job/README.md +++ b/etl-job/README.md @@ -34,61 +34,60 @@ run_etl_pipeline (main orchestrator) ## Local Development ### Prerequisites -- Python 3.10+ -### Setup and Run +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) -```bash -# Navigate to example directory -cd etl-job +### Python -# Install dependencies +```bash +cd etl-job/python pip install -r requirements.txt - -# Run the workflow service python main.py ``` -## Deploying to Render - -### Service Configuration - -**Service Type**: Workflow +### TypeScript -**Build Command**: ```bash -cd etl-job && pip install -r requirements.txt +cd etl-job/typescript +npm install +npm run dev ``` 
-**Start Command**: -```bash -cd etl-job && python main.py -``` +## Deploying to Render + +### Service configuration + +**Service type**: Workflow + +**Python:** -### Environment Variables +| Setting | Value | +|---|---| +| Build command | `cd etl-job/python && pip install -r requirements.txt` | +| Start command | `cd etl-job/python && python main.py` | -Required: -- `RENDER_API_KEY` - Your Render API key (from Render dashboard) +**TypeScript:** -### Deployment Steps +| Setting | Value | +|---|---| +| Build command | `cd etl-job/typescript && npm install && npm run build` | +| Start command | `cd etl-job/typescript && npm start` | -1. **Create Workflow Service** - - Go to Render Dashboard - - Click "New +" → "Workflow" - - Connect your repository - - Name: `etl-job-workflows` +### Environment variables -2. **Configure Build Settings** - - Build Command: `cd etl-job && pip install -r requirements.txt` - - Start Command: `cd etl-job && python main.py` +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Render Dashboard) | -3. **Set Environment Variables** - - Add `RENDER_API_KEY` in the Environment section - - Get API key from: Render Dashboard → Account Settings → API Keys +### Deployment steps -4. **Deploy** - - Click "Create Workflow" - - Render will build and start your workflow service +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository. +1. Set the build and start commands for your chosen language. +1. Add `RENDER_API_KEY` in the Environment section. +1. Click **Create Workflow**. 
## Testing in Render Dashboard @@ -218,8 +217,6 @@ async def transform_batch_parallel(records: list[dict]) -> dict: return results ``` -## Important Notes +## Important notes -- **Python-only**: Workflows are only supported in Python via render-sdk -- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration -- **Service Type**: Deploy as a Workflow service on Render (not Background Worker or Web Service) +- **Service type**: Deploy as a Workflow service on Render (not Background Worker or Web Service). diff --git a/etl-job/main.py b/etl-job/python/main.py similarity index 100% rename from etl-job/main.py rename to etl-job/python/main.py diff --git a/etl-job/requirements.txt b/etl-job/python/requirements.txt similarity index 100% rename from etl-job/requirements.txt rename to etl-job/python/requirements.txt diff --git a/etl-job/typescript/package.json b/etl-job/typescript/package.json new file mode 100644 index 0000000..3589fc6 --- /dev/null +++ b/etl-job/typescript/package.json @@ -0,0 +1,20 @@ +{ + "name": "etl-job-workflow", + "version": "1.0.0", + "description": "ETL Job - Extract, Transform, Load pipeline with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7" + }, + "devDependencies": { + "@types/node": "^22.13.0", + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/etl-job/typescript/src/main.ts b/etl-job/typescript/src/main.ts new file mode 100644 index 0000000..83e8239 --- /dev/null +++ b/etl-job/typescript/src/main.ts @@ -0,0 +1,248 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; +import { readFileSync, existsSync } from "node:fs"; +import { resolve } from "node:path"; + +interface Record { + id?: string; + name?: string; + email?: string; + age?: string; + country?: string; + [key: string]: string | undefined; +} + 
+interface ValidatedRecord { + id: string | undefined; + name: string; + email: string | null; + age: number | null; + country: string; + is_valid: boolean; + errors: string[]; + warnings: string[]; +} + +const retry = { + maxRetries: 3, + waitDurationMs: 1000, + factor: 1.5, +}; + +// Subtask: extract rows from a CSV file +const extractCsvData = task( + { name: "extractCsvData", retry }, + function extractCsvData(filePath: string): Record[] { + console.log(`[EXTRACT] Reading CSV file: ${filePath}`); + + const fullPath = resolve(filePath); + if (!existsSync(fullPath)) { + console.warn("[EXTRACT] File not found, using sample data"); + return [ + { id: "1", name: "Alice", email: "alice@example.com", age: "28", country: "USA" }, + { id: "2", name: "Bob", email: "bob@example.com", age: "34", country: "Canada" }, + { id: "3", name: "Charlie", email: "invalid-email", age: "invalid", country: "UK" }, + ]; + } + + const content = readFileSync(fullPath, "utf-8"); + const lines = content.trim().split("\n"); + if (lines.length < 2) return []; + + const headers = lines[0].split(",").map((h) => h.trim()); + const records: Record[] = []; + + for (let i = 1; i < lines.length; i++) { + const values = lines[i].split(",").map((v) => v.trim()); + const record: Record = {}; + headers.forEach((h, idx) => { + record[h] = values[idx]; + }); + records.push(record); + } + + console.log(`[EXTRACT] Successfully extracted ${records.length} records`); + return records; + }, +); + +// Subtask: validate and clean a single record +const validateRecord = task( + { name: "validateRecord", retry }, + function validateRecord(record: Record): ValidatedRecord { + console.log(`[TRANSFORM] Validating record ID: ${record.id ?? "unknown"}`); + + const errors: string[] = []; + const warnings: string[] = []; + + if (!record.name) errors.push("Missing name"); + if (!record.email) errors.push("Missing email"); + + const email = record.email ?? 
""; + if (email && !email.includes("@")) errors.push("Invalid email format"); + + let age: number | null = null; + try { + age = parseInt(record.age ?? "0", 10); + if (isNaN(age) || age < 0 || age > 120) { + errors.push(`Invalid age: ${record.age}`); + age = null; + } + } catch { + errors.push(`Age must be a number: ${record.age}`); + } + + const cleaned: ValidatedRecord = { + id: record.id, + name: (record.name ?? "").trim(), + email: email ? email.toLowerCase().trim() : null, + age, + country: (record.country ?? "").trim(), + is_valid: errors.length === 0, + errors, + warnings, + }; + + const status = cleaned.is_valid ? "VALID" : "INVALID"; + console.log(`[TRANSFORM] Record ${record.id}: ${status}`); + return cleaned; + }, +); + +// Subtask: validate a batch of records by calling validateRecord for each +const transformBatch = task( + { name: "transformBatch", retry }, + async function transformBatch(records: Record[]) { + console.log(`[TRANSFORM] Starting batch transformation of ${records.length} records`); + + const validRecords: ValidatedRecord[] = []; + const invalidRecords: ValidatedRecord[] = []; + + for (let i = 0; i < records.length; i++) { + console.log(`[TRANSFORM] Processing record ${i + 1}/${records.length}`); + const validated = await validateRecord(records[i]); + + if (validated.is_valid) { + validRecords.push(validated); + } else { + invalidRecords.push(validated); + } + } + + const result = { + valid_records: validRecords, + invalid_records: invalidRecords, + total_processed: records.length, + valid_count: validRecords.length, + invalid_count: invalidRecords.length, + success_rate: records.length > 0 ? 
validRecords.length / records.length : 0, + }; + + console.log( + `[TRANSFORM] Batch complete: ${result.valid_count} valid, ${result.invalid_count} invalid`, + ); + return result; + }, +); + +// Subtask: compute statistics from validated records +const computeStatistics = task( + { name: "computeStatistics", retry }, + function computeStatistics(validRecords: ValidatedRecord[]) { + console.log(`[LOAD] Computing statistics for ${validRecords.length} records`); + + if (validRecords.length === 0) { + console.warn("[LOAD] No valid records to analyze"); + return { total_records: 0, country_distribution: {}, age_stats: {} }; + } + + const countryCounts: { [key: string]: number } = {}; + for (const record of validRecords) { + const country = record.country || "Unknown"; + countryCounts[country] = (countryCounts[country] ?? 0) + 1; + } + + const ages = validRecords + .filter((r) => r.age !== null) + .map((r) => r.age as number); + + const ageStats = + ages.length > 0 + ? { + min: Math.min(...ages), + max: Math.max(...ages), + average: ages.reduce((a, b) => a + b, 0) / ages.length, + count: ages.length, + } + : {}; + + const statistics = { + total_records: validRecords.length, + country_distribution: countryCounts, + age_stats: ageStats, + timestamp: new Date().toISOString(), + }; + + console.log("[LOAD] Statistics computed successfully"); + return statistics; + }, +); + +// Root task: orchestrates the full ETL pipeline +task( + { name: "runEtlPipeline", retry, timeoutSeconds: 300 }, + async function runEtlPipeline(sourceFile: string) { + console.log("=".repeat(80)); + console.log("[PIPELINE] Starting ETL Pipeline"); + console.log(`[PIPELINE] Source: ${sourceFile}`); + console.log("=".repeat(80)); + + try { + console.log("[PIPELINE] Stage 1/3: EXTRACT"); + const rawRecords = await extractCsvData(sourceFile); + console.log(`[PIPELINE] Extracted ${rawRecords.length} records`); + + console.log("[PIPELINE] Stage 2/3: TRANSFORM"); + const transformResult = await 
transformBatch(rawRecords); + console.log( + `[PIPELINE] Transformation complete: ${(transformResult.success_rate * 100).toFixed(1)}% success rate`, + ); + + console.log("[PIPELINE] Stage 3/3: LOAD"); + const statistics = await computeStatistics(transformResult.valid_records); + console.log("[PIPELINE] Statistics computed"); + + const pipelineResult = { + status: "success", + extract: { + records_extracted: rawRecords.length, + source: sourceFile, + }, + transform: { + valid_count: transformResult.valid_count, + invalid_count: transformResult.invalid_count, + success_rate: transformResult.success_rate, + invalid_records: transformResult.invalid_records, + }, + load: { statistics }, + completed_at: new Date().toISOString(), + }; + + console.log("=".repeat(80)); + console.log("[PIPELINE] ETL Pipeline Complete!"); + console.log(`[PIPELINE] Processed: ${rawRecords.length} records`); + console.log(`[PIPELINE] Valid: ${transformResult.valid_count} records`); + console.log(`[PIPELINE] Invalid: ${transformResult.invalid_count} records`); + console.log("=".repeat(80)); + + return pipelineResult; + } catch (error) { + console.error(`[PIPELINE] ETL Pipeline failed: ${error}`); + return { + status: "failed", + error: String(error), + failed_at: new Date().toISOString(), + }; + } + }, +); diff --git a/etl-job/typescript/tsconfig.json b/etl-job/typescript/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/etl-job/typescript/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/file-analyzer/README.md b/file-analyzer/README.md index 74154c4..55d10ca 100644 --- a/file-analyzer/README.md +++ b/file-analyzer/README.md @@ -121,20 +121,30 @@ task_id = get_task_identifier("analyze_file") # Result: 
"file-analyzer-workflows/analyze_file" ``` -## Project Structure +## Project structure ``` file-analyzer/ -├── README.md # This file -├── workflow-service/ # Task SDK - Defines tasks -│ ├── requirements.txt # Python dependencies -│ ├── main.py # Task definitions -│ └── sample_files/ -│ ├── sales_data.csv # Sample sales data -│ └── customer_data.csv # Sample customer data -└── api-service/ # Client SDK - Calls tasks - ├── requirements.txt # Python dependencies - └── main.py # FastAPI endpoints +├── README.md +├── sample_files/ # Shared sample data +│ ├── sales_data.csv +│ └── customer_data.csv +├── python/ +│ ├── api-service/ # Client SDK - FastAPI +│ │ ├── main.py +│ │ └── requirements.txt +│ └── workflow-service/ # Task SDK - Workflows +│ ├── main.py +│ └── requirements.txt +└── typescript/ + ├── api-service/ # Client SDK - Express + │ ├── src/main.ts + │ ├── package.json + │ └── tsconfig.json + └── workflow-service/ # Task SDK - Workflows + ├── src/main.ts + ├── package.json + └── tsconfig.json ``` ## Workflow Service (Task SDK) @@ -228,45 +238,54 @@ print(result.status) # Task status (e.g., "SUCCEEDED") print(result.results) # Task return value ``` -## Local Development +## Local development ### Prerequisites -- Python 3.10+ +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) - Render API key -### Running the Workflow Service +### Running the workflow service -```bash -# Navigate to workflow service -cd file-analyzer/workflow-service +**Python:** -# Install dependencies +```bash +cd file-analyzer/python/workflow-service pip install -r requirements.txt - -# Run the service python main.py ``` -The service will start and register all tasks. Keep this running. +**TypeScript:** + +```bash +cd file-analyzer/typescript/workflow-service +npm install +npm run dev +``` + +Keep this running. 
-### Running the API Service +### Running the API service In a separate terminal: -```bash -# Navigate to API service -cd file-analyzer/api-service +**Python:** -# Install dependencies +```bash +cd file-analyzer/python/api-service pip install -r requirements.txt +cp .env.example .env # then edit .env with your keys +uvicorn main:app --host 0.0.0.0 --port 8000 +``` -# Set environment variables -export RENDER_API_KEY="your_render_api_key" -export WORKFLOW_SERVICE_SLUG="local" # For local development +**TypeScript:** -# Run the service -uvicorn main:app --host 0.0.0.0 --port 8000 +```bash +cd file-analyzer/typescript/api-service +npm install +cp .env.example .env # then edit .env with your keys +npm run dev ``` ### Testing Locally @@ -275,7 +294,7 @@ uvicorn main:app --host 0.0.0.0 --port 8000 ```bash curl -X POST "http://localhost:8000/analyze" \ - -F "file=@workflow-service/sample_files/sales_data.csv" + -F "file=@sample_files/sales_data.csv" ``` **Using Python:** @@ -283,7 +302,7 @@ curl -X POST "http://localhost:8000/analyze" \ ```python import requests -with open('workflow-service/sample_files/sales_data.csv', 'rb') as f: +with open('sample_files/sales_data.csv', 'rb') as f: response = requests.post( 'http://localhost:8000/analyze', files={'file': f} @@ -299,32 +318,40 @@ curl http://localhost:8000/health ## Deploying to Render -### Step 1: Deploy Workflow Service +### Step 1: Deploy workflow service + +**Service type**: Workflow + +**Python:** -**Service Type**: Workflow +| Setting | Value | +|---|---| +| Name | `file-analyzer-workflows` (this becomes your slug) | +| Build command | `cd file-analyzer/python/workflow-service && pip install -r requirements.txt` | +| Start command | `cd file-analyzer/python/workflow-service && python main.py` | -**Configuration:** -- **Name**: `file-analyzer-workflows` (this becomes your slug!) 
-- **Build Command**: - ```bash - cd file-analyzer/workflow-service && pip install -r requirements.txt - ``` -- **Start Command**: - ```bash - cd file-analyzer/workflow-service && python main.py - ``` +**TypeScript:** -**Environment Variables:** -- `RENDER_API_KEY` - Your Render API key (from Account Settings → API Keys) +| Setting | Value | +|---|---| +| Name | `file-analyzer-workflows` (this becomes your slug) | +| Build command | `cd file-analyzer/typescript/workflow-service && npm install && npm run build` | +| Start command | `cd file-analyzer/typescript/workflow-service && npm start` | -**Deployment Steps:** -1. Go to Render Dashboard -2. Click **"New +"** → **"Workflow"** -3. Connect your repository -4. Configure as above -5. Click **"Create Workflow"** +**Environment variables:** -**Important:** Note the service slug (usually the service name in lowercase with hyphens). You'll need this for the API service. +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Account Settings > API Keys) | + +**Deployment steps:** + +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository and configure as above. +1. Click **Create Workflow**. + +Note the service slug -- you'll need this for the API service. ## Testing Workflow Service in Render Dashboard @@ -395,32 +422,40 @@ Returns statistical metrics for numeric columns. **Note:** The workflow service doesn't handle file uploads - it processes raw CSV content. For file uploads, use the API service (tested via HTTP endpoints, not the Dashboard). 
-### Step 2: Deploy API Service - -**Service Type**: Web Service - -**Configuration:** -- **Name**: `file-analyzer-api` -- **Build Command**: - ```bash - cd file-analyzer/api-service && pip install -r requirements.txt - ``` -- **Start Command**: - ```bash - cd file-analyzer/api-service && uvicorn main:app --host 0.0.0.0 --port $PORT - ``` - -**Environment Variables:** -- `RENDER_API_KEY` - Your Render API key (same as workflow service) -- `WORKFLOW_SERVICE_SLUG` - Your workflow service slug (e.g., `file-analyzer-workflows`) - -**Deployment Steps:** -1. Go to Render Dashboard -2. Click **"New +"** → **"Web Service"** -3. Connect your repository -4. Configure as above -5. Set environment variables (including `WORKFLOW_SERVICE_SLUG`!) -6. Click **"Create Web Service"** +### Step 2: Deploy API service + +**Service type**: Web Service + +**Python:** + +| Setting | Value | +|---|---| +| Name | `file-analyzer-api` | +| Build command | `cd file-analyzer/python/api-service && pip install -r requirements.txt` | +| Start command | `cd file-analyzer/python/api-service && uvicorn main:app --host 0.0.0.0 --port $PORT` | + +**TypeScript:** + +| Setting | Value | +|---|---| +| Name | `file-analyzer-api` | +| Build command | `cd file-analyzer/typescript/api-service && npm install && npm run build` | +| Start command | `cd file-analyzer/typescript/api-service && npm start` | + +**Environment variables:** + +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (same as workflow service) | +| `WORKFLOW_SERVICE_SLUG` | Your workflow service slug (e.g., `file-analyzer-workflows`) | + +**Deployment steps:** + +1. Go to Render Dashboard. +1. Click **New +** > **Web Service**. +1. Connect your repository and configure as above. +1. Set environment variables (including `WORKFLOW_SERVICE_SLUG`). +1. Click **Create Web Service**. 
### Step 3: Test the Deployed Services

@@ -669,23 +704,19 @@ def parse_excel_data(file_content: bytes) -> dict:
 - Use file hash as cache key
 - Store results in Redis or database
 
-## Important Notes
+## Important notes
 
-- **Python-only**: Render Workflows are only supported in Python via `render-sdk`
-- **No Blueprint Support**: Workflows don't support `render.yaml` blueprint configuration
-- **Service Types**: Workflow service (for tasks) vs Web Service (for API)
-- **Task Arguments**: Passed as a dict: `{"arg1": value1, "arg2": value2}`
-- **Awaitable Pattern**: Use `await task_run` to wait for completion
-- **Service Slug**: Set correctly in `WORKFLOW_SERVICE_SLUG` environment variable
-- **API Key**: Required in both services, get from Account Settings
+- **Service types**: Workflow service (for tasks) vs. Web Service (for API).
+- **Task arguments**: Passed as a dict/object: `{"arg1": value1, "arg2": value2}`.
+- **Awaitable pattern**: Use `await task_run` (Python) / `await taskRun` (TypeScript) to wait for completion.
+- **Service slug**: Set correctly in `WORKFLOW_SERVICE_SLUG` environment variable.
+- **API key**: Required in both services, get from Account Settings.
## Resources -- [Render Workflows Documentation](https://docs.render.com/workflows) +- [Render Workflows documentation](https://docs.render.com/workflows) - [Render SDK on PyPI](https://pypi.org/project/render-sdk/) -- [FastAPI Documentation](https://fastapi.tiangolo.com/) +- [Render SDK on npm](https://www.npmjs.com/package/@renderinc/sdk) +- [FastAPI documentation](https://fastapi.tiangolo.com/) +- [Express documentation](https://expressjs.com/) - [Render Dashboard](https://dashboard.render.com/) - ---- - -**Built with Render Workflows** | [Render.com](https://render.com/) diff --git a/file-analyzer/python/api-service/.env.example b/file-analyzer/python/api-service/.env.example new file mode 100644 index 0000000..7cc1989 --- /dev/null +++ b/file-analyzer/python/api-service/.env.example @@ -0,0 +1,2 @@ +RENDER_API_KEY=your-render-api-key +WORKFLOW_SERVICE_SLUG=file-analyzer-workflows diff --git a/file-analyzer/api-service/main.py b/file-analyzer/python/api-service/main.py similarity index 99% rename from file-analyzer/api-service/main.py rename to file-analyzer/python/api-service/main.py index f1eadd2..65ccd08 100644 --- a/file-analyzer/api-service/main.py +++ b/file-analyzer/python/api-service/main.py @@ -14,6 +14,10 @@ import os import logging from typing import Any + +from dotenv import load_dotenv + +load_dotenv() from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.middleware.cors import CORSMiddleware from render_sdk import Render diff --git a/file-analyzer/api-service/requirements.txt b/file-analyzer/python/api-service/requirements.txt similarity index 80% rename from file-analyzer/api-service/requirements.txt rename to file-analyzer/python/api-service/requirements.txt index 5e5cdc1..9cc8bfc 100644 --- a/file-analyzer/api-service/requirements.txt +++ b/file-analyzer/python/api-service/requirements.txt @@ -2,3 +2,4 @@ render-sdk>=0.2.0 fastapi>=0.110.0 uvicorn[standard]>=0.27.0 python-multipart>=0.0.9 +python-dotenv>=1.0.0 diff --git 
a/file-analyzer/workflow-service/main.py b/file-analyzer/python/workflow-service/main.py similarity index 100% rename from file-analyzer/workflow-service/main.py rename to file-analyzer/python/workflow-service/main.py diff --git a/file-analyzer/workflow-service/requirements.txt b/file-analyzer/python/workflow-service/requirements.txt similarity index 100% rename from file-analyzer/workflow-service/requirements.txt rename to file-analyzer/python/workflow-service/requirements.txt diff --git a/file-analyzer/workflow-service/sample_files/customer_data.csv b/file-analyzer/sample_files/customer_data.csv similarity index 100% rename from file-analyzer/workflow-service/sample_files/customer_data.csv rename to file-analyzer/sample_files/customer_data.csv diff --git a/file-analyzer/workflow-service/sample_files/sales_data.csv b/file-analyzer/sample_files/sales_data.csv similarity index 100% rename from file-analyzer/workflow-service/sample_files/sales_data.csv rename to file-analyzer/sample_files/sales_data.csv diff --git a/file-analyzer/typescript/api-service/.env.example b/file-analyzer/typescript/api-service/.env.example new file mode 100644 index 0000000..7cc1989 --- /dev/null +++ b/file-analyzer/typescript/api-service/.env.example @@ -0,0 +1,2 @@ +RENDER_API_KEY=your-render-api-key +WORKFLOW_SERVICE_SLUG=file-analyzer-workflows diff --git a/file-analyzer/typescript/api-service/package.json b/file-analyzer/typescript/api-service/package.json new file mode 100644 index 0000000..3b4e9e6 --- /dev/null +++ b/file-analyzer/typescript/api-service/package.json @@ -0,0 +1,23 @@ +{ + "name": "file-analyzer-api-service", + "version": "1.0.0", + "description": "File Analyzer API Service - Express gateway for workflow tasks (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7", + "express": "^5.1.0", + "multer": "^1.4.5-lts.2" + }, + 
"devDependencies": { + "@types/express": "^5.0.0", + "@types/multer": "^1.4.12", + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/file-analyzer/typescript/api-service/src/main.ts b/file-analyzer/typescript/api-service/src/main.ts new file mode 100644 index 0000000..13b7e81 --- /dev/null +++ b/file-analyzer/typescript/api-service/src/main.ts @@ -0,0 +1,163 @@ +import "dotenv/config"; +import express from "express"; +import multer from "multer"; +import { Render, ClientError, ServerError } from "@renderinc/sdk"; + +const app = express(); +const upload = multer({ storage: multer.memoryStorage() }); + +function getClient(): Render { + const apiKey = process.env.RENDER_API_KEY; + if (!apiKey) { + throw new Error( + "RENDER_API_KEY not configured. Get your API key from Render Dashboard > Account Settings > API Keys", + ); + } + return new Render(); +} + +function getTaskIdentifier(taskName: string): string { + const serviceSlug = process.env.WORKFLOW_SERVICE_SLUG; + if (!serviceSlug) { + throw new Error( + "WORKFLOW_SERVICE_SLUG not configured. Set this to your workflow service slug from Render Dashboard.", + ); + } + const identifier = `${serviceSlug}/${taskName}`; + console.log(`Task identifier: ${identifier}`); + return identifier; +} + +app.get("/", (_req, res) => { + res.json({ + service: "File Analyzer API", + version: "0.1.0", + description: "Upload CSV files for analysis via workflow tasks", + endpoints: { + "POST /analyze": "Upload and analyze a CSV file", + "GET /health": "Health check and configuration status", + }, + }); +}); + +app.get("/health", (_req, res) => { + const apiKey = process.env.RENDER_API_KEY; + const serviceSlug = process.env.WORKFLOW_SERVICE_SLUG; + + res.json({ + status: "healthy", + render_api_key_configured: Boolean(apiKey), + workflow_service_slug_configured: Boolean(serviceSlug), + workflow_service_slug: serviceSlug ?? 
null, + }); +}); + +app.post("/analyze", upload.single("file"), async (req, res) => { + const file = req.file; + if (!file) { + res.status(400).json({ detail: "No file uploaded." }); + return; + } + + if (!file.originalname.endsWith(".csv")) { + res.status(400).json({ detail: "Only CSV files are supported. Please upload a .csv file." }); + return; + } + + console.log(`Received file upload: ${file.originalname}`); + + try { + const fileContent = file.buffer.toString("utf-8"); + console.log(`File content size: ${fileContent.length} bytes`); + + const client = getClient(); + const taskIdentifier = getTaskIdentifier("analyzeFile"); + + console.log(`Calling workflow task: ${taskIdentifier}`); + + const taskRun = await client.workflows.runTask(taskIdentifier, [fileContent]); + + console.log(`Task started: ${taskRun.id}`); + + const result = await taskRun; + + console.log(`Task completed with status: ${result.status}`); + + res.json({ + task_run_id: result.id, + status: result.status, + message: `File '${file.originalname}' analyzed successfully`, + result: result.results, + }); + } catch (error) { + if (error instanceof ClientError) { + console.error(`Client error: ${error.statusCode}`); + res.status(error.statusCode).json({ detail: `Client error: ${error.message}` }); + } else if (error instanceof ServerError) { + console.error(`Server error: ${error.statusCode}`); + res.status(500).json({ detail: `Server error: ${error.message}` }); + } else { + console.error(`Unexpected error: ${error}`); + res.status(500).json({ detail: `Analysis failed: ${String(error)}` }); + } + } +}); + +app.post("/analyze-task/:taskName", upload.single("file"), async (req, res) => { + const taskName = req.params.taskName as string; + const file = req.file; + + if (!file) { + res.status(400).json({ detail: "No file uploaded." }); + return; + } + + if (!file.originalname.endsWith(".csv")) { + res.status(400).json({ detail: "Only CSV files are supported. Please upload a .csv file." 
}); + return; + } + + console.log(`Received file upload for task '${taskName}': ${file.originalname}`); + + try { + const fileContent = file.buffer.toString("utf-8"); + const client = getClient(); + const taskIdentifier = getTaskIdentifier(taskName); + + console.log(`Calling workflow task: ${taskIdentifier}`); + + const taskRun = await client.workflows.runTask(taskIdentifier, [fileContent]); + + console.log(`Task started: ${taskRun.id}`); + + const result = await taskRun; + + console.log(`Task '${taskName}' completed with status: ${result.status}`); + + res.json({ + task_run_id: result.id, + status: result.status, + message: `File '${file.originalname}' processed with task '${taskName}'`, + result: result.results, + }); + } catch (error) { + if (error instanceof ClientError) { + res.status(error.statusCode).json({ detail: `Client error: ${error.message}` }); + } else if (error instanceof ServerError) { + res.status(500).json({ detail: `Server error: ${error.message}` }); + } else { + res.status(500).json({ detail: `Analysis failed: ${String(error)}` }); + } + } +}); + +const PORT = parseInt(process.env.PORT ?? 
"8000", 10); + +app.listen(PORT, () => { + console.log("Starting File Analyzer API Service"); + console.log("This service calls workflow tasks using the Client SDK"); + console.log("Required environment variables:"); + console.log(" - RENDER_API_KEY: Your Render API key"); + console.log(" - WORKFLOW_SERVICE_SLUG: Your workflow service slug"); + console.log(`Listening on http://0.0.0.0:${PORT}`); +}); diff --git a/file-analyzer/typescript/api-service/tsconfig.json b/file-analyzer/typescript/api-service/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/file-analyzer/typescript/api-service/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/file-analyzer/typescript/workflow-service/package.json b/file-analyzer/typescript/workflow-service/package.json new file mode 100644 index 0000000..77fd733 --- /dev/null +++ b/file-analyzer/typescript/workflow-service/package.json @@ -0,0 +1,19 @@ +{ + "name": "file-analyzer-workflow-service", + "version": "1.0.0", + "description": "File Analyzer Workflow Service - CSV analysis tasks with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7" + }, + "devDependencies": { + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/file-analyzer/typescript/workflow-service/src/main.ts b/file-analyzer/typescript/workflow-service/src/main.ts new file mode 100644 index 0000000..6d8e2f0 --- /dev/null +++ b/file-analyzer/typescript/workflow-service/src/main.ts @@ -0,0 +1,250 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; + +interface ParsedData { + success: 
boolean; + rows: { [key: string]: string }[]; + columns: string[]; + row_count: number; + error?: string; + parsed_at?: string; +} + +const retry = { + maxRetries: 2, + waitDurationMs: 1000, + factor: 1.5, +}; + +// Subtask: parse CSV content into structured data +const parseCsvData = task( + { name: "parseCsvData", retry }, + function parseCsvData(fileContent: string): ParsedData { + console.log("[PARSE] Starting CSV parsing"); + + try { + const lines = fileContent.trim().split("\n"); + if (lines.length < 2) { + console.warn("[PARSE] No data rows found in CSV"); + return { success: false, error: "No data rows found", rows: [], columns: [], row_count: 0 }; + } + + const headers = lines[0].split(",").map((h) => h.trim()); + const rows = lines.slice(1).map((line) => { + const values = line.split(",").map((v) => v.trim()); + const row: { [key: string]: string } = {}; + headers.forEach((h, i) => { row[h] = values[i]; }); + return row; + }); + + console.log(`[PARSE] Successfully parsed ${rows.length} rows with ${headers.length} columns`); + + return { + success: true, + rows, + columns: headers, + row_count: rows.length, + parsed_at: new Date().toISOString(), + }; + } catch (e) { + console.error(`[PARSE] Error parsing CSV: ${e}`); + return { success: false, error: String(e), rows: [], columns: [], row_count: 0 }; + } + }, +); + +// Subtask: calculate statistics from parsed data +const calculateStatistics = task( + { name: "calculateStatistics", retry }, + function calculateStatistics(data: ParsedData) { + console.log("[STATS] Calculating statistics"); + + if (!data.success || data.rows.length === 0) { + console.warn("[STATS] No data to analyze"); + return { success: false, error: "No data available for statistics" }; + } + + const { rows, columns } = data; + + const numericColumns: string[] = []; + const numericData: { [key: string]: number[] } = {}; + + for (const col of columns) { + const values: number[] = []; + for (const row of rows) { + const val = 
row[col]?.trim(); + if (val) { + const num = parseFloat(val); + if (!Number.isNaN(num)) values.push(num); + } + } + if (values.length > 0) { + numericColumns.push(col); + numericData[col] = values; + } + } + + const statistics: { [col: string]: { min: number; max: number; avg: number; sum: number; count: number } } = {}; + for (const col of numericColumns) { + const values = numericData[col]; + statistics[col] = { + min: Math.min(...values), + max: Math.max(...values), + avg: values.reduce((a, b) => a + b, 0) / values.length, + sum: values.reduce((a, b) => a + b, 0), + count: values.length, + }; + } + + console.log(`[STATS] Calculated statistics for ${numericColumns.length} numeric columns`); + + return { + success: true, + numeric_columns: numericColumns, + statistics, + total_rows: rows.length, + calculated_at: new Date().toISOString(), + }; + }, +); + +// Subtask: identify trends and patterns +const identifyTrends = task( + { name: "identifyTrends", retry }, + function identifyTrends(data: ParsedData) { + console.log("[TRENDS] Identifying trends"); + + if (!data.success || data.rows.length === 0) { + console.warn("[TRENDS] No data to analyze"); + return { success: false, error: "No data available for trend analysis" }; + } + + const { rows, columns } = data; + + const categoricalAnalysis: { + [col: string]: { + unique_count: number; + total_count: number; + top_5: [string, number][]; + distribution: { [key: string]: number }; + }; + } = {}; + + for (const col of columns) { + const values = rows.map((r) => r[col]).filter(Boolean); + const uniqueValues = new Set(values); + + if (uniqueValues.size < rows.length / 2) { + const counts: { [key: string]: number } = {}; + for (const val of values) counts[val] = (counts[val] ?? 
0) + 1; + + const sorted = Object.entries(counts).sort((a, b) => b[1] - a[1]); + + categoricalAnalysis[col] = { + unique_count: uniqueValues.size, + total_count: values.length, + top_5: sorted.slice(0, 5), + distribution: Object.fromEntries(sorted), + }; + } + } + + console.log(`[TRENDS] Analyzed ${Object.keys(categoricalAnalysis).length} categorical columns`); + + return { + success: true, + categorical_columns: Object.keys(categoricalAnalysis), + categorical_analysis: categoricalAnalysis, + analyzed_at: new Date().toISOString(), + }; + }, +); + +// Subtask: generate insights report +const generateInsights = task( + { name: "generateInsights", retry }, + async function generateInsights( + stats: { success?: boolean; numeric_columns?: string[]; statistics?: { [col: string]: { avg: number; min: number; max: number; sum: number } } }, + trends: { success?: boolean; categorical_columns?: string[]; categorical_analysis?: { [col: string]: { top_5: [string, number][]; distribution: { [key: string]: number } } } }, + metadata: ParsedData, + ) { + console.log("[INSIGHTS] Generating insights report"); + + const keyFindings: { type: string; column: string; finding: string }[] = []; + + if (stats.success && stats.statistics) { + for (const [col, stat] of Object.entries(stats.statistics)) { + keyFindings.push({ + type: "numeric", + column: col, + finding: `${col}: avg=${stat.avg.toFixed(2)}, min=${stat.min.toFixed(2)}, max=${stat.max.toFixed(2)}, sum=${stat.sum.toFixed(2)}`, + }); + } + } + + if (trends.success && trends.categorical_analysis) { + for (const [col, analysis] of Object.entries(trends.categorical_analysis)) { + const topValue = analysis.top_5[0]; + if (topValue) { + keyFindings.push({ + type: "categorical", + column: col, + finding: `${col}: Most common value is '${topValue[0]}' (${topValue[1]} occurrences, ${Object.keys(analysis.distribution).length} unique values)`, + }); + } + } + } + + console.log(`[INSIGHTS] Generated ${keyFindings.length} key findings`); + + 
return { + success: true, + summary: { + total_rows: metadata.row_count, + total_columns: metadata.columns.length, + numeric_columns_count: stats.numeric_columns?.length ?? 0, + categorical_columns_count: trends.categorical_columns?.length ?? 0, + }, + key_findings: keyFindings, + generated_at: new Date().toISOString(), + }; + }, +); + +// Root task: orchestrates the full analysis pipeline +task( + { name: "analyzeFile", retry, timeoutSeconds: 300 }, + async function analyzeFile(fileContent: string) { + console.log("[ANALYZE_FILE] Starting file analysis pipeline"); + + console.log("[ANALYZE_FILE] Stage 1: Parsing CSV data"); + const parsedData = await parseCsvData(fileContent); + + if (!parsedData.success) { + console.error("[ANALYZE_FILE] Failed to parse CSV data"); + return { success: false, error: "Failed to parse CSV data", details: parsedData.error }; + } + + console.log(`[ANALYZE_FILE] Parsed ${parsedData.row_count} rows`); + + console.log("[ANALYZE_FILE] Stage 2: Calculating statistics"); + const stats = await calculateStatistics(parsedData); + + console.log("[ANALYZE_FILE] Stage 3: Identifying trends"); + const trends = await identifyTrends(parsedData); + + console.log("[ANALYZE_FILE] Stage 4: Generating insights"); + const insights = await generateInsights(stats, trends, parsedData); + + console.log("[ANALYZE_FILE] Analysis pipeline completed successfully"); + + return { + success: true, + file_metadata: { row_count: parsedData.row_count, columns: parsedData.columns }, + statistics: stats, + trends, + insights, + completed_at: new Date().toISOString(), + }; + }, +); diff --git a/file-analyzer/typescript/workflow-service/tsconfig.json b/file-analyzer/typescript/workflow-service/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/file-analyzer/typescript/workflow-service/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + 
"rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/file-processing/README.md b/file-processing/README.md index 48674e1..b603a2d 100644 --- a/file-processing/README.md +++ b/file-processing/README.md @@ -33,64 +33,63 @@ process_file_batch (orchestrator) generate_consolidated_report (final aggregation) ``` -## Local Development +## Local development ### Prerequisites -- Python 3.10+ -### Setup and Run +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) -```bash -# Navigate to example directory -cd file-processing +### Python -# Install dependencies +```bash +cd file-processing/python pip install -r requirements.txt - -# Run the workflow service python main.py ``` -## Deploying to Render - -### Service Configuration - -**Service Type**: Workflow +### TypeScript -**Build Command**: ```bash -cd file-processing && pip install -r requirements.txt +cd file-processing/typescript +npm install +npm run dev ``` -**Start Command**: -```bash -cd file-processing && python main.py -``` +## Deploying to Render + +### Service configuration + +**Service type**: Workflow + +**Python:** -### Environment Variables +| Setting | Value | +|---|---| +| Build command | `cd file-processing/python && pip install -r requirements.txt` | +| Start command | `cd file-processing/python && python main.py` | -Required: -- `RENDER_API_KEY` - Your Render API key (from Render dashboard) +**TypeScript:** -### Deployment Steps +| Setting | Value | +|---|---| +| Build command | `cd file-processing/typescript && npm install && npm run build` | +| Start command | `cd file-processing/typescript && npm start` | -1. **Create Workflow Service** - - Go to Render Dashboard - - Click "New +" → "Workflow" - - Connect your repository - - Name: `file-processing-workflows` +### Environment variables -2. 
**Configure Build Settings** - - Build Command: `cd file-processing && pip install -r requirements.txt` - - Start Command: `cd file-processing && python main.py` +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Render Dashboard) | -3. **Set Environment Variables** - - Add `RENDER_API_KEY` in the Environment section - - Get API key from: Render Dashboard → Account Settings → API Keys +### Deployment steps -4. **Deploy** - - Click "Create Workflow" - - Render will build and start your workflow service +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository. +1. Set the build and start commands for your chosen language. +1. Add `RENDER_API_KEY` in the Environment section. +1. Click **Create Workflow**. ## Testing in Render Dashboard @@ -325,10 +324,8 @@ async def export_to_database(report: dict) -> dict: 3. **Memory Management**: For large files, consider streaming or chunked processing 4. **Error Isolation**: One file failure shouldn't stop the entire batch -## Important Notes +## Important notes -- **Python-only**: Workflows are only supported in Python via render-sdk -- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration -- **File Access**: In production, integrate with cloud storage (S3, GCS) or databases -- **Retry Logic**: All read operations include retry configuration for transient failures -- **Local Paths**: Sample uses local paths; adapt for your storage solution +- **File access**: In production, integrate with cloud storage (S3, GCS) or databases. +- **Retry logic**: All read operations include retry configuration for transient failures. +- **Local paths**: Sample uses local paths; adapt for your storage solution. 
diff --git a/file-processing/main.py b/file-processing/python/main.py similarity index 100% rename from file-processing/main.py rename to file-processing/python/main.py diff --git a/file-processing/requirements.txt b/file-processing/python/requirements.txt similarity index 100% rename from file-processing/requirements.txt rename to file-processing/python/requirements.txt diff --git a/file-processing/typescript/package.json b/file-processing/typescript/package.json new file mode 100644 index 0000000..6df4efb --- /dev/null +++ b/file-processing/typescript/package.json @@ -0,0 +1,20 @@ +{ + "name": "file-processing-workflow", + "version": "1.0.0", + "description": "File Processing - Parallel batch processing with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7" + }, + "devDependencies": { + "@types/node": "^22.13.0", + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/file-processing/typescript/src/main.ts b/file-processing/typescript/src/main.ts new file mode 100644 index 0000000..8457227 --- /dev/null +++ b/file-processing/typescript/src/main.ts @@ -0,0 +1,345 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; +import { readFileSync, existsSync } from "node:fs"; +import { resolve, extname } from "node:path"; + +const retry = { + maxRetries: 3, + waitDurationMs: 1000, + factor: 1.5, +}; + +// ---- File Reading Tasks ---- + +const readCsvFile = task( + { name: "readCsvFile", retry }, + function readCsvFile(filePath: string) { + console.log(`[CSV] Reading file: ${filePath}`); + + const fullPath = resolve(filePath); + if (!existsSync(fullPath)) { + console.warn(`[CSV] File not found: ${filePath}`); + return { success: false, error: "File not found", file_path: filePath }; + } + + try { + const content = readFileSync(fullPath, "utf-8"); + const lines = 
content.trim().split("\n"); + if (lines.length < 2) { + return { success: true, file_path: filePath, file_type: "csv", row_count: 0, data: [], columns: [] }; + } + + const headers = lines[0].split(",").map((h) => h.trim()); + const rows = lines.slice(1).map((line) => { + const values = line.split(",").map((v) => v.trim()); + const row: { [key: string]: string } = {}; + headers.forEach((h, i) => { row[h] = values[i]; }); + return row; + }); + + console.log(`[CSV] Successfully read ${rows.length} rows`); + return { success: true, file_path: filePath, file_type: "csv", row_count: rows.length, data: rows, columns: headers }; + } catch (e) { + console.error(`[CSV] Error reading file: ${e}`); + return { success: false, error: String(e), file_path: filePath }; + } + }, +); + +const readJsonFile = task( + { name: "readJsonFile", retry }, + function readJsonFile(filePath: string) { + console.log(`[JSON] Reading file: ${filePath}`); + + const fullPath = resolve(filePath); + if (!existsSync(fullPath)) { + console.warn(`[JSON] File not found: ${filePath}`); + return { success: false, error: "File not found", file_path: filePath }; + } + + try { + const content = readFileSync(fullPath, "utf-8"); + const data = JSON.parse(content); + console.log("[JSON] Successfully parsed JSON"); + return { + success: true, + file_path: filePath, + file_type: "json", + data, + keys: typeof data === "object" && !Array.isArray(data) ? 
Object.keys(data) : null, + }; + } catch (e) { + console.error(`[JSON] Error reading file: ${e}`); + return { success: false, error: String(e), file_path: filePath }; + } + }, +); + +const readTextFile = task( + { name: "readTextFile", retry }, + function readTextFile(filePath: string) { + console.log(`[TEXT] Reading file: ${filePath}`); + + const fullPath = resolve(filePath); + if (!existsSync(fullPath)) { + console.warn(`[TEXT] File not found: ${filePath}`); + return { success: false, error: "File not found", file_path: filePath }; + } + + try { + const content = readFileSync(fullPath, "utf-8"); + const lines = content.split("\n"); + const words = content.split(/\s+/).filter(Boolean); + + console.log(`[TEXT] Successfully read ${lines.length} lines`); + return { + success: true, + file_path: filePath, + file_type: "text", + content, + line_count: lines.length, + word_count: words.length, + char_count: content.length, + }; + } catch (e) { + console.error(`[TEXT] Error reading file: ${e}`); + return { success: false, error: String(e), file_path: filePath }; + } + }, +); + +// ---- Analysis Tasks ---- + +const analyzeCsvData = task( + { name: "analyzeCsvData", retry }, + function analyzeCsvData(csvResult: { success?: boolean; data?: { [key: string]: string }[] }) { + console.log("[ANALYSIS] Analyzing CSV data"); + + if (!csvResult.success) return { success: false, error: "No data to analyze" }; + + const rows = csvResult.data ?? []; + if (rows.length === 0) return { success: false, error: "Empty dataset" }; + + let totalQuantity = 0; + let totalRevenue = 0; + const products = new Set(); + const regions = new Set(); + + for (const row of rows) { + const quantity = parseInt(row.quantity ?? "0", 10) || 0; + const price = parseFloat(row.price ?? 
"0") || 0; + totalQuantity += quantity; + totalRevenue += quantity * price; + if (row.product) products.add(row.product); + if (row.region) regions.add(row.region); + } + + const analysis = { + success: true, + total_records: rows.length, + total_quantity: totalQuantity, + total_revenue: Math.round(totalRevenue * 100) / 100, + unique_products: products.size, + unique_regions: regions.size, + products: [...products], + regions: [...regions], + }; + + console.log(`[ANALYSIS] Total revenue: $${analysis.total_revenue}`); + return analysis; + }, +); + +const analyzeJsonStructure = task( + { name: "analyzeJsonStructure", retry }, + function analyzeJsonStructure(jsonResult: { success?: boolean; data?: unknown }) { + console.log("[ANALYSIS] Analyzing JSON structure"); + + if (!jsonResult.success) return { success: false, error: "No data to analyze" }; + + const data = jsonResult.data ?? {}; + + function countKeys(obj: unknown): number { + if (typeof obj === "object" && obj !== null && !Array.isArray(obj)) { + let count = Object.keys(obj).length; + for (const value of Object.values(obj)) count += countKeys(value); + return count; + } + if (Array.isArray(obj)) return obj.reduce((sum, item) => sum + countKeys(item), 0); + return 0; + } + + const isDict = typeof data === "object" && !Array.isArray(data) && data !== null; + return { + success: true, + type: Array.isArray(data) ? "array" : typeof data, + top_level_keys: isDict ? Object.keys(data as object) : null, + total_keys: countKeys(data), + is_nested: isDict + ? Object.values(data as object).some((v) => typeof v === "object" && v !== null) + : false, + }; + }, +); + +const analyzeTextContent = task( + { name: "analyzeTextContent", retry }, + function analyzeTextContent(textResult: { + success?: boolean; + content?: string; + }) { + console.log("[ANALYSIS] Analyzing text content"); + + if (!textResult.success) return { success: false, error: "No data to analyze" }; + + const content = textResult.content ?? 
""; + const lines = content.split("\n"); + const words = content.split(/\s+/).filter(Boolean); + + const sections = lines.filter( + (l) => l.trim() && (/^[A-Z]/.test(l.trim()) || l.trim().startsWith("-")), + ); + + const longWords = words.filter((w) => w.length > 6).map((w) => w.replace(/[.,!?]/g, "").toLowerCase()); + const freq: { [key: string]: number } = {}; + for (const word of longWords) freq[word] = (freq[word] ?? 0) + 1; + + const topKeywords = Object.entries(freq) + .sort((a, b) => b[1] - a[1]) + .slice(0, 5); + + return { + success: true, + total_lines: lines.length, + total_words: words.length, + total_chars: content.length, + section_count: sections.length, + top_keywords: Object.fromEntries(topKeywords), + avg_line_length: lines.length > 0 ? content.length / lines.length : 0, + }; + }, +); + +// ---- Orchestration Tasks ---- + +const processSingleFile = task( + { name: "processSingleFile", retry }, + async function processSingleFile(filePath: string) { + console.log(`[PROCESS] Processing file: ${filePath}`); + + const extension = extname(filePath).toLowerCase(); + let readResult: { success?: boolean; [key: string]: unknown }; + let analysis: { [key: string]: unknown } = {}; + + if (extension === ".csv") { + readResult = await readCsvFile(filePath); + if (readResult.success) analysis = await analyzeCsvData(readResult as Parameters[0]); + } else if (extension === ".json") { + readResult = await readJsonFile(filePath); + if (readResult.success) analysis = await analyzeJsonStructure(readResult as Parameters[0]); + } else if (extension === ".txt") { + readResult = await readTextFile(filePath); + if (readResult.success) analysis = await analyzeTextContent(readResult as Parameters[0]); + } else { + console.warn(`[PROCESS] Unsupported file type: ${extension}`); + return { success: false, file_path: filePath, error: `Unsupported file type: ${extension}` }; + } + + console.log(`[PROCESS] File processed: ${filePath}`); + return { + success: readResult.success ?? 
false, + file_path: filePath, + file_type: extension.slice(1), + read_result: readResult, + analysis, + }; + }, +); + +// Root task: processes multiple files in parallel +task( + { name: "processFileBatch", retry, timeoutSeconds: 300 }, + async function processFileBatch(...filePaths: string[]) { + console.log("=".repeat(80)); + console.log(`[BATCH] Starting batch processing of ${filePaths.length} files`); + console.log("=".repeat(80)); + + const results = await Promise.all(filePaths.map((fp) => processSingleFile(fp))); + + const successful = results.filter((r) => r.success); + const failed = results.filter((r) => !r.success); + + const fileTypes: { [key: string]: number } = {}; + for (const result of successful) { + const ft = (result.file_type as string) ?? "unknown"; + fileTypes[ft] = (fileTypes[ft] ?? 0) + 1; + } + + const batchResult = { + total_files: filePaths.length, + successful: successful.length, + failed: failed.length, + success_rate: filePaths.length > 0 ? successful.length / filePaths.length : 0, + file_types: fileTypes, + results, + processed_at: new Date().toISOString(), + }; + + console.log("=".repeat(80)); + console.log("[BATCH] Batch processing complete!"); + console.log(`[BATCH] Successful: ${successful.length}/${filePaths.length}`); + console.log("=".repeat(80)); + + return batchResult; + }, +); + +// Root task: generate a consolidated report from batch results +task( + { name: "generateConsolidatedReport", retry }, + async function generateConsolidatedReport(batchResult: { + total_files?: number; + successful?: number; + failed?: number; + success_rate?: number; + file_types?: { [key: string]: number }; + results?: { success?: boolean; file_type?: string; analysis?: { total_records?: number; total_words?: number; total_keys?: number } }[]; + }) { + console.log("[REPORT] Generating consolidated report"); + + const results = batchResult.results ?? 
[]; + const successfulResults = results.filter((r) => r.success); + + let totalCsvRows = 0; + let totalTextWords = 0; + let totalJsonKeys = 0; + + for (const result of successfulResults) { + const analysis = result.analysis ?? {}; + if (result.file_type === "csv") totalCsvRows += analysis.total_records ?? 0; + else if (result.file_type === "text") totalTextWords += analysis.total_words ?? 0; + else if (result.file_type === "json") totalJsonKeys += analysis.total_keys ?? 0; + } + + const report = { + title: "File Processing Report", + generated_at: new Date().toISOString(), + summary: { + total_files_processed: batchResult.total_files, + successful: batchResult.successful, + failed: batchResult.failed, + success_rate_pct: Math.round((batchResult.success_rate ?? 0) * 1000) / 10, + }, + data_summary: { + total_csv_rows: totalCsvRows, + total_text_words: totalTextWords, + total_json_keys: totalJsonKeys, + }, + file_breakdown: batchResult.file_types ?? {}, + detailed_results: successfulResults, + }; + + console.log("[REPORT] Report generated successfully"); + return report; + }, +); diff --git a/file-processing/typescript/tsconfig.json b/file-processing/typescript/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/file-processing/typescript/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/hello-world/README.md b/hello-world/README.md index 4fc6002..3b392f9 100644 --- a/hello-world/README.md +++ b/hello-world/README.md @@ -2,36 +2,38 @@ The simplest possible workflow example to help you understand the basics of Render Workflows. Perfect for beginners! 
-## What You'll Learn +## What you'll learn This example teaches the fundamental concepts: -- **What is a Task?** - A function decorated with `@app.task` that can be executed as a workflow -- **What is a Subtask?** - A task called by another task using `await` -- **How to Orchestrate** - Combining multiple tasks to create workflows -- **How to Deploy** - Getting your first workflow running on Render +- **What is a task?** A function that can be executed as a workflow +- **What is a subtask?** A task called by another task using `await` +- **How to orchestrate:** Combining multiple tasks to create workflows +- **How to deploy:** Getting your first workflow running on Render -## Use Case +## Use case Simple number processing to demonstrate workflow patterns without complex business logic. If you can understand this example, you can build any workflow! -## Workflow Structure +## Workflow structure ``` -calculate_and_process (multi-step orchestrator) - ├── add_doubled_numbers +calculateAndProcess (multi-step orchestrator) + ├── addDoubledNumbers │ ├── double (subtask #1) │ └── double (subtask #2) - └── process_numbers + └── processNumbers ├── double (subtask for item 1) ├── double (subtask for item 2) └── double (subtask for item N) ``` -## Understanding Tasks and Subtasks +## Understanding tasks and subtasks -### What is a Task? +### What is a task? -A **task** is simply a Python function decorated with `@app.task`. It becomes a workflow step that Render can execute: +A **task** is a function registered with the workflow runtime. It becomes a workflow step that Render can execute. + +**Python:** ```python from render_sdk import Workflows @@ -40,129 +42,125 @@ app = Workflows(auto_start=True) @app.task def double(x: int) -> int: - """A simple task that doubles a number""" return x * 2 ``` -### What is a Subtask? 
+**TypeScript:** + +```typescript +import { task } from "@renderinc/sdk/workflows"; + +const double = task({ name: "double" }, function double(x: number): number { + return x * 2; +}); +``` -A **subtask** is when one task calls another task using `await`. This is how you compose workflows: +### What is a subtask? + +A **subtask** is when one task calls another task using `await`: + +**Python:** ```python @app.task async def add_doubled_numbers(a: int, b: int) -> dict: - # Call 'double' as a subtask using await - doubled_a = await double(a) # ← This is a subtask call! - doubled_b = await double(b) # ← This is also a subtask call! + doubled_a = await double(a) # subtask call + doubled_b = await double(b) # subtask call + return {"sum": doubled_a + doubled_b} +``` - return { - "sum": doubled_a + doubled_b - } +**TypeScript:** + +```typescript +const addDoubledNumbers = task( + { name: "addDoubledNumbers" }, + async function addDoubledNumbers(a: number, b: number) { + const doubledA = await double(a); // subtask call + const doubledB = await double(b); // subtask call + return { sum: doubledA + doubledB }; + }, +); ``` -### Why Use Subtasks? +### Why use subtasks? 1. **Reusability**: Write `double` once, use it everywhere 2. **Composition**: Build complex workflows from simple building blocks -3. **Visibility**: Render shows you each subtask execution in the dashboard +3. **Visibility**: Render shows each subtask execution in the dashboard 4. **Testing**: Test individual tasks independently -## Local Development +## Local development ### Prerequisites -- Python 3.10+ -### Setup and Run +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) -```bash -# Navigate to example directory -cd hello-world +### Python -# Install dependencies +```bash +cd hello-world/python pip install -r requirements.txt - -# Run the workflow service python main.py ``` -The service will start and register all tasks. 
You'll see output like: +### TypeScript -``` -Starting Hello World Workflow Service -Registered tasks: - - double(x) - - add_doubled_numbers(a, b) - - process_numbers(numbers) - - calculate_and_process(a, b, more_numbers) -Ready to accept task executions! +```bash +cd hello-world/typescript +npm install +npm run dev ``` ## Deploying to Render -### Service Configuration - -**Service Type**: Workflow +### Service configuration -**Build Command**: -```bash -cd hello-world && pip install -r requirements.txt -``` +**Service type**: Workflow -**Start Command**: -```bash -cd hello-world && python main.py -``` +**Python:** -### Environment Variables +| Setting | Value | +|---|---| +| Build command | `cd hello-world/python && pip install -r requirements.txt` | +| Start command | `cd hello-world/python && python main.py` | -Required: -- `RENDER_API_KEY` - Your Render API key (from Render dashboard) +**TypeScript:** -### Deployment Steps +| Setting | Value | +|---|---| +| Build command | `cd hello-world/typescript && npm install && npm run build` | +| Start command | `cd hello-world/typescript && npm start` | -1. **Create Workflow Service** - - Go to Render Dashboard - - Click "New +" → "Workflow" - - Connect your repository - - Name: `hello-world-workflows` +### Environment variables -2. **Configure Build Settings** - - Build Command: `cd hello-world && pip install -r requirements.txt` - - Start Command: `cd hello-world && python main.py` +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Render Dashboard) | -3. **Set Environment Variables** - - Add `RENDER_API_KEY` in the Environment section - - Get API key from: Render Dashboard → Account Settings → API Keys +### Deployment steps -4. **Deploy** - - Click "Create Workflow" - - Render will build and start your workflow service +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository. +1. Set the build and start commands for your chosen language. +1. 
Add `RENDER_API_KEY` in the Environment section. +1. Click **Create Workflow**. ## Testing in Render Dashboard Once deployed, test your workflows directly in the Render Dashboard: -### How to Test +1. Go to your Workflow service in Render Dashboard. +1. Click **Manual Run** or **Start Task**. +1. Select the task you want to test. +1. Enter the task input as JSON. +1. Click **Start task**. -1. Go to your Workflow service in Render Dashboard -2. Click the **"Manual Run"** or **"Start Task"** button -3. Select the task you want to test -4. Enter the task input as JSON in the text area -5. Click **"Start task"** +### Example task inputs -### Example Task Inputs +**Test the basic task** (`double`): -**Important:** The hello-world workflow expects direct values and arrays, not JSON objects. Use `5` instead of `{"x": 5}`, and `[3, 4]` instead of `{"a": 3, "b": 4}`. - -**Recommended Starting Point:** Start with `double` - the simplest possible task, then work your way up to more complex examples. - ---- - -**Test the basic task:** - -Task: `double` - -Input: ```json 5 ``` @@ -171,16 +169,14 @@ Expected output: `10` --- -**Test subtask calling:** +**Test subtask calling** (`addDoubledNumbers`): -Task: `add_doubled_numbers` - -Input: ```json [3, 4] ``` Expected output: + ```json { "original_numbers": [3, 4], @@ -190,20 +186,16 @@ Expected output: } ``` -This task calls `double` twice as subtasks! - --- -**Test subtask in a loop:** +**Test subtask in a loop** (`processNumbers`): -Task: `process_numbers` - -Input: ```json [1, 2, 3, 4, 5] ``` Expected output: + ```json { "original_numbers": [1, 2, 3, 4, 5], @@ -213,188 +205,60 @@ Expected output: } ``` -This calls `double` as a subtask 5 times (once for each number)! 
- --- -**Test multi-step workflow:** - -Task: `calculate_and_process` +**Test multi-step workflow** (`calculateAndProcess`): -Input: ```json [2, 3, 10, 20, 30] ``` -This is the most complex example - it calls `add_doubled_numbers` and `process_numbers` as subtasks, which in turn call `double` multiple times. Watch the Render Dashboard to see the entire execution tree! +This calls `addDoubledNumbers` and `processNumbers` as subtasks, which in turn call `double` multiple times. -## Triggering via SDK +## Tasks explained -Once deployed, trigger workflows via the Render Client SDK: - -```python -from render_sdk import Render - -# Uses RENDER_API_KEY environment variable automatically -render = Render() - -# Call the simple double task -task_run = await render.workflows.run_task( - "hello-world-workflows/double", - {"x": 5} -) -result = await task_run -print(f"Result: {result.results}") # Output: 10 - -# Call the subtask orchestration example -task_run = await render.workflows.run_task( - "hello-world-workflows/add_doubled_numbers", - {"a": 3, "b": 4} -) -result = await task_run -print(f"Sum of doubled: {result.results['sum_of_doubled']}") # Output: 14 -``` - -## Tasks Explained - -### `double(x: int) -> int` +### `double(x) -> number` The simplest possible task. Takes a number, doubles it, returns the result. -**Purpose**: Show what a basic task looks like. - -**Can be called as a subtask**: Yes! Other tasks call this. - ---- - -### `add_doubled_numbers(a: int, b: int) -> dict` - -Demonstrates the fundamental subtask pattern. - -**What it does**: -1. Calls `double(a)` as a subtask -2. Calls `double(b)` as a subtask -3. Adds the results together - -**Purpose**: Show how to call tasks as subtasks using `await`. - -**Key Pattern**: -```python -result = await double(a) # ← Subtask call with await -``` - ---- - -### `process_numbers(numbers: list[int]) -> dict` - -Demonstrates calling a subtask in a loop. - -**What it does**: -1. Takes a list of numbers -2. 
Calls `double` as a subtask for each number -3. Collects all the results - -**Purpose**: Show how to process lists/batches using subtasks. +### `addDoubledNumbers(a, b) -> object` -**Key Pattern**: -```python -for num in numbers: - doubled = await double(num) # ← Subtask call in a loop -``` - ---- - -### `calculate_and_process(a: int, b: int, more_numbers: list[int]) -> dict` - -Demonstrates a multi-step workflow with multiple subtask calls. - -**What it does**: -1. Calls `add_doubled_numbers` as a subtask -2. Calls `process_numbers` as a subtask -3. Combines the results - -**Purpose**: Show how to chain multiple subtasks to create complex workflows. - -**Key Pattern**: -```python -step1 = await add_doubled_numbers(a, b) # ← First subtask -step2 = await process_numbers(numbers) # ← Second subtask -# Combine results -``` - -## Key Concepts - -### The `@app.task` Decorator +Demonstrates the fundamental subtask pattern: calls `double(a)` and `double(b)` as subtasks, then sums the results. -Every workflow function needs the `@app.task` decorator: +### `processNumbers(...numbers) -> object` -```python -from render_sdk import Workflows +Demonstrates calling a subtask in a loop: calls `double` for each number in the list. -app = Workflows(auto_start=True) +### `calculateAndProcess(a, b, ...moreNumbers) -> object` -@app.task -def my_task(): - return "Hello World" -``` +Demonstrates a multi-step workflow: chains `addDoubledNumbers` and `processNumbers` as subtasks, combining their results. -### The `async` Keyword +## Common patterns -Tasks that call other tasks as subtasks must be `async`: - -```python -@app.task -async def orchestrator(): - result = await subtask() # ← Calls another task - return result -``` - -### The `await` Keyword - -Use `await` to call a task as a subtask: - -```python -result = await task_name(arguments) -``` - -Without `await`, you're just calling a regular Python function! 
- -### Task Registration - -When you use `Workflows(auto_start=True)`, all `@app.task` decorated functions are automatically registered and become available as workflow tasks. - -## Common Patterns - -### Pattern 1: Sequential Subtasks - -Execute subtasks one after another: +### Sequential subtasks ```python +# Python @app.task async def sequential(): step1 = await task_a() - step2 = await task_b(step1) # Uses result from step1 - step3 = await task_c(step2) # Uses result from step2 - return step3 + step2 = await task_b(step1) + return step2 ``` -### Pattern 2: Independent Subtasks - -Execute subtasks where order doesn't matter: - -```python -@app.task -async def independent(): - result_a = await task_a() - result_b = await task_b() - return combine(result_a, result_b) +```typescript +// TypeScript +task({ name: "sequential" }, async function sequential() { + const step1 = await taskA(); + const step2 = await taskB(step1); + return step2; +}); ``` -### Pattern 3: Subtasks in a Loop - -Process a list by calling a subtask for each item: +### Subtasks in a loop ```python +# Python @app.task async def batch_process(items: list): results = [] @@ -404,70 +268,48 @@ async def batch_process(items: list): return results ``` -### Pattern 4: Nested Subtasks - -Subtasks can call other subtasks: - -```python -@app.task -async def level_1(): - return await level_2() - -@app.task -async def level_2(): - return await level_3() - -@app.task -def level_3(): - return "Done!" +```typescript +// TypeScript +task({ name: "batchProcess" }, async function batchProcess(items: string[]) { + const results = []; + for (const item of items) { + results.push(await processItem(item)); + } + return results; +}); ``` -## Next Steps +## Next steps Once you understand this example, check out: -1. **ETL Job** - Learn data processing patterns with CSV files -2. **File Processing** - Learn parallel execution with `asyncio.gather()` -3. **Data Pipeline** - Learn complex multi-stage workflows -4. 
**OpenAI Agent** - Learn advanced patterns with AI integration -5. **File Analyzer** - Learn how to call workflows from APIs using Client SDK +1. **ETL Job** - Data processing patterns with CSV files +2. **File Processing** - Parallel execution with fan-out +3. **Data Pipeline** - Complex multi-stage workflows +4. **OpenAI Agent** - Advanced patterns with AI integration +5. **File Analyzer** - Calling workflows from APIs using the Client SDK ## Troubleshooting ### "Task not found" error -Make sure: -- The service is deployed and running -- The task name matches exactly (case-sensitive) -- You're using the correct service slug +- Verify that the service is deployed and running. +- Check that the task name matches exactly (case-sensitive). +- Confirm you're using the correct service slug. ### Import errors -Make sure: -- `requirements.txt` includes `render-sdk>=0.2.0` -- Build command is running correctly -- Python version is 3.10 or higher +- **Python**: Verify that `requirements.txt` includes `render-sdk>=0.2.0` and Python 3.10+ is installed. +- **TypeScript**: Run `npm install` and verify Node.js 18+ is installed. ### Subtask calls not working -Make sure: -- Your task function is marked `async` -- You're using `await` before the task call -- Both tasks are decorated with `@app.task` - -## Important Notes - -- **Python-only**: Workflows are only supported in Python via render-sdk -- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration -- **Service Type**: Deploy as a Workflow service on Render (not Background Worker or Web Service) -- **Async Functions**: Tasks that call subtasks must be declared as `async` +- **Python**: Verify your task function is `async` and you're using `await`. +- **TypeScript**: Verify you're using `await` on subtask calls. 
## Resources -- [Render Workflows Documentation](https://docs.render.com/workflows) +- [Render Workflows documentation](https://docs.render.com/workflows) - [Render SDK on PyPI](https://pypi.org/project/render-sdk/) +- [Render SDK on npm](https://www.npmjs.com/package/@renderinc/sdk) - [Render Dashboard](https://dashboard.render.com/) - ---- - -**Start simple, build powerful workflows!** diff --git a/hello-world/main.py b/hello-world/python/main.py similarity index 100% rename from hello-world/main.py rename to hello-world/python/main.py diff --git a/hello-world/requirements.txt b/hello-world/python/requirements.txt similarity index 100% rename from hello-world/requirements.txt rename to hello-world/python/requirements.txt diff --git a/hello-world/typescript/package.json b/hello-world/typescript/package.json new file mode 100644 index 0000000..9b3dd68 --- /dev/null +++ b/hello-world/typescript/package.json @@ -0,0 +1,19 @@ +{ + "name": "hello-world-workflow", + "version": "1.0.0", + "description": "Hello World - Getting Started with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7" + }, + "devDependencies": { + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/hello-world/typescript/src/main.ts b/hello-world/typescript/src/main.ts new file mode 100644 index 0000000..01bedee --- /dev/null +++ b/hello-world/typescript/src/main.ts @@ -0,0 +1,89 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; + +// Subtask: doubles a number +const double = task({ name: "double" }, function double(x: number): number { + console.log(`[TASK] Doubling ${x}`); + const result = x * 2; + console.log(`[TASK] Result: ${result}`); + return result; +}); + +// Subtask (also callable as root): doubles two numbers and sums them +const addDoubledNumbers = task( + { name: "addDoubledNumbers" 
}, + async function addDoubledNumbers(a: number, b: number) { + console.log(`[WORKFLOW] Starting: addDoubledNumbers(${a}, ${b})`); + + const doubledA = await double(a); + const doubledB = await double(b); + const total = doubledA + doubledB; + + const result = { + original_numbers: [a, b], + doubled_numbers: [doubledA, doubledB], + sum_of_doubled: total, + explanation: `${a} doubled is ${doubledA}, ${b} doubled is ${doubledB}, sum is ${total}`, + }; + + console.log("[WORKFLOW] Complete:", result); + return result; + }, +); + +// Subtask (also callable as root): doubles each number in a list +const processNumbers = task( + { name: "processNumbers" }, + async function processNumbers(...numbers: number[]) { + console.log(`[WORKFLOW] Starting: processNumbers(${numbers})`); + + const doubledResults: number[] = []; + + for (let i = 0; i < numbers.length; i++) { + console.log( + `[WORKFLOW] Processing item ${i + 1}/${numbers.length}: ${numbers[i]}`, + ); + const doubled = await double(numbers[i]); + doubledResults.push(doubled); + } + + const result = { + original_numbers: numbers, + doubled_numbers: doubledResults, + count: numbers.length, + explanation: `Processed ${numbers.length} numbers through the double subtask`, + }; + + console.log("[WORKFLOW] Complete:", result); + return result; + }, +); + +// Root task: chains addDoubledNumbers and processNumbers +task( + { name: "calculateAndProcess" }, + async function calculateAndProcess( + a: number, + b: number, + ...moreNumbers: number[] + ) { + console.log("[WORKFLOW] Starting multi-step workflow"); + + console.log("[WORKFLOW] Step 1: Adding doubled numbers"); + const step1Result = await addDoubledNumbers(a, b); + + console.log("[WORKFLOW] Step 2: Processing number list"); + const step2Result = await processNumbers(...moreNumbers); + + console.log("[WORKFLOW] Step 3: Combining results"); + const finalResult = { + step1_sum: step1Result.sum_of_doubled, + step2_doubled: step2Result.doubled_numbers, + total_operations: 2 + 
moreNumbers.length, + summary: `Added doubled ${a} and ${b}, then doubled ${moreNumbers.length} more numbers`, + }; + + console.log("[WORKFLOW] Multi-step workflow complete"); + return finalResult; + }, +); diff --git a/hello-world/typescript/tsconfig.json b/hello-world/typescript/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/hello-world/typescript/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +} diff --git a/openai-agent/README.md b/openai-agent/README.md index b52f75c..72f8a9f 100644 --- a/openai-agent/README.md +++ b/openai-agent/README.md @@ -39,71 +39,67 @@ multi_turn_conversation (orchestrator) └── search_knowledge_base ``` -## Local Development +## Local development ### Prerequisites -- Python 3.10+ + +- Python 3.10+ (for the Python version) +- Node.js 18+ (for the TypeScript version) - OpenAI API key -### Setup and Run +### Python ```bash -# Navigate to example directory -cd openai-agent - -# Install dependencies +cd openai-agent/python pip install -r requirements.txt +cp .env.example .env # then edit .env with your keys +python main.py +``` -# Set OpenAI API key -export OPENAI_API_KEY="your-openai-api-key" +### TypeScript -# Run the workflow service -python main.py +```bash +cd openai-agent/typescript +npm install +cp .env.example .env # then edit .env with your keys +npm run dev ``` ## Deploying to Render -### Service Configuration +### Service configuration -**Service Type**: Workflow +**Service type**: Workflow -**Build Command**: -```bash -cd openai-agent && pip install -r requirements.txt -``` - -**Start Command**: -```bash -cd openai-agent && python main.py -``` +**Python:** -### Environment Variables +| Setting | Value | +|---|---| +| Build command | `cd openai-agent/python && 
pip install -r requirements.txt` | +| Start command | `cd openai-agent/python && python main.py` | -Required: -- `RENDER_API_KEY` - Your Render API key (from Render dashboard) -- `OPENAI_API_KEY` - Your OpenAI API key (from OpenAI platform) +**TypeScript:** -### Deployment Steps +| Setting | Value | +|---|---| +| Build command | `cd openai-agent/typescript && npm install && npm run build` | +| Start command | `cd openai-agent/typescript && npm start` | -1. **Create Workflow Service** - - Go to Render Dashboard - - Click "New +" → "Workflow" - - Connect your repository - - Name: `openai-agent-workflows` +### Environment variables -2. **Configure Build Settings** - - Build Command: `cd openai-agent && pip install -r requirements.txt` - - Start Command: `cd openai-agent && python main.py` +| Variable | Description | +|---|---| +| `RENDER_API_KEY` | Your Render API key (from Render Dashboard) | +| `OPENAI_API_KEY` | Your OpenAI API key (from [OpenAI platform](https://platform.openai.com/api-keys)) | -3. **Set Environment Variables** - - Add `RENDER_API_KEY` in the Environment section - - Add `OPENAI_API_KEY` in the Environment section - - Get Render API key from: Render Dashboard → Account Settings → API Keys - - Get OpenAI API key from: https://platform.openai.com/api-keys +### Deployment steps -4. **Deploy** - - Click "Create Workflow" - - Render will build and start your workflow service +1. Go to Render Dashboard. +1. Click **New +** > **Workflow**. +1. Connect your repository. +1. Set the build and start commands for your chosen language. +1. Add `RENDER_API_KEY` and `OPENAI_API_KEY` in the Environment section. +1. Click **Create Workflow**. ## Testing in Render Dashboard @@ -370,11 +366,9 @@ User: "What is your shipping policy?" Agent: "We offer free shipping on orders over $50. Standard shipping takes 3-5 business days, and express shipping is available for $15 with delivery in 1-2 business days." 
``` -## Important Notes +## Important notes -- **Python-only**: Workflows are only supported in Python via render-sdk -- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration -- **OpenAI Costs**: Be mindful of API costs when running the agent frequently -- **Model Selection**: Currently uses GPT-4; can be changed to GPT-3.5-turbo for cost savings -- **Tool Safety**: In production, add authorization checks before executing sensitive tools -- **Rate Limiting**: Consider implementing rate limits for production deployments +- **OpenAI costs**: Be mindful of API costs when running the agent frequently. +- **Model selection**: Currently uses GPT-4; can be changed to GPT-3.5-turbo for cost savings. +- **Tool safety**: In production, add authorization checks before executing sensitive tools. +- **Rate limiting**: Consider implementing rate limits for production deployments. diff --git a/openai-agent/python/.env.example b/openai-agent/python/.env.example new file mode 100644 index 0000000..5e56631 --- /dev/null +++ b/openai-agent/python/.env.example @@ -0,0 +1,2 @@ +OPENAI_API_KEY=your-openai-api-key +RENDER_API_KEY=your-render-api-key diff --git a/openai-agent/main.py b/openai-agent/python/main.py similarity index 99% rename from openai-agent/main.py rename to openai-agent/python/main.py index fdc0faa..5b3ff9c 100644 --- a/openai-agent/main.py +++ b/openai-agent/python/main.py @@ -18,6 +18,10 @@ import os from datetime import datetime +from dotenv import load_dotenv + +load_dotenv() + from render_sdk import Retry, Workflows # Configure logging diff --git a/openai-agent/requirements.txt b/openai-agent/python/requirements.txt similarity index 60% rename from openai-agent/requirements.txt rename to openai-agent/python/requirements.txt index 3ea8c21..9b85556 100644 --- a/openai-agent/requirements.txt +++ b/openai-agent/python/requirements.txt @@ -1,2 +1,3 @@ render-sdk>=0.2.0 openai>=1.0.0 +python-dotenv>=1.0.0 diff --git 
a/openai-agent/typescript/.env.example b/openai-agent/typescript/.env.example new file mode 100644 index 0000000..5e56631 --- /dev/null +++ b/openai-agent/typescript/.env.example @@ -0,0 +1,2 @@ +OPENAI_API_KEY=your-openai-api-key +RENDER_API_KEY=your-render-api-key diff --git a/openai-agent/typescript/package.json b/openai-agent/typescript/package.json new file mode 100644 index 0000000..bf16cc6 --- /dev/null +++ b/openai-agent/typescript/package.json @@ -0,0 +1,20 @@ +{ + "name": "openai-agent-workflow", + "version": "1.0.0", + "description": "OpenAI Agent - AI customer support agent with Render Workflows (TypeScript)", + "type": "module", + "scripts": { + "build": "tsc", + "start": "node dist/main.js", + "dev": "tsx src/main.ts" + }, + "dependencies": { + "@renderinc/sdk": "latest", + "dotenv": "^16.4.7", + "openai": "^4.77.0" + }, + "devDependencies": { + "tsx": "^4.19.0", + "typescript": "^5.7.0" + } +} diff --git a/openai-agent/typescript/src/main.ts b/openai-agent/typescript/src/main.ts new file mode 100644 index 0000000..420e63a --- /dev/null +++ b/openai-agent/typescript/src/main.ts @@ -0,0 +1,344 @@ +import "dotenv/config"; +import { task } from "@renderinc/sdk/workflows"; +import OpenAI from "openai"; +import type { ChatCompletionMessageParam, ChatCompletionTool } from "openai/resources/chat/completions"; + +const retry = { + maxRetries: 3, + waitDurationMs: 2000, + factor: 2.0, +}; + +function createOpenAIClient(): OpenAI { + const apiKey = process.env.OPENAI_API_KEY; + if (!apiKey) { + throw new Error( + "OPENAI_API_KEY environment variable not set. 
" + + "Please set it in your Render environment variables.", + ); + } + return new OpenAI({ apiKey }); +} + +// ---- Tool Functions ---- + +const getOrderStatus = task( + { name: "getOrderStatus", retry }, + function getOrderStatus(orderId: string) { + console.log(`[TOOL] Looking up order status for: ${orderId}`); + + const mockOrders: { [key: string]: { status: string; tracking: string | null; eta: string } } = { + "ORD-001": { status: "shipped", tracking: "1Z999AA1234567890", eta: "2024-10-15" }, + "ORD-002": { status: "processing", tracking: null, eta: "2024-10-12" }, + "ORD-003": { status: "delivered", tracking: "1Z999AA9876543210", eta: "2024-10-08" }, + }; + + if (orderId in mockOrders) { + const order = mockOrders[orderId]; + console.log(`[TOOL] Order ${orderId} found: ${order.status}`); + return { success: true, order_id: orderId, ...order }; + } + + console.warn(`[TOOL] Order ${orderId} not found`); + return { success: false, order_id: orderId, error: "Order not found" }; + }, +); + +const processRefund = task( + { name: "processRefund", retry }, + function processRefund(orderId: string, reason: string) { + console.log(`[TOOL] Processing refund for order: ${orderId}`); + console.log(`[TOOL] Refund reason: ${reason}`); + + const refundId = `REF-${orderId}-${new Date().toISOString().replace(/[-:T.]/g, "").slice(0, 14)}`; + + const result = { + success: true, + refund_id: refundId, + order_id: orderId, + reason, + amount: 99.99, + processed_at: new Date().toISOString(), + }; + + console.log(`[TOOL] Refund processed: ${refundId}`); + return result; + }, +); + +const searchKnowledgeBase = task( + { name: "searchKnowledgeBase", retry }, + function searchKnowledgeBase(query: string) { + console.log(`[TOOL] Searching knowledge base: ${query}`); + + const knowledge: { [key: string]: { title: string; content: string } } = { + shipping: { + title: "Shipping Policy", + content: + "We offer free shipping on orders over $50. Standard shipping takes 3-5 business days. 
Express shipping is available for $15 and takes 1-2 business days.", + }, + returns: { + title: "Return Policy", + content: + "We accept returns within 30 days of purchase. Items must be unused and in original packaging. Refunds are processed within 5-7 business days.", + }, + warranty: { + title: "Warranty Information", + content: + "All products come with a 1-year manufacturer warranty. Extended warranties are available for purchase.", + }, + }; + + const queryLower = query.toLowerCase(); + const matches = Object.entries(knowledge) + .filter( + ([key, article]) => + queryLower.includes(key) || + queryLower.split(" ").some((word) => article.content.toLowerCase().includes(word)), + ) + .map(([, article]) => article); + + console.log(`[TOOL] Found ${matches.length} knowledge base articles`); + return { success: true, query, results: matches, count: matches.length }; + }, +); + +// ---- Agent Tasks ---- + +const tools: ChatCompletionTool[] = [ + { + type: "function", + function: { + name: "get_order_status", + description: "Look up the status of a customer order by order ID", + parameters: { + type: "object", + properties: { + order_id: { type: "string", description: "The order ID (e.g., ORD-001)" }, + }, + required: ["order_id"], + }, + }, + }, + { + type: "function", + function: { + name: "process_refund", + description: "Process a refund for an order", + parameters: { + type: "object", + properties: { + order_id: { type: "string", description: "The order ID to refund" }, + reason: { type: "string", description: "Reason for the refund" }, + }, + required: ["order_id", "reason"], + }, + }, + }, + { + type: "function", + function: { + name: "search_knowledge_base", + description: "Search the knowledge base for help articles and information", + parameters: { + type: "object", + properties: { + query: { type: "string", description: "The search query" }, + }, + required: ["query"], + }, + }, + }, +]; + +const callLlmWithTools = task( + { name: "callLlmWithTools", retry 
}, + async function callLlmWithTools( + messages: ChatCompletionMessageParam[], + toolDefs: ChatCompletionTool[], + model: string = "gpt-4", + ) { + console.log(`[AGENT] Calling ${model} with ${toolDefs.length} tools available`); + + const client = createOpenAIClient(); + + const response = await client.chat.completions.create({ + model, + messages, + tools: toolDefs, + tool_choice: "auto", + }); + + const message = response.choices[0].message; + const result: { + content: string | null; + tool_calls: { id: string; type: string; function: { name: string; arguments: string } }[]; + } = { content: message.content, tool_calls: [] }; + + if (message.tool_calls) { + result.tool_calls = message.tool_calls.map((tc) => ({ + id: tc.id, + type: "function", + function: { name: tc.function.name, arguments: tc.function.arguments }, + })); + console.log(`[AGENT] Model requested ${result.tool_calls.length} tool calls`); + } + + return result; + }, +); + +const executeTool = task( + { name: "executeTool", retry }, + async function executeTool(toolName: string, args: { [key: string]: string }) { + console.log(`[AGENT] Executing tool: ${toolName}`); + + try { + switch (toolName) { + case "get_order_status": + return await getOrderStatus(args.order_id); + case "process_refund": + return await processRefund(args.order_id, args.reason); + case "search_knowledge_base": + return await searchKnowledgeBase(args.query); + default: + console.error(`[AGENT] Unknown tool: ${toolName}`); + return { error: `Unknown tool: ${toolName}` }; + } + } catch (error) { + console.error(`[AGENT] Tool execution failed: ${error}`); + return { error: String(error) }; + } + }, +); + +const agentTurn = task( + { name: "agentTurn", retry }, + async function agentTurn( + userMessage: string, + conversationHistory: ChatCompletionMessageParam[] = [], + ) { + console.log("[AGENT TURN] Starting agent turn"); + + if (typeof userMessage !== "string") { + return { + success: false, + error: `user_message must be a 
string, got ${typeof userMessage}`, + response: "I'm sorry, there was an error processing your message. Please try again.", + }; + } + + const systemMessage: ChatCompletionMessageParam = { + role: "system", + content: + "You are a helpful customer support agent. You can look up order " + + "status, process refunds, and search the knowledge base for information. " + + "Be polite, professional, and helpful. Use tools when necessary to " + + "assist the customer.", + }; + + const messages: ChatCompletionMessageParam[] = [ + systemMessage, + ...conversationHistory, + { role: "user", content: userMessage }, + ]; + + const llmResponse = await callLlmWithTools(messages, tools); + + if (!llmResponse.tool_calls.length) { + console.log("[AGENT TURN] No tool calls, returning response"); + return { + response: llmResponse.content, + conversation_history: [ + ...conversationHistory, + { role: "user" as const, content: userMessage }, + { role: "assistant" as const, content: llmResponse.content }, + ], + tool_calls: [], + }; + } + + console.log(`[AGENT TURN] Executing ${llmResponse.tool_calls.length} tool calls`); + const toolResults: { tool: string; result: unknown }[] = []; + + for (const toolCall of llmResponse.tool_calls) { + const result = await executeTool( + toolCall.function.name, + JSON.parse(toolCall.function.arguments), + ); + toolResults.push({ tool: toolCall.function.name, result }); + } + + const toolMessages: ChatCompletionMessageParam[] = llmResponse.tool_calls.map((tc, i) => ({ + role: "tool" as const, + tool_call_id: tc.id, + content: JSON.stringify(toolResults[i].result), + })); + + const finalMessages: ChatCompletionMessageParam[] = [ + ...messages, + { + role: "assistant" as const, + content: llmResponse.content, + tool_calls: llmResponse.tool_calls.map((tc) => ({ + id: tc.id, + type: "function" as const, + function: { name: tc.function.name, arguments: tc.function.arguments }, + })), + }, + ...toolMessages, + ]; + + const finalResponse = await 
callLlmWithTools(finalMessages, tools); + + console.log("[AGENT TURN] Agent turn complete"); + + return { + response: finalResponse.content, + conversation_history: [ + ...conversationHistory, + { role: "user" as const, content: userMessage }, + { role: "assistant" as const, content: finalResponse.content }, + ], + tool_calls: toolResults, + }; + }, +); + +// Root task: multi-turn conversation +task( + { name: "multiTurnConversation", retry, timeoutSeconds: 300 }, + async function multiTurnConversation(...messages: string[]) { + console.log("=".repeat(80)); + console.log(`[CONVERSATION] Starting multi-turn conversation with ${messages.length} messages`); + console.log("=".repeat(80)); + + let conversationHistory: ChatCompletionMessageParam[] = []; + const responses: { turn: number; user: string; assistant: string | null; tool_calls: unknown[] }[] = []; + + for (let i = 0; i < messages.length; i++) { + console.log(`[CONVERSATION] Turn ${i + 1}/${messages.length}`); + + const turnResult = await agentTurn(messages[i], conversationHistory); + + responses.push({ + turn: i + 1, + user: messages[i], + assistant: turnResult.response, + tool_calls: turnResult.tool_calls ?? [], + }); + + conversationHistory = turnResult.conversation_history ?? []; + } + + console.log("=".repeat(80)); + console.log("[CONVERSATION] Multi-turn conversation complete"); + console.log("=".repeat(80)); + + return { + turns: responses, + total_turns: responses.length, + conversation_history: conversationHistory, + }; + }, +); diff --git a/openai-agent/typescript/tsconfig.json b/openai-agent/typescript/tsconfig.json new file mode 100644 index 0000000..bc74857 --- /dev/null +++ b/openai-agent/typescript/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "Node16", + "moduleResolution": "Node16", + "outDir": "dist", + "rootDir": "src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "declaration": true + }, + "include": ["src"] +}