Skip to content

Commit 0af2756

Browse files
Refactor disorder export demo for Sonar
1 parent a6f9c79 commit 0af2756

1 file changed

Lines changed: 160 additions & 100 deletions

File tree

scripts/materials/disorder_workflow_demo/demo_csd_disorder_workflow.py

Lines changed: 160 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
LICENSE_REQUIREMENT = "CSD-Materials or another licence tier with CSD Python API database access"
4343

4444

45+
def _escape_cif_string(value: str | None) -> str:
46+
return (value or "?").replace("'", "''")
47+
48+
4549
def _format_float(value: Any, precision: int = 4) -> str:
4650
if value is None:
4751
return "?"
@@ -62,94 +66,99 @@ def _format_esd(value: Any, esd: Any, precision: int = 4) -> str:
6266
return base
6367

6468

65-
def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]:
66-
"""Export a CSD structure to CIF with occupancy and disorder metadata."""
67-
from ccdc.io import EntryReader
69+
def _build_disorder_map(crystal: Any) -> dict[str, tuple[int | None, str | None]]:
70+
disorder_map: dict[str, tuple[int | None, str | None]] = {}
71+
disorder = getattr(crystal, "disorder", None)
72+
if disorder is None:
73+
return disorder_map
6874

69-
with EntryReader("CSD") as reader:
70-
entry = reader.entry(refcode)
71-
crystal = reader.crystal(refcode)
75+
try:
76+
for assembly in disorder.assemblies:
77+
assembly_id = str(getattr(assembly, "id", ""))
78+
for group in assembly.groups:
79+
group_id = int(getattr(group, "id", 0))
80+
for atom in group.atoms:
81+
disorder_map[str(atom.label)] = (group_id, assembly_id)
82+
except Exception:
83+
return disorder_map
84+
return disorder_map
7285

73-
molecule = crystal.disordered_molecule or crystal.molecule
7486

75-
disorder_map: dict[str, tuple[int | None, str | None]] = {}
76-
disorder = getattr(crystal, "disorder", None)
77-
if disorder is not None:
78-
try:
79-
for assembly in disorder.assemblies:
80-
assembly_id = str(getattr(assembly, "id", ""))
81-
for group in assembly.groups:
82-
group_id = int(getattr(group, "id", 0))
83-
for atom in group.atoms:
84-
disorder_map[str(atom.label)] = (group_id, assembly_id)
85-
except Exception:
86-
pass
87+
def _extract_occupancy(atom: Any) -> float:
88+
try:
89+
if atom.occupancy is not None:
90+
return float(atom.occupancy)
91+
except Exception:
92+
pass
93+
return 1.0
8794

95+
96+
def _extract_displacement(atom: Any) -> tuple[str, float | None, dict[str, Any] | None]:
97+
displacement_type = "?"
98+
u_iso = None
99+
aniso = None
100+
try:
101+
displacement = atom.displacement_parameters
102+
if displacement is None:
103+
return displacement_type, u_iso, aniso
104+
u_iso = float(displacement.isotropic_equivalent)
105+
if displacement.type != "Anisotropic":
106+
return "Uiso", u_iso, aniso
107+
values = displacement.values
108+
uncertainties = displacement.uncertainties
109+
aniso = {
110+
"u11": values[0][0], "u22": values[1][1], "u33": values[2][2],
111+
"u12": values[0][1], "u13": values[0][2], "u23": values[1][2],
112+
"e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2],
113+
"e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2],
114+
}
115+
return "Uani", u_iso, aniso
116+
except Exception:
117+
return displacement_type, u_iso, aniso
118+
119+
120+
def _extract_atom_data(molecule: Any, disorder_map: dict[str, tuple[int | None, str | None]]) -> list[dict[str, Any]]:
88121
atom_data: list[dict[str, Any]] = []
89122
for atom in molecule.atoms:
90123
label = str(atom.label) if atom.label else "?"
91124
symbol = str(atom.atomic_symbol) if atom.atomic_symbol else "?"
92125
fractional = atom.fractional_coordinates
93-
occupancy = 1.0
94-
try:
95-
if atom.occupancy is not None:
96-
occupancy = float(atom.occupancy)
97-
except Exception:
98-
pass
99-
100-
displacement_type = "?"
101-
u_iso = None
102-
aniso = None
103-
try:
104-
displacement = atom.displacement_parameters
105-
if displacement is not None:
106-
u_iso = float(displacement.isotropic_equivalent)
107-
if displacement.type == "Anisotropic":
108-
displacement_type = "Uani"
109-
values = displacement.values
110-
uncertainties = displacement.uncertainties
111-
aniso = {
112-
"u11": values[0][0], "u22": values[1][1], "u33": values[2][2],
113-
"u12": values[0][1], "u13": values[0][2], "u23": values[1][2],
114-
"e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2],
115-
"e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2],
116-
}
117-
else:
118-
displacement_type = "Uiso"
119-
except Exception:
120-
pass
121-
126+
displacement_type, u_iso, aniso = _extract_displacement(atom)
122127
disorder_group, disorder_assembly = disorder_map.get(label, (None, None))
123128
atom_data.append({
124129
"label": label,
125130
"symbol": symbol,
126131
"fx": float(fractional.x) if fractional else None,
127132
"fy": float(fractional.y) if fractional else None,
128133
"fz": float(fractional.z) if fractional else None,
129-
"occupancy": occupancy,
134+
"occupancy": _extract_occupancy(atom),
130135
"u_iso": u_iso,
131136
"displacement_type": displacement_type,
132137
"disorder_group": disorder_group,
133138
"disorder_assembly": disorder_assembly,
134139
"aniso": aniso,
135140
})
141+
return atom_data
142+
136143

