diff --git a/setup.cfg b/setup.cfg index 608470e..68807f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,6 +23,7 @@ install_requires = h5py scikit-learn multipledispatch + pymatching cma qcecircuits @ git+https://github.com/MiniSean/QCoCircuits.git # QCoCircuits # Required additional data files diff --git a/src/qce_interp/data_manager.py b/src/qce_interp/data_manager.py index 1b6220d..50e54bd 100644 --- a/src/qce_interp/data_manager.py +++ b/src/qce_interp/data_manager.py @@ -239,8 +239,8 @@ def from_file_path(cls, file_path: Path, qec_rounds: List[int], heralded_initial for qubit_id in tqdm(involved_qubit_ids, desc='Processing data file'): channel_identifier: IAcquisitionChannelIdentifier = channel_identifier_lookup[qubit_id] - raw_shots: NDArray[np.float64] = data_dict[cls.data_key()][:, channel_identifier.channel_indices] - raw_complex_shots: NDArray[np.complex128] = StateAcquisitionContainer.real_imag_to_complex(raw_shots) + raw_shots: NDArray[np.float32] = data_dict[cls.data_key()][:, channel_identifier.channel_indices] + raw_complex_shots: NDArray[np.complex64] = StateAcquisitionContainer.real_imag_to_complex(raw_shots) # Qutrit calibration points calibration_points: StateAcquisitionContainer = StateAcquisitionContainer.from_state_acquisitions( diff --git a/src/qce_interp/decoder_examples/lookup_table.py b/src/qce_interp/decoder_examples/lookup_table.py index 5a9c678..ae8c24c 100644 --- a/src/qce_interp/decoder_examples/lookup_table.py +++ b/src/qce_interp/decoder_examples/lookup_table.py @@ -286,6 +286,13 @@ def get_fidelity(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> cycle_stabilizer_count=cycle_stabilizer_count, target_state=target_state, ) + + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + """:return: Uncertainty in Logical fidelity based on target state and stabilizer round-count.""" + return self._syndrome_decoder.get_fidelity_uncertainty( + cycle_stabilizer_count=cycle_stabilizer_count, + target_state=target_state, + ) # endregion @@ -354,3 +361,7 @@ def binary_syndrome_lookup(self) -> Dict[tuple, tuple]: return result # endregion + # region Interface Methods + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + raise NotImplementedError + # endregion diff --git a/src/qce_interp/decoder_examples/majority_voting.py b/src/qce_interp/decoder_examples/majority_voting.py index e3ec002..eb68ea4 100644 --- a/src/qce_interp/decoder_examples/majority_voting.py +++ b/src/qce_interp/decoder_examples/majority_voting.py @@ -51,5 +51,16 @@ def get_fidelity(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> counter += 1 equal_fraction = counter / len(corrected_binary_output) return equal_fraction + + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + """:return: Uncertainty in Logical fidelity based on target state and stabilizer round-count.""" + logical_fidelity: float = self.get_fidelity( + cycle_stabilizer_count=cycle_stabilizer_count, + target_state=target_state, + ) + # (N, 1, D) + binary_projected_classification: np.ndarray = self._error_identifier.get_binary_projected_classification(cycle_stabilizer_count=cycle_stabilizer_count) + number_of_samples: int = binary_projected_classification.shape[0] + return float(np.sqrt(logical_fidelity * (1.0 - logical_fidelity) / number_of_samples)) # endregion diff --git a/src/qce_interp/decoder_examples/mwpm_decoders.py
b/src/qce_interp/decoder_examples/mwpm_decoders.py index c7bcb38..7666331 100644 --- a/src/qce_interp/decoder_examples/mwpm_decoders.py +++ b/src/qce_interp/decoder_examples/mwpm_decoders.py @@ -227,6 +227,17 @@ def get_fidelity(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> equal_rows_count = np.sum(np.all(logical_error == 0, axis=1)) equal_fraction: float = equal_rows_count / len(logical_error) return equal_fraction + + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + """:return: Uncertainty in Logical fidelity based on target state and stabilizer round-count.""" + logical_fidelity: float = self.get_fidelity( + cycle_stabilizer_count=cycle_stabilizer_count, + target_state=target_state, + ) + # (N, 1, D) + binary_projected_classification: np.ndarray = self._error_identifier.get_binary_projected_classification(cycle_stabilizer_count=cycle_stabilizer_count) + number_of_samples: int = binary_projected_classification.shape[0] + return float(np.sqrt(logical_fidelity * (1.0 - logical_fidelity) / number_of_samples)) # endregion # region Class Methods @@ -429,6 +440,17 @@ def get_fidelity(self, cycle_stabilizer_count: int, target_state: np.ndarray = N error_rate = error_rate if self.initial_state == 0 else 1 - error_rate return 1 - error_rate + + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + """:return: Uncertainty in Logical fidelity based on target state and stabilizer round-count.""" + logical_fidelity: float = self.get_fidelity( + cycle_stabilizer_count=cycle_stabilizer_count, + target_state=target_state, + ) + # (N, 1, D) + binary_projected_classification: np.ndarray = self._error_identifier.get_binary_projected_classification(cycle_stabilizer_count=cycle_stabilizer_count) + number_of_samples: int = binary_projected_classification.shape[0] + return float(np.sqrt(logical_fidelity * (1.0 - logical_fidelity) / number_of_samples)) # endregion # region Class Methods diff --git a/src/qce_interp/interface_definitions/intrf_state_classification.py b/src/qce_interp/interface_definitions/intrf_state_classification.py index f7025ae..5a09c8b 100644 --- a/src/qce_interp/interface_definitions/intrf_state_classification.py +++ b/src/qce_interp/interface_definitions/intrf_state_classification.py @@ -9,7 +9,7 @@ from warnings import warn from sklearn.discriminant_analysis import LinearDiscriminantAnalysis from numpy.typing import NDArray -from typing import List, Dict, Optional, Callable, TypeVar +from typing import List, Dict, Optional, Callable, TypeVar, Sequence from qce_circuit.utilities.array_manipulation import unique_in_order from qce_interp.utilities.custom_exceptions import ( InterfaceMethodException, @@ -30,7 +30,7 @@ class ParityType(Enum): class StateAcquisition: """Data class, containing (complex) acquisition information, together with a state key.""" state: StateKey - shots: NDArray[np.complex128] + shots: NDArray[np.complex64] # region Class Properties @property @@ -142,13 +142,13 @@ def get_boundary_between(self, state_a: StateKey, state_b: StateKey) -> Optional return None return self.boundary_lookup[boundary_key] - def get_binary_predictions(self, shots: NDArray[np.complex128]) -> NDArray[np.int_]: + def get_binary_predictions(self, shots: NDArray[np.complex64]) -> NDArray[np.int_]: """ NOTE: Forces classification of element in group 1 or 2, disregarding other groups. 
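The get_fidelity_uncertainty implementations above all report the binomial standard error of the logical fidelity over the N shots returned by get_binary_projected_classification. A minimal stand-alone sketch of that estimator (plain numpy; the helper name is illustrative, not part of this change):

import numpy as np

def binomial_fidelity_uncertainty(logical_fidelity: float, number_of_samples: int) -> float:
    """Standard error of a success fraction p estimated from N Bernoulli trials: sqrt(p * (1 - p) / N)."""
    return float(np.sqrt(logical_fidelity * (1.0 - logical_fidelity) / number_of_samples))

# Example: F_L = 0.95 estimated from 10_000 shots gives roughly 0.22% uncertainty.
print(binomial_fidelity_uncertainty(0.95, 10_000))  # ~0.00218
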
NOTE: Returns integer prediction value, can be mapped to state-enum using self.prediction_index_to_state_lookup. :return: Array-like of State key predictions based on shots discrimination. """ - shot_reshaped: NDArray[np.float64] = StateAcquisitionContainer.complex_to_real_imag(shots) + shot_reshaped: NDArray[np.float32] = StateAcquisitionContainer.complex_to_real_imag(shots) # Step 1: Predict probabilities probabilities: np.ndarray = self._discriminator.predict_proba(shot_reshaped) # Step 2: Compare probabilities for groups 1 and 2 @@ -160,34 +160,34 @@ def get_binary_predictions(self, shots: NDArray[np.complex128]) -> NDArray[np.in state_indices: NDArray[np.int_] = np.where(prob_group_1 > prob_group_2, 0, 1) # 0 for group 1, 1 for group 2 return state_indices - def get_predictions(self, shots: NDArray[np.complex128]) -> NDArray[np.int_]: + def get_predictions(self, shots: NDArray[np.complex64]) -> NDArray[np.int_]: """ NOTE: Returns integer prediction value, can be mapped to state-enum using self.prediction_index_to_state_lookup. :return: Array-like of State key predictions based on shots discrimination. """ - shot_reshaped: NDArray[np.float64] = StateAcquisitionContainer.complex_to_real_imag(shots) + shot_reshaped: NDArray[np.float32] = StateAcquisitionContainer.complex_to_real_imag(shots) state_indices: NDArray[np.int_] = self._discriminator.predict(shot_reshaped) # 1 indexed return state_indices - def get_prediction(self, shot: np.complex128) -> StateKey: + def get_prediction(self, shot: np.complex64) -> StateKey: """:return: State key prediction based on shot discrimination.""" state_indices: NDArray[np.int_] = self.get_predictions(shots=np.asarray([shot])) int_to_enum = self.prediction_index_to_state_lookup return int_to_enum[state_indices[0]] - def get_fidelity(self, shots: NDArray[np.complex128], assigned_state: StateKey) -> float: + def get_fidelity(self, shots: NDArray[np.complex64], assigned_state: StateKey) -> float: """:return: Assignment fidelity defined as the probability of shots being part of assigned state.""" - shots_reshaped: NDArray[np.float64] = StateAcquisitionContainer.complex_to_real_imag(shots) + shots_reshaped: NDArray[np.float32] = StateAcquisitionContainer.complex_to_real_imag(shots) state_indices: NDArray[np.int_] = self._discriminator.predict(shots_reshaped) # 1 indexed return float(np.mean(state_indices == self._state_lookup[assigned_state])) - def post_select_on(self, shots_to_filter: NDArray[np.complex128], conditional_shots: NDArray[np.complex128], conditional_state: StateKey) -> NDArray[np.complex128]: + def post_select_on(self, shots_to_filter: NDArray[np.complex64], conditional_shots: NDArray[np.complex64], conditional_state: StateKey) -> NDArray[np.complex64]: """:return: Filtered shots based on conditional shots (of same length) and conditional state.""" # Guard clause, if conditional shots are empty, return shots without filtering if len(conditional_shots) == 0: return shots_to_filter - conditional_shots_reshaped: NDArray[np.float64] = StateAcquisitionContainer.complex_to_real_imag( + conditional_shots_reshaped: NDArray[np.float32] = StateAcquisitionContainer.complex_to_real_imag( conditional_shots) state_indices: NDArray[np.int_] = self._discriminator.predict(conditional_shots_reshaped) # 1 indexed conditional_index: int = self._state_lookup[conditional_state] @@ -315,7 +315,7 @@ def classification_boundaries(self) -> DecisionBoundaries: @property @abstractmethod - def concatenated_shots(self) -> NDArray[np.complex128]: + def 
concatenated_shots(self) -> NDArray[np.complex64]: """:return: Array-like of (complex-valued) concatenated acquisition shots.""" raise InterfaceMethodException # endregion @@ -358,9 +358,9 @@ def estimate_threshold(self) -> float: ) @property - def concatenated_shots(self) -> NDArray[np.complex128]: + def concatenated_shots(self) -> NDArray[np.complex64]: """:return: Array-like of (complex-valued) concatenated acquisition shots.""" - shots: List[NDArray[np.complex128]] = [value.shots for value in self.state_acquisition_lookup.values()] + shots: List[NDArray[np.complex64]] = [value.shots for value in self.state_acquisition_lookup.values()] return np.concatenate(shots) @property @@ -434,7 +434,7 @@ def real_imag_to_complex(real_imag_data: np.ndarray) -> np.ndarray: return complex_data # .reshape(-1, 1) @staticmethod - def get_threshold_estimate(shots_0: NDArray[np.complex128], shots_1: NDArray[np.complex128]) -> float: + def get_threshold_estimate(shots_0: NDArray[np.complex64], shots_1: NDArray[np.complex64]) -> float: """ Estimate 0-1 threshold based on x-axes projection. @@ -457,7 +457,7 @@ def get_threshold_estimate(shots_0: NDArray[np.complex128], shots_1: NDArray[np. class AssignmentProbabilityMatrix: """Data class, containing assignment probability matrix and array-like of state-key.""" state_keys: List[StateKey] - matrix: NDArray[np.float64] + matrix: NDArray[np.float32] # region Class Methods @classmethod @@ -479,6 +479,235 @@ def from_acquisition_container(cls, acquisition_container: IStateAcquisitionCont state_keys=states, matrix=probability_matrix, ) + + def correct_readout_error( + self, + measured_probabilities: NDArray[np.float32], + input_states: List[StateKey], + clip_values: bool = True + ) -> Optional[NDArray[np.float32]]: + """ + Corrects a vector of measured probabilities using THIS instance as the calibration matrix. + + Assumes THIS instance's matrix represents P(measure i | true k) [COLUMNS sum to 1]. + Takes a measured distribution P(measure j) and estimates P(true k). + + :param measured_probabilities: 1D array of measured probabilities P(measure j). + Order defined by input_states. Should sum to 1. + :param input_states: List of StateKeys corresponding to measured_probabilities, + defining the subspace for correction relative to THIS matrix. + :param clip_values: If True, clips results to [0, 1] and renormalizes. + :return: Numpy array of corrected probabilities P(true k) in the same order as input_states, + cast to float32, or None if correction fails (e.g., singular matrix). + Returns an empty float32 array if input sequences are empty. + :raises ValueError: If validation fails (length mismatch, unknown states in input_states). + """ + # --- Input Validation --- + if len(measured_probabilities) != len(input_states): + raise ValueError(f"Length mismatch: len(measured_probabilities)={len(measured_probabilities)} != len(input_states)={len(input_states)}.") + if not input_states: + return np.array([], dtype=np.float32) # Handle empty input + + # Check sum robustly before proceeding + prob_sum = np.sum(measured_probabilities) + if not np.isclose(prob_sum, 1.0, atol=1e-5): + # Use warning instead of error? Depends on expected usage. + warn(f"Input probabilities sum to {prob_sum:.6f}, not 1.0. 
Correction proceeds, but result validity depends on input.") + # raise ValueError(f"Input probabilities must sum to 1.0, but sum to {prob_sum:.4f}.") + + missing_keys = [ts for ts in input_states if ts not in self.state_keys] + if missing_keys: + raise ValueError(f"Input states {missing_keys} are not present in the " + f"calibration matrix state keys {self.state_keys}.") + if len(set(input_states)) != len(input_states): + raise ValueError(f"Duplicate states found in input_states: {input_states}") + # --- End Validation --- + + if len(self.state_keys) == 0: + warn("Cannot correct, AssignmentProbabilityMatrix (calibration matrix) is empty.") + return None + + # Get Correction Submatrix (from self.matrix, assumed P(measure|true)) + # Use float64 for the submatrix going into inversion + sub_matrix_64 = self._get_correction_submatrix(input_states).astype(np.float64) + + # Apply Core Correction Math (using float64 for precision) + raw_corrected_probs_64 = AssignmentProbabilityMatrix.apply_inverse_correction( + sub_matrix_64, + measured_probabilities.astype(np.float64) # Ensure input is float64 + ) + + if raw_corrected_probs_64 is None: + return None # Failure (error already printed by static method) + + # Clip and Re-normalize (Optional) - operates on float64 + final_probs_64 = raw_corrected_probs_64 + if clip_values: + final_probs_64 = AssignmentProbabilityMatrix.clip_and_normalize(final_probs_64) + + # Return Result Array (cast back to float32) + return final_probs_64.astype(np.float32) + + def _get_correction_submatrix(self, target_states: Sequence[StateKey]) -> NDArray[np.float32]: + """ + Extracts the relevant sub-matrix from THIS instance's matrix. + + Assumes self.matrix is P(measure i | true k). Extracts the part relevant + to the subspace defined by target_states. The target_states define both + the measured states (rows) and the true states (columns) of the submatrix. + + :param target_states: Sequence of StateKeys defining the subspace. + :return: The square sub-matrix M_sub[a, b] = P(measure target_state[a] | true target_state[b]). + """ + if not target_states: + return np.empty((0, 0), dtype=np.float32) + try: + # Get indices corresponding to target_states within self.state_keys + cal_indices = [self.state_keys.index(ts) for ts in target_states] + except ValueError as e: + # Should be caught by validation earlier, but handle defensively. + raise ValueError(f"State key not found in calibration matrix state keys during submatrix extraction: {e}") from e + # Extract rows and columns corresponding to these indices + return self.matrix[np.ix_(cal_indices, cal_indices)] + + def apply_readout_correction( + self, + noisy_assignment: 'AssignmentProbabilityMatrix', + clip_values: bool = True + ) -> Optional['AssignmentProbabilityMatrix']: + """ + Corrects a noisy assignment matrix using THIS instance as the calibration matrix. + + Corrects the input matrix where noisy_assignment.matrix[i, j] = P(assigned j | prepared i) [ROWS sum to 1]. + Uses THIS instance where self.matrix[i, k] = P(measure i | true k) [COLUMNS sum to 1] for calibration. + The output matrix represents P(true k | prepared i) [ROWS sum to 1]. + + :param noisy_assignment: An AssignmentProbabilityMatrix instance containing the noisy + assignment matrix P(assigned j | prepared i). Its state_keys + define the prepared/assigned states. ROWS should sum to 1. + :param clip_values: Whether to clip and normalize the corrected probabilities row-wise. 
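correct_readout_error above (together with apply_inverse_correction and clip_and_normalize further below) performs a linear inversion: with M[i, k] = P(measure i | true k), the measured distribution is M @ p_true, so p_true is estimated as inv(M) @ p_meas and then clipped and renormalized. A self-contained sketch of that math with an assumed two-state calibration matrix (illustrative numbers only):

import numpy as np

# Assumed calibration: columns are true states and sum to 1, e.g. 2% / 5% misassignment.
assignment_matrix = np.array([[0.98, 0.05],
                              [0.02, 0.95]], dtype=np.float64)
measured = np.array([0.90, 0.10], dtype=np.float64)  # P(measure 0), P(measure 1)

corrected = np.linalg.inv(assignment_matrix) @ measured  # estimate of P(true k)
corrected = np.clip(corrected, 0.0, 1.0)                 # guard against small negative values
corrected = corrected / corrected.sum()                  # renormalize to a probability vector
print(corrected)  # ~[0.914, 0.086]: slightly more weight on state 0 than measured
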
+ :return: A new AssignmentProbabilityMatrix instance containing the corrected matrix + P(true k | prepared i), where rows sum to 1, or None if correction fails for any row. + :raises ValueError: If dimensions or state keys are inconsistent between the + noisy matrix and the calibration matrix (self). + """ + # --- Input Validation --- + noisy_matrix: NDArray[np.float32] = noisy_assignment.matrix + matrix_states: List[StateKey] = noisy_assignment.state_keys + calibration_matrix: AssignmentProbabilityMatrix = self # Self is the calibrator + + n_states = len(matrix_states) + if noisy_matrix.shape != (n_states, n_states): + raise ValueError(f"Shape mismatch: noisy assignment matrix shape {noisy_matrix.shape} " + f"does not match its number of states {n_states}.") + + if n_states == 0: + warn("Input noisy_assignment matrix is empty (0 states). Returning empty matrix.") + return AssignmentProbabilityMatrix(state_keys=[], matrix=np.empty((0, 0), dtype=np.float32)) + + # Verify row sums of the input noisy matrix + row_sums = np.sum(noisy_matrix, axis=1) + if not np.allclose(row_sums, 1.0, atol=1e-5): + warn(f"Rows of input noisy assignment matrix do not all sum to 1.0 (sums: {row_sums}). " + f"Correction assumes rows represent measured probability distributions.") + + # Check if calibration matrix (self) supports the necessary states + if not all(state in calibration_matrix.state_keys for state in matrix_states): + missing = [s for s in matrix_states if s not in calibration_matrix.state_keys] + raise ValueError(f"Calibration matrix (self) is missing states required by the " + f"noisy matrix: {missing}. Cannot correct over subspace {matrix_states}.") + # --- End Validation --- + + # Create an empty matrix for the results + # Output matrix rows are prepared states, columns are TRUE states + corrected_matrix = np.zeros_like(noisy_matrix, dtype=np.float32) + + # Iterate through the ROWS of the noisy matrix + for i, prepared_state in enumerate(matrix_states): + # Row 'i' represents the measured probabilities P(assigned j | prepared i) + measured_probs_for_row_i: NDArray[np.float32] = noisy_matrix[i, :] + + # The measured probabilities correspond to the 'matrix_states' (assigned states) + # We use the 'calibration_matrix' (self) to perform the correction + # The 'input_states' for correction are the possible measured/assigned states (matrix_states) + corrected_probs_for_row_i: Optional[NDArray[np.float32]] = calibration_matrix.correct_readout_error( + measured_probabilities=measured_probs_for_row_i, + input_states=matrix_states, # These are the states corresponding to measured_probs_for_row_i + clip_values=clip_values + ) + + # Check if correction failed for this row + if corrected_probs_for_row_i is None: + # Error message already printed by correct_readout_error or its helpers + warn(f"Readout correction failed for row {i} (prepared state {prepared_state}). Returning None.") + return None # Abort correction for the whole matrix + + # Store the corrected probability vector P(true k | prepared i) in row 'i' + corrected_matrix[i, :] = corrected_probs_for_row_i + + # Optional: Verify row sums of the *output* matrix + final_row_sums = np.sum(corrected_matrix, axis=1) + if not np.allclose(final_row_sums, 1.0, atol=1e-5): + warn(f"Rows of the *corrected* matrix do not all sum to 1.0 (sums: {final_row_sums}). 
This can happen if clipping was aggressive or input rows didn't sum to 1.") + + # Return a new AssignmentProbabilityMatrix instance + # Note: The state_keys remain the same, representing the prepared states (rows) + # and now the TRUE states (columns). + return AssignmentProbabilityMatrix( + state_keys=matrix_states, + matrix=corrected_matrix, + ) + # endregion + + # region Static Class Methods + @staticmethod + def apply_inverse_correction(sub_matrix: NDArray[np.float64], probability_array: NDArray[np.float64]) -> Optional[NDArray[np.float64]]: + """ + Applies P_true = inv(M_sub) * P_meas using float64 for precision. + + :param sub_matrix: Calibration submatrix P(measure|true), float64. + :param probability_array: Measured probability vector P(measure), float64. + :return: Raw corrected probability P(true) as float64 array, or None if inversion fails. + """ + if probability_array.size == 0: + return np.array([], dtype=np.float64) + + p_meas_sub = probability_array.reshape(-1, 1) + try: + m_sub_inv = np.linalg.inv(sub_matrix) + except np.linalg.LinAlgError: + warn(f"Singular matrix encountered during inversion:\n{sub_matrix}") + return None + p_true_sub = m_sub_inv @ p_meas_sub + return p_true_sub.flatten() + + @staticmethod + def clip_and_normalize(prob_array: NDArray[np.float64]) -> NDArray[np.float64]: + """ + Clips float64 probabilities to [0, 1] and renormalizes to sum to 1. + + :param prob_array: Float64 probability array P(true). + :return: Clipped and normalized float64 probability array. + """ + if prob_array.size == 0: + return np.array([], dtype=np.float64) + + # Check for significant negative values before clipping + if np.any(prob_array < -1e-7): + warn(f"Corrected probabilities contained negative values before clipping: {prob_array}") + + clipped_probs = np.clip(prob_array, 0.0, 1.0) + sum_clipped = np.sum(clipped_probs) + + if sum_clipped > 1e-9: + normalized_probs = clipped_probs / sum_clipped + # Ensure sum is exactly 1? 
Usually not necessary but can enforce: + # normalized_probs /= np.sum(normalized_probs) + return normalized_probs + else: + # If sum is zero after clipping + warn("Corrected probabilities clipped/normalized to zero sum.") + return clipped_probs # Return the array of zeros # endregion @@ -496,6 +725,12 @@ class IStateClassifierContainer(ABC): def expected_parity(self) -> ParityType: """:return: Expected parity property.""" raise InterfaceMethodException + + @property + @abstractmethod + def stabilizer_reset(self) -> bool: + """:return: Boolean whether parity resets each round.""" + raise InterfaceMethodException # endregion # region Interface Methods @@ -606,14 +841,20 @@ def eigenvalue_to_binary(m: np.ndarray) -> np.ndarray: @dataclass(frozen=True) class StateClassifierContainer(IStateClassifierContainer): """Data class, containing classified states based on already classified states.""" - state_classification: NDArray[int] + state_classification: NDArray[np.int_] _expected_parity: ParityType = field(default=ParityType.EVEN) + _stabilizer_reset: bool = field(default=False) # region Interface Properties @property def expected_parity(self) -> ParityType: """:return: Expected parity property.""" return self._expected_parity + + @property + def stabilizer_reset(self) -> bool: + """:return: Boolean whether parity resets each round.""" + return self._stabilizer_reset # endregion # region Class Methods @@ -631,6 +872,9 @@ def get_eigenvalue_classification(self) -> NDArray[np.int_]: def get_parity_classification(self) -> NDArray[np.int_]: """:return: Parity classification based on eigenvalue classification.""" + if self._stabilizer_reset: + return self.get_eigenvalue_classification() + return IStateClassifierContainer.calculate_parity( m=self.get_eigenvalue_classification(), ) @@ -655,6 +899,7 @@ def reshape(cls, container: TStateClassifierContainer, index_slices: NDArray[np. return StateClassifierContainer( state_classification=np.array([container.state_classification[index_slice] for index_slice in index_slices]), _expected_parity=container.expected_parity, + _stabilizer_reset=container.stabilizer_reset, ) # endregion @@ -662,15 +907,21 @@ def reshape(cls, container: TStateClassifierContainer, index_slices: NDArray[np. 
@dataclass(frozen=True) class ShotsClassifierContainer(IStateClassifierContainer): """Data class, containing classified states based on (complex) acquisition and decision boundaries.""" - shots: NDArray[np.complex128] + shots: NDArray[np.complex64] decision_boundaries: DecisionBoundaries _expected_parity: ParityType = field(default=ParityType.EVEN) + _stabilizer_reset: bool = field(default=False) # region Interface Properties @property def expected_parity(self) -> ParityType: """:return: Expected parity property.""" return self._expected_parity + + @property + def stabilizer_reset(self) -> bool: + """:return: Boolean whether parity resets each round.""" + return self._stabilizer_reset # endregion # region Class Properties @@ -684,6 +935,7 @@ def state_classifier(self) -> StateClassifierContainer: return StateClassifierContainer( state_classification=self._process_tensor(self.shots, self.decision_boundaries.get_binary_predictions), _expected_parity=self.expected_parity, + _stabilizer_reset=self.stabilizer_reset, ) @property @@ -692,6 +944,7 @@ def state_classifier_ternary(self) -> StateClassifierContainer: return StateClassifierContainer( state_classification=self._process_tensor(self.shots, self.decision_boundaries.get_predictions), _expected_parity=self.expected_parity, + _stabilizer_reset=self.stabilizer_reset, ) # endregion @@ -727,6 +980,7 @@ def reshape(cls, container: TStateClassifierContainer, index_slices: NDArray[np. shots=np.array([container.shots[index_slice] for index_slice in index_slices]), decision_boundaries=container.decision_boundaries, _expected_parity=container.expected_parity, + _stabilizer_reset=container.stabilizer_reset, ) # endregion diff --git a/src/qce_interp/interface_definitions/intrf_syndrome_decoder.py b/src/qce_interp/interface_definitions/intrf_syndrome_decoder.py index 52a3fe0..e380278 100644 --- a/src/qce_interp/interface_definitions/intrf_syndrome_decoder.py +++ b/src/qce_interp/interface_definitions/intrf_syndrome_decoder.py @@ -32,6 +32,13 @@ def get_fidelity(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> raise InterfaceMethodException # endregion + # region Interface Methods + @abstractmethod + def get_fidelity_uncertainty(self, cycle_stabilizer_count: int, target_state: np.ndarray) -> float: + """:return: Uncertainty in Logical fidelity based on target state and stabilizer round-count.""" + raise InterfaceMethodException + # endregion + class ISyndromeDecoder(IDecoder, metaclass=ABCMeta): """ diff --git a/src/qce_interp/utilities/expected_parities.py b/src/qce_interp/utilities/expected_parities.py index 55bc187..cc4ef64 100644 --- a/src/qce_interp/utilities/expected_parities.py +++ b/src/qce_interp/utilities/expected_parities.py @@ -11,7 +11,7 @@ from qce_interp.interface_definitions.intrf_error_identifier import ErrorDetectionIdentifier -def initial_state_to_expected_parity(initial_state: InitialStateContainer, parity_layout: ISurfaceCodeLayer, involved_data_qubit_ids: List[IQubitID], involved_ancilla_qubit_ids: List[IQubitID]) -> Dict[IQubitID, ParityType]: +def initial_state_to_expected_parity(initial_state: InitialStateContainer, parity_layout: ISurfaceCodeLayer, involved_data_qubit_ids: List[IQubitID], involved_ancilla_qubit_ids: List[IQubitID], inverse_parity: bool = False) -> Dict[IQubitID, ParityType]: # Data allocation result: Dict[IQubitID, ParityType] = {} parity_index_lookup: Dict[IQubitID, NDArray[np.int_]] = ErrorDetectionIdentifier.get_parity_index_lookup( @@ -34,7 +34,7 @@ def 
initial_state_to_expected_parity(initial_state: InitialStateContainer, parit assert len(computed_parity) == len(involved_ancilla_qubit_ids), f"Expects parity to be defined for each involved ancilla qubit. Instead {len(computed_parity)} out of {len(involved_ancilla_qubit_ids)} are present." for qubit_id, parity in zip(involved_ancilla_qubit_ids, computed_parity): - even_parity: bool = parity == +1 + even_parity: bool = parity != +1 if inverse_parity else parity == +1 if even_parity: result[qubit_id] = ParityType.EVEN else: diff --git a/src/qce_interp/visualization/plot_combined_logical_fidelity.py b/src/qce_interp/visualization/plot_combined_logical_fidelity.py index 04c39cb..75e2ed9 100644 --- a/src/qce_interp/visualization/plot_combined_logical_fidelity.py +++ b/src/qce_interp/visualization/plot_combined_logical_fidelity.py @@ -37,9 +37,14 @@ def plot_combined_overview(decoder: IDecoder, error_identifier: IErrorDetectionI # Data allocation decoder_post_selected = decoder_constructor(error_identifier.copy_with_post_selection( use_heralded_post_selection=True, - use_projected_leakage_post_selection=False, + use_projected_leakage_post_selection=True, use_stabilizer_leakage_post_selection=True, )) + decoder_post_selected_final = decoder_constructor(error_identifier.copy_with_post_selection( + use_heralded_post_selection=True, + use_projected_leakage_post_selection=True, + use_stabilizer_leakage_post_selection=False, + )) # Define grid details gridspec_constructor_kwargs: dict = dict(height_ratios=[1, 1, 1], width_ratios=[1]) @@ -95,12 +100,21 @@ def plot_combined_overview(decoder: IDecoder, error_identifier: IErrorDetectionI target_state=target_state, **kwargs_logical, ) + plot_fidelity( + decoder=decoder_post_selected_final, + included_rounds=qec_rounds, + target_state=target_state, + color='cyan', + label='PS final-meas leakage', + fit_error_rate=True, + **kwargs_logical + ) plot_fidelity( decoder=decoder_post_selected, included_rounds=qec_rounds, target_state=target_state, color='magenta', - label='PS Heralded + Ancilla Leakage', + label='PS final/parity-meas leakage', fit_error_rate=True, **kwargs_logical ) diff --git a/src/qce_interp/visualization/plot_defect_rate.py b/src/qce_interp/visualization/plot_defect_rate.py index 749ad28..679fb2a 100644 --- a/src/qce_interp/visualization/plot_defect_rate.py +++ b/src/qce_interp/visualization/plot_defect_rate.py @@ -108,7 +108,7 @@ def is_post_selection_valid(labeled_error_identifier: LabeledErrorDetectionIdent return valid_post_selection -def plot_defect_rate(error_identifier: IErrorDetectionIdentifier, qubit_id: IQubitID, qec_cycles: int, **kwargs) -> IFigureAxesPair: +def plot_defect_rate(error_identifier: IErrorDetectionIdentifier, qubit_id: IQubitID, qec_cycles: int, inversed_defect_rate: bool = False, **kwargs) -> IFigureAxesPair: """ :param error_identifier: Instance that identifiers errors. :param qubit_id: Qubit identifier for which the defects are plotted. 
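initial_state_to_expected_parity above derives each ancilla's expected parity from the product of the ±1 eigenvalues of the data qubits in its support; the new inverse_parity flag flips that assignment. A small numpy sketch of the underlying arithmetic (a hypothetical three-data-qubit repetition code, not the library call):

import numpy as np

initial_state = np.array([0, 1, 0])                 # data qubits D1, D2, D3 in |0>, |1>, |0>
eigenvalues = 1 - 2 * initial_state                 # 0 -> +1, 1 -> -1
stabilizer_support = {"Z1": [0, 1], "Z2": [1, 2]}   # weight-2 parities D1D2 and D2D3

inverse_parity = False
for ancilla, data_indices in stabilizer_support.items():
    parity = np.prod(eigenvalues[data_indices])     # +1 even, -1 odd
    even = (parity != +1) if inverse_parity else (parity == +1)
    print(ancilla, "EVEN" if even else "ODD")       # Z1 ODD, Z2 ODD for |010>
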
@@ -131,6 +131,8 @@ def plot_defect_rate(error_identifier: IErrorDetectionIdentifier, qubit_id: IQub ) # Calculate the mean across 'measurement_repetition' averages = data_array.mean(dim=DataArrayLabels.MEASUREMENT.value) + if inversed_defect_rate: + averages = 1.0 - averages # Temporarily inverts defect-rate expression label: str = qubit_id.id color: str = kwargs.pop('color', blue_shades[0]) @@ -172,7 +174,7 @@ def plot_defect_rate(error_identifier: IErrorDetectionIdentifier, qubit_id: IQub return fig, ax -def plot_all_defect_rate(error_identifier: IErrorDetectionIdentifier, included_rounds: int, **kwargs) -> IFigureAxesPair: +def plot_all_defect_rate(error_identifier: IErrorDetectionIdentifier, included_rounds: int, inversed_defect_rate: bool = False, **kwargs) -> IFigureAxesPair: """ :param error_identifier: Instance that identifiers errors. :param included_rounds: Integer number of qec cycles that should be included in the defect plot. @@ -189,6 +191,7 @@ def plot_all_defect_rate(error_identifier: IErrorDetectionIdentifier, included_r error_identifier=error_identifier, qubit_id=qubit_id, qec_cycles=included_rounds, + inversed_defect_rate=inversed_defect_rate, **kwargs, ) return fig, ax diff --git a/src/qce_interp/visualization/plot_logical_fidelity.py b/src/qce_interp/visualization/plot_logical_fidelity.py index 791ac4f..0bf6b34 100644 --- a/src/qce_interp/visualization/plot_logical_fidelity.py +++ b/src/qce_interp/visualization/plot_logical_fidelity.py @@ -1,14 +1,21 @@ # ------------------------------------------- # Module for visualizing logical fidelity and error rate. # ------------------------------------------- +from dataclasses import dataclass, field import itertools -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Type, Dict from tqdm import tqdm import numpy as np from scipy.optimize import curve_fit from qce_circuit.language import InitialStateContainer +from qce_interp.definitions import Singleton from qce_interp.utilities.custom_exceptions import ZeroClassifierShotsException from qce_interp.interface_definitions.intrf_syndrome_decoder import IDecoder +from qce_interp.decoder_examples.mwpm_decoders import ( + MWPMDecoder, + MWPMDecoderFast, +) +from qce_interp.decoder_examples.majority_voting import MajorityVotingDecoder from qce_interp.visualization.plotting_functionality import ( construct_subplot, IFigureAxesPair, @@ -30,6 +37,36 @@ ] +@dataclass(frozen=True) +class DecoderToLabel: + """ + Data class, containing decoder class or instance to label. 
+ """ + default_label: str = "Unlabeled" + decoder_type_to_label: Dict[Type[IDecoder], str] = field(default_factory=dict) + decoder_instance_to_label: Dict[IDecoder, str] = field(default_factory=dict) + + # region Class Methods + def __post_init__(self): + default_decoder_type_to_label: Dict[Type[IDecoder], str] = { + MWPMDecoder: "MWPM", + MWPMDecoderFast: "MWPM (optimized)", + MajorityVotingDecoder: "MV", + } + default_decoder_type_to_label.update(self.decoder_type_to_label) + object.__setattr__(self, 'decoder_type_to_label', default_decoder_type_to_label) + + def to_label(self, decoder: IDecoder) -> str: + """:return: label based on decoder instance, else based on decoder type, else default label.""" + if decoder in self.decoder_instance_to_label: + return self.decoder_instance_to_label[decoder] + decoder_type = type(decoder) + if decoder_type in self.decoder_type_to_label: + return self.decoder_type_to_label[decoder_type] + return f"{decoder.__class__.__name__}" + # endregion + + def fit_function(x: np.ndarray, error: float, x_0: float) -> np.ndarray: """ Calculate the fit function value for given inputs. @@ -111,13 +148,17 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I """ # Data allocation x_array: np.ndarray = np.asarray(included_rounds) - y_array: np.ndarray = np.full_like(x_array, np.nan, dtype=np.float64) + y_array: np.ndarray = np.full_like(x_array, np.nan, dtype=np.float32) + y_err_array: np.ndarray = np.full_like(x_array, np.nan, dtype=np.float32) for i, x in tqdm(enumerate(x_array), desc=f"Processing {decoder.__class__.__name__} Decoder", total=len(x_array)): try: value: float = decoder.get_fidelity(x, target_state=target_state.as_array) + value_err: float = decoder.get_fidelity_uncertainty(x, target_state=target_state.as_array) except ZeroClassifierShotsException: value = np.nan + value_err = np.nan y_array[i] = value + y_err_array[i] = value_err color: str = kwargs.pop('color', orange_red_purple_shades[0]) label_format: LabelFormat = kwargs.get(SubplotKeywordEnum.LABEL_FORMAT.value, LabelFormat( @@ -129,25 +170,27 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I kwargs[SubplotKeywordEnum.LABEL_FORMAT.value] = label_format fig, ax = construct_subplot(**kwargs) - ax.plot( + ax.errorbar( x_array, y_array, + yerr=y_err_array, linestyle='-', marker='.', color=color, label=label, + capsize=3, ) contains_nan_values: bool = np.isnan(y_array).any() if fit_error_rate and not contains_nan_values: code_distance: int = len(target_state.as_array) exclude_first_n: int = code_distance - if code_distance < 7: + if code_distance < 5: exclude_first_n = 2 * code_distance - try: args, kwargs = get_fit_plot_arguments(x_array=x_array, y_array=y_array, exclude_first_n=exclude_first_n) - ax.plot( + ax.errorbar( *args, + yerr=0.0, **kwargs, ) except RuntimeError: @@ -159,12 +202,13 @@ def plot_fidelity(decoder: IDecoder, included_rounds: List[int], target_state: I return fig, ax -def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], target_state: InitialStateContainer, **kwargs) -> IFigureAxesPair: +def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], target_state: InitialStateContainer, decoder_labels: DecoderToLabel = DecoderToLabel(), **kwargs) -> IFigureAxesPair: """ Plots multiple decoders fidelity in one subplot. :param decoders: Decoder used to evaluate fidelity at each QEC-round. :param included_rounds: Array-like of included QEC-rounds. Each round will be evaluated. 
:param target_state: InitialStateContainer instance representing target state. + :param decoder_labels: (Optional) Translation from decoder to label. :param kwargs: Key-word arguments passed to subplot constructor. :return: Tuple of Figure and Axes pair. """ @@ -178,7 +222,7 @@ def plot_compare_fidelity(decoders: List[IDecoder], included_rounds: List[int], decoder=decoder, included_rounds=included_rounds, target_state=target_state, - label=f"{decoder.__class__.__name__}", + label=decoder_labels.to_label(decoder=decoder), fit_error_rate=True, **kwargs, ) diff --git a/src/qce_interp/visualization/plot_post_selection_fraction.py b/src/qce_interp/visualization/plot_post_selection_fraction.py index fbc2fb0..9dfd501 100644 --- a/src/qce_interp/visualization/plot_post_selection_fraction.py +++ b/src/qce_interp/visualization/plot_post_selection_fraction.py @@ -77,7 +77,7 @@ def plot_post_selection_fraction(error_identifier: IErrorDetectionIdentifier, qe """ # Data allocation qec_rounds: np.ndarray = np.asarray(qec_rounds) - fraction = np.zeros_like(qec_rounds, dtype=np.float64) + fraction = np.zeros_like(qec_rounds, dtype=np.float32) for i, qec_round in enumerate(qec_rounds): post_selection_mask: NDArray[np.bool_] = error_identifier.get_post_selection_mask(cycle_stabilizer_count=qec_round) fraction[i] = 1.0 - np.sum(post_selection_mask) / len(post_selection_mask) @@ -142,7 +142,7 @@ def plot_post_selection_fraction_composite(error_identifier: IErrorDetectionIden post_selection_qubits=post_selection_qubits, ), qec_rounds=qec_rounds, - label='Heralded fraction', + label='Initiation fraction', **kwargs, ) plot_post_selection_fraction( @@ -188,7 +188,7 @@ def plot_retained_fraction(error_identifier: IErrorDetectionIdentifier, qec_roun """ # Data allocation qec_rounds: np.ndarray = np.asarray(qec_rounds) - fraction = np.zeros_like(qec_rounds, dtype=np.float64) + fraction = np.zeros_like(qec_rounds, dtype=np.float32) for i, qec_round in enumerate(qec_rounds): post_selection_mask: NDArray[np.bool_] = error_identifier.get_post_selection_mask(cycle_stabilizer_count=qec_round) fraction[i] = np.sum(post_selection_mask) / len(post_selection_mask) diff --git a/src/qce_interp/visualization/plot_state_classification.py b/src/qce_interp/visualization/plot_state_classification.py index 57c498a..8033d47 100644 --- a/src/qce_interp/visualization/plot_state_classification.py +++ b/src/qce_interp/visualization/plot_state_classification.py @@ -91,7 +91,7 @@ def determine_axes_limits(state_classifier: IStateAcquisitionContainer) -> Tuple :param state_classifier: State acquisition container, containing single-shot data. :return: Tuple of 4 limits (min_x, max_x, min_y, max_y). 
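plot_fidelity above now draws the per-round fidelities with ax.errorbar, passing the binomial uncertainty as yerr. A stand-alone matplotlib sketch of that plotting pattern (synthetic numbers, without the error-rate fit):

import numpy as np
import matplotlib.pyplot as plt

qec_rounds = np.array([1, 2, 4, 8, 16])
fidelity = np.array([0.99, 0.97, 0.94, 0.88, 0.78])    # synthetic values
n_shots = 10_000
yerr = np.sqrt(fidelity * (1.0 - fidelity) / n_shots)  # binomial standard error per point

fig, ax = plt.subplots()
ax.errorbar(qec_rounds, fidelity, yerr=yerr, linestyle='-', marker='.', capsize=3, label='MWPM')
ax.set_xlabel('QEC round')
ax.set_ylabel('Logical fidelity')
ax.legend(frameon=False)
plt.show()
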
""" - all_shots: NDArray[np.complex128] = state_classifier.concatenated_shots + all_shots: NDArray[np.complex64] = state_classifier.concatenated_shots maximum_limit: float = max( abs(min(all_shots.real)), abs(max(all_shots.real)), diff --git a/src/qce_interp/visualization/plot_state_distribution.py b/src/qce_interp/visualization/plot_state_distribution.py index 9fd294d..05c8c84 100644 --- a/src/qce_interp/visualization/plot_state_distribution.py +++ b/src/qce_interp/visualization/plot_state_distribution.py @@ -31,7 +31,7 @@ def generate_gray_code(n: int) -> NDArray[np.int_]: return gray_code_array -def calculate_state_fractions(large_array: NDArray[np.int_], gray_code_array: NDArray[np.int_]) -> NDArray[np.float64]: +def calculate_state_fractions(large_array: NDArray[np.int_], gray_code_array: NDArray[np.int_]) -> NDArray[np.float32]: """ Calculate the fraction of each Gray code state in the large M by N array. @@ -52,7 +52,7 @@ def get_state_fraction_array( rounds: List[int], gray_code_n: int, classification_method: Callable[[Any, int], NDArray[np.int_]] -) -> NDArray[np.float64]: +) -> NDArray[np.float32]: """ Calculate the state fraction array for given rounds using a specified classification method. diff --git a/tests/utilities/test_expected_parities.py b/tests/utilities/test_expected_parities.py index b093d5a..08d11ad 100644 --- a/tests/utilities/test_expected_parities.py +++ b/tests/utilities/test_expected_parities.py @@ -1,8 +1,11 @@ import unittest +import numpy as np +from numpy.testing import assert_array_equal from qce_circuit.language.intrf_declarative_circuit import InitialStateContainer, InitialStateEnum from qce_circuit.connectivity.intrf_channel_identifier import QubitIDObj from qce_interp.interface_definitions.intrf_state_classification import ParityType from qce_interp.utilities.expected_parities import initial_state_to_expected_parity +from qce_interp.interface_definitions.intrf_state_classification import StateClassifierContainer from qce_circuit.library.repetition_code.repetition_code_connectivity import Repetition9Round6Code as Repetition17Layer from qce_circuit.connectivity.connectivity_surface_code import Surface17Layer @@ -94,3 +97,80 @@ def tearDownClass(cls) -> None: """Closes any left over processes after testing""" pass # endregion + + +class ActiveParityResetTestCase(unittest.TestCase): + + # region Setup + @classmethod + def setUpClass(cls) -> None: + """Set up for all test cases""" + pass + + def setUp(self) -> None: + """Set up for every test case""" + pass + # endregion + + # region Test Cases + def test_classification_without_stabilizer_reset(self): + """Tests initial state to weight-2 parities.""" + + state_classification: np.ndarray = np.asarray([0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0]) + state_classifier = StateClassifierContainer( + state_classification=state_classification, + _expected_parity=ParityType.EVEN, + _stabilizer_reset=False, + ) + + assert_array_equal( + state_classifier.get_binary_classification(), + state_classification, + ) + assert_array_equal( + state_classifier.get_eigenvalue_classification(), + np.array([1, 1, 1, 1, -1, 1, -1, 1, -1, 1, 1, 1, 1]), + ) + assert_array_equal( + state_classifier.get_parity_classification(), + np.array([1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1]), + ) + assert_array_equal( + state_classifier.get_defect_classification(), + np.array([1, 1, 1, 1, -1, 1, 1, 1, 1, 1, -1, 1, 1]), + ) + + def test_classification_with_stabilizer_reset(self): + """Tests initial state to weight-2 parities.""" + + 
state_classification: np.ndarray = np.asarray([0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0]) + state_classifier = StateClassifierContainer( + state_classification=state_classification, + _expected_parity=ParityType.EVEN, + _stabilizer_reset=True, + ) + + assert_array_equal( + state_classifier.get_binary_classification(), + state_classification, + ) + assert_array_equal( + state_classifier.get_eigenvalue_classification(), + np.array([ 1, 1, 1, 1, -1, -1, -1, -1, -1, 1, 1, 1, 1]), + ) + assert_array_equal( + state_classifier.get_parity_classification(), + np.array([ 1, 1, 1, 1, -1, -1, -1, -1, -1, 1, 1, 1, 1]), + ) + assert_array_equal( + state_classifier.get_defect_classification(), + np.array([ 1, 1, 1, 1, -1, 1, 1, 1, 1, -1, 1, 1, 1]), + ) + # endregion + + # region Teardown + @classmethod + def tearDownClass(cls) -> None: + """Closes any left over processes after testing""" + pass + # endregion
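
The tests above exercise the new _stabilizer_reset flag: without reset the parity at round t is the product of the eigenvalues of rounds t and t-1, with active reset each round's eigenvalue already is the parity, and defects compare consecutive parities (the first round against the expected EVEN parity). A compact numpy sketch reproducing the first test's expectations:

import numpy as np

def parities_and_defects(states: np.ndarray, stabilizer_reset: bool) -> tuple:
    """Toy re-derivation of the relations the tests above verify (expected parity EVEN)."""
    eigenvalues = 1 - 2 * states                           # 0 -> +1, 1 -> -1
    parities = eigenvalues.copy()
    if not stabilizer_reset:
        parities[1:] = eigenvalues[1:] * eigenvalues[:-1]  # without reset: product of consecutive rounds
    defects = parities.copy()
    defects[1:] = parities[1:] * parities[:-1]             # defect = change of parity between rounds
    return parities, defects

states = np.array([0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0])
parities, defects = parities_and_defects(states, stabilizer_reset=False)
print(parities)  # [ 1  1  1  1 -1 -1 -1 -1 -1 -1  1  1  1]
print(defects)   # [ 1  1  1  1 -1  1  1  1  1  1 -1  1  1]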