diff --git a/graphix/flow/core.py b/graphix/flow/core.py index fa674c3fd..c39f23880 100644 --- a/graphix/flow/core.py +++ b/graphix/flow/core.py @@ -11,10 +11,12 @@ from typing import TYPE_CHECKING, Generic, TypeVar import networkx as nx +import numpy as np # `override` introduced in Python 3.12, `assert_never` introduced in Python 3.11 from typing_extensions import assert_never, override +from graphix._linalg import MatGF2, solve_f2_linear_system from graphix.circ_ext.extraction import ( CliffordMap, ExtractionResult, @@ -266,6 +268,60 @@ def to_gflow(self: XZCorrections[_PM_co]) -> GFlow[_PM_co]: gf.check_well_formed() # Raises a `FlowError` if the partial order and the correction function are not compatible. return gf + def to_pauli_flow(self) -> PauliFlow[_AM_co]: + r"""Extract a Pauli flow from XZ-corrections. + + This method does not invoke the flow-extraction routine on the underlying open graph. + Instead, it reconstructs, for every measured node, a correction set whose *future* + part matches the observed XZ-corrections and which satisfies the Pauli-flow + propositions (P1-P9, see :meth:`PauliFlow.check_well_formed`). + + The difficulty, compared with :meth:`to_gflow`, is that the Pauli-flow correction + sets may contain *anachronical corrections*: corrections targeting nodes measured in + the X or Y Pauli bases that lie in the present or the past of the corrected node. + Such corrections do not appear in the pattern, because :meth:`PauliFlow.to_corrections` + only keeps the part of each correction set lying in the future (the ``& future`` filter). + They must therefore be reconstructed rather than read off the corrections. + + For each measured node ``i`` this is cast as a system of linear equations over GF(2): + the membership of the future nodes in the correction set is pinned by the X-corrections + of ``i``; the free variables are the anachronical (non-future, X/Y-measured) candidates + and, where the proposition allows it, ``i`` itself; and the equations encode the + odd-neighbourhood constraints, namely the Z-corrections on the future nodes (P-future), + the vanishing of the odd neighbourhood on past non-(Y/Z) nodes (P2), the coupling on + past Y-measured nodes (P3) and the local proposition on ``i`` (P4-P9). The system is + reduced with :meth:`graphix._linalg.MatGF2.gauss_elimination` and solved with + :func:`graphix._linalg.solve_f2_linear_system`. + + Returns + ------- + PauliFlow[_AM_co] + + Raises + ------ + FlowError + If no Pauli flow is compatible with the XZ-corrections (raised by + :func:`_reconstruct_pauli_correction_function` when the GF(2) system has no + solution for at least one measured node). + + Notes + ----- + See Theorem 4 in Ref. [1] and Definition 5 therein for the Pauli-flow propositions. + + References + ---------- + [1] Browne et al., 2007 New J. Phys. 9 250 (arXiv:quant-ph/0702212). + """ + correction_function = _reconstruct_pauli_correction_function(self) + pf: PauliFlow[_AM_co] = PauliFlow(self.og, correction_function, self.partial_order_layers) + # Defensive: by construction the GF(2) equations of `_reconstruct_pauli_correction_function` + # encode propositions P2-P9 exactly, and the anachronical candidates are restricted to X/Y + # axes which guarantees P1; the general flow properties are also satisfied by construction + # (the correction function is defined on the measured nodes and its image is included in + # the non-input nodes). This check is kept as a regression guard. + pf.check_well_formed() + return pf + def to_bloch(self: XZCorrections[Measurement]) -> XZCorrections[BlochMeasurement]: """Return the XZ-corrections where all measurements in the open graph are converted to Bloch. @@ -1391,3 +1447,132 @@ def _check_flow_general_properties(flow: PauliFlow[_AM_co]) -> None: o_set = set(flow.og.output_nodes) if first_layer != o_set or not first_layer: raise PartialOrderLayerError(PartialOrderLayerErrorReason.FirstLayer, layer_index=0, layer=first_layer) + + +def _solve_pauli_correction_set( + xz: XZCorrections[AbstractMeasurement], + node: int, + future: set[int], + adjacency: Mapping[int, set[int]], + labels: Mapping[int, Plane | Axis], +) -> set[int] | None: + """Reconstruct the Pauli-flow correction set of a single ``node``. + + See :meth:`XZCorrections.to_pauli_flow` for the description of the GF(2) system solved here. + """ + og = xz.og + nodes = set(og.graph.nodes) + non_inputs = nodes - set(og.input_nodes) + x_corr = set(xz.x_corrections.get(node, ())) + z_corr = set(xz.z_corrections.get(node, ())) + label = labels[node] + + # Self-membership in the correction set, dictated by the local proposition (P4-P9). + self_fixed_in = label in {Plane.XZ, Plane.YZ, Axis.Z} + self_is_var = label in {Axis.X, Axis.Y} and node in non_inputs + if self_fixed_in and node not in non_inputs: + return None # `node` must correct itself but is an input node. + + # Anachronical candidates: non-future, X/Y-measured, non-input nodes (other than `node`). + nonfuture_others = nodes - future - {node} + candidates = sorted(a for a in nonfuture_others if a in non_inputs and labels.get(a) in {Axis.X, Axis.Y}) + variables = [*candidates, node] if self_is_var else list(candidates) + var_index = {v: i for i, v in enumerate(variables)} + + fixed_in_p = set(x_corr) + if self_fixed_in: + fixed_in_p.add(node) + + def const_at(g: int) -> int: + return len(adjacency[g] & fixed_in_p) % 2 + + def row_at(g: int) -> list[int]: + return [1 if v in adjacency[g] else 0 for v in variables] + + matrix: list[list[int]] = [] + rhs: list[int] = [] + + # Odd-neighbourhood constraints on the future nodes (the Z-corrections of `node`). + for g in future: + matrix.append(row_at(g)) + rhs.append((1 if g in z_corr else 0) ^ const_at(g)) + + # P2: the odd neighbourhood vanishes on non-future, non-(Y/Z) nodes. + for g in nonfuture_others: + lab_g = labels.get(g) + if lab_g is not None and lab_g not in {Axis.Y, Axis.Z}: + matrix.append(row_at(g)) + rhs.append(const_at(g)) + + # P3: a non-future Y-measured node `g` must lie outside the closed odd neighbourhood of the + # correction set, i.e. its membership and odd-neighbourhood membership must coincide. + for g in nonfuture_others: + if labels.get(g) == Axis.Y: + row = row_at(g) + if g in var_index: + row[var_index[g]] ^= 1 + matrix.append(row) + rhs.append(const_at(g)) + + # Local proposition on `node` (P4-P9). + if label == Axis.Y: + # P9: exactly one of (node in p, node in Odd(p)). + row = row_at(node) + if node in var_index: + row[var_index[node]] ^= 1 + matrix.append(row) + rhs.append(1 ^ const_at(node)) + elif label != Axis.Z: + # XY, XZ, X -> node in Odd(p); YZ -> node not in Odd(p). + target = 0 if label == Plane.YZ else 1 + matrix.append(row_at(node)) + rhs.append(target ^ const_at(node)) + + n_vars = len(variables) + if not matrix: + # No constraints: free variables default to 0, so the correction set is exactly + # the part of ``p(node)`` pinned by the X-corrections and the local proposition. + return set(fixed_in_p) + + # Reduce the augmented matrix ``[A | b]`` to row echelon form together so that the + # row operations propagate to the right-hand side. Inconsistent systems leave a row + # ``[0...0 | 1]`` after reduction, which signals that no Pauli flow exists. + augmented = np.array([[*row, c] for row, c in zip(matrix, rhs, strict=True)], dtype=np.uint8).view(MatGF2) + augmented = augmented.gauss_elimination(ncols=n_vars) + lhs = MatGF2(augmented[:, :n_vars]) + b = augmented[:, n_vars] + for i in range(lhs.shape[0]): + if not lhs[i].any() and b[i] != 0: + return None + solution = solve_f2_linear_system(lhs, MatGF2(b)) + + correction_set = set(fixed_in_p) + correction_set.update(v for v, bit in zip(variables, solution, strict=True) if int(bit)) + return correction_set + + +def _reconstruct_pauli_correction_function(xz: XZCorrections[AbstractMeasurement]) -> dict[int, set[int]]: + """Reconstruct a Pauli-flow correction function from XZ-corrections. + + See :meth:`XZCorrections.to_pauli_flow`. + """ + og = xz.og + adjacency: dict[int, set[int]] = {n: set(og.graph.neighbors(n)) for n in og.graph.nodes} + labels: dict[int, Plane | Axis] = {n: meas.to_plane_or_axis() for n, meas in og.measurements.items()} + + # future_of[node]: nodes measured strictly after `node` (more future in the partial order). + future_of: dict[int, set[int]] = {} + accumulated: set[int] = set() + for layer in xz.partial_order_layers: + for node in layer: + future_of[node] = set(accumulated) + accumulated |= set(layer) + + correction_function: dict[int, set[int]] = {} + for node in og.measurements: + correction_set = _solve_pauli_correction_set(xz, node, future_of[node], adjacency, labels) + if correction_set is None: + # No correction set reconciles the XZ-corrections with the Pauli-flow propositions. + raise FlowError + correction_function[node] = correction_set + return correction_function diff --git a/graphix/pattern.py b/graphix/pattern.py index bcc18035b..ec20cd760 100644 --- a/graphix/pattern.py +++ b/graphix/pattern.py @@ -991,6 +991,35 @@ def extract_gflow(self) -> GFlow[BlochMeasurement]: """ return self.extract_xzcorrections().downcast_bloch().to_gflow() + def extract_pauli_flow(self) -> PauliFlow[Measurement]: + r"""Extract the Pauli flow structure from the current measurement pattern. + + This method does not call the flow-extraction routine on the underlying open graph, but + reconstructs the Pauli flow from the pattern corrections instead (see + :meth:`graphix.flow.core.XZCorrections.to_pauli_flow`). Contrary to + :meth:`extract_causal_flow` and :meth:`extract_gflow`, Pauli measurements are kept as + axes rather than downcast to planar Bloch measurements, so that the Pauli-basis + structure is preserved. + + Returns + ------- + PauliFlow[Measurement] + The Pauli flow associated with the current pattern. + + Raises + ------ + FlowError + If the pattern is empty or if no Pauli flow is compatible with the pattern corrections. + ValueError + If `N` commands in the pattern do not represent a :math:`|+\rangle` state or if the + pattern corrections form closed loops. + + Notes + ----- + The notes provided in :func:`self.extract_causal_flow` apply here as well. + """ + return self.extract_xzcorrections().to_pauli_flow() + def extract_xzcorrections(self) -> XZCorrections[Measurement]: """Extract the XZ-corrections from the current measurement pattern. diff --git a/tests/test_pauli_flow_extraction.py b/tests/test_pauli_flow_extraction.py new file mode 100644 index 000000000..aa91a99de --- /dev/null +++ b/tests/test_pauli_flow_extraction.py @@ -0,0 +1,184 @@ +r"""Tests for Pauli-flow extraction from a pattern / XZ-corrections. + +Correctness criterion +--------------------- +A reconstructed Pauli flow ``pf`` generates the original pattern if and only if +``pf.check_well_formed()`` succeeds *and* ``pf.to_corrections()`` reproduces the pattern's +X- and Z-corrections exactly. The latter "round-trip" property is the decisive check: it +guarantees that the flow generates *this* pattern (and not merely some Pauli flow of the +underlying open graph, which need not be unique). The tests below verify this on the three +worked examples of the issue, on a Pauli-measured open graph, and on a randomized family of +open graphs that admit a Pauli flow. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import networkx as nx +import numpy as np +import pytest + +from graphix import Measurement, OpenGraph, Pattern +from graphix.command import E, M, N, X, Z +from graphix.flow.core import XZCorrections +from graphix.flow.exceptions import FlowError +from graphix.opengraph import OpenGraphError + +if TYPE_CHECKING: + from collections.abc import Callable, Mapping + from collections.abc import Set as AbstractSet + + from numpy.random import Generator + + +def _norm(corrections: Mapping[int, AbstractSet[int]]) -> dict[int, frozenset[int]]: + """Drop empty correction sets to compare correction dictionaries up to empty entries.""" + return {k: frozenset(v) for k, v in corrections.items() if v} + + +def _assert_round_trip(pattern: Pattern) -> None: + xz = pattern.extract_xzcorrections() + pf = xz.to_pauli_flow() # `check_well_formed` runs inside `to_pauli_flow`. + assert pf.is_well_formed() + rt = pf.to_corrections() + assert _norm(rt.x_corrections) == _norm(xz.x_corrections) + assert _norm(rt.z_corrections) == _norm(xz.z_corrections) + + +def _correction_function(pattern: Pattern) -> dict[int, set[int]]: + pf = pattern.extract_pauli_flow() + return {k: set(v) for k, v in pf.correction_function.items()} + + +def _causal_pattern() -> Pattern: + return Pattern(input_nodes=[0], cmds=[N(1), E((0, 1)), M(0, Measurement.XY(0)), X(1, {0})], output_nodes=[1]) + + +def _gflow_pattern() -> Pattern: + return Pattern( + input_nodes=[0], + cmds=[ + N(1), N(2), N(3), E((0, 1)), E((0, 2)), E((1, 2)), E((1, 3)), + M(0, Measurement.XY(0.1)), X(2, {0}), X(3, {0}), + M(1, Measurement.XZ(0.2)), Z(2, {1}), Z(3, {1}), X(2, {1}), + ], + output_nodes=[2, 3], + ) # fmt: skip + + +def _pauli_pattern() -> Pattern: + return Pattern( + input_nodes=[0], + cmds=[ + N(1), N(2), N(3), E((0, 1)), E((1, 2)), E((2, 3)), + M(0, Measurement.X), X(3, {0}), + M(1, Measurement.X), Z(3, {1}), + M(2, Measurement.X), X(3, {2}), + ], + output_nodes=[3], + ) # fmt: skip + + +def test_extract_pauli_flow_causal_example() -> None: + pattern = _causal_pattern() + assert _correction_function(pattern) == {0: {1}} + _assert_round_trip(pattern) + + +def test_extract_pauli_flow_gflow_example() -> None: + pattern = _gflow_pattern() + assert _correction_function(pattern) == {0: {2, 3}, 1: {1, 2}} + _assert_round_trip(pattern) + + +def test_extract_pauli_flow_pauli_example() -> None: + # The flow must include the anachronical correction (node 1 in p(0)) that does not + # appear in the pattern, in order to satisfy the X-axis proposition (P7). + pattern = _pauli_pattern() + assert _correction_function(pattern) == {0: {1, 3}, 1: {2}, 2: {3}} + _assert_round_trip(pattern) + + +@pytest.mark.filterwarnings("ignore:Open graph with non-inferred Pauli measurements.") +def test_extract_pauli_flow_pauli_opengraph() -> None: + og = OpenGraph( + graph=nx.Graph([(0, 2), (2, 4), (3, 4), (4, 6), (1, 4), (1, 6), (2, 3), (3, 5), (2, 6), (3, 6)]), + input_nodes=[0], + output_nodes=[5, 6], + measurements={ + 0: Measurement.XY(0.1), + 1: Measurement.XZ(0.1), + 2: Measurement.Y, + 3: Measurement.XY(0.1), + 4: Measurement.Z, + }, + ) + _assert_round_trip(og.to_pattern()) + + +_MEASUREMENTS: list[Callable[[Generator], Measurement]] = [ + lambda r: Measurement.XY(float(r.random())), + lambda r: Measurement.XZ(float(r.random())), + lambda r: Measurement.YZ(float(r.random())), + lambda _r: Measurement.X, + lambda _r: Measurement.Y, + lambda _r: Measurement.Z, +] + + +@pytest.mark.filterwarnings("ignore:Open graph with non-inferred Pauli measurements.") +def test_extract_pauli_flow_randomized_round_trip() -> None: + # Generate random open graphs; those that admit a Pauli flow (so that `to_pattern` + # succeeds) are converted to a pattern, and the reconstructed flow is checked to be + # well formed and to reproduce the pattern's corrections. + tested = 0 + for seed in range(400): + rng = np.random.default_rng(seed) + n = int(rng.integers(4, 10)) + graph = nx.gnp_random_graph(n, 0.45, seed=seed) + if graph.number_of_edges() == 0: + continue + nodes = list(graph.nodes()) + rng.shuffle(nodes) + n_out = int(rng.integers(1, max(2, n // 2))) + n_in = int(rng.integers(0, max(1, n // 2))) + outputs = nodes[:n_out] + inputs = nodes[n_out : n_out + n_in] + measurements = { + m: _MEASUREMENTS[int(rng.integers(0, len(_MEASUREMENTS)))](rng) for m in nodes if m not in outputs + } + try: + pattern = OpenGraph( + graph=graph, input_nodes=inputs, output_nodes=outputs, measurements=measurements + ).to_pattern() + except OpenGraphError: + # The randomly drawn open graph does not admit a flow (the only documented + # raise condition of `OpenGraph.to_pattern`) -> not a valid test case. + continue + _assert_round_trip(pattern) + tested += 1 + assert tested >= 30 # ensure the randomized sweep actually exercised the extraction + + +def test_to_pauli_flow_raises_when_no_flow_exists() -> None: + # A measured input node that must correct itself (Z axis) admits no Pauli flow, + # because the correction set's image cannot contain an input node. + og1 = OpenGraph(graph=nx.Graph([(0, 1)]), input_nodes=[0], output_nodes=[1], measurements={0: Measurement.Z}) + with pytest.raises(FlowError): + XZCorrections(og1, {}, {}, [{1}, {0}]).to_pauli_flow() + + # An isolated node measured in the XY plane cannot satisfy proposition P4 + # (it must lie in the odd neighbourhood of its correction set), so the GF(2) + # system has no solution and no Pauli flow exists. + graph: nx.Graph[int] = nx.Graph() + graph.add_node(0) + graph.add_edge(1, 2) + og2 = OpenGraph( + graph=graph, + input_nodes=[], + output_nodes=[2], + measurements={0: Measurement.XY(0.1), 1: Measurement.XY(0.1)}, + ) + with pytest.raises(FlowError): + XZCorrections(og2, {}, {}, [{2}, {1}, {0}]).to_pauli_flow()