nipoppy.workflows

Classes for running workflows on datasets.

Submodules

Package Contents

class nipoppy.workflows.BasePipelineWorkflow(dpath_root, name, pipeline_name, pipeline_version=None, pipeline_step=None, participant_id=None, session_id=None, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.base.BaseWorkflow, abc.ABC

A workflow for a pipeline that has a Boutiques descriptor.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • name (str)

  • pipeline_name (str)

  • pipeline_version (Optional[str])

  • pipeline_step (Optional[str])

  • participant_id (str)

  • session_id (str)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

boutiques_config()

Get the Boutiques configuration.

check_dir(dpath)

Create directory if it does not exist.

Parameters:

dpath (pathlib.Path)

check_pipeline_version()

Set the pipeline version based on the config if it is not given.

descriptor()

Load the pipeline step’s Boutiques descriptor.

Return type:

dict

dpath_pipeline()

Return the path to the pipeline’s derivatives directory.

Return type:

pathlib.Path

dpath_pipeline_bids_db()

Return the path to the pipeline’s BIDS database directory.

Return type:

pathlib.Path

dpath_pipeline_output()

Return the path to the pipeline’s output directory.

Return type:

pathlib.Path

dpath_pipeline_work()

Return the path to the pipeline’s working directory.

Return type:

pathlib.Path

dpaths_to_check()

Directory paths to create if needed during the setup phase.

Return type:

list[pathlib.Path]

fpath_container()

Return the full path to the pipeline’s container.

Return type:

pathlib.Path

generate_fpath_log(dnames_parent=None, fname_stem=None)

Generate a log file path.

Parameters:
  • dnames_parent (Optional[str | list[str]])

  • fname_stem (Optional[str])

Return type:

pathlib.Path

abstract get_participants_sessions_to_run(participant_id, session_id)

Return participant-session pairs to loop over with run_single().

This is an abstract method that should be defined explicitly in subclasses.

Parameters:
  • participant_id (Optional[str])

  • session_id (Optional[str])

invocation()

Load the pipeline step’s Boutiques invocation.

Return type:

dict

pipeline_config()

Get the user config for the pipeline.

Return type:

nipoppy.config.pipeline.ProcPipelineConfig

process_template_json(template_json, participant_id, session_id, bids_participant=None, bids_session=None, objs=None, return_str=False, **kwargs)

Replace template strings in a JSON object.

Parameters:
  • template_json (dict)

  • participant_id (Optional[str])

  • session_id (Optional[str])

  • bids_participant (Optional[str])

  • bids_session (Optional[str])

  • objs (Optional[list])

  • return_str (bool)

pybids_ignore_patterns()

Load the pipeline step’s PyBIDS ignore pattern list.

Note: this does not apply any substitutions, since the subject/session patterns are always added.

Return type:

list[str]

run_cleanup()

Log a summary message.

run_main()

Run the pipeline.

run_setup()

Run pipeline setup.

abstract run_single(participant_id, session_id)

Run on a single participant/session.

This is an abstract method that should be defined explicitly in subclasses.

Parameters:
  • participant_id (Optional[str])

  • session_id (Optional[str])

set_up_bids_db(dpath_bids_db, participant_id=None, session_id=None)

Set up the BIDS database.

Parameters:
  • dpath_bids_db (nipoppy.env.StrOrPathLike)

  • participant_id (Optional[str])

  • session_id (Optional[str])

Return type:

bids.BIDSLayout

class nipoppy.workflows.BaseWorkflow(dpath_root, name, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.base.Base, abc.ABC

Base class with logging/subprocess utilities.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • name (str)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

log_prefix_run = '[RUN]'
log_prefix_run_stderr = '[RUN STDERR]'
log_prefix_run_stdout = '[RUN STDOUT]'
path_sep = '-'
pipeline_name = '[[NIPOPPY_PIPELINE_NAME]]'
pipeline_version = '[[NIPOPPY_PIPELINE_VERSION]]'
validate_layout = True
config()

Load the configuration.

Return type:

nipoppy.config.main.Config

copy(path_source, path_dest, log_level=logging.INFO, **kwargs)

Copy a file or directory.

Create a symlink to another path.

dicom_dir_map()

Get the DICOM directory mapping.

Return type:

nipoppy.tabular.dicom_dir_map.DicomDirMap

doughnut()

Load the doughnut.

Return type:

nipoppy.tabular.doughnut.Doughnut

generate_fpath_log(dnames_parent=None, fname_stem=None)

Generate a log file path.

Parameters:
  • dnames_parent (Optional[str | list[str]])

  • fname_stem (Optional[str])

Return type:

pathlib.Path

log_command(command)

Write a command to the log with a special prefix.

Parameters:

command (str)

manifest()

Load the manifest.

Return type:

nipoppy.tabular.manifest.Manifest

mkdir(dpath, log_level=logging.INFO, **kwargs)

Create a directory (by default including parents).

Do nothing if the directory already exists.

rm(path, log_level=logging.INFO, **kwargs)

Remove a file or directory.

run()

Run the workflow.

run_cleanup()

Run the cleanup part of the workflow.

run_command(command_or_args, check=True, **kwargs)

Run a command in a subprocess.

The command’s stdout and stderr outputs are written to the log with special prefixes.

If in “dry run” mode, the command is not executed, and the method returns the command string. Otherwise, the subprocess.Popen object is returned unless capture_output is True.

Parameters:
  • command_or_args (Sequence[str] | str) – The command to run.

  • check (bool, optional) – If True, raise an error if the process exits with a non-zero code, by default True

  • **kwargs – Passed to subprocess.Popen.

Return type:

subprocess.Popen or str

abstract run_main()

Run the main part of the workflow.

run_setup()

Run the setup part of the workflow.

save_tabular_file(tabular, fpath)

Save a tabular file.

Parameters:
class nipoppy.workflows.BidsConversionRunner(dpath_root, pipeline_name, pipeline_version=None, pipeline_step=None, participant_id=None, session_id=None, simulate=False, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.runner.PipelineRunner

Convert data to BIDS.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • pipeline_name (str)

  • pipeline_version (Optional[str])

  • pipeline_step (Optional[str])

  • participant_id (str)

  • session_id (str)

  • simulate (bool)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

  • dry_run (bool)

dpaths_to_check()

Directory paths to create if needed during the setup phase.

Return type:

list[pathlib.Path]

get_participants_sessions_to_run(participant_id, session_id)

Return participant-session pairs to run the pipeline on.

Parameters:
  • participant_id (Optional[str])

  • session_id (Optional[str])

pipeline_config()

Get the user config for the BIDS conversion software.

Return type:

nipoppy.config.pipeline.BidsPipelineConfig

run_cleanup(**kwargs)

Clean up after main BIDS conversion part is run.

Specifically: - Write updated doughnut file

run_single(participant_id, session_id)

Run BIDS conversion on a single participant/session.

Parameters:
  • participant_id (str)

  • session_id (str)

class nipoppy.workflows.DicomReorgWorkflow(dpath_root, copy_files=False, check_dicoms=False, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.base.BaseWorkflow

Workflow for organizing raw DICOM files.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • copy_files (bool)

  • check_dicoms (bool)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

  • dry_run (bool)

apply_fname_mapping(fname_source, participant_id, session_id)

Apply a mapping to the file name.

This method does not change the file name by default, but it can be overridden if the file names need to be changed during reorganization (e.g. for easier BIDS conversion).

Parameters:
  • fname_source (str)

  • participant_id (str)

  • session_id (str)

Return type:

str

get_fpaths_to_reorg(participant_id, session_id)

Get file paths to reorganize for a single participant and session.

Parameters:
  • participant_id (str)

  • session_id (str)

Return type:

list[pathlib.Path]

get_participants_sessions_to_run()

Return participant-session pairs to reorganize.

run_cleanup()

Clean up after main DICOM reorg part is run.

Specifically: - Write updated doughnut file - Log a summary message

run_main()

Reorganize all downloaded DICOM files.

run_setup()

Update the doughnut in case it is not up-to-date.

run_single(participant_id, session_id)

Reorganize downloaded DICOM files for a single participant and session.

Parameters:
  • participant_id (str)

  • session_id (str)

class nipoppy.workflows.DoughnutWorkflow(dpath_root, empty=False, regenerate=False, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.base.BaseWorkflow

Workflow for creating/updating a dataset’s doughnut file.

Parameters:
run_cleanup()

Log a success message.

run_main()

Generate/update the dataset’s doughnut file.

class nipoppy.workflows.InitWorkflow(dpath_root, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.base.BaseWorkflow

Workflow for init command.

Parameters:
validate_layout = False
run_cleanup()

Log a success message.

run_main()

Create dataset directory structure.

class nipoppy.workflows.PipelineRunner(dpath_root, pipeline_name, pipeline_version=None, pipeline_step=None, participant_id=None, session_id=None, simulate=False, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.pipeline.BasePipelineWorkflow

Pipeline runner.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • pipeline_name (str)

  • pipeline_version (Optional[str])

  • pipeline_step (Optional[str])

  • participant_id (str)

  • session_id (str)

  • simulate (bool)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

  • dry_run (bool)

dpaths_to_check()

Directory paths to create if needed during the setup phase.

Return type:

list[pathlib.Path]

get_participants_sessions_to_run(participant_id, session_id)

Generate a list of participant and session IDs to run.

Specifically, this list will include participants who have BIDS data but who have not previously successfully completed the pipeline (according) to the bagel file.

Parameters:
  • participant_id (Optional[str])

  • session_id (Optional[str])

launch_boutiques_run(participant_id, session_id, objs=None, **kwargs)

Launch a pipeline run using Boutiques.

Parameters:
  • participant_id (str)

  • session_id (str)

  • objs (Optional[list])

process_container_config(participant_id, session_id, bind_paths=None)

Update container config and generate container command.

Parameters:
  • participant_id (str)

  • session_id (str)

  • bind_paths (Optional[list[nipoppy.env.StrOrPathLike]])

Return type:

str

run_cleanup()

Run pipeline runner cleanup.

run_single(participant_id, session_id)

Run pipeline on a single participant/session.

Parameters:
  • participant_id (str)

  • session_id (str)

class nipoppy.workflows.PipelineTracker(dpath_root, pipeline_name, pipeline_version=None, participant_id=None, session_id=None, fpath_layout=None, logger=None, dry_run=False)

Bases: nipoppy.workflows.pipeline.BasePipelineWorkflow

Pipeline tracker.

Parameters:
  • dpath_root (nipoppy.env.StrOrPathLike)

  • pipeline_name (str)

  • pipeline_version (Optional[str])

  • participant_id (str)

  • session_id (str)

  • fpath_layout (Optional[nipoppy.env.StrOrPathLike])

  • logger (Optional[logging.Logger])

  • dry_run (bool)

check_status(relative_paths)

Check the processing status based on a list of expected paths.

Parameters:

relative_paths (nipoppy.env.StrOrPathLike)

get_participants_sessions_to_run(participant_id, session_id)

Get participant-session pairs with BIDS data to run the tracker on.

Parameters:
  • participant_id (Optional[str])

  • session_id (Optional[str])

run_cleanup()

Save the bagel file.

run_setup()

Load/initialize the bagel file.

run_single(participant_id, session_id)

Run tracker on a single participant/session.

Parameters:
  • participant_id (str)

  • session_id (str)