Source code for myst_nb.sphinx_

"""The sphinx parser implementation for myst-nb."""
from __future__ import annotations

from collections import defaultdict
import json
from pathlib import Path
import re
from typing import Any, DefaultDict, cast

from docutils import nodes
from markdown_it.token import Token
from markdown_it.tree import SyntaxTreeNode
from myst_parser.config.main import MdParserConfig, merge_file_level
from myst_parser.mdit_to_docutils.base import token_line
from myst_parser.mdit_to_docutils.sphinx_ import SphinxRenderer, create_warning
from myst_parser.parsers.mdit import create_md_parser
from myst_parser.parsers.sphinx_ import MystParser
import nbformat
from sphinx.application import Sphinx
from sphinx.environment import BuildEnvironment
from sphinx.environment.collectors import EnvironmentCollector
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import logging as sphinx_logging

from myst_nb._compat import findall
from myst_nb.core.config import NbParserConfig
from myst_nb.core.execute import ExecutionResult, create_client
from myst_nb.core.loggers import DEFAULT_LOG_TYPE, SphinxDocLogger
from myst_nb.core.nb_to_tokens import nb_node_to_dict, notebook_to_tokens
from myst_nb.core.read import create_nb_reader
from myst_nb.core.render import (
    MditRenderMixin,
    MimeData,
    NbElementRenderer,
    create_figure_context,
    get_mime_priority,
    load_renderer,
)

SPHINX_LOGGER = sphinx_logging.getLogger(__name__)


class SphinxEnvType(BuildEnvironment):
    """Sphinx build environment, including attributes set by myst_nb."""

    myst_config: MdParserConfig
    mystnb_config: NbParserConfig
    nb_metadata: DefaultDict[str, dict]
    nb_new_exec_data: bool


