44 changes: 24 additions & 20 deletions src/qce_interp/decoder_examples/mwpm_decoders.py
@@ -370,13 +370,15 @@ def __init__(
optimize: bool = True,
optimized_round: int = 10,
max_optimization_shots: int = 1000,
use_diagonal_matching_weights: bool = False,
):
self._error_identifier: IErrorDetectionIdentifier = error_identifier
self._initial_state_container: InitialStateContainer = initial_state_container
self.qec_rounds = qec_rounds
self._contains_qubit_refocusing: bool = contains_qubit_refocusing
self._optimized_round = optimized_round
self._optimization_idx = list(self.qec_rounds).index(optimized_round)
self._use_diagonal_matching_weights: bool = use_diagonal_matching_weights

# binary initial state
self.initial_state = np.sum(self._initial_state_container.as_array) % 2
@@ -446,26 +448,28 @@ def get_fidelity(self, cycle_stabilizer_count: int, qec_round_idx: int = None, m
ancilla_q_idx = (i - (cycle_stabilizer_count + 1) * self.distance) % (self.distance - 1)
matching.add_edge(edge[0], edge[1], weight=self.time_like_weights[ancilla_q_idx],
fault_ids=edge[2]['fault_ids'])
# diagonal edges, order is A1R1, A2R1, A1R2, A2R2, A1R3, A2R3... Here we don't consider the order of CZ gates
for ancilla_q_idx in range(self.distance - 1):
for round_ in range(cycle_stabilizer_count): # The last round is assumed perfect
node_idx = ancilla_q_idx + round_ * (self.distance - 1)
if ancilla_q_idx == 0: # the first ancilla
# add right edge
matching.add_edge(node_idx, node_idx + self.distance,
weight=self.right_diagonal_weights[ancilla_q_idx],
fault_ids=ancilla_q_idx + 1) # data qb idx = idx+1
elif ancilla_q_idx == self.distance - 2: # the last ancilla
# add left edge
matching.add_edge(node_idx, node_idx + self.distance - 2,
weight=self.left_diagonal_weights[ancilla_q_idx - 1],
fault_ids=ancilla_q_idx) # data qb idx = idx
else:
# add left and right edges
matching.add_edge(node_idx, node_idx + self.distance - 2,
weight=self.left_diagonal_weights[ancilla_q_idx - 1], fault_ids=ancilla_q_idx)
matching.add_edge(node_idx, node_idx + self.distance,
weight=self.right_diagonal_weights[ancilla_q_idx], fault_ids=ancilla_q_idx + 1)

if self._use_diagonal_matching_weights:
# Diagonal edges; node order is A1R1, A2R1, A1R2, A2R2, A1R3, A2R3, ... The order of CZ gates is not considered here.
for ancilla_q_idx in range(self.distance - 1):
for round_ in range(cycle_stabilizer_count): # The last round is assumed perfect
node_idx = ancilla_q_idx + round_ * (self.distance - 1)
if ancilla_q_idx == 0: # the first ancilla
# add right edge
matching.add_edge(node_idx, node_idx + self.distance,
weight=self.right_diagonal_weights[ancilla_q_idx],
fault_ids=ancilla_q_idx + 1) # data qb idx = idx+1
elif ancilla_q_idx == self.distance - 2: # the last ancilla
# add left edge
matching.add_edge(node_idx, node_idx + self.distance - 2,
weight=self.left_diagonal_weights[ancilla_q_idx - 1],
fault_ids=ancilla_q_idx) # data qb idx = idx
else:
# add left and right edges
matching.add_edge(node_idx, node_idx + self.distance - 2,
weight=self.left_diagonal_weights[ancilla_q_idx - 1], fault_ids=ancilla_q_idx)
matching.add_edge(node_idx, node_idx + self.distance,
weight=self.right_diagonal_weights[ancilla_q_idx], fault_ids=ancilla_q_idx + 1)

matching.set_boundary_nodes(
{(self.distance - 1) * (cycle_stabilizer_count + 1)}) # last node as the boundary node
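
For context, a minimal self-contained sketch (not part of this diff) of the diagonal-edge construction that the new use_diagonal_matching_weights flag gates, written directly against pymatching's Matching.add_edge API; the distance, round count, and weights below are illustrative placeholders, not values from this repository:

import pymatching

# Illustrative parameters (assumptions, not repository values).
distance = 5                              # repetition-code distance
rounds = 3                                # QEC rounds; the last round is assumed perfect
left_weights = [1.0] * (distance - 2)     # placeholder left-diagonal weights
right_weights = [1.0] * (distance - 2)    # placeholder right-diagonal weights

matching = pymatching.Matching()
for ancilla in range(distance - 1):
    for round_ in range(rounds):
        node = ancilla + round_ * (distance - 1)
        if ancilla == 0:
            # First ancilla: only a right-diagonal edge exists.
            matching.add_edge(node, node + distance,
                              weight=right_weights[ancilla], fault_ids=ancilla + 1)
        elif ancilla == distance - 2:
            # Last ancilla: only a left-diagonal edge exists.
            matching.add_edge(node, node + distance - 2,
                              weight=left_weights[ancilla - 1], fault_ids=ancilla)
        else:
            # Interior ancillas get both diagonal edges.
            matching.add_edge(node, node + distance - 2,
                              weight=left_weights[ancilla - 1], fault_ids=ancilla)
            matching.add_edge(node, node + distance,
                              weight=right_weights[ancilla], fault_ids=ancilla + 1)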
49 changes: 48 additions & 1 deletion src/qce_interp/interface_definitions/intrf_error_identifier.py
@@ -15,7 +15,10 @@
ISurfaceCodeLayer,
IParityGroup,
)
from qce_interp.interface_definitions.intrf_stabilizer_index_kernel import IStabilizerIndexingKernel
from qce_interp.interface_definitions.intrf_stabilizer_index_kernel import (
IStabilizerIndexingKernel,
KernelPartitioner,
)
from qce_interp.interface_definitions.intrf_state_classification import IStateClassifierContainer


@@ -195,6 +198,15 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo
:return: Tensor of boolean mask based on post-selection conditions (at specific cycle).
"""
raise InterfaceMethodException

@abstractmethod
def partition_in_equal_sections(self, sections: int) -> List['IErrorDetectionIdentifier']:
"""
Creates a list of subset identifiers, each covering an equal share of this instance's data.
:param sections: The number of subsets (partitions) to generate.
:return: List of independent IErrorDetectionIdentifier objects, each pointing to a unique subset of the data.
"""
raise InterfaceMethodException
# endregion


@@ -889,6 +901,28 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo
)
result = np.logical_and(result, stabilizer_leakage_selection_mask)
return result

def partition_in_equal_sections(self, sections: int) -> List[IErrorDetectionIdentifier]:
"""
Creates a list of subset identifiers, each covering an equal share of this instance's data.
:param sections: The number of subsets (partitions) to generate.
:return: List of independent IErrorDetectionIdentifier objects, each pointing to a unique subset of the data.
"""
results: List[IErrorDetectionIdentifier] = []
for subsection_index_kernel in KernelPartitioner.partition_in_equal_sections(index_kernel=self._index_kernel, sections=sections):
results.append(ErrorDetectionIdentifier(
classifier_lookup=self._classifier_lookup,
index_kernel=subsection_index_kernel,
involved_qubit_ids=self._involved_qubit_ids,
device_layout=self._device_layout,
qec_rounds=self._qec_rounds,
use_heralded_post_selection=self._use_post_selection,
use_projected_leakage_post_selection=self._use_projected_leakage_post_selection,
use_stabilizer_leakage_post_selection=self._use_stabilizer_leakage_post_selection,
post_selection_qubits=self._post_selection_qubits,
use_computational_parity=self._use_computational_parity,
))
return results
# endregion

# region Static Class Methods
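
A hedged usage sketch of the new partitioning API; identifier (any concrete IErrorDetectionIdentifier) and cycle are assumed to be defined by the surrounding analysis code:

# Split the shots into 10 disjoint, equally sized subsets and evaluate
# the post-selection mask on each subset independently.
subsets = identifier.partition_in_equal_sections(sections=10)
masks = [
    subset.get_post_selection_mask(cycle_stabilizer_count=cycle)
    for subset in subsets
]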
@@ -1478,5 +1512,18 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo
return self._error_detection_identifier.get_post_selection_mask(
cycle_stabilizer_count=cycle_stabilizer_count,
)

def partition_in_equal_sections(self, sections: int) -> List[ILabeledErrorDetectionIdentifier]:
"""
Creates a list of subset identifiers, each covering an equal share of this instance's data.
:param sections: The number of subsets (partitions) to generate.
:return: List of independent ILabeledErrorDetectionIdentifier objects, each pointing to a unique subset of the data.
"""
results: List[ILabeledErrorDetectionIdentifier] = []
for error_identifier in super().partition_in_equal_sections(sections=sections):
results.append(LabeledErrorDetectionIdentifier(
error_detection_identifier=error_identifier,
))
return results
# endregion

src/qce_interp/interface_definitions/intrf_stabilizer_index_kernel.py
@@ -10,11 +10,13 @@
IIndexingKernel,
IStabilizerIndexingKernel,
)
from qce_circuit.structure.acquisition_indexing.factory_stabilizer_index_kernel import KernelPartitioner

__all__ = [
"IIndexStrategy",
"FixedIndexStrategy",
"RelativeIndexStrategy",
"IIndexingKernel",
"IStabilizerIndexingKernel",
"KernelPartitioner",
]
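
The re-export makes the partitioner reachable from the interface module. A hedged sketch of the intended call, with kernel assumed to be an existing IStabilizerIndexingKernel instance:

from qce_interp.interface_definitions.intrf_stabilizer_index_kernel import KernelPartitioner

# Partition an index kernel into 10 equal sections (mirrors the call inside
# ErrorDetectionIdentifier.partition_in_equal_sections above).
sub_kernels = KernelPartitioner.partition_in_equal_sections(index_kernel=kernel, sections=10)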
4 changes: 2 additions & 2 deletions src/qce_interp/utilities/expected_parities.py
@@ -19,10 +19,10 @@ def initial_state_to_expected_parity(initial_state: InitialStateContainer, parit
involved_data_qubit_ids=involved_data_qubit_ids,
involved_ancilla_qubit_ids=involved_ancilla_qubit_ids,
)
assert initial_state.distance == len(involved_data_qubit_ids), f"Expects initial state for all involved data qubits. Instead {initial_state.distance} out of {len(involved_data_qubit_ids)} are present."
assert all([_qubit_id in initial_state.initial_states for _qubit_id in involved_data_qubit_ids]), f"Expects initial state for all involved data qubits. Instead {initial_state.distance} out of {len(involved_data_qubit_ids)} are present."

# Reshape to (N, D) array to fit staticmethod function
initial_state_array = initial_state.as_array.reshape(1, -1)
initial_state_array = initial_state.as_ordered_array(involved_data_qubit_ids).reshape(1, -1)
computed_parity: np.ndarray = ErrorDetectionIdentifier.calculate_computational_parity(
array=initial_state_array,
parity_index_lookup=parity_index_lookup,
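
The switch from as_array to as_ordered_array guards against a mismatch between the container's internal ordering and involved_data_qubit_ids. A self-contained illustration of the failure mode, where a plain dict and strings stand in for the real container and qubit ids:

import numpy as np

# Insertion order of the container need not match the involved-qubit order.
initial_states = {"D3": 1, "D1": 0, "D2": 1}        # stored as D3, D1, D2
involved_data_qubit_ids = ["D1", "D2", "D3"]

naive = np.array(list(initial_states.values()))     # [1, 0, 1]  (container order)
ordered = np.array([initial_states[q] for q in involved_data_qubit_ids])  # [0, 1, 1]
assert not np.array_equal(naive, ordered)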
138 changes: 130 additions & 8 deletions src/qce_interp/utilities/serialize_error_identifier.py
@@ -12,8 +12,11 @@
)
from qce_circuit.connectivity.intrf_channel_identifier import IQubitID
from qce_circuit.connectivity.intrf_connectivity_surface_code import ISurfaceCodeLayer
from qce_circuit.structure.acquisition_indexing.kernel_repetition_code import SimulatedRepetitionExperimentKernel
from qce_interp.utilities.custom_exceptions import ZeroClassifierShotsException
from qce_interp.interface_definitions.intrf_state_classification import StateClassifierContainer, ParityType
from qce_interp.interface_definitions.intrf_error_identifier import (
IErrorDetectionIdentifier,
ErrorDetectionIdentifier,
ILabeledErrorDetectionIdentifier,
LabeledErrorDetectionIdentifier,
@@ -22,16 +25,18 @@
from qce_interp.decoder_examples.mwpm_decoders import MWPMDecoderFast
from qce_interp.decoder_examples.majority_voting import MajorityVotingDecoder
from qce_interp.utilities.initial_state_manager import InitialStateManager
from qce_interp.utilities.expected_parities import initial_state_to_expected_parity


__all__ = [
"construct_processed_dataset",
]

T = TypeVar("T")
TErrorDetectionIdentifier = TypeVar("TErrorDetectionIdentifier", bound=IErrorDetectionIdentifier)


def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, qec_rounds: List[int], code_layout: ISurfaceCodeLayer) -> xr.Dataset:
def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, qec_rounds: List[int], code_layout: ISurfaceCodeLayer, include_partitioned_fidelities: bool = False) -> xr.Dataset:

processed_dataset = xr.Dataset()
decoder_set: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = construct_sub_error_identifiers(
@@ -51,6 +56,15 @@ def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, init
decoder_set=decoder_set,
qec_rounds=qec_rounds,
)
if include_partitioned_fidelities:
# Add bootstrapped logical fidelities
processed_dataset = update_bootstrapped_logical_fidelity(
dataset=processed_dataset,
error_identifier=error_identifier,
initial_state=initial_state,
code_layout=code_layout,
qec_rounds=qec_rounds,
)

return processed_dataset
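
A hedged call sketch for the new flag; the arguments are assumed to be prepared as elsewhere in this module:

processed = construct_processed_dataset(
    error_identifier=error_identifier,
    initial_state=initial_state,
    qec_rounds=qec_rounds,
    code_layout=code_layout,
    include_partitioned_fidelities=True,  # also attaches "bootstrapped_*" variables
)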

Expand Down Expand Up @@ -81,15 +95,17 @@ def construct_sub_error_identifiers(error_identifier: ErrorDetectionIdentifier,
involved_data_qubit_ids=error_identifier.involved_qubit_ids,
)

initial_state_arrays = get_odd_subarrays(full_array=initial_state.as_array, skip=1)
initial_state_arrays = get_odd_subarrays(full_array=initial_state.as_ordered_array(error_identifier.involved_data_qubit_ids), skip=1)
involved_qubit_arrays = get_odd_subarrays(full_array=ordered_involved_qubit_ids, skip=2)

result: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = []
for _initial_state, _involved_qubits in zip(initial_state_arrays, involved_qubit_arrays):
initial_state_container: InitialStateContainer = InitialStateContainer.from_ordered_list([
InitialStateEnum.ZERO if state == 0 else InitialStateEnum.ONE
for state in _initial_state
])
involved_data_qubits = [q for q in _involved_qubits if q in code_layout.data_qubit_ids]
initial_state_container: InitialStateContainer = InitialStateContainer(
initial_states={
_qubit_id: InitialStateEnum.ZERO if state == 0 else InitialStateEnum.ONE
for _qubit_id, state in zip(involved_data_qubits, _initial_state)
})

_error_identifier: ErrorDetectionIdentifier = error_identifier.copy_with_involved_qubit_ids(
involved_qubit_ids=_involved_qubits,
@@ -98,9 +114,9 @@
error_identifier=_error_identifier,
qec_rounds=_error_identifier.qec_rounds,
initial_state_container=initial_state_container,
max_optimization_shots=2000,
optimize=False,
optimized_round=_error_identifier.qec_rounds[-1]
optimized_round=_error_identifier.qec_rounds[-1],
use_diagonal_matching_weights=False,
)
decoder_mv = MajorityVotingDecoder(
error_identifier=_error_identifier,
@@ -167,3 +183,109 @@ def update_logical_fidelity(dataset: xr.Dataset, decoder_set: List[Tuple[MWPMDec
)

return dataset


def partition(error_identifier: TErrorDetectionIdentifier, sections: int) -> List[TErrorDetectionIdentifier]:
return error_identifier.partition_in_equal_sections(sections=sections)


def update_bootstrapped_logical_fidelity(dataset: xr.Dataset, error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, code_layout: ISurfaceCodeLayer, qec_rounds: Union[NDArray[np.int_], List[int]], partition_sections: int = 10) -> xr.Dataset:
processed_datasets = []

for sub_error_identifier in partition(error_identifier, sections=partition_sections):

decoder_set: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = construct_sub_error_identifiers(
error_identifier=sub_error_identifier,
initial_state=initial_state,
code_layout=code_layout,
)
processed_dataset = xr.Dataset()
processed_dataset = update_logical_fidelity(
dataset=processed_dataset,
decoder_set=decoder_set,
qec_rounds=qec_rounds,
)
processed_datasets.append(processed_dataset)
# Concatenate datasets
bootstrap_dim: str = "bootstrapped"
combined_ds = xr.concat(processed_datasets, dim=bootstrap_dim)
combined_ds = combined_ds.assign_coords({bootstrap_dim: np.arange(combined_ds.sizes[bootstrap_dim])})
combined_ds = combined_ds.squeeze(dim="qec_cycles")
rename_dict = {
var_name: f"{bootstrap_dim}_{var_name}"
for var_name in combined_ds.data_vars
}
renamed_ds = combined_ds.rename(rename_dict)

dataset = dataset.merge(renamed_ds)

return dataset
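
The concat-and-rename step above follows a standard xarray pattern; a self-contained sketch with a dummy fidelity variable:

import numpy as np
import xarray as xr

# Three per-partition datasets, each with a single qec_cycles entry.
parts = [
    xr.Dataset({"fidelity": ("qec_cycles", np.random.rand(1))})
    for _ in range(3)
]
combined = xr.concat(parts, dim="bootstrapped")
combined = combined.assign_coords(bootstrapped=np.arange(combined.sizes["bootstrapped"]))
combined = combined.squeeze(dim="qec_cycles")
renamed = combined.rename({name: f"bootstrapped_{name}" for name in combined.data_vars})
print(renamed["bootstrapped_fidelity"].shape)  # (3,)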


def tensor_to_error_identifier(
result_tensor: np.ndarray,
depth: int,
repetitions: int,
initial_state: InitialStateContainer,
code_layout: ISurfaceCodeLayer,
involved_qubit_ids: List[IQubitID], # Ordered
**kwargs,
) -> ErrorDetectionIdentifier:
# Extract keyword arguments
inverse_parity: bool = kwargs.get("inverse_parity", False)
stabilizer_active: bool = kwargs.get("stabilizer_active", True)

# Data allocation
involved_data_qubit_ids: List[IQubitID] = [
qubit_id
for qubit_id in involved_qubit_ids
if qubit_id in code_layout.data_qubit_ids
]
involved_ancilla_qubit_ids: List[IQubitID] = [
qubit_id
for qubit_id in involved_qubit_ids
if qubit_id in code_layout.ancilla_qubit_ids
]
expected_parity = initial_state_to_expected_parity(
initial_state=initial_state,
parity_layout=code_layout,
involved_data_qubit_ids=involved_data_qubit_ids,
involved_ancilla_qubit_ids=involved_ancilla_qubit_ids,
inverse_parity=inverse_parity,
stabilizer_active=stabilizer_active,
)

classifier_lookup = {}
for i, qubit_id in enumerate(involved_qubit_ids):
state_classification = result_tensor[:, :, i].flatten()
# Map the leaked outcome 2 onto 1, reducing ternary {0, 1, 2} readout to binary {0, 1}
state_classification[state_classification == 2] = 1

classifier_lookup[qubit_id] = StateClassifierContainer(
state_classification=state_classification,
_expected_parity=(
ParityType.EVEN
if qubit_id not in expected_parity
else expected_parity[qubit_id]
),
_stabilizer_reset=False,
)

return ErrorDetectionIdentifier(
classifier_lookup=classifier_lookup,
index_kernel=SimulatedRepetitionExperimentKernel(
rounds=[depth],
involved_data_qubit_ids=involved_data_qubit_ids,
involved_ancilla_qubit_ids=involved_ancilla_qubit_ids,
experiment_repetitions=repetitions,
),
involved_qubit_ids=involved_qubit_ids,
device_layout=code_layout,
qec_rounds=[depth],
use_heralded_post_selection=False,
use_projected_leakage_post_selection=False,
use_stabilizer_leakage_post_selection=False,
post_selection_qubits=None,
use_computational_parity=True,
)
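
A hedged usage sketch; the tensor shape (repetitions, measurements, qubits) and the origin of initial_state, code_layout, and involved_qubit_ids are assumptions about the caller's simulation setup:

import numpy as np

repetitions, depth = 100, 5
n_measurements = 60   # placeholder; fixed by the experiment's acquisition schedule
result_tensor = np.random.randint(0, 3, size=(repetitions, n_measurements, len(involved_qubit_ids)))

identifier = tensor_to_error_identifier(
    result_tensor=result_tensor,
    depth=depth,
    repetitions=repetitions,
    initial_state=initial_state,
    code_layout=code_layout,
    involved_qubit_ids=involved_qubit_ids,
)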