diff --git a/otto/src/clyso/ceph/otto/tools/clyso-nines b/otto/src/clyso/ceph/otto/tools/clyso-nines new file mode 100755 index 0000000..109dbce --- /dev/null +++ b/otto/src/clyso/ceph/otto/tools/clyso-nines @@ -0,0 +1,1201 @@ +#!/usr/bin/env python3 +# +# clyso-nines - A tool to estimate data loss risk in Ceph clusters +# +# Copyright (C) 2025 Dan van der Ster +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import argparse +from collections import defaultdict +import math +import random +import time +import shlex +import itertools +import sys + +VERBOSE = False +DEBUG = False + +# ---------------------------- Utilities ---------------------------- + +def vprint(*args, **kwargs): + if VERBOSE or DEBUG: + print(*args, **kwargs) + +def dprint(*args, **kwargs): + if DEBUG: + print(*args, **kwargs) + +def reconstruct_command_line(parser, args): + command_parts = [parser.prog, args.command] + + for arg in vars(args): + if arg == 'command' or arg == 'func': + continue + value = getattr(args, arg) + if isinstance(value, bool): + if value: + command_parts.append(f"--{arg.replace('_', '-')}") + elif value is not None: + command_parts.append(f"--{arg.replace('_', '-')}") + command_parts.append(shlex.quote(str(value))) + return ' '.join(command_parts) + +def round_to_nearest_power_of_two(n): + if n <= 0: + raise ValueError("Input must be a positive number.") + lower = 2 ** math.floor(math.log2(n)) + upper = 2 ** math.ceil(math.log2(n)) + return lower if (n - lower) < (upper - n) else upper + +def count_nines(value, max_digits=20): + if value < 0 or value > 1: + raise ValueError(f"Value {value} must be between 0 and 1") + + # Round to given precision + rounded = round(value, max_digits) + s = f"{rounded:.{max_digits}f}" + + if not s.startswith("0."): + return f"{max_digits}-nines" # Treat 1.0 as "max nines" + + decimal_part = s.split('.')[1] + count = 0 + for c in decimal_part: + if c == '9': + count += 1 + else: + break + return f"{count}-nines" if count > 0 else "0-nines (0.0)" + +def comb(n, k): + """Return C(n, k) with math.comb-like semantics.""" + if not (isinstance(n, int) and isinstance(k, int)): + raise TypeError("n and k must be integers") + if n < 0: + raise ValueError("n must be non-negative") + if k < 0 or k > n: + return 0 + k = min(k, n - k) # symmetry + c = 1 + for i in range(1, k + 1): + c = c * (n - i + 1) // i + return c + +def mttdl(N, k, afr, fudge, recovery_time_sec): + T_year = 31_536_000 # seconds in a year + p = afr * recovery_time_sec / T_year + prob_k_failures = fudge*comb(N, k) * (p ** k) + mttdl_seconds = recovery_time_sec / prob_k_failures + return mttdl_seconds + +# https://en.wikipedia.org/wiki/Binomial_distribution +def log_prob_exact_k_failures(n, p, k): + if p == 0: + return float('-inf') if k > 0 else 0.0 + if p == 1: + return float('-inf') if k < n else 0.0 + return ( + math.log(comb(n, k)) + + k * math.log(p) + + (n - k) * math.log1p(-p) + ) + +def prob_k_or_more_failures(n, p, k, 
cutoff=1e-30): + """Stable + fast binomial tail probability P(X>=k).""" + logs = [] + max_log = -float('inf') + + # Only go forward until terms become negligible + for i in range(k, n + 1): + lp = log_prob_exact_k_failures(n, p, i) + + # Track maximum for log-sum-exp normalization + if lp > max_log: + max_log = lp + + logs.append(lp) + + # Stop when next term is extremely small + if len(logs) > 5: + # Compare last 5 terms with peak + if all((max_log - x) > -math.log(cutoff) for x in logs[-5:]): + break + + # log-sum-exp + return math.exp(max_log) * sum(math.exp(x - max_log) for x in logs) + +def expected_max_osd_load(n_osds, n_pgs): + if n_pgs == 0 or n_osds == 0: + return float('inf') + if n_pgs < n_osds: + # Sparse case: use log-ratio approximation + return math.ceil(math.log(n_osds) / math.log(n_osds / n_pgs)) + else: + # Dense case: average load + deviation + avg = n_pgs / n_osds + deviation = math.sqrt(2 * avg * math.log(n_osds)) + return math.ceil(avg + deviation) + +def format_bytes(val): + units = ["B", "KB", "MB", "GB", "TB", "PB"] + thresholds = [1, 1024, 1024**2, 1024**3, 1024**4, 1024**5] + + for i in reversed(range(1, len(units))): + if val >= thresholds[i]: + return f"~{val / thresholds[i]:.1f}{units[i]}" + return f"~{round(val)}B" + +def format_int(val): + units = ["", "K", "M", "B"] + thresholds = [1, 1000, 1000**2, 1000**3] + + for i in reversed(range(1, len(units))): + if val >= thresholds[i]: + return f"~{val / thresholds[i]:.1f}{units[i]}" + return f"~{round(val)}" + +def format_time(seconds): + units = ["s", "m", "h", "d", "y", "thousand years", "million years", "billion years"] + thresholds = [1, 60, 3600, 86400, 86400*365, 86400*365*1000, 86400*365*10**6, 86400*365*10**9] + + for i in reversed(range(1, len(units))): + if seconds >= thresholds[i]: + return f"{round(seconds / thresholds[i], 1)} {units[i]}" + return f"{round(seconds)} s" + +def simple_crush(osd_ids, num_domains, pg_size, num_pgs): + if pg_size > num_domains: + raise ValueError("PG size cannot be greater than num failure domains.") + + # Group OSDs into failure domains + domains = defaultdict(list) + for i, osd in enumerate(osd_ids): + domain = i % num_domains + domains[domain].append(osd) + + domain_ids = list(domains.keys()) + if DEBUG: + for domain in domain_ids: + dprint(f"Domain {domain}: OSDs {domains[domain]}") + + pgs = [] + for _ in range(num_pgs): + # Randomly choose failure domains for this PG + chosen_domains = random.sample(domain_ids, pg_size) + pg_osds = [] + for domain in chosen_domains: + # Pick a random OSD from the selected domain + pg_osds.append(random.choice(domains[domain])) + pgs.append(tuple(pg_osds)) + if DEBUG: dprint(f"Generated PG: {pgs[-1]} from domains {chosen_domains}") + + return domains, pgs + +def confidence_interval(successes, trials): + if trials == 0: + return (0, 0) + p_hat = successes / trials + # 99.9% confidence interval using normal approximation + z = 3.2905 # z-score for 99.9% confidence + margin_of_error = z * ((p_hat * (1 - p_hat)) / trials) ** 0.5 + return (max(0, p_hat - margin_of_error), min(1, p_hat + margin_of_error)) + + +def effective_recovery_rate(object_size_bytes, + recovery_bw=50*1024*1024, # 50 MB/s + osd_recovery_sleep=0.1, + osd_recovery_max_active=1): + """ + Calculate Ceph effective recovery throughput considering chunking and sleep. 
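+
+    The effective rate is modeled as chunk_size / (chunk_size / recovery_bw + osd_recovery_sleep),
+    with chunk_size = object_size_bytes * osd_recovery_max_active.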
+ + object_size_bytes: size of one object in bytes + recovery_bw_MBps: raw recovery bandwidth (MB/s) + osd_recovery_sleep: sleep time after each chunk (seconds) + osd_recovery_max_active: number of concurrent recovery operations per OSD + """ + chunk_size_bytes = (object_size_bytes * osd_recovery_max_active) + + # Time to recover chunk at given bandwidth (seconds) + transfer_time = chunk_size_bytes / recovery_bw + + # Add recovery sleep per chunk + total_time = transfer_time + osd_recovery_sleep + + # Effective throughput (Bps) + return chunk_size_bytes / total_time + + +def infer_failure_domains(pgs): + """Infer failure domains from OSD and PG distribution.""" + + # discover all 'up' OSDs -- those are outputs of the crush map + osds = set() + for pg in pgs: + osds.update(pg) + + # start with a single host containing all OSDs + osd_tree = [list(osds)] + + for pg in pgs: + pg_size = len(pg) + + # map OSDs to hosts, and count how many PG OSDs fall into each host + osd_to_host = {} + host_size = {} + + for host_idx, host in enumerate(osd_tree): + count = 0 + for osd in pg: + if osd in host: + osd_to_host[osd] = host_idx + count += 1 + if count > 0: + host_size[host_idx] = count + + # split OSDs from this PG across different hosts + for host_idx in range(len(osd_tree) + pg_size - 1): + if host_idx not in host_size: + continue + + if host_size[host_idx] < 1: + continue + + first = True + new_host_offset = 1 + for osd in list(osd_tree[host_idx]): + if osd not in pg: + continue + + if first: + first = False + continue + + osd_tree[host_idx].remove(osd) + try: + osd_tree[host_idx + new_host_offset].append(osd) + except IndexError: + osd_tree.append(list([osd])) + new_host_offset += 1 + + dprint("Inferred failure domains:") + for idx, host in enumerate(osd_tree): + dprint(f" Domain {idx}: OSDs {host}") + + return osd_tree + + +def calculate_pg_loss_rate(num_domains, + osds_per_domain, + pg_size, + num_pgs, + num_failures_for_data_loss, + num_osds, + prefailed_domains=0): + + vprint(f"---------------------------") + vprint(f"Theoretical loss rate calculation (with {prefailed_domains} pre-failed domains):") + + H = int(num_domains) + f_total_threshold = int(num_failures_for_data_loss) + f_new = max(0, f_total_threshold - int(prefailed_domains)) + + H_dead = int(prefailed_domains) + n = int(osds_per_domain) + r = int(pg_size) + N = int(num_osds) + + if H_dead >= H: return 1.0 + + H_alive = H - H_dead + N_alive = H_alive * n + + # 1. Eligibility (Global) + if f_new == 0: + eligibility = 1.0 + elif f_new > H_alive: + eligibility = 0.0 + else: + try: + eligibility = (comb(H_alive, f_new) * (n ** f_new)) / comb(N_alive, f_new) + except ValueError: + eligibility = 0.0 + + vprint(f" 1. Eligibility (New failures on distinct alive hosts): {eligibility:.3g}") + + # 2. Iterate through PG "Vulnerability Buckets" + prob_pool_survives = 1.0 + + for k in range(r + 1): + # Safety checks + if k > H_dead: continue + if (r - k) > H_alive: continue + + # Proportion of PGs in this state + try: + prob_pg_state_k = (comb(H_dead, k) * comb(H_alive, r - k)) / comb(H, r) + except ValueError: continue + + if prob_pg_state_k == 0: continue + + num_pgs_in_state = num_pgs * prob_pg_state_k + + # The number of *additional* shards we need to lose to kill this PG + needed = max(0, f_total_threshold - k) + + p_kill_pg = 0.0 + + if needed <= 0: + p_kill_pg = 1.0 + elif f_new < needed: + p_kill_pg = 0.0 + else: + # For EC, we might lose 'x' shards, where 'needed' <= x <= 'f_new'. + # Any overlap >= needed is fatal. 
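+            # (The overlap count x is hypergeometric over the alive hosts:
+            #  P(x) = C(r_alive, x) * C(H_alive - r_alive, f_new - x) / C(H_alive, f_new).)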
+ # We sum the probabilities of all fatal overlap sizes. + + r_alive = r - k # Healthy legs of this PG + + # The denominator for picking hosts (how many ways to pick f_new hosts from H_alive) + total_host_combos = comb(H_alive, f_new) + + # We iterate x (the number of hits on this PG's healthy legs) + # Max hits is min(f_new, r_alive) because we can't hit more hosts than we failed, + # and we can't hit more legs than the PG has. + for x in range(needed, min(f_new, r_alive) + 1): + # 1. Choose 'x' hosts that are part of the PG (from r_alive) + ways_hit_pg = comb(r_alive, x) + + # 2. Choose the remaining 'f_new - x' failures from non-PG hosts (H_alive - r_alive) + ways_miss_pg = comb(H_alive - r_alive, f_new - x) + + if total_host_combos > 0: + prob_hosts = (ways_hit_pg * ways_miss_pg) / total_host_combos + else: + prob_hosts = 0 + + # 3. Probability that we hit the specific OSDs on those 'x' hosts + # Since we assume 1 failure per host (eligibility), simply (1/n)^x + prob_osds = (1 / n) ** x + + p_kill_pg += prob_hosts * prob_osds + + # Aggregate Risk for this bucket + if p_kill_pg >= 1.0: + prob_survive_bucket = math.exp(-num_pgs_in_state) + elif p_kill_pg <= 0.0: + prob_survive_bucket = 1.0 + else: + # High precision aggregation + prob_survive_bucket = math.exp(num_pgs_in_state * math.log1p(-p_kill_pg)) + + vprint(f" Bucket [Overlap={k}]: {prob_pg_state_k*100:.1f}% of PGs. Kill Prob: {p_kill_pg:.4g}") + prob_pool_survives *= prob_survive_bucket + + # Final Risk Calculation + risk_raw = 1.0 - prob_pool_survives + + # Handle the Dead-On-Arrival edge case (where eligibility shouldn't reduce risk) + prob_survive_doa = 1.0 + for k in range(r + 1): + if k > H_dead: continue + needed = max(0, f_total_threshold - k) + if needed <= 0: + try: + p_state = (comb(H_dead, k) * comb(H_alive, r - k)) / comb(H, r) + prob_survive_doa *= math.exp(- (num_pgs * p_state)) + except ValueError: continue + + risk_doa = 1.0 - prob_survive_doa + total_risk = max(risk_doa, eligibility * risk_raw) + + return min(1.0, max(0.0, total_risk)) + + +# ---------------------------- Simulate Subcommand ---------------------------- +def simulate(args, domains=None): + # Set random seed for reproducibility + random.seed(args.seed) + + # FIXME: The math breaks without this workaround + if args.domains_down > 0: + args.fudge = 1 + + raw_to_stored_factor = 0 + profile = "" + # Derive PG size + if args.k is not None and args.m is not None: + pg_size = args.k + args.m + raw_to_stored_factor = args.k / (args.k + args.m) + profile = f"{args.k}+{args.m} erasure" + num_failures_for_data_loss = args.m + 1 + elif args.size is not None: + pg_size = args.size + raw_to_stored_factor = 1 / (args.size) + profile = f"{args.size}x replicated" + num_failures_for_data_loss = args.size + else: + raise ValueError("Specify either --size or both --k and --m.") + + num_failures_for_data_loss = max(0, num_failures_for_data_loss) + + # how many (additional) OSD failures would cause data loss? 
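+    # (this is m+1 for a k+m erasure-coded pool, or `size` for a replicated pool)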
+ # adjust for pre-failed domains + num_osds_to_fail = max(0, num_failures_for_data_loss - args.domains_down) + + num_simulations = args.simulations + if num_simulations < 100: + raise ValueError("Number of simulations must be at least 100 for meaningful results.") + if num_simulations % 50 != 0: + raise ValueError("Number of simulations must be a multiple of 50 for progress display.") + + vprint(f"---------------------------") + print() + + if not hasattr(args, 'pgs'): + # simulating a randomized pool + + pgs_per_osd = args.pgs_per_osd + num_domains = args.num_domains + osds_per_domain = args.osds_per_domain + num_osds = num_domains * osds_per_domain + + failure_domain = args.failure_domain + + num_pgs = round_to_nearest_power_of_two((num_osds * pgs_per_osd) / pg_size) + osds = list(range(1, num_osds + 1)) + vprint(f"Generating {num_pgs} PGs using simple CRUSH-like algorithm...") + time_start = time.time() + domains, pgs = simple_crush(osds, num_domains, pg_size, num_pgs) + time_end = time.time() + dprint(f" Generated {num_pgs} PGs in {time_end - time_start:.2f} seconds.") + + # Print simulation summary + print(f"Simulating a {profile} pool with {num_osds} OSDs and {num_domains} {failure_domain}s ({osds_per_domain} OSDs per {failure_domain})") + print(f" PGs: {num_pgs} total, averaging {num_pgs * pg_size / num_osds:.3g} per OSD") + + else: + # analyzing simulated failures on a given pool + pgs = args.pgs + osds = args.osds + num_domains = args.num_domains + failure_domain = args.failure_domain + osds_per_domain = float(len(osds)) / num_domains + num_pgs = len(pgs) + num_osds = len(osds) + pgs_per_osd = num_pgs * pg_size / num_osds + + # Print analyze summary + print(f"Analyzing '{args.pool_name}':") + print(f" Info: {profile} pool with {num_osds} OSDs and {num_domains} {failure_domain}s") + print(f" PGs: {num_pgs} total, ~{num_pgs * pg_size / num_osds:.3g} per OSD") + + # --- Calculate OSD distribution stats --- + osd_counts = defaultdict(int) + for pg in pgs: + for osd in pg: + osd_counts[osd] += 1 + + # specific list of OSDs from your previous lines + counts = [osd_counts.get(osd, 0) for osd in osds] + + if counts: + min_pgs = min(counts) + max_pgs = max(counts) + mean_pgs = sum(counts) / len(counts) + variance = sum((x - mean_pgs) ** 2 for x in counts) / len(counts) + std_dev = math.sqrt(variance) + + vprint(f" OSD Distribution (PGs per OSD):") + vprint(f" Min: {min_pgs}") + vprint(f" Max: {max_pgs}") + vprint(f" Mean: {mean_pgs:.2f}") + vprint(f" Std: {std_dev:.2f}") + + # Re-derive the stored bytes/objects per PG from tb_per_osd + bytes_per_osd = args.tb_per_osd * 1024**4 + total_bytes = num_osds * bytes_per_osd + bytes_per_pg = total_bytes / num_pgs * raw_to_stored_factor + try: + objects_per_pg = bytes_per_pg / args.object_size + except ZeroDivisionError: + objects_per_pg = 0 + if args.k is not None: + bytes_per_pg /= args.k + + print(f" Data per PG: {format_bytes(bytes_per_pg)}, {format_int(objects_per_pg)} objects") + vprint(f" Example PG: {pgs[0]}\n") + + + # Compute theoretical loss rate + # i.e. 
how often do random failures cause data loss + calculated_loss_rate = loss_rate = calculate_pg_loss_rate( + num_domains, + osds_per_domain, + pg_size, + num_pgs, + num_failures_for_data_loss, + num_osds, + args.domains_down + ) + + print(f" Calculated PG Loss Rate (with {args.domains_down} pre-failed domains): {loss_rate * 100:.4g}%") + + + # simulate the major incidents + vprint(f"Monte Carlo Sim: n={num_simulations} {num_failures_for_data_loss}x failures") + + num_batches = 50 + batch_size = num_simulations // num_batches + if args.skip_monte_carlo: + vprint(" Skipping Monte Carlo simulation as per user request.") + num_batches = 0 + simulations_with_loss = 0 + total_pg_losses = 0 + vprint() + if not args.skip_monte_carlo: + vprint(f" ╭{'─' * num_batches}╮") + vprint(f" │", end="", flush=True) + else: + num_simulations = 1000000000 + simulations_done = 0 + sim_runtime = args.max_mc_runtime + start_time = time.time() + frames = ["⣾", "⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽"] + spinner = itertools.cycle(frames) + spin_time = start_time + for batch in range(num_batches): + batch_good = True + for _ in range(int(num_simulations/num_batches)): + if spin_time + 0.1 < time.time(): + vprint(next(spinner), end="\b", flush=True) + spin_time = time.time() + simulations_done += 1 + failed_osds = set() + + if args.domains_down > 0: + # pre-fail args.domains_down * osds_per_domain OSDs + failed_osds = set() + failed_domains = random.sample(range(num_domains), args.domains_down) + for domain in failed_domains: + failed_osds.update(domains[domain]) + + dprint(f"Pre-failed OSDs: {failed_osds}") + failed_osds.update(random.sample(list(set(osds) - failed_osds), num_osds_to_fail)) + dprint(f"Failed OSDs: {failed_osds}") + + dead_pgs = 0 + for pg in pgs: + if sum(1 for osd in pg if osd in failed_osds) >= num_failures_for_data_loss: + dead_pgs += 1 + dprint(f"PG {pg} lost due to failed OSDs {failed_osds}") + + + total_pg_losses += dead_pgs + if dead_pgs > 0: + simulations_with_loss += 1 + batch_good = False + if batch_good: + vprint("⣿",end="",flush=True) + else: + vprint("⣀",end="",flush=True) + # Exit early if taking too long + elapsed = time.time() - start_time + if elapsed > sim_runtime: + vprint(" timeout, exiting early. 
",end="",flush=True) + num_simulations = simulations_done + break + + if not args.skip_monte_carlo: + vprint(f"│") + vprint(f" ╰{'─' * num_batches}╯") + vprint() + + if args.skip_monte_carlo or simulations_with_loss < 1: + # use theoretical estimate + total_pg_losses = simulations_with_loss = calculated_loss_rate * num_simulations # use theoretical estimate + confidence = (loss_rate, loss_rate) + plus_minus = 0.0 + vprint(f" No PGs lost in Monte Carlo simulation, using theoretical estimate.") + else: + # use simulated result + loss_rate = simulations_with_loss / num_simulations + confidence = confidence_interval(simulations_with_loss, num_simulations) + plus_minus = confidence[1] - loss_rate + vprint(f" Observed Incidents with ≥1 PG lost: {simulations_with_loss}/{num_simulations} ({100 * loss_rate:.3f}%)") + print(f" Monte Carlo Data Loss Rate: {loss_rate * 100:.3g}% ± {plus_minus * 100:.3g}%") + + # warn if theoretical and simulated differ significantly + if calculated_loss_rate < confidence[0] or calculated_loss_rate > confidence[1]: + vprint() + vprint(" Warning: Theoretical and simulated loss rates differ significantly!") + vprint(f" Theoretical: {calculated_loss_rate * 100:.3g}%, Simulated: {100 * loss_rate:.3g}% ± {plus_minus * 100:.3g}%") + vprint() + + + pgs_lost_per_event = total_pg_losses / num_simulations + pct_data_loss = total_pg_losses / (num_simulations * num_pgs) + vprint(f" Expected Data Loss Per Incident: {pgs_lost_per_event:.3g} PGs, {format_bytes(pgs_lost_per_event*bytes_per_pg)} ({pct_data_loss * 100:.3g}%)") + + # Try to compute a meaningful MTTDL and Nines Durability Number + vprint("") + vprint("---------------------------") + vprint("") + + vprint(f"Estimating the probability of {num_osds_to_fail}x failures") + afr = args.afr + if pg_size > 1: + vprint(f" Estimated per-drive AFR: {afr}") + + # recovery rate is a function of object size, recovery bandwidth, and OSD recovery sleep + recovery_rate = effective_recovery_rate(args.object_size, + args.recovery_rate*1024*1024, + args.osd_recovery_sleep, + args.osd_recovery_max_active) + + try: + vprint(f" Estimated per-OSD recovery rate: {format_bytes(recovery_rate)}ps ({format_int(recovery_rate/args.object_size)} objects/s)") + except ZeroDivisionError: + vprint(f" Estimated per-OSD recovery rate: {format_bytes(recovery_rate)}ps ({format_int(recovery_rate)} Bps)") + + try: + recovery_time_per_pg = bytes_per_pg / recovery_rate + except ZeroDivisionError: + recovery_time_per_pg = 0 + + vprint(f" Expected PG recovery time ({format_bytes(bytes_per_pg)}/{format_bytes(recovery_rate)}ps): {format_time(recovery_time_per_pg)}") + + # Estimate max OSD load during recovery when things are critical + # e.g. for 3x repl: how long to recover 2 OSDs failed. + # e.g. for 6+3 EC: how long to recover 3 OSDs failed. 
+ # TODO: check this part + if args.k is not None: + critical_osds = args.m + else: + critical_osds = args.size - 1 + num_osds_left = num_osds - args.domains_down*osds_per_domain - num_osds_to_fail + 1 + # How many PGs need to be recovered + max_osd_load = expected_max_osd_load(num_osds_left, critical_osds*pgs_per_osd) + + vprint(f" Expected most loaded OSD when recovering {format_int(critical_osds*pgs_per_osd)} PGs to {num_osds_left} OSDs = {max_osd_load} PGs") + + recovery_time = max(max_osd_load * recovery_time_per_pg, 60) # minimum 1 minutes per PG + vprint(f" Expected critical period ({max_osd_load} PGs * {format_time(recovery_time_per_pg)}) = {format_time(recovery_time)}") + + critical_windows_per_year = 31536000 / recovery_time + vprint(f" Expected critical periods annually (1 yr / {format_time(recovery_time)}) = {critical_windows_per_year:.1f}") + + failure_prob_per_disk = recovery_time / 31536000 * afr + vprint(f" Expected per-OSD critical period failure rate = {failure_prob_per_disk:.2g} (1 in {format_int(1/failure_prob_per_disk)})") + + vprint(f" Failure correlation factor = {args.fudge}") + + failure_prob = args.fudge * prob_k_or_more_failures(num_osds, failure_prob_per_disk, num_osds_to_fail) + vprint(f" Probability of {num_osds_to_fail}+ failures in any critical period ({format_time(recovery_time)}) = 1 in {format_int(1/failure_prob)}") + + expected_annual_events = critical_windows_per_year * failure_prob + vprint(f" Expected annual {num_osds_to_fail}+ failure events: {expected_annual_events:.3g} (1 every {format_time(31536000/expected_annual_events)})") + mtt_multi_failure = mttdl(num_osds, num_failures_for_data_loss, afr, args.fudge, recovery_time) + vprint(f" Alternate calculation (mean time to {num_osds_to_fail}+ concurrent failures) = {format_time(mtt_multi_failure)}") + else: # size == 1 (no redundancy) + vprint(" No redundancy configured, data loss is certain on any failure.") + expected_annual_events = num_osds * afr + mtt_multi_failure = 31536000 / expected_annual_events + vprint(f" Expected annual failure events: {expected_annual_events:.3g} (1 every {format_time(31536000/expected_annual_events)})") + + + + vprint("---------------------------") + vprint("") + + min_loss_rate = confidence[0] + max_loss_rate = confidence[1] + + inv_loss_rate = 1/loss_rate if loss_rate > 0 else float('inf') + inv_min_loss_rate = 1/min_loss_rate if min_loss_rate > 0 else float('inf') + inv_max_loss_rate = 1/max_loss_rate if max_loss_rate > 0 else float('inf') + + vprint(f"MTTDL/Durability Estimates:") + vprint(f" Mean time to {num_osds_to_fail}x failure: {format_time(mtt_multi_failure)}") + vprint(f" Probability that {num_osds_to_fail}x failures causes data loss: {loss_rate*100:.3g}% (1 in {format_int(inv_loss_rate)})") + vprint(f" 95% CI: {min_loss_rate*100:.3g}%-{max_loss_rate*100:.3g}% (1 in {format_int(inv_max_loss_rate)} to 1 in {format_int(inv_min_loss_rate)})") + + vprint("") + vprint(f"Combining the above to estimate annual data loss:") + + # Compute MTTDL + expected_losses_per_year = expected_annual_events * loss_rate + min_expected_losses_per_year = expected_annual_events * min_loss_rate + max_expected_losses_per_year = expected_annual_events * max_loss_rate + try: + mttdl_sec = 31536000/expected_losses_per_year + except ZeroDivisionError: + mttdl_sec = float('inf') + try: + min_mttdl_sec = 31536000/max_expected_losses_per_year + except ZeroDivisionError: + min_mttdl_sec = float('inf') + try: + max_mttdl_sec = 31536000/min_expected_losses_per_year + print(f" MTTDL: 
{format_time(mttdl_sec)}") + vprint(f" (95% CI: {format_time(min_mttdl_sec)} - {format_time(max_mttdl_sec)})") + except ZeroDivisionError: + print(f" MTTDL: {format_time(mttdl_sec)}") + vprint(f" (95% CI: {format_time(min_mttdl_sec)} - ∞)") + + # Compute Nines Durability Number + expected_pgs_lost_per_year = expected_losses_per_year * pgs_lost_per_event + min_expected_pgs_lost_per_year = min_expected_losses_per_year * pgs_lost_per_event + max_expected_pgs_lost_per_year = max_expected_losses_per_year * pgs_lost_per_event + expected_durability = max(0, 1 - (expected_pgs_lost_per_year / num_pgs)) + min_expected_durability = max(0, 1 - (max_expected_pgs_lost_per_year / num_pgs)) + max_expected_durability = max(0, 1 - (min_expected_pgs_lost_per_year / num_pgs)) + print(f" Durability: {count_nines(expected_durability)}") + vprint(f" (95% CI: {count_nines(min_expected_durability)} to {count_nines(max_expected_durability)})") + + expected_bytes_lost_per_year = expected_pgs_lost_per_year * bytes_per_pg + min_expected_bytes_lost_per_year = min_expected_pgs_lost_per_year * bytes_per_pg + max_expected_bytes_lost_per_year = max_expected_pgs_lost_per_year * bytes_per_pg + + vprint(f"Expected annual data loss: {format_bytes(expected_bytes_lost_per_year)}") + vprint(f" (95% CI: {format_bytes(min_expected_bytes_lost_per_year)} - {format_bytes(max_expected_bytes_lost_per_year)})") + + return (format_time(mttdl_sec), count_nines(expected_durability)) + + +# ---------------------------- Analyze Subcommand ---------------------------- + +def analyze_pool(args, pool, pg_dump): + import subprocess + import json + + # FIXME: The math breaks without this workaround + if args.domains_down > 0: + args.fudge = 1 + + vprint(f"Analyzing pool '{pool['pool_name']}' (id {pool['pool_id']}, size {pool['size']}) ") + if 'erasure_code_profile' in pool and pool['erasure_code_profile']: + vprint(f" Type: erasure-coded ({pool['erasure_code_profile']})") + try: + # ceph osd erasure-code-profile get + try: + ceph_cmd = ['ceph', f'--cluster={args.cluster}', 'osd', 'erasure-code-profile', 'get', pool['erasure_code_profile'], '--format=json'] + ec_profile_json = subprocess.check_output(ceph_cmd) + ec_profile_data = json.loads(ec_profile_json) + args.k = int(ec_profile_data['k']) + args.m = int(ec_profile_data['m']) + except Exception as e: + print(f"Error fetching EC profile data: {e}") + ec_profile_data = None + + # Try using local copy of ceph-osd-erasure-code-profile-get.json + if ec_profile_data is None: + try: + with open('ceph-osd-erasure-code-profile-get.json', 'r') as f: + ec_profile_data = json.load(f) + print("Reading ceph-osd-erasure-code-profile-get.json ...") + args.k = int(ec_profile_data['k']) + args.m = int(ec_profile_data['m']) + except Exception as e2: + print(f"Error loading local EC profile data: {e2}") + return + except: + print(" Warning: Unable to parse erasure code profile. Please specify --k and --m manually.") + print(pool['erasure_code_profile']) + return + else: + vprint(f" Type: replicated") + args.size = pool['size'] + args.k = None + args.m = None + + args.pg_num = pool['pg_num'] + + pg_stats = pg_dump['pg_map']['pg_stats'] + pgs = [pg['acting'] for pg in pg_stats if pg['pgid'].startswith(f"{pool['pool_id']}.")] + assert len(pgs) == args.pg_num, "PG count mismatch!" 
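+    # Hand the observed PG->OSD mappings to simulate(); it checks hasattr(args, 'pgs')
+    # and analyzes this real layout instead of generating a randomized pool.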
+ args.pgs = pgs + args.pool_name = pool['pool_name'] + + osds = set() + for pg in pg_stats: + osds.update(pg['acting']) + osds.update(pg['up']) + + args.osds = list(osds) + args.num_osds = pg_dump['pg_map']['osd_stats_sum']['num_osds'] + assert args.num_osds == len(osds), "OSD count mismatch!" + + # infer the number of failure domains + domains = infer_failure_domains(pgs) + args.num_domains = len(domains) + args.failure_domain = "domain" + + vprint(f" Number of PGs: {args.pg_num}") + vprint(f" Number of OSDs: {len(osds)}") + vprint(f" Number of {args.failure_domain}s: {args.num_domains}") + + # Derive tb_per_osd from pool size and pg stat_sum num_bytes + bytes_stored = sum(pg['stat_sum']['num_bytes'] for pg in pg_stats if pg['pgid'].startswith(f"{pool['pool_id']}.")) + objects_stored = sum(pg['stat_sum']['num_objects'] for pg in pg_stats if pg['pgid'].startswith(f"{pool['pool_id']}.")) + raw_to_stored_factor = 0 + if args.k is not None and args.m is not None: + raw_to_stored_factor = (args.k) / (args.k + args.m) + else: + raw_to_stored_factor = 1 / (args.size) + bytes_raw = bytes_stored / raw_to_stored_factor + objects_raw = objects_stored / raw_to_stored_factor + args.tb_per_osd = bytes_raw / len(osds) / (1024**4) + args.object_size = bytes_raw / objects_raw if objects_raw > 0 else 4*1024*1024 # default to 4MB + + vprint(f" Total stored data: {format_bytes(bytes_stored)} ({format_int(objects_stored)} objects)") + vprint(f" Total raw data: {format_bytes(bytes_raw)} ({format_int(objects_raw)} objects)") + vprint(f" Data per OSD: {format_bytes(args.tb_per_osd * 1024**4)} ({format_int(objects_raw / len(osds))} objects)") + vprint(f" Average object size: {format_bytes(args.object_size)}") + vprint("") + [mttdl, nines] = simulate(args, domains=domains) + + +def analyze(args): + import subprocess + import json + + # Set random seed for reproducibility + random.seed(args.seed) + + cluster = args.cluster + pool_name = args.pool + + # fetch pool list + try: + ceph_cmd = ['ceph', f'--cluster={cluster}', 'osd', 'pool', 'ls', 'detail', '--format=json'] + pool_json = subprocess.check_output(ceph_cmd) + pool_data = json.loads(pool_json) + except Exception as e: + pool_data = None + + # Try using local copy of ceph-osd-pool-ls-detail.json + if pool_data is None: + try: + with open('ceph-osd-pool-ls-detail.json', 'r') as f: + pool_data = json.load(f) + print("Reading ceph-osd-pool-ls-detail.json ...") + except Exception as e2: + print(f"Error loading local pool data: {e2}") + return + + # fetch ceph pg dump + try: + ceph_cmd = ['ceph', f'--cluster={cluster}', 'pg', 'dump', '--format=json'] + pg_dump_json = subprocess.check_output(ceph_cmd) + pg_dump_data = json.loads(pg_dump_json) + except Exception as e: + pg_dump_data = None + + # Try using local copy of ceph-pg-dump.json + if pg_dump_data is None: + try: + with open('ceph-pg-dump.json', 'r') as f: + pg_dump_data = json.load(f) + print("Reading ceph-pg-dump.json ...") + vprint("") + vprint("========================================") + vprint("") + except Exception as e2: + print(f"Error loading local PG dump data: {e2}") + return + + found = False + for p in pool_data: + if p['pool_name'] == pool_name or not pool_name: + analyze_pool(args, p, pg_dump_data) + found = True + vprint("") + vprint("========================================") + vprint("") + + if not found: + print(f"Pool '{pool_name}' not found in cluster '{cluster}'.") + + return + +def test_prob_k_or_more_failures(): + # (n, p, k, expected_P_X_ge_k) + test_cases = [ + # Small n, moderate p 
(exact binomial) + (10, 0.1, 2, 0.263901), + (20, 0.05, 3, 0.0754837), + (50, 0.02, 2, 0.264229), + + # Ceph-like: moderate n, very small p (exact binomial) + (1000, 1e-6, 2, 4.99168e-07), + (1000, 1e-5, 2, 4.96189e-05), + (5000, 2e-7, 2, 4.99567e-07), + (10000, 1e-8, 2, 4.99917e-09), + (10000, 1e-7, 3, 1.66492e-10), + ] + + for n, p, k, expected in test_cases: + got = prob_k_or_more_failures(n, p, k) + if not math.isclose(got, expected, rel_tol=0.00001): + raise AssertionError( + f"Failed for n={n}, p={p}, k={k}: " + f"got {got:.18g}, expected {expected:.18g}" + ) + + print("\nAll prob_k_or_more_failures tests passed.") + +def run_tests(args): + # run some known simulate commands + # add all args + test_args = [ + [ + argparse.Namespace( + verbose=False, + failure_domain="host", + num_domains=10, + osds_per_domain=16, + tb_per_osd=10.0, + object_size=4*1024*1024, + pgs_per_osd=100, + size=3, + k=None, + m=None, + simulations=1000, + seed=42, + fudge=100, + afr=0.02, + recovery_rate=50, + osd_recovery_sleep=0.1, + osd_recovery_max_active=1, + domains_down=0, + skip_monte_carlo=False, + max_mc_runtime=30 + ), + ("197.1 thousand years", "10-nines") + ], + [ + argparse.Namespace( + verbose=False, + failure_domain="host", + num_domains=20, + osds_per_domain=16, + tb_per_osd=10.0, + object_size=4*1024*1024, + pgs_per_osd=100, + k=6, + m=3, + size=None, + simulations=1000, + seed=42, + fudge=100, + afr=0.02, + recovery_rate=50, + osd_recovery_sleep=0.1, + osd_recovery_max_active=1, + domains_down=0, + skip_monte_carlo=False, + max_mc_runtime=30 + ), + ("15.4 thousand years", "10-nines") + ], + [ + argparse.Namespace( + verbose=False, + failure_domain="host", + num_domains=20, + osds_per_domain=16, + tb_per_osd=10.0, + object_size=4*1024*1024, + pgs_per_osd=100, + k=6, + m=3, + size=None, + simulations=1000, + seed=42, + fudge=100, + afr=0.02, + recovery_rate=50, + osd_recovery_sleep=0.1, + osd_recovery_max_active=1, + domains_down=1, + skip_monte_carlo=False, + max_mc_runtime=30 + ), + ("173.4 y", "7-nines") + ], + [ + argparse.Namespace( + verbose=False, + failure_domain="host", + num_domains=500, + osds_per_domain=40, + tb_per_osd=0.0001, + object_size=4*1024*1024, + pgs_per_osd=0.01, + size=3, + k=None, + m=None, + simulations=1000, + seed=42, + fudge=100, + afr=0.02, + recovery_rate=50, + osd_recovery_sleep=0.1, + osd_recovery_max_active=1, + domains_down=0, + skip_monte_carlo=False, + max_mc_runtime=30 + ), + ("80.8 billion years", "20-nines") + ], + [ + argparse.Namespace( + verbose=False, + failure_domain="host", + num_domains=500, + osds_per_domain=40, + tb_per_osd=10, + object_size=4*1024, + pgs_per_osd=500, + size=None, + k=8, + m=3, + simulations=1000, + seed=42, + fudge=100, + afr=0.02, + recovery_rate=50, + osd_recovery_sleep=0.1, + osd_recovery_max_active=1, + domains_down=0, + skip_monte_carlo=True, + max_mc_runtime=30 + ), + ("11.4 thousand years", "20-nines") + ] + ] + for i, test_arg in enumerate(test_args): + print(f"Running test case {i+1}...") + parser = argparse.ArgumentParser(prog='clyso-nines') + test_arg[0].command = 'simulate' + cmdline = reconstruct_command_line(parser, test_arg[0]) + print(f"Command line: {cmdline}") + (mttdl, nines) = simulate(test_arg[0]) + assert test_arg[1][0] == mttdl, f"MTTDL test case {i+1} failed! {mttdl} != {test_arg[1][0]}" + assert test_arg[1][1] == nines, f"Durability test case {i+1} failed! 
{nines} != {test_arg[1][1]}" + print(f"\nTest case {i+1} completed.") + print("========================================") + + test_prob_k_or_more_failures() + + +# ---------------------------- Main CLI ---------------------------- +def main(): + parser = argparse.ArgumentParser( + prog='clyso-nines', + description='Clyso Nines - Data Loss Risk Estimator for Ceph Clusters' + ) + parser.add_argument('--version', action='version', version='clyso-nines 0.1') + + subparsers = parser.add_subparsers(dest='command') + + sim = subparsers.add_parser('simulate', help='Calculate durability of a randomized pool') + sim.add_argument('--verbose', action='store_true', help='enable verbose output') + sim.add_argument('--debug', action='store_true', help='enable debug output') + sim.add_argument('--failure-domain', type=str, default="host", help="Type of failure domain (host/rack/...) (default: host)") + sim.add_argument('--num-domains', type=int, default=10, help="number of failure domains (default: 10)") + sim.add_argument('--osds-per-domain', type=int, default=16, help="number of OSDs per failure domain (default: 16)") + sim.add_argument('--tb-per-osd', type=float, default=10.0, help="raw data per OSD in TB (default: 10.0)") + sim.add_argument('--object-size', type=int, default=4*1024*1024, help="average object size in bytes (default: 4MB)") + sim.add_argument('--pgs-per-osd', type=float, default=100, help="target PGs per OSD (default: 100)") + sim.add_argument('--size', type=int, default=3, help="simulate a replicated pool with SIZE replicas (default: 3)") + sim.add_argument('--k', type=int, help="simulate an erasure coded pool with K data shards") + sim.add_argument('--m', type=int, help="simulate an erasure coded pool with M parity shards") + sim.add_argument('--simulations', type=int, default=1000, help="number of incidents to simulate (default: 1000)") + sim.add_argument('--seed', type=int, default=int(time.time()), help='random seed (default: current time)') + sim.add_argument('--fudge', type=int, default=100, help='correlation "fudge" factor (default: 100)') + sim.add_argument('--afr', type=float, default=0.02, help='Device AFR (default: 0.02)') + sim.add_argument('--recovery-rate', type=float, default=50, help='Recovery rate per OSD (default: 50MBps)') + sim.add_argument('--osd-recovery-sleep', type=float, default=0.1, help='OSD Recovery Sleep in seconds (default: 0.1s)') + sim.add_argument('--osd-recovery-max-active', type=int, default=1, help='OSD Recovery Max Active (default: 1)') + sim.add_argument('--domains-down', type=int, default=0, help='number of failure domains to take down in simulation (default: 0)') + sim.add_argument('--skip-monte-carlo', action='store_true', help='Skip the Monte Carlo sims and use theoretical estimates instead') + sim.add_argument('--max-mc-runtime', type=int, default=30, help='Maximum simulation runtime in seconds (default: 30s)') + sim.set_defaults(func=simulate) + + an = subparsers.add_parser('analyze', help='Calculate durability of the local pools') + an.add_argument('--verbose', action='store_true', help='enable verbose output') + an.add_argument('--debug', action='store_true', help='enable debug output') + an.add_argument('--cluster', type=str, default='ceph', help='Ceph cluster name (default: ceph)') + an.add_argument('--pool', type=str, help='analyze only the given pool (default: all pools)') + an.add_argument('--simulations', type=int, default=1000, help="number of incidents to simulate (default: 1000)") + an.add_argument('--seed', type=int, 
default=int(time.time()), help='random seed (default: current time)') + an.add_argument('--fudge', type=int, default=100, help='correlation "fudge" factor (default: 100)') + an.add_argument('--afr', type=float, default=0.02, help='Device AFR (default: 0.02)') + an.add_argument('--recovery-rate', type=int, default=50, help='Per-OSD data recovery rate (default: 50MBps)') + an.add_argument('--osd-recovery-sleep', type=float, default=0.1, help='OSD Recovery Sleep in seconds (default: 0.1s)') + an.add_argument('--osd-recovery-max-active', type=int, default=1, help='OSD Recovery Max Active (default: 1)') + an.add_argument('--domains-down', type=int, default=0, help='number of failure domains to take down in simulation (default: 0)') + an.add_argument('--skip-monte-carlo', action='store_true', help='Skip the Monte Carlo sims and use theoretical estimates instead') + an.add_argument('--max-mc-runtime', type=int, default=30, help='Maximum Monte Carlo simulation runtime in seconds per pool (default: 30s)') + an.set_defaults(func=analyze) + + test = subparsers.add_parser('test', help='run internal tests') + test.add_argument('--verbose', action='store_true', help='enable verbose output') + test.add_argument('--debug', action='store_true', help='enable debug output') + test.set_defaults(func=run_tests) + + args = parser.parse_args() + if not args.command: + parser.print_help() + else: + global VERBOSE + global DEBUG + VERBOSE = args.verbose + DEBUG = args.debug + + # Reconstruct command + cmdline = reconstruct_command_line(parser, args) + # Print header + print(f"Clyso Nines - Data Loss Risk Estimator for Ceph Clusters") + print("---------------------------------------------------------") + print(f"Command line: {cmdline}") + print("---------------------------------------------------------") + vprint() + args.func(args) + vprint() + vprint("---------------------------------------------------------") + vprint(f"Command line: {cmdline}") + vprint("---------------------------------------------------------") + vprint() + +if __name__ == "__main__": + # Hide cursor + print('\033[?25l', end='', flush=True) + + try: + main() + except KeyboardInterrupt: + print("\nInterrupted.", file=sys.stderr) + import traceback + traceback.print_exc() + except Exception: + import traceback + traceback.print_exc() + finally: + # Restore cursor + print('\033[?25h', end='', flush=True)
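+
+# Example invocations (illustrative; assumes this script is installed on PATH as clyso-nines):
+#   clyso-nines simulate --num-domains 10 --osds-per-domain 16 --size 3 --verbose
+#   clyso-nines simulate --num-domains 20 --osds-per-domain 16 --k 6 --m 3 --domains-down 1
+#   clyso-nines analyze --pool mypool --verbose
+#   clyso-nines test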