Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
807c0d1
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
dfe631d
Update import-automation/executor/app/executor/import_executor.py
balit-raibot May 24, 2026
4e6f1f2
Update import-automation/executor/app/executor/import_executor.py
balit-raibot May 24, 2026
dce4bd0
Update import-automation/executor/app/executor/import_executor.py
balit-raibot May 24, 2026
cfe1be0
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
7c4afb4
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
e008b5a
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
92c166a
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
c388278
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
468ff9e
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
d826749
changed logic to stream subprocesses stdout and stderr
balit-raibot May 24, 2026
a7738bf
fixed build failure
balit-raibot May 24, 2026
bdc2926
fixed linting error
balit-raibot May 24, 2026
df35c39
Update import-automation/executor/app/executor/import_executor.py
balit-raibot May 24, 2026
8e784c8
Update import-automation/executor/app/executor/import_executor.py
balit-raibot May 24, 2026
a3399df
fixed linting error
balit-raibot May 24, 2026
a8945b5
fixed gcs folder creation logic
balit-raibot May 24, 2026
4e3686b
fixed gcs output paths
balit-raibot May 24, 2026
f4d45e8
fixed gcs output paths
balit-raibot May 24, 2026
9e369c2
fixed gcs output paths
balit-raibot May 24, 2026
a9089a5
fixed lint error
balit-raibot May 24, 2026
f115ec9
fixed duplicate logging and kept everything as streamed
balit-raibot May 24, 2026
52d11f2
fixed duplicate logging and kept everything as streamed
balit-raibot May 24, 2026
621417e
fixed duplicate logging and kept everything as streamed
balit-raibot May 24, 2026
156fb5e
Merge branch 'master' into fix_huge_error_streaming_cloud
balit-raibot May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 48 additions & 9 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
AUTO_IMPORT_JOB_STATUS = "auto-import-job-status"
IMPORT_SUMMARY_FILE = "import_summary.json"
STAGING_VERSION_FILE = "staging_version.txt"
MAX_LOG_CHUNK_SIZE = 50000


class ImportStatus(Enum):
Expand Down Expand Up @@ -480,7 +481,8 @@ def _invoke_import_tool(self, absolute_import_dir: str,
"latency": timer.time(),
"input_index": input_index,
"import_input": import_prefix,
})
},
skip_stream_logging=True)
process.check_returncode()
logging.info(
f'Generated resolved mcf for {import_prefix} in {output_path}.')
Expand Down Expand Up @@ -809,7 +811,8 @@ def _invoke_import_job(self, absolute_import_dir: str,
"latency_secs": timer.time(),
"script_index": script_index,
"script_path": path,
})
},
skip_stream_logging=True)
process.check_returncode()

import_summary.import_stats['script_execution_time'] = start_timer.time(
Expand Down Expand Up @@ -1118,6 +1121,33 @@ def run_and_handle_exception(
return ExecutionResult(ImportStatus.FAILURE, [], message)


def _stream_payload_in_chunks(label: str, payload) -> None:
"""Helper function to split text and log in safe chunks to avoid

"Log entry too large" (InvalidArgument) errors.
"""
if not payload:
return

if isinstance(payload, bytes):
payload_str = payload.decode('utf-8', errors='replace')
Comment thread
balit-raibot marked this conversation as resolved.
else:
payload_str = str(payload)

chunk_size = MAX_LOG_CHUNK_SIZE
total_len = len(payload_str)

if total_len <= chunk_size:
logging.info(f'{label}:\n{payload_str}')
return

logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---')
for i in range(0, total_len, chunk_size):
chunk = payload_str[i:i + chunk_size]
logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}')
logging.info(f'--- End of {label} ---')


@log_function_call
def _run_with_timeout_async(args: List[str],
timeout: float,
Expand Down Expand Up @@ -1217,8 +1247,8 @@ def _run_with_timeout(args: List[str],
cwd=cwd,
env=env)
logging.info(
f'Completed command: {args}, retcode: {process.returncode}, stdout:'
f' {process.stdout}, stderr: {process.stderr}')
f'Completed command: {args}, retcode: {process.returncode}')

return process
except Exception as e:
message = traceback.format_exc()
Expand Down Expand Up @@ -1378,28 +1408,37 @@ def _construct_process_message(message: str,
message = (f'{message}\n'
f'[Subprocess command]: {command}\n'
f'[Subprocess return code]: {process.returncode}')
if process.stdout:
message += f'\n[Subprocess stdout]:\n{process.stdout}'
if process.stderr:
message += f'\n[Subprocess stderr]:\n{process.stderr}'
return message


def _log_process(process: subprocess.CompletedProcess,
import_name: str = '',
metrics: dict = {}) -> None:
metrics: dict = None,
skip_stream_logging: bool = False) -> None:
"""Logs the result of a subprocess.

Args:
process: subprocess.CompletedProcess object whose arguments, return code,
stdout, and stderr are to be logged.
import_name: Name of the import for labeling logs.
metrics: Dictionary to store execution metrics.
skip_stream_logging: Whether to skip chunked logging of stdout and stderr.
"""
Comment thread
balit-raibot marked this conversation as resolved.
if metrics is None:
metrics = {}
process_message = 'Subprocess succeeded'
Comment thread
balit-raibot marked this conversation as resolved.
if process.returncode:
process_message = 'Subprocess failed'

message = _construct_process_message(process_message, process)
logging.info(message)

if not skip_stream_logging:
_stream_payload_in_chunks(f'[{import_name}] Subprocess stdout',
process.stdout)
_stream_payload_in_chunks(f'[{import_name}] Subprocess stderr',
process.stderr)
Comment thread
balit-raibot marked this conversation as resolved.
Comment thread
balit-raibot marked this conversation as resolved.
Comment thread
balit-raibot marked this conversation as resolved.

status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS
if import_name:
metrics["import_name"] = import_name
Expand Down
6 changes: 1 addition & 5 deletions import-automation/executor/test/import_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ def test_construct_process_message(self):
expected = (
'message\n'
'[Subprocess command]: printf "out" & >&2 printf "err" & exit 1\n'
'[Subprocess return code]: 1\n'
'[Subprocess stdout]:\n'
'out\n'
'[Subprocess stderr]:\n'
'err')
'[Subprocess return code]: 1')
self.assertEqual(expected, message)

def test_construct_process_message_no_output(self):
Expand Down
8 changes: 6 additions & 2 deletions scripts/earthengine/process_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1594,9 +1594,13 @@ def output_events(self,
output_ended_events = True
if self._config.get('output_events', True):
_set_counter_stage(self._counters, 'emit_events_csv_')
if self._config.get('omit_events_subdir', False):
events_output_path = output_path
else:
events_output_path = _get_output_subdir_path(
output_path, 'events')
output_files.append(
self.write_events_csv(output_path=_get_output_subdir_path(
output_path, 'events'),
self.write_events_csv(output_path=events_output_path,
output_ended_events=output_ended_events))
if self._config.get('output_svobs', False):
_set_counter_stage(self._counters, 'emit_events_svobs_')
Expand Down
4 changes: 3 additions & 1 deletion scripts/fires/firms/fire_events_pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
# Output events csv into a common folder with a year in filename,
# as the import automation can copy all files in a single folder.
"output_dir":
"gs://{gcs_bucket}/{gcs_folder}/{import_name}-{stage}-{year}-without-usa-",
"gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-",
"event_type":
"FireEvent",

Expand Down Expand Up @@ -173,6 +173,8 @@
# Output settings.
"output_svobs":
True,
"omit_events_subdir":
True,
"output_delimiter":
",",
"output_affected_place_polygon":
Expand Down
Loading