Parallel Execution
For large files, .parallel() splits work across multiple processes.
Usage
from crxml import CrystalXMLSource, RenameFields, CastTypes, to_dataframe
pipe = (
    CrystalXMLSource("report.xml")
    | RenameFields({"f1": "name", "f2": "total"})
    | CastTypes({"total": float})
)
df = pipe.parallel(workers=4, batch_size=5000) | to_dataframe
Parameters
| Param | Type | Default | Description |
|---|---|---|---|
| `workers` | `int` | `None` | Number of worker processes; `None` uses the CPU count |
| `batch_size` | `int` | `10000` | Rows per batch sent to workers |
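To make the two parameters concrete, here is a minimal sketch of how the defaults might be resolved and how many batches a given row count produces. `resolve_workers` and `batch_count` are hypothetical helpers written for illustration and are not part of crxml; the fallback to `os.cpu_count()` is an assumption based on the parameter description.

```python
import math
import os
from typing import Optional


def resolve_workers(workers: Optional[int] = None) -> int:
    """Return an explicit worker count; None falls back to the CPU count (assumed behaviour)."""
    if workers is not None:
        return workers
    # os.cpu_count() may return None on unusual platforms, so fall back to 1.
    return os.cpu_count() or 1


def batch_count(n_rows: int, batch_size: int = 10000) -> int:
    """Number of batches needed to cover n_rows; the last batch may be short."""
    return math.ceil(n_rows / batch_size)
```

For example, `batch_count(25_000)` gives 3 batches with the default `batch_size`: two full batches and one of 5,000 rows.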
Requirements
- All stages in the pipeline must be fusable (implement `apply` + `__call__`)
- All stages must be picklable (no lambdas, no closures)
- The source must be iterable multiple times (file re-opened per batch)
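The picklability requirement can be checked with the standard library alone. The sketch below compares a lambda with an equivalent module-level function bound through `functools.partial`. The names `greater_than` and `is_picklable` are hypothetical helpers for illustration, and whether crxml stages accept such a callable is an assumption.

```python
import pickle
from functools import partial


def greater_than(row: dict, field: str, threshold: float) -> bool:
    """Module-level predicate: pickle stores it by reference to its qualified name."""
    return row[field] > threshold


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A lambda has no importable name, so pickle cannot serialize it.
as_lambda = lambda r: r["x"] > 1

# The same predicate as a partial over a module-level function is picklable.
as_partial = partial(greater_than, field="x", threshold=1)
```

Here `is_picklable(as_lambda)` is false while `is_picklable(as_partial)` is true, which is why named functions (or classes with `__call__`) are the usual replacement for lambdas in multi-process code.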
How it works
- A reader thread reads the source and splits rows into batches.
- Batches are dispatched to a `ProcessPoolExecutor`.
- Each worker runs the fused pipeline on its batch.
- Results are returned in order via futures.
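The steps above can be sketched with the standard library. This is a simplified illustration under stated assumptions, not crxml's implementation: it submits every batch eagerly instead of using a reader thread, and `batched`, `uppercase_name`, `run_batches`, and `run_parallel` are hypothetical names. `uppercase_name` stands in for a fused pipeline.

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Optional


def batched(rows: Iterable, batch_size: int) -> Iterator[list]:
    """Yield consecutive lists of at most batch_size rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def uppercase_name(batch: List[dict]) -> List[dict]:
    """Hypothetical fused pipeline; defined at module level so it can be pickled."""
    return [{**row, "name": row["name"].upper()} for row in batch]


def run_batches(rows: Iterable[dict], fused: Callable[[List[dict]], List[dict]],
                executor: Executor, batch_size: int = 10000) -> Iterator[dict]:
    """Submit one task per batch, then yield rows in the original order."""
    futures = [executor.submit(fused, batch) for batch in batched(rows, batch_size)]
    # Iterating futures in submission order preserves row order,
    # even when later batches finish first.
    for future in futures:
        yield from future.result()


def run_parallel(rows: Iterable[dict], fused: Callable[[List[dict]], List[dict]],
                 workers: Optional[int] = None, batch_size: int = 10000) -> List[dict]:
    """Run fused over rows in a process pool and collect the ordered results."""
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return list(run_batches(rows, fused, pool, batch_size))
```

Taking the executor as an argument in `run_batches` keeps the ordering logic separate from the pool. A `ThreadPoolExecutor` can be passed in when debugging, since it needs no pickling. On platforms that spawn worker processes, calls to `run_parallel` belong under an `if __name__ == "__main__":` guard.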
When to use
Parallel mode adds overhead for batch serialization and inter-process communication (IPC), so it only pays off on larger inputs. As a rough heuristic:
| File size | Recommended |
|---|---|
| < 50 MB | Sequential |
| 50–200 MB | Either (benchmark both) |
| > 200 MB | Parallel |
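The size thresholds can be turned into a small helper. This is a sketch: `suggest_mode` and `suggest_mode_for_file` are hypothetical names, and the use of decimal megabytes is an assumption. The middle band is reported as `"either"`, also an assumption, since that range is a judgment call best settled by measuring.

```python
import os

MB = 1_000_000  # assumption: decimal megabytes; the heuristic does not specify the unit


def suggest_mode(size_bytes: int) -> str:
    """Map a file size in bytes to the execution mode suggested by the heuristic."""
    if size_bytes < 50 * MB:
        return "sequential"
    if size_bytes > 200 * MB:
        return "parallel"
    # 50 to 200 MB inclusive: no clear winner, so measure both modes.
    return "either"


def suggest_mode_for_file(path: str) -> str:
    """Look up the size of the file at path and apply the heuristic."""
    return suggest_mode(os.path.getsize(path))
```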
Validation
crxml validates all stages when `.parallel()` is called, not when the pipeline is first composed:
from crxml import CrystalXMLSource, RenameFields, FilterRows
pipe = CrystalXMLSource("report.xml") | RenameFields({"a": "b"})
# This works: both stages are Fusable and picklable
pipe2 = CrystalXMLSource("report.xml") | FilterRows(lambda r: r["x"] > 1)
pipe2.parallel()  # raises UnpicklableStageError: lambdas are not picklable
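A fail-fast check of this kind could look like the sketch below. It is not crxml's code: `validate_picklable` is a hypothetical helper, and the `UnpicklableStageError` class defined here is a local stand-in whose base class and message format are assumptions.

```python
import pickle
from typing import Iterable


class UnpicklableStageError(TypeError):
    """Local stand-in for illustration; crxml's real exception may differ."""


def validate_picklable(stages: Iterable[object]) -> None:
    """Raise UnpicklableStageError for the first stage that cannot be pickled."""
    for index, stage in enumerate(stages):
        try:
            pickle.dumps(stage)
        except (pickle.PicklingError, AttributeError, TypeError) as exc:
            # Report the position so the offending stage is easy to find.
            raise UnpicklableStageError(
                f"stage {index} ({stage!r}) cannot be pickled: {exc}"
            ) from exc
```

Checking every stage before any batch is dispatched surfaces the problem in the calling process with a clear message, instead of as a serialization failure inside the worker pool.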