Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions cycode/cli/apps/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,36 @@
logger = get_logger('Code Scanner')


class _UploadProgressAggregator:
"""Aggregates upload progress across parallel batch uploads for display in the progress bar."""

def __init__(self, progress_bar: 'BaseProgressBar') -> None:
self._progress_bar = progress_bar
self._slots: list[list[int]] = []

def create_callback(self) -> Callable[[int, int], None]:
"""Create a progress callback for one batch upload. Each batch gets its own slot."""
slot = [0, 0]
self._slots.append(slot)

def on_upload_progress(bytes_read: int, total_bytes: int) -> None:
slot[0] = bytes_read
slot[1] = total_bytes

# Sum across all batch slots to show combined progress
total_read = sum(s[0] for s in self._slots)
total_size = sum(s[1] for s in self._slots)

if total_read >= total_size:
self._progress_bar.update_right_side_label(None)
else:
mb_read = total_read / (1024 * 1024)
mb_total = total_size / (1024 * 1024)
self._progress_bar.update_right_side_label(f'Uploading {mb_read:.1f} / {mb_total:.1f} MB')

return on_upload_progress


def scan_disk_files(ctx: typer.Context, paths: tuple[str, ...]) -> None:
scan_type = ctx.obj['scan_type']
progress_bar = ctx.obj['progress_bar']
Expand Down Expand Up @@ -121,6 +151,9 @@ def _get_scan_documents_thread_func(
severity_threshold = ctx.obj['severity_threshold']
sync_option = ctx.obj['sync']
command_scan_type = ctx.info_name
progress_bar = ctx.obj['progress_bar']

aggregator = _UploadProgressAggregator(progress_bar)

def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, LocalScanResult]:
local_scan_result = error = error_message = None
Expand All @@ -143,6 +176,7 @@ def _scan_batch_thread_func(batch: list[Document]) -> tuple[str, CliError, Local
is_commit_range,
scan_parameters,
should_use_sync_flow,
on_upload_progress=aggregator.create_callback(),
)

enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_result)
Expand Down Expand Up @@ -268,11 +302,14 @@ def _perform_scan_v4_async(
scan_parameters: dict,
is_git_diff: bool,
is_commit_range: bool,
on_upload_progress: Optional[Callable] = None,
) -> ZippedFileScanResult:
upload_link = cycode_client.get_upload_link(scan_type)
logger.debug('Got upload link, %s', {'upload_id': upload_link.upload_id})

cycode_client.upload_to_presigned_post(upload_link.url, upload_link.presigned_post_fields, zipped_documents)
cycode_client.upload_to_presigned_post(
upload_link.url, upload_link.presigned_post_fields, zipped_documents, on_upload_progress
)
logger.debug('Uploaded zip to presigned URL')

