Skip to content

Commit

Permalink
Add "reinstall()" method to make it easier in spawn multiprocessing (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
monchin authored Feb 19, 2025
1 parent f64b9e3 commit ab794fe
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
34 changes: 34 additions & 0 deletions loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1812,6 +1812,40 @@ def configure(self, *, handlers=None, levels=None, extra=None, patcher=None, act

return [self.add(**params) for params in handlers]

def reinstall(self):
"""Reinstall the core of logger.
When using multiprocessing, you can pass logger as a parameter to the target of
``multiprocessing.Process``, and run this method once, thus you don't need to pass
logger to every function you called in the same process with spawn multiprocessing.
Examples
--------
>>> def subworker(logger):
... logger.reinstall()
... logger.info("Child")
... deeper_subworker()
>>> def deeper_subworker():
... logger.info("Grandchild")
>>> def test_process_spawn():
... spawn_context = multiprocessing.get_context("spawn")
... logger.add("file.log", context=spawn_context, enqueue=True, catch=False)
...
... process = spawn_context.Process(target=subworker, args=(logger,))
... process.start()
... process.join()
... assert process.exitcode == 0
... logger.info("Main")
... logger.remove()
"""
from loguru import logger

logger._core = self._core

def _change_activation(self, name, status):
if not (name is None or isinstance(name, str)):
raise TypeError(
Expand Down
72 changes: 72 additions & 0 deletions tests/test_reinstall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import multiprocessing
import os

import pytest

from loguru import logger


@pytest.fixture
def fork_context():
yield multiprocessing.get_context("fork")


@pytest.fixture
def spawn_context():
yield multiprocessing.get_context("spawn")


class Writer:
def __init__(self):
self._output = ""

def write(self, message):
self._output += message

def read(self):
return self._output


def subworker(logger):
logger.reinstall()
logger.info("Child")
deeper_subworker()


def deeper_subworker():
logger.info("Grandchild")


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
def test_process_fork(fork_context):
writer = Writer()

logger.add(writer, context=fork_context, format="{message}", enqueue=True, catch=False)

process = fork_context.Process(target=subworker, args=(logger,))
process.start()
process.join()

assert process.exitcode == 0

logger.info("Main")
logger.remove()

assert writer.read() == "Child\nGrandchild\nMain\n"


def test_process_spawn(spawn_context):
writer = Writer()

logger.add(writer, context=spawn_context, format="{message}", enqueue=True, catch=False)

process = spawn_context.Process(target=subworker, args=(logger,))
process.start()
process.join()

assert process.exitcode == 0

logger.info("Main")
logger.remove()

assert writer.read() == "Child\nGrandchild\nMain\n"

0 comments on commit ab794fe

Please sign in to comment.