144+
def _extract_bonds(molecule: Any) -> list[tuple[str, str, float, str]]:
137145
bonds: list[tuple[str, str, float, str]] = []
138146
try:
139147
for bond in molecule.bonds:
140148
atom_1, atom_2 = bond.atoms
141149
bonds.append((str(atom_1.label), str(atom_2.label), float(bond.length), str(bond.bond_type)))
142150
except Exception:
143-
pass
151+
return bonds
152+
return bonds
153+
144154

155+
def _build_header_lines(refcode: str, entry: Any, crystal: Any) -> list[str]:
145156
cell_a, cell_b, cell_c = crystal.cell_lengths
146157
alpha, beta, gamma = crystal.cell_angles
147-
disorder_present = any(atom["disorder_group"] is not None for atom in atom_data)
148-
149-
lines = [
158+
return [
150159
f"data_{refcode}",
151160
"_audit_creation_method 'CSD Python API disorder workflow demo'",
152-
f"_chemical_name_common '{(entry.chemical_name or '?').replace(chr(39), chr(39) + chr(39))}'",
161+
f"_chemical_name_common '{_escape_cif_string(entry.chemical_name)}'",
153162
f"_chemical_formula_sum '{entry.formula or '?'}'",
154163
f"_cell_length_a {_format_float(cell_a)}",
155164
f"_cell_length_b {_format_float(cell_b)}",
@@ -163,13 +172,20 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]:
163172
"",
164173
]
165174

166-
if crystal.symmetry_operators:
167-
lines.extend(["loop_", "_symmetry_equiv_pos_as_xyz"])
168-
for operator in crystal.symmetry_operators:
169-
lines.append(f" '{operator}'")
170-
lines.append("")
171175

172-
lines.extend([
176+
def _build_symmetry_lines(crystal: Any) -> list[str]:
177+
if not crystal.symmetry_operators:
178+
return []
179+
lines = ["loop_", "_symmetry_equiv_pos_as_xyz"]
180+
for operator in crystal.symmetry_operators:
181+
lines.append(f" '{operator}'")
182+
lines.append("")
183+
return lines
184+
185+
186+
def _build_atom_site_lines(atom_data: list[dict[str, Any]]) -> list[str]:
187+
disorder_present = any(atom["disorder_group"] is not None for atom in atom_data)
188+
lines = [
173189
"loop_",
174190
"_atom_site_label",
175191
"_atom_site_type_symbol",
@@ -179,7 +195,7 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]:
179195
"_atom_site_occupancy",
180196
"_atom_site_U_iso_or_equiv",
181197
"_atom_site_thermal_displace_type",
182-
])
198+
]
183199
if disorder_present:
184200
lines.extend([
185201
"_atom_site_disorder_assembly",
@@ -200,52 +216,96 @@ def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]:
200216
f"{atom['displacement_type']:>5s}",
201217
]
202218
if disorder_present:
203-
row.append(f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}")
204-
row.append(f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}")
219+
row.extend([
220+
f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}",
221+
f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}",
222+
])
205223
lines.append(" " + " ".join(row))
206224
lines.append("")
225+
return lines
226+
207227

