"""The sphinx parser implementation for myst-nb."""
from __future__ import annotations
from collections import defaultdict
import json
from pathlib import Path
import re
from typing import Any, DefaultDict, cast
from docutils import nodes
from markdown_it.token import Token
from markdown_it.tree import SyntaxTreeNode
from myst_parser.docutils_renderer import token_line
from myst_parser.main import MdParserConfig, create_md_parser
from myst_parser.sphinx_parser import MystParser
from myst_parser.sphinx_renderer import SphinxRenderer
import nbformat
from sphinx.application import Sphinx
from sphinx.environment import BuildEnvironment
from sphinx.environment.collectors import EnvironmentCollector
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import logging as sphinx_logging
from myst_nb._compat import findall
from myst_nb.core.config import NbParserConfig
from myst_nb.core.execute import ExecutionResult, execute_notebook
from myst_nb.core.loggers import DEFAULT_LOG_TYPE, SphinxDocLogger
from myst_nb.core.parse import nb_node_to_dict, notebook_to_tokens
from myst_nb.core.preprocess import preprocess_notebook
from myst_nb.core.read import create_nb_reader
from myst_nb.core.render import (
MimeData,
NbElementRenderer,
create_figure_context,
get_mime_priority,
load_renderer,
)
SPHINX_LOGGER = sphinx_logging.getLogger(__name__)
class SphinxEnvType(BuildEnvironment):
"""Sphinx build environment, including attributes set by myst_nb."""
myst_config: MdParserConfig
mystnb_config: NbParserConfig
nb_metadata: DefaultDict[str, dict]
nb_new_exec_data: bool
class Parser(MystParser):
"""Sphinx parser for Jupyter Notebook formats, containing MyST Markdown."""
supported = ("myst-nb",)
translate_section_name = None
config_section = "myst-nb parser"
config_section_dependencies = ("parsers",)
    def parse(self, inputstring: str, document: nodes.document) -> None:
"""Parse source text.
:param inputstring: The source string to parse
:param document: The root docutils node to add AST elements to
"""
assert self.env is not None, "env not set"
self.env: SphinxEnvType
document_path = self.env.doc2path(self.env.docname)
# get a logger for this document
logger = SphinxDocLogger(document)
# get markdown parsing configuration
md_config: MdParserConfig = self.env.myst_config
# get notebook rendering configuration
nb_config: NbParserConfig = self.env.mystnb_config
# create a reader for the notebook
nb_reader = create_nb_reader(document_path, md_config, nb_config, inputstring)
# If the nb_reader is None, then we default to a standard Markdown parser
if nb_reader is None:
return super().parse(inputstring, document)
notebook = nb_reader.read(inputstring)
# potentially replace kernel name with alias
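        # e.g. an alias mapping of {"python3.*": "python3"} would treat a kernel
        # named "python310" as "python3" (illustrative values only)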
kernel_name = notebook.metadata.get("kernelspec", {}).get("name", None)
if kernel_name is not None and nb_config.kernel_rgx_aliases:
for rgx, alias in nb_config.kernel_rgx_aliases.items():
if re.fullmatch(rgx, kernel_name):
logger.debug(
f"Replaced kernel name: {kernel_name!r} -> {alias!r}",
subtype="kernel",
)
notebook.metadata["kernelspec"]["name"] = alias
break
# Update mystnb configuration with notebook level metadata
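        # e.g. notebook metadata like {"mystnb": {"number_source_lines": true}}
        # (under the configured metadata key, typically "mystnb") overrides that
        # option for this document only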
if nb_config.metadata_key in notebook.metadata:
overrides = nb_node_to_dict(notebook.metadata[nb_config.metadata_key])
overrides.pop("output_folder", None) # this should not be overridden
try:
nb_config = nb_config.copy(**overrides)
except Exception as exc:
logger.warning(
f"Failed to update configuration with notebook metadata: {exc}",
subtype="config",
)
else:
logger.debug(
"Updated configuration with notebook metadata", subtype="config"
)
# potentially execute notebook and/or populate outputs from cache
notebook, exec_data = execute_notebook(
notebook, document_path, nb_config, logger, nb_reader.read_fmt
)
if exec_data:
NbMetadataCollector.set_exec_data(self.env, self.env.docname, exec_data)
if exec_data["traceback"]:
# store error traceback in outdir and log its path
reports_file = Path(self.env.app.outdir).joinpath(
"reports", *(self.env.docname + ".err.log").split("/")
)
reports_file.parent.mkdir(parents=True, exist_ok=True)
reports_file.write_text(exec_data["traceback"], encoding="utf8")
logger.warning(
f"Notebook exception traceback saved in: {reports_file}",
subtype="exec",
)
# Setup the parser
mdit_parser = create_md_parser(nb_reader.md_config, SphinxNbRenderer)
mdit_parser.options["document"] = document
mdit_parser.options["notebook"] = notebook
mdit_parser.options["nb_config"] = nb_config
mdit_renderer: SphinxNbRenderer = mdit_parser.renderer # type: ignore
mdit_env: dict[str, Any] = {}
# load notebook element renderer class from entry-point name
# this is separate from SphinxNbRenderer, so that users can override it
renderer_name = nb_config.render_plugin
nb_renderer: NbElementRenderer = load_renderer(renderer_name)(
mdit_renderer, logger
)
# we temporarily store nb_renderer on the document,
# so that roles/directives can access it
document.attributes["nb_renderer"] = nb_renderer
# we currently do this early, so that the nb_renderer has access to things
mdit_renderer.setup_render(mdit_parser.options, mdit_env)
# pre-process notebook and store resources for render
resources = preprocess_notebook(notebook, logger, nb_config)
mdit_renderer.md_options["nb_resources"] = resources
# parse to tokens
mdit_tokens = notebook_to_tokens(notebook, mdit_parser, mdit_env, logger)
# convert to docutils AST, which is added to the document
mdit_renderer.render(mdit_tokens, mdit_parser.options, mdit_env)
# write final (updated) notebook to output folder (utf8 is standard encoding)
path = self.env.docname.split("/")
ipynb_path = path[:-1] + [path[-1] + ".ipynb"]
content = nbformat.writes(notebook).encode("utf-8")
nb_renderer.write_file(ipynb_path, content, overwrite=True)
# write glue data to the output folder,
# and store the keys to environment doc metadata,
# so that they may be used in any post-transform steps
if resources.get("glue", None):
glue_path = path[:-1] + [path[-1] + ".glue.json"]
nb_renderer.write_file(
glue_path,
json.dumps(resources["glue"], cls=BytesEncoder).encode("utf8"),
overwrite=True,
)
NbMetadataCollector.set_doc_data(
self.env, self.env.docname, "glue", list(resources["glue"].keys())
)
# move some document metadata to environment metadata,
# so that we can later read it from the environment,
# rather than having to load the whole doctree
for key, (uri, kwargs) in document.attributes.pop("nb_js_files", {}).items():
NbMetadataCollector.add_js_file(
self.env, self.env.docname, key, uri, kwargs
)
# remove temporary state
document.attributes.pop("nb_renderer")
class SphinxNbRenderer(SphinxRenderer):
"""A sphinx renderer for Jupyter Notebooks."""
@property
def nb_config(self) -> NbParserConfig:
"""Get the notebook element renderer."""
return self.md_options["nb_config"]
@property
def nb_renderer(self) -> NbElementRenderer:
"""Get the notebook element renderer."""
return self.document["nb_renderer"]
def get_cell_level_config(
self,
field: str,
cell_metadata: dict[str, Any],
line: int | None = None,
) -> Any:
"""Get a configuration value at the cell level.
Takes the highest priority configuration from:
`cell > document > global > default`
:param field: the field name from ``NbParserConfig`` to get the value for
:param cell_metadata: the metadata for the cell
"""
def _callback(msg: str, subtype: str):
self.create_warning(msg, line=line, subtype=subtype)
return self.nb_config.get_cell_level_config(field, cell_metadata, _callback)
def render_nb_metadata(self, token: SyntaxTreeNode) -> None:
"""Render the notebook metadata."""
env = cast(BuildEnvironment, self.sphinx_env)
metadata = dict(token.meta)
special_keys = ("kernelspec", "language_info", "source_map")
for key in special_keys:
if key in metadata:
# save these special keys on the metadata, rather than as docinfo
# note, sphinx_book_theme checks kernelspec is in the metadata
env.metadata[env.docname][key] = metadata.get(key)
metadata = self.nb_renderer.render_nb_metadata(metadata)
# forward the remaining metadata to the front_matter renderer
top_matter = {k: v for k, v in metadata.items() if k not in special_keys}
self.render_front_matter(
Token( # type: ignore
"front_matter",
"",
0,
map=[0, 0],
content=top_matter, # type: ignore[arg-type]
),
)
def render_nb_cell_markdown(self, token: SyntaxTreeNode) -> None:
"""Render a notebook markdown cell."""
        # TODO this is currently just a "pass-through", but we could utilise the metadata.
        # it would be nice to "wrap" this in a container that included the metadata,
        # but unfortunately this would break the heading structure of docutils/sphinx.
        # perhaps we could add an "invisible" (non-rendered) marker node to the document tree
self.render_children(token)
def render_nb_cell_raw(self, token: SyntaxTreeNode) -> None:
"""Render a notebook raw cell."""
line = token_line(token, 0)
_nodes = self.nb_renderer.render_raw_cell(
token.content, token.meta["metadata"], token.meta["index"], line
)
self.add_line_and_source_path_r(_nodes, token)
self.current_node.extend(_nodes)
def render_nb_cell_code(self, token: SyntaxTreeNode) -> None:
"""Render a notebook code cell."""
cell_index = token.meta["index"]
tags = token.meta["metadata"].get("tags", [])
# TODO do we need this -/_ duplication of tag names, or can we deprecate one?
remove_input = (
self.get_cell_level_config(
"remove_code_source",
token.meta["metadata"],
line=token_line(token, 0) or None,
)
or ("remove_input" in tags)
or ("remove-input" in tags)
)
remove_output = (
self.get_cell_level_config(
"remove_code_outputs",
token.meta["metadata"],
line=token_line(token, 0) or None,
)
or ("remove_output" in tags)
or ("remove-output" in tags)
)
        # if we are removing both the input and the output, we can skip the cell
if remove_input and remove_output:
return
# create a container for all the input/output
classes = ["cell"]
for tag in tags:
classes.append(f"tag_{tag.replace(' ', '_')}")
cell_container = nodes.container(
nb_element="cell_code",
cell_index=cell_index,
# TODO some way to use this to allow repr of count in outputs like HTML?
exec_count=token.meta["execution_count"],
cell_metadata=token.meta["metadata"],
classes=classes,
)
self.add_line_and_source_path(cell_container, token)
with self.current_node_context(cell_container, append=True):
# render the code source code
if not remove_input:
cell_input = nodes.container(
nb_element="cell_code_source", classes=["cell_input"]
)
self.add_line_and_source_path(cell_input, token)
with self.current_node_context(cell_input, append=True):
self.render_nb_cell_code_source(token)
# render the execution output, if any
has_outputs = self.md_options["notebook"]["cells"][cell_index].get(
"outputs", []
)
if (not remove_output) and has_outputs:
cell_output = nodes.container(
nb_element="cell_code_output", classes=["cell_output"]
)
self.add_line_and_source_path(cell_output, token)
with self.current_node_context(cell_output, append=True):
self.render_nb_cell_code_outputs(token)
def render_nb_cell_code_source(self, token: SyntaxTreeNode) -> None:
"""Render a notebook code cell's source."""
# cell_index = token.meta["index"]
lexer = token.meta.get("lexer", None)
node = self.create_highlighted_code_block(
token.content,
lexer,
number_lines=self.get_cell_level_config(
"number_source_lines",
token.meta["metadata"],
line=token_line(token, 0) or None,
),
source=self.document["source"],
line=token_line(token),
)
self.add_line_and_source_path(node, token)
self.current_node.append(node)
def render_nb_cell_code_outputs(self, token: SyntaxTreeNode) -> None:
"""Render a notebook code cell's outputs."""
line = token_line(token, 0)
cell_index = token.meta["index"]
metadata = token.meta["metadata"]
outputs: list[nbformat.NotebookNode] = self.md_options["notebook"]["cells"][
cell_index
].get("outputs", [])
# render the outputs
for output_index, output in enumerate(outputs):
if output.output_type == "stream":
if output.name == "stdout":
_nodes = self.nb_renderer.render_stdout(
output, metadata, cell_index, line
)
self.add_line_and_source_path_r(_nodes, token)
self.current_node.extend(_nodes)
elif output.name == "stderr":
_nodes = self.nb_renderer.render_stderr(
output, metadata, cell_index, line
)
self.add_line_and_source_path_r(_nodes, token)
self.current_node.extend(_nodes)
else:
pass # TODO warning
elif output.output_type == "error":
_nodes = self.nb_renderer.render_error(
output, metadata, cell_index, line
)
self.add_line_and_source_path_r(_nodes, token)
self.current_node.extend(_nodes)
elif output.output_type in ("display_data", "execute_result"):
                # Note, this differs from the docutils implementation,
                # where we directly select a single output, based on the mime_priority.
                # Here, we do not know the mime priority until we know the output format,
                # so we render all the outputs during this parsing phase
                # (this is what sphinx caches as "output format agnostic" AST),
                # and replace the mime_bundle with the format-specific output
                # in a post-transform (run per output format on the cached AST)
# TODO how to output MyST Markdown?
                # currently text/markdown is set to be rendered as CommonMark only,
                # with headings disallowed,
                # to avoid "side effects" if the mime type is discarded but contained
                # targets, etc., and because we can't parse headings within containers.
                # perhaps we could have a config option to allow this?
                # - for non-CommonMark, text/markdown would always be considered
                #   the top priority, and all other mime types would be ignored.
                # - for headings, we would also need to parse the markdown
                #   at the "top level", i.e. not nested in container(s)
figure_options = (
self.get_cell_level_config(
"render_figure_options", metadata, line=line
)
or None
)
with create_figure_context(self, figure_options, line):
mime_bundle = nodes.container(nb_element="mime_bundle")
with self.current_node_context(mime_bundle):
for mime_type, data in output["data"].items():
mime_container = nodes.container(mime_type=mime_type)
with self.current_node_context(mime_container):
_nodes = self.nb_renderer.render_mime_type(
MimeData(
mime_type,
data,
cell_metadata=metadata,
output_metadata=output.get("metadata", {}),
cell_index=cell_index,
output_index=output_index,
line=line,
)
)
self.current_node.extend(_nodes)
if mime_container.children:
self.current_node.append(mime_container)
if mime_bundle.children:
self.add_line_and_source_path_r([mime_bundle], token)
self.current_node.append(mime_bundle)
else:
self.create_warning(
f"Unsupported output type: {output.output_type}",
line=line,
append_to=self.current_node,
wtype=DEFAULT_LOG_TYPE,
subtype="output_type",
)
class SelectMimeType(SphinxPostTransform):
"""Select the mime type to render from mime bundles,
based on the builder and its associated priority list.
"""
default_priority = 4 # TODO set correct priority
def run(self, **kwargs: Any) -> None:
"""Run the transform."""
# get priority list for this builder
# TODO allow for per-notebook/cell priority dicts?
bname = self.app.builder.name # type: ignore
priority_list = get_mime_priority(
bname, self.config["nb_mime_priority_overrides"]
)
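        # e.g. an HTML builder will typically prefer "text/html" over "image/png",
        # falling back to "text/plain" if nothing richer is available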
        def condition(node: nodes.Node) -> bool:
            return (
                isinstance(node, nodes.container)
                and node.attributes.get("nb_element", "") == "mime_bundle"
            )
# remove/replace_self will not work with an iterator
for node in list(findall(self.document)(condition)):
# get available mime types
mime_types = [node["mime_type"] for node in node.children]
if not mime_types:
node.parent.remove(node)
continue
# select top priority
index = None
for mime_type in priority_list:
try:
index = mime_types.index(mime_type)
except ValueError:
continue
else:
break
if index is None:
mime_string = ",".join(repr(m) for m in mime_types)
SPHINX_LOGGER.warning(
f"No mime type available in priority list for builder {bname!r} "
f"({mime_string}) [{DEFAULT_LOG_TYPE}.mime_priority]",
type=DEFAULT_LOG_TYPE,
subtype="mime_priority",
location=node,
)
node.parent.remove(node)
elif not node.children[index].children:
node.parent.remove(node)
else:
node.replace_self(node.children[index].children)
class NbMetadataCollector(EnvironmentCollector):
"""Collect myst-nb specific metdata, and handle merging of parallel builds."""
@staticmethod
def set_doc_data(env: SphinxEnvType, docname: str, key: str, value: Any) -> None:
"""Add nb metadata for a docname to the environment."""
if not hasattr(env, "nb_metadata"):
env.nb_metadata = defaultdict(dict)
env.nb_metadata.setdefault(docname, {})[key] = value
@staticmethod
def get_doc_data(env: SphinxEnvType) -> DefaultDict[str, dict]:
"""Get myst-nb docname -> metadata dict."""
if not hasattr(env, "nb_metadata"):
env.nb_metadata = defaultdict(dict)
return env.nb_metadata
@classmethod
def set_exec_data(
cls, env: SphinxEnvType, docname: str, value: ExecutionResult
) -> None:
"""Add nb metadata for a docname to the environment."""
cls.set_doc_data(env, docname, "exec_data", value)
# TODO this does not take account of cache data
cls.note_exec_update(env)
@classmethod
def get_exec_data(cls, env: SphinxEnvType, docname: str) -> ExecutionResult | None:
"""Get myst-nb docname -> execution data."""
return cls.get_doc_data(env)[docname].get("exec_data")
def get_outdated_docs( # type: ignore[override]
self,
app: Sphinx,
env: SphinxEnvType,
added: set[str],
changed: set[str],
removed: set[str],
) -> list[str]:
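        """Reset the per-build execution flag (Sphinx API hook)."""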
# called before any docs are read
env.nb_new_exec_data = False
return []
@staticmethod
def note_exec_update(env: SphinxEnvType) -> None:
"""Note that a notebook has been executed."""
env.nb_new_exec_data = True
@staticmethod
def new_exec_data(env: SphinxEnvType) -> bool:
"""Return whether any notebooks have updated execution data."""
return getattr(env, "nb_new_exec_data", False)
@classmethod
def add_js_file(
cls,
env: SphinxEnvType,
docname: str,
key: str,
uri: str | None,
kwargs: dict[str, str],
):
"""Register a JavaScript file to include in the HTML output."""
if not hasattr(env, "nb_metadata"):
env.nb_metadata = defaultdict(dict)
js_files = env.nb_metadata.setdefault(docname, {}).setdefault("js_files", {})
# TODO handle whether overrides are allowed
js_files[key] = (uri, kwargs)
@classmethod
def get_js_files(
cls, env: SphinxEnvType, docname: str
) -> dict[str, tuple[str | None, dict[str, str]]]:
"""Get myst-nb docname -> execution data."""
return cls.get_doc_data(env)[docname].get("js_files", {})
def clear_doc( # type: ignore[override]
self,
app: Sphinx,
env: SphinxEnvType,
docname: str,
) -> None:
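        """Remove the myst-nb metadata for a document (Sphinx API hook)."""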
if not hasattr(env, "nb_metadata"):
env.nb_metadata = defaultdict(dict)
env.nb_metadata.pop(docname, None)
def process_doc(self, app: Sphinx, doctree: nodes.document) -> None:
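        """Nothing to do; myst-nb metadata is stored directly by the parser (see ``Parser.parse``)."""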
pass
def merge_other( # type: ignore[override]
self,
app: Sphinx,
env: SphinxEnvType,
docnames: set[str],
other: SphinxEnvType,
) -> None:
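        """Merge myst-nb metadata from a parallel worker's environment (Sphinx API hook)."""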
if not hasattr(env, "nb_metadata"):
env.nb_metadata = defaultdict(dict)
other_metadata = getattr(other, "nb_metadata", defaultdict(dict))
for docname in docnames:
env.nb_metadata[docname] = other_metadata[docname]
if other.nb_new_exec_data:
env.nb_new_exec_data = True
class BytesEncoder(json.JSONEncoder):
"""A JSON encoder that accepts b64 (and other *ascii*) bytestrings."""
def default(self, obj):
if isinstance(obj, bytes):
return obj.decode("ascii")
return json.JSONEncoder.default(self, obj)