Source code for sphinx.util.logging

From Get docs
Sphinx/docs/4.1.x/ modules/sphinx/util/logging

Source code for sphinx.util.logging

"""
    sphinx.util.logging
    ~~~~~~~~~~~~~~~~~~~

    Logging utility functions for Sphinx.

    :copyright: Copyright 2007-2021 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""

import logging
import logging.handlers
from collections import defaultdict
from contextlib import contextmanager
from typing import IO, TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union

from docutils import nodes
from docutils.nodes import Node
from docutils.utils import get_source_line

from sphinx.errors import SphinxWarning
from sphinx.util.console import colorize

if TYPE_CHECKING:
    from sphinx.application import Sphinx


# Root name of the logger hierarchy used by everything in this module.
NAMESPACE = 'sphinx'
# Custom numeric level; 15 sits between logging.DEBUG (10) and logging.INFO (20).
VERBOSE = 15

# Level name -> numeric level.  Looking up an unknown name yields WARNING.
LEVEL_NAMES: Dict[str, int] = defaultdict(lambda: logging.WARNING, {
    'CRITICAL': logging.CRITICAL,
    'SEVERE': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'INFO': logging.INFO,
    'VERBOSE': VERBOSE,
    'DEBUG': logging.DEBUG,
})

# Verbosity count -> numeric level.  Counts without an entry map to
# logging.NOTSET (0).
VERBOSITY_MAP: Dict[int, int] = defaultdict(lambda: logging.NOTSET, {
    0: logging.INFO,
    1: VERBOSE,
    2: logging.DEBUG,
})

# Numeric level -> color name.  Item access (``COLOR_MAP[level]``) falls back
# to 'blue'; ``COLOR_MAP.get(level)`` does not invoke the default factory.
COLOR_MAP = defaultdict(lambda: 'blue')
COLOR_MAP.update({
    logging.ERROR: 'darkred',
    logging.WARNING: 'red',
    logging.DEBUG: 'darkgray',
})


def getLogger(name: str) -> "SphinxLoggerAdapter":
    """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`.

    Sphinx logger always uses ``sphinx.*`` namespace to be independent from
    settings of root logger.  It ensures logging is consistent even if a
    third-party extension or imported application resets logger settings.

    :param name: logger name; it is appended to the ``sphinx.`` namespace
    :return: an adapter wrapping the namespaced standard-library logger

    Example usage::

        >>> from sphinx.util import logging
        >>> logger = logging.getLogger(__name__)
        >>> logger.info('Hello, this is an extension!')
        Hello, this is an extension!
    """
    # Always place the logger below the ``sphinx`` namespace.
    logger = logging.getLogger(NAMESPACE + '.' + name)
    # Re-enable the logger unconditionally in case something disabled it.
    logger.disabled = False
    # Wrap it so Sphinx-specific keyword arguments are accepted.
    return SphinxLoggerAdapter(logger, {})


def convert_serializable(records: List[logging.LogRecord]) -> None:
    """Rewrite each record in *records*, in place, into a serializable form.

    The formatted message replaces ``msg`` and ``args`` is emptied.  A
    ``location`` attribute holding a docutils node is replaced by the result
    of :func:`get_node_location`.
    """
    for record in records:
        # Bake the %-style arguments into the message, then drop them.
        record.msg = record.getMessage()
        record.args = ()

        where = getattr(record, 'location', None)
        if isinstance(where, nodes.Node):
            record.location = get_node_location(where)  # type: ignore


class SphinxLogRecord(logging.LogRecord):
    """Log record whose message can carry a location and a prefix."""
    # Text inserted in front of the message; subclasses override it.
    prefix = ''
    # Where the message originated; falsy means "no location".
    location: Any = None

    def getMessage(self) -> str:
        """Return the formatted message decorated with location and prefix.

        With a truthy ``location`` the result is ``"<location>: <prefix><message>"``.
        Otherwise the prefix is prepended only when the message does not
        already contain it.
        """
        text = super().getMessage()
        where = getattr(self, 'location', None)
        if where:
            return '%s: %s%s' % (where, self.prefix, text)
        if self.prefix in text:
            # already prefixed (or the prefix is empty): leave untouched
            return text
        return self.prefix + text


class SphinxInfoLogRecord(SphinxLogRecord):
    """Log record for INFO-level output, supporting ``location``.

    Differs from :class:`SphinxLogRecord` only in pinning ``prefix`` to the
    empty string.
    """
    prefix = ''  # INFO messages are shown without any prefix


class SphinxWarningLogRecord(SphinxLogRecord):
    """Log record for warnings, supporting ``location``.

    Differs from :class:`SphinxLogRecord` only in its ``prefix``.
    """
    # Prepended to the message by SphinxLogRecord.getMessage().
    prefix = 'WARNING: '


class SphinxLoggerAdapter(logging.LoggerAdapter):
    """LoggerAdapter allowing ``type`` and ``subtype`` keywords.

    The keyword arguments listed in :attr:`KEYWORDS` may be passed to any
    logging call; :meth:`process` moves them into ``extra`` so that they end
    up as attributes on the resulting log record.
    """
    KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once']

    def log(self, level: Union[int, str], msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* at *level*, given either as a number or as a level name.

        Names are resolved through ``LEVEL_NAMES``.
        """
        if not isinstance(level, int):
            level = LEVEL_NAMES[level]
        super().log(level, msg, *args, **kwargs)

    def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log *msg* at the custom ``VERBOSE`` level."""
        self.log(VERBOSE, msg, *args, **kwargs)

    def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]:  # type: ignore
        """Move the Sphinx-specific keywords from *kwargs* into ``kwargs['extra']``.

        :return: *msg* unchanged and the (mutated) *kwargs*
        """
        extra = kwargs.setdefault('extra', {})
        for keyword in self.KEYWORDS:
            if keyword in kwargs:
                extra[keyword] = kwargs.pop(keyword)

        return msg, kwargs

    def handle(self, record: logging.LogRecord) -> None:
        """Pass an already-built *record* to the wrapped logger."""
        self.logger.handle(record)


class WarningStreamHandler(logging.StreamHandler):
    """Stream handler for warnings.

    It adds no behavior of its own; the distinct type lets other code in this
    module pick out the warning handler with ``isinstance``.
    """


class NewLineStreamHandler(logging.StreamHandler):
    """StreamHandler which switches line terminator by record.nonl flag."""

    def emit(self, record: logging.LogRecord) -> None:
        """Write *record*, omitting the trailing newline when ``record.nonl`` is true.

        The terminator is always reset to ``'\\n'`` afterwards.
        """
        # Take the lock *before* entering ``try`` so the ``finally`` clause
        # never releases a lock that was not acquired.
        self.acquire()
        try:
            if getattr(record, 'nonl', False):
                # skip appending terminator when nonl=True
                self.terminator = ''
            super().emit(record)
        finally:
            self.terminator = '\n'
            self.release()


class MemoryHandler(logging.handlers.BufferingHandler):
    """Handler that keeps every record in memory until told otherwise."""

    buffer: List[logging.LogRecord]

    def __init__(self) -> None:
        # Capacity is irrelevant because shouldFlush() always answers False.
        super().__init__(-1)

    def shouldFlush(self, record: logging.LogRecord) -> bool:
        """Always return ``False``: the buffer is never flushed automatically."""
        return False

    def flushTo(self, logger: logging.Logger) -> None:
        """Hand every buffered record to *logger*, then empty the buffer.

        The buffer is reset only after all records have been handled.
        """
        self.acquire()
        try:
            for pending in self.buffer:
                logger.handle(pending)
            self.buffer = []
        finally:
            self.release()

    def clear(self) -> List[logging.LogRecord]:
        """Empty the buffer and return the records it held."""
        drained = self.buffer
        self.buffer = []
        return drained


@contextmanager
def pending_warnings() -> Generator[logging.Handler, None, None]:
    """Context manager that holds back warnings until the block exits.

    While the block runs, every :class:`WarningStreamHandler` attached to the
    ``sphinx`` logger is detached and a :class:`MemoryHandler` (level
    ``WARNING``) is attached in its place.  On exit the detached handlers are
    re-attached and the buffered records are replayed through the logger.

    Similar to :func:`pending_logging`.

    :return: yields the buffering handler
    """
    logger = logging.getLogger(NAMESPACE)
    memhandler = MemoryHandler()
    memhandler.setLevel(logging.WARNING)

    # Bound before ``try`` so the ``finally`` clause can always iterate it.
    handlers = []
    try:
        # Iterate over a copy: removeHandler() mutates logger.handlers.
        for handler in logger.handlers[:]:
            if isinstance(handler, WarningStreamHandler):
                logger.removeHandler(handler)
                handlers.append(handler)

        logger.addHandler(memhandler)
        yield memhandler
    finally:
        logger.removeHandler(memhandler)

        for handler in handlers:
            logger.addHandler(handler)

        # Replay only after the real warning handlers are back in place.
        memhandler.flushTo(logger)


@contextmanager
def suppress_logging() -> Generator[MemoryHandler, None, None]:
    """Context manager that silences all Sphinx logging for a while.

    Every handler of the ``sphinx`` logger is detached and replaced by a
    :class:`MemoryHandler`, which is yielded to the caller.  On exit the
    original handlers are re-attached; the buffered records are *not* replayed
    by this function.

    For example::

        >>> with suppress_logging():
        >>>     logger.warning('Warning message!')  # suppressed
        >>>     some_long_process()
        >>>
    """
    logger = logging.getLogger(NAMESPACE)
    memhandler = MemoryHandler()

    try:
        detached = []
        # list() takes a snapshot because removeHandler() mutates the list.
        for active in list(logger.handlers):
            logger.removeHandler(active)
            detached.append(active)

        logger.addHandler(memhandler)
        yield memhandler
    finally:
        logger.removeHandler(memhandler)

        for restored in detached:
            logger.addHandler(restored)


@contextmanager
def pending_logging() -> Generator[MemoryHandler, None, None]:
    """Context manager that holds back all logs until the block exits.

    Logging is suppressed via :func:`suppress_logging` while the block runs;
    afterwards the buffered records are replayed through the ``sphinx``
    logger.

    For example::

        >>> with pending_logging():
        >>>     logger.warning('Warning message!')  # not flushed yet
        >>>     some_long_process()
        >>>
        Warning message!  # the warning is flushed here
    """
    logger = logging.getLogger(NAMESPACE)
    # Pre-bound so ``finally`` cannot hit an unbound name if
    # suppress_logging() fails before yielding its handler.
    memhandler = None
    try:
        with suppress_logging() as memhandler:
            yield memhandler
    finally:
        # By now suppress_logging() has re-attached the real handlers, so the
        # replayed records reach them.
        if memhandler is not None:
            memhandler.flushTo(logger)


@contextmanager
def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]:
    """Context manager to bypass WarningIsErrorFilter for a while.

    When *skip* is ``False`` this is a no-op.  Otherwise a
    :class:`DisableWarningIsErrorFilter` is put at the front of the filter
    list of every handler of the ``sphinx`` logger and removed again on exit.
    """
    logger = logging.getLogger(NAMESPACE)

    if skip is False:
        yield
        return

    try:
        disabler = DisableWarningIsErrorFilter()
        for active in logger.handlers:
            # Insert into ``filters`` directly (rather than addFilter(), which
            # appends) so the disabler runs before WarningIsErrorFilter.
            active.filters.insert(0, disabler)
        yield
    finally:
        for active in logger.handlers:
            active.removeFilter(disabler)


@contextmanager
def prefixed_warnings(prefix: str) -> Generator[None, None, None]:
    """Prepend prefix to all records for a while.

    The prefix is applied through a :class:`MessagePrefixFilter` on the first
    :class:`WarningStreamHandler` of the ``sphinx`` logger.  If that handler
    already carries such a filter, its prefix is swapped for the duration of
    the block and restored afterwards; otherwise a filter is added and removed
    again.  Without a warning handler the block runs unchanged.

    For example::

        >>> with prefixed_warnings("prefix:"):
        >>>     logger.warning('Warning message!')  # => prefix: Warning message!

    .. versionadded:: 2.0
    """
    logger = logging.getLogger(NAMESPACE)

    warning_handler = next(
        (handler for handler in logger.handlers
         if isinstance(handler, WarningStreamHandler)),
        None,
    )
    if warning_handler is None:
        # warning stream not found
        yield
        return

    prefix_filter = next(
        (_filter for _filter in warning_handler.filters
         if isinstance(_filter, MessagePrefixFilter)),
        None,
    )

    if prefix_filter is not None:
        # already prefixed: swap the prefix and restore it afterwards
        previous = prefix_filter.prefix
        prefix_filter.prefix = prefix
        try:
            yield
        finally:
            prefix_filter.prefix = previous
    else:
        # not prefixed yet: install a temporary filter
        prefix_filter = MessagePrefixFilter(prefix)
        try:
            warning_handler.addFilter(prefix_filter)
            yield
        finally:
            warning_handler.removeFilter(prefix_filter)


class LogCollector:
    """Capture the log records emitted inside a ``with`` block."""

    def __init__(self) -> None:
        # Records captured by the most recent completed collect() block.
        self.logs: List[logging.LogRecord] = []

    @contextmanager
    def collect(self) -> Generator[None, None, None]:
        """Buffer logging during the block and store the records in ``logs``.

        The buffer is taken over only when the block finishes without raising.
        """
        with pending_logging() as buffered:
            yield

            # Reached on normal exit only; emptying the buffer here leaves
            # pending_logging() with nothing to replay.
            self.logs = buffered.clear()


class InfoFilter(logging.Filter):
    """Filter that drops warnings and errors, keeping only lower levels."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept *record* only when its level is below ``logging.WARNING``."""
        return record.levelno < logging.WARNING


def is_suppressed_warning(type: str, subtype: str, suppress_warnings: List[str]) -> bool:
    """Tell whether a warning of *type*/*subtype* is suppressed.

    Each entry of *suppress_warnings* is either ``"type"`` or
    ``"type.subtype"``.  An entry matches when its type part equals *type*
    and any of the following holds: *subtype* is ``None``, the entry has no
    subtype part, the subtype parts are equal, or the entry's subtype part is
    ``"*"``.

    :return: ``True`` if some entry matches; ``False`` otherwise, and always
        ``False`` when *type* is ``None``
    """
    if type is None:
        return False

    for entry in suppress_warnings:
        # Split at the first dot only; later dots belong to the subtype.
        target, dot, remainder = entry.partition('.')
        if target != type:
            continue

        subtarget: Optional[str] = remainder if dot else None
        if subtype is None or subtarget is None:
            return True
        if subtarget == subtype or subtarget == '*':
            return True

    return False


class WarningSuppressor(logging.Filter):
    """Filter logs by `suppress_warnings`.

    Records that pass are also counted in ``app._warncount``.
    """

    def __init__(self, app: "Sphinx") -> None:
        self.app = app
        super().__init__()

    def filter(self, record: logging.LogRecord) -> bool:
        """Reject *record* if its ``type``/``subtype`` is suppressed."""
        warning_type = getattr(record, 'type', None)
        warning_subtype = getattr(record, 'subtype', None)

        try:
            suppressed = self.app.config.suppress_warnings
        except AttributeError:
            # config is not initialized yet (ex. in conf.py)
            suppressed = []

        if is_suppressed_warning(warning_type, warning_subtype, suppressed):
            return False

        # Only warnings that are let through are counted.
        self.app._warncount += 1
        return True


class WarningIsErrorFilter(logging.Filter):
    """Turn a warning into a :exc:`SphinxWarning` when ``app.warningiserror`` is set."""

    def __init__(self, app: "Sphinx") -> None:
        self.app = app
        super().__init__()

    def filter(self, record: logging.LogRecord) -> bool:
        """Let *record* through, or raise if warnings are treated as errors.

        :raises SphinxWarning: when ``app.warningiserror`` is true and the
            record is not flagged with ``skip_warningsiserror``
        """
        if getattr(record, 'skip_warningsiserror', False):
            # disabled by DisableWarningIsErrorFilter
            return True
        if not self.app.warningiserror:
            return True

        location = getattr(record, 'location', '')
        try:
            message = record.msg % record.args
        except (TypeError, ValueError):
            message = record.msg  # use record.msg itself

        if location:
            exc = SphinxWarning(location + ":" + str(message))
        else:
            exc = SphinxWarning(message)

        if record.exc_info is not None:
            # keep the logged exception as the cause
            raise exc from record.exc_info[1]
        raise exc


class DisableWarningIsErrorFilter(logging.Filter):
    """Filter that flags records so WarningIsErrorFilter lets them through."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Set ``record.skip_warningsiserror`` and accept the record."""
        setattr(record, 'skip_warningsiserror', True)
        return True


class MessagePrefixFilter(logging.Filter):
    """Filter that prepends a fixed prefix to the message of every record."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix
        super().__init__()

    def filter(self, record: logging.LogRecord) -> bool:
        """Rewrite ``record.msg`` as ``"<prefix> <msg>"``; always accept.

        An empty prefix leaves the record untouched.
        """
        if not self.prefix:
            return True

        record.msg = self.prefix + ' ' + record.msg
        return True


class OnceFilter(logging.Filter):
    """Filter that lets a record flagged ``once`` through only the first time."""

    def __init__(self, name: str = '') -> None:
        super().__init__(name)
        # message template -> list of argument tuples already emitted
        self.messages: Dict[str, List] = {}

    def filter(self, record: logging.LogRecord) -> bool:
        """Reject a ``once`` record whose ``msg`` and ``args`` were seen before.

        Records without a truthy ``once`` attribute always pass and are not
        remembered.
        """
        if not getattr(record, 'once', ''):
            return True

        seen_args = self.messages.setdefault(record.msg, [])
        if record.args in seen_args:
            return False

        seen_args.append(record.args)
        return True


class SphinxLogRecordTranslator(logging.Filter):
    """Filter that rewrites a log record into the form Sphinx expects.

    * Re-class the record as :attr:`LogRecordClass`
    * Turn ``record.location`` (tuple, docutils node or docname) into a string
    """
    # Concrete record class; supplied by subclasses.
    LogRecordClass: Type[logging.LogRecord]

    def __init__(self, app: "Sphinx") -> None:
        self.app = app
        super().__init__()

    def filter(self, record: SphinxWarningLogRecord) -> bool:  # type: ignore
        """Normalize *record* in place; the record is always accepted."""
        if isinstance(record, logging.LogRecord):
            # force subclassing to handle location
            record.__class__ = self.LogRecordClass  # type: ignore

        location = getattr(record, 'location', None)
        if isinstance(location, tuple):
            # (docname, lineno) pair
            docname, lineno = location
            if not docname:
                record.location = None
            elif lineno:
                record.location = '%s:%s' % (self.app.env.doc2path(docname), lineno)
            else:
                record.location = '%s' % self.app.env.doc2path(docname)
        elif isinstance(location, nodes.Node):
            record.location = get_node_location(location)
        elif location and ':' not in location:
            # a value without ':' is passed through doc2path() as a docname
            record.location = '%s' % self.app.env.doc2path(location)

        return True


class InfoLogRecordTranslator(SphinxLogRecordTranslator):
    """LogRecordTranslator for INFO level log records."""
    # Records passing through this filter are re-classed to this type.
    LogRecordClass = SphinxInfoLogRecord


class WarningLogRecordTranslator(SphinxLogRecordTranslator):
    """LogRecordTranslator for WARNING level log records."""
    # Records passing through this filter are re-classed to this type.
    LogRecordClass = SphinxWarningLogRecord


def get_node_location(node: Node) -> Optional[str]:
    """Describe where *node* comes from as a ``source:line`` string.

    Source and line are taken from :func:`docutils.utils.get_source_line`.

    :return: ``"source:line"``, ``"source:"`` when only the source is known,
        ``"<unknown>:line"`` when only the line is known, or ``None`` when
        neither is available
    """
    source, line = get_source_line(node)
    if not source and not line:
        return None
    if not source:
        return "<unknown>:%s" % line
    if line:
        return "%s:%s" % (source, line)
    return "%s:" % source


class ColorizeFormatter(logging.Formatter):
    """Formatter that wraps the formatted message in a console color."""

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* and colorize the result when a color applies.

        A ``color`` attribute on the record wins; otherwise the color is
        looked up in ``COLOR_MAP`` by level.
        """
        message = super().format(record)
        color = getattr(record, 'color', None)
        if color is None:
            # .get() bypasses the defaultdict factory, so levels without an
            # explicit entry stay uncolored.
            color = COLOR_MAP.get(record.levelno)

        return colorize(color, message) if color else message


class SafeEncodingWriter:
    """Stream writer that replaces unencodable characters instead of failing."""

    def __init__(self, stream: IO) -> None:
        self.stream = stream
        # Fall back to ASCII when the stream has no (or an empty) encoding.
        declared = getattr(stream, 'encoding', None)
        self.encoding = declared if declared else 'ascii'

    def write(self, data: str) -> None:
        """Write *data*, retrying with replacement characters on encode errors."""
        try:
            self.stream.write(data)
        except UnicodeEncodeError:
            # The stream takes str, not bytes: encode with 'replace' to get
            # rid of the offending characters, then decode back to str.
            replaced = data.encode(self.encoding, 'replace')
            self.stream.write(replaced.decode(self.encoding))

    def flush(self) -> None:
        """Flush the wrapped stream if it provides ``flush``."""
        if hasattr(self.stream, 'flush'):
            self.stream.flush()


class LastMessagesWriter:
    """Stream writer that records written messages on the application.

    Every chunk passed to :meth:`write` is appended to ``app.messagelog``.
    How many messages are retained is decided by that container, not here.
    """
    def __init__(self, app: "Sphinx", stream: IO) -> None:
        # *stream* is accepted but not used.
        self.app = app

    def write(self, data: str) -> None:
        """Append *data* to ``app.messagelog``."""
        messagelog = self.app.messagelog
        messagelog.append(data)


def setup(app: "Sphinx", status: IO, warning: IO) -> None:
    """Configure the ``sphinx`` logger for *app*.

    Existing handlers are removed and three new ones are installed:

    * an info handler writing records below WARNING to *status*,
    * a warning handler writing WARNING and above to *warning*,
    * a message-log handler feeding records below WARNING to
      :class:`LastMessagesWriter`.

    :param app: the application; ``app.verbosity`` selects the level of the
        info and message-log handlers
    :param status: stream receiving informational output
    :param warning: stream receiving warnings
    """
    logger = logging.getLogger(NAMESPACE)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False

    # clear all handlers (iterate over a snapshot; removal mutates the list)
    for stale in list(logger.handlers):
        logger.removeHandler(stale)

    info_handler = NewLineStreamHandler(SafeEncodingWriter(status))
    for info_filter in (InfoFilter(), InfoLogRecordTranslator(app)):
        info_handler.addFilter(info_filter)
    info_handler.setLevel(VERBOSITY_MAP[app.verbosity])
    info_handler.setFormatter(ColorizeFormatter())

    warning_handler = WarningStreamHandler(SafeEncodingWriter(warning))
    # Filters run in the order they are added, so this sequence is significant.
    for warning_filter in (WarningSuppressor(app),
                           WarningLogRecordTranslator(app),
                           WarningIsErrorFilter(app),
                           OnceFilter()):
        warning_handler.addFilter(warning_filter)
    warning_handler.setLevel(logging.WARNING)
    warning_handler.setFormatter(ColorizeFormatter())

    messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status))
    messagelog_handler.addFilter(InfoFilter())
    messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity])
    messagelog_handler.setFormatter(ColorizeFormatter())

    for installed in (info_handler, warning_handler, messagelog_handler):
        logger.addHandler(installed)