Skip to content

Instantly share code, notes, and snippets.

@YannickJadoul
Last active October 21, 2022 05:58
Show Gist options
  • Select an option

  • Save YannickJadoul/f1fc8db711ed980cf02610277af058e4 to your computer and use it in GitHub Desktop.

Select an option

Save YannickJadoul/f1fc8db711ed980cf02610277af058e4 to your computer and use it in GitHub Desktop.
Python with-statement in pybind11
#include <pybind11/embed.h>
#include "with.h"
#include <fstream>
#include <iostream>
namespace py = pybind11;
/// Demo context manager that announces on stdout when it is entered and exited.
/// `main()` exposes `enter`/`exit` to Python as `__enter__`/`__exit__`.
class PrintingMgr {
public:
    /// Prints the entry marker (flushed, without a newline) and returns 42,
    /// which becomes the value handed to the with-block.
    int enter() {
        std::cout << "__enter__";
        std::cout << " - ";
        std::cout << std::flush;
        return 42;
    }

    /// Prints the exit marker followed by the three exception arguments it was given.
    /// Returns nothing, so it never asks for an in-flight exception to be suppressed.
    void exit(const py::object &type, const py::object &value, const py::object &traceback) {
        std::cout << " - ";
        std::cout << "__exit__" << std::endl;
        py::print(type, value, traceback);
    }
};
// Stateless context-manager type for the demo. main() binds it to Python with an `__enter__`
// that does nothing and an `__exit__` that always returns true.
class SwallowExceptionsMgr {};
// Exception type that deliberately does not derive from std::exception. main() throws it from
// with-blocks to exercise the handling of exceptions that did not originate in Python.
class NonPythonException {};
int main() {
py::scoped_interpreter guard{};
// Normal use
auto io = py::module::import("io");
py::with(io.attr("open")("test.out", "w"), [](py::object f) {
f.attr("write")(__TIME__);
f.attr("write")("\n");
auto locals = py::dict(py::arg("f") = f);
py::exec("import time\nf.write(time.strftime('%X'))", py::globals(), locals);
});
std::string line1, line2;
std::getline(std::getline(std::ifstream("test.out"), line1), line2);
std::cout << line1 << std::endl;
std::cout << line2 << std::endl;
// Python error inside
try {
py::with(io.attr("open")("test.out", "w"), [](py::object f) {
f.attr("write")(__TIME__);
f.attr("blah")();
});
}
catch (const py::error_already_set &e) {
std::cout << e.what() << std::endl;
}
// Error with context manager
try {
py::with(py::int_(42), [](py::object i) {});
}
catch (const py::error_already_set &e) {
std::cout << e.what() << std::endl;
}
// Custom C++ class, printing during enter and exit
py::module test_module("test_module");
py::class_<PrintingMgr>(test_module, "PrintingMgr")
.def("__enter__", &PrintingMgr::enter)
.def("__exit__", &PrintingMgr::exit);
// Test different signatures of lambda
py::with(py::cast(PrintingMgr()), []() { py::print(py::none(), py::arg("end")="", py::arg("flush")=true); });
py::with(py::cast(PrintingMgr()), [](py::object as) { py::print(as, py::arg("end")="", py::arg("flush")=true); });
py::with(py::cast(PrintingMgr()), [](py::object &&as) { py::print(as, py::arg("end")="", py::arg("flush")=true); });
py::with(py::cast(PrintingMgr()), [](const py::object &as) { py::print(as, py::arg("end")="", py::arg("flush")=true); });
py::with(py::cast(PrintingMgr()), [](py::object &as) { py::print(as, py::arg("end")="", py::arg("flush")=true); });
// Does not compile (and rightfully so)
// py::with(py::cast(PrintingMgr()), [](int a) { });
try {
py::with(py::cast(PrintingMgr()), [](py::object) { py::eval("42 / 0"); });
}
catch (const py::error_already_set &e)
{
std::cout << e.what() << std::endl;
}
// A custom C++ class that will swallow any Python exception inside
py::class_<SwallowExceptionsMgr>(test_module, "SwallowExceptionsMgr")
.def("__enter__", [](const SwallowExceptionsMgr &) {})
.def("__exit__", [](const SwallowExceptionsMgr &, const py::object &, const py::object &, const py::object &) { return true; });
py::with(py::cast(SwallowExceptionsMgr()), [](py::object f) {
py::print("HERE");
});
py::with(py::cast(SwallowExceptionsMgr()), [](py::object f) {
py::print("THERE");
throw py::value_error("Bleh");
py::print("WHEREVER");
});
py::with(py::cast(SwallowExceptionsMgr()), [](py::object f) {
std::cout << "Going to throw NonPythonException in with-block with SwallowExceptionsMgr" << std::endl;
throw NonPythonException();
});
try {
py::with(py::cast(SwallowExceptionsMgr()), [](py::object f) {
std::cout << "Going to throw NonPythonException in with-block with SwallowExceptionsMgr without translating exceptions" << std::endl;
throw NonPythonException();
}, py::exception_policy::cascade);
}
catch (const NonPythonException &e) {
std::cout << "\tBut caught it" << std::endl;
}
try {
py::with(io.attr("open")("test.out", "r"), []() {
std::cout << "Going to throw NonPythonException in with-block with Python file" << std::endl;
throw NonPythonException();
});
}
catch (const py::error_already_set &e) {
std::cout << "Should NOT be here" << std::endl;
}
catch (const NonPythonException &e) {
std::cout << "\tBut caught it" << std::endl;
}
// Illustrate break and continue
for (auto i = 0; i < 4; ++i) {
bool break_ = false;
py::with(io.attr("open")("test.out", "r"), [&](py::object f) {
if (i == 3)
std::cout << "Should NOT be here" << std::endl;
std::cout << py::cast<std::string>(f.attr("read")()) << std::endl;
if (i == 0)
/* continue */ return;
std::cout << "Don't print this the first time" << std::endl;
if (i == 2) {
break_ = true; return; }
std::cout << "Don't print this the first or last time" << std::endl;
});
if (break_) break;
}
}
// ...
/// Fetch and hold an error which was already set in Python. An instance of this is typically
/// thrown to propagate python-side errors back through C++ which can either be caught manually or
/// else falls back to the function dispatcher (which then raises the captured error back to
/// python).
class error_already_set : public std::runtime_error {
public:
    /// Constructs a new exception from the current Python error indicator, if any. The current
    /// Python error indicator will be cleared.
    error_already_set() : std::runtime_error(detail::error_string()) {
        // The base-class initializer above runs first, so the `what()` message is built before
        // the error triple is fetched into this object.
        PyErr_Fetch(&type.ptr(), &value.ptr(), &trace.ptr());
    }