scan_async_result = cycode_client.scan_repository_from_upload_id(
Expand All @@ -292,9 +329,14 @@ def _perform_scan_async(
scan_type: str,
scan_parameters: dict,
is_commit_range: bool,
on_upload_progress: Optional[Callable] = None,
) -> ZippedFileScanResult:
scan_async_result = cycode_client.zipped_file_scan_async(
zipped_documents, scan_type, scan_parameters, is_commit_range=is_commit_range
zipped_documents,
scan_type,
scan_parameters,
is_commit_range=is_commit_range,
on_upload_progress=on_upload_progress,
)
logger.debug('Async scan request has been triggered successfully, %s', {'scan_id': scan_async_result.scan_id})

Expand Down Expand Up @@ -326,6 +368,7 @@ def _perform_scan(
is_commit_range: bool,
scan_parameters: dict,
should_use_sync_flow: bool = False,
on_upload_progress: Optional[Callable] = None,
) -> ZippedFileScanResult:
if should_use_sync_flow:
# it does not support commit range scans; should_use_sync_flow handles it
Expand All @@ -334,12 +377,20 @@ def _perform_scan(
if should_use_presigned_upload(scan_type):
try:
return _perform_scan_v4_async(
cycode_client, zipped_documents, scan_type, scan_parameters, is_git_diff, is_commit_range
cycode_client,
zipped_documents,
scan_type,
scan_parameters,
is_git_diff,
is_commit_range,
on_upload_progress,
)
except requests.exceptions.RequestException:
logger.warning('Direct upload to object storage failed. Falling back to upload via Cycode API. ')

return _perform_scan_async(cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range)
return _perform_scan_async(
cycode_client, zipped_documents, scan_type, scan_parameters, is_commit_range, on_upload_progress
)


def poll_scan_results(
Expand Down
11 changes: 11 additions & 0 deletions cycode/cli/exceptions/custom_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def __str__(self) -> str:
return f'HTTP unauthorized error occurred during the request. Message: {self.error_message}'


class SlowUploadConnectionError(CycodeError):
def __str__(self) -> str:
return 'Upload was interrupted mid-transfer, indicating a slow or unstable network connection.'


class ZipTooLargeError(CycodeError):
def __init__(self, size_limit: int) -> None:
self.size_limit = size_limit
Expand Down Expand Up @@ -102,6 +107,12 @@ def __str__(self) -> str:
code='timeout_error',
message='The request timed out. Please try again by executing the `cycode scan` command',
),
SlowUploadConnectionError: CliError(
soft_fail=True,
code='slow_upload_error',
message='The scan upload was interrupted. This is likely due to a slow or unstable network connection. '
'Please try again by executing the `cycode scan` command',
),
HttpUnauthorizedError: CliError(
soft_fail=True,
code='auth_error',
Expand Down
85 changes: 85 additions & 0 deletions cycode/cyclient/cycode_client_base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import platform
import ssl
from io import BytesIO
from typing import TYPE_CHECKING, Callable, ClassVar, Optional

import requests
Expand All @@ -15,6 +16,7 @@
RequestHttpError,
RequestSslError,
RequestTimeoutError,
SlowUploadConnectionError,
)
from cycode.cyclient import config
from cycode.cyclient.headers import get_cli_user_agent, get_correlation_id
Expand Down Expand Up @@ -90,6 +92,23 @@ def _should_retry_exception(exception: BaseException) -> bool:
return is_request_error or is_server_error


class UploadProgressTracker:
"""File-like wrapper that tracks bytes read during upload and fires a progress callback."""

def __init__(self, data: bytes, callback: Optional[Callable[[int, int], None]]) -> None:
self._io = BytesIO(data)
self._callback = callback
self.bytes_read = 0
self.len = len(data)

def read(self, size: int = -1) -> bytes:
chunk = self._io.read(size)
self.bytes_read += len(chunk)
if self._callback and chunk:
self._callback(self.bytes_read, self.len)
return chunk


class CycodeClientBase:
MANDATORY_HEADERS: ClassVar[dict[str, str]] = {
'User-Agent': get_cli_user_agent(),
Expand Down Expand Up @@ -117,6 +136,72 @@ def put(self, url_path: str, body: Optional[dict] = None, headers: Optional[dict
def get(self, url_path: str, headers: Optional[dict] = None, **kwargs) -> Response:
return self._execute(method='get', endpoint=url_path, headers=headers, **kwargs)

def post_multipart(
self,
url_path: str,
form_fields: dict,
files: dict,
on_upload_progress: Optional[Callable[[int, int], None]] = None,
hide_response_content_log: bool = False,
) -> Response:
"""POST a multipart form body with optional upload progress tracking and retry."""
url = self.build_full_url(self.api_url, url_path)
logger.debug('Executing request, %s', {'method': 'POST', 'url': url})

# Encode the multipart body once up front so we can reuse the same bytes across retries.
# A dummy URL is used because requests.Request requires one, but only the encoded body matters here.
prepared = requests.Request('POST', 'https://dummy', data=form_fields, files=files).prepare()

return self._send_multipart(
url=url,
body=prepared.body,
content_type=prepared.headers['Content-Type'],
on_upload_progress=on_upload_progress,
hide_response_content_log=hide_response_content_log,
)

@retry(
retry=retry_if_exception(_should_retry_exception),
stop=_RETRY_STOP_STRATEGY,
wait=_RETRY_WAIT_STRATEGY,
reraise=True,
before_sleep=_retry_before_sleep,
)
def _send_multipart(
self,
url: str,
body: bytes,
content_type: str,
on_upload_progress: Optional[Callable[[int, int], None]],
hide_response_content_log: bool,
) -> Response:
# Wrap the body in a fresh tracker each attempt so bytes_read starts from zero.
tracker = UploadProgressTracker(body, on_upload_progress)
headers = self.get_request_headers({'Content-Type': content_type})
try:
response = _get_request_function()(
method='post', url=url, data=tracker, headers=headers, timeout=self.timeout
)

content = 'HIDDEN' if hide_response_content_log else response.text
logger.debug(
'Receiving response, %s',
{'status_code': response.status_code, 'url': url, 'content': content},
)

response.raise_for_status()
return response
except (exceptions.ChunkedEncodingError, exceptions.ConnectionError) as e:
# A connection drop before the full body was sent indicates a slow/unstable network.
if tracker.bytes_read < tracker.len:
raise SlowUploadConnectionError from e
# Full body was sent — map to our types so _should_retry_exception handles retry logic.
if isinstance(e, exceptions.ConnectionError):
raise RequestConnectionError from e
raise
except Exception as e:
self._handle_exception(e)

@retry(
retry=retry_if_exception(_should_retry_exception),
stop=_RETRY_STOP_STRATEGY,
Expand Down
52 changes: 38 additions & 14 deletions cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import json
from copy import deepcopy
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Callable, Optional, Union
from uuid import UUID

import requests
from requests import Response

from cycode.cli import consts
from cycode.cli.config import configuration_manager
from cycode.cli.exceptions.custom_exceptions import CycodeError, RequestHttpError
from cycode.cli.exceptions.custom_exceptions import (
CycodeError,
RequestHttpError,
SlowUploadConnectionError,
)
from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip
from cycode.cyclient import models
from cycode.cyclient.cycode_client_base import CycodeClientBase
from cycode.cyclient.cycode_client_base import CycodeClientBase, UploadProgressTracker
from cycode.cyclient.logger import logger

if TYPE_CHECKING:
Expand Down Expand Up @@ -114,18 +118,18 @@ def zipped_file_scan_async(
scan_parameters: dict,
is_git_diff: bool = False,
is_commit_range: bool = False,
on_upload_progress: Optional[Callable[[int, int], None]] = None,
) -> models.ScanInitializationResponse:
files = {'file': ('multiple_files_scan.zip', zip_file.read())}

response = self.scan_cycode_client.post(
response = self.scan_cycode_client.post_multipart(
url_path=self.get_zipped_file_scan_async_url_path(scan_type),
data={
form_fields={
'is_git_diff': is_git_diff,
'scan_parameters': json.dumps(scan_parameters),
'is_commit_range': is_commit_range,
'compression_manifest': self._create_compression_manifest_string(zip_file),
},
files=files,
files={'file': ('multiple_files_scan.zip', zip_file.read(), 'application/octet-stream')},
on_upload_progress=on_upload_progress,
)
return models.ScanInitializationResponseSchema().load(response.json())

Expand All @@ -135,12 +139,32 @@ def get_upload_link(self, scan_type: str) -> models.UploadLinkResponse:
response = self.scan_cycode_client.get(url_path=url_path, hide_response_content_log=self._hide_response_log)
return models.UploadLinkResponseSchema().load(response.json())

def upload_to_presigned_post(self, url: str, fields: dict[str, str], zip_file: 'InMemoryZip') -> None:
multipart = {key: (None, value) for key, value in fields.items()}
multipart['file'] = (None, zip_file.read())
# We are not using Cycode client, as we are calling aws S3.
response = requests.post(url, files=multipart, timeout=self.scan_cycode_client.timeout)
response.raise_for_status()
def upload_to_presigned_post(
self,
url: str,
fields: dict[str, str],
zip_file: 'InMemoryZip',
on_upload_progress: Optional[Callable[[int, int], None]] = None,
) -> None:
all_files = {key: (None, value) for key, value in fields.items()}
all_files['file'] = ('multiple_files_scan.zip', zip_file.read(), 'application/octet-stream')

prepared = requests.Request('POST', 'https://dummy', files=all_files).prepare()
tracker = UploadProgressTracker(prepared.body, on_upload_progress)

try:
# We are not using Cycode client, as we are calling aws S3.
response = requests.post(
url,
data=tracker,
headers={'Content-Type': prepared.headers['Content-Type']},
timeout=self.scan_cycode_client.timeout,
)
response.raise_for_status()
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as e:
if tracker.bytes_read < tracker.len:
raise SlowUploadConnectionError from e
raise

def scan_repository_from_upload_id(
self,
Expand Down
27 changes: 27 additions & 0 deletions tests/cyclient/test_scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
import requests
import responses
from pytest_mock import MockerFixture
from requests.exceptions import ConnectionError as RequestsConnectionError

from cycode.cli.cli_types import ScanTypeOption
Expand All @@ -12,6 +13,7 @@
HttpUnauthorizedError,
RequestConnectionError,
RequestTimeoutError,
SlowUploadConnectionError,
)
from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip
from cycode.cli.models import Document
Expand Down Expand Up @@ -168,3 +170,28 @@ def test_get_scan_details(
scan_details_response = scan_client.get_scan_details(scan_type, str(scan_id))
assert scan_details_response.id == str(scan_id)
assert scan_details_response.scan_status == 'Completed'


@pytest.mark.parametrize('scan_type', list(ScanTypeOption))
def test_zipped_file_scan_async_slow_upload_error(
scan_type: ScanTypeOption, scan_client: ScanClient, mocker: MockerFixture
) -> None:
"""Test that a connection failure mid-transfer raises SlowUploadConnectionError."""
zip_file = get_test_zip_file(scan_type)

def _partial_upload_then_fail(**kwargs) -> None:
# Read only a small portion of the body to simulate a partial upload
data = kwargs.get('data')
if data is not None:
data.read(10)
raise requests.exceptions.ChunkedEncodingError('Connection broken mid-transfer')

mocker.patch('cycode.cyclient.cycode_client_base._get_request_function', return_value=_partial_upload_then_fail)
mocker.patch.object(
scan_client.scan_cycode_client,
'get_request_headers',
return_value={'Authorization': 'Bearer test'},
)

with pytest.raises(SlowUploadConnectionError):
scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={})
Loading