From 807c0d1d8b6660403f88d014c7610b5926d3f2b4 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 12:30:46 +0000 Subject: [PATCH 01/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 35891878c6..e3f9fafb58 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1378,10 +1378,6 @@ def _construct_process_message(message: str, message = (f'{message}\n' f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') - if process.stdout: - message += f'\n[Subprocess stdout]:\n{process.stdout}' - if process.stderr: - message += f'\n[Subprocess stderr]:\n{process.stderr}' return message @@ -1400,6 +1396,27 @@ def _log_process(process: subprocess.CompletedProcess, message = _construct_process_message(process_message, process) logging.info(message) + # Helper function to split text and log in safe chunks + def _stream_payload_in_chunks(label: str, payload): + if not payload: + return + + if isinstance(payload, bytes): + payload_str = payload.decode('utf-8', errors='replace') + else: + payload_str = str(payload) + chunk_size = 100000 + total_len = len(payload_str) + + logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') + for i in range(0, total_len, chunk_size): + chunk = payload_str[i:i + chunk_size] + logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') + logging.info(f'--- End of {label} ---') + + _stream_payload_in_chunks('Subprocess stdout', process.stdout) + _stream_payload_in_chunks('Subprocess stderr', process.stderr) + status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS if import_name: metrics["import_name"] = import_name From dfe631da4c173d5718d54e751af11917ede72760 Mon Sep 17 00:00:00 2001 From: TarunBali Date: Sun, 24 May 2026 18:07:54 +0530 Subject: [PATCH 02/23] Update import-automation/executor/app/executor/import_executor.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- import-automation/executor/app/executor/import_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index e3f9fafb58..d61be4194b 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1405,7 +1405,7 @@ def _stream_payload_in_chunks(label: str, payload): payload_str = payload.decode('utf-8', errors='replace') else: payload_str = str(payload) - chunk_size = 100000 + chunk_size = 50000 total_len = len(payload_str) logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') From 4e6f1f272d3a1d3e861ed33d7e54c66355b99938 Mon Sep 17 00:00:00 2001 From: TarunBali Date: Sun, 24 May 2026 18:08:20 +0530 Subject: [PATCH 03/23] Update import-automation/executor/app/executor/import_executor.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- import-automation/executor/app/executor/import_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index d61be4194b..af64cf6b52 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1414,8 +1414,8 @@ def _stream_payload_in_chunks(label: str, payload): logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') logging.info(f'--- End of {label} ---') - _stream_payload_in_chunks('Subprocess stdout', process.stdout) - _stream_payload_in_chunks('Subprocess stderr', process.stderr) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS if import_name: From dce4bd09d4f3b92572a693a4327ec8a0c6b79e5c Mon Sep 17 00:00:00 2001 From: TarunBali Date: Sun, 24 May 2026 18:49:13 +0530 Subject: [PATCH 04/23] Update import-automation/executor/app/executor/import_executor.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- import-automation/executor/app/executor/import_executor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index af64cf6b52..43dfe7d2fa 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1408,6 +1408,10 @@ def _stream_payload_in_chunks(label: str, payload): chunk_size = 50000 total_len = len(payload_str) + if total_len <= chunk_size: + logging.info(f'{label}:\n{payload_str}') + return + logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') for i in range(0, total_len, chunk_size): chunk = payload_str[i:i + chunk_size] From cfe1be04cfba1a428afdd789badd8bb2d5f70558 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 13:38:21 +0000 Subject: [PATCH 05/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 52 ++++++++++++------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index af64cf6b52..71b0c640a4 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1118,6 +1118,33 @@ def run_and_handle_exception( return ExecutionResult(ImportStatus.FAILURE, [], message) +def _stream_payload_in_chunks(label: str, payload) -> None: + """Helper function to split text and log in safe chunks to avoid + + "Log entry too large" (InvalidArgument) errors. + """ + if not payload: + return + + if isinstance(payload, bytes): + payload_str = payload.decode('utf-8', errors='replace') + else: + payload_str = str(payload) + + chunk_size = 50000 + total_len = len(payload_str) + + if total_len <= chunk_size: + logging.info(f'{label}:\n{payload_str}') + return + + logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') + for i in range(0, total_len, chunk_size): + chunk = payload_str[i:i + chunk_size] + logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') + logging.info(f'--- End of {label} ---') + + @log_function_call def _run_with_timeout_async(args: List[str], timeout: float, @@ -1216,9 +1243,12 @@ def _run_with_timeout(args: List[str], timeout=timeout, cwd=cwd, env=env) + process_message = 'Subprocess succeeded' if process.returncode == 0 else 'Subprocess failed' logging.info( - f'Completed command: {args}, retcode: {process.returncode}, stdout:' - f' {process.stdout}, stderr: {process.stderr}') + f'Completed command: {args}, retcode: {process.returncode}') + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stdout', process.stdout) + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stderr', process.stderr) + return process except Exception as e: message = traceback.format_exc() @@ -1396,24 +1426,6 @@ def _log_process(process: subprocess.CompletedProcess, message = _construct_process_message(process_message, process) logging.info(message) - # Helper function to split text and log in safe chunks - def _stream_payload_in_chunks(label: str, payload): - if not payload: - return - - if isinstance(payload, bytes): - payload_str = payload.decode('utf-8', errors='replace') - else: - payload_str = str(payload) - chunk_size = 50000 - total_len = len(payload_str) - - logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') - for i in range(0, total_len, chunk_size): - chunk = payload_str[i:i + chunk_size] - logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') - logging.info(f'--- End of {label} ---') - _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) From e008b5ae34d1ab05905edca98b50316e5992a8e0 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 13:46:35 +0000 Subject: [PATCH 06/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 52 ++++++++++++------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index af64cf6b52..71b0c640a4 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1118,6 +1118,33 @@ def run_and_handle_exception( return ExecutionResult(ImportStatus.FAILURE, [], message) +def _stream_payload_in_chunks(label: str, payload) -> None: + """Helper function to split text and log in safe chunks to avoid + + "Log entry too large" (InvalidArgument) errors. + """ + if not payload: + return + + if isinstance(payload, bytes): + payload_str = payload.decode('utf-8', errors='replace') + else: + payload_str = str(payload) + + chunk_size = 50000 + total_len = len(payload_str) + + if total_len <= chunk_size: + logging.info(f'{label}:\n{payload_str}') + return + + logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') + for i in range(0, total_len, chunk_size): + chunk = payload_str[i:i + chunk_size] + logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') + logging.info(f'--- End of {label} ---') + + @log_function_call def _run_with_timeout_async(args: List[str], timeout: float, @@ -1216,9 +1243,12 @@ def _run_with_timeout(args: List[str], timeout=timeout, cwd=cwd, env=env) + process_message = 'Subprocess succeeded' if process.returncode == 0 else 'Subprocess failed' logging.info( - f'Completed command: {args}, retcode: {process.returncode}, stdout:' - f' {process.stdout}, stderr: {process.stderr}') + f'Completed command: {args}, retcode: {process.returncode}') + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stdout', process.stdout) + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stderr', process.stderr) + return process except Exception as e: message = traceback.format_exc() @@ -1396,24 +1426,6 @@ def _log_process(process: subprocess.CompletedProcess, message = _construct_process_message(process_message, process) logging.info(message) - # Helper function to split text and log in safe chunks - def _stream_payload_in_chunks(label: str, payload): - if not payload: - return - - if isinstance(payload, bytes): - payload_str = payload.decode('utf-8', errors='replace') - else: - payload_str = str(payload) - chunk_size = 50000 - total_len = len(payload_str) - - logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') - for i in range(0, total_len, chunk_size): - chunk = payload_str[i:i + chunk_size] - logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') - logging.info(f'--- End of {label} ---') - _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) From 92c166a7eb0248c41e6b75bb736f9739f58d90aa Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 17:07:28 +0000 Subject: [PATCH 07/23] changed logic to stream subprocesses stdout and stderr --- import-automation/executor/app/executor/import_executor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 71b0c640a4..c335f2015a 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1243,11 +1243,8 @@ def _run_with_timeout(args: List[str], timeout=timeout, cwd=cwd, env=env) - process_message = 'Subprocess succeeded' if process.returncode == 0 else 'Subprocess failed' logging.info( f'Completed command: {args}, retcode: {process.returncode}') - _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stdout', process.stdout) - _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stderr', process.stderr) return process except Exception as e: From c388278b6ad60383decdf340b1a64e8b6cef6982 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 17:30:20 +0000 Subject: [PATCH 08/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index c335f2015a..ea4f7c1cf2 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -809,7 +809,8 @@ def _invoke_import_job(self, absolute_import_dir: str, "latency_secs": timer.time(), "script_index": script_index, "script_path": path, - }) + }, + skip_stream_logging=True) process.check_returncode() import_summary.import_stats['script_execution_time'] = start_timer.time( @@ -1246,6 +1247,9 @@ def _run_with_timeout(args: List[str], logging.info( f'Completed command: {args}, retcode: {process.returncode}') + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stdout', process.stdout) + _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stderr', process.stderr) + return process except Exception as e: message = traceback.format_exc() @@ -1410,7 +1414,8 @@ def _construct_process_message(message: str, def _log_process(process: subprocess.CompletedProcess, import_name: str = '', - metrics: dict = {}) -> None: + metrics: dict = {}, + skip_stream_logging: bool = False) -> None: """Logs the result of a subprocess. Args: @@ -1423,8 +1428,9 @@ def _log_process(process: subprocess.CompletedProcess, message = _construct_process_message(process_message, process) logging.info(message) - _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) - _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) + if not skip_stream_logging: + _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS if import_name: From 468ff9e405ec7cbb90f7ad45b1c192e08d18f5c2 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 17:51:58 +0000 Subject: [PATCH 09/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index ea4f7c1cf2..4a560c8d3c 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -480,7 +480,8 @@ def _invoke_import_tool(self, absolute_import_dir: str, "latency": timer.time(), "input_index": input_index, "import_input": import_prefix, - }) + }, + skip_stream_logging=True) process.check_returncode() logging.info( f'Generated resolved mcf for {import_prefix} in {output_path}.') @@ -896,7 +897,8 @@ def _import_one_helper( metrics={ "stage": ImportStage.INIT.name, "latency_secs": timer.time(), - }) + }, + skip_stream_logging=False) process.check_returncode() self._invoke_import_job(absolute_import_dir=absolute_import_dir, @@ -1247,9 +1249,6 @@ def _run_with_timeout(args: List[str], logging.info( f'Completed command: {args}, retcode: {process.returncode}') - _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stdout', process.stdout) - _stream_payload_in_chunks(f'[Command: {" ".join(args)}] stderr', process.stderr) - return process except Exception as e: message = traceback.format_exc() @@ -1409,6 +1408,13 @@ def _construct_process_message(message: str, message = (f'{message}\n' f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') + if hasattr(process, 'stdout') and process.stdout: + stdout_str = process.stdout.decode('utf-8', errors='replace') if isinstance(process.stdout, bytes) else str(process.stdout) + message += f'\n[Subprocess stdout]:\n{stdout_str}' + if hasattr(process, 'stderr') and process.stderr: + stderr_str = process.stderr.decode('utf-8', errors='replace') if isinstance(process.stderr, bytes) else str(process.stderr) + message += f'\n[Subprocess stderr]:\n{stderr_str}' + return message From d8267499e2e5d2b83b2a40035445f3f87964966e Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 18:07:19 +0000 Subject: [PATCH 10/23] changed logic to stream subprocesses stdout and stderr --- .../executor/app/executor/import_executor.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 4a560c8d3c..8835cad63c 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1394,7 +1394,8 @@ def _clean_date(time: str) -> str: def _construct_process_message(message: str, - process: subprocess.CompletedProcess) -> str: + process: subprocess.CompletedProcess, + include_streams: bool = False) -> str: """Constructs a log message describing the result of a subprocess. Args: @@ -1408,12 +1409,11 @@ def _construct_process_message(message: str, message = (f'{message}\n' f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') - if hasattr(process, 'stdout') and process.stdout: - stdout_str = process.stdout.decode('utf-8', errors='replace') if isinstance(process.stdout, bytes) else str(process.stdout) - message += f'\n[Subprocess stdout]:\n{stdout_str}' - if hasattr(process, 'stderr') and process.stderr: - stderr_str = process.stderr.decode('utf-8', errors='replace') if isinstance(process.stderr, bytes) else str(process.stderr) - message += f'\n[Subprocess stderr]:\n{stderr_str}' + if include_streams: + if process.stdout: + message += f'\n[Subprocess stdout]:\n{process.stdout}' + if process.stderr: + message += f'\n[Subprocess stderr]:\n{process.stderr}' return message @@ -1431,7 +1431,8 @@ def _log_process(process: subprocess.CompletedProcess, process_message = 'Subprocess succeeded' if process.returncode: process_message = 'Subprocess failed' - message = _construct_process_message(process_message, process) + + message = _construct_process_message(process_message, process, include_streams=False) logging.info(message) if not skip_stream_logging: From a7738bf50dc531b7d0deaccdf721378e8c9d016f Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 18:24:16 +0000 Subject: [PATCH 11/23] fixed build failure --- .../executor/app/executor/import_executor.py | 11 ++--------- .../executor/test/import_executor_test.py | 6 +----- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 8835cad63c..e701624532 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1394,8 +1394,7 @@ def _clean_date(time: str) -> str: def _construct_process_message(message: str, - process: subprocess.CompletedProcess, - include_streams: bool = False) -> str: + process: subprocess.CompletedProcess) -> str: """Constructs a log message describing the result of a subprocess. Args: @@ -1409,12 +1408,6 @@ def _construct_process_message(message: str, message = (f'{message}\n' f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') - if include_streams: - if process.stdout: - message += f'\n[Subprocess stdout]:\n{process.stdout}' - if process.stderr: - message += f'\n[Subprocess stderr]:\n{process.stderr}' - return message @@ -1432,7 +1425,7 @@ def _log_process(process: subprocess.CompletedProcess, if process.returncode: process_message = 'Subprocess failed' - message = _construct_process_message(process_message, process, include_streams=False) + message = _construct_process_message(process_message, process) logging.info(message) if not skip_stream_logging: diff --git a/import-automation/executor/test/import_executor_test.py b/import-automation/executor/test/import_executor_test.py index 37dfda0b3c..4f685b3ff3 100644 --- a/import-automation/executor/test/import_executor_test.py +++ b/import-automation/executor/test/import_executor_test.py @@ -83,11 +83,7 @@ def test_construct_process_message(self): expected = ( 'message\n' '[Subprocess command]: printf "out" & >&2 printf "err" & exit 1\n' - '[Subprocess return code]: 1\n' - '[Subprocess stdout]:\n' - 'out\n' - '[Subprocess stderr]:\n' - 'err') + '[Subprocess return code]: 1') self.assertEqual(expected, message) def test_construct_process_message_no_output(self): From bdc2926e6f9d5dc33e50963cf02166d472120d39 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 18:40:53 +0000 Subject: [PATCH 12/23] fixed linting error --- .../executor/app/executor/import_executor.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index e701624532..072b82eb9a 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -77,7 +77,7 @@ AUTO_IMPORT_JOB_STATUS = "auto-import-job-status" IMPORT_SUMMARY_FILE = "import_summary.json" STAGING_VERSION_FILE = "staging_version.txt" - +MAX_LOG_CHUNK_SIZE = 50000 class ImportStatus(Enum): SUCCESS = 1 @@ -897,8 +897,7 @@ def _import_one_helper( metrics={ "stage": ImportStage.INIT.name, "latency_secs": timer.time(), - }, - skip_stream_logging=False) + }) process.check_returncode() self._invoke_import_job(absolute_import_dir=absolute_import_dir, @@ -1134,7 +1133,7 @@ def _stream_payload_in_chunks(label: str, payload) -> None: else: payload_str = str(payload) - chunk_size = 50000 + chunk_size = MAX_LOG_CHUNK_SIZE total_len = len(payload_str) if total_len <= chunk_size: @@ -1429,8 +1428,10 @@ def _log_process(process: subprocess.CompletedProcess, logging.info(message) if not skip_stream_logging: - _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', process.stdout) - _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', process.stderr) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', + process.stdout) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', + process.stderr) status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS if import_name: From df35c3913c2227fc7fd5a59c29dce09230e883e5 Mon Sep 17 00:00:00 2001 From: TarunBali Date: Mon, 25 May 2026 00:23:40 +0530 Subject: [PATCH 13/23] Update import-automation/executor/app/executor/import_executor.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- import-automation/executor/app/executor/import_executor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 072b82eb9a..a0b01b7f63 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1420,6 +1420,8 @@ def _log_process(process: subprocess.CompletedProcess, process: subprocess.CompletedProcess object whose arguments, return code, stdout, and stderr are to be logged. """ + if metrics is None: + metrics = {} process_message = 'Subprocess succeeded' if process.returncode: process_message = 'Subprocess failed' From 8e784c8ffdad8d9049bb74b69582898074523c7c Mon Sep 17 00:00:00 2001 From: TarunBali Date: Mon, 25 May 2026 00:23:59 +0530 Subject: [PATCH 14/23] Update import-automation/executor/app/executor/import_executor.py done Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- import-automation/executor/app/executor/import_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index a0b01b7f63..33c8155c64 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1412,7 +1412,7 @@ def _construct_process_message(message: str, def _log_process(process: subprocess.CompletedProcess, import_name: str = '', - metrics: dict = {}, + metrics: dict = None, skip_stream_logging: bool = False) -> None: """Logs the result of a subprocess. From a3399df375cab1651022ac7f35baee8545f73333 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 19:02:40 +0000 Subject: [PATCH 15/23] fixed linting error --- import-automation/executor/app/executor/import_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 33c8155c64..95134b5dbe 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -79,6 +79,7 @@ STAGING_VERSION_FILE = "staging_version.txt" MAX_LOG_CHUNK_SIZE = 50000 + class ImportStatus(Enum): SUCCESS = 1 FAILURE = 2 From a8945b56a0d6731fb0f55150329eeaaa4ffe61c6 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 19:53:54 +0000 Subject: [PATCH 16/23] fixed gcs folder creation logic --- scripts/earthengine/process_events.py | 3 +-- scripts/fires/firms/fire_events_pipeline_config.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index 602061e188..ad97488a99 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -1595,8 +1595,7 @@ def output_events(self, if self._config.get('output_events', True): _set_counter_stage(self._counters, 'emit_events_csv_') output_files.append( - self.write_events_csv(output_path=_get_output_subdir_path( - output_path, 'events'), + self.write_events_csv(output_path=output_path, output_ended_events=output_ended_events)) if self._config.get('output_svobs', False): _set_counter_stage(self._counters, 'emit_events_svobs_') diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py index a5cdc2c382..6eb6ce1be7 100644 --- a/scripts/fires/firms/fire_events_pipeline_config.py +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -88,7 +88,7 @@ # Output events csv into a common folder with a year in filename, # as the import automation can copy all files in a single folder. "output_dir": - "gs://{gcs_bucket}/{gcs_folder}/{import_name}-{stage}-{year}-without-usa-", + "gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-", "event_type": "FireEvent", From 4e3686bf384478db8975d310aab16dbe296a58fd Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 20:27:54 +0000 Subject: [PATCH 17/23] fixed gcs output paths --- scripts/earthengine/process_events.py | 6 +++++- scripts/fires/firms/fire_events_pipeline_config.py | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index ad97488a99..f65ef5d90a 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -1594,8 +1594,12 @@ def output_events(self, output_ended_events = True if self._config.get('output_events', True): _set_counter_stage(self._counters, 'emit_events_csv_') + if self._config.get('omit_events_subdir', False): + events_output_path = output_path + else: + events_output_path = _get_output_subdir_path(output_path, 'events') output_files.append( - self.write_events_csv(output_path=output_path, + self.write_events_csv(output_path=events_output_path, output_ended_events=output_ended_events)) if self._config.get('output_svobs', False): _set_counter_stage(self._counters, 'emit_events_svobs_') diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py index 6eb6ce1be7..d5cc7d35dc 100644 --- a/scripts/fires/firms/fire_events_pipeline_config.py +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -173,6 +173,8 @@ # Output settings. "output_svobs": True, + "omit_events_subdir": + True, "output_delimiter": ",", "output_affected_place_polygon": From f4d45e8ee4116c85a8a57a0cf573e03ffdbaef2b Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 20:33:16 +0000 Subject: [PATCH 18/23] fixed gcs output paths --- import-automation/executor/app/executor/import_executor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 95134b5dbe..dfe70fd13b 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -481,8 +481,7 @@ def _invoke_import_tool(self, absolute_import_dir: str, "latency": timer.time(), "input_index": input_index, "import_input": import_prefix, - }, - skip_stream_logging=True) + }) process.check_returncode() logging.info( f'Generated resolved mcf for {import_prefix} in {output_path}.') @@ -811,8 +810,7 @@ def _invoke_import_job(self, absolute_import_dir: str, "latency_secs": timer.time(), "script_index": script_index, "script_path": path, - }, - skip_stream_logging=True) + }) process.check_returncode() import_summary.import_stats['script_execution_time'] = start_timer.time( From 9e369c288fe3283da6758c3a53ae9449c743b6ee Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 20:44:27 +0000 Subject: [PATCH 19/23] fixed gcs output paths --- import-automation/executor/app/executor/import_executor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index dfe70fd13b..f4f6913dc1 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1418,6 +1418,9 @@ def _log_process(process: subprocess.CompletedProcess, Args: process: subprocess.CompletedProcess object whose arguments, return code, stdout, and stderr are to be logged. + import_name: Name of the import for labeling logs. + metrics: Dictionary to store execution metrics. + skip_stream_logging: Whether to skip chunked logging of stdout and stderr. """ if metrics is None: metrics = {} From a9089a554f29e3ba60c8c4421a1428b0f3675276 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 20:46:48 +0000 Subject: [PATCH 20/23] fixed lint error --- scripts/earthengine/process_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index f65ef5d90a..1fa8b4db83 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -1597,7 +1597,8 @@ def output_events(self, if self._config.get('omit_events_subdir', False): events_output_path = output_path else: - events_output_path = _get_output_subdir_path(output_path, 'events') + events_output_path = _get_output_subdir_path( + output_path, 'events') output_files.append( self.write_events_csv(output_path=events_output_path, output_ended_events=output_ended_events)) From f115ec98a391e75b7225ba3d1d2a63d001639476 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 21:02:16 +0000 Subject: [PATCH 21/23] fixed duplicate logging and kept everything as streamed --- import-automation/executor/app/executor/import_executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index f4f6913dc1..d0825706f0 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1182,10 +1182,8 @@ def _run_with_timeout_async(args: List[str], # Log output continuously until the command completes. for line in process.stderr: stderr.append(line) - logging.info(f'Process stderr:{name}: {line}') for line in process.stdout: stdout.append(line) - logging.info(f'Process stdout:{name}: {line}') # Wait in case script has closed stderr/stdout early. process.wait() From 52d11f28390be61024ff441981b2541c87e25c16 Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 21:21:42 +0000 Subject: [PATCH 22/23] fixed duplicate logging and kept everything as streamed --- .../executor/app/executor/import_executor.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index d0825706f0..99deac5968 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -1179,14 +1179,11 @@ def _run_with_timeout_async(args: List[str], env=env, ) - # Log output continuously until the command completes. - for line in process.stderr: - stderr.append(line) - for line in process.stdout: - stdout.append(line) - - # Wait in case script has closed stderr/stdout early. - process.wait() + out, err = process.communicate(timeout=timeout) + stdout.append(out) + stderr.append(err) + if process.returncode != 0: + raise RuntimeError(f"Process failed with exit code {process.returncode}") end_time = time.time() return_code = process.returncode From 621417ef0fb84c308b05664d3dcbf6e75c23e30b Mon Sep 17 00:00:00 2001 From: Tarun Bali Date: Sun, 24 May 2026 21:50:26 +0000 Subject: [PATCH 23/23] fixed duplicate logging and kept everything as streamed --- .../executor/app/executor/import_executor.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 99deac5968..e664bc2b06 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -481,7 +481,8 @@ def _invoke_import_tool(self, absolute_import_dir: str, "latency": timer.time(), "input_index": input_index, "import_input": import_prefix, - }) + }, + skip_stream_logging=True) process.check_returncode() logging.info( f'Generated resolved mcf for {import_prefix} in {output_path}.') @@ -810,7 +811,8 @@ def _invoke_import_job(self, absolute_import_dir: str, "latency_secs": timer.time(), "script_index": script_index, "script_path": path, - }) + }, + skip_stream_logging=True) process.check_returncode() import_summary.import_stats['script_execution_time'] = start_timer.time( @@ -1179,11 +1181,16 @@ def _run_with_timeout_async(args: List[str], env=env, ) - out, err = process.communicate(timeout=timeout) - stdout.append(out) - stderr.append(err) - if process.returncode != 0: - raise RuntimeError(f"Process failed with exit code {process.returncode}") + # Log output continuously until the command completes. + for line in process.stderr: + stderr.append(line) + logging.info(f'Process stderr:{name}: {line}') + for line in process.stdout: + stdout.append(line) + logging.info(f'Process stdout:{name}: {line}') + + # Wait in case script has closed stderr/stdout early. + process.wait() end_time = time.time() return_code = process.returncode