diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 35891878c6..e664bc2b06 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -77,6 +77,7 @@ AUTO_IMPORT_JOB_STATUS = "auto-import-job-status" IMPORT_SUMMARY_FILE = "import_summary.json" STAGING_VERSION_FILE = "staging_version.txt" +MAX_LOG_CHUNK_SIZE = 50000 class ImportStatus(Enum): @@ -480,7 +481,8 @@ def _invoke_import_tool(self, absolute_import_dir: str, "latency": timer.time(), "input_index": input_index, "import_input": import_prefix, - }) + }, + skip_stream_logging=True) process.check_returncode() logging.info( f'Generated resolved mcf for {import_prefix} in {output_path}.') @@ -809,7 +811,8 @@ def _invoke_import_job(self, absolute_import_dir: str, "latency_secs": timer.time(), "script_index": script_index, "script_path": path, - }) + }, + skip_stream_logging=True) process.check_returncode() import_summary.import_stats['script_execution_time'] = start_timer.time( @@ -1118,6 +1121,33 @@ def run_and_handle_exception( return ExecutionResult(ImportStatus.FAILURE, [], message) +def _stream_payload_in_chunks(label: str, payload) -> None: + """Helper function to split text and log in safe chunks to avoid + + "Log entry too large" (InvalidArgument) errors. + """ + if not payload: + return + + if isinstance(payload, bytes): + payload_str = payload.decode('utf-8', errors='replace') + else: + payload_str = str(payload) + + chunk_size = MAX_LOG_CHUNK_SIZE + total_len = len(payload_str) + + if total_len <= chunk_size: + logging.info(f'{label}:\n{payload_str}') + return + + logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---') + for i in range(0, total_len, chunk_size): + chunk = payload_str[i:i + chunk_size] + logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}') + logging.info(f'--- End of {label} ---') + + @log_function_call def _run_with_timeout_async(args: List[str], timeout: float, @@ -1217,8 +1247,8 @@ def _run_with_timeout(args: List[str], cwd=cwd, env=env) logging.info( - f'Completed command: {args}, retcode: {process.returncode}, stdout:' - f' {process.stdout}, stderr: {process.stderr}') + f'Completed command: {args}, retcode: {process.returncode}') + return process except Exception as e: message = traceback.format_exc() @@ -1378,28 +1408,37 @@ def _construct_process_message(message: str, message = (f'{message}\n' f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') - if process.stdout: - message += f'\n[Subprocess stdout]:\n{process.stdout}' - if process.stderr: - message += f'\n[Subprocess stderr]:\n{process.stderr}' return message def _log_process(process: subprocess.CompletedProcess, import_name: str = '', - metrics: dict = {}) -> None: + metrics: dict = None, + skip_stream_logging: bool = False) -> None: """Logs the result of a subprocess. Args: process: subprocess.CompletedProcess object whose arguments, return code, stdout, and stderr are to be logged. + import_name: Name of the import for labeling logs. + metrics: Dictionary to store execution metrics. + skip_stream_logging: Whether to skip chunked logging of stdout and stderr. """ + if metrics is None: + metrics = {} process_message = 'Subprocess succeeded' if process.returncode: process_message = 'Subprocess failed' + message = _construct_process_message(process_message, process) logging.info(message) + if not skip_stream_logging: + _stream_payload_in_chunks(f'[{import_name}] Subprocess stdout', + process.stdout) + _stream_payload_in_chunks(f'[{import_name}] Subprocess stderr', + process.stderr) + status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS if import_name: metrics["import_name"] = import_name diff --git a/import-automation/executor/test/import_executor_test.py b/import-automation/executor/test/import_executor_test.py index 37dfda0b3c..4f685b3ff3 100644 --- a/import-automation/executor/test/import_executor_test.py +++ b/import-automation/executor/test/import_executor_test.py @@ -83,11 +83,7 @@ def test_construct_process_message(self): expected = ( 'message\n' '[Subprocess command]: printf "out" & >&2 printf "err" & exit 1\n' - '[Subprocess return code]: 1\n' - '[Subprocess stdout]:\n' - 'out\n' - '[Subprocess stderr]:\n' - 'err') + '[Subprocess return code]: 1') self.assertEqual(expected, message) def test_construct_process_message_no_output(self): diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index 602061e188..1fa8b4db83 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -1594,9 +1594,13 @@ def output_events(self, output_ended_events = True if self._config.get('output_events', True): _set_counter_stage(self._counters, 'emit_events_csv_') + if self._config.get('omit_events_subdir', False): + events_output_path = output_path + else: + events_output_path = _get_output_subdir_path( + output_path, 'events') output_files.append( - self.write_events_csv(output_path=_get_output_subdir_path( - output_path, 'events'), + self.write_events_csv(output_path=events_output_path, output_ended_events=output_ended_events)) if self._config.get('output_svobs', False): _set_counter_stage(self._counters, 'emit_events_svobs_') diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py index a5cdc2c382..d5cc7d35dc 100644 --- a/scripts/fires/firms/fire_events_pipeline_config.py +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -88,7 +88,7 @@ # Output events csv into a common folder with a year in filename, # as the import automation can copy all files in a single folder. "output_dir": - "gs://{gcs_bucket}/{gcs_folder}/{import_name}-{stage}-{year}-without-usa-", + "gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-", "event_type": "FireEvent", @@ -173,6 +173,8 @@ # Output settings. "output_svobs": True, + "omit_events_subdir": + True, "output_delimiter": ",", "output_affected_place_polygon":