228+
def _build_anisotropic_lines(atom_data: list[dict[str, Any]]) -> list[str]:
208229
anisotropic_atoms = [atom for atom in atom_data if atom["aniso"]]
209-
if anisotropic_atoms:
210-
lines.extend([
211-
"loop_",
212-
"_atom_site_aniso_label",
213-
"_atom_site_aniso_U_11",
214-
"_atom_site_aniso_U_22",
215-
"_atom_site_aniso_U_33",
216-
"_atom_site_aniso_U_12",
217-
"_atom_site_aniso_U_13",
218-
"_atom_site_aniso_U_23",
219-
])
220-
for atom in anisotropic_atoms:
221-
u = atom["aniso"]
222-
lines.append(
223-
f" {atom['label']:<8s}"
224-
f" {_format_esd(u['u11'], u['e11']):>12s}"
225-
f" {_format_esd(u['u22'], u['e22']):>12s}"
226-
f" {_format_esd(u['u33'], u['e33']):>12s}"
227-
f" {_format_esd(u['u12'], u['e12']):>12s}"
228-
f" {_format_esd(u['u13'], u['e13']):>12s}"
229-
f" {_format_esd(u['u23'], u['e23']):>12s}"
230-
)
231-
lines.append("")
232-
233-
if bonds:
234-
lines.extend([
235-
"loop_",
236-
"_geom_bond_atom_site_label_1",
237-
"_geom_bond_atom_site_label_2",
238-
"_geom_bond_distance",
239-
"_ccdc_geom_bond_type",
240-
])
241-
for atom_1, atom_2, distance, bond_type in bonds:
242-
lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}")
243-
lines.append("")
230+
if not anisotropic_atoms:
231+
return []
232+
lines = [
233+
"loop_",
234+
"_atom_site_aniso_label",
235+
"_atom_site_aniso_U_11",
236+
"_atom_site_aniso_U_22",
237+
"_atom_site_aniso_U_33",
238+
"_atom_site_aniso_U_12",
239+
"_atom_site_aniso_U_13",
240+
"_atom_site_aniso_U_23",
241+
]
242+
for atom in anisotropic_atoms:
243+
u = atom["aniso"]
244+
lines.append(
245+
f" {atom['label']:<8s}"
246+
f" {_format_esd(u['u11'], u['e11']):>12s}"
247+
f" {_format_esd(u['u22'], u['e22']):>12s}"
248+
f" {_format_esd(u['u33'], u['e33']):>12s}"
249+
f" {_format_esd(u['u12'], u['e12']):>12s}"
250+
f" {_format_esd(u['u13'], u['e13']):>12s}"
251+
f" {_format_esd(u['u23'], u['e23']):>12s}"
252+
)
253+
lines.append("")
254+
return lines
244255

245-
lines.append("#END")
256+
257+
def _build_bond_lines(bonds: list[tuple[str, str, float, str]]) -> list[str]:
258+
if not bonds:
259+
return []
260+
lines = [
261+
"loop_",
262+
"_geom_bond_atom_site_label_1",
263+
"_geom_bond_atom_site_label_2",
264+
"_geom_bond_distance",
265+
"_ccdc_geom_bond_type",
266+
]
267+
for atom_1, atom_2, distance, bond_type in bonds:
268+
lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}")
269+
lines.append("")
270+
return lines
271+
272+
273+
def _write_cif_file(output_path: Path, lines: list[str]) -> None:
246274
output_path.parent.mkdir(parents=True, exist_ok=True)
247275
output_path.write_text("\n".join(lines), encoding="utf-8")
248276

277+
278+
def _validated_output_path(output_dir: Path, refcode: str) -> Path:
279+
base_dir = output_dir.expanduser().resolve()
280+
candidate = (base_dir / f"{refcode}.cif").resolve()
281+
try:
282+
candidate.relative_to(base_dir)
283+
except ValueError as exc:
284+
raise ValueError(f"Refusing to write outside output directory: {candidate}") from exc
285+
return candidate
286+
287+
288+
def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]:
289+
"""Export a CSD structure to CIF with occupancy and disorder metadata."""
290+
from ccdc.io import EntryReader
291+
292+
with EntryReader("CSD") as reader:
293+
entry = reader.entry(refcode)
294+
crystal = reader.crystal(refcode)
295+
296+
molecule = crystal.disordered_molecule or crystal.molecule
297+
disorder_map = _build_disorder_map(crystal)
298+
atom_data = _extract_atom_data(molecule, disorder_map)
299+
bonds = _extract_bonds(molecule)
300+
lines = []
301+
lines.extend(_build_header_lines(refcode, entry, crystal))
302+
lines.extend(_build_symmetry_lines(crystal))
303+
lines.extend(_build_atom_site_lines(atom_data))
304+
lines.extend(_build_anisotropic_lines(atom_data))
305+
lines.extend(_build_bond_lines(bonds))
306+
lines.append("#END")
307+
_write_cif_file(output_path, lines)
308+
249309
return {
250310
"refcode": refcode,
251311
"output": str(output_path),
@@ -268,7 +328,7 @@ def main() -> None:
268328

269329
print(f"Licence requirement: {LICENSE_REQUIREMENT}")
270330
for refcode in args.refcodes:
271-
output_path = output_dir / f"{refcode}.cif"
331+
output_path = _validated_output_path(output_dir, refcode)
272332
summary = export_full_cif_from_csd(refcode, output_path)
273333
print(
274334
f"Exported {summary['refcode']} -> {summary['output']} "

0 commit comments

Comments
 (0)