diff --git a/ing_lib/apps/ProjConfigCreateUpdateCS.py b/ing_lib/apps/ProjConfigCreateUpdateCS.py index 5cfa1d0..1867a77 100644 --- a/ing_lib/apps/ProjConfigCreateUpdateCS.py +++ b/ing_lib/apps/ProjConfigCreateUpdateCS.py @@ -538,7 +538,7 @@ def parse_custom_script_xml(xml_file, base_path): script_elem = script_elements[0] script_name = script_elem.get("script_name") - is_command = script_elem.get('is_command') + is_command = script_elem.get('is_command', 'false').lower() == 'true' # Default to false if not provided script_path = script_elem.get("script_path") description = script_elem.get("description") hash_value = script_elem.get("hash") # Get hash from XML if provided @@ -763,7 +763,7 @@ def parse_custom_script_json(json_file, base_path): def validate_script_data(script_data): """ - Validate that a script data dictionary has all required fields + Validate that a script data dictionary has all required fields and valid values Parameters ---------- @@ -779,6 +779,7 @@ def validate_script_data(script_data): required_fields = ['script_name', 'script_path', 'description', 'script_id', 'is_command'] + # Check for missing required fields for field in required_fields: if not script_data.get(field): logger.error(f"Script data missing required field: {field}") diff --git a/ing_lib/common.py b/ing_lib/common.py index bb2f413..d084178 100644 --- a/ing_lib/common.py +++ b/ing_lib/common.py @@ -95,7 +95,7 @@ def ingenium_rest_get(endpoint): # If needed, refresh the token if _stale_token(): - refresh_endpoint(_extact_server(endpoint), False) + refresh_auth(_extract_server(endpoint), False) data = None @@ -130,7 +130,7 @@ def ingenium_rest_get_paginated(endpoint, query_params={}): # If needed, refresh the token if _stale_token(): - refresh_endpoint(_extact_server(endpoint), False) + refresh_auth(_extract_server(endpoint), False) _INITIAL_LIMIT = 1000 _INITIAL_OFFSET = 0 @@ -352,6 +352,10 @@ def refresh_auth(server, force=False): """ + refresh_time = get_refresh_time() + if refresh_time is None: + raise IngeniumLibError("Cannot refresh token: No refresh time available. Please authenticate first.") + token_time_remaining = (datetime.datetime.utcnow() - get_refresh_time()).total_seconds() if force or _stale_token(): @@ -372,7 +376,7 @@ def refresh_auth(server, force=False): if response_handler(refresh): - _store['token'] = f"Bearer {json.loads(logon.text)['access_token']}" + _store['token'] = f"Bearer {json.loads(refresh.text)['access_token']}" _store['refresh_time'] = datetime.datetime.utcnow() msg = f"Successfully refreshed token with: {server}" logger.debug(msg) @@ -415,6 +419,6 @@ def _stale_token(): return elapsed > _TOKEN_REFRESH_DURATION -def _extact_server(endpoint): +def _extract_server(endpoint): parsed = urlparse(endpoint) return f'{parsed.scheme}://{parsed.netloc}' diff --git a/ing_lib/steps.py b/ing_lib/steps.py index e1b2fdd..8b7a8e9 100644 --- a/ing_lib/steps.py +++ b/ing_lib/steps.py @@ -112,7 +112,7 @@ def apply_bit_mask(input_value, bit_mask, bit_op): return int(output_value) -def check_telemetry_query(query: dict) -> None: +def check_telemetry_query(query: list) -> None: """ This function checks the provided telemetry query to ensure it is well formed @@ -122,7 +122,8 @@ def check_telemetry_query(query: dict) -> None: This is dictionary indexed by channel id with the provide Example: - { 'CMD-1234' : {'verify_wait' : 'WAIT', + {'telem_uuid' : 'CMD-3232', + 'verify_wait' : 'WAIT', 'dn_eu': 'DN', 'verification_condition': 'GREATER_THAN', 'verification_values': ['12'], @@ -134,7 +135,7 @@ def check_telemetry_query(query: dict) -> None: """ - for channel, predict in query.items(): + for predict in query: # Check the Verification Condition and Verification Values @@ -334,37 +335,39 @@ def verify_wait_telemetry(query: dict, telemetry_query_func: callable, start_tim # Check if the query has completed for channel_id, data in results['channels'].items(): + # Get the predict data from the original query + predict = query[channel_id] + # If the verification_condition = NOT_PRESENT only exit on FAIL # Note that this also means that query_timeout = True - if data['predicts']['verification_condition'] in ['NOT_PRESENT']: + if predict['verification_condition'] in ['NOT_PRESENT']: if data['verification_status'] not in ['PASS', 'ERROR']: query_complete = False results['query_matches_predict'] = False else: # If Waiting - mark the query complete if the status equals PASS or ERROR - if data['predicts']['verify_wait'] == 'WAIT': + if predict['verify_wait'] == 'WAIT': if data['verification_status'] in ['FAIL']: query_complete = False results['query_matches_predict'] = False - # If not waiting - the query is already flagged complete - elif data['predicts']['verify_wait'] == 'VERIFY': + elif predict['verify_wait'] == 'VERIFY': if data['verification_status'] in ['FAIL', 'ERROR']: results['query_matches_predict'] = False return results -def evaluate_verify_condition(channels, channel_id, predicts, query_timeout): +def evaluate_verify_condition(telemetry, telem_uuid, predicts, query_timeout): """ This function will apply the bit mask (if appropriate) and evaluate the results based on the predicts Parameters ---------- - channels: list + telemetry: list List of channel objects. The last element is the latest. - channel_id: str + telem_uuid: str Channel id that is being evaluated predicts: dict A dictionary containing bit-masking information and predicts to evaluate against @@ -376,157 +379,177 @@ def evaluate_verify_condition(channels, channel_id, predicts, query_timeout): Dictionary containing the results """ + # Debug: Log function entry and inputs + logger.debug(f"evaluate_verify_condition called for {telem_uuid} with verification_condition: {predicts.get('verification_condition')}") + logger.debug(f"query_timeout: {query_timeout}, telemetry present: {bool(telemetry)}") + # Create a dictionary to captures the results (starting with the predicts) - result = {'channel_id': channel_id, + result = {'telem_uuid': telem_uuid, 'predicts': predicts, 'actual_value': None, 'verification_status': None, - 'channel_details': None} + 'data_present': False, + 'telem_details': None} verification_condition = predicts['verification_condition'] verification_values = predicts.get('verification_values') prior_value = predicts.get('prior_value') - # Check if no EHA values have been returned - if not channels: + # Check if no telem values have been returned + if not telemetry: + # Debug: Log the no-telemetry case + logger.debug(f"No telemetry found for {telem_uuid}, query_timeout: {query_timeout}") + # Check if the query is complete (timed out) and the verification type is NOT_PRESENT if query_timeout and verification_condition == 'NOT_PRESENT': # If so - verification_status = 'PASS' result['verification_status'] = 'PASS' - msg = f'Channel:{channel_id} not located within time range. Verification Type: {predicts["verification_condition"]} Result: {result["verification_status"]}' + msg = f'Channel:{telem_uuid} not located within time range. Verification Type: {predicts["verification_condition"]} Result: {result["verification_status"]}' logger.info(msg) return result # If the verification type is not NOT_PRESENT the verification is FAIL elif query_timeout and verification_condition != 'NOT_PRESENT': result['verification_status'] = 'FAIL' - msg = f'Channel:{channel_id} not located within time range. Verification Type: {predicts["verification_condition"]} Result: {result["verification_status"]}' + msg = f'Channel:{telem_uuid} not located within time range. Verification Type: {predicts["verification_condition"]} Result: {result["verification_status"]}' logger.info(msg) return result + # If no timeout yet, return PENDING status + else: + result['verification_status'] = 'PENDING' + return result - # If EHA values received - time to process them - else: - # Take the most recent value and add it to the results - channel = channels[0] - result['channel_details'] = channel - - # Check that the channel type is compatible with the verification values, the prior values, etc - # check_channel_type(channel, predicts) - - # If the predict is targeting DN - set the evaluation value to that - if predicts['dn_eu'] == 'DN': - result['actual_value'] = channel.get('dn') - - # If the predict is targeting EU - elif predicts['dn_eu'] == 'EU': - # If the channel is enumerated or boolean and EU is selected - use the status value - if channel.get('channelType') in ['STATUS', 'BOOLEAN']: - # Status values are always strings - result['actual_value'] = channel.get('status') - # Otherwise use the EU value - else: - # EU values are always floats - result['actual_value'] = channel.get('eu') - - ##################### - # If there is a bit mask present apply it - if predicts.get('bit_mask') and predicts.get('bit_op'): - msg = f'Applying bit_mask: {predicts.get("bit_mask")} bit_op: {predicts.get("bit_op")} to channel_id: {channel_id} (DN: {channel.get("dn")})' - logger.debug(msg) - - # Try applying the bit mask - try: - masked_value = apply_bit_mask(channel.get('dn'), predicts['bit_mask'], predicts['bit_op']) - except BitMaskError: - msg = f'Error applying specified bit-mask: {predicts.get("bit_mask")}, bit-op: {predicts.get("bit_op")}, to channel: {channel_id}, value: {channel.get("eu")}' - logger.error(msg) - raise InputError(msg) + # If we get here, telemetry was found + telem = telemetry[0] + result['telem_details'] = telem + result['data_present'] = True + + # Debug: Log that we found telemetry + logger.debug(f"Found telemetry for {telem_uuid}: {telem}") + + # If the predict is targeting DN - set the evaluation value to that + if predicts['dn_eu'] == 'DN': + result['actual_value'] = telem.get('raw_value') + + # If the predict is targeting EU + elif predicts['dn_eu'] == 'EU': + result['actual_value'] = telem.get('eng_value') + + # Handle NOT_PRESENT case when telemetry is found + if verification_condition == 'NOT_PRESENT': + result['verification_status'] = 'FAIL' + msg = (f'Channel:{telem_uuid} was found with value {result["actual_value"]} when NOT_PRESENT was expected. ' + f'Verification Status: FAIL') + logger.info(msg) + return result - # Set the evaluation value to the masked value - result['actual_value'] = masked_value + # If there is a bit mask present apply it + if predicts.get('bit_mask') and predicts.get('bit_op'): + msg = f'Applying bit_mask: {predicts.get("bit_mask")} bit_op: {predicts.get("bit_op")} to channel_id: {telem_uuid} (DN: {telem.get("raw_value")})' + logger.debug(msg) - # Check if a prior value was provided - # If so compute revised actual_value based on actual_value - prior_value - if prior_value is not None: - old_actual_value = result['actual_value'] - result['actual_value'] = old_actual_value - prior_value - msg = f'Will evaluate based on the difference between measured value and prior value ({old_actual_value} - {prior_value} = {result["actual_value"]})' - logger.debug(msg) + # Try applying the bit mask + try: + masked_value = apply_bit_mask(telem.get('raw_value'), predicts['bit_mask'], predicts['bit_op']) + except BitMaskError: + msg = f'Error applying specified bit-mask: {predicts.get("bit_mask")}, bit-op: {predicts.get("bit_op")}, to channel: {telem_uuid}, value: {telem.get("eng_value")}' + logger.error(msg) + raise InputError(msg) - # Now evaluate the actual_value against the provided predicts - # Record is a special case (no predict) - if verification_condition == 'RECORD': + # Set the evaluation value to the masked value + result['actual_value'] = masked_value + + # Check if a prior value was provided + # If so compute revised actual_value based on actual_value - prior_value + if prior_value is not None: + old_actual_value = float(result['actual_value']) + result['actual_value'] = old_actual_value - float(prior_value) + msg = f'Will evaluate based on the difference between measured value and prior value ({old_actual_value} - {prior_value} = {result["actual_value"]})' + logger.debug(msg) + + # Now evaluate the actual_value against the provided predicts + # Record is a special case (no predict) + if verification_condition == 'RECORD': + result['verification_status'] = 'PASS' + msg = f'Found value for {telem_uuid}: {result["actual_value"]} (RECORD - no evaluation) - setting verification status to {result["verification_status"]}' + logger.info(msg) + return result + + # Otherwise is follows a standard pattern + if verification_condition == 'GREATER_THAN': + operator = '>' + if float(result['actual_value']) > float(verification_values[0]): result['verification_status'] = 'PASS' - msg = f'Found value for {channel_id}: {result["actual_value"]} (RECORD - no evaluation) - setting verification status to {result["verification_status"]}' - logger.debug(msg) - return result + else: + result['verification_status'] = 'FAIL' - # NOT_PRESENT is a special case (if telemetry was received) - if verification_condition == 'NOT_PRESENT': + elif verification_condition == 'LESS_THAN': + operator = '<' + if float(result['actual_value']) < float(verification_values[0]): + result['verification_status'] = 'PASS' + else: result['verification_status'] = 'FAIL' - msg = f'Found value for {channel_id}: {result["actual_value"]} (NOT PRESENT - but telemetry present) - setting verification status to {result["verification_status"]}' - logger.debug(msg) - return result - # Otherwise is follows a standard pattern - if verification_condition == 'GREATER_THAN': - operator = '>' - if result['actual_value'] > verification_values[0]: - result['verification_status'] = 'PASS' - else: - result['verification_status'] = 'FAIL' + elif verification_condition == 'GREATER_THAN_OR_EQUAL': + operator = '>=' + if float(result['actual_value']) >= float(verification_values[0]): + result['verification_status'] = 'PASS' + else: + result['verification_status'] = 'FAIL' - elif verification_condition == 'LESS_THAN': - operator = '<' - if result['actual_value'] < verification_values[0]: - result['verification_status'] = 'PASS' - else: - result['verification_status'] = 'FAIL' + elif verification_condition == 'LESS_THAN_OR_EQUAL': + operator = '<=' + if float(result['actual_value']) <= float(verification_values[0]): + result['verification_status'] = 'PASS' + else: + result['verification_status'] = 'FAIL' - elif verification_condition == 'GREATER_THAN_OR_EQUAL': - operator = '>=' - if result['actual_value'] >= verification_values[0]: + elif verification_condition == 'EQUAL': + operator = '==' + # The telemetry or predict could be a string for equal - so we will need to check + if confirm_numeric(verification_values[0]): + # If numeric convert both to floats when evaluating + if float(result['actual_value']) == float(verification_values[0]): result['verification_status'] = 'PASS' else: result['verification_status'] = 'FAIL' - - elif verification_condition == 'LESS_THAN_OR_EQUAL': - operator = '<=' - if result['actual_value'] <= verification_values[0]: + else: + if result['actual_value'] == verification_values[0]: result['verification_status'] = 'PASS' else: result['verification_status'] = 'FAIL' - elif verification_condition == 'EQUAL': - operator = '==' - if result['actual_value'] == verification_values[0]: + elif verification_condition == 'NOT_EQUAL': + operator = '!=' + # The telemetry or predict could be a string for equal - so we will need to check + if confirm_numeric(verification_values[0]): + # If numeric convert both to floats when evaluating + if float(result['actual_value']) != float(verification_values[0]): result['verification_status'] = 'PASS' else: result['verification_status'] = 'FAIL' - - elif verification_condition == 'NOT_EQUAL': - operator = '!=' + else: if result['actual_value'] != verification_values[0]: result['verification_status'] = 'PASS' else: result['verification_status'] = 'FAIL' - elif verification_condition == 'INCLUSIVE_RANGE': - operator = 'Inclusive Range' - if result['actual_value'] >= verification_values[0] and result['actual_value'] <= verification_values[1]: - result['verification_status'] = 'PASS' - else: - result['verification_status'] = 'FAIL' + elif verification_condition == 'INCLUSIVE_RANGE': + operator = 'Inclusive Range' + if float(result['actual_value']) >= float(verification_values[0]) and float(result['actual_value']) <= float(verification_values[1]): + result['verification_status'] = 'PASS' + else: + result['verification_status'] = 'FAIL' - elif verification_condition == 'EXCLUSIVE_RANGE': - operator = 'Exclusive Range' - if result['actual_value'] > verification_values[0] and result['actual_value'] < verification_values[1]: - result['verification_status'] = 'PASS' - else: - result['verification_status'] = 'FAIL' + elif verification_condition == 'EXCLUSIVE_RANGE': + operator = 'Exclusive Range' + if float(result['actual_value']) > float(verification_values[0]) and float(result['actual_value']) < float(verification_values[1]): + result['verification_status'] = 'PASS' + else: + result['verification_status'] = 'FAIL' - msg = f'Evaluated predict for {channel_id} {result["actual_value"]} {operator} {verification_values} - setting verification status to {result["verification_status"]}' - logger.info(msg) + msg = f'Evaluated predict for {telem_uuid} {result["actual_value"]} {operator} {verification_values} - setting verification status to {result["verification_status"]}' + logger.info(msg) return result diff --git a/ing_lib/tests/conftest.py b/ing_lib/tests/conftest.py index 26a2217..70a27e9 100644 --- a/ing_lib/tests/conftest.py +++ b/ing_lib/tests/conftest.py @@ -34,9 +34,7 @@ def mock_ssl_verify(): @pytest.fixture def mock_common_globals(): """Mock common module global variables.""" - with patch('common.ssl_verify', True), \ - patch('common.token', 'mock_token_12345'), \ - patch('common.refresh_time', datetime.datetime.utcnow()), \ + with patch('common._store', {'token': 'mock_token_12345', 'ssl_verify': True, 'refresh_time': datetime.datetime.utcnow()}), \ patch('common.refresh_auth', return_value=True): yield @@ -187,8 +185,7 @@ def mock_empty_server(): with contextlib.ExitStack() as stack: # Authentication and globals stack.enter_context(patch('common.authenticate', return_value=True)) - stack.enter_context(patch('common.ssl_verify', True)) - stack.enter_context(patch('common.token', 'mock_token_12345')) + stack.enter_context(patch('common._store', {'token': 'mock_token_12345', 'ssl_verify': True, 'refresh_time': datetime.datetime.utcnow()})) # GET functions - return empty data stack.enter_context(patch('project_config.get_dictionary_versions', return_value=[])) @@ -374,7 +371,8 @@ def temp_custom_script_xml(): script_path="test_script.sh" description="Comprehensive test script with all field types" hash="abc123def456" - script_id="dGVzdF9zY3JpcHQuc2g="> + script_id="dGVzdF9zY3JpcHQuc2g=" + is_command="true"> 0: - endpoint = args[0] - if 'dictionaries' in endpoint and 'versions' in endpoint: - return mock_versions - return [] - - with patch('common.ingenium_rest_get_paginated', side_effect=mock_paginated_side_effect), \ - patch('common.ingenium_rest_get', return_value=[]): - - result = get_source_dictionaries('https://test-server.example.com', 'v4', True) - - # Should only contain the published version - assert 'v1.0' in result['versions']['flight'] - assert 'v0.9' not in result['versions']['flight'] - def test_get_source_dictionaries_include_retired(self): + class MockInputs: + def __init__(self, filter_retired=False): + self.filter_retired = filter_retired + self.flight_sse = [] # limit to specific flight/sse if needed + self.specific_versions = [] # no version filter + self.include_vis = False + self.include_cs = False + + # Mock the dictionary versions returned for both flight and sse + mock_get_versions.return_value = mock_versions + + inputs = MockInputs(filter_retired=True) + result = get_source_dictionaries('https://test-server.example.com', 'v4', inputs) + + # Should only contain the published version + assert 'v1.0' in result['versions']['flight'] + assert 'v0.9' not in result['versions']['flight'] + + @patch('apps.ProjConfigBackup.get_dictionary_versions') + def test_get_source_dictionaries_include_retired(self, mock_get_versions): """Test that retired dictionaries are included when filter_retired=False.""" mock_versions = [ { @@ -117,23 +169,24 @@ def test_get_source_dictionaries_include_retired(self): 'state': 'RETIRED' } ] - - def mock_paginated_side_effect(*args, **kwargs): - """Return versions for dictionary calls, empty for others.""" - if len(args) > 0: - endpoint = args[0] - if 'dictionaries' in endpoint and 'versions' in endpoint: - return mock_versions - return [] - - with patch('common.ingenium_rest_get_paginated', side_effect=mock_paginated_side_effect), \ - patch('common.ingenium_rest_get', return_value=[]): - - result = get_source_dictionaries('https://test-server.example.com', 'v4', False) - - # Should contain both versions - assert 'v1.0' in result['versions']['flight'] - assert 'v0.9' in result['versions']['flight'] + + class MockInputs: + def __init__(self, filter_retired=False): + self.filter_retired = filter_retired + # flight_sse = None -> process both 'sse' and 'flight' + self.flight_sse = None + self.specific_versions = [] # no version filter + self.include_vis = False + self.include_cs = False + + mock_get_versions.return_value = mock_versions + + inputs = MockInputs(filter_retired=False) + result = get_source_dictionaries('https://test-server.example.com', 'v4', inputs) + + # Should contain both versions + assert 'v1.0' in result['versions']['flight'] + assert 'v0.9' in result['versions']['flight'] def test_main_success(self, comprehensive_server_mock, mock_user_input): """Test successful main execution.""" @@ -208,16 +261,42 @@ def test_main_with_rsa(self, comprehensive_server_mock, mock_user_input): call_args = mock_auth.call_args assert call_args[1]['rsa'] is True - def test_get_source_dictionaries_v3(self, comprehensive_server_mock): + + @patch('apps.ProjConfigBackup.get_dictionary_versions') + def test_get_source_dictionaries_v3(self, mock_get_versions, comprehensive_server_mock): """Test get_source_dictionaries with v3 API.""" - result = get_source_dictionaries('https://test-server.example.com', 'v3', True) - + # Mock versions returned by the v3 API + mock_versions = [ + { + 'dictionary_version': 'v1.0', + 'dictionary_description': 'Test Dict v1.0', + 'state': 'PUBLISHED', + } + ] + mock_get_versions.return_value = mock_versions + + # Create a simple object to mock the inputs parameter + class MockInputs: + def __init__(self): + # This will make the function process both 'sse' and 'flight' dict types + self.flight_sse = None + self.specific_versions = [] + self.filter_retired = False + self.include_vis = False + self.include_cs = False + + inputs = MockInputs() + result = get_source_dictionaries('https://test-server.example.com', 'v3', inputs) + assert 'versions' in result assert 'flight' in result['versions'] assert 'v1.0' in result['versions']['flight'] - def test_get_source_dictionaries_exception_handling(self): + @patch('project_config.ingenium_rest_get_paginated') + @patch('apps.ProjConfigBackup.get_dictionary_versions') + def test_get_source_dictionaries_exception_handling(self, mock_get_versions, mock_rest_get): """Test exception handling in get_source_dictionaries.""" + # Setup mock to return test versions mock_versions = [ { 'dictionary_version': 'v1.0', @@ -225,24 +304,38 @@ def test_get_source_dictionaries_exception_handling(self): 'state': 'PUBLISHED' } ] - - def mock_paginated_side_effect(*args, **kwargs): - """Return versions for dictionary calls, empty for others.""" - if len(args) > 0: - endpoint = args[0] - if 'dictionaries' in endpoint and 'versions' in endpoint: - return mock_versions - return [] - - with patch('common.ingenium_rest_get_paginated', side_effect=mock_paginated_side_effect), \ - patch('common.ingenium_rest_get', side_effect=Exception("Network error")): - + mock_rest_get.return_value = mock_versions + mock_get_versions.return_value = mock_versions + + # Create a simple object to mock the inputs parameter + class MockInputs: + def __init__(self): + # Process both 'sse' and 'flight' + self.flight_sse = None + self.specific_versions = [] + self.filter_retired = False + self.include_vis = False + self.include_cs = False + + inputs = MockInputs() + + with patch('common.ingenium_rest_get', side_effect=Exception("Network error")): # Should not raise exception but log warning - result = get_source_dictionaries('https://test-server.example.com', 'v4', True) - + result = get_source_dictionaries('https://test-server.example.com', 'v4', inputs) + assert 'versions' in result assert 'v1.0' in result['versions']['flight'] + # Verify get_dictionary_versions was called for both sse and flight + assert mock_get_versions.call_count == 2 + mock_get_versions.assert_any_call( + 'https://test-server.example.com', 'sse', api_version='v4' + ) + mock_get_versions.assert_any_call( + 'https://test-server.example.com', 'flight', api_version='v4' + ) + mock_rest_get.assert_called() + def test_main_with_ssl_ca_bundle(self, comprehensive_server_mock, mock_user_input): """Test main execution with SSL CA bundle.""" with patch('builtins.open', mock_open()), \ diff --git a/ing_lib/tests/test_proj_config_create_update_cs.py b/ing_lib/tests/test_proj_config_create_update_cs.py index ff62d16..0975231 100644 --- a/ing_lib/tests/test_proj_config_create_update_cs.py +++ b/ing_lib/tests/test_proj_config_create_update_cs.py @@ -470,7 +470,8 @@ def test_validate_script_data_valid(self): 'script_name': 'valid_script', 'script_path': 'scripts/valid_script.sh', 'description': 'Valid test script', - 'script_id': generate_script_id('scripts/valid_script.sh') + 'script_id': generate_script_id('scripts/valid_script.sh'), + 'is_command': True } assert validate_script_data(script_data) is True @@ -484,17 +485,6 @@ def test_validate_script_data_missing_fields(self): assert validate_script_data(script_data) is False - def test_validate_script_data_invalid_name(self): - """Test validation of script data with invalid script name.""" - script_data = { - 'script_name': 'invalid@script#name', - 'script_path': 'scripts/script.sh', - 'description': 'Test script', - 'script_id': 'test_id' - } - - assert validate_script_data(script_data) is False - @patch('common.authenticate') @patch('getpass.getpass') @patch('getpass.getuser') @@ -548,6 +538,7 @@ def test_main_update_existing_script(self, mock_exists, mock_update, mock_get_sc 'description': 'Test script', 'script_id': existing_script_id, 'hash': 'a'*64, # Valid SHA256 hash format + 'is_command': True, 'inputs': [], 'outputs': [] } diff --git a/ing_lib/tests/test_project_config.py b/ing_lib/tests/test_project_config.py index 9009419..09521bd 100644 --- a/ing_lib/tests/test_project_config.py +++ b/ing_lib/tests/test_project_config.py @@ -5,6 +5,7 @@ import pytest from unittest.mock import patch, MagicMock import sys +from datetime import datetime # Import the module under test import project_config @@ -13,7 +14,7 @@ class TestProjectConfig: """Test class for project_config module functionality.""" - @patch('common.ingenium_rest_get_paginated') + @patch('project_config.ingenium_rest_get_paginated') def test_get_dictionary_versions(self, mock_get_paginated): """Test get_dictionary_versions function.""" mock_get_paginated.return_value = [ @@ -40,8 +41,7 @@ def test_delete_dictionary_version_success(self, mock_delete): mock_delete.return_value = mock_response with patch('common.response_handler', return_value=True), \ - patch('common.token', 'test_token'), \ - patch('common.ssl_verify', True): + patch('common._store', {'token': 'test_token', 'ssl_verify': True, 'refresh_time': datetime.utcnow()}): project_config.delete_dictionary_version( 'https://test-server.example.com', 'flight', 'v1.0' @@ -57,15 +57,14 @@ def test_delete_dictionary_version_failure(self, mock_delete): mock_delete.return_value = mock_response with patch('common.response_handler', return_value=False), \ - patch('common.token', 'test_token'), \ - patch('common.ssl_verify', True): + patch('common._store', {'token': 'test_token', 'ssl_verify': True, 'refresh_time': datetime.utcnow()}): with pytest.raises(Exception): # Should raise IngeniumLibError project_config.delete_dictionary_version( 'https://test-server.example.com', 'flight', 'v1.0' ) - @patch('common.ingenium_rest_get_paginated') + @patch('project_config.ingenium_rest_get_paginated') def test_get_dictionary(self, mock_get_paginated): """Test get_dictionary function.""" mock_get_paginated.return_value = [ @@ -89,4 +88,5 @@ def test_constants_and_endpoints(self): # Test that logger is properly configured assert hasattr(project_config, 'logger') - assert project_config.logger.name == 'project_config' \ No newline at end of file + # Check that the logger name ends with 'project_config' to handle different import styles + assert project_config.logger.name.endswith('project_config') \ No newline at end of file diff --git a/ing_lib/tests/test_steps.py b/ing_lib/tests/test_steps.py new file mode 100644 index 0000000..daf5e87 --- /dev/null +++ b/ing_lib/tests/test_steps.py @@ -0,0 +1,607 @@ +""" +Test suite for ing_lib.steps module. +""" +import pytest +from datetime import datetime, timedelta +from unittest.mock import patch, MagicMock +import json +import os +import sys + +# Import the module to test +from .. import steps + +class TestApplyBitMask: + """Test cases for the apply_bit_mask function.""" + + def test_apply_bit_mask_binary_and(self): + """Test AND operation with binary mask.""" + result = steps.apply_bit_mask(0b1010, '0b1100', 'AND') + assert result == 0b1000 + + def test_apply_bit_mask_binary_or(self): + """Test OR operation with binary mask.""" + result = steps.apply_bit_mask(0b1010, '0b1100', 'OR') + assert result == 0b1110 + + def test_apply_bit_mask_hex(self): + """Test with hexadecimal mask.""" + result = steps.apply_bit_mask(0x0F, '0x0A', 'AND') + assert result == 0x0A + + def test_apply_bit_mask_decimal(self): + """Test with decimal mask.""" + result = steps.apply_bit_mask(15, '10', 'AND') + assert result == 10 + + def test_apply_bit_mask_invalid_input_value(self): + """Test with non-integer input value.""" + with pytest.raises(steps.BitMaskError): + steps.apply_bit_mask('not_an_int', '0b1010', 'AND') + + def test_apply_bit_mask_invalid_bit_mask(self): + """Test with invalid bit mask format.""" + with pytest.raises(steps.BitMaskError): + steps.apply_bit_mask(10, '0b2', 'AND') + + def test_apply_bit_mask_invalid_operation(self): + """Test with invalid bit operation.""" + with pytest.raises(steps.BitMaskError): + steps.apply_bit_mask(10, '0b1010', 'XOR') + + +class TestCheckTelemetryQuery: + """Test cases for the check_telemetry_query function.""" + + def test_check_telemetry_query_valid_record(self): + """Test valid RECORD condition.""" + query = [{ + 'verification_condition': 'RECORD', + 'verification_values': [], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }] + steps.check_telemetry_query(query) # Should not raise + + def test_check_telemetry_query_invalid_condition(self): + """Test invalid verification condition.""" + query = [{ + 'verification_condition': 'INVALID_CONDITION', + 'verification_values': [], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }] + with pytest.raises(steps.InputError): + steps.check_telemetry_query(query) + + def test_check_telemetry_query_bitmask_no_op(self): + """Test bitmask provided but no operation.""" + query = [{ + 'verification_condition': 'EQUAL', + 'verification_values': ['10'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT', + 'bit_mask': '0xFF' + }] + with pytest.raises(steps.InputError): + steps.check_telemetry_query(query) + + def test_check_telemetry_query_non_numeric_prior_value(self): + """Test with non-numeric prior_value.""" + query = [{ + 'verification_condition': 'GREATER_THAN', + 'verification_values': ['10'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT', + 'prior_value': 'not_a_number', + 'bit_mask': '0xFF', + 'bit_op': 'AND' + }] + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query(query) + assert 'not numeric' in str(exc_info.value) + + def test_check_telemetry_query_non_numeric_verification_value_with_prior(self): + """Test with non-numeric verification value when prior_value is present.""" + query = [{ + 'verification_condition': 'EQUAL', + 'verification_values': ['not_a_number'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT', + 'prior_value': 10, + 'bit_mask': '0xFF', + 'bit_op': 'AND' + }] + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query(query) + assert 'not numeric' in str(exc_info.value) + + def test_check_telemetry_query_non_numeric_verification_with_bitmask(self): + """Test with non-numeric verification value when bitmask is used.""" + query = [{ + 'verification_condition': 'EQUAL', + 'verification_values': ['not_a_number'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT', + 'bit_mask': '0xFF', + 'bit_op': 'AND' + }] + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query(query) + assert 'not numeric' in str(exc_info.value) + + def test_check_telemetry_query_numeric_verification_with_bitmask(self): + """Test with valid numeric verification value and bitmask.""" + query = [{ + 'verification_condition': 'EQUAL', + 'verification_values': ['42'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT', + 'bit_mask': '0xFF', + 'bit_op': 'AND' + }] + steps.check_telemetry_query(query) # Should not raise + + def test_verification_condition_value_counts(self): + """Test verification conditions with correct and incorrect number of values.""" + # Test conditions that require no values + for condition in ['RECORD', 'NOT_PRESENT']: + # Should not raise + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': [], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + + # Should raise for non-empty values + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['1'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'requires no verification values' in str(exc_info.value) + + # Test conditions that require exactly one value + single_value_conditions = [ + 'GREATER_THAN', 'GREATER_THAN_OR_EQUAL', + 'LESS_THAN', 'LESS_THAN_OR_EQUAL', + 'EQUAL', 'NOT_EQUAL' + ] + + for condition in single_value_conditions: + # Test with one value (should pass) + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['42'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + + # Test with zero values (should fail) + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': [], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'requires one verification value' in str(exc_info.value) + + # Test with two values (should fail) + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['1', '2'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'requires one verification value' in str(exc_info.value) + + # Test conditions that require exactly two values + range_conditions = ['INCLUSIVE_RANGE', 'EXCLUSIVE_RANGE'] + + for condition in range_conditions: + # Test with two values (should pass) + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['10', '20'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + + # Test with one value (should fail) + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['10'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'requires two verification values' in str(exc_info.value) + + # Test with three values (should fail) + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': condition, + 'verification_values': ['10', '20', '30'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'requires two verification values' in str(exc_info.value) + + def test_unknown_verification_condition(self): + """Test that an unknown verification condition raises an error.""" + with pytest.raises(steps.InputError) as exc_info: + steps.check_telemetry_query([{ + 'verification_condition': 'UNKNOWN_CONDITION', + 'verification_values': [], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + }]) + assert 'Unknown Verification Condition' in str(exc_info.value) + + +class TestConfirmNumeric: + """Test cases for the confirm_numeric function.""" + + def test_confirm_numeric_int(self): + """Test with integer input.""" + assert steps.confirm_numeric(42) is True + + def test_confirm_numeric_float(self): + """Test with float input.""" + assert steps.confirm_numeric(3.14) is True + + def test_confirm_numeric_numeric_string(self): + """Test with numeric string input.""" + assert steps.confirm_numeric("42") is True + + def test_confirm_numeric_non_numeric(self): + """Test with non-numeric input.""" + assert steps.confirm_numeric("not a number") is False + + +class TestFileOperations: + """Test cases for file operation functions.""" + + @pytest.fixture + def temp_files(self, tmp_path): + """Create temporary files for testing.""" + input_file = tmp_path / "input.json" + output_file = tmp_path / "output.json" + return input_file, output_file + + def test_write_and_read_file(self, temp_files): + """Test writing and reading a JSON file.""" + input_file, output_file = temp_files + test_data = {"test": "data", "value": 42} + + # Test write_output_file + steps.write_output_file(test_data, str(output_file)) + assert output_file.exists() + + # Test read_input_file + loaded_data = steps.read_input_file(str(output_file)) + assert loaded_data == test_data + + def test_read_nonexistent_file(self, tmp_path): + """Test reading a non-existent file raises InputError.""" + non_existent_file = tmp_path / "nonexistent.json" + with pytest.raises(steps.InputError): + steps.read_input_file(str(non_existent_file)) + + @pytest.mark.skipif(os.name == 'nt', reason="Unix-specific permission test") + def test_read_permission_denied(self, tmp_path): + """Test reading a file without read permission raises InputError.""" + test_file = tmp_path / "restricted.json" + test_file.write_text('{"test": "data"}') + test_file.chmod(0o000) # Remove all permissions + + try: + with pytest.raises(steps.InputError): + steps.read_input_file(str(test_file)) + finally: + # Restore permissions to allow cleanup + test_file.chmod(0o644) + + @pytest.mark.skipif(os.name == 'nt', reason="Unix-specific permission test") + def test_write_permission_denied(self, tmp_path): + """Test writing to a directory without write permission raises InputError.""" + # Create a directory without write permissions + restricted_dir = tmp_path / "restricted" + restricted_dir.mkdir() + restricted_dir.chmod(0o555) # Read and execute, no write + + output_file = restricted_dir / "output.json" + test_data = {"test": "data"} + + try: + with pytest.raises(steps.InputError): + steps.write_output_file(test_data, str(output_file)) + finally: + # Restore permissions to allow cleanup + restricted_dir.chmod(0o755) + + +class TestInputOutputPaths: + """Test cases for get_input_output_paths function.""" + + @patch('sys.argv', ['script.py', 'input.json', 'output.json']) + def test_get_input_output_paths(self, tmp_path): + """Test getting input and output paths.""" + with patch('os.path.abspath', side_effect=lambda x: str(tmp_path / x)): + input_path, output_path = steps.get_input_output_paths("Test error") + assert input_path == str(tmp_path / 'input.json') + assert output_path == str(tmp_path / 'output.json') + + @patch('sys.argv', ['script.py']) + def test_get_input_output_paths_insufficient_args(self): + """Test with insufficient command line arguments.""" + with pytest.raises(steps.InputError, match="Test error"): + steps.get_input_output_paths("Test error") + + @patch('sys.argv', ['script.py', 'nonexistent/input.json', 'output.json']) + def test_get_input_output_paths_nonexistent_input(self, tmp_path): + """Test with non-existent input file path.""" + with patch('os.path.abspath', side_effect=lambda x: str(tmp_path / x)): + input_path, output_path = steps.get_input_output_paths("Test error") + # The function should still return the paths even if they don't exist + assert input_path == str(tmp_path / 'nonexistent/input.json') + assert output_path == str(tmp_path / 'output.json') + + @pytest.mark.skipif(os.name == 'nt', reason="Unix-specific permission test") + @patch('sys.argv', ['script.py', 'input.json', 'output.json']) + def test_get_input_output_paths_no_read_permission(self, tmp_path): + """Test with input file that can't be read due to permissions.""" + input_file = tmp_path / "input.json" + input_file.write_text('{"test": "data"}') + input_file.chmod(0o000) # Remove all permissions + + try: + with patch('os.path.abspath', side_effect=lambda x: str(tmp_path / x)): + input_path, _ = steps.get_input_output_paths("Test error") + # The function should still return the path even if we can't read it + assert input_path == str(input_file) + finally: + # Restore permissions to allow cleanup + input_file.chmod(0o644) + + @pytest.mark.skipif(os.name == 'nt', reason="Unix-specific permission test") + @patch('sys.argv', ['script.py', 'input.json', 'restricted/output.json']) + def test_get_output_path_no_write_permission(self, tmp_path): + """Test with output directory that can't be written to.""" + restricted_dir = tmp_path / "restricted" + restricted_dir.mkdir() + restricted_dir.chmod(0o555) # Read and execute, no write + + try: + with patch('os.path.abspath', side_effect=lambda x: str(tmp_path / x)): + _, output_path = steps.get_input_output_paths("Test error") + # The function should still return the path even if we can't write to it + assert output_path == str(restricted_dir / "output.json") + finally: + # Restore permissions to allow cleanup + restricted_dir.chmod(0o755) + + +class TestEvaluateVerifyCondition: + """Test cases for evaluate_verify_condition function.""" + + def _create_telemetry(self, value, raw_value=None): + """Helper to create telemetry data with both value and raw_value.""" + return [{ + 'value': str(value), + 'raw_value': str(raw_value if raw_value is not None else value), + 'eng_value': str(value) + }] + + def _create_predicts(self, condition, values, dn_eu='DN', **kwargs): + """Helper to create predict dictionary with common fields.""" + predict = { + 'verification_condition': condition, + 'verification_values': [str(v) for v in values], + 'dn_eu': dn_eu + } + predict.update(kwargs) + return predict + + # Test basic verification conditions + def test_verification_conditions(self): + """Test all verification conditions with basic values.""" + test_cases = [ + # (condition, telemetry_value, verification_values, expected_result) + ('EQUAL', 42, [42], 'PASS'), + ('EQUAL', 42, [43], 'FAIL'), + ('NOT_EQUAL', 42, [43], 'PASS'), + ('NOT_EQUAL', 42, [42], 'FAIL'), + ('GREATER_THAN', 43, [42], 'PASS'), + ('GREATER_THAN', 42, [42], 'FAIL'), + ('GREATER_THAN', 41, [42], 'FAIL'), + ('LESS_THAN', 41, [42], 'PASS'), + ('LESS_THAN', 42, [42], 'FAIL'), + ('LESS_THAN', 43, [42], 'FAIL'), + ('GREATER_THAN_OR_EQUAL', 43, [42], 'PASS'), + ('GREATER_THAN_OR_EQUAL', 42, [42], 'PASS'), + ('GREATER_THAN_OR_EQUAL', 41, [42], 'FAIL'), + ('LESS_THAN_OR_EQUAL', 41, [42], 'PASS'), + ('LESS_THAN_OR_EQUAL', 42, [42], 'PASS'), + ('LESS_THAN_OR_EQUAL', 43, [42], 'FAIL'), + ('INCLUSIVE_RANGE', 42, [40, 45], 'PASS'), + ('INCLUSIVE_RANGE', 40, [40, 45], 'PASS'), + ('INCLUSIVE_RANGE', 45, [40, 45], 'PASS'), + ('INCLUSIVE_RANGE', 39, [40, 45], 'FAIL'), + ('INCLUSIVE_RANGE', 46, [40, 45], 'FAIL'), + ('EXCLUSIVE_RANGE', 42, [40, 45], 'PASS'), + ('EXCLUSIVE_RANGE', 40, [40, 45], 'FAIL'), + ('EXCLUSIVE_RANGE', 45, [40, 45], 'FAIL'), + ('EXCLUSIVE_RANGE', 39, [40, 45], 'FAIL'), + ('EXCLUSIVE_RANGE', 46, [40, 45], 'FAIL'), + ('RECORD', 42, [], 'PASS'), # RECORD always passes when data is present + ] + + for condition, telemetry_value, values, expected in test_cases: + telemetry = self._create_telemetry(telemetry_value) + predicts = self._create_predicts(condition, values) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == expected, \ + f"{condition} failed for {telemetry_value} {condition} {values}: expected {expected}, got {result['verification_status']}" + + def test_eu_vs_dn_values(self): + """Test handling of DN vs EU values.""" + # Raw value is 100, engineering value is 42 + telemetry = [{'raw_value': '100', 'eng_value': '42', 'value': '42'}] + + # Test with DN (should use raw_value) + predicts = self._create_predicts('EQUAL', [100], dn_eu='DN') + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert result['actual_value'] == '100' + + # Test with EU (should use eng_value) + predicts = self._create_predicts('EQUAL', [42], dn_eu='EU') + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert result['actual_value'] == '42' + + def test_bitmask_operations(self): + """Test bitmask operations.""" + # Test AND operation (0b1010 & 0b1100 = 0b1000 = 8) + telemetry = self._create_telemetry(10, raw_value=10) # 1010 in binary + predicts = self._create_predicts( + 'EQUAL', + [8], # Expected value after masking + bit_mask='0b1100', # 12 in decimal, 1100 in binary + bit_op='AND', + dn_eu='DN' # Must use DN for raw value + ) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert int(result['actual_value']) == 8 + + # Test OR operation (0b1010 | 0b0101 = 0b1111 = 15) + telemetry = self._create_telemetry(10, raw_value=10) # 1010 in binary + predicts = self._create_predicts( + 'EQUAL', + [15], # Expected value after masking + bit_mask='0b0101', # 5 in decimal, 0101 in binary + bit_op='OR', + dn_eu='DN' # Must use DN for raw value + ) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert int(result['actual_value']) == 15 + + def test_prior_value(self): + """Test evaluation with prior value subtraction.""" + telemetry = self._create_telemetry(50, raw_value=50) + predicts = self._create_predicts( + 'EQUAL', + [30], # 50 - 20 = 30 + prior_value=20 + ) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert int(result['actual_value']) == 30 + + # Test with bitmask and prior value + # (0b1010 & 0b1100 = 0b1000 = 8) - 3 = 5 + telemetry = self._create_telemetry(10, raw_value=10) # 1010 in binary + predicts = self._create_predicts( + 'EQUAL', + [5], # Expected value after masking and prior value subtraction + bit_mask='0b1100', # 12 in decimal, 1100 in binary + bit_op='AND', + prior_value=3, + dn_eu='DN' # Must use DN for raw value + ) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + assert int(result['actual_value']) == 5 + + def test_not_present_conditions(self): + """Test NOT_PRESENT verification conditions.""" + # Test NOT_PRESENT with no telemetry (should pass) + predicts = self._create_predicts('NOT_PRESENT', []) + result = steps.evaluate_verify_condition([], 'test_id', predicts, True) + assert result['verification_status'] == 'PASS' + + # Test NOT_PRESENT with telemetry (should fail) + telemetry = self._create_telemetry(42) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'FAIL' + + def test_record_condition(self): + """Test RECORD verification condition (should always pass when data is present).""" + telemetry = self._create_telemetry(42) + predicts = self._create_predicts('RECORD', []) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + + # Should also pass with any value, as long as data is present + telemetry = self._create_telemetry(999) + result = steps.evaluate_verify_condition(telemetry, 'test_id', predicts, False) + assert result['verification_status'] == 'PASS' + + def test_pending_status(self): + """Test PENDING status when no telemetry but query hasn't timed out.""" + predicts = self._create_predicts('EQUAL', [42]) + result = steps.evaluate_verify_condition([], 'test_id', predicts, False) + assert result['verification_status'] == 'PENDING' + + # With NOT_PRESENT, should still be PENDING if no timeout + predicts = self._create_predicts('NOT_PRESENT', []) + result = steps.evaluate_verify_condition([], 'test_id', predicts, False) + assert result['verification_status'] == 'PENDING' + + +class TestVerifyWaitTelemetry: + """Test cases for verify_wait_telemetry function.""" + + @patch('ing_lib.steps.check_telemetry_query') + @patch('ing_lib.steps.evaluate_verify_condition') + def test_verify_wait_telemetry(self, mock_eval, mock_check): + """Test basic verify_wait_telemetry functionality.""" + # Setup test data + query = { + 'CHANNEL1': { + 'verification_condition': 'EQUAL', + 'verification_values': ['42'], + 'dn_eu': 'DN', + 'verify_wait': 'WAIT' + } + } + + # Mock the telemetry query function + def mock_telemetry_query(channels, timeout, lookback, start_time, return_on): + return { + 'channels': { + 'CHANNEL1': [ + {'value': '42', 'time': '2023-01-01T00:00:00'} + ] + } + } + + # Mock the evaluate function + mock_eval.return_value = { + 'verification_status': 'PASS', + 'actual_value': '42' + } + + # Call the function + result = steps.verify_wait_telemetry( + query, + mock_telemetry_query, + start_time=datetime.now(), + timeout=1, + lookback=0 + ) + + # Verify results + assert result['query_matches_predict'] is True + assert 'CHANNEL1' in result['channels'] + assert result['channels']['CHANNEL1']['verification_status'] == 'PASS' + mock_check.assert_called_once() + mock_eval.assert_called() diff --git a/requirements.txt b/requirements.txt index 8d63007..48cedc5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ pytest==8.4.1 rich==14.0.0 requests==2.32.4 deepdiff==7.0.1 +pyJWT==2.10.1