[docs]class Parser(MystParser): """Sphinx parser for Jupyter Notebook formats, containing MyST Markdown.""" supported = ("myst-nb",) translate_section_name = None config_section = "myst-nb parser" config_section_dependencies = ("parsers",)
[docs] def parse(self, inputstring: str, document: nodes.document) -> None: """Parse source text. :param inputstring: The source string to parse :param document: The root docutils node to add AST elements to """ assert self.env is not None, "env not set" self.env: SphinxEnvType document_path = self.env.doc2path(self.env.docname) # get a logger for this document logger = SphinxDocLogger(document) # get markdown parsing configuration md_config: MdParserConfig = self.env.myst_config # get notebook rendering configuration nb_config: NbParserConfig = self.env.mystnb_config # create a reader for the notebook nb_reader = create_nb_reader(document_path, md_config, nb_config, inputstring) # If the nb_reader is None, then we default to a standard Markdown parser if nb_reader is None: return super().parse(inputstring, document) notebook = nb_reader.read(inputstring) # update the global markdown config with the file-level config warning = lambda wtype, msg: create_warning( # noqa: E731 document, msg, line=1, append_to=document, subtype=wtype ) nb_reader.md_config = merge_file_level( nb_reader.md_config, notebook.metadata, warning ) # potentially replace kernel name with alias kernel_name = notebook.metadata.get("kernelspec", {}).get("name", None) if kernel_name is not None and nb_config.kernel_rgx_aliases: for rgx, alias in nb_config.kernel_rgx_aliases.items(): if re.fullmatch(rgx, kernel_name): logger.debug( f"Replaced kernel name: {kernel_name!r} -> {alias!r}", subtype="kernel", ) notebook.metadata["kernelspec"]["name"] = alias break # Update mystnb configuration with notebook level metadata if nb_config.metadata_key in notebook.metadata: overrides = nb_node_to_dict(notebook.metadata[nb_config.metadata_key]) overrides.pop("output_folder", None) # this should not be overridden try: nb_config = nb_config.copy(**overrides) except Exception as exc: logger.warning( f"Failed to update configuration with notebook metadata: {exc}", subtype="config", ) else: logger.debug( "Updated configuration with notebook metadata", subtype="config" ) # Setup the parser mdit_parser = create_md_parser(nb_reader.md_config, SphinxNbRenderer) mdit_parser.options["document"] = document mdit_parser.options["nb_config"] = nb_config mdit_renderer: SphinxNbRenderer = mdit_parser.renderer # type: ignore mdit_env: dict[str, Any] = {} # load notebook element renderer class from entry-point name # this is separate from SphinxNbRenderer, so that users can override it renderer_name = nb_config.render_plugin nb_renderer: NbElementRenderer = load_renderer(renderer_name)( mdit_renderer, logger ) # we temporarily store nb_renderer on the document, # so that roles/directives can access it document.attributes["nb_renderer"] = nb_renderer # we currently do this early, so that the nb_renderer has access to things mdit_renderer.setup_render(mdit_parser.options, mdit_env) # parse notebook structure to markdown-it tokens # note, this does not assume that the notebook has been executed yet mdit_tokens = notebook_to_tokens(notebook, mdit_parser, mdit_env, logger) # open the notebook execution client, # this may execute the notebook immediately or during the page render with create_client( notebook, document_path, nb_config, logger, nb_reader.read_fmt ) as nb_client: mdit_parser.options["nb_client"] = nb_client # convert to docutils AST, which is added to the document mdit_renderer.render(mdit_tokens, mdit_parser.options, mdit_env) # save final execution data if nb_client.exec_metadata: NbMetadataCollector.set_exec_data( self.env, self.env.docname, nb_client.exec_metadata ) if nb_client.exec_metadata["traceback"]: # store error traceback in outdir and log its path reports_file = Path(self.env.app.outdir).joinpath( "reports", *(self.env.docname + ".err.log").split("/") ) reports_file.parent.mkdir(parents=True, exist_ok=True) reports_file.write_text( nb_client.exec_metadata["traceback"], encoding="utf8" ) logger.warning( f"Notebook exception traceback saved in: {reports_file}", subtype="exec", ) # write final (updated) notebook to output folder (utf8 is standard encoding) path = self.env.docname.split("/") ipynb_path = path[:-1] + [path[-1] + ".ipynb"] content = nbformat.writes(notebook).encode("utf-8") nb_renderer.write_file(ipynb_path, content, overwrite=True) # write glue data to the output folder, # and store the keys to environment doc metadata, # so that they may be used in any post-transform steps if nb_client.glue_data: glue_path = path[:-1] + [path[-1] + ".glue.json"] nb_renderer.write_file( glue_path, json.dumps(nb_client.glue_data, cls=BytesEncoder).encode("utf8"), overwrite=True, ) NbMetadataCollector.set_doc_data( self.env, self.env.docname, "glue", list(nb_client.glue_data.keys()) ) # move some document metadata to environment metadata, # so that we can later read it from the environment, # rather than having to load the whole doctree for key, (uri, kwargs) in document.attributes.pop("nb_js_files", {}).items(): NbMetadataCollector.add_js_file( self.env, self.env.docname, key, uri, kwargs ) # remove temporary state document.attributes.pop("nb_renderer")
class SphinxNbRenderer(SphinxRenderer, MditRenderMixin): """A sphinx renderer for Jupyter Notebooks.""" def render_nb_initialise(self, token: SyntaxTreeNode) -> None: env = cast(BuildEnvironment, self.sphinx_env) metadata = self.nb_client.nb_metadata special_keys = ["kernelspec", "language_info", "source_map"] for key in special_keys: if key in metadata: # save these special keys on the metadata, rather than as docinfo # note, sphinx_book_theme checks kernelspec is in the metadata env.metadata[env.docname][key] = metadata.get(key) # forward the remaining metadata to the front_matter renderer special_keys.append("widgets") top_matter = {k: v for k, v in metadata.items() if k not in special_keys} self.render_front_matter( Token( # type: ignore "front_matter", "", 0, map=[0, 0], content=top_matter, # type: ignore[arg-type] ), ) def _render_nb_cell_code_outputs( self, token: SyntaxTreeNode, outputs: list[nbformat.NotebookNode] ) -> None: """Render a notebook code cell's outputs.""" line = token_line(token, 0) cell_index = token.meta["index"] metadata = token.meta["metadata"] # render the outputs for output_index, output in enumerate(outputs): if output.output_type == "stream": if output.name == "stdout": _nodes = self.nb_renderer.render_stdout( output, metadata, cell_index, line ) self.add_line_and_source_path_r(_nodes, token) self.current_node.extend(_nodes) elif output.name == "stderr": _nodes = self.nb_renderer.render_stderr( output, metadata, cell_index, line ) self.add_line_and_source_path_r(_nodes, token) self.current_node.extend(_nodes) else: pass # TODO warning elif output.output_type == "error": _nodes = self.nb_renderer.render_error( output, metadata, cell_index, line ) self.add_line_and_source_path_r(_nodes, token) self.current_node.extend(_nodes) elif output.output_type in ("display_data", "execute_result"): # Note, this is different to the docutils implementation, # where we directly select a single output, based on the mime_priority. # Here, we do not know the mime priority until we know the output format # so we output all the outputs during this parsing phase # (this is what sphinx caches as "output format agnostic" AST), # and replace the mime_bundle with the format specific output # in a post-transform (run per output format on the cached AST) figure_options = ( self.get_cell_level_config( "render_figure_options", metadata, line=line ) or None ) with create_figure_context(self, figure_options, line): mime_bundle = nodes.container(nb_element="mime_bundle") with self.current_node_context(mime_bundle): for mime_type, data in output["data"].items(): mime_container = nodes.container(mime_type=mime_type) with self.current_node_context(mime_container): _nodes = self.nb_renderer.render_mime_type( MimeData( mime_type, data, cell_metadata=metadata, output_metadata=output.get("metadata", {}), cell_index=cell_index, output_index=output_index, line=line, ) ) self.current_node.extend(_nodes) if mime_container.children: self.current_node.append(mime_container) if mime_bundle.children: self.add_line_and_source_path_r([mime_bundle], token) self.current_node.append(mime_bundle) else: self.create_warning( f"Unsupported output type: {output.output_type}", line=line, append_to=self.current_node, wtype=DEFAULT_LOG_TYPE, subtype="output_type", ) class SelectMimeType(SphinxPostTransform): """Select the mime type to render from mime bundles, based on the builder and its associated priority list. """ default_priority = 4 # TODO set correct priority def run(self, **kwargs: Any) -> None: """Run the transform.""" # get priority list for this builder # TODO allow for per-notebook/cell priority dicts? bname = self.app.builder.name priority_list = get_mime_priority( bname, self.config["nb_mime_priority_overrides"] ) condition = ( lambda node: isinstance(node, nodes.container) and node.attributes.get("nb_element", "") == "mime_bundle" ) # remove/replace_self will not work with an iterator for node in list(findall(self.document)(condition)): # get available mime types mime_types = [node["mime_type"] for node in node.children] if not mime_types: node.parent.remove(node) continue # select top priority index = None for mime_type in priority_list: try: index = mime_types.index(mime_type) except ValueError: continue else: break if index is None: mime_string = ",".join(repr(m) for m in mime_types) SPHINX_LOGGER.warning( f"No mime type available in priority list for builder {bname!r} " f"({mime_string}) [{DEFAULT_LOG_TYPE}.mime_priority]", type=DEFAULT_LOG_TYPE, subtype="mime_priority", location=node, ) node.parent.remove(node) elif not node.children[index].children: node.parent.remove(node) else: node.replace_self(node.children[index].children) class NbMetadataCollector(EnvironmentCollector): """Collect myst-nb specific metdata, and handle merging of parallel builds.""" @staticmethod def set_doc_data(env: SphinxEnvType, docname: str, key: str, value: Any) -> None: """Add nb metadata for a docname to the environment.""" if not hasattr(env, "nb_metadata"): env.nb_metadata = defaultdict(dict) env.nb_metadata.setdefault(docname, {})[key] = value @staticmethod def get_doc_data(env: SphinxEnvType) -> DefaultDict[str, dict]: """Get myst-nb docname -> metadata dict.""" if not hasattr(env, "nb_metadata"): env.nb_metadata = defaultdict(dict) return env.nb_metadata @classmethod def set_exec_data( cls, env: SphinxEnvType, docname: str, value: ExecutionResult ) -> None: """Add nb metadata for a docname to the environment.""" cls.set_doc_data(env, docname, "exec_data", value) # TODO this does not take account of cache data cls.note_exec_update(env) @classmethod def get_exec_data(cls, env: SphinxEnvType, docname: str) -> ExecutionResult | None: """Get myst-nb docname -> execution data.""" return cls.get_doc_data(env)[docname].get("exec_data") def get_outdated_docs( # type: ignore[override] self, app: Sphinx, env: SphinxEnvType, added: set[str], changed: set[str], removed: set[str], ) -> list[str]: # called before any docs are read env.nb_new_exec_data = False return [] @staticmethod def note_exec_update(env: SphinxEnvType) -> None: """Note that a notebook has been executed.""" env.nb_new_exec_data = True @staticmethod def new_exec_data(env: SphinxEnvType) -> bool: """Return whether any notebooks have updated execution data.""" return getattr(env, "nb_new_exec_data", False) @classmethod def add_js_file( cls, env: SphinxEnvType, docname: str, key: str, uri: str | None, kwargs: dict[str, str], ): """Register a JavaScript file to include in the HTML output.""" if not hasattr(env, "nb_metadata"): env.nb_metadata = defaultdict(dict) js_files = env.nb_metadata.setdefault(docname, {}).setdefault("js_files", {}) # TODO handle whether overrides are allowed js_files[key] = (uri, kwargs) @classmethod def get_js_files( cls, env: SphinxEnvType, docname: str ) -> dict[str, tuple[str | None, dict[str, str]]]: """Get myst-nb docname -> execution data.""" return cls.get_doc_data(env)[docname].get("js_files", {}) def clear_doc( # type: ignore[override] self, app: Sphinx, env: SphinxEnvType, docname: str, ) -> None: if not hasattr(env, "nb_metadata"): env.nb_metadata = defaultdict(dict) env.nb_metadata.pop(docname, None) def process_doc(self, app: Sphinx, doctree: nodes.document) -> None: pass def merge_other( # type: ignore[override] self, app: Sphinx, env: SphinxEnvType, docnames: set[str], other: SphinxEnvType, ) -> None: if not hasattr(env, "nb_metadata"): env.nb_metadata = defaultdict(dict) other_metadata = getattr(other, "nb_metadata", defaultdict(dict)) for docname in docnames: env.nb_metadata[docname] = other_metadata[docname] if other.nb_new_exec_data: env.nb_new_exec_data = True class BytesEncoder(json.JSONEncoder): """A JSON encoder that accepts b64 (and other *ascii*) bytestrings.""" def default(self, obj): if isinstance(obj, bytes): return obj.decode("ascii") return json.JSONEncoder.default(self, obj)