diff --git a/asyncpp/optim/__init__.py b/asyncpp/optim/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/asyncpp/optim/optimizer.py b/asyncpp/optim/optimizer.py index 8db279e..ca564aa 100644 --- a/asyncpp/optim/optimizer.py +++ b/asyncpp/optim/optimizer.py @@ -84,7 +84,7 @@ def append_to_queue(self, data): def get_from_queue(self, index): if self.save_dir is not None: fname = self.queue[index] - d = torch.load(fname) + d = torch.load(fname, weights_only=False) return d["state_dicts"], d["version"] else: return self.queue[index] diff --git a/asyncpp/runtime/__init__.py b/asyncpp/runtime/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/asyncpp/runtime/threadsafe_queue.py b/asyncpp/runtime/threadsafe_queue.py index fae3c10..7c32775 100644 --- a/asyncpp/runtime/threadsafe_queue.py +++ b/asyncpp/runtime/threadsafe_queue.py @@ -2,13 +2,14 @@ # Licensed under the MIT license. import threading +from collections import deque """ Implementation of a thread-safe queue with one producer and one consumer. """ class Queue: def __init__(self): - self.queue = [] + self.queue = deque() self.cv = threading.Condition() def add(self, tensor): @@ -21,6 +22,6 @@ def remove(self): self.cv.acquire() while len(self.queue) == 0: self.cv.wait() - tensor = self.queue.pop(0) + tensor = self.queue.popleft() self.cv.release() return tensor diff --git a/examples/models/__init__.py b/examples/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/pp_diloco_async.py b/examples/pp_diloco_async.py index 265a6a7..b6c6fd1 100644 --- a/examples/pp_diloco_async.py +++ b/examples/pp_diloco_async.py @@ -400,7 +400,7 @@ def main(args): parser.add_argument('--eval_iters', type=int, default=25, help='Number of evaluation iterations') parser.add_argument('--ckpt_interval', type=int, default=1000, help='Checkpoint interval') parser.add_argument('--diloco_interval', type=int, default=10000000, help='Diloco interval') # disable diloco by default - parser.add_argument('--cosine_anneal', type=bool, default=True, help='Use cosine annealing') + parser.add_argument('--cosine_anneal', type=lambda v: v.lower() in ('true', '1', 'yes'), default=True, help='Use cosine annealing') parser.add_argument('--warmup_steps', type=int, default=3000, help='Number of warmup steps') parser.add_argument('--max_local_step', type=int, default=30000, help='Maximum local step') parser.add_argument('--wandb_project', type=str, default=None, help='WandB project name') @@ -409,10 +409,10 @@ def main(args): parser.add_argument('--wandb_name', type=str, default=None, help='WandB name') parser.add_argument('--eval_interval', type=int, default=1000, help='Evaluation interval') parser.add_argument('--num_microbatches', type=int, default=1, help='Number of microbatches') - parser.add_argument('--deterministic', type=bool, default=False, help='Deterministic training') + parser.add_argument('--deterministic', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Deterministic training') parser.add_argument('--max_norm', type=float, default=1.0, help='Maximum norm') parser.add_argument('--num_inner_steps', type=int, default=1000, help='Number of inner steps') - parser.add_argument('--adaptive_momentum', type=bool, default=False, help='Use adaptive momentum') + parser.add_argument('--adaptive_momentum', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Use adaptive momentum') parser.add_argument('--optimizer', type=str, default="nadamw", help='Optimizer class') parser.add_argument('--backend', type=str, default="gloo", help='Backend') parser.add_argument('--sparta_interval', type=int, default=1, help='Sparta interval') @@ -420,13 +420,13 @@ def main(args): parser.add_argument('--sparta_method', type=str, default='avg', help='Sparta method') parser.add_argument('--sparta_lambda', type=float, default=1.0, help='Sparta lambda') parser.add_argument('--sparta_momentum', type=float, default=0.5, help='Sparta momentum') - parser.add_argument('--sparta_nesterov', type=bool, default=False, help='Sparta nesterov') - parser.add_argument('--sparta_adaptive_momentum', type=bool, default=True, help='Sparta adaptive momentum') + parser.add_argument('--sparta_nesterov', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Sparta nesterov') + parser.add_argument('--sparta_adaptive_momentum', type=lambda v: v.lower() in ('true', '1', 'yes'), default=True, help='Sparta adaptive momentum') parser.add_argument('--sparta_warmup_steps', type=int, default=1000, help='Number of warmup steps') parser.add_argument("--instance_id", type=int, default=0, help="Instance ID") parser.add_argument("--num_nodes_per_instance", type=int, default=None, help="Number of nodes per instance") parser.add_argument('--master_addr', type=str, default="127.0.0.1", help='Master address for distributed training') - parser.add_argument('--buffer_to_cpu', type=bool, default=False, help='Buffer to CPU') + parser.add_argument('--buffer_to_cpu', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Buffer to CPU') args = parser.parse_args() main(args) diff --git a/examples/pp_diloco_sync.py b/examples/pp_diloco_sync.py index f387e14..26c7aa5 100644 --- a/examples/pp_diloco_sync.py +++ b/examples/pp_diloco_sync.py @@ -342,7 +342,7 @@ def main(args): parser.add_argument('--eval_iters', type=int, default=25, help='Number of evaluation iterations') parser.add_argument('--ckpt_interval', type=int, default=1000, help='Checkpoint interval') parser.add_argument('--diloco_interval', type=int, default=10000000, help='Diloco interval') # disable diloco by default - parser.add_argument('--cosine_anneal', type=bool, default=True, help='Use cosine annealing') + parser.add_argument('--cosine_anneal', type=lambda v: v.lower() in ('true', '1', 'yes'), default=True, help='Use cosine annealing') parser.add_argument('--warmup_steps', type=int, default=3000, help='Number of warmup steps') parser.add_argument('--max_local_step', type=int, default=30000, help='Maximum local step') parser.add_argument('--wandb_project', type=str, default=None, help='WandB project name') @@ -350,7 +350,7 @@ def main(args): parser.add_argument('--async_sparta_delay', type=int, default=0, help='Async Sparta delay') parser.add_argument('--wandb_name', type=str, default=None, help='WandB name') parser.add_argument('--eval_interval', type=int, default=1000, help='Evaluation interval') - parser.add_argument('--deterministic', type=bool, default=False, help='Deterministic training') + parser.add_argument('--deterministic', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Deterministic training') parser.add_argument('--max_norm', type=float, default=1.0, help='Maximum norm') parser.add_argument('--num_inner_steps', type=int, default=1000, help='Number of inner steps') parser.add_argument('--optimizer', type=str, default="adamw", help='Optimizer class') @@ -360,13 +360,13 @@ def main(args): parser.add_argument('--sparta_method', type=str, default='avg', help='Sparta method') parser.add_argument('--sparta_lambda', type=float, default=1.0, help='Sparta lambda') parser.add_argument('--sparta_momentum', type=float, default=0.5, help='Sparta momentum') - parser.add_argument('--sparta_nesterov', type=bool, default=False, help='Sparta nesterov') - parser.add_argument('--sparta_adaptive_momentum', type=bool, default=True, help='Sparta adaptive momentum') + parser.add_argument('--sparta_nesterov', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Sparta nesterov') + parser.add_argument('--sparta_adaptive_momentum', type=lambda v: v.lower() in ('true', '1', 'yes'), default=True, help='Sparta adaptive momentum') parser.add_argument('--sparta_warmup_steps', type=int, default=1000, help='Number of warmup steps') parser.add_argument("--instance_id", type=int, default=0, help="Instance ID") parser.add_argument("--num_nodes_per_instance", type=int, default=None, help="Number of nodes per instance") parser.add_argument('--master_addr', type=str, default="127.0.0.1", help='Master address for distributed training') - parser.add_argument('--buffer_to_cpu', type=bool, default=False, help='Buffer to CPU') + parser.add_argument('--buffer_to_cpu', type=lambda v: v.lower() in ('true', '1', 'yes'), default=False, help='Buffer to CPU') args = parser.parse_args() main(args) diff --git a/requirements.txt b/requirements.txt index 9751a55..b6b575b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,5 +11,5 @@ Pillow wandb transformers datasets -wandb +pyarrow huggingface_hub[hf_transfer] \ No newline at end of file diff --git a/sparta/setup.py b/sparta/setup.py index 13ec2aa..13b7ab8 100644 --- a/sparta/setup.py +++ b/sparta/setup.py @@ -133,7 +133,7 @@ def _get_stage_master(self): return self.pp_stage def _cleanup(self): - if self.rank == 0: + if self.rank == 0 and self.config.wandb_project: wandb.finish() if self.pbar: self.pbar.close() @@ -248,6 +248,5 @@ def _setup(self, rank: int): dist.barrier() def load_model(self, path): - self.master_model.load_state_dict(torch.load(path)) - for model in self.models: - model.load_state_dict(self.master_model.state_dict()) + self.master_model.load_state_dict(torch.load(path, weights_only=True)) + self.model.load_state_dict(self.master_model.state_dict())