diff --git a/.gitignore b/.gitignore index 47650c0e..9291a157 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ PyORBIT.egg-info *.so .*.swp .eggs +.ipynb_checkpoints diff --git a/examples/Envelope/bench_danilov.py b/examples/Envelope/bench_danilov.py new file mode 100644 index 00000000..7b760996 --- /dev/null +++ b/examples/Envelope/bench_danilov.py @@ -0,0 +1,160 @@ +"""Benchmark Danilov envelope tracker vs. PIC.""" + +import argparse +import copy +import os +import pathlib +from pprint import pprint + +import numpy as np +import matplotlib.pyplot as plt + +from orbit.core import orbit_mpi +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.envelope import DanilovEnvelope +from orbit.envelope import DanilovEnvelopeMonitor +from orbit.envelope import DanilovEnvelopeTracker +from orbit.envelope import add_danilov_envelope_tracker_nodes +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.space_charge.sc2p5d import SC2p5D_AccNode +from orbit.space_charge.sc2p5d import setSC2p5DAccNodes +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice +from utils import BunchMonitor + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--phase-adv-x", type=float, default=85.0) +parser.add_argument("--phase-adv-y", type=float, default=85.0) +parser.add_argument("--intensity", type=float, default=20.0e14) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--mismatch", type=float, default=0.0) +parser.add_argument("--periods", type=int, default=5) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = DanilovEnvelope( + eps_1=20.00e-06, + eps_2=0.0, + mass=0.938, + kin_energy=1.0, + length=100.0, + line_density=(args.intensity / 100.0), + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv_x), + phase_adv_y=np.radians(args.phase_adv_y), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +tracker = DanilovEnvelopeTracker(lattice, path_length_max=args.max_part_length) +tracker.match_zero_sc(envelope, method="2d") +envelope_init = envelope.copy() + + +# Track envelope +# -------------------------------------------------------------------------------------- + +histories = {} + +envelope, history = tracker.track(envelope_init.copy(), periods=args.periods, history=True) +histories["envelope"] = copy.deepcopy(history) + + +# Track bunch +# -------------------------------------------------------------------------------------- + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv_x), + phase_adv_y=np.radians(args.phase_adv_y), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +monitor = BunchMonitor() +action_container = AccActionsContainer() +action_container.addAction(monitor, AccActionsContainer.ENTRANCE) +action_container.addAction(monitor, AccActionsContainer.EXIT) + +sc_calc = SpaceChargeCalc2p5D(128, 128, 1) +sc_path_length_min = 1.00e-06 +sc_nodes = setSC2p5DAccNodes(lattice, sc_path_length_min, sc_calc) + +bunch = envelope_init.to_bunch(env=False, size=128_000) + +for periods in range(args.periods): + lattice.trackBunch(bunch, actionContainer=action_container) + +history = monitor.get_history() +histories["bunch"] = copy.deepcopy(history) + + +# Plot comparison +# -------------------------------------------------------------------------------------- + +figwidth = 3.0 * args.periods +figwidth = min(figwidth, 7.0) + +fig, axs = plt.subplots(nrows=2, figsize=(figwidth, 4.0), sharex=True, sharey=True) +for i, ax in enumerate(axs): + param = ["xrms", "yrms"][i] + for j, key in enumerate(histories): + history = histories[key] + if key == "envelope": + ax.plot( + history["s"], + np.multiply(history[param], 1000.0), + color="black", + lw=1.5, + ) + else: + stride = 10 + ax.plot( + history["s"][::stride], + np.multiply(history[param][::stride], 1000.0), + marker=".", + lw=0, + color="red", + ) + +for ax in axs: + ax.set_ylim(0.0, ax.get_ylim()[1]) +axs[1].set_xlabel("Distance [m]") +axs[0].set_ylabel("RMS x [mm]") +axs[1].set_ylabel("RMS y [mm]") + +filename = "fig_benchmark_rms.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() diff --git a/examples/Envelope/bench_danilov_particle.py b/examples/Envelope/bench_danilov_particle.py new file mode 100644 index 00000000..92d59061 --- /dev/null +++ b/examples/Envelope/bench_danilov_particle.py @@ -0,0 +1,212 @@ +"""Benchmark Danilov envelope tracker vs. PIC for test particles.""" + +import argparse +import copy +import os +import pathlib +from pprint import pprint + +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as patches +from tqdm import tqdm +from tqdm import trange + +from orbit.core import orbit_mpi +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.envelope import DanilovEnvelope +from orbit.envelope import DanilovEnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.space_charge.sc2p5d import SC2p5D_AccNode +from orbit.space_charge.sc2p5d import setSC2p5DAccNodes +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice +from utils import BunchMonitor +from utils import rms_ellipse_params + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--phase-adv", type=float, default=100.0) +parser.add_argument("--intensity", type=float, default=7.0e14) +parser.add_argument("--eps_x", type=float, default=10.00e-06) +parser.add_argument("--eps_y", type=float, default=10.00e-06) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--mismatch", type=float, default=0.0) +parser.add_argument("--periods", type=int, default=100) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = DanilovEnvelope( + eps_1=(args.eps_x + args.eps_y), + eps_2=0.0, + mass=0.938, + kin_energy=1.0, + length=100.0, + line_density=(args.intensity / 100.0), + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv), + phase_adv_y=np.radians(args.phase_adv), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +tracker = DanilovEnvelopeTracker(lattice, path_length_max=args.max_part_length) +tracker.match_zero_sc(envelope, method="2d") +tracker.match(envelope, method="replace_avg", periods_avg=20, verbose=2) +envelope_init = envelope.copy() + + +# Set test particle coordinates +# -------------------------------------------------------------------------------------- + +test_particles = [ + [0.001, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.005, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.010, 0.0, 0.0, 0.0, 0.0, 0.0], +] +test_particles = np.array(test_particles) + + +# Track envelope with test particles +# -------------------------------------------------------------------------------------- + +particles_tbt = {} +particles_tbt["envelope"] = [] +particles_tbt["bunch"] = [] + +envelope = envelope_init.copy() +particles = test_particles.copy() +for period in range(args.periods): + envelope, particles = tracker.track_particles(envelope, particles=particles) + + cov_matrix = envelope.cov() + cov_matrix *= 1.00e+06 + xrms = np.sqrt(cov_matrix[0, 0]) + yrms = np.sqrt(cov_matrix[2, 2]) + print(f"turn={period} xrms={xrms:0.3f} yrms={yrms:0.3f}") + + particles_tbt["envelope"].append(particles.copy()) + + +# Track bunch with test particles +# -------------------------------------------------------------------------------------- + +def set_particle_macrosizes(bunch: Bunch, macrosizes: list[float]) -> Bunch: + bunch.addPartAttr("macrosize") # sets macrosize=0 for all particles + attribute_array_index = 0 + for index in range(bunch.getSize()): + bunch.partAttrValue("macrosize", index, attribute_array_index, macrosizes[index]) + return bunch + + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv), + phase_adv_y=np.radians(args.phase_adv), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +sc_calc = SpaceChargeCalc2p5D(64, 64, 1) +sc_path_length_min = 1.00e-06 +sc_nodes = setSC2p5DAccNodes(lattice, sc_path_length_min, sc_calc) + +bunch_size = 128_000 +bunch = envelope_init.to_bunch(env=False, size=bunch_size) + +for i in range(test_particles.shape[0]): + bunch.addParticle(*test_particles[i]) + +macrosize = args.intensity / bunch_size +macrosizes = macrosize * np.ones(bunch.getSize()) +macrosizes[bunch_size:] = 0 +set_particle_macrosizes(bunch, macrosizes) + +particles_tbt["bunch"] = [] +for period in range(args.periods): + lattice.trackBunch(bunch) + + bunch_calc = BunchTwissAnalysis() + bunch_calc.analyzeBunch(bunch) + xrms = 1000.0 * np.sqrt(bunch_calc.getCorrelation(0, 0)) + yrms = 1000.0 * np.sqrt(bunch_calc.getCorrelation(2, 2)) + print(f"turn={period} xrms={xrms:0.3f} yrms={yrms:0.3f}") + + particles = [] + for i in range(bunch.getSize() - test_particles.shape[0], bunch.getSize()): + particles.append([bunch.x(i), bunch.xp(i), bunch.y(i), bunch.yp(i), bunch.z(i), bunch.dE(i)]) + particles = np.array(particles) + particles_tbt["bunch"].append(particles.copy()) + + +# Analysis +# -------------------------------------------------------------------------------------- + +# Collect data +for key in ["envelope", "bunch"]: + particles_tbt[key] = np.stack(particles_tbt[key]) + +# Plot x-x' +fig, axs = plt.subplots(ncols=2, figsize=(6.0, 3.0), sharex=True, sharey=True) +for ax, key in zip(axs, ["envelope", "bunch"]): + particles = particles_tbt[key] * 1000.0 + for index in range(particles.shape[1]): + ax.scatter(particles[:, index, 0], particles[:, index, 1], s=3) + +cov_matrix = envelope.cov() +cov_matrix = cov_matrix[0:2, 0:2] +cov_matrix = cov_matrix * 1.00e+06 +cx, cy, angle = rms_ellipse_params(cov_matrix) +for ax in axs: + ax.add_patch( + patches.Ellipse( + xy=(0.0, 0.0), + width=(4.0 * cx), + height=(4.0 * cy), + angle=(-np.degrees(angle)), + color="black", + ec="none", + alpha=0.1, + zorder=0, + ) + ) +for ax in axs: + ax.set_xlabel("x [mm]") + ax.set_ylabel("xp [mrad]") + +filename = "fig_benchmark_particles_xxp.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() + + diff --git a/examples/Envelope/bench_kv.py b/examples/Envelope/bench_kv.py new file mode 100644 index 00000000..f456d78c --- /dev/null +++ b/examples/Envelope/bench_kv.py @@ -0,0 +1,160 @@ +"""Benchmark KV envelope tracker vs. PIC.""" + +import argparse +import copy +import os +import pathlib +from pprint import pprint + +import numpy as np +import matplotlib.pyplot as plt + +from orbit.core import orbit_mpi +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.envelope import KVEnvelope +from orbit.envelope import KVEnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.space_charge.sc2p5d import SC2p5D_AccNode +from orbit.space_charge.sc2p5d import setSC2p5DAccNodes +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice +from utils import BunchMonitor + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--phase-adv-x", type=float, default=85.0) +parser.add_argument("--phase-adv-y", type=float, default=85.0) +parser.add_argument("--intensity", type=float, default=50.0e14) +parser.add_argument("--eps_x", type=float, default=10.00e-06) +parser.add_argument("--eps_y", type=float, default=10.00e-06) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--mismatch", type=float, default=0.0) +parser.add_argument("--periods", type=int, default=5) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = KVEnvelope( + eps_x=args.eps_x, + eps_y=args.eps_y, + mass=mass_proton, + kin_energy=1.000, + line_density=(args.intensity / 100.0), + length=100.0, + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv_x), + phase_adv_y=np.radians(args.phase_adv_y), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +tracker = KVEnvelopeTracker(lattice, path_length_max=args.max_part_length) +tracker.match_zero_sc(envelope) +envelope_init = envelope.copy() + + +# Track envelope +# -------------------------------------------------------------------------------------- + +histories = {} + +envelope = envelope_init.copy() +envelope, history = tracker.track(envelope, history=True, periods=args.periods) +histories["envelope"] = copy.deepcopy(history) + + +# Track bunch +# -------------------------------------------------------------------------------------- + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv_x), + phase_adv_y=np.radians(args.phase_adv_y), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +monitor = BunchMonitor() +action_container = AccActionsContainer() +action_container.addAction(monitor, AccActionsContainer.ENTRANCE) +action_container.addAction(monitor, AccActionsContainer.EXIT) + +sc_calc = SpaceChargeCalc2p5D(128, 128, 1) +sc_path_length_min = 1.00e-06 +sc_nodes = setSC2p5DAccNodes(lattice, sc_path_length_min, sc_calc) + +bunch = envelope_init.to_bunch(env=False, size=128_000) +for periods in range(args.periods): + lattice.trackBunch(bunch, actionContainer=action_container) + +history = monitor.get_history() +histories["bunch"] = copy.deepcopy(history) + + +# Plot comparison +# -------------------------------------------------------------------------------------- + +figwidth = 3.0 * args.periods +figwidth = min(figwidth, 7.0) + +fig, axs = plt.subplots(nrows=2, figsize=(figwidth, 4.0), sharex=True, sharey=True) +for i, ax in enumerate(axs): + param = ["xrms", "yrms"][i] + for j, key in enumerate(histories): + history = histories[key] + if key == "envelope": + ax.plot( + history["s"], + np.multiply(history[param], 1000.0), + color="black", + lw=1.5, + ) + else: + stride = 10 + ax.plot( + history["s"][::stride], + np.multiply(history[param][::stride], 1000.0), + marker=".", + lw=0, + color="red", + ) + +for ax in axs: + ax.set_ylim(0.0, ax.get_ylim()[1]) +axs[1].set_xlabel("Distance [m]") +axs[0].set_ylabel("RMS x [mm]") +axs[1].set_ylabel("RMS y [mm]") + +filename = "fig_benchmark_rms.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() diff --git a/examples/Envelope/bench_kv_particle.py b/examples/Envelope/bench_kv_particle.py new file mode 100644 index 00000000..8ff39a61 --- /dev/null +++ b/examples/Envelope/bench_kv_particle.py @@ -0,0 +1,211 @@ +"""Benchmark KV envelope tracker vs. PIC for test particles.""" + +import argparse +import copy +import os +import pathlib +from pprint import pprint + +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as patches +from tqdm import tqdm +from tqdm import trange + +from orbit.core import orbit_mpi +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.envelope import KVEnvelope +from orbit.envelope import KVEnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.space_charge.sc2p5d import SC2p5D_AccNode +from orbit.space_charge.sc2p5d import setSC2p5DAccNodes +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice +from utils import BunchMonitor +from utils import rms_ellipse_params + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--phase-adv", type=float, default=100.0) +parser.add_argument("--intensity", type=float, default=7.0e14) +parser.add_argument("--eps_x", type=float, default=10.00e-06) +parser.add_argument("--eps_y", type=float, default=10.00e-06) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--mismatch", type=float, default=0.0) +parser.add_argument("--periods", type=int, default=100) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = KVEnvelope( + eps_x=args.eps_x, + eps_y=args.eps_y, + mass=mass_proton, + kin_energy=1.000, + line_density=(args.intensity / 100.0), + length=100.0, + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv), + phase_adv_y=np.radians(args.phase_adv), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +tracker = KVEnvelopeTracker(lattice, path_length_max=args.max_part_length) +tracker.match_zero_sc(envelope) +tracker.match(envelope, verbose=2) +envelope_init = envelope.copy() + + +# Set test particle coordinates +# -------------------------------------------------------------------------------------- + +test_particles = [ + [0.001, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.005, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.010, 0.0, 0.0, 0.0, 0.0, 0.0], +] +test_particles = np.array(test_particles) + + +# Track envelope with test particles +# -------------------------------------------------------------------------------------- + +particles_tbt = {} +particles_tbt["envelope"] = [] +particles_tbt["bunch"] = [] + +envelope = envelope_init.copy() +particles = test_particles.copy() +for period in range(args.periods): + envelope, particles = tracker.track_particles(envelope, particles=particles) + + cov_matrix = envelope.cov() + cov_matrix *= 1.00e+06 + xrms = np.sqrt(cov_matrix[0, 0]) + yrms = np.sqrt(cov_matrix[2, 2]) + print(f"turn={period} xrms={xrms:0.3f} yrms={yrms:0.3f}") + + particles_tbt["envelope"].append(particles.copy()) + + +# Track bunch with test particles +# -------------------------------------------------------------------------------------- + +def set_particle_macrosizes(bunch: Bunch, macrosizes: list[float]) -> Bunch: + bunch.addPartAttr("macrosize") # sets macrosize=0 for all particles + attribute_array_index = 0 + for index in range(bunch.getSize()): + bunch.partAttrValue("macrosize", index, attribute_array_index, macrosizes[index]) + return bunch + + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv), + phase_adv_y=np.radians(args.phase_adv), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +sc_calc = SpaceChargeCalc2p5D(64, 64, 1) +sc_path_length_min = 1.00e-06 +sc_nodes = setSC2p5DAccNodes(lattice, sc_path_length_min, sc_calc) + +bunch_size = 128_000 +bunch = envelope_init.to_bunch(env=False, size=bunch_size) +for i in range(test_particles.shape[0]): + bunch.addParticle(*test_particles[i]) + +macrosize = args.intensity / bunch_size +macrosizes = macrosize * np.ones(bunch.getSize()) +macrosizes[bunch_size:] = 0 +set_particle_macrosizes(bunch, macrosizes) + +particles_tbt["bunch"] = [] +for period in range(args.periods): + lattice.trackBunch(bunch) + + bunch_calc = BunchTwissAnalysis() + bunch_calc.analyzeBunch(bunch) + xrms = 1000.0 * np.sqrt(bunch_calc.getCorrelation(0, 0)) + yrms = 1000.0 * np.sqrt(bunch_calc.getCorrelation(2, 2)) + print(f"turn={period} xrms={xrms:0.3f} yrms={yrms:0.3f}") + + particles = [] + for i in range(bunch.getSize() - test_particles.shape[0], bunch.getSize()): + particles.append([bunch.x(i), bunch.xp(i), bunch.y(i), bunch.yp(i), bunch.z(i), bunch.dE(i)]) + particles = np.array(particles) + particles_tbt["bunch"].append(particles.copy()) + + +# Analysis +# -------------------------------------------------------------------------------------- + +# Collect data +for key in ["envelope", "bunch"]: + particles_tbt[key] = np.stack(particles_tbt[key]) + +# Plot x-x' +fig, axs = plt.subplots(ncols=2, figsize=(6.0, 3.0), sharex=True, sharey=True) +for ax, key in zip(axs, ["envelope", "bunch"]): + particles = particles_tbt[key] * 1000.0 + for index in range(particles.shape[1]): + ax.scatter(particles[:, index, 0], particles[:, index, 1], s=3) + +cov_matrix = envelope.cov() +cov_matrix = cov_matrix[0:2, 0:2] +cov_matrix = cov_matrix * 1.00e+06 +cx, cy, angle = rms_ellipse_params(cov_matrix) +for ax in axs: + ax.add_patch( + patches.Ellipse( + xy=(0.0, 0.0), + width=(4.0 * cx), + height=(4.0 * cy), + angle=(-np.degrees(angle)), + color="black", + ec="none", + alpha=0.1, + zorder=0, + ) + ) +for ax in axs: + ax.set_xlabel("x [mm]") + ax.set_ylabel("xp [mrad]") + +filename = "fig_benchmark_particles_xxp.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() + + diff --git a/examples/Envelope/match_danilov.py b/examples/Envelope/match_danilov.py new file mode 100644 index 00000000..26119e60 --- /dev/null +++ b/examples/Envelope/match_danilov.py @@ -0,0 +1,112 @@ +import argparse +import copy +import os +import pathlib + +import numpy as np +import matplotlib.pyplot as plt + +from orbit.envelope import DanilovEnvelope +from orbit.envelope import DanilovEnvelopeMonitor +from orbit.envelope import DanilovEnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--phase-adv-x", type=float, default=85.0) +parser.add_argument("--phase-adv-y", type=float, default=85.0) +parser.add_argument("--intensity", type=float, default=50.0e14) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--periods", type=int, default=5) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = DanilovEnvelope( + eps_1=20.00e-06, + eps_2=0.0, + mass=0.938, + kin_energy=1.0, + length=100.0, + line_density=(args.intensity / 100.0), + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(args.phase_adv_x), + phase_adv_y=np.radians(args.phase_adv_y), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +tracker = DanilovEnvelopeTracker(lattice, path_length_max=args.max_part_length) + + +# Find periodic envelope +# -------------------------------------------------------------------------------------- + +envelopes = {} + +tracker.match_zero_sc(envelope, method="2d") +envelopes["mismatched"] = envelope.copy() + +tracker.match(envelope, periods=args.periods, method="replace_avg", verbose=2) +# tracker.match( +# envelope, +# periods=args.periods, +# method="least_squares", +# xtol=1e-15, +# ftol=1e-15, +# gtol=1e-15, +# verbose=2 +# ) +envelopes["matched"] = envelope.copy() + + +# Plot results bunch +# -------------------------------------------------------------------------------------- + +_, history = tracker.track(envelopes["matched"], periods=args.periods, history=True) +_, history_unmatched = tracker.track(envelopes["mismatched"], periods=args.periods, history=True) + + +figwidth = 3.0 * args.periods +figwidth = min(figwidth, 10.0) + +fig, ax = plt.subplots(figsize=(figwidth, 2.5), constrained_layout=True) +ax.plot(history["s"], history["xrms"] * 1000.0, color="blue", alpha=1.0) +ax.plot(history["s"], history["yrms"] * 1000.0, color="red", alpha=1.0) +ax.plot(history_unmatched["s"], history_unmatched["xrms"] * 1000.0, color="blue", alpha=0.2) +ax.plot(history_unmatched["s"], history_unmatched["yrms"] * 1000.0, color="red", alpha=0.2) +ax.set_ylim(0.0, ax.get_ylim()[1]) +ax.set_xlabel("Distance [m]") +ax.set_ylabel("Size [mm]") + +filename = "fig_match_rms.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() \ No newline at end of file diff --git a/examples/Envelope/match_kv.py b/examples/Envelope/match_kv.py new file mode 100644 index 00000000..77bb191a --- /dev/null +++ b/examples/Envelope/match_kv.py @@ -0,0 +1,103 @@ +import argparse +import copy +import os +import pathlib + +import numpy as np +import matplotlib.pyplot as plt + +from orbit.envelope import KVEnvelope +from orbit.envelope import KVEnvelopeMonitor +from orbit.envelope import KVEnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.lattice import AccActionsContainer +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from utils import make_fodo_lattice + +plt.style.use("style.mplstyle") + + +# Parse arguments +# -------------------------------------------------------------------------------------- + +parser = argparse.ArgumentParser() +parser.add_argument("--intensity", type=float, default=100.0e14) +parser.add_argument("--eps_x", type=float, default=10.00e-06) +parser.add_argument("--eps_y", type=float, default=10.00e-06) +parser.add_argument("--max-part-length", type=float, default=0.1) +parser.add_argument("--periods", type=int, default=5) +args = parser.parse_args() + + +# Setup +# -------------------------------------------------------------------------------------- + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Set up simulation +# -------------------------------------------------------------------------------------- + +envelope = KVEnvelope( + eps_x=args.eps_x, + eps_y=args.eps_y, + mass=mass_proton, + kin_energy=1.000, + length=100.0, + line_density=(args.intensity / 100.0), + params=None, +) + +lattice = make_fodo_lattice( + phase_adv_x=np.radians(85.0), + phase_adv_y=np.radians(85.0), + length=5.0, + mass=envelope.mass, + kin_energy=envelope.kin_energy, + max_part_length=args.max_part_length, + verbose=1, +) + +matrix_lattice = TEAPOT_MATRIX_Lattice(lattice, envelope.to_bunch()) +lattice_params = matrix_lattice.getRingParametersDict() + +tracker = KVEnvelopeTracker(lattice, path_length_max=args.max_part_length) + + +# Find periodic envelope +# -------------------------------------------------------------------------------------- + +tracker.match_zero_sc(envelope) + +envelope_unmatched = envelope.copy() + +tracker.match(envelope, periods=args.periods, verbose=2) + + +# Plot results bunch +# -------------------------------------------------------------------------------------- + +_, history = tracker.track(envelope, periods=args.periods, history=True) +_, history_unmatched = tracker.track(envelope_unmatched, periods=args.periods, history=True) + +figwidth = 3.0 * args.periods +figwidth = min(figwidth, 10.0) + +fig, ax = plt.subplots(figsize=(figwidth, 2.5), constrained_layout=True) +ax.plot(history["s"], history["xrms"] * 1000.0, color="blue", alpha=1.0) +ax.plot(history["s"], history["yrms"] * 1000.0, color="red", alpha=1.0) +ax.plot(history_unmatched["s"], history_unmatched["xrms"] * 1000.0, color="blue", alpha=0.2) +ax.plot(history_unmatched["s"], history_unmatched["yrms"] * 1000.0, color="red", alpha=0.2) +ax.set_ylim(0.0, ax.get_ylim()[1]) +ax.set_xlabel("Distance [m]") +ax.set_ylabel("Size [mm]") + +filename = "fig_match_rms.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() diff --git a/examples/Envelope/plot_danilov.py b/examples/Envelope/plot_danilov.py new file mode 100644 index 00000000..76a0d985 --- /dev/null +++ b/examples/Envelope/plot_danilov.py @@ -0,0 +1,46 @@ +import os +import pathlib +import numpy as np +import matplotlib.pyplot as plt + +from orbit.envelope import DanilovEnvelope + +plt.style.use("style.mplstyle") + + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +envelope = DanilovEnvelope( + eps_1=20.00e-06, + eps_2=0.0, + mass=0.938, + kin_energy=1.0, + length=100.0, + line_density=0.0, + params=None, +) + +x = envelope.sample(100_000) +xmax = np.std(x, axis=0) * 3.0 +limits = list(zip(-xmax, xmax)) + +fig, axs = plt.subplots(ncols=4, nrows=4, figsize=(4, 4), constrained_layout=True) +for i in range(4): + for j in range(4): + ax = axs[i, j] + ax.hist2d(x[:, j], x[:, i], bins=50, range=[limits[j], limits[i]], cmap="Blues") +for ax in axs.flat: + ax.set_xticks([]) + ax.set_yticks([]) +for i, dim in enumerate(["x", "xp", "y", "yp"]): + axs[i, 0].set_ylabel(dim) + axs[3, i].set_xlabel(dim) +fig.align_ylabels() + +filename = "fig_corner.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() diff --git a/examples/Envelope/plot_kv.py b/examples/Envelope/plot_kv.py new file mode 100644 index 00000000..e11e5a22 --- /dev/null +++ b/examples/Envelope/plot_kv.py @@ -0,0 +1,45 @@ +import os +import pathlib +import numpy as np +import matplotlib.pyplot as plt + +from orbit.envelope import KVEnvelope + +plt.style.use("style.mplstyle") + + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +envelope = KVEnvelope( + eps_x=10.00e-06, + eps_y=10.00e-06, + mass=0.938, + kin_energy=1.0, + line_density=0.0, + length=100.0, +) + +x = envelope.sample(100_000) +xmax = np.std(x, axis=0) * 3.0 +limits = list(zip(-xmax, xmax)) + +fig, axs = plt.subplots(ncols=4, nrows=4, figsize=(4, 4), constrained_layout=True) +for i in range(4): + for j in range(4): + ax = axs[i, j] + ax.hist2d(x[:, j], x[:, i], bins=50, range=[limits[j], limits[i]], cmap="Blues") +for ax in axs.flat: + ax.set_xticks([]) + ax.set_yticks([]) +for i, dim in enumerate(["x", "xp", "y", "yp"]): + axs[i, 0].set_ylabel(dim) + axs[3, i].set_xlabel(dim) +fig.align_ylabels() + +filename = "fig_corner.png" +filename = os.path.join(output_dir, filename) +plt.savefig(filename, dpi=300) +plt.show() diff --git a/examples/Envelope/style.mplstyle b/examples/Envelope/style.mplstyle new file mode 100644 index 00000000..0e7c57d1 --- /dev/null +++ b/examples/Envelope/style.mplstyle @@ -0,0 +1,4 @@ +axes.linewidth: 1.25 +figure.constrained_layout.use: True +xtick.minor.visible: True +ytick.minor.visible: True \ No newline at end of file diff --git a/examples/Envelope/utils.py b/examples/Envelope/utils.py new file mode 100644 index 00000000..3fd2e0f5 --- /dev/null +++ b/examples/Envelope/utils.py @@ -0,0 +1,284 @@ +import copy +import numpy as np +import scipy.optimize + +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.bunch_generators import TwissContainer +from orbit.bunch_generators import GaussDist2D +from orbit.bunch_generators import KVDist2D +from orbit.bunch_generators import WaterBagDist2D +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.teapot import TEAPOT_Lattice +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.teapot import DriftTEAPOT +from orbit.teapot import QuadTEAPOT + + +def split_node(node: AccNode, max_part_length: float = None) -> AccNode: + if max_part_length: + if node.getLength() > max_part_length: + node.setnParts(1 + int(node.getLength() / max_part_length)) + return node + + +def get_phase_adv(lattice: AccLattice, mass: float, kin_energy: float) -> np.ndarray: + bunch = Bunch() + bunch.mass(mass) + bunch.getSyncParticle().kinEnergy(kin_energy) + matrix_lattice = TEAPOT_MATRIX_Lattice(lattice, bunch) + lattice_params = matrix_lattice.getRingParametersDict() + phase_adv = [ + lattice_params["fractional tune x"], + lattice_params["fractional tune y"], + ] + phase_adv = np.array(phase_adv) + phase_adv = phase_adv * 2.0 * np.pi + return phase_adv + + +def make_fodo_lattice( + phase_adv_x: float, + phase_adv_y: float, + length: float, + mass: float, + kin_energy: float, + fill_frac: float = 0.5, + start: str = "drift", + fringe: bool = False, + max_part_length: float = 0.1, + verbose: bool = False, +) -> AccLattice: + """Create FODO lattice with specified phase advances. + + Args: + phase_adv_x: Phase advance in x plane [rad]. + phase_adv_y: Phase advance in y plane [rad]. + length: Length of the lattice [m]. + mass: Particle mass [GeV/c^2] + kin_energy: Synchronous particle kinetic energy [GeV]. + fill_frac : Fraction of the lattice occupied by quadrupoles. + start : str + "drift": + O-F-F-O-O-D-D-O. + "quad": + F-O-O-D-D-O-O-F + fringe: Toggles fringe fields before/after quads. + max_part_length: Maximum node part length [m]. + verbose: Print optimization status. + + Returns: + TEAPOT_Lattice + """ + + def _make_lattice(k1: float, k2: float) -> AccLattice: + """Create FODO lattice with specified focusing strengths. + + k1 and k2 are the focusing strengths of the + focusing (1st) and defocusing (2nd) quads, respectively. + """ + length_quad = length * fill_frac / 2.0 + length_drift = length * (1.0 - fill_frac) / 2.0 + + if start == "quad": + drift_nodes = [ + DriftTEAPOT("drift1"), + DriftTEAPOT("drift2"), + ] + quad_nodes = [ + QuadTEAPOT("qf1"), + QuadTEAPOT("qd"), + QuadTEAPOT("qf2"), + ] + + drift_nodes[0].setLength(length_drift) + drift_nodes[1].setLength(length_drift) + + quad_nodes[0].setLength(length_quad * 0.5) + quad_nodes[1].setLength(length_quad) + quad_nodes[2].setLength(length_quad * 0.5) + + quad_nodes[0].setParam("kq", +k1) + quad_nodes[1].setParam("kq", -k2) + quad_nodes[2].setParam("kq", +k1) + + lattice = TEAPOT_Lattice() + lattice.addNode(quad_nodes[0]) + lattice.addNode(drift_nodes[0]) + lattice.addNode(quad_nodes[1]) + lattice.addNode(drift_nodes[1]) + lattice.addNode(quad_nodes[2]) + lattice.initialize() + + elif start == "drift": + + drift_nodes = [ + DriftTEAPOT("drift1"), + DriftTEAPOT("drift2"), + DriftTEAPOT("drift3"), + ] + quad_nodes = [ + QuadTEAPOT("qf"), + QuadTEAPOT("qd"), + ] + + drift_nodes[0].setLength(length_drift * 0.5) + drift_nodes[1].setLength(length_drift) + drift_nodes[2].setLength(length_drift * 0.5) + + quad_nodes[0].setLength(length_quad) + quad_nodes[1].setLength(length_quad) + + quad_nodes[0].setParam("kq", +k1) + quad_nodes[1].setParam("kq", -k2) + + lattice = TEAPOT_Lattice() + lattice.addNode(drift_nodes[0]) + lattice.addNode(quad_nodes[0]) + lattice.addNode(drift_nodes[1]) + lattice.addNode(quad_nodes[1]) + lattice.addNode(drift_nodes[2]) + lattice.initialize() + + # Toggle fringe fields + for node in lattice.getNodes(): + node.setUsageFringeFieldIN(fringe) + node.setUsageFringeFieldOUT(fringe) + + lattice.initialize() + return lattice + + def loss_function(parameters: np.ndarray) -> float: + lattice = _make_lattice(*parameters) + phase_adv_calc = get_phase_adv(lattice, mass=mass, kin_energy=kin_energy) + phase_adv_targ = np.array([phase_adv_x, phase_adv_y]) + return np.abs(phase_adv_calc - phase_adv_targ) + + parameters = np.array([0.5, 0.5]) # ~ 80 deg phase advance + result = scipy.optimize.least_squares(loss_function, parameters, verbose=verbose) + lattice = _make_lattice(*result.x) + + for node in lattice.getNodes(): + node = split_node(node, max_part_length) + + if verbose: + phase_adv_calc = get_phase_adv(lattice, mass=mass, kin_energy=kin_energy) + phase_adv_targ = np.array([phase_adv_x, phase_adv_y]) + phase_adv_calc *= 180.0 / np.pi + phase_adv_targ *= 180.0 / np.pi + print(f"phase_adv_x = {phase_adv_calc[0]} (target={phase_adv_targ[0]})") + print(f"phase_adv_y = {phase_adv_calc[1]} (target={phase_adv_targ[1]})") + + return lattice + + +def get_bunch_coords(bunch: Bunch, axis: tuple[int, ...] = None) -> np.ndarray: + if axis is None: + axis = tuple(range(6)) + + X = np.zeros((bunch.getSize(), 6)) + for i in range(bunch.getSize()): + X[i, 0] = bunch.x(i) + X[i, 1] = bunch.xp(i) + X[i, 2] = bunch.y(i) + X[i, 3] = bunch.yp(i) + X[i, 4] = bunch.z(i) + X[i, 5] = bunch.dE(i) + return X[:, axis] + + +def get_bunch_cov(bunch: Bunch) -> np.ndarray: + calc = BunchTwissAnalysis() + calc.computeBunchMoments(bunch, 2, 0, 0) + + cov_matrix = np.zeros((6, 6)) + for i in range(6): + for j in range(i + 1): + cov_matrix[i, j] = calc.getCorrelation(j, i) + cov_matrix[j, i] = cov_matrix[i, j] + return cov_matrix + + +class BunchMonitor: + def __init__(self, verbose: int = 1) -> None: + self.distance = 0.0 + self._pos_old = 0.0 + self._pos_new = 0.0 + self.verbose = verbose + + self.history = {} + for key in [ + "s", + "xrms", + "yrms", + "epsx", + "epsy", + "cov_00", + "cov_01", + "cov_02", + "cov_03", + "cov_11", + "cov_12", + "cov_13", + "cov_22", + "cov_23", + "cov_33", + "rxy", + ]: + self.history[key] = [] + + def get_history(self) -> None: + history = copy.deepcopy(self.history) + for key in history: + history[key] = np.array(history[key]) + history["s"] -= history["s"][0] + return history + + def __call__(self, params_dict: dict) -> None: + bunch = params_dict["bunch"] + node = params_dict["node"] + + self._pos_new = params_dict["path_length"] + if self._pos_old > self._pos_new: + self._pos_old = 0.0 + self.distance += self._pos_new - self._pos_old + self._pos_old = self._pos_new + + cov_matrix = get_bunch_cov(bunch) + + for i in range(4): + for j in range(i, 4): + key = f"cov_{i}{j}" + self.history[key].append(cov_matrix[i, j]) + + self.history["s"].append(self.distance) + self.history["xrms"].append(np.sqrt(cov_matrix[0, 0])) + self.history["yrms"].append(np.sqrt(cov_matrix[2, 2])) + self.history["epsx"].append(np.sqrt(np.linalg.det(cov_matrix[0:2, 0:2]))) + self.history["epsy"].append(np.sqrt(np.linalg.det(cov_matrix[2:4, 2:4]))) + self.history["rxy"].append( + self.history["cov_02"][-1] + / np.sqrt(self.history["cov_00"][-1] * self.history["cov_22"][-1]) + ) + + if self.verbose: + message = "" + message += "s={:0.3f} ".format(self.history["s"][-1]) + message += "xrms={:0.3f} ".format(self.history["xrms"][-1] * 1000.0) + message += "yrms={:0.3f} ".format(self.history["yrms"][-1] * 1000.0) + print(message) + + +def rms_ellipse_params(cov_matrix: np.ndarray) -> tuple[float, float, float]: + sii = cov_matrix[0, 0] + sjj = cov_matrix[1, 1] + sij = cov_matrix[0, 1] + angle = -0.5 * np.arctan2(2 * sij, sii - sjj) + _sin = np.sin(angle) + _cos = np.cos(angle) + _sin2 = _sin**2 + _cos2 = _cos**2 + c1 = np.sqrt(abs(sii * _cos2 + sjj * _sin2 - 2 * sij * _sin * _cos)) + c2 = np.sqrt(abs(sii * _sin2 + sjj * _cos2 + 2 * sij * _sin * _cos)) + return (c1, c2, angle) diff --git a/py/orbit/envelope/__init__.py b/py/orbit/envelope/__init__.py new file mode 100644 index 00000000..553906b2 --- /dev/null +++ b/py/orbit/envelope/__init__.py @@ -0,0 +1,13 @@ +"""Envelope tracking module.""" + +from .nodes import EnvelopeTrackerNode +from .nodes import KVEnvelopeTrackerNode +from .nodes import DanilovEnvelopeTrackerNode +from .lattice_modifications import add_kv_envelope_tracker_nodes +from .lattice_modifications import add_danilov_envelope_tracker_nodes +from .kv import KVEnvelope +from .kv import KVEnvelopeMonitor +from .kv import KVEnvelopeTracker +from .danilov import DanilovEnvelope +from .danilov import DanilovEnvelopeMonitor +from .danilov import DanilovEnvelopeTracker diff --git a/py/orbit/envelope/danilov.py b/py/orbit/envelope/danilov.py new file mode 100644 index 00000000..aa494d83 --- /dev/null +++ b/py/orbit/envelope/danilov.py @@ -0,0 +1,792 @@ +"""Envelope model for Danilov distribution.""" +import copy +import time +from typing import Callable +from typing import Iterable + +# from typing import Self + +import numpy as np +import scipy.optimize +from tqdm import tqdm + +from orbit.core.bunch import Bunch + +from ..lattice import AccActionsContainer +from ..lattice import AccLattice +from ..lattice import AccNode +from ..teapot import TEAPOT_Lattice +from ..teapot import TEAPOT_MATRIX_Lattice +from ..utils import consts + +from .nodes import DanilovEnvelopeTrackerNode +from .lattice_modifications import add_danilov_envelope_tracker_nodes +from .transfer_matrix import build_unnorm_matrix_from_params_cs +from .transfer_matrix import build_unnorm_matrix_from_params_cs_2d +from .transfer_matrix import build_unnorm_matrix_from_params_lb_one_mode +from .transfer_matrix import build_norm_matrix_from_params_cs_2d +from .transfer_matrix import build_norm_matrix_from_tmat +from .transfer_matrix import is_tmat_coupled +from .transfer_matrix import calc_params_from_tmat_cs +from .utils import bunch_to_numpy +from .utils import get_perveance +from .utils import get_transfer_matrix +from .utils import build_rotation_matrix_xy + + +def env_matrix_to_vector(env_matrix: np.ndarray) -> np.ndarray: + return env_matrix.ravel() + + +def env_vector_to_matrix(env_vector: np.ndarray) -> np.ndarray: + return env_vector.reshape(4, 2) + + +def env_params_from_bunch(bunch: Bunch) -> np.ndarray: + a = bunch.x(0) + b = bunch.x(1) + e = bunch.y(0) + f = bunch.y(1) + ap = bunch.xp(0) + bp = bunch.xp(1) + ep = bunch.yp(0) + fp = bunch.yp(1) + params = [a, b, ap, bp, e, f, ep, fp] + params = np.array(params) + return params + + +class DanilovEnvelope: + """Represents Danilov distribution. + + The Danilov distribution is the limit of the KV distribution as the emittance goes to + zero in one of the planes. + """ + def __init__( + self, + eps_1: float, + eps_2: float, + mass: float, + kin_energy: float, + line_density: float, + length: float, + params: np.ndarray = None, + ) -> None: + """Constructor. + + Args: + eps_1: rms emittance in mode 1 [m * rad]. (Either eps_1 or eps_2 must be zero.) + eps_2: rms emittance in mode 2 [m * rad]. + mass : Particle mass [GeV/c^2] + kin_energy: Synchronous particle kinetic energy [GeV]. + line_density: Bunch longitudinal density [1 / m]. + length: Bunch length [m] (used to convert to Bunch object). + params: Envelope parameters [a, b, a', b', e, f, e', f']. + The coordinates of a particle on the beam envelope are parameterized as + x = a*cos(psi) + b*sin(psi), x' = a'*cos(psi) + b'*sin(psi), + y = e*cos(psi) + f*sin(psi), y' = e'*cos(psi) + f'*sin(psi), + where 0 <= psi <= 2pi. + """ + self.mass = mass + self.kin_energy = kin_energy + self.line_density = line_density + self.length = length + self.intensity = self.line_density * self.length + self.perveance = 0.0 + self.set_line_density(line_density) + + self.eps_1 = eps_1 + self.eps_2 = eps_2 + self.intrinsic_emittance = eps_1 + self.set_emittances(eps_1, eps_2) + + self.params = params + if self.params is None: + eps_x = 0.5 * self.intrinsic_emittance + eps_y = 0.5 * self.intrinsic_emittance + rx = 2.0 * np.sqrt(eps_x) + ry = 2.0 * np.sqrt(eps_y) + if self.mode == 1: + self.params = np.array([rx, 0, 0, rx, 0, -ry, +ry, 0]) + else: + self.params = np.array([rx, 0, 0, rx, 0, +ry, -ry, 0]) + + self.set_params(self.params) + + def copy(self): + return copy.deepcopy(self) + + def set_emittances(self, eps_1: float, eps_2: float) -> None: + self.eps_1 = eps_1 + self.eps_2 = eps_2 + + if self.eps_2 == 0: + self.mode = 1 + self.intrinsic_emittance = self.eps_1 + elif self.eps_1 == 0: + self.mode = 2 + self.intrinsic_emittance = self.eps_2 + else: + raise ValueError("eps_1 or eps_2 must be zero") + + def set_line_density(self, line_density: float) -> None: + self.line_density = line_density + self.intensity = self.line_density * self.length + self.perveance = get_perveance(self.mass, self.kin_energy, self.line_density) + + def set_length(self, length: float) -> None: + self.length = length + self.intensity = self.line_density * self.length + + def set_params(self, params: np.ndarray) -> None: + self.params = np.copy(params) + + # Recompute intrinsic emittances + self.intrinsic_emittance = sum(self.projected_emittances()) + if self.mode == 1: + self.set_emittances(self.intrinsic_emittance, 0.0) + else: + self.set_emittances(0.0, self.intrinsic_emittance) + + def param_matrix(self) -> np.ndarray: + """Build envelope matrix. + + The envelope matrix P defined by x = Wc, where x = [x, x', y, y']^T and + c = [cos(psi), sin(psi)], with 0 <= psi <= 2pi. This is useful because + any transformation to the phase space coordinate vector x is also done to + W. For example, if x -> Mx, then W -> MW. + """ + return env_vector_to_matrix(self.params) + + def param_vector(self, axis: int) -> np.ndarray: + """Return vector of envelope parameters [a, b, a', b', e, f, e', f'].""" + if axis is None: + return self.params + param_matrix = self.param_matrix() + return param_matrix[axis, :] + + def transform(self, matrix: np.ndarray) -> None: + """Linearly transform phase space coordinates.""" + param_matrix = self.param_matrix() + param_matrix = np.matmul(matrix, param_matrix) + self.set_params(np.ravel(param_matrix)) + + def rotate(self, angle: float) -> None: + """Apply clockwise rotation by `angle`` radians in x-y plane.""" + self.transform(build_rotation_matrix_xy(angle)) + + def projected_tilt_angle(self, axis: tuple[int, int] = (0, 2)) -> float: + """Return angle of projected ellipse.""" + a, b = self.param_vector(axis[0]) + e, f = self.param_vector(axis[1]) + return 0.5 * np.arctan2(2 * (a * e + b * f), a**2 + b**2 - e**2 - f**2) + + def projected_radii(self, axis: tuple[int, int] = (0, 2)) -> tuple[float, float]: + """Return semi-major and semi-minor axes of projected ellipse.""" + a, b = self.param_vector(axis[0]) + e, f = self.param_vector(axis[1]) + phi = self.projected_tilt_angle(axis) + _sin = np.sin(phi) + _cos = np.cos(phi) + _sin2 = _sin**2 + _cos2 = _cos**2 + xx = a**2 + b**2 + yy = e**2 + f**2 + xy = a * e + b * f + cx = np.sqrt(abs(xx * _cos2 + yy * _sin2 - 2 * xy * _sin * _cos)) + cy = np.sqrt(abs(xx * _sin2 + yy * _cos2 + 2 * xy * _sin * _cos)) + return (cx, cy) + + def projected_area(self, axis: tuple[int, int] = (0, 2)) -> float: + """Return area of projected ellipse.""" + a, b = self.param_vector(axis[0]) + e, f = self.param_vector(axis[1]) + return np.pi * abs(a * f - b * e) + + def cov(self) -> np.ndarray: + """Return covariance matrix.""" + param_matrix = self.param_matrix() + return 0.25 * np.matmul(param_matrix, param_matrix.T) + + def corr(self) -> np.ndarray: + """Return correlation matrix.""" + S = self.cov() + D = np.sqrt(np.diag(S.diagonal())) + D_inv = np.linalg.inv(D) + return np.linalg.multi_dot([D_inv, S, D_inv]) + + def projected_emittances(self) -> tuple[float, float]: + """Return rms apparent emittances eps_x, eps_y [m * rad].""" + cov_matrix = self.cov() + eps_x = np.sqrt(np.clip(np.linalg.det(cov_matrix[0:2, 0:2]), 0.0, None)) + eps_y = np.sqrt(np.clip(np.linalg.det(cov_matrix[2:4, 2:4]), 0.0, None)) + return (eps_x, eps_y) + + def intrinsic_emittances(self) -> tuple[float, float]: + """Return rms intrinsic emittances eps1, eps2 [m * rad].""" + return (self.eps_1, self.eps_2) + + def phases_xy(self) -> tuple[float, float]: + envelope = self.copy() + envelope.normalize(method="2d", scale=True) + a, b, ap, bp, e, f, ep, fp = envelope.params + phase_x = -np.arctan2(ap, a) + phase_y = -np.arctan2(ep, e) + if phase_x < 0.0: + phase_x += 2.0 * np.pi + if phase_y < 0.0: + phase_y += 2.0 * np.pi + return (phase_x, phase_y) + + def twiss_2d(self) -> dict[str, float]: + """Return twiss parameters in x-x', y-y' planes.""" + cov_matrix = self.cov() + (eps_x, eps_y) = self.projected_emittances() + + beta_x = beta_y = alpha_x = alpha_y = None + if eps_x > 0.0: + beta_x = cov_matrix[0, 0] / eps_x + alpha_x = -cov_matrix[0, 1] / eps_x + if eps_y > 0.0: + beta_y = cov_matrix[2, 2] / eps_y + alpha_y = -cov_matrix[2, 3] / eps_y + + return { + "alpha_x": alpha_x, + "beta_x": beta_x, + "alpha_y": alpha_y, + "beta_y": beta_y, + } + + def twiss_4d(self) -> dict[str, float]: + """Return Lebedev-Bogacz parameters for occupied mode.""" + cov_matrix = self.cov() + beta_lx = cov_matrix[0, 0] / self.intrinsic_emittance + beta_ly = cov_matrix[2, 2] / self.intrinsic_emittance + alpha_lx = -cov_matrix[0, 1] / self.intrinsic_emittance + alpha_ly = -cov_matrix[2, 3] / self.intrinsic_emittance + + u = 0.0 + (eps_x, eps_y) = self.projected_emittances() + if self.mode == 1: + u = eps_y / self.intrinsic_emittance + else: + u = eps_x / self.intrinsic_emittance + + (phase_x, phase_y) = self.phases_xy() + nu = abs(phase_y - phase_x) + if nu > np.pi: + nu = 2.0 * np.pi - nu + + return { + "alpha_lx": alpha_lx, + "beta_lx": beta_lx, + "alpha_ly": alpha_ly, + "beta_ly": beta_ly, + "u": u, + "nu": nu, + } + + def twiss_4d_vector(self) -> np.ndarray: + twiss_params = self.twiss_4d() + twiss_params = [ + twiss_params["alpha_lx"], + twiss_params["beta_lx"], + twiss_params["alpha_ly"], + twiss_params["beta_ly"], + twiss_params["u"], + twiss_params["nu"], + ] + twiss_params = np.array(twiss_params) + return twiss_params + + def unnorm_matrix(self, method: str = "2d") -> np.ndarray: + """Return unnormalization matrix V.""" + unnorm_matrix = None + if method == "2d": + twiss_params = self.twiss_2d() + unnorm_matrix = build_unnorm_matrix_from_params_cs(**twiss_params) + elif method == "4d": + twiss_params = self.twiss_4d() + unnorm_matrix = build_unnorm_matrix_from_params_lb_one_mode( + mode=self.mode, **twiss_params + ) + else: + raise ValueError(f"Invalid normalization {method}") + return unnorm_matrix + + def normalize(self, method: str = "2d", scale: bool = False) -> None: + """Normalize the distribution. + + Args: + method: Normalization method {"2d", "4d"} + "2d": The x-x' and y-y' ellipses will be circles of radius sqrt(eps_x) + and sqrt(eps_y), where eps_x and eps_y are the rms projected + emittances. + "4d": The 4 x 4 covariance matrix becomes diagonal. The x-x' and y-y' + ellipses wil be circles of radius radius sqrt(eps_1) and + sqrt(eps_2), where eps_1, and eps_2 are the rms intrinsic + emittances. + scale: Whether to divide by the emittances to scale all coordinates to unit variance. + This converts all circles to unit circles. + """ + if method == "2d": + eps_x, eps_y = self.projected_emittances() + twiss_params = self.twiss_2d() + alpha_x = twiss_params["alpha_x"] + alpha_y = twiss_params["alpha_y"] + beta_x = twiss_params["beta_x"] + beta_y = twiss_params["beta_y"] + + norm_matrix = np.eye(4) + if eps_x > 0.0: + norm_matrix[0:2, 0:2] = build_norm_matrix_from_params_cs_2d(alpha_x, beta_x) + if eps_y > 0.0: + norm_matrix[2:4, 2:4] = build_norm_matrix_from_params_cs_2d(alpha_y, beta_y) + + self.transform(norm_matrix) + + if scale: + if eps_x > 0.0: + self.params[0:4] /= np.sqrt(4.0 * eps_x) + if eps_y > 0.0: + self.params[4:8] /= np.sqrt(4.0 * eps_y) + + elif method == "4d": + # Cannot invert V (singular matrix). However, we know the envelope parameters in the + # normalized frame. + r_n = np.sqrt(4.0 * self.intrinsic_emittance) + if self.mode == 1: + self.params = np.array([r_n, 0, 0, r_n, 0, 0, 0, 0]) + else: + self.params = np.array([0, 0, 0, 0, 0, r_n, r_n, 0]) + if scale: + self.params = self.params / r_n + else: + raise ValueError(f"Invalid normalization {method}") + + def set_twiss_2d(self, **twiss_params) -> None: + """Set 2D twiss parameters. + + Args: + alpha_x (optional): alpha parameter in x plane. + alpha_y (optional): alpha parameter in y plane. + beta_x (optional): beta parameter in x plane. + beta_y (optional): beta parameter in y plane. + """ + _twiss_params = self.twiss_2d() + _twiss_params.update(twiss_params) + + V = build_unnorm_matrix_from_params_cs(**_twiss_params) + self.normalize(method="2d", scale=False) + self.transform(V) + + def set_twiss_4d(self, **twiss_params) -> None: + """Set 4D twiss parameters parameters. + + Args: + alpha_lx (optional): Horizontal alpha function - / eps_l. + beta_lx (optional): Horizontal beta function / eps_l. + alpha_ly (optional): Vertical alpha_function - / eps_l. + beta_ly (optional): Vertical beta function - / eps_l. + u (optional): Coupling parameter in range [0, 1]. + If eps_1 > 0: u = eps_y / eps_1. + If eps_2 > 0: u = eps_x / eps_1. + nu (optional): The x-y phase difference in range [0, pi]. + """ + _twiss_params = self.twiss_4d() + _twiss_params.update(twiss_params) + + V = build_unnorm_matrix_from_params_lb_one_mode(mode=self.mode, **twiss_params) + self.normalize(method="4d", scale=False) + self.transform(V) + + def set_twiss_4d_vector(self, twiss_params: np.ndarray) -> None: + keys = ["alpha_lx", "beta_lx", "alpha_ly", "beta_ly", "u", "nu"] + + twiss_params_dict = {} + for i, key in enumerate(keys): + twiss_params_dict[key] = twiss_params[i] + + return self.set_twiss_4d(**twiss_params_dict) + + def set_cov(self, cov_matrix: np.ndarray, verbose: int = 0) -> scipy.optimize.OptimizeResult: + """Set envelope parameters from covariance matrix.""" + + def loss_function(params: np.ndarray, cov_matrix: np.ndarray) -> np.ndarray: + self.set_params(params) + loss = np.mean(np.abs(cov_matrix - self.cov())) + loss = loss * 1.00e06 + return loss + + result = scipy.optimize.least_squares( + loss_function, self.params, args=(cov_matrix,), xtol=1.00e-12, verbose=verbose + ) + return result + + def get_coordinates(self, psi: float = 0.0) -> np.ndarray: + """Return phase space coordinates of particle on envelope. + + psi is in the range [0, 2 * pi]. + """ + W = self.param_matrix() + c = [np.cos(psi), np.sin(psi)] + return np.matmul(W, c) + + def sample(self, size: int) -> np.ndarray: + X = np.random.normal(size=(size, 4)) + X = X / np.linalg.norm(X, axis=1)[:, None] + X = X / np.std(X, axis=0) + + A = np.sqrt(np.diag([self.eps_1, self.eps_1, self.eps_2, self.eps_2])) + V = self.unnorm_matrix(method="4d") + + X = np.matmul(X, A.T) + X = np.matmul(X, V.T) + return X + + def from_bunch(self, bunch: Bunch) -> None: + """Set the envelope parameters from a Bunch object.""" + self.set_params(env_params_from_bunch(bunch)) + + def to_bunch(self, size: int = 0, env: bool = True) -> Bunch: + """Create Bunch object from envelope parameters. + + Args: + size: Number of macroparticles in the bunch. + These are the number of "test"particles not counting the first particle, + which stores the envelope parameters. + env: Whether first two particles store envelope parameters. + + Returns: + Bunch with sampled particles. + """ + bunch = Bunch() + bunch.mass(self.mass) + bunch.getSyncParticle().kinEnergy(self.kin_energy) + + if env: + a, b, ap, bp, e, f, ep, fp = self.params + bunch.addParticle(a, ap, e, ep, 0.0, 0.0) + bunch.addParticle(b, bp, f, fp, 0.0, 0.0) + + if size: + particles = np.zeros((size, 6)) + particles[:, :4] = self.sample(size) + particles[:, 4] = self.length * np.random.uniform(-0.5, 0.5, size=size) + for i in range(size): + bunch.addParticle(*particles[i, :]) + + macrosize = self.intensity / size + if self.intensity == 0.0: + macrosize = 1.0 + bunch.macroSize(macrosize) + + return bunch + + +class DanilovEnvelopeMonitor: + def __init__(self, verbose: int = 0) -> None: + self.verbose = verbose + self.distance = 0.0 + self._pos_old = 0.0 + self._pos_new = 0.0 + + self.history = {} + for key in [ + "s", + "xrms", + "yrms", + "cov_00", + "cov_01", + "cov_02", + "cov_03", + "cov_11", + "cov_12", + "cov_13", + "cov_22", + "cov_23", + "cov_33", + "epsx", + "epsy", + "rxy", + ]: + self.history[key] = [] + + def get_history(self) -> None: + history = copy.deepcopy(self.history) + for key in history: + history[key] = np.array(history[key]) + history["s"] -= history["s"][0] + return history + + def __call__(self, params_dict: dict) -> None: + bunch = params_dict["bunch"] + node = params_dict["node"] + + self._pos_new = params_dict["path_length"] + if self._pos_old > self._pos_new: + self._pos_old = 0.0 + self.distance += self._pos_new - self._pos_old + self._pos_old = self._pos_new + + params = env_params_from_bunch(bunch) + param_matrix = env_vector_to_matrix(params) + cov_matrix = 0.25 * np.matmul(param_matrix, param_matrix.T) + + for i in range(4): + for j in range(i, 4): + key = f"cov_{i}{j}" + self.history[key].append(cov_matrix[i, j]) + + self.history["s"].append(self.distance) + self.history["xrms"].append(np.sqrt(cov_matrix[0, 0])) + self.history["yrms"].append(np.sqrt(cov_matrix[2, 2])) + self.history["epsx"].append(np.sqrt(np.linalg.det(cov_matrix[0:2, 0:2]))) + self.history["epsy"].append(np.sqrt(np.linalg.det(cov_matrix[2:4, 2:4]))) + self.history["rxy"].append( + self.history["cov_02"][-1] + / np.sqrt(self.history["cov_00"][-1] * self.history["cov_22"][-1]) + ) + + if self.verbose: + message = "" + message += "s={:0.3f} ".format(self.history["s"][-1]) + message += "xrms={:0.3f} ".format(self.history["xrms"][-1]) + message += "yrms={:0.3f} ".format(self.history["yrms"][-1]) + + +class DanilovEnvelopeTracker: + def __init__(self, lattice: AccLattice, path_length_max: float = None) -> None: + self.lattice = lattice + self.nodes = self.add_nodes(path_length_min=1.00e-06, path_length_max=path_length_max) + + # Bounds on LB twiss parameters + pad = 1.00e-05 + alpha_min = -np.inf + alpha_max = +np.inf + beta_min = pad + beta_max = np.inf + u_min = pad + u_max = 1.0 - pad + nu_min = pad + nu_max = np.pi - pad + self.twiss_lb = (alpha_min, beta_min, alpha_min, beta_min, u_min, nu_min) + self.twiss_ub = (alpha_max, beta_max, alpha_max, beta_max, u_max, nu_max) + + def add_nodes( + self, + path_length_min: float, + path_length_max: float, + ) -> list[DanilovEnvelopeTrackerNode]: + + self.nodes = add_danilov_envelope_tracker_nodes( + lattice=self.lattice, + path_length_min=path_length_min, + path_length_max=path_length_max, + perveance=0.0, # will update based on envelope + ) + return self.nodes + + def toggle_nodes(self, setting: bool) -> None: + for node in self.nodes: + node.active = setting + + def update_nodes(self, envelope: DanilovEnvelope) -> None: + for node in self.nodes: + node.setPerveance(envelope.perveance) + + def track( + self, + envelope: DanilovEnvelope, + periods: int = 1, + history: bool = False, + ) -> DanilovEnvelope: + + self.update_nodes(envelope) + + monitor = DanilovEnvelopeMonitor() + action_container = AccActionsContainer() + if history: + action_container.addAction(monitor, AccActionsContainer.ENTRANCE) + action_container.addAction(monitor, AccActionsContainer.EXIT) + + bunch = envelope.to_bunch() + for period in range(periods): + self.lattice.trackBunch(bunch, actionContainer=action_container) + + envelope.from_bunch(bunch) + + if history: + history = monitor.get_history() + return (envelope, history) + else: + return envelope + + def track_particles(self, envelope: DanilovEnvelope, particles: np.ndarray = None) -> tuple[DanilovEnvelope, np.ndarray]: + self.update_nodes(envelope) + + bunch = envelope.to_bunch() + for i in range(particles.shape[0]): + bunch.addParticle(*particles[i]) + + self.lattice.trackBunch(bunch) + + envelope.from_bunch(bunch) + + particles_out = [] + for i in range(2, bunch.getSize()): + x = bunch.x(i) + y = bunch.y(i) + z = bunch.z(i) + xp = bunch.xp(i) + yp = bunch.yp(i) + dE = bunch.dE(i) + particles_out.append([x, xp, y, yp, z, dE]) + particles_out = np.array(particles_out) + + return (envelope, particles_out) + + def transfer_matrix(self, envelope: DanilovEnvelope) -> np.ndarray: + bunch = envelope.to_bunch(size=0, env=True) + + if envelope.perveance == 0: + self.toggle_nodes(False) + matrix = get_transfer_matrix(self.lattice, bunch) + self.toggle_nodes(True) + return matrix + + step_arr = np.ones(6) * 1.00e-06 + step_reduce = 20.0 + + bunch.addParticle(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(step_arr[0] / step_reduce, 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, step_arr[1] / step_reduce, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, step_arr[2] / step_reduce, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, 0.0, step_arr[3] / step_reduce, 0.0, 0.0) + bunch.addParticle(step_arr[0], 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, step_arr[1], 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, step_arr[2], 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, 0.0, step_arr[3], 0.0, 0.0) + + self.lattice.trackBunch(bunch) + + X = bunch_to_numpy(bunch) + X = X[:, (0, 1, 2, 3)] + X = X[2:, :] # ignore first two particles, which track envelope parameters + + M = np.zeros((4, 4)) + for i in range(4): + for j in range(4): + x1 = step_arr[i] / step_reduce + x2 = step_arr[i] + y0 = X[0, j] + y1 = X[i + 1, j] + y2 = X[i + 1 + 4, j] + M[j, i] = ((y1 - y0) * x2 * x2 - (y2 - y0) * x1 * x1) / (x1 * x2 * (x2 - x1)) + return M + + def match_zero_sc(self, envelope: DanilovEnvelope, method: str = "auto") -> None: + self.toggle_nodes(False) + bunch = Bunch() + bunch.mass(envelope.mass) + bunch.getSyncParticle().kinEnergy(envelope.kin_energy) + transfer_matrix = get_transfer_matrix(self.lattice, bunch) + self.toggle_nodes(True) + + # Match to the bare lattice. + if method == "auto": + if is_tmat_coupled(transfer_matrix): + method = "4d" + else: + method = "2d" + + if method == "2d": + twiss_params = calc_params_from_tmat_cs(transfer_matrix) + envelope.set_twiss_2d(**twiss_params) + elif method == "4d": + norm_matrix = build_norm_matrix_from_tmat(transfer_matrix) + norm_matrix_inv = np.linalg.inv(norm_matrix) + envelope.normalize(method="4d") + envelope.transform(norm_matrix_inv) + else: + raise ValueError + + def match( + self, + envelope: DanilovEnvelope, + periods: int = 1, + method: str = "least_squares", + **kwargs + ) -> None: + + if envelope.intensity == 0.0: + return self.match_zero_sc(envelope) + + def loss_function(theta: np.ndarray) -> float: + envelope.set_twiss_4d_vector(theta) + envelope_copy = envelope.copy() + + cov_matrix_init = envelope_copy.cov() + + loss = 0.0 + for period in range(periods): + self.track(envelope_copy) + cov_matrix = envelope_copy.cov() + loss += np.mean(np.square(cov_matrix - cov_matrix_init)) + loss /= periods + loss *= 1.00e06 + return loss + + if method == "least_squares": + # kwargs.setdefault("xtol", 1e-8) + # kwargs.setdefault("ftol", 1e-8) + # kwargs.setdefault("gtol", 1e-8) + kwargs.setdefault("verbose", 2) + + result = scipy.optimize.least_squares( + loss_function, + envelope.twiss_4d_vector(), + bounds=(self.twiss_lb, self.twiss_ub), + **kwargs, + ) + + elif method == "replace_avg": + periods_avg = kwargs.get("periods_avg", 20) + iters = kwargs.get("iters", 20) + rtol = kwargs.get("rtol", 1.00e-03) + + converged = False + message = "" + theta_old = envelope.twiss_4d_vector() + + for iteration in range(iters): + theta_tbt = np.zeros((periods_avg + 1, 6)) + for i in range(theta_tbt.shape[0]): + self.track(envelope) + theta_tbt[i, :] = envelope.twiss_4d_vector() + + theta = np.mean(theta_tbt, axis=0) + envelope.set_twiss_4d_vector(theta) + + loss = loss_function(theta) + + # Check relative change in parameter vector norm + theta_norm = np.linalg.norm(theta) + theta_norm_old = np.linalg.norm(theta_old) + + theta_norm_rel_change = abs(theta_norm - theta_norm_old) / ( + theta_norm_old + 1.00e-15 + ) + if theta_norm_rel_change < rtol: + converged = True + message = f"Relative change in parameter vector norm {theta_norm_rel_change} < rtol ({rtol})" + + print("{} {} {}".format(iteration, loss, theta_norm)) + + if converged: + print("CONVERGED") + print(message) + break + + theta_old = np.copy(theta) diff --git a/py/orbit/envelope/kv.py b/py/orbit/envelope/kv.py new file mode 100644 index 00000000..cb654b05 --- /dev/null +++ b/py/orbit/envelope/kv.py @@ -0,0 +1,452 @@ +"""Envelope model for upright KV distribution.""" +import copy +import math +import time +from typing import Callable +from typing import Iterable + +# from typing import Self + +import numpy as np +import scipy.optimize +from tqdm import tqdm + +from orbit.core.bunch import Bunch + +from ..lattice import AccActionsContainer +from ..lattice import AccLattice +from ..lattice import AccNode +from ..teapot import TEAPOT_Lattice +from ..teapot import TEAPOT_MATRIX_Lattice +from ..utils import consts + +from .nodes import KVEnvelopeTrackerNode +from .lattice_modifications import add_kv_envelope_tracker_nodes +from .utils import calc_cov_twiss +from .utils import bunch_to_numpy +from .utils import get_perveance +from .utils import get_transfer_matrix +from .utils import fit_transfer_matrix +from .utils import build_norm_matrix_from_twiss_2d + + +class KVEnvelope: + """Models KV distribution.""" + def __init__( + self, + eps_x: float, + eps_y: float, + mass: float, + kin_energy: float, + line_density: float, + length: float, + params: Iterable[float] = None, + ) -> None: + """Constructor. + + Args: + eps_x: RMS emittance in x plane. + eps_y: RMS emittance in y plane. + mass: Particle [GeV/c^2]. + kin_energy: Synchronous particle kinetic energy [GeV]. + line_density: Bunch line density [m]. + length: Bunch length [m] (used to convert to Bunch object). + params: The envelope parameters [cx, cx', cy, cy']. + The cx and cy parameters represent the envelope extent along the x and y + axis; cx' and cy' are their derivatives with respect to the distance x. + """ + self.eps_x = eps_x + self.eps_y = eps_y + self.mass = mass + self.kin_energy = kin_energy + + self.line_density = line_density + self.length = length + self.intensity = self.line_density * self.length + self.perveance = 0.0 + self.set_line_density(line_density) + + self.params = params + if self.params is None: + cx = 2.0 * np.sqrt(self.eps_x * 4.0) + cy = 2.0 * np.sqrt(self.eps_y * 4.0) + self.params = [cx, 0.0, cy, 0.0] + self.params = np.array(self.params) + + def set_line_density(self, line_density: float) -> None: + self.line_density = line_density + self.intensity = self.line_density * self.length + self.perveance = get_perveance(self.mass, self.kin_energy, self.line_density) + + def set_length(self, length: float) -> None: + self.length = length + self.intensity = self.line_density * self.length + + def set_params(self, params: np.ndarray) -> None: + self.params = np.copy(params) + + def copy(self): + return copy.deepcopy(self) + + def cov(self) -> np.ndarray: + """Return covariance matrix. + + See Table II here: https://journals.aps.org/prab/abstract/10.1103/PhysRevSTAB.7.024801 + """ + (cx, cxp, cy, cyp) = self.params + cov_matrix = np.zeros((4, 4)) + cov_matrix[0, 0] = 0.25 * cx**2 + cov_matrix[2, 2] = 0.25 * cy**2 + cov_matrix[1, 1] = 0.25 * cxp**2 + 4.0 * (self.eps_x / cx) ** 2 + cov_matrix[3, 3] = 0.25 * cyp**2 + 4.0 * (self.eps_y / cy) ** 2 + cov_matrix[0, 1] = cov_matrix[1, 0] = 0.25 * cx * cxp + cov_matrix[2, 3] = cov_matrix[3, 2] = 0.25 * cy * cyp + return cov_matrix + + def set_cov(self, cov_matrix: np.ndarray) -> None: + self.eps_x = np.sqrt(np.linalg.det(cov_matrix[0:2, 0:2])) + self.eps_y = np.sqrt(np.linalg.det(cov_matrix[2:4, 2:4])) + cx = np.sqrt(4.0 * cov_matrix[0, 0]) + cy = np.sqrt(4.0 * cov_matrix[2, 2]) + cxp = 2.0 * cov_matrix[0, 1] / np.sqrt(cov_matrix[0, 0]) + cyp = 2.0 * cov_matrix[2, 3] / np.sqrt(cov_matrix[2, 2]) + self.set_params([cx, cxp, cy, cyp]) + + def twiss(self) -> dict[str, float]: + cov_matrix = self.cov() + alpha_x, beta_x, emittance_x = calc_cov_twiss(cov_matrix[0:2, 0:2]) + alpha_y, beta_y, emittance_y = calc_cov_twiss(cov_matrix[2:4, 2:4]) + + params = {} + params["alpha_x"] = alpha_x + params["alpha_y"] = alpha_y + params["beta_x"] = beta_x + params["beta_y"] = beta_y + params["emittance_x"] = emittance_x + params["emittance_y"] = emittance_y + return params + + def set_twiss( + self, + alpha_x: float = None, + beta_x: float = None, + alpha_y: float = None, + beta_y: float = None, + ) -> None: + twiss_params = self.twiss() + if alpha_x is None: + alpha_x = twiss_params["alpha_x"] + if alpha_y is None: + alpha_y = twiss_params["alpha_y"] + if beta_x is None: + beta_x = twiss_params["beta_x"] + if beta_y is None: + beta_y = twiss_params["beta_y"] + + gamma_x = (1.0 + alpha_x**2) / beta_x + gamma_y = (1.0 + alpha_y**2) / beta_y + cov_matrix = np.zeros((4, 4)) + cov_matrix[0, 0] = beta_x * self.eps_x + cov_matrix[2, 2] = beta_y * self.eps_y + cov_matrix[1, 1] = gamma_x * self.eps_x + cov_matrix[3, 3] = gamma_y * self.eps_y + cov_matrix[0, 1] = cov_matrix[1, 0] = -alpha_x * self.eps_x + cov_matrix[2, 3] = cov_matrix[3, 2] = -alpha_y * self.eps_y + self.set_cov(cov_matrix) + + def sample(self, size: int) -> np.ndarray: + twiss_params = self.twiss() + + alpha_x = twiss_params["alpha_x"] + alpha_y = twiss_params["alpha_y"] + beta_x = twiss_params["beta_x"] + beta_y = twiss_params["beta_y"] + eps_x = twiss_params["emittance_x"] + eps_y = twiss_params["emittance_y"] + + V_inv = np.identity(4) + V_inv[0:2, 0:2] = build_norm_matrix_from_twiss_2d(alpha_x, beta_x) + V_inv[2:4, 2:4] = build_norm_matrix_from_twiss_2d(alpha_y, beta_y) + V = np.linalg.inv(V_inv) + + A = np.sqrt(np.diag([eps_x, eps_x, eps_y, eps_y])) + + X = np.random.normal(size=(size, 4)) + X /= np.linalg.norm(X, axis=1)[:, None] + X /= np.std(X, axis=0) + X = np.matmul(X, A.T) + X = np.matmul(X, V.T) + return X + + def from_bunch(self, bunch: Bunch) -> np.ndarray: + """Set envelope parameters from Bunch.""" + self.params = np.zeros(4) + self.params[0] = bunch.x(0) + self.params[1] = bunch.xp(0) + self.params[2] = bunch.y(0) + self.params[3] = bunch.yp(0) + return self.params + + def to_bunch(self, size: int = 0, env: bool = True) -> Bunch: + """Create Bunch object from envelope parameters. + + Args: + size: Number of macroparticles in the bunch. + These are the number of "test"particles not counting the first particle, + which stores the envelope parameters. + env: Whether first two particles store envelope parameters. + + Returns: + Bunch with sampled particles. + """ + bunch = Bunch() + bunch.mass(self.mass) + bunch.getSyncParticle().kinEnergy(self.kin_energy) + + if env: + (cx, cxp, cy, cyp) = self.params + bunch.addParticle(cx, cxp, cy, cyp, 0.0, 0.0) + + if size: + particles = np.zeros((size, 6)) + particles[:, :4] = self.sample(size) + particles[:, 4] = self.length * np.random.uniform(-0.5, 0.5, size=size) + for i in range(size): + bunch.addParticle(*particles[i]) + + macrosize = self.intensity / size + if self.intensity == 0.0: + macrosize = 1.0 + bunch.macroSize(macrosize) + + return bunch + + +class KVEnvelopeMonitor: + def __init__(self, verbose: int = 0) -> None: + self.verbose = verbose + self.distance = 0.0 + self._pos_old = 0.0 + self._pos_new = 0.0 + + self.history = {} + for key in [ + "s", + "xrms", + "yrms", + ]: + self.history[key] = [] + + def get_history(self) -> None: + history = copy.deepcopy(self.history) + for key in history: + history[key] = np.array(history[key]) + history["s"] -= history["s"][0] + return history + + def __call__(self, params_dict: dict) -> None: + bunch = params_dict["bunch"] + node = params_dict["node"] + + self._pos_new = params_dict["path_length"] + if self._pos_old > self._pos_new: + self._pos_old = 0.0 + self.distance += self._pos_new - self._pos_old + self._pos_old = self._pos_new + + x_rms = bunch.x(0) * 0.5 + y_rms = bunch.y(0) * 0.5 + + self.history["s"].append(self.distance) + self.history["xrms"].append(x_rms) + self.history["yrms"].append(y_rms) + + if self.verbose: + print("s={:0.3f} x_rms={:0.2f}, y_rms={:0.2f}".format(self.distance, x_rms, y_rms)) + + +class KVEnvelopeTracker: + def __init__(self, lattice: AccLattice, path_length_max: float = None) -> None: + self.lattice = lattice + self.nodes = self.add_nodes( + path_length_min=1.00e-06, + path_length_max=path_length_max, + ) + + # Lower bounds on envelope parameters + self.lb = np.zeros(4) + self.lb[0] = +1.00 - 12 + self.lb[1] = -np.inf + self.lb[2] = +1.00e-12 + self.lb[3] = -np.inf + + # Upper bounds on envelope parameters + self.ub = np.zeros(4) + self.ub[0] = np.inf + self.ub[1] = np.inf + self.ub[2] = np.inf + self.ub[3] = np.inf + + def add_nodes( + self, + path_length_min: float, + path_length_max: float, + ) -> list[KVEnvelopeTrackerNode]: + + self.nodes = add_kv_envelope_tracker_nodes( + lattice=self.lattice, + path_length_min=path_length_min, + path_length_max=path_length_max, + perveance=0.0, # will update based on envelope + eps_x=1.0, # will update based on envelope + eps_y=1.0, # will update based on envelope + ) + return self.nodes + + def toggle_nodes(self, setting: bool) -> None: + for node in self.nodes: + node.active = setting + + def update_nodes(self, envelope: KVEnvelope) -> None: + for node in self.nodes: + node.setPerveance(envelope.perveance) + node.setEmittances(envelope.eps_x, envelope.eps_y) + + def track( + self, + envelope: KVEnvelope, + periods: int = 1, + history: bool = False, + ) -> KVEnvelope: + self.update_nodes(envelope) + + monitor = KVEnvelopeMonitor() + action_container = AccActionsContainer() + if history: + action_container.addAction(monitor, AccActionsContainer.ENTRANCE) + action_container.addAction(monitor, AccActionsContainer.EXIT) + + bunch = envelope.to_bunch() + for period in range(periods): + self.lattice.trackBunch(bunch, actionContainer=action_container) + + envelope.from_bunch(bunch) + + if history: + history = monitor.get_history() + return (envelope, history) + + return envelope + + def track_particles(self, envelope: KVEnvelope, particles: np.ndarray = None) -> tuple[KVEnvelope, np.ndarray]: + self.update_nodes(envelope) + + bunch = envelope.to_bunch() + for i in range(particles.shape[0]): + bunch.addParticle(*particles[i]) + + self.lattice.trackBunch(bunch) + + envelope.from_bunch(bunch) + + particles_out = [] + for i in range(1, bunch.getSize()): + x = bunch.x(i) + y = bunch.y(i) + z = bunch.z(i) + xp = bunch.xp(i) + yp = bunch.yp(i) + dE = bunch.dE(i) + particles_out.append([x, xp, y, yp, z, dE]) + particles_out = np.array(particles_out) + + return (envelope, particles_out) + + def transfer_matrix(self, envelope: KVEnvelope) -> np.ndarray: + bunch = envelope.to_bunch(size=0, env=True) + + if envelope.perveance == 0: + self.toggle_nodes(False) + matrix = get_transfer_matrix(self.lattice, bunch) + self.toggle_nodes(True) + return matrix + + step_arr = np.ones(6) * 1.00e-06 + step_reduce = 20.0 + + bunch.addParticle(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(step_arr[0] / step_reduce, 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, step_arr[1] / step_reduce, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, step_arr[2] / step_reduce, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, 0.0, step_arr[3] / step_reduce, 0.0, 0.0) + bunch.addParticle(step_arr[0], 0.0, 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, step_arr[1], 0.0, 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, step_arr[2], 0.0, 0.0, 0.0) + bunch.addParticle(0.0, 0.0, 0.0, step_arr[3], 0.0, 0.0) + + self.lattice.trackBunch(bunch) + + X = bunch_to_numpy(bunch) + X = X[:, (0, 1, 2, 3)] + X = X[1:, :] # ignore first particle, which tracks envelope parameters + + M = np.zeros((4, 4)) + for i in range(4): + for j in range(4): + x1 = step_arr[i] / step_reduce + x2 = step_arr[i] + y0 = X[0, j] + y1 = X[i + 1, j] + y2 = X[i + 1 + 4, j] + M[j, i] = ((y1 - y0) * x2 * x2 - (y2 - y0) * x1 * x1) / (x1 * x2 * (x2 - x1)) + return M + + def match_zero_sc(self, envelope: KVEnvelope) -> None: + self.toggle_nodes(False) + bunch = envelope.to_bunch(size=0, env=False) + matrix_lattice = TEAPOT_MATRIX_Lattice(self.lattice, bunch) + lattice_params = matrix_lattice.getRingParametersDict() + self.toggle_nodes(True) + + alpha_x = lattice_params["alpha x"] + alpha_y = lattice_params["alpha y"] + beta_x = lattice_params["beta x [m]"] + beta_y = lattice_params["beta y [m]"] + envelope.set_twiss(alpha_x, beta_x, alpha_y, beta_y) + + def match( + self, envelope: KVEnvelope, periods: int = 1, method: str = "least_squares", **kwargs + ) -> None: + if envelope.perveance == 0.0: + return self.match_zero_sc(envelope) + + def loss_function(params: np.ndarray) -> np.ndarray: + envelope.set_params(params) + + loss = 0.0 + for period in range(periods): + self.track(envelope) + residuals = envelope.params - params + residuals = 1000.0 * residuals + loss += np.mean(np.abs(residuals)) + return loss / float(periods) + + if method == "least_squares": + kwargs.setdefault("xtol", 1.00e-12) + kwargs.setdefault("ftol", 1.00e-12) + kwargs.setdefault("gtol", 1.00e-12) + kwargs.setdefault("verbose", 2) + + result = scipy.optimize.least_squares( + loss_function, envelope.params.copy(), bounds=(self.lb, self.ub), **kwargs + ) + return result + elif method == "minimize": + result = scipy.optimize.minimize( + loss_function, + envelope.params.copy(), + bounds=scipy.optimize.Bounds(self.lb, self.ub), + **kwargs, + ) + else: + raise ValueError diff --git a/py/orbit/envelope/lattice_modifications.py b/py/orbit/envelope/lattice_modifications.py new file mode 100644 index 00000000..078bccd6 --- /dev/null +++ b/py/orbit/envelope/lattice_modifications.py @@ -0,0 +1,123 @@ +import collections + +from ..lattice import AccActionsContainer +from ..lattice import AccLattice +from ..lattice import AccNode +from ..lattice import AccNodeBunchTracker + +from .nodes import EnvelopeTrackerNode +from .nodes import KVEnvelopeTrackerNode +from .nodes import DanilovEnvelopeTrackerNode + + +class Parent: + def __init__(self, node: AccNode, part_index: int, position: float, path_length: float) -> None: + self.node = node + self.name = self.node.getName() + self.part_index = part_index + self.position = position + self.path_length = path_length + + +def set_max_path_length(lattice: AccLattice, length: float) -> AccLattice: + if length: + for node in lattice.getNodes(): + if node.getLength() > length: + node.setnParts(1 + int(node.getLength() / length)) + return lattice + + +def add_envelope_tracker_nodes( + lattice: AccLattice, + path_length_max: float, + path_length_min: float, + constructor: EnvelopeTrackerNode, + constructor_kwargs: dict, +) -> list[EnvelopeTrackerNode]: + + nodes = lattice.getNodes() + if not nodes: + return + + lattice = set_max_path_length(lattice, path_length_max) + + parents = [] + length_total = 0.0 + length_total = running_path = rest_length = 0.0 + for node in nodes: + for part_index in range(node.getnParts()): + part_length = node.getLength(part_index) + parent = Parent(node, part_index, position=length_total, path_length=running_path) + if running_path > path_length_min: + parents.append(parent) + running_path = 0.0 + running_path += part_length + length_total += part_length + + if len(parents) > 0: + rest_length = length_total - parents[-1].position + else: + rest_length = length_total + + parents.insert(0, Parent(node=nodes[0], part_index=0, position=0.0, path_length=rest_length)) + + tracker_nodes = [] + for i in range(len(parents) - 1): + parent = parents[i] + parent_new = parents[i + 1] + + tracker_node_name = "{}:{}:".format(parent.name, parent.part_index) + tracker_node = constructor( + name=tracker_node_name, + kick_length=parent_new.path_length, + **constructor_kwargs, + ) + parent.node.addChildNode( + tracker_node, parent.node.BODY, parent.part_index, parent.node.BEFORE + ) + tracker_nodes.append(tracker_node) + + parent = parents[-1] + tracker_node = constructor( + name="{}:{}:".format(parent.node.getName(), parent.part_index), + kick_length=rest_length, + **constructor_kwargs, + ) + tracker_nodes.append(tracker_node) + parent.node.addChildNode(tracker_node, parent.node.BODY, parent.part_index, parent.node.BEFORE) + + return tracker_nodes + + +def add_kv_envelope_tracker_nodes( + lattice: AccLattice, path_length_max: float = None, path_length_min: float = 1.00e-06, **kwargs +) -> None: + tracker_nodes = add_envelope_tracker_nodes( + lattice=lattice, + path_length_max=path_length_max, + path_length_min=path_length_min, + constructor=KVEnvelopeTrackerNode, + constructor_kwargs=kwargs, + ) + for tracker_node in tracker_nodes: + name = "".join([tracker_node.getName(), ":", "kv_envelope_tracker"]) + tracker_node.setName(name) + lattice.initialize() + return tracker_nodes + + +def add_danilov_envelope_tracker_nodes( + lattice: AccLattice, path_length_max: float = None, path_length_min: float = 1.00e-06, **kwargs +) -> None: + tracker_nodes = add_envelope_tracker_nodes( + lattice=lattice, + path_length_max=path_length_max, + path_length_min=path_length_min, + constructor=DanilovEnvelopeTrackerNode, + constructor_kwargs=kwargs, + ) + for tracker_node in tracker_nodes: + name = "".join([tracker_node.getName(), ":", "danilov_envelope_tracker"]) + tracker_node.setName(name) + lattice.initialize() + return tracker_nodes diff --git a/py/orbit/envelope/meson.build b/py/orbit/envelope/meson.build new file mode 100644 index 00000000..0fa716c1 --- /dev/null +++ b/py/orbit/envelope/meson.build @@ -0,0 +1,15 @@ +py_sources = files([ + '__init__.py', + 'nodes.py', + 'lattice_modifications.py', + 'kv.py', + 'danilov.py', + 'transfer_matrix.py', + 'utils.py', +]) + +python.install_sources( + py_sources, + subdir: 'orbit/envelope', + # pure: true, +) diff --git a/py/orbit/envelope/nodes.py b/py/orbit/envelope/nodes.py new file mode 100644 index 00000000..faad7882 --- /dev/null +++ b/py/orbit/envelope/nodes.py @@ -0,0 +1,63 @@ +from typing import Any + +from ..lattice import AccActionsContainer +from ..lattice import AccLattice +from ..lattice import AccNode +from ..lattice import AccNodeBunchTracker +from ..utils import orbitFinalize + +from orbit.core.envelope import KVEnvelopeTracker +from orbit.core.envelope import DanilovEnvelopeTracker + + +class EnvelopeTrackerNode(AccNodeBunchTracker): + def __init__(self, name: str = None, kick_length: float = 0.0) -> None: + super().__init__(name=name) + self.setType("EnvelopeTracker") + self.setLength(0.0) + + self.kick_length = kick_length + self.active = True + self.tracker = None + + def setActive(self, setting: bool) -> None: + self.active = setting + + def setKickLength(self, kick_length: float) -> None: + self.kick_length = kick_length + + def isRFGap(self) -> bool: + # In case this node is used in linac tracking + return False + + def trackDesign(self, params_dict): + # In case this node is used in linac tracking + pass + + def track(self, params_dict: dict) -> None: + if not self.active: + return + bunch = params_dict["bunch"] + self.tracker.trackBunch(bunch, self.kick_length) + + +class KVEnvelopeTrackerNode(EnvelopeTrackerNode): + def __init__(self, eps_x: float, eps_y: float, perveance: float, **kwargs) -> None: + super().__init__(**kwargs) + self.tracker = KVEnvelopeTracker(perveance, 4.0 * eps_x, 4.0 * eps_y) + + def setPerveance(self, perveance: float) -> None: + self.tracker.setPerveance(perveance) + + def setEmittances(self, eps_x: float, eps_y: float) -> None: + self.tracker.setEmittanceX(4.0 * eps_x) + self.tracker.setEmittanceY(4.0 * eps_y) + + +class DanilovEnvelopeTrackerNode(EnvelopeTrackerNode): + def __init__(self, perveance: float, **kwargs) -> None: + super().__init__(**kwargs) + self.tracker = DanilovEnvelopeTracker(perveance) + + def setPerveance(self, perveance: float) -> None: + self.tracker.setPerveance(perveance) diff --git a/py/orbit/envelope/transfer_matrix.py b/py/orbit/envelope/transfer_matrix.py new file mode 100644 index 00000000..d4051e05 --- /dev/null +++ b/py/orbit/envelope/transfer_matrix.py @@ -0,0 +1,205 @@ +import numpy as np + + +def is_tmat_coupled(M: np.ndarray) -> bool: + if M.shape[0] < 4: + return False + + mask = np.zeros(M.shape) + for i in range(0, M.shape[0], 2): + mask[i : i + 2, i : i + 2] = 1.0 + + return np.any(np.ma.masked_array(M, mask=mask)) + + +def is_tmat_stable(M: np.ndarray, tol: float = 1.00e-08) -> np.ndarray: + return all_eigvals_on_unit_circle(np.linalg.eigenvalues(M), tol=tol) + + +def is_tmat_symplectic(M: np.ndarray, tol: float = 1.00e-06) -> bool: + U = build_unit_symplectic_matrix(M.shape[0]) + return np.isclose(U - np.linalg.multi_dot([M.T, U, M])) + + +def build_rotation_matrix(angle: float) -> np.ndarray: + c, s = np.cos(angle), np.sin(angle) + return np.array([[c, s], [-s, c]]) + + +def build_unit_symplectic_matrix(ndim: int) -> np.ndarray: + U = np.zeros((ndim, ndim)) + for i in range(0, ndim, 2): + U[i : i + 2, i : i + 2] = [[0.0, 1.0], [-1.0, 0.0]] + return U + + +def build_phase_advance_matrix(*phase_advances) -> np.ndarray: + ndim = 2 * len(phase_advances) + M = np.zeros((ndim, ndim)) + for i in range(0, ndim, 2): + M[i : i + 2, i : i + 2] = build_rotation_matrix(phase_advances[i]) + return M + + +def build_unnorm_matrix_from_eigvecs(v1: np.ndarray, v2: np.ndarray) -> np.ndarray: + V = np.zeros((4, 4)) + V[:, 0] = +np.real(v1) + V[:, 1] = -np.imag(v1) + V[:, 2] = +np.real(v2) + V[:, 3] = -np.imag(v2) + return V + + +def build_unnorm_matrix_from_tmat(M: np.ndarray) -> np.ndarray: + eigvals, eigvecs = np.linalg.eig(M) + v1 = eigvecs[:, 0] + v2 = eigvecs[:, 2] + v1 = normalize_eigvec(v1) + v2 = normalize_eigvec(v2) + return build_unnorm_matrix_from_eigvecs(v1, v2) + + +def build_unnorm_matrix_from_params_cs_2d(alpha: float, beta: float) -> np.ndarray: + return np.array([[beta, 0.0], [-alpha, 1.0]]) / np.sqrt(beta) + + +def build_unnorm_matrix_from_params_cs( + alpha_x: float, + beta_x: float, + alpha_y: float, + beta_y: float, +) -> np.ndarray: + V = np.zeros((4, 4)) + V[0:2, 0:2] = build_unnorm_matrix_from_params_cs_2d(alpha_x, beta_x) + V[2:4, 2:4] = build_unnorm_matrix_from_params_cs_2d(alpha_y, beta_y) + return V + + +def build_unnorm_matrix_from_params_lb_one_mode( + alpha_lx: float, + beta_lx: float, + alpha_ly: float, + beta_ly: float, + u: float, + nu: float, + mode: int, +) -> np.ndarray: + + V = np.zeros((4, 4)) + if mode == 1: + V[0, 0] = np.sqrt(beta_lx) + V[0, 1] = 0.0 + V[1, 0] = -alpha_lx / np.sqrt(beta_lx) + V[1, 1] = (1.0 - u) / np.sqrt(beta_lx) + V[2, 0] = +np.sqrt(beta_ly) * np.cos(nu) + V[2, 1] = -np.sqrt(beta_ly) * np.sin(nu) + V[3, 0] = (u * np.sin(nu) - alpha_ly * np.cos(nu)) / np.sqrt(beta_ly) + V[3, 1] = (u * np.cos(nu) + alpha_ly * np.sin(nu)) / np.sqrt(beta_ly) + elif mode == 2: + V[0, 2] = +np.sqrt(beta_lx) * np.cos(nu) + V[0, 3] = -np.sqrt(beta_lx) * np.sin(nu) + V[1, 2] = (u * np.sin(nu) - alpha_lx * np.cos(nu)) / np.sqrt(beta_lx) + V[1, 3] = (u * np.cos(nu) + alpha_lx * np.sin(nu)) / np.sqrt(beta_lx) + V[2, 2] = np.sqrt(beta_ly) + V[2, 3] = 0.0 + V[3, 2] = -alpha_ly / np.sqrt(beta_ly) + V[3, 3] = (1.0 - u) / np.sqrt(beta_ly) + return V + + +def build_norm_matrix_from_eigvecs(*args, **kwargs) -> np.ndarray: + return np.linalg.inv(build_unnorm_matrix_from_eigvecs(*args, **kwargs)) + + +def build_norm_matrix_from_tmat(*args, **kwargs) -> np.ndarray: + return np.linalg.inv(build_unnorm_matrix_from_tmat(*args, **kwargs)) + + +def build_norm_matrix_from_params_cs_2d(*args, **kwargs) -> np.ndarray: + return np.linalg.inv(build_unnorm_matrix_from_params_cs_2d(*args, **kwargs)) + + +def build_norm_matrix_from_params_cs(*args, **kwargs) -> np.ndarray: + return np.linalg.inv(build_unnorm_matrix_from_params_cs(*args, **kwargs)) + + +def normalize_eigvec(v: np.ndarray) -> np.ndarray: + U = build_unit_symplectic_matrix(len(v)) + + def _norm(v): + return np.linalg.multi_dot([np.conj(v), U, v]) + + if _norm(v) > 0.0: + v = np.conj(v) + + v *= np.sqrt(2.0 / np.abs(_norm(v))) + assert np.isclose(np.imag(_norm(v)), -2.0) + assert np.isclose(np.real(_norm(v)), +0.0) + return v + + +def all_eigvals_on_unit_circle(eigvals: np.ndarray, tol: float = 1.00e-08) -> bool: + for eigval in eigvals: + if abs(np.linalg.norm(eigval) - 1.0) > tol: + return False + return True + + +def eigtune_from_eigval(eigval: float) -> float: + return np.arccos(np.real(eigval)) / (2.0 * np.pi) + + +def calc_params_from_tmat_cs_2d(M: np.ndarray) -> np.ndarray: + params = dict() + cos_phi = (M[0, 0] + M[1, 1]) / 2.0 + sign = 1.0 + if abs(M[0, 1]) != 0: + sign = M[0, 1] / abs(M[0, 1]) + sin_phi = sign * np.sqrt(1.0 - cos_phi**2) + beta = M[0, 1] / sin_phi + alpha = (M[0, 0] - M[1, 1]) / (2.0 * sin_phi) + params["alpha"] = alpha + params["beta"] = beta + params["tune"] = np.arccos(cos_phi) / (2.0 * np.pi) * sign + return params + + +def calc_params_from_tmat_cs(M: np.ndarray) -> np.ndarray: + params_x = calc_params_from_tmat_cs_2d(M[0:2, 0:2]) + params_y = calc_params_from_tmat_cs_2d(M[2:4, 2:4]) + return { + "alpha_x": params_x["alpha"], + "alpha_y": params_y["alpha"], + "beta_x": params_x["beta"], + "beta_y": params_y["beta"], + } + + +def calc_params_from_tmat_lb(M: np.ndarray) -> dict[str, float]: + V_inv = build_unnorm_matrix_from_tmat(M) + V = np.linalg.inv(V_inv) + + beta_1x = V[0, 0] ** 2 + beta_2y = V[2, 2] ** 2 + alpha_1x = -np.sqrt(beta_1x) * V[1, 0] + alpha_2y = -np.sqrt(beta_2y) * V[3, 2] + u = 1.0 - (V[0, 0] * V[1, 1]) + nu1 = np.arctan2(-V[2, 1], V[2, 0]) + nu2 = np.arctan2(-V[0, 3], V[0, 2]) + beta_1y = (V[2, 0] / np.cos(nu1)) ** 2 + beta_2x = (V[0, 2] / np.cos(nu2)) ** 2 + alpha_1y = (u * np.sin(nu1) - V[3, 0] * np.sqrt(beta_1y)) / np.cos(nu1) + alpha_2x = (u * np.sin(nu2) - V[1, 2] * np.sqrt(beta_2x)) / np.cos(nu2) + return { + "alpha_1x": alpha_1x, + "alpha_1y": alpha_1y, + "alpha_2x": alpha_2x, + "alpha_2y": alpha_2y, + "beta_1x": beta_1x, + "beta_1y": beta_1y, + "beta_2x": beta_2x, + "beta_2y": beta_2y, + "u": u, + "nu1": nu1, + "nu2": nu2, + } diff --git a/py/orbit/envelope/utils.py b/py/orbit/envelope/utils.py new file mode 100644 index 00000000..bfb86cf8 --- /dev/null +++ b/py/orbit/envelope/utils.py @@ -0,0 +1,97 @@ +import numpy as np + +from orbit.core.bunch import Bunch +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.teapot import TEAPOT_Lattice +from orbit.teapot import TEAPOT_MATRIX_Lattice + + +def build_rotation_matrix(angle: float) -> np.ndarray: + c, s = np.cos(angle), np.sin(angle) + return np.array([[c, s], [-s, c]]) + + +def build_rotation_matrix_xy(angle: float) -> np.ndarray: + c, s = np.cos(angle), np.sin(angle) + return np.array([[c, 0, s, 0], [0, c, 0, s], [-s, 0, c, 0], [0, -s, 0, c]]) + + +def build_phase_advance_matrix(*phase_advances: list[float]) -> np.ndarray: + n = len(phase_advances) + M = np.zeros((2 * n, 2 * n)) + for i, phase_advance in enumerate(phase_advances): + i = i * 2 + M[i : i + 2, i : i + 2] = build_rotation_matrix(phase_advance) + return M + + +def build_norm_matrix_from_twiss_2d(alpha: float, beta: float) -> np.ndarray: + norm_matrix_inv = np.array([[beta, 0.0], [-alpha, 1.0]]) * np.sqrt(1.0 / beta) + norm_matrix = np.linalg.inv(norm_matrix_inv) + return norm_matrix + + +def get_transfer_matrix(lattice: AccLattice, bunch: Bunch) -> np.ndarray: + matrix_lattice = TEAPOT_MATRIX_Lattice(lattice, bunch) + M = np.zeros((4, 4)) + for i in range(4): + for j in range(4): + M[i, j] = matrix_lattice.oneTurnMatrix.get(i, j) + return M + + +def fit_transfer_matrix(lattice: AccLattice, bunch: Bunch) -> np.ndarray: + step_arr_init = np.full(4, 1.00e-06) + step_arr = np.copy(step_arr_init) + step_reduce = 20.0 + + _bunch = Bunch() + bunch.copyEmptyBunchTo(_bunch) + + _bunch.addParticle(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) + _bunch.addParticle(step_arr[0] / step_reduce, 0.0, 0.0, 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, step_arr[1] / step_reduce, 0.0, 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, 0.0, step_arr[2] / step_reduce, 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, 0.0, 0.0, step_arr[3] / step_reduce, 0.0, 0.0) + _bunch.addParticle(step_arr[0], 0.0, 0.0, 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, step_arr[1], 0.0, 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, 0.0, step_arr[2], 0.0, 0.0, 0.0) + _bunch.addParticle(0.0, 0.0, 0.0, step_arr[3], 0.0, 0.0) + + lattice.trackBunch(_bunch) + + X = get_bunch_coords(bunch) + X = X[:, (0, 2, 3, 4)] + + M = np.zeros((4, 4)) + for i in range(4): + for j in range(4): + x1 = step_arr[i] / step_reduce + x2 = step_arr[i] + y0 = X[0, j] + y1 = X[i + 1, j] + y2 = X[i + 1 + 4, j] + M[j, i] = ((y1 - y0) * x2 * x2 - (y2 - y0) * x1 * x1) / (x1 * x2 * (x2 - x1)) + return M + + +def get_perveance(mass: float, kin_energy: float, line_density: float) -> float: + classical_proton_radius = 1.53469e-18 # [m] + gamma = 1.0 + (kin_energy / mass) # Lorentz factor + beta = np.sqrt(1.0 - (1.0 / gamma) ** 2) # velocity/speed_of_light + return (2.0 * classical_proton_radius * line_density) / (beta**2 * gamma**3) + + +def calc_cov_twiss(cov_matrix: np.ndarray) -> tuple[float, float, float]: + emittance = np.sqrt(np.linalg.det(cov_matrix)) + alpha = -cov_matrix[0, 1] / emittance + beta = cov_matrix[0, 0] / emittance + return (alpha, beta, emittance) + + +def bunch_to_numpy(bunch: Bunch) -> np.ndarray: + x = np.zeros((bunch.getSize(), 6)) + for i in range(bunch.getSize()): + x[i, :] = [bunch.x(i), bunch.xp(i), bunch.y(i), bunch.yp(i), bunch.z(i), bunch.dE(i)] + return x diff --git a/py/orbit/meson.build b/py/orbit/meson.build index 2ba16e73..651b14d5 100644 --- a/py/orbit/meson.build +++ b/py/orbit/meson.build @@ -24,6 +24,7 @@ subdir('space_charge') subdir('errors') subdir('matrix_lattice') subdir('teapot') +subdir('envelope') py_sources = files([ diff --git a/src/core/envelope_init.cc b/src/core/envelope_init.cc new file mode 100644 index 00000000..90f25775 --- /dev/null +++ b/src/core/envelope_init.cc @@ -0,0 +1,4 @@ +#include "wrap_envelope.hh" +PyMODINIT_FUNC PyInit_envelope(void) { + return wrap_envelope::initenvelope(); +} diff --git a/src/envelope/DanilovEnvelopeTracker.cc b/src/envelope/DanilovEnvelopeTracker.cc new file mode 100644 index 00000000..1ec3a228 --- /dev/null +++ b/src/envelope/DanilovEnvelopeTracker.cc @@ -0,0 +1,104 @@ +#include "DanilovEnvelopeTracker.hh" + +DanilovEnvelopeTracker::DanilovEnvelopeTracker(double perveance) : CppPyWrapper(NULL) { + Q = perveance; +} + +void DanilovEnvelopeTracker::setPerveance(double perveance) { + Q = perveance; +} + +double DanilovEnvelopeTracker::getPerveance() { + return Q; +} + +void DanilovEnvelopeTracker::trackBunch(Bunch *bunch, double length) { + // Compute ellipse size and orientation. + // ----------------------------------------------------------------------------------- + + // Get envelope parameters from first two bunch particles. + double a = bunch->x(0); + double b = bunch->x(1); + double e = bunch->y(0); + double f = bunch->y(1); + + // Compute covariance matrix in x-y plane + double cov_xx = a * a + b * b; // 4 * + double cov_yy = e * e + f * f; // 4 * + double cov_xy = a * e + b * f; // 4 * + + // Compute tilt angle phi in x-y plane (below x axis). + double phi = -0.5 * atan2(2.0 * cov_xy, cov_xx - cov_yy); + + // Store sin(phi) and cos(phi). + double cs = cos(phi); + double sn = sin(phi); + double cs2 = cs * cs; + double sn2 = sn * sn; + double sn_cs = sn * cs; + + // Compute ellipse radii cu and cv in upright frame. + double cu = sqrt(abs(cov_xx * cs2 + cov_yy * sn2 - 2.0 * cov_xy * sn_cs)); + double cv = sqrt(abs(cov_xx * sn2 + cov_yy * cs2 + 2.0 * cov_xy * sn_cs)); + double cu2 = cu * cu; + double cv2 = cv * cv; + + // Kick envelope + // ----------------------------------------------------------------------------------- + + double sc_term = (2.0 * Q / (cu + cv)); + if (cu > 0.0) { + bunch->xp(0) += length * sc_term * (a * cs2 - e * sn_cs) / cu; + bunch->xp(1) += length * sc_term * (b * cs2 - f * sn_cs) / cu; + bunch->yp(0) += length * sc_term * (e * sn2 - a * sn_cs) / cu; + bunch->yp(1) += length * sc_term * (f * sn2 - b * sn_cs) / cu; + } + if (cv > 0.0) { + bunch->xp(0) += length * sc_term * (a * sn2 + e * sn_cs) / cv; + bunch->xp(1) += length * sc_term * (b * sn2 + f * sn_cs) / cv; + bunch->yp(0) += length * sc_term * (e * cs2 + a * sn_cs) / cv; + bunch->yp(1) += length * sc_term * (f * cs2 + b * sn_cs) / cv; + } + + // Kick particles + // ----------------------------------------------------------------------------------- + + for (int i = 2; i < bunch->getSize(); i++) { + // Collect particle coordinates. + double x = bunch->x(i); + double y = bunch->y(i); + double u = x * cs - y * sn; + double v = x * sn + y * cs; + + // Check if particle is inside beam ellipse. + double u2 = u * u; + double v2 = v * v; + bool in_ellipse = ((u2 / cu2) + (v2 / cv2)) <= 1.0; + + // Update momentum. + double delta_up = 0.0; + double delta_vp = 0.0; + + if (in_ellipse) { + if (cu > 0.0) { + delta_up = length * sc_term * u / cu; + } + if (cv > 0.0) { + delta_vp = length * sc_term * v / cv; + } + } + else { + // https://arxiv.org/abs/physics/0108040 + double B = u2 + v2 - cu2 - cv2; + double C = u2 * cv2 + v2 * cu2 - cu2 * cv2; + double t1 = pow(0.25 * B * B + C, 0.5) + 0.5 * B; + double Dx = pow(cu2 + t1, 0.5); + double Dy = pow(cv2 + t1, 0.5); + delta_up = length * 2.0 * Q * u / (Dx * (Dx + Dy)); + delta_vp = length * 2.0 * Q * v / (Dy * (Dx + Dy)); + } + + bunch->xp(i) += (+delta_up * cs + delta_vp * sn); + bunch->yp(i) += (-delta_up * sn + delta_vp * cs); + } +} \ No newline at end of file diff --git a/src/envelope/DanilovEnvelopeTracker.hh b/src/envelope/DanilovEnvelopeTracker.hh new file mode 100644 index 00000000..2a81ec23 --- /dev/null +++ b/src/envelope/DanilovEnvelopeTracker.hh @@ -0,0 +1,36 @@ +#ifndef DANILOV_ENVELOPE_TRACKER_H +#define DANILOV_ENVELOPE_TRACKER_H + +#include "Bunch.hh" +#include "CppPyWrapper.hh" + +using namespace std; + +/** Envelope solver for the Danilov distribution (KV distribution with + zero emittance in one plane). + + The ellipse in the x-y plane can be parameterized as [1]: + x = a * cos(psi) + b * sin(psi), + y = e * cos(psi) + f * sin(psi), + where 0 <= psi <= 2pi. The first two particles in the bunch are used to track + the envelope parameters {a, b, e, f}, which are used to apply space charge + kicks to the other particles in the bunch. + + References + ---------- + [1] V. Danilov, S. Cousineau, S. Henderson, and J. Holmes, "Self-consistent + time dependent two dimensional and three dimensional space charge distributions + with linear force", PPRAB 6, 74–85 (2003). +*/ +class DanilovEnvelopeTracker : public OrbitUtils::CppPyWrapper { + public: + DanilovEnvelopeTracker(double perveanceQ); + void trackBunch(Bunch *bunch, double length); + void setPerveance(double perveance); + double getPerveance(); + + private: + double Q; // beam perveance +}; + +#endif \ No newline at end of file diff --git a/src/envelope/KVEnvelopeTracker.cc b/src/envelope/KVEnvelopeTracker.cc new file mode 100644 index 00000000..5a8e621c --- /dev/null +++ b/src/envelope/KVEnvelopeTracker.cc @@ -0,0 +1,72 @@ +#include "KVEnvelopeTracker.hh" + +KVEnvelopeTracker::KVEnvelopeTracker(double perveance, double emittance_x, double emittance_y): CppPyWrapper(NULL) { + Q = perveance; + eps_x = emittance_x; + eps_y = emittance_y; +} + +void KVEnvelopeTracker::setPerveance(double perveance) { + Q = perveance; +} + +void KVEnvelopeTracker::setEmittanceX(double emittance) { + eps_x = emittance; +} + +void KVEnvelopeTracker::setEmittanceY(double emittance) { + eps_y = emittance; +} + +double KVEnvelopeTracker::getPerveance() { + return Q; +} + +double KVEnvelopeTracker::getEmittanceX() { + return eps_x; +} + +double KVEnvelopeTracker::getEmittanceY() { + return eps_y; +} + +void KVEnvelopeTracker::trackBunch(Bunch *bunch, double length) { + // Get envelope parameters + double cx = bunch->x(0); + double cy = bunch->y(0); + double cx2 = cx * cx; + double cy2 = cy * cy; + + // Kick envelope + double eps_x_term = (eps_x * eps_x) / (cx * cx * cx); + double eps_y_term = (eps_y * eps_y) / (cy * cy * cy); + double sc_term = 2.0 * Q / (cx + cy); + bunch->xp(0) += length * (sc_term + eps_x_term); + bunch->yp(0) += length * (sc_term + eps_y_term); + + // Kick particles + for (int i = 1; i < bunch->getSize(); i++) { + // Check if particle is inside beam ellipse. + double x = bunch->x(i); + double y = bunch->y(i); + double x2 = x * x; + double y2 = y * y; + bool in_ellipse = ((x2 / cx2) + (y2 / cy2)) <= 1.0; + + // Update momentum coordinates + if (in_ellipse) { + bunch->xp(i) += length * sc_term * (x / cx); + bunch->yp(i) += length * sc_term * (y / cy); + } + else { + // https://arxiv.org/abs/physics/0108040 + double B = x2 + y2 - cx2 - cy2; + double C = x2 * cy2 + y2 * cx2 - cx2 * cy2; + double t1 = pow(0.25 * B * B + C, 0.5) + 0.5 * B; + double Dx = pow(cx2 + t1, 0.5); + double Dy = pow(cy2 + t1, 0.5); + bunch->xp(i) += length * (2.0 * Q * x / (Dx * (Dx + Dy))); + bunch->yp(i) += length * (2.0 * Q * y / (Dy * (Dx + Dy))); + } + } +} \ No newline at end of file diff --git a/src/envelope/KVEnvelopeTracker.hh b/src/envelope/KVEnvelopeTracker.hh new file mode 100644 index 00000000..ffbd3805 --- /dev/null +++ b/src/envelope/KVEnvelopeTracker.hh @@ -0,0 +1,34 @@ +#ifndef KV_ENVELOPE_TRACKER_H +#define KV_ENVELOPE_TRACKER_H + +#include "Bunch.hh" +#include "CppPyWrapper.hh" + +using namespace std; + +/** Envelope solver for upright KV distribution in uncoupled lattice. + +The first particle in the bunch is used to track the envelope parameters. +The envelope parameters are use to apply space charge kicks to the other +particles in the bunch. + +This class does not yet handle boudary conditions or dispersion. + */ +class KVEnvelopeTracker : public OrbitUtils::CppPyWrapper { + public: + KVEnvelopeTracker(double perveance, double emittance_x, double emittance_y); + void trackBunch(Bunch *bunch, double length); + void setPerveance(double perveance); + void setEmittanceX(double emittance); + void setEmittanceY(double emittance); + double getPerveance(); + double getEmittanceX(); + double getEmittanceY(); + + private: + double Q; // beam perveance + double eps_x; // (4 * sqrt( - )) + double eps_y; // (4 * sqrt( - )) +}; + +#endif \ No newline at end of file diff --git a/src/envelope/wrap_danilov_envelope_tracker.cc b/src/envelope/wrap_danilov_envelope_tracker.cc new file mode 100644 index 00000000..0cba286d --- /dev/null +++ b/src/envelope/wrap_danilov_envelope_tracker.cc @@ -0,0 +1,149 @@ +#include + +#include "orbit_mpi.hh" +#include "pyORBIT_Object.hh" + +#include "DanilovEnvelopeTracker.hh" +#include "wrap_bunch.hh" +#include "wrap_danilov_envelope_tracker.hh" +#include "wrap_envelope.hh" + +namespace wrap_envelope { + +#ifdef __cplusplus +extern "C" { +#endif + +// Constructor for Python class wrapping DanilovEnvelopeTracker instance. +// It never will be called directly. +static PyObject *DanilovEnvelopeTracker_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { + pyORBIT_Object *self; + self = (pyORBIT_Object *)type->tp_alloc(type, 0); + self->cpp_obj = NULL; + return (PyObject *)self; +} + +// Initialization of Python DanilovEnvelopeTracker class. +// This is implementation of the __init__ method. +static int DanilovEnvelopeTracker_init(pyORBIT_Object *self, PyObject *args, PyObject *kwds) { + double perveance = 0.0; + self->cpp_obj = new DanilovEnvelopeTracker(perveance); + ((DanilovEnvelopeTracker *)self->cpp_obj)->setPyWrapper((PyObject *)self); + return 0; +} + +// Method: trackBunch(Bunch* bunch, double length) +static PyObject *DanilovEnvelopeTracker_trackBunch(PyObject *self, PyObject *args) { + pyORBIT_Object *pyDanilovEnvelopeTracker = (pyORBIT_Object *)self; + DanilovEnvelopeTracker *cpp_DanilovEnvelopeTracker = (DanilovEnvelopeTracker *)pyDanilovEnvelopeTracker->cpp_obj; + PyObject *pyBunch; + double length; + if (!PyArg_ParseTuple(args, "Od:trackBunch", &pyBunch, &length)) { + ORBIT_MPI_Finalize("PyDanilovEnvelopeTracker - trackBunch(Bunch* bunch, double length) - parameters are needed."); + } + PyObject *pyORBIT_Bunch_Type = wrap_orbit_bunch::getBunchType("Bunch"); + if (!PyObject_IsInstance(pyBunch, pyORBIT_Bunch_Type)) { + ORBIT_MPI_Finalize("PyDanilovEnvelopeTracker - trackBunch(Bunch* bunch, double length) - first parameter should be Bunch."); + } + Bunch *cpp_bunch = (Bunch *)((pyORBIT_Object *)pyBunch)->cpp_obj; + cpp_DanilovEnvelopeTracker->trackBunch(cpp_bunch, length); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: setPerveance(double perveance) +static PyObject *DanilovEnvelopeTracker_setPerveance(PyObject *self, PyObject *args) { + pyORBIT_Object *pyDanilovEnvelopeTracker = (pyORBIT_Object *)self; + DanilovEnvelopeTracker *cpp_DanilovEnvelopeTracker = (DanilovEnvelopeTracker *)pyDanilovEnvelopeTracker->cpp_obj; + double perveance; + if (!PyArg_ParseTuple(args, "d:setPerveance", &perveance)) { + ORBIT_MPI_Finalize("PyDanilovEnvelopeTracker - setPerveance(double perveance) - parameters are needed."); + } + cpp_DanilovEnvelopeTracker->setPerveance(perveance); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: getPerveance() +static PyObject *DanilovEnvelopeTracker_getPerveance(PyObject *self, PyObject *args) { + pyORBIT_Object *pyDanilovEnvelopeTracker = (pyORBIT_Object *)self; + DanilovEnvelopeTracker *cpp_DanilovEnvelopeTracker = (DanilovEnvelopeTracker *)pyDanilovEnvelopeTracker->cpp_obj; + double perveance = cpp_DanilovEnvelopeTracker->getPerveance(); + return Py_BuildValue("d", perveance); +} + +// Destructor for python DanilovEnvelopeTracker class (__del__ method) +static void DanilovEnvelopeTracker_del(pyORBIT_Object *self) { + DanilovEnvelopeTracker *cpp_DanilovEnvelopeTracker = (DanilovEnvelopeTracker *)self->cpp_obj; + delete cpp_DanilovEnvelopeTracker; + self->ob_base.ob_type->tp_free((PyObject *)self); +} + +// Definition of Python DanilovEnvelopeTracker wrapper class methods. +// They will be available from the Python level. +static PyMethodDef DanilovEnvelopeTrackerClassMethods[] = { + {"getPerveance", DanilovEnvelopeTracker_getPerveance, METH_VARARGS, "Get space charge perveance."}, + {"setPerveance", DanilovEnvelopeTracker_setPerveance, METH_VARARGS, "Set space charge perveance."}, + {"trackBunch", DanilovEnvelopeTracker_trackBunch, METH_VARARGS, "Apply space charge kick to beam envelope."}, + {NULL} +}; + +// Definition of Python DanilovEnvelopeTracker wrapper class members. +// They will be available from the Python level. +static PyMemberDef DanilovEnvelopeTrackerClassMembers[] = {{NULL}}; + +// New python DanilovEnvelopeTracker wrapper type definition. +static PyTypeObject pyORBIT_DanilovEnvelopeTracker_Type = { + PyVarObject_HEAD_INIT(NULL, 0) "DanilovEnvelopeTracker", /*tp_name*/ + sizeof(pyORBIT_Object), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)DanilovEnvelopeTracker_del, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/ + "The DanilovEnvelopeTracker python wrapper", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + DanilovEnvelopeTrackerClassMethods, /* tp_methods */ + DanilovEnvelopeTrackerClassMembers, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)DanilovEnvelopeTracker_init, /* tp_init */ + 0, /* tp_alloc */ + DanilovEnvelopeTracker_new, /* tp_new */ +}; + +// Initialization function of the pyDanilovEnvelopeTracker class +void initDanilovEnvelopeTracker(PyObject *module) { + if (PyType_Ready(&pyORBIT_DanilovEnvelopeTracker_Type) < 0) { + return; + } + Py_INCREF(&pyORBIT_DanilovEnvelopeTracker_Type); + PyModule_AddObject(module, "DanilovEnvelopeTracker", (PyObject *)&pyORBIT_DanilovEnvelopeTracker_Type); +} + +#ifdef __cplusplus +} +#endif + +} // namespace wrap_envelope diff --git a/src/envelope/wrap_danilov_envelope_tracker.hh b/src/envelope/wrap_danilov_envelope_tracker.hh new file mode 100644 index 00000000..ae52db5e --- /dev/null +++ b/src/envelope/wrap_danilov_envelope_tracker.hh @@ -0,0 +1,18 @@ +#ifndef WRAP_DANILOV_ENVELOPE_TRACKER_H +#define WRAP_DANILOV_ENVELOPE_TRACKER_H + +#include "Python.h" + +#ifdef __cplusplus +extern "C" { +#endif + +namespace wrap_envelope { +void initDanilovEnvelopeTracker(PyObject *module); +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/envelope/wrap_envelope.cc b/src/envelope/wrap_envelope.cc new file mode 100644 index 00000000..1e6d8468 --- /dev/null +++ b/src/envelope/wrap_envelope.cc @@ -0,0 +1,38 @@ +#include + +#include "orbit_mpi.hh" +#include "pyORBIT_Object.hh" + +#include "wrap_bunch.hh" +#include "wrap_danilov_envelope_tracker.hh" +#include "wrap_envelope.hh" +#include "wrap_kv_envelope_tracker.hh" + +namespace wrap_envelope { + +#ifdef __cplusplus +extern "C" { +#endif + +static PyMethodDef envelopeModuleMethods[] = {{NULL, NULL}}; + +static struct PyModuleDef cModPyDem = { + PyModuleDef_HEAD_INIT, + "envelope", + "Beam envelope solvers", + -1, + envelopeModuleMethods +}; + +PyMODINIT_FUNC initenvelope() { + PyObject *module = PyModule_Create(&cModPyDem); + wrap_envelope::initKVEnvelopeTracker(module); + wrap_envelope::initDanilovEnvelopeTracker(module); + return module; +} + +#ifdef __cplusplus +} +#endif + +} // namespace wrap_envelope diff --git a/src/envelope/wrap_envelope.hh b/src/envelope/wrap_envelope.hh new file mode 100644 index 00000000..fd7663ca --- /dev/null +++ b/src/envelope/wrap_envelope.hh @@ -0,0 +1,18 @@ +#ifndef WRAP_ENVELOPE_MODULE_HH_ +#define WRAP_ENVELOPE_MODULE_HH_ + +#include "Python.h" + +#ifdef __cplusplus +extern "C" { +#endif + +namespace wrap_envelope { +PyMODINIT_FUNC initenvelope(); +} + +#ifdef __cplusplus +} +#endif + +#endif /*WRAP_ENVELOPE_MODULE_HH_*/ diff --git a/src/envelope/wrap_kv_envelope_tracker.cc b/src/envelope/wrap_kv_envelope_tracker.cc new file mode 100644 index 00000000..cd5406f8 --- /dev/null +++ b/src/envelope/wrap_kv_envelope_tracker.cc @@ -0,0 +1,197 @@ +#include + +#include "orbit_mpi.hh" +#include "pyORBIT_Object.hh" + +#include "KVEnvelopeTracker.hh" +#include "wrap_bunch.hh" +#include "wrap_envelope.hh" +#include "wrap_kv_envelope_tracker.hh" + +namespace wrap_envelope { + +#ifdef __cplusplus +extern "C" { +#endif + +// Constructor for Python class wrapping KVEnvelopeTracker instance. +// It never will be called directly. +static PyObject *KVEnvelopeTracker_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { + pyORBIT_Object *self; + self = (pyORBIT_Object *)type->tp_alloc(type, 0); + self->cpp_obj = NULL; + return (PyObject *)self; +} + +// Initialization of Python KVEnvelopeTracker class. +// This is implementation of the __init__ method. +static int KVEnvelopeTracker_init(pyORBIT_Object *self, PyObject *args, PyObject *kwds) { + double perveance = 0.0; + double eps_x = 1.0; + double eps_y = 1.0; + self->cpp_obj = new KVEnvelopeTracker(perveance, eps_x, eps_y); + ((KVEnvelopeTracker *)self->cpp_obj)->setPyWrapper((PyObject *)self); + return 0; +} + +// Method: trackBunch(Bunch* bunch, double length) +static PyObject *KVEnvelopeTracker_trackBunch(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + PyObject *pyBunch; + double length; + if (!PyArg_ParseTuple(args, "Od:trackBunch", &pyBunch, &length)) { + ORBIT_MPI_Finalize("PyKVEnvelopeTracker - trackBunch(Bunch* bunch, double length) - parameters are needed."); + } + PyObject *pyORBIT_Bunch_Type = wrap_orbit_bunch::getBunchType("Bunch"); + if (!PyObject_IsInstance(pyBunch, pyORBIT_Bunch_Type)) { + ORBIT_MPI_Finalize("PyKVEnvelopeTracker - trackBunch(Bunch* bunch, double length) - first parameter should be Bunch."); + } + Bunch *cpp_bunch = (Bunch *)((pyORBIT_Object *)pyBunch)->cpp_obj; + cpp_KVEnvelopeTracker->trackBunch(cpp_bunch, length); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: setPerveance(double perveance) +static PyObject *KVEnvelopeTracker_setPerveance(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double perveance; + if (!PyArg_ParseTuple(args, "d:setPerveance", &perveance)) { + ORBIT_MPI_Finalize("PyKVEnvelopeTracker - setPerveance(double perveance) - parameters are needed."); + } + cpp_KVEnvelopeTracker->setPerveance(perveance); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: setEmittanceX(double emittance_x) +static PyObject *KVEnvelopeTracker_setEmittanceX(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double emittance_x; + if (!PyArg_ParseTuple(args, "d:setEmittanceX", &emittance_x)) { + ORBIT_MPI_Finalize("PyKVEnvelopeTracker - setEmittanceX(double emittance_x) - parameters are needed."); + } + cpp_KVEnvelopeTracker->setEmittanceX(emittance_x); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: setEmittanceY(double emittance_y) +static PyObject *KVEnvelopeTracker_setEmittanceY(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double emittance_y; + if (!PyArg_ParseTuple(args, "d:setEmittanceY", &emittance_y)) { + ORBIT_MPI_Finalize("PyKVEnvelopeTracker - setEmittanceY(double emittance_y) - parameters are needed."); + } + cpp_KVEnvelopeTracker->setEmittanceY(emittance_y); + Py_INCREF(Py_None); + return Py_None; +} + +// Method: getPerveance() +static PyObject *KVEnvelopeTracker_getPerveance(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double perveance = cpp_KVEnvelopeTracker->getPerveance(); + return Py_BuildValue("d", perveance); +} + +// Method: getEmittanceX() +static PyObject *KVEnvelopeTracker_getEmittanceX(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double eps_x = cpp_KVEnvelopeTracker->getEmittanceX(); + return Py_BuildValue("d", eps_x); +} + +// Method: getEmittanceY() +static PyObject *KVEnvelopeTracker_getEmittanceY(PyObject *self, PyObject *args) { + pyORBIT_Object *pyKVEnvelopeTracker = (pyORBIT_Object *)self; + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)pyKVEnvelopeTracker->cpp_obj; + double eps_y = cpp_KVEnvelopeTracker->getEmittanceY(); + return Py_BuildValue("d", eps_y); +} + +// Destructor for python KVEnvelopeTracker class (__del__ method) +static void KVEnvelopeTracker_del(pyORBIT_Object *self) { + KVEnvelopeTracker *cpp_KVEnvelopeTracker = (KVEnvelopeTracker *)self->cpp_obj; + delete cpp_KVEnvelopeTracker; + self->ob_base.ob_type->tp_free((PyObject *)self); +} + +// Definition of Python KVEnvelopeTracker wrapper class methods. +// They will be available from the Python level. +static PyMethodDef KVEnvelopeTrackerClassMethods[] = { + {"getEmittanceX", KVEnvelopeTracker_getEmittanceX, METH_VARARGS, "Get emittance (x)."}, + {"getEmittanceY", KVEnvelopeTracker_getEmittanceY, METH_VARARGS, "Get emittance (y)."}, + {"getPerveance", KVEnvelopeTracker_getPerveance, METH_VARARGS, "Get space charge perveance."}, + {"setEmittanceX", KVEnvelopeTracker_setEmittanceX, METH_VARARGS, "Set emittance (x)."}, + {"setEmittanceY", KVEnvelopeTracker_setEmittanceY, METH_VARARGS, "Set emittance (y)."}, + {"setPerveance", KVEnvelopeTracker_setPerveance, METH_VARARGS, "Set space charge perveance."}, + {"trackBunch", KVEnvelopeTracker_trackBunch, METH_VARARGS, "Apply space charge kick to beam envelope."}, + {NULL} +}; + +// Definition of Python KVEnvelopeTracker wrapper class members. +// They will be available from the Python level. +static PyMemberDef KVEnvelopeTrackerClassMembers[] = {{NULL}}; + +// New python KVEnvelopeTracker wrapper type definition. +static PyTypeObject pyORBIT_KVEnvelopeTracker_Type = { + PyVarObject_HEAD_INIT(NULL, 0) "KVEnvelopeTracker", /*tp_name*/ + sizeof(pyORBIT_Object), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)KVEnvelopeTracker_del, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/ + "The KVEnvelopeTracker python wrapper", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + KVEnvelopeTrackerClassMethods, /* tp_methods */ + KVEnvelopeTrackerClassMembers, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)KVEnvelopeTracker_init, /* tp_init */ + 0, /* tp_alloc */ + KVEnvelopeTracker_new, /* tp_new */ +}; + +// Initialization function of the pyKVEnvelopeTracker class +void initKVEnvelopeTracker(PyObject *module) { + if (PyType_Ready(&pyORBIT_KVEnvelopeTracker_Type) < 0) { + return; + } + Py_INCREF(&pyORBIT_KVEnvelopeTracker_Type); + PyModule_AddObject(module, "KVEnvelopeTracker", (PyObject *)&pyORBIT_KVEnvelopeTracker_Type); +} + +#ifdef __cplusplus +} +#endif + +} // namespace wrap_envelope diff --git a/src/envelope/wrap_kv_envelope_tracker.hh b/src/envelope/wrap_kv_envelope_tracker.hh new file mode 100644 index 00000000..2c505d72 --- /dev/null +++ b/src/envelope/wrap_kv_envelope_tracker.hh @@ -0,0 +1,18 @@ +#ifndef WRAP_KV_ENVELOPE_TRACKER_H +#define WRAP_KV_ENVELOPE_TRACKER_H + +#include "Python.h" + +#ifdef __cplusplus +extern "C" { +#endif + +namespace wrap_envelope { +void initKVEnvelopeTracker(PyObject *module); +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/meson.build b/src/meson.build index 841fc403..2242ea83 100644 --- a/src/meson.build +++ b/src/meson.build @@ -246,7 +246,12 @@ sources = files([ 'teapot/wrap_teapotbase.cc', 'teapot/wrap_matrix_generator.cc', 'teapot/teapotbase.cc', - 'teapot/MatrixGenerator.cc' + 'teapot/MatrixGenerator.cc', + 'envelope/KVEnvelopeTracker.cc', + 'envelope/DanilovEnvelopeTracker.cc', + 'envelope/wrap_kv_envelope_tracker.cc', + 'envelope/wrap_danilov_envelope_tracker.cc', + 'envelope/wrap_envelope.cc', ]) inc = include_directories([ python.get_variable('INCLUDEPY', ''), @@ -276,8 +281,8 @@ inc = include_directories([ 'orbit/BunchDiagnostics', 'orbit', 'utils/integration', - 'orbit/Apertures' - + 'orbit/Apertures', + 'envelope', ]) @@ -427,3 +432,12 @@ python.extension_module('error_base', install: true, subdir: 'orbit/core', ) + +python.extension_module('envelope', + sources: [base + '/envelope_init.cc'], + include_directories: inc, + cpp_args: ['-fPIC', '-std=c++11'], + dependencies: [core_dep], + install: true, + subdir: 'orbit/core', +)