Source code for myst_nb.core.preprocess

"""notebook "pre-processing" (after execution, but before parsing)"""
from __future__ import annotations

import re
from typing import Any

from nbformat import NotebookNode

from myst_nb.core.config import NbParserConfig
from myst_nb.core.loggers import LoggerType
from myst_nb.glue import extract_glue_data


def preprocess_notebook(
    notebook: NotebookNode, logger: LoggerType, nb_config: NbParserConfig
) -> dict[str, Any]:
    """Modify notebook and resources in-place."""
    # TODO make this pluggable
    # (similar to nbconvert preprocessors, but parse config, source map and logger)
    resources: dict[str, Any] = {}

    # create source map
    source_map = notebook.metadata.get("source_map", None)
    # use 1-based indexing rather than 0, or pseudo base of the cell index
    source_map = [
        (source_map[i] if source_map else ((i + 1) * 10000)) + 1
        for i, _ in enumerate(notebook.cells)
    ]

    # coalesce_streams
    for i, cell in enumerate(notebook.cells):
        if cell.cell_type == "code":
            _callback = lambda m, t: logger.warning(  # noqa: E731
                m, subtype=t, line=source_map[i]
            )
            if nb_config.get_cell_level_config(
                "merge_streams", cell.metadata, _callback
            ):
                cell["outputs"] = coalesce_streams(cell.get("outputs", []))

    # extract all scrapbook (aka glue) outputs from notebook
    extract_glue_data(notebook, resources, source_map, logger)

    return resources
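
For illustration (not part of the module), a minimal sketch of calling preprocess_notebook on an in-memory notebook. The SketchLogger class below is a hypothetical stand-in exposing the warning(msg, subtype=..., line=...) call the function relies on; in real builds one of the docutils/Sphinx logger adapters from myst_nb.core.loggers is passed instead, and a default NbParserConfig() is assumed here:

    from nbformat import v4 as nbf

    from myst_nb.core.config import NbParserConfig
    from myst_nb.core.preprocess import preprocess_notebook


    class SketchLogger:
        """Hypothetical logger stand-in with the ``warning`` signature used above."""

        def warning(self, msg, subtype=None, line=None):
            print(f"WARNING [{subtype}] line {line}: {msg}")


    # build a tiny notebook with a single (already executed) code cell
    notebook = nbf.new_notebook()
    notebook.cells.append(nbf.new_code_cell("print('hi')"))

    # returns the resources dict (e.g. any glue/scrapbook data that was found)
    resources = preprocess_notebook(notebook, SketchLogger(), NbParserConfig())
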
_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])")
_RGX_BACKSPACE = re.compile(r"[^\n]\b")


def coalesce_streams(outputs: list[NotebookNode]) -> list[NotebookNode]:
    """Merge all stream outputs with shared names into single streams.

    This ensures deterministic outputs.

    Adapted from:
    https://github.com/computationalmodelling/nbval/blob/master/nbval/plugin.py.
    """
    if not outputs:
        return []

    new_outputs = []
    streams: dict[str, NotebookNode] = {}
    for output in outputs:
        if output["output_type"] == "stream":
            if output["name"] in streams:
                streams[output["name"]]["text"] += output["text"]
            else:
                new_outputs.append(output)
                streams[output["name"]] = output
        else:
            new_outputs.append(output)

    # process \r and \b characters
    for output in streams.values():
        old = output["text"]
        while len(output["text"]) < len(old):
            old = output["text"]
            # Cancel out anything-but-newline followed by backspace
            output["text"] = _RGX_BACKSPACE.sub("", output["text"])
            # Replace all carriage returns not followed by newline
            output["text"] = _RGX_CARRIAGERETURN.sub("", output["text"])

    # We also want to ensure stdout and stderr are always in the same consecutive
    # order, because they are asynchronous, so order isn't guaranteed.
    for i, output in enumerate(new_outputs):
        if output["output_type"] == "stream" and output["name"] == "stderr":
            if (
                len(new_outputs) >= i + 2
                and new_outputs[i + 1]["output_type"] == "stream"
                and new_outputs[i + 1]["name"] == "stdout"
            ):
                stdout = new_outputs.pop(i + 1)
                new_outputs.insert(i, stdout)

    return new_outputs
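
For illustration (not part of the module), a minimal sketch of what coalesce_streams does to a list of stream outputs, assuming the outputs are built with nbformat.v4.new_output:

    from nbformat.v4 import new_output

    from myst_nb.core.preprocess import coalesce_streams

    outputs = [
        new_output("stream", name="stdout", text="step 1\n"),
        new_output("stream", name="stderr", text="careful!\n"),
        new_output("stream", name="stdout", text="step 2\n"),
    ]

    merged = coalesce_streams(outputs)
    # The two stdout chunks are concatenated into a single stream node, so
    # merged holds two outputs: stdout "step 1\nstep 2\n" followed by
    # stderr "careful!\n".  When a stderr stream is immediately followed by
    # a stdout stream, the pair is swapped so that stdout comes first.
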