diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index 9ac6e4e5161f931e6cab2a9232d6a01fc63b7e0d..2fdd44c927450fdd02e5ee2de524110d0b8ed5db 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -89,7 +89,7 @@ def _get_settings( parameters["useIngest"] = bool(ingestion_settings.useIngest) case _: raise ValueError( - "Invalid value for edu.nrao.workspaces.IngestionSettings.useIngest: " f"{ingestion_settings.useIngest}" + f"Invalid value for edu.nrao.workspaces.IngestionSettings.useIngest: {ingestion_settings.useIngest}" ) parameters["script_location"] = script_location diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py index e92b601e7d9ca77ec63ddf595f84c05c876887d4..f00b340795968990990300041570c18f7f3e1331 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py @@ -164,10 +164,27 @@ class IngestionManifest: ScienceProductType.VLASS_COARSE, ]: raise NotImplementedError() + + staging_source_dir = self.staging_source_dir + + # Change the staging_source_dir to the parent if it's not curation and if it's an EVLA EB. + # This is because SDM ingestion is expecting to work upon a directory given as the product name + # with an expected set of files (XML files and others) underneath. + # We need to pass that directory's parent. for example, We should pass /lustre/.../workspaces/sdm-staging + # instead of /lustre/.../workspaces/sdm-staging/19A-001.sb1234567890.eb233423545632.54321.894327984569. + if all( + [ + not self.reingest, # Not curation + self.telescope == Telescope.EVLA, + self.sp_type == ScienceProductType.EXEC_BLOCK, + ] + ): + staging_source_dir = self.staging_source_dir.parent + params = ManifestParameters( telescope=self.telescope, ngas_ingest=self.ngas_ingest, - staging_source_dir=self.staging_source_dir, + staging_source_dir=staging_source_dir, additional_metadata=additional_metadata, ) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py index 90269aa12de74fd5007e15b09256ef194aadff25..ea058da44335c97148e45f3573b1733aeca732dc 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -16,6 +16,7 @@ # You should have received a copy of the GNU General Public License # along with Workspaces. If not, see <https://www.gnu.org/licenses/>. import logging +import re import subprocess import sys from pathlib import Path @@ -34,22 +35,47 @@ from ingest_envoy.schema import AbstractTextFile from ingest_envoy.utilities import CuratorType, IngestType, VLASSIngestType -def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> int: +def get_ingest_spls(subprocess_stdout: str) -> list[str]: + """ + :param subprocess_stdout: The standard output of the ingestion pex. + :return: The product locators found in the passed-in standard output. + """ + # OSPs: output science products + # First, find the line that starts with "OSPs: " + osps_line_match = re.search(r"^OSPs: \[.*\]$", subprocess_stdout, re.MULTILINE) + + if not osps_line_match: + return [] + + # Get the full OSPs line + osps_line = osps_line_match.group(0) + + # Extract all SPLs from that line + locators = re.findall(r"uid://[a-z]+/[a-z_]+/[a-z0-9-]+", osps_line) + + return locators + + +def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> (int, list[str]): """ Run ingest :param real_ingest: real ingestion vs. testing flag :param staging_dir: staging directory to ingest from :param bin_dir: directory containing the ingestion utility - :return: return code + :return: return code, list of SPLs generated by ingest. """ if real_ingest: ingest_process = subprocess.run( [f"{bin_dir}/ingest", "--json", "-p", staging_dir], stdout=sys.stdout, stderr=sys.stderr, + capture_output=True, + text=True, ) - return ingest_process.returncode + generated_spls = get_ingest_spls(ingest_process.stdout) + print(f"Returning ({ingest_process.returncode}): {generated_spls})") + return ingest_process.returncode, generated_spls else: # For local testing return 0 @@ -96,8 +122,11 @@ class IngestCalibrationLauncher(LauncherIF): self.prepare_for_launch() self.logger.info("Running ingest!") + ingest_returncode, _ = trigger_ingest( + self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"] + ) return ( - trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"]), + ingest_returncode, True, ) @@ -200,10 +229,11 @@ class IngestVLASSLauncher(LauncherIF): if self.qa_passed: self.logger.info("Running ingest!") + ingest_returncode, _ = trigger_ingest( + self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"] + ) return ( - trigger_ingest( - self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"] - ), + ingest_returncode, True, ) @@ -285,8 +315,11 @@ class IngestImageLauncher(LauncherIF): self.prepare_for_launch() self.logger.info("Running ingest!") + ingest_returncode, _ = trigger_ingest( + self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"] + ) return ( - trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"]), + ingest_returncode, True, ) @@ -345,6 +378,31 @@ class IngestObservationLauncher(LauncherIF): self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] self.collector = ObservationCollector(self.parameters) + def launch_curator_for_spls(self, spls: list[str]) -> dict[str, int]: + """ + :param spls: The locators for which we are going to run curator. + :return: The dictionary mapping SPLs where curator failed to the return values from curator. + """ + parameters = { + **self.parameters, + "product_type": "execution_block", + "input_group_locator": None, + "curation_source": self.staging_source_dir, + "file_list": None, + "product_group_id": None, + "target_list": [], + } + launcher = CuratorLauncher(CuratorType.PARTIAL, parameters) + curator_results = {} + + for spl in spls: + launcher.parameters["spl"] = spl + trigger_curator_returncode, _ = launcher.launch() + if trigger_curator_returncode != 0: + curator_results[spl] = trigger_curator_returncode + + return curator_results + def launch(self) -> (int, bool): """ Prepare for and run ingest @@ -355,8 +413,17 @@ class IngestObservationLauncher(LauncherIF): self.prepare_for_launch() self.logger.info("Running ingest!") + ingest_returncode, generated_spls = trigger_ingest( + self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"] + ) + + if ingest_returncode == 0: + curator_results = self.launch_curator_for_spls(generated_spls) + if curator_results: + self.logger.error("Curator failed: %s", curator_results) + return ( - trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"]), + ingest_returncode, True, ) diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json index 1c26325e55ce04a25df789f8f7768be351df8a13..2eb646d3fa59a203adc87d887f36d70faf375ba0 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json @@ -2,7 +2,7 @@ "parameters":{ "ngas_ingest":true, "telescope":"EVLA", - "ingestion_path":"/lustre/aoc/cluster/pipeline/docker/workspaces/sdm-staging/19A-001.sb1234567890.eb233423545632.54321.894327984569" + "ingestion_path":"/lustre/aoc/cluster/pipeline/docker/workspaces/sdm-staging" }, "output_group":{ "science_products":[ diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_eb_manifest.py b/apps/cli/executables/pexable/ingest_envoy/test/test_evla_eb_manifest.py index fca7ef36f5e4e73ea804ebf4808577ff126b188c..973c7b3cff174c20f52c662c5e38af467f6befc0 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_eb_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_evla_eb_manifest.py @@ -202,5 +202,5 @@ def test_manifest_parameters( actual_manifest_params = actual_manifest.parameters assert actual_manifest_params.telescope == Telescope.EVLA - assert actual_manifest_params.staging_source_dir == evla_eb_staging_dir + assert actual_manifest_params.staging_source_dir == evla_eb_staging_dir.parent assert actual_manifest_params.ngas_ingest is True diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py index 53fdf378bd06b0cb22ee6474bef6605e9d33e269..4512ee4deadeeae83e27fbd68800750541ec19ee 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py @@ -18,21 +18,24 @@ """ Tests for ingest_envoy.launchers """ +import logging import sys from pathlib import Path -from unittest.mock import patch +from typing import Any, Generator +from unittest.mock import MagicMock, patch import pytest from conftest import expected_settings_evla_eb from ingest_envoy import launchers from ingest_envoy.launchers import ( + CuratorLauncher, IngestCalibrationLauncher, IngestImageLauncher, IngestObservationLauncher, IngestVLASSLauncher, ) from ingest_envoy.schema import AbstractTextFile -from ingest_envoy.utilities import IngestType, VLASSIngestType +from ingest_envoy.utilities import CuratorType, IngestType, VLASSIngestType parameters = { "sdmId": "16B-069_sb32814386_1_001.57685.66193635417", @@ -78,12 +81,67 @@ coarse_parameters = { "script_location": "/lustre/aoc/cluster/pipeline/docker/workspaces/sbin", } +expected_spls = [ + "uid://vlba/correlation/ebfac8c5-6b86-489f-84f8-e77c046d29a8", + "uid://vlba/correlation/cd61f65d-e1ee-4972-9c67-973fccb0dd58", + "uid://vlba/correlation/a2042206-dc72-4351-b662-b5d08008307e", + "uid://vlba/correlation/299dc994-c168-4d77-9155-92f5d3516b3a", + "uid://vlba/correlation/3f619a60-5790-4d05-8e1f-346e5ecf150f", + "uid://vlba/correlation/3124cf11-5793-42d4-b909-9cd4b17501ab", + "uid://vlba/correlation/c0808e39-fb9c-42dd-bb13-a5ace0024752", + "uid://vlba/correlation/1834cd4a-9390-4a03-874e-1acf652d717d", + "uid://vlba/correlation/70d70b22-d1bd-4ca5-a426-2d316ea5cf29", + "uid://vlba/correlation/8bc3d9c3-b85d-4758-9931-5ffd5885f1e7", +] +sample_input = """ +*Input science products-2: [] +*Output science products-2: [<ScienceProduct#uid://vlba/correlation/ebfac8c5-6b86-489f-84f8-e77c046d29a8 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/cd61f65d-e1ee-4972-9c67-973fccb0dd58 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/a2042206-dc72-4351-b662-b5d08008307e type=Execution Block>, <ScienceProduct#uid://vlba/correlation/299dc994-c168-4d77-9155-92f5d3516b3a type=Execution Block>, <ScienceProduct#uid://vlba/correlation/3f619a60-5790-4d05-8e1f-346e5ecf150f type=Execution Block>, <ScienceProduct#uid://vlba/correlation/3124cf11-5793-42d4-b909-9cd4b17501ab type=Execution Block>, <ScienceProduct#uid://vlba/correlation/c0808e39-fb9c-42dd-bb13-a5ace0024752 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/1834cd4a-9390-4a03-874e-1acf652d717d type=Execution Block>, <ScienceProduct#uid://vlba/correlation/70d70b22-d1bd-4ca5-a426-2d316ea5cf29 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/8bc3d9c3-b85d-4758-9931-5ffd5885f1e7 type=Execution Block>] +*Output group ancillary products: [{'type': 'observation_log', 'filename': 'VLBA_TR039HD_tr039hd_250321T142044.difx.log.gz'}, {'type': 'observation_details', 'filename': 'TR039HD.tr039hd.sum.txt'}, {'type': 'ingestion_artifacts', 'filename': 'ingestion_artifacts_2025_03_21T09_08_50.708.tar'}] +ISPs: [] +OSPs: [<ScienceProduct#uid://vlba/correlation/ebfac8c5-6b86-489f-84f8-e77c046d29a8 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/cd61f65d-e1ee-4972-9c67-973fccb0dd58 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/a2042206-dc72-4351-b662-b5d08008307e type=Execution Block>, <ScienceProduct#uid://vlba/correlation/299dc994-c168-4d77-9155-92f5d3516b3a type=Execution Block>, <ScienceProduct#uid://vlba/correlation/3f619a60-5790-4d05-8e1f-346e5ecf150f type=Execution Block>, <ScienceProduct#uid://vlba/correlation/3124cf11-5793-42d4-b909-9cd4b17501ab type=Execution Block>, <ScienceProduct#uid://vlba/correlation/c0808e39-fb9c-42dd-bb13-a5ace0024752 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/1834cd4a-9390-4a03-874e-1acf652d717d type=Execution Block>, <ScienceProduct#uid://vlba/correlation/70d70b22-d1bd-4ca5-a426-2d316ea5cf29 type=Execution Block>, <ScienceProduct#uid://vlba/correlation/8bc3d9c3-b85d-4758-9931-5ffd5885f1e7 type=Execution Block>] +OGAPs: [{'type': 'observation_log', 'filename': 'VLBA_TR039HD_tr039hd_250321T142044.difx.log.gz'}, {'type': 'observation_details', 'filename': 'TR039HD.tr039hd.sum.txt'}, {'type': 'ingestion_artifacts', 'filename': 'ingestion_artifacts_2025_03_21T09_08_50.708.tar'}] +mous: None +""" + + +@pytest.fixture +def expected_curator_params() -> Generator[dict[str, None | list[Any] | str], Any, None]: + """ + Yields the expected parameters for the curator launcher. + """ + obs_launcher_settings = expected_settings_evla_eb() + yield { + **obs_launcher_settings, + "product_type": "execution_block", + "input_group_locator": None, + "curation_source": f"{obs_launcher_settings['staging_area']}/{obs_launcher_settings['workflowDir']}", + "file_list": None, + "product_group_id": None, + "target_list": [], + "spl": expected_spls[0], + } + + +def test_get_ingest_product_locators(): + """ + Tests the get_ingest_product_locators function. + """ + assert launchers.get_ingest_spls(sample_input) == expected_spls + @patch("subprocess.run") def test_trigger_ingest(mock_run): """ Tests the trigger_ingest function. """ + # Create a mock return value (a CompletedProcess[str] object) + mock_process = MagicMock() + mock_process.stdout = sample_input + mock_process.returncode = 0 + + # Set this as the return value of subprocess.run + mock_run.return_value = mock_process + # Don't actually run ingest launchers.trigger_ingest(parameters["useIngest"], parameters["destinationDir"], parameters["script_location"]) assert mock_run.call_count == 0 @@ -95,14 +153,142 @@ def test_trigger_ingest(mock_run): [f"{parameters['script_location']}/ingest", "--json", "-p", parameters["destinationDir"]], stdout=sys.stdout, stderr=sys.stderr, + capture_output=True, + text=True, ) +@patch("subprocess.run") +def test_trigger_curator(mock_run, expected_curator_params): + """ + Tests the trigger_curator function. + """ + # Create a mock return value (a CompletedProcess[str] object) + mock_process = MagicMock() + mock_process.stdout = sample_input + mock_process.returncode = 0 + + # Set this as the return value of subprocess.run + mock_run.return_value = mock_process + + # Don't actually run curator + launchers.trigger_curator( + False, Path(expected_curator_params["curation_source"]), expected_curator_params["script_location"] + ) + assert mock_run.call_count == 0 + + # "Actually" run curator (we are mocking subprocess.run) + launchers.trigger_curator( + True, Path(expected_curator_params["curation_source"]), expected_curator_params["script_location"] + ) + assert mock_run.call_count == 1 + mock_run.assert_called_with( + [ + f"{expected_curator_params['script_location']}/curator", + "-m", + f"{expected_curator_params['curation_source']}/manifest.json", + ], + stdout=sys.stdout, + stderr=sys.stderr, + ) + + +class TestCuratorLauncher: + """ + Tests for the CuratorLauncher class. + """ + + @staticmethod + @pytest.fixture + def curator_launcher(expected_curator_params): + """ + Yields the CuratorLauncher instance used in testing. + """ + yield CuratorLauncher(CuratorType.PARTIAL, expected_curator_params) + + def test_init(self, curator_launcher, expected_curator_params): + """ + Tests that the class initializes correctly. + """ + assert curator_launcher.curator_type == CuratorType.PARTIAL + assert curator_launcher.curation_source == expected_curator_params["curation_source"] + assert curator_launcher.sci_product_type == IngestType.OBS.value + assert curator_launcher.manifest_destination_dir == Path.cwd() + assert curator_launcher.parameters == expected_curator_params + + def test_prepare_for_launch(self, curator_launcher): + """ + Tests the prepare_for_launch method. + """ + with patch("ingest_envoy.launchers.CuratorLauncher.create_manifest") as mock_create_manifest: + curator_launcher.prepare_for_launch() + assert mock_create_manifest.call_count == 1 + + def test_create_manifest(self, curator_launcher, expected_curator_params): + """ + Tests the create_manifest method. + """ + + with patch("ingest_envoy.ingestion_manifest.IngestionManifestBuilder.build") as mock_manifest_build, patch( + "ingest_envoy.ingestion_manifest.IngestionManifestBuilder.__init__" + ) as mock_init: + mock_init.return_value = None # Otherwise, we get a TypeError + + curator_launcher.create_manifest() + + assert mock_init.call_count == 1 + mock_init.assert_called_with( + manifest_source_dir=expected_curator_params["curation_source"], + sp_type=expected_curator_params["product_type"], + locator=expected_curator_params["spl"], + telescope=expected_curator_params["telescope"], + curate=(CuratorType.PARTIAL, expected_curator_params["target_list"]), + manifest_destination_dir=Path.cwd(), + file_list=expected_curator_params["file_list"], + product_group=expected_curator_params["product_group_id"], + input_group_locator=expected_curator_params["input_group_locator"], + ) + + assert mock_manifest_build.call_count == 1 + + def test_launch(self, curator_launcher): + """ + Tests the launch method. + """ + with patch("ingest_envoy.launchers.CuratorLauncher.prepare_for_launch") as mock_prepare, patch( + "ingest_envoy.launchers.trigger_curator" + ) as mock_trigger_curator: + mock_trigger_curator.return_value = 0 + curator_returncode, run_indexer = curator_launcher.launch() + + assert mock_prepare.call_count == 1 + assert mock_trigger_curator.call_count == 1 + assert curator_returncode == 0 + assert run_indexer is True + + def test_launch_trigger_curator_failure(self, curator_launcher): + """ + Tests that the launch method returns the trigger_curator return code, + even when it is a nonzero value. + """ + with patch("ingest_envoy.launchers.CuratorLauncher.prepare_for_launch"), patch( + "ingest_envoy.launchers.trigger_curator" + ) as mock_trigger_curator: + mock_trigger_curator.return_value = 1 + curator_returncode, run_indexer = curator_launcher.launch() + + assert curator_returncode == 1 + assert run_indexer is True + + class TestIngestCalibrationLauncher: def test_launch_ingestion(self): with patch("ingest_envoy.launchers.IngestCalibrationLauncher.prepare_for_launch") as prepare: with patch("ingest_envoy.launchers.trigger_ingest") as ingest: - IngestCalibrationLauncher(IngestType.CAL, parameters).launch() + ingest.return_value = (0, [expected_spls[0]]) + ingest_returncode, run_indexer = IngestCalibrationLauncher(IngestType.CAL, parameters).launch() + assert ingest_returncode == 0 + assert run_indexer is True assert prepare.call_count == 1 assert ingest.call_count == 1 @@ -131,7 +317,11 @@ class TestIngestImageLauncher: def test_launch_ingestion(self): with patch("ingest_envoy.launchers.IngestImageLauncher.prepare_for_launch") as prepare: with patch("ingest_envoy.launchers.trigger_ingest") as ingest: - IngestImageLauncher(IngestType.IMG, image_parameters).launch() + ingest.return_value = (0, [expected_spls[0]]) + ingest_returncode, run_indexer = IngestImageLauncher(IngestType.IMG, image_parameters).launch() + + assert ingest_returncode == 0 + assert run_indexer is True assert prepare.call_count == 1 assert ingest.call_count == 1 @@ -191,21 +381,84 @@ class TestIngestObservationLauncher: expected_staging_source_dir = f"{expected_parameters['staging_area']}/{expected_parameters['workflowDir']}" assert observation_launcher.staging_source_dir == expected_staging_source_dir + def test_launch_curator_for_spls(self, observation_launcher): + """ + Tests the launch_curator_for_spls method for normal execution (nothing in the returned dictionary). + """ + with patch("ingest_envoy.launchers.CuratorLauncher.launch") as mock_curator_launch: + mock_curator_launch.return_value = (0, True) + assert observation_launcher.launch_curator_for_spls(expected_spls) == {} + + def test_launch_curator_for_spls_curator_fails(self, observation_launcher): + """ + Tests the launch_curator_for_spls method for when curator fails. + """ + with patch("ingest_envoy.launchers.CuratorLauncher.launch") as mock_curator_launch: + mock_trigger_curator_returncode = 2 + mock_curator_launch.return_value = (mock_trigger_curator_returncode, True) + expected_results = {spl: mock_trigger_curator_returncode for spl in expected_spls} + + actual_results = observation_launcher.launch_curator_for_spls(expected_spls) + + assert actual_results == expected_results + def test_launch(self, observation_launcher): """ Tests the launch method. """ with patch("ingest_envoy.launchers.IngestObservationLauncher.prepare_for_launch") as mock_prepare, patch( "ingest_envoy.launchers.trigger_ingest" - ) as mock_trigger_ingest: - mock_trigger_ingest.return_value = 0 + ) as mock_trigger_ingest, patch( + "ingest_envoy.launchers.IngestObservationLauncher.launch_curator_for_spls" + ) as mock_launch_curator_for_spls: + mock_trigger_ingest.return_value = (0, [expected_spls[0]]) + mock_launch_curator_for_spls.return_value = {} # Simulate all SPLs having curator launched successfully - trigger_ingest_val, boolean_val = observation_launcher.launch() + ingest_returncode, run_indexer = observation_launcher.launch() assert mock_prepare.call_count == 1 assert mock_trigger_ingest.call_count == 1 - assert trigger_ingest_val == 0 - assert boolean_val is True + assert ingest_returncode == 0 + assert run_indexer is True + assert mock_launch_curator_for_spls.call_count == 1 + + def test_launch_trigger_ingest_failure_no_curator(self, observation_launcher): + """ + Test that when trigger_ingest fails, launch_curator_for_spls isn't run. + """ + with patch("ingest_envoy.launchers.IngestObservationLauncher.prepare_for_launch"), patch( + "ingest_envoy.launchers.trigger_ingest" + ) as mock_trigger_ingest, patch( + "ingest_envoy.launchers.IngestObservationLauncher.launch_curator_for_spls" + ) as mock_launch_curator_for_spls: + mock_trigger_ingest.return_value = (2, []) + + ingest_returncode, run_indexer = observation_launcher.launch() + + assert ingest_returncode == 2 + assert mock_launch_curator_for_spls.call_count == 0 + + def test_launch_curator_launch_fails(self, observation_launcher, caplog): + """ + Test that when trigger_ingest succeeds but launch_curator_for_spls fails for at least one SPL, + the failed SPLs are logged out. + """ + with patch("ingest_envoy.launchers.IngestObservationLauncher.prepare_for_launch"), patch( + "ingest_envoy.launchers.trigger_ingest" + ) as mock_trigger_ingest, patch( + "ingest_envoy.launchers.IngestObservationLauncher.launch_curator_for_spls" + ) as mock_launch_curator_for_spls: + mock_trigger_ingest.return_value = (0, [expected_spls[0]]) + mock_launch_curator_for_spls.return_value = {expected_spls[0]: 1} # Simulate curator failing for an SPL + caplog.set_level(logging.ERROR) + + ingest_returncode, run_indexer = observation_launcher.launch() + + assert ingest_returncode == 0 + assert mock_launch_curator_for_spls.call_count == 1 + assert caplog.record_tuples == [ + ("ingest_envoy", logging.ERROR, f"Curator failed: {{'{expected_spls[0]}': 1}}") + ] def test_prepare_for_launch(self, observation_launcher): """