    inline ~error_already_set();

    /// Give the currently-held error back to Python, if any. If there is currently a Python error
    /// already set it is cleared first. After this call, the current object no longer stores the
    /// error variables (but the `.what()` string is still available).
    void restore() { PyErr_Restore(type.release().ptr(), value.release().ptr(), trace.release().ptr()); }

    // Does nothing; provided for backwards compatibility.
    PYBIND11_DEPRECATED("Use of error_already_set.clear() is deprecated")
    void clear() {}

    /// Check if the currently trapped error type matches the given Python exception class (or a
    /// subclass thereof). May also be passed a tuple to search for any exception class matches in
    /// the given tuple.
    ///
    /// `PyErr_GivenExceptionMatches(given, exc)` expects the trapped exception first and the
    /// class (or tuple of classes) to test against second.
    bool matches(handle ex) const { return PyErr_GivenExceptionMatches(type.ptr(), ex.ptr()) != 0; }

    /// The held exception type, as fetched by the constructor.
    const object &exc_type() const { return type; }
    /// The held exception value, as fetched by the constructor.
    const object &exc_value() const { return value; }
    /// The held traceback, as fetched by the constructor.
    const object &exc_trace() const { return trace; }

private:
    object type, value, trace;
};
// ...
#include <type_traits>
namespace pybind11 {
namespace detail {
/// Offer a C++ exception to the registered exception translators, one after another.
///
/// This function never returns normally:
///  - if a translator returns without throwing, an `error_already_set` is thrown;
///  - if every translator throws, the exception left behind by the last one is rethrown.
///
/// Declared `inline` because it is defined in a header; without it, including the header from
/// more than one translation unit produces duplicate definitions.
///
/// \param last_exception the exception to translate; expected to be non-null.
[[noreturn]] inline void translate_exception_to_python(std::exception_ptr last_exception) {
    auto &registered_exception_translators = get_internals().registered_exception_translators;
    for (auto &translator : registered_exception_translators) {
        try {
            translator(last_exception);
        } catch (...) {
            // This translator did not handle it; pass whatever is now in flight to the next one.
            last_exception = std::current_exception();
            continue;
        }
        // The translator returned normally, which is taken to mean it handled the exception.
        throw error_already_set();
    }
    // Not a Python error
    std::rethrow_exception(last_exception);
}
/// Trait: true when a callable of type `F` can be invoked with an rvalue `object`.
///
/// The call expression is cast to `void` inside `decltype`, so the specialization is selected
/// whatever the callable returns. With the bare call expression it would only match callables
/// returning exactly `void`, because the primary template's second parameter defaults to `void`.
template <typename F, typename SFINAE = void>
struct takes_object_rval_ref_argument : std::false_type {};
template <typename F>
struct takes_object_rval_ref_argument<F, decltype(static_cast<void>(std::declval<F>()(std::declval<object>())))>
    : std::true_type {};
/// Trait: true when a callable of type `F` can be invoked with an lvalue `object`.
///
/// The call expression is cast to `void` inside `decltype`, so the specialization is selected
/// whatever the callable returns. With the bare call expression it would only match callables
/// returning exactly `void`, because the primary template's second parameter defaults to `void`.
template <typename F, typename SFINAE = void>
struct takes_object_lval_ref_argument : std::false_type {};
template <typename F>
struct takes_object_lval_ref_argument<F, decltype(static_cast<void>(std::declval<F>()(std::declval<object &>())))>
    : std::true_type {};
/// Trait: true when a callable of type `F` can be invoked without arguments.
///
/// The call expression is cast to `void` inside `decltype`, so the specialization is selected
/// whatever the callable returns. With the bare call expression it would only match callables
/// returning exactly `void`, because the primary template's second parameter defaults to `void`.
template <typename F, typename SFINAE = void>
struct takes_no_arguments : std::false_type {};
template <typename F>
struct takes_no_arguments<F, decltype(static_cast<void>(std::declval<F>()()))> : std::true_type {};
}
// PEP 343 specification: https://www.python.org/dev/peps/pep-0343/#specification-the-with-statement
/// How `with` treats an exception leaving the block that is not an `error_already_set`.
enum class exception_policy {
    /// Rethrow the exception untouched, without attempting to translate it.
    cascade = 0,
    /// First offer the exception to the registered exception translators. This is the default
    /// policy of `with`.
    translate_to_python = 1,
};
/// Run `block` inside the context manager `mgr`, in the manner of a Python `with` statement.
///
/// `mgr.__exit__` is looked up first, then `mgr.__enter__()` is called and its result is moved
/// into `block`.
///  - If `block` completes, `__exit__(None, None, None)` is called.
///  - If `block` throws `error_already_set`, `__exit__` receives the held type, value and
///    traceback (with `None` in place of any that is empty). A truthy result suppresses the
///    exception; otherwise the exception is rethrown.
///  - If `block` throws anything else and `policy` is `translate_to_python`, the exception is
///    first offered to the registered translators. When one of them turns it into a Python
///    error, `__exit__` is called as in the previous case, but on a falsy result it is the
///    original C++ exception that is rethrown.
///  - Otherwise `__exit__(None, None, None)` is called and the original exception is rethrown.
///
/// \param mgr    the context manager object.
/// \param block  callable invocable with an rvalue `object`.
/// \param policy how to treat exceptions that are not `error_already_set`.
template <typename Block, typename std::enable_if<detail::takes_object_rval_ref_argument<Block>::value>::type* = nullptr>
void with(const object &mgr, Block &&block, exception_policy policy = exception_policy::translate_to_python) {
    object exit = mgr.attr("__exit__");
    object value = mgr.attr("__enter__")();

    // The exception exactly as it left `block`. This, not a translated stand-in, is what
    // callers get back when `__exit__` does not suppress it.
    std::exception_ptr original_exception = nullptr;

    try {
        try {
            std::forward<Block>(block)(std::move(value));
        }
        catch (const error_already_set &) {
            // If already a Python error, catch in the outer try-catch
            original_exception = std::current_exception();
            throw;
        }
        catch (...) {
            // Else, try our best to translate the error into a Python error before calling mgr.__exit__
            original_exception = std::current_exception();
            if (policy == exception_policy::translate_to_python)
                detail::translate_exception_to_python(original_exception);
            else
                throw;
        }
    }
    catch (const error_already_set &e) {
        // A Python error
        auto exit_result = exit(e.exc_type() ? e.exc_type() : none(),
                                e.exc_value() ? e.exc_value() : none(),
                                e.exc_trace() ? e.exc_trace() : none());
        if (!bool_(std::move(exit_result)))
            std::rethrow_exception(original_exception);
    }
    catch (...) {
        // Not a Python error
        exit(none(), none(), none());
        std::rethrow_exception(original_exception);
    }

    exit(none(), none(), none());
}
template <typename Block, typename std::enable_if<!detail::takes_object_rval_ref_argument<Block>::value &&
detail::takes_object_lval_ref_argument<Block>::value>::type* = nullptr>
inline void with(const object &mgr, Block &&block) {
with(mgr, [&block](object &&o) { block(o); });
}
template <typename Block, typename std::enable_if<detail::takes_no_arguments<Block>::value>::type* = nullptr>
inline void with(const object &mgr, Block &&block) {
with(mgr, [&block](object &&) { block(); });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment