diff --git a/src/qce_interp/decoder_examples/mwpm_decoders.py b/src/qce_interp/decoder_examples/mwpm_decoders.py index 6e5e355..33c050c 100644 --- a/src/qce_interp/decoder_examples/mwpm_decoders.py +++ b/src/qce_interp/decoder_examples/mwpm_decoders.py @@ -370,6 +370,7 @@ def __init__( optimize: bool = True, optimized_round: int = 10, max_optimization_shots: int = 1000, + use_diagonal_matching_weights: bool = False, ): self._error_identifier: IErrorDetectionIdentifier = error_identifier self._initial_state_container: InitialStateContainer = initial_state_container @@ -377,6 +378,7 @@ def __init__( self._contains_qubit_refocusing: bool = contains_qubit_refocusing self._optimized_round = optimized_round self._optimization_idx = list(self.qec_rounds).index(optimized_round) + self._use_diagonal_matching_weights: bool = use_diagonal_matching_weights # binary initial state self.initial_state = np.sum(self._initial_state_container.as_array) % 2 @@ -446,26 +448,28 @@ def get_fidelity(self, cycle_stabilizer_count: int, qec_round_idx: int = None, m ancilla_q_idx = (i - (cycle_stabilizer_count + 1) * self.distance) % (self.distance - 1) matching.add_edge(edge[0], edge[1], weight=self.time_like_weights[ancilla_q_idx], fault_ids=edge[2]['fault_ids']) - # diagonal edges, order is A1R1, A2R1, A1R2, A2R2, A1R3, A2R3... Here we don't consider the order of CZ gates - for ancilla_q_idx in range(self.distance - 1): - for round_ in range(cycle_stabilizer_count): # The last round is assumed perfect - node_idx = ancilla_q_idx + round_ * (self.distance - 1) - if ancilla_q_idx == 0: # the first ancilla - # add right edge - matching.add_edge(node_idx, node_idx + self.distance, - weight=self.right_diagonal_weights[ancilla_q_idx], - fault_ids=ancilla_q_idx + 1) # data qb idx = idx+1 - elif ancilla_q_idx == self.distance - 2: # the last ancilla - # add left edge - matching.add_edge(node_idx, node_idx + self.distance - 2, - weight=self.left_diagonal_weights[ancilla_q_idx - 1], - fault_ids=ancilla_q_idx) # data qb idx = idx - else: - # add left and right edges - matching.add_edge(node_idx, node_idx + self.distance - 2, - weight=self.left_diagonal_weights[ancilla_q_idx - 1], fault_ids=ancilla_q_idx) - matching.add_edge(node_idx, node_idx + self.distance, - weight=self.right_diagonal_weights[ancilla_q_idx], fault_ids=ancilla_q_idx + 1) + + if self._use_diagonal_matching_weights: + # diagonal edges, order is A1R1, A2R1, A1R2, A2R2, A1R3, A2R3... Here we don't consider the order of CZ gates + for ancilla_q_idx in range(self.distance - 1): + for round_ in range(cycle_stabilizer_count): # The last round is assumed perfect + node_idx = ancilla_q_idx + round_ * (self.distance - 1) + if ancilla_q_idx == 0: # the first ancilla + # add right edge + matching.add_edge(node_idx, node_idx + self.distance, + weight=self.right_diagonal_weights[ancilla_q_idx], + fault_ids=ancilla_q_idx + 1) # data qb idx = idx+1 + elif ancilla_q_idx == self.distance - 2: # the last ancilla + # add left edge + matching.add_edge(node_idx, node_idx + self.distance - 2, + weight=self.left_diagonal_weights[ancilla_q_idx - 1], + fault_ids=ancilla_q_idx) # data qb idx = idx + else: + # add left and right edges + matching.add_edge(node_idx, node_idx + self.distance - 2, + weight=self.left_diagonal_weights[ancilla_q_idx - 1], fault_ids=ancilla_q_idx) + matching.add_edge(node_idx, node_idx + self.distance, + weight=self.right_diagonal_weights[ancilla_q_idx], fault_ids=ancilla_q_idx + 1) matching.set_boundary_nodes( {(self.distance - 1) * (cycle_stabilizer_count + 1)}) # last node as the boundary node diff --git a/src/qce_interp/interface_definitions/intrf_error_identifier.py b/src/qce_interp/interface_definitions/intrf_error_identifier.py index 77dd0d9..a9a4b02 100644 --- a/src/qce_interp/interface_definitions/intrf_error_identifier.py +++ b/src/qce_interp/interface_definitions/intrf_error_identifier.py @@ -15,7 +15,10 @@ ISurfaceCodeLayer, IParityGroup, ) -from qce_interp.interface_definitions.intrf_stabilizer_index_kernel import IStabilizerIndexingKernel +from qce_interp.interface_definitions.intrf_stabilizer_index_kernel import ( + IStabilizerIndexingKernel, + KernelPartitioner, +) from qce_interp.interface_definitions.intrf_state_classification import IStateClassifierContainer @@ -195,6 +198,15 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo :return: Tensor of boolean mask based on post-selection conditions (at specific cycle). """ raise InterfaceMethodException + + @abstractmethod + def partition_in_equal_sections(self, sections: int) -> List['IErrorDetectionIdentifier']: + """ + Creates a list of subset identifiers based on a self. + :param sections: The number of subsets (partitions) to generate. + :return: List of independent IErrorDetectionIdentifier objects, each pointing to a unique subset of the data. + """ + raise InterfaceMethodException # endregion @@ -889,6 +901,28 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo ) result = np.logical_and(result, stabilizer_leakage_selection_mask) return result + + def partition_in_equal_sections(self, sections: int) -> List[IErrorDetectionIdentifier]: + """ + Creates a list of subset identifiers based on a self. + :param sections: The number of subsets (partitions) to generate. + :return: List of independent IErrorDetectionIdentifier objects, each pointing to a unique subset of the data. + """ + results: List[IErrorDetectionIdentifier] = [] + for subsection_index_kernel in KernelPartitioner.partition_in_equal_sections(index_kernel=self._index_kernel, sections=sections): + results.append(ErrorDetectionIdentifier( + classifier_lookup=self._classifier_lookup, + index_kernel=subsection_index_kernel, + involved_qubit_ids=self._involved_qubit_ids, + device_layout=self._device_layout, + qec_rounds=self._qec_rounds, + use_heralded_post_selection=self._use_post_selection, + use_projected_leakage_post_selection=self._use_projected_leakage_post_selection, + use_stabilizer_leakage_post_selection=self._use_stabilizer_leakage_post_selection, + post_selection_qubits=self._post_selection_qubits, + use_computational_parity=self._use_computational_parity, + )) + return results # endregion # region Static Class Methods @@ -1478,5 +1512,18 @@ def get_post_selection_mask(self, cycle_stabilizer_count: int) -> NDArray[np.boo return self._error_detection_identifier.get_post_selection_mask( cycle_stabilizer_count=cycle_stabilizer_count, ) + + def partition_in_equal_sections(self, sections: int) -> List[ILabeledErrorDetectionIdentifier]: + """ + Creates a list of subset identifiers based on a self. + :param sections: The number of subsets (partitions) to generate. + :return: List of independent IErrorDetectionIdentifier objects, each pointing to a unique subset of the data. + """ + results: List[ILabeledErrorDetectionIdentifier] = [] + for error_identifier in super().partition_in_equal_sections(sections=sections): + results.append(LabeledErrorDetectionIdentifier( + error_detection_identifier=error_identifier, + )) + return results # endregion diff --git a/src/qce_interp/interface_definitions/intrf_stabilizer_index_kernel.py b/src/qce_interp/interface_definitions/intrf_stabilizer_index_kernel.py index 7df081a..eee90a9 100644 --- a/src/qce_interp/interface_definitions/intrf_stabilizer_index_kernel.py +++ b/src/qce_interp/interface_definitions/intrf_stabilizer_index_kernel.py @@ -10,6 +10,7 @@ IIndexingKernel, IStabilizerIndexingKernel, ) +from qce_circuit.structure.acquisition_indexing.factory_stabilizer_index_kernel import KernelPartitioner __all__ = [ "IIndexStrategy", @@ -17,4 +18,5 @@ "RelativeIndexStrategy", "IIndexingKernel", "IStabilizerIndexingKernel", + "KernelPartitioner", ] \ No newline at end of file diff --git a/src/qce_interp/utilities/expected_parities.py b/src/qce_interp/utilities/expected_parities.py index 6f2f2bf..faee69e 100644 --- a/src/qce_interp/utilities/expected_parities.py +++ b/src/qce_interp/utilities/expected_parities.py @@ -19,10 +19,10 @@ def initial_state_to_expected_parity(initial_state: InitialStateContainer, parit involved_data_qubit_ids=involved_data_qubit_ids, involved_ancilla_qubit_ids=involved_ancilla_qubit_ids, ) - assert initial_state.distance == len(involved_data_qubit_ids), f"Expects initial state for all involved data qubits. Instead {initial_state.distance} out of {len(involved_data_qubit_ids)} are present." + assert all([_qubit_id in initial_state.initial_states for _qubit_id in involved_data_qubit_ids]), f"Expects initial state for all involved data qubits. Instead {initial_state.distance} out of {len(involved_data_qubit_ids)} are present." # Reshape to (N, D) array to fit staticmethod function - initial_state_array = initial_state.as_array.reshape(1, -1) + initial_state_array = initial_state.as_ordered_array(involved_data_qubit_ids).reshape(1, -1) computed_parity: np.ndarray = ErrorDetectionIdentifier.calculate_computational_parity( array=initial_state_array, parity_index_lookup=parity_index_lookup, diff --git a/src/qce_interp/utilities/serialize_error_identifier.py b/src/qce_interp/utilities/serialize_error_identifier.py index cf8abd2..b2773ca 100644 --- a/src/qce_interp/utilities/serialize_error_identifier.py +++ b/src/qce_interp/utilities/serialize_error_identifier.py @@ -12,8 +12,11 @@ ) from qce_circuit.connectivity.intrf_channel_identifier import IQubitID from qce_circuit.connectivity.intrf_connectivity_surface_code import ISurfaceCodeLayer +from qce_circuit.structure.acquisition_indexing.kernel_repetition_code import SimulatedRepetitionExperimentKernel from qce_interp.utilities.custom_exceptions import ZeroClassifierShotsException +from qce_interp.interface_definitions.intrf_state_classification import StateClassifierContainer, ParityType from qce_interp.interface_definitions.intrf_error_identifier import ( + IErrorDetectionIdentifier, ErrorDetectionIdentifier, ILabeledErrorDetectionIdentifier, LabeledErrorDetectionIdentifier, @@ -22,6 +25,7 @@ from qce_interp.decoder_examples.mwpm_decoders import MWPMDecoderFast from qce_interp.decoder_examples.majority_voting import MajorityVotingDecoder from qce_interp.utilities.initial_state_manager import InitialStateManager +from qce_interp.utilities.expected_parities import initial_state_to_expected_parity __all__ = [ @@ -29,9 +33,10 @@ ] T = TypeVar("T") +TErrorDetectionIdentifier = TypeVar("TErrorDetectionIdentifier", bound=IErrorDetectionIdentifier) -def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, qec_rounds: List[int], code_layout: ISurfaceCodeLayer) -> xr.Dataset: +def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, qec_rounds: List[int], code_layout: ISurfaceCodeLayer, include_partitioned_fidelities: bool = False) -> xr.Dataset: processed_dataset = xr.Dataset() decoder_set: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = construct_sub_error_identifiers( @@ -51,6 +56,15 @@ def construct_processed_dataset(error_identifier: ErrorDetectionIdentifier, init decoder_set=decoder_set, qec_rounds=qec_rounds, ) + if include_partitioned_fidelities: + # Add bootstrapped logical fidelities + processed_dataset = update_bootstrapped_logical_fidelity( + dataset=processed_dataset, + error_identifier=error_identifier, + initial_state=initial_state, + code_layout=code_layout, + qec_rounds=qec_rounds, + ) return processed_dataset @@ -81,15 +95,17 @@ def construct_sub_error_identifiers(error_identifier: ErrorDetectionIdentifier, involved_data_qubit_ids=error_identifier.involved_qubit_ids, ) - initial_state_arrays = get_odd_subarrays(full_array=initial_state.as_array, skip=1) + initial_state_arrays = get_odd_subarrays(full_array=initial_state.as_ordered_array(error_identifier.involved_data_qubit_ids), skip=1) involved_qubit_arrays = get_odd_subarrays(full_array=ordered_involved_qubit_ids, skip=2) result: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = [] for _initial_state, _involved_qubits in zip(initial_state_arrays, involved_qubit_arrays): - initial_state_container: InitialStateContainer = InitialStateContainer.from_ordered_list([ - InitialStateEnum.ZERO if state == 0 else InitialStateEnum.ONE - for state in _initial_state - ]) + involved_data_qubits = [q for q in _involved_qubits if q in code_layout.data_qubit_ids] + initial_state_container: InitialStateContainer = InitialStateContainer( + initial_states={ + _qubit_id: InitialStateEnum.ZERO if state == 0 else InitialStateEnum.ONE + for _qubit_id, state in zip(involved_data_qubits, _initial_state) + }) _error_identifier: ErrorDetectionIdentifier = error_identifier.copy_with_involved_qubit_ids( involved_qubit_ids=_involved_qubits, @@ -98,9 +114,9 @@ def construct_sub_error_identifiers(error_identifier: ErrorDetectionIdentifier, error_identifier=_error_identifier, qec_rounds=_error_identifier.qec_rounds, initial_state_container=initial_state_container, - max_optimization_shots=2000, optimize=False, - optimized_round=_error_identifier.qec_rounds[-1] + optimized_round=_error_identifier.qec_rounds[-1], + use_diagonal_matching_weights=False, ) decoder_mv = MajorityVotingDecoder( error_identifier=_error_identifier, @@ -167,3 +183,109 @@ def update_logical_fidelity(dataset: xr.Dataset, decoder_set: List[Tuple[MWPMDec ) return dataset + + +def partition(error_identifier: TErrorDetectionIdentifier, sections: int) -> List[TErrorDetectionIdentifier]: + return error_identifier.partition_in_equal_sections(sections=sections) + + +def update_bootstrapped_logical_fidelity(dataset: xr.Dataset, error_identifier: ErrorDetectionIdentifier, initial_state: InitialStateContainer, code_layout: ISurfaceCodeLayer, qec_rounds: Union[NDArray[np.int_], List[int]], partition_sections: int = 10) -> xr.Dataset: + processed_datasets = [] + + for sub_error_identifier in partition(error_identifier, sections=partition_sections): + + decoder_set: List[Tuple[MWPMDecoderFast, MajorityVotingDecoder, InitialStateContainer]] = construct_sub_error_identifiers( + error_identifier=sub_error_identifier, + initial_state=initial_state, + code_layout=code_layout, + ) + processed_dataset = xr.Dataset() + processed_dataset = update_logical_fidelity( + dataset=processed_dataset, + decoder_set=decoder_set, + qec_rounds=qec_rounds, + ) + processed_datasets.append(processed_dataset) + # Concatenate datasets + bootstrap_dim: str = "bootstrapped" + combined_ds = xr.concat(processed_datasets, dim=bootstrap_dim) + combined_ds = combined_ds.assign_coords({bootstrap_dim: np.arange(combined_ds.sizes[bootstrap_dim])}) + combined_ds = combined_ds.squeeze(dim="qec_cycles") + rename_dict = { + var_name: f"{bootstrap_dim}_{var_name}" + for var_name in combined_ds.data_vars + } + renamed_ds = combined_ds.rename(rename_dict) + + dataset = dataset.merge(renamed_ds) + + return dataset + + +def tensor_to_error_identifier( + result_tensor: np.ndarray, + depth: int, + repetitions: int, + initial_state: InitialStateContainer, + code_layout: ISurfaceCodeLayer, + involved_qubit_ids: List[IQubitID], # Ordered + **kwargs, +) -> ErrorDetectionIdentifier: + # Extract keyword arguments + inverse_parity: bool = kwargs.get("inverse_parity", False) + stabilizer_active: bool = kwargs.get("stabilizer_active", True) + + # Data allocation + involved_data_qubit_ids: List[IQubitID] = [ + qubit_id + for qubit_id in involved_qubit_ids + if qubit_id in code_layout.data_qubit_ids + ] + involved_ancilla_qubit_ids: List[IQubitID] = [ + qubit_id + for qubit_id in involved_qubit_ids + if qubit_id in code_layout.ancilla_qubit_ids + ] + expected_parity = initial_state_to_expected_parity( + initial_state=initial_state, + parity_layout=code_layout, + involved_data_qubit_ids=involved_data_qubit_ids, + involved_ancilla_qubit_ids=involved_ancilla_qubit_ids, + inverse_parity=inverse_parity, + stabilizer_active=stabilizer_active, + ) + + classifier_lookup = {} + for i, qubit_id in enumerate(involved_qubit_ids): + state_classification = result_tensor[:, :, i].flatten() + state_classification[state_classification == 2] = ( + 1 # Map 0, 1, 2 outcomes to 0, 1 + ) + + classifier_lookup[qubit_id] = StateClassifierContainer( + state_classification=state_classification, + _expected_parity=( + ParityType.EVEN + if qubit_id not in expected_parity + else expected_parity[qubit_id] + ), + _stabilizer_reset=False, + ) + + return ErrorDetectionIdentifier( + classifier_lookup=classifier_lookup, + index_kernel=SimulatedRepetitionExperimentKernel( + rounds=[depth], + involved_data_qubit_ids=involved_data_qubit_ids, + involved_ancilla_qubit_ids=involved_ancilla_qubit_ids, + experiment_repetitions=repetitions, + ), + involved_qubit_ids=involved_qubit_ids, + device_layout=code_layout, + qec_rounds=[depth], + use_heralded_post_selection=False, + use_projected_leakage_post_selection=False, + use_stabilizer_leakage_post_selection=False, + post_selection_qubits=None, + use_computational_parity=True, + ) diff --git a/src/qce_interp/visualization/plot_logical_fidelity.py b/src/qce_interp/visualization/plot_logical_fidelity.py index 0bf6b34..5daec1a 100644 --- a/src/qce_interp/visualization/plot_logical_fidelity.py +++ b/src/qce_interp/visualization/plot_logical_fidelity.py @@ -8,7 +8,7 @@ import numpy as np from scipy.optimize import curve_fit from qce_circuit.language import InitialStateContainer -from qce_interp.definitions import Singleton +from qce_circuit.connectivity.intrf_channel_identifier import IQubitID from qce_interp.utilities.custom_exceptions import ZeroClassifierShotsException from qce_interp.interface_definitions.intrf_syndrome_decoder import IDecoder from qce_interp.decoder_examples.mwpm_decoders import ( @@ -67,6 +67,36 @@ def to_label(self, decoder: IDecoder) -> str: # endregion +@dataclass(frozen=True) +class DecoderToColor: + """ + Data class, containing decoder class or instance to color. + """ + default_color: str = "grey" + decoder_type_to_color: Dict[Type[IDecoder], str] = field(default_factory=dict) + decoder_instance_to_color: Dict[IDecoder, str] = field(default_factory=dict) + + # region Class Methods + def __post_init__(self): + default_decoder_type_to_color: Dict[Type[IDecoder], str] = { + MWPMDecoder: orange_red_purple_shades[0], + MWPMDecoderFast: orange_red_purple_shades[0], + MajorityVotingDecoder: "grey", + } + default_decoder_type_to_color.update(self.decoder_type_to_color) + object.__setattr__(self, 'decoder_type_to_color', default_decoder_type_to_color) + + def to_color(self, decoder: IDecoder) -> str: + """:return: label based on decoder instance, else based on decoder type, else default label.""" + if decoder in self.decoder_instance_to_color: + return self.decoder_instance_to_color[decoder] + decoder_type = type(decoder) + if decoder_type in self.decoder_type_to_color: + return self.decoder_type_to_color[decoder_type] + return self.default_color + # endregion + + def fit_function(x: np.ndarray, error: float, x_0: float) -> np.ndarray: """ Calculate the fit function value for given inputs. @@ -136,11 +166,12 @@ def get_fit_plot_arguments(x_array: np.ndarray, y_array: np.ndarray, exclude_fir return (plot_x_values, plot_y_values), plot_args -def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: InitialStateContainer, label: Optional[str] = None, fit_error_rate: bool = False, **kwargs) -> IFigureAxesPair: +def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: InitialStateContainer, target_state_order: Optional[List[IQubitID]] = None, label: Optional[str] = None, fit_error_rate: bool = False, **kwargs) -> IFigureAxesPair: """ :param decoder: Decoder used to evaluate fidelity at each QEC-round. :param included_rounds: Array-like of included QEC-rounds. Each round will be evaluated. :param target_state: InitialStateContainer instance representing target state. + :param target_state_order: (Optional) list of ordered qubit-ID's, used to construct the target state. :param fit_error_rate: (Optional) Boolean whether or not to fit the logical error rate to fidelity values. :param label: (Optional) Label passed to plot constructor. :param kwargs: Key-word arguments passed to subplot constructor. @@ -152,8 +183,8 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I y_err_array: np.ndarray = np.full_like(x_array, np.nan, dtype=np.float32) for i, x in tqdm(enumerate(x_array), desc=f"Processing {decoder.__class__.__name__} Decoder", total=len(x_array)): try: - value: float = decoder.get_fidelity(x, target_state=target_state.as_array) - value_err: float = decoder.get_fidelity_uncertainty(x, target_state=target_state.as_array) + value: float = decoder.get_fidelity(x, target_state=target_state.as_ordered_array(qubit_order=target_state_order)) + value_err: float = decoder.get_fidelity_uncertainty(x, target_state=target_state.as_ordered_array(qubit_order=target_state_order)) except ZeroClassifierShotsException: value = np.nan value_err = np.nan @@ -182,7 +213,7 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I ) contains_nan_values: bool = np.isnan(y_array).any() if fit_error_rate and not contains_nan_values: - code_distance: int = len(target_state.as_array) + code_distance: int = len(target_state.as_ordered_array(qubit_order=target_state_order)) exclude_first_n: int = code_distance if code_distance < 5: exclude_first_n = 2 * code_distance @@ -202,12 +233,13 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I return fig, ax -def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], target_state: InitialStateContainer, decoder_labels: DecoderToLabel = DecoderToLabel(), **kwargs) -> IFigureAxesPair: +def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], target_state: InitialStateContainer, target_state_order: Optional[List[IQubitID]] = None, decoder_labels: DecoderToLabel = DecoderToLabel(), decoder_colors: DecoderToColor = DecoderToColor(), **kwargs) -> IFigureAxesPair: """ Plots multiple decoders fidelity in one subplot. :param decoders: Decoder used to evaluate fidelity at each QEC-round. :param included_rounds: Array-like of included QEC-rounds. Each round will be evaluated. :param target_state: InitialStateContainer instance representing target state. + :param target_state_order: (Optional) list of ordered qubit-ID's, used to construct the target state. :param decoder_labels: (Optional) Translation from decoder to label. :param kwargs: Key-word arguments passed to subplot constructor. :return: Tuple of Figure and Axes pair. @@ -217,11 +249,12 @@ def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], fig, ax = construct_subplot(**kwargs) for decoder in decoders: kwargs[SubplotKeywordEnum.HOST_AXES.value] = (fig, ax) - kwargs['color'] = next(color_cycle) + kwargs['color'] = decoder_colors.to_color(decoder) # next(color_cycle) fig, ax = plot_fidelity( decoder=decoder, included_rounds=included_rounds, target_state=target_state, + target_state_order=target_state_order, label=decoder_labels.to_label(decoder=decoder), fit_error_rate=True, **kwargs,