class _UploadProgressAggregator:
    """Aggregate upload progress across parallel batch uploads for the progress bar.

    Every batch upload gets its own callback from :meth:`create_callback`. Each
    callback invocation stores that batch's latest ``(bytes_read, total_bytes)``
    pair, sums the pairs of all batches and publishes the combined figure as the
    progress bar's right-side label.

    Updating the per-batch state and publishing the label happen under one lock.
    Without it, a thread could compute an "Uploading ..." label, be preempted, and
    write that stale label after another thread had already cleared it, leaving
    the label stuck once every upload has finished.
    """

    def __init__(self, progress_bar: 'BaseProgressBar') -> None:
        # Imported locally because the module's import block is outside this block.
        from threading import Lock

        self._progress_bar = progress_bar
        # One mutable ``[bytes_read, total_bytes]`` pair per batch upload.
        self._slots: list[list[int]] = []
        self._lock = Lock()

    def create_callback(self) -> Callable[[int, int], None]:
        """Create a progress callback for one batch upload. Each batch gets its own slot.

        Returns:
            A callable taking ``(bytes_read, total_bytes)`` for that batch.
        """
        slot = [0, 0]
        with self._lock:
            self._slots.append(slot)

        def on_upload_progress(bytes_read: int, total_bytes: int) -> None:
            # Hold the lock through the label update so labels are published in the
            # same order as the state changes that produced them.
            with self._lock:
                slot[0] = bytes_read
                slot[1] = total_bytes

                # Sum across all batch slots to show combined progress
                total_read = sum(s[0] for s in self._slots)
                total_size = sum(s[1] for s in self._slots)

                if total_read >= total_size:
                    # Everything reported so far is uploaded: clear the label.
                    label = None
                else:
                    mb_read = total_read / (1024 * 1024)
                    mb_total = total_size / (1024 * 1024)
                    label = f'Uploading {mb_read:.1f} / {mb_total:.1f} MB'

                self._progress_bar.update_right_side_label(label)

        return on_upload_progress
should_use_sync_flow, + on_upload_progress=aggregator.create_callback(), ) enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_result) @@ -268,11 +302,14 @@ def _perform_scan_v4_async( scan_parameters: dict, is_git_diff: bool, is_commit_range: bool, + on_upload_progress: Optional[Callable] = None, ) -> ZippedFileScanResult: upload_link = cycode_client.get_upload_link(scan_type) logger.debug('Got upload link, %s', {'upload_id': upload_link.upload_id}) - cycode_client.upload_to_presigned_post(upload_link.url, upload_link.presigned_post_fields, zipped_documents) + cycode_client.upload_to_presigned_post( + upload_link.url, upload_link.presigned_post_fields, zipped_documents, on_upload_progress + ) logger.debug('Uploaded zip to presigned URL') scan_async_result = cycode_client.scan_repository_from_upload_id( @@ -292,9 +329,14 @@ def _perform_scan_async( scan_type: str, scan_parameters: dict, is_commit_range: bool, + on_upload_progress: Optional[Callable] = None, ) -> ZippedFileScanResult: scan_async_result = cycode_client.zipped_file_scan_async( - zipped_documents, scan_type, scan_parameters, is_commit_range=is_commit_range + zipped_documents, + scan_type, + scan_parameters, + is_commit_range=is_commit_range, + on_upload_progress=on_upload_progress, ) logger.debug('Async scan request has been triggered successfully, %s', {'scan_id': scan_async_result.scan_id}) @@ -326,6 +368,7 @@ def _perform_scan( is_commit_range: bool, scan_parameters: dict, should_use_sync_flow: bool = False, + on_upload_progress: Optional[Callable] = None, ) -> ZippedFileScanResult: if should_use_sync_flow: # it does not support commit range scans; should_use_sync_flow handles it @@ -334,12 +377,20 @@ def _perform_scan( if should_use_presigned_upload(scan_type): try: return _perform_scan_v4_async( - cycode_client, zipped_documents, scan_type, scan_parameters, is_git_diff, is_commit_range + cycode_client, + zipped_documents, + scan_type, + scan_parameters, + is_git_diff, + 
class SlowUploadConnectionError(CycodeError):
    """Signal that an upload's connection broke before the whole body was sent."""

    def __str__(self) -> str:
        description = 'Upload was interrupted mid-transfer, indicating a slow or unstable network connection.'
        return description
class UploadProgressTracker:
    """Readable wrapper around an in-memory payload that reports how much was consumed.

    Attributes are plain and public so callers can inspect them after a failed
    request: ``bytes_read`` is the running count of bytes handed out by
    :meth:`read`, and ``len`` is the payload's total size in bytes.
    """

    def __init__(self, data: bytes, callback: Optional[Callable[[int, int], None]]) -> None:
        self._io = BytesIO(data)
        self.len = len(data)
        self.bytes_read = 0
        # May be None, in which case progress is counted but never reported.
        self._callback = callback

    def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` bytes of the payload (all remaining bytes when ``-1``).

        The callback, if any, is invoked with ``(bytes_read, len)`` after every
        non-empty read; an empty read (payload exhausted) reports nothing.
        """
        piece = self._io.read(size)
        if not piece:
            return piece

        self.bytes_read += len(piece)
        if self._callback:
            self._callback(self.bytes_read, self.len)
        return piece
@retry(
    retry=retry_if_exception(_should_retry_exception),
    stop=_RETRY_STOP_STRATEGY,
    wait=_RETRY_WAIT_STRATEGY,
    reraise=True,
    before_sleep=_retry_before_sleep,
)
def _send_multipart(
    self,
    url: str,
    body: bytes,
    content_type: str,
    on_upload_progress: Optional[Callable[[int, int], None]],
    hide_response_content_log: bool,
) -> Response:
    """POST an already-encoded multipart body, reporting upload progress.

    Args:
        url: Full URL to post to.
        body: Encoded multipart payload; the same bytes are reused on every attempt.
        content_type: ``Content-Type`` header value matching ``body``.
        on_upload_progress: Optional callback receiving ``(bytes_read, total_bytes)``.
        hide_response_content_log: When true, the response text is logged as ``HIDDEN``.

    Returns:
        The response, after ``raise_for_status()`` succeeded.

    Raises:
        SlowUploadConnectionError: The connection dropped after some, but not all,
            of the body had been read for sending.
        RequestConnectionError: A connection error outside of a partial transfer.
        Other errors are whatever ``self._handle_exception`` raises for them.
    """
    # Wrap the body in a fresh tracker each attempt so bytes_read starts from zero.
    tracker = UploadProgressTracker(body, on_upload_progress)
    headers = self.get_request_headers({'Content-Type': content_type})
    try:
        response = _get_request_function()(
            method='post', url=url, data=tracker, headers=headers, timeout=self.timeout
        )

        content = 'HIDDEN' if hide_response_content_log else response.text
        logger.debug(
            'Receiving response, %s',
            {'status_code': response.status_code, 'url': url, 'content': content},
        )

        response.raise_for_status()
        return response
    except (exceptions.ChunkedEncodingError, exceptions.ConnectionError) as e:
        # Only a drop *during* the transfer points at a slow/unstable network.
        # ConnectionError also covers failures before any body byte is read
        # (refused connection, DNS, SSL handshake, proxy); those must not be
        # reported as an interrupted upload.
        if 0 < tracker.bytes_read < tracker.len:
            raise SlowUploadConnectionError from e
        if tracker.bytes_read == 0:
            # Nothing was sent: treat it like any other failed request.
            # NOTE(review): presumably _handle_exception maps SSL/timeout
            # subclasses to their dedicated error types - confirm. If it
            # returns without raising, the explicit mapping below applies.
            self._handle_exception(e)
        # Map to our types so _should_retry_exception handles retry logic.
        if isinstance(e, exceptions.ConnectionError):
            raise RequestConnectionError from e
        raise
    except Exception as e:
        self._handle_exception(e)
def upload_to_presigned_post(
    self,
    url: str,
    fields: dict[str, str],
    zip_file: 'InMemoryZip',
    on_upload_progress: Optional[Callable[[int, int], None]] = None,
) -> None:
    """Upload the zip to a presigned POST URL, reporting upload progress.

    Args:
        url: Presigned URL to post to.
        fields: Presigned form fields; sent as plain multipart fields ahead of the file.
        zip_file: Archive whose bytes are sent as the ``file`` part.
        on_upload_progress: Optional callback receiving ``(bytes_read, total_bytes)``.

    Raises:
        SlowUploadConnectionError: The connection dropped after some, but not all,
            of the body had been read for sending.
        requests.exceptions.RequestException: Any other request failure, including
            a non-success status; left untouched so callers can catch it.
    """
    all_files = {key: (None, value) for key, value in fields.items()}
    all_files['file'] = ('multiple_files_scan.zip', zip_file.read(), 'application/octet-stream')

    # Encode the multipart body up front so it can be wrapped in a tracker.
    # The dummy URL is only there because requests.Request requires one.
    prepared = requests.Request('POST', 'https://dummy', files=all_files).prepare()
    tracker = UploadProgressTracker(prepared.body, on_upload_progress)

    try:
        # We are not using Cycode client, as we are calling aws S3.
        response = requests.post(
            url,
            data=tracker,
            headers={'Content-Type': prepared.headers['Content-Type']},
            timeout=self.scan_cycode_client.timeout,
        )
        response.raise_for_status()
    except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as e:
        # Only a drop *during* the transfer counts as a slow upload. If nothing
        # was sent (storage host unreachable, DNS, SSL handshake), re-raise the
        # original requests exception so a caller catching RequestException can
        # still fall back to another upload path.
        if 0 < tracker.bytes_read < tracker.len:
            raise SlowUploadConnectionError from e
        raise
return_value=_partial_upload_then_fail) + mocker.patch.object( + scan_client.scan_cycode_client, + 'get_request_headers', + return_value={'Authorization': 'Bearer test'}, + ) + + with pytest.raises(SlowUploadConnectionError): + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={})