From 8a9666248ad3c00cfb990f75b3ccdad9ba64c8b5 Mon Sep 17 00:00:00 2001 From: Wutche Date: Sun, 6 Jul 2025 16:41:26 -0500 Subject: [PATCH] done with week 3 exercise --- main.py | 224 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 132 insertions(+), 92 deletions(-) diff --git a/main.py b/main.py index 0925a78..5c931ad 100644 --- a/main.py +++ b/main.py @@ -25,17 +25,18 @@ def encode(self, value: int) -> bytes: Raises: ValueError: If the value is negative or exceeds u64 max. """ - # TODO: Implement the CompactSize encoding logic here. - # 1. Add validation for `value`: must be a non-negative integer and fit within u64 (0 to 18446744073709551615). - # Raise ValueError for invalid inputs. - # 2. Use `if/elif/else` to check the `value` range against 0xFD, 0xFFFF, 0xFFFFFFFF. - # 3. For each range, prepend the correct prefix byte (0xFD, 0xFE, 0xFF) if necessary. - # 4. Convert the `value` to bytes using `.to_bytes()` with `length` and `byteorder='little'`. - # - For single byte, no prefix needed. - # - For 2-byte, use 2 for length. - # - For 4-byte, use 4 for length. - # - For 8-byte, use 8 for length. - pass + if not isinstance(value, int) or value < 0: + raise ValueError("Value must be a non-negative integer.") + if value < 0xFD: + return value.to_bytes(1, 'little') + elif value <= 0xFFFF: + return b'\xFD' + value.to_bytes(2, 'little') + elif value <= 0xFFFFFFFF: + return b'\xFE' + value.to_bytes(4, 'little') + elif value <= 0xFFFFFFFFFFFFFFFF: + return b'\xFF' + value.to_bytes(8, 'little') + else: + raise ValueError("Value too large for CompactSize encoding.") class CompactSizeDecoder: """ @@ -55,20 +56,26 @@ def decode(self, data: bytes) -> tuple[int, int]: Raises: ValueError: If data is too short or has an invalid prefix. """ - # TODO: Implement the CompactSize decoding logic here. - # 1. Check if `data` is empty. If so, raise ValueError ("Data is too short to decode CompactSize."). - # 2. Get the `first_byte` from `data[0]`. - # 3. Use `if/elif/else` to determine the length based on `first_byte`: - # - If `first_byte < 0xFD`: The value is the `first_byte` itself; 1 byte consumed. - # - If `first_byte == 0xFD`: Expect 2 more bytes. Check `len(data)` is at least 3. - # Convert `data[1:3]` to int using `int.from_bytes()` with `byteorder='little'`. 3 bytes consumed. - # - If `first_byte == 0xFE`: Expect 4 more bytes. Check `len(data)` is at least 5. - # Convert `data[1:5]` to int. 5 bytes consumed. - # - If `first_byte == 0xFF`: Expect 8 more bytes. Check `len(data)` is at least 9. - # Convert `data[1:9]` to int. 9 bytes consumed. - # 4. Raise ValueError for `data` being too short for the indicated prefix. - # 5. Return the decoded integer and the number of bytes consumed as a tuple. - pass + if not data: + raise ValueError("Data is too short to decode CompactSize.") + + first_byte = data[0] + if first_byte < 0xFD: + return first_byte, 1 + elif first_byte == 0xFD: + if len(data) < 3: + raise ValueError("Data too short") + return int.from_bytes(data[1:3], 'little'), 3 + elif first_byte == 0xFE: + if len(data) < 5: + raise ValueError("Data too short") + return int.from_bytes(data[1:5], 'little'), 5 + elif first_byte == 0xFF: + if len(data) < 9: + raise ValueError("Data too short") + return int.from_bytes(data[1:9], 'little'), 9 + else: + raise ValueError("Invalid CompactSize prefix.") class TransactionData: """ @@ -92,9 +99,14 @@ def add_input(self, tx_id: str, vout_index: int, script_sig: str, sequence: int script_sig (str): The unlocking script. sequence (int): The sequence number. """ - # TODO: Create a dictionary for the input and add to the `inputs` list. - # TODO: Add a print statement confirming the input was added. - pass + input_data = { + "prev_txid": tx_id, + "prev_vout": vout_index, + "script_sig": script_sig, + "sequence": sequence + } + self.inputs.append(input_data) + print(f"Added input: {input_data}") def add_output(self, value_satoshi: int, script_pubkey: str): """ @@ -104,9 +116,9 @@ def add_output(self, value_satoshi: int, script_pubkey: str): value_satoshi (int): The amount in satoshis. script_pubkey (str): The locking script. """ - # TODO: Create a tuple for the output and add to the `outputs` list. - # TODO: Add a print statement confirming the output was added. - pass + output = (value_satoshi, script_pubkey) + self.outputs.append(output) + print(f"Added output: {output}") def get_input_details(self) -> list[dict]: """ @@ -118,14 +130,14 @@ def get_input_details(self) -> list[dict]: """ detailed_inputs = [] print("\n--- Input Details (using for and enumerate) ---") - # TODO: Iterate through `self.inputs` using a `for` loop with `enumerate` to get both index and input_data. - # TODO: Inside the loop, print the input index. - # TODO: Use multiple assignment/dictionary unpacking (e.g., `input_data.get(...)`) to extract - # `prev_txid`, `prev_vout`, and `script_sig` from `input_data`. - # TODO: Print these extracted details. - # TODO: Append a `copy` of the `input_data` dictionary to `detailed_inputs`. - # TODO: Return `detailed_inputs`. - pass + for idx, input_data in enumerate(self.inputs): + print(f"Input {idx}") + prev_txid = input_data.get("prev_txid") + prev_vout = input_data.get("prev_vout") + script_sig = input_data.get("script_sig") + print(f"TXID: {prev_txid}, VOUT: {prev_vout}, SCRIPT: {script_sig}") + detailed_inputs.append(input_data.copy()) + return detailed_inputs def summarize_outputs(self, min_value: int = 0) -> tuple[int, int]: """ @@ -142,19 +154,28 @@ def summarize_outputs(self, min_value: int = 0) -> tuple[int, int]: valid_outputs_count = 0 index = 0 print("\n--- Summarizing Outputs (using while, continue, break) ---") - # TODO: Use a `while` loop that continues as long as `index` is less than `len(self.outputs)`. - # TODO: Inside the loop, unpack the current `value` and `script` from `self.outputs[index]` using tuple unpacking. - # TODO: Implement a `continue` condition: - # - If `value` is not an integer or is negative, print a message and `continue` to the next iteration. - # TODO: Implement another `continue` condition: - # - If `value` is less than `min_value`, print a message and `continue` to the next iteration. - # TODO: If the output is valid, add `value` to `total_satoshi` and increment `valid_outputs_count`. - # TODO: Print details of the included output. - # TODO: Implement a `break` condition: - # - If `total_satoshi` exceeds a certain threshold (e.g., 1,000,000,000 satoshis), print a message and `break` out of the loop. - # TODO: Increment `index` at the end of each iteration (before `continue`/`break` checks). - # TODO: Return `(total_satoshi, valid_outputs_count)` as a tuple. - pass + + while index < len(self.outputs): + value, script = self.outputs[index] + if not isinstance(value, int) or value < 0: + print(f"Skipping invalid output at index {index}: {value}") + index += 1 + continue + if value < min_value: + print(f"Skipping output at index {index}: {value} < {min_value}") + index += 1 + continue + total_satoshi += value + valid_outputs_count += 1 + print(f"Including output {index}: {value} satoshis") + + if total_satoshi > 1000000000: # 1 billion satoshis + print(f"Total satoshis exceeded 1 Billion. Breaking summarization.") + break + index += 1 + return (total_satoshi, valid_outputs_count) + + def update_metadata(self, new_data: dict): """ @@ -163,25 +184,21 @@ def update_metadata(self, new_data: dict): Args: new_data (dict): A dictionary of new metadata to add/update. """ - # TODO: Using dict.update() to merge new_data into metadata - # TODO: Add a print statement showing the updated metadata. - pass + self.metadata.update(new_data) + print(f"Updated metadata: {self.metadata}") def get_metadata_value(self, key: str, default=None): """ Retrieves a value from metadata using dict.get(). """ - # TODO: Return the retrieved value. - pass + return self.metadata.get(key, default) def get_transaction_header(self) -> tuple: """ Returns core transaction header elements. Demonstrates simple tuple creation and returning. """ - # A simple tuple of header components - # TODO: Create and return a tuple containing `version`, `length of inputs`, `length of outputs`, and `lock_time`. - pass + return (self.version, len(self.inputs), len(self.outputs), self.lock_time) def set_transaction_header(self, version: int, num_inputs: int, num_outputs: int, lock_time: int): """ @@ -189,11 +206,8 @@ def set_transaction_header(self, version: int, num_inputs: int, num_outputs: int Note: num_inputs and num_outputs here are for demonstration of multiple assignment and wouldn't typically directly set list lengths in a real scenario. """ - # Multiple assignment for header elements - # TODO: Use multiple assignment to set `version`, and `lock_time`. - # You can use `_` for `num_inputs` and `num_outputs` if you don't intend to use them. - # TODO: Add a print statement confirming the attributes were set. - pass + self.version, self.lock_time = version, lock_time + print(f"Set header via multiple assignment: version={version}, lock_time={lock_time}") class UTXOSet: """ @@ -209,10 +223,9 @@ def add_utxo(self, tx_id: str, vout_index: int, amount: int): """ Adds a UTXO to the set. """ - # TODO: Create a UTXO tuple using tx_id, vout_index, amount. - # TODO: Add this tuple to the set. - # TODO: Add a print statement confirming the UTXO was added. - pass + utxo = (tx_id, vout_index, amount) + self.utxos.add(utxo) + print(f"Added UTXO: {utxo}") def remove_utxo(self, tx_id: str, vout_index: int, amount: int) -> bool: """ @@ -221,15 +234,19 @@ def remove_utxo(self, tx_id: str, vout_index: int, amount: int) -> bool: Returns: bool: True if removed, False otherwise. """ - # TODO: Create and remove the UTXO tuple from the set. - pass + utxo = (tx_id, vout_index, amount) + if utxo in self.utxos: + self.utxos.remove(utxo) + print(f"Removed UTXO: {utxo}") + return True + print(f"UTXO not found: {utxo}") + return False def get_balance(self) -> int: """ Calculates the total balance from all UTXOs in the set. """ - # TODO: Iterate through the utxos and return the total - pass + return sum(amount for _, _, amount in self.utxos) def find_sufficient_utxos(self, target_amount: int) -> set: """ @@ -242,8 +259,16 @@ def find_sufficient_utxos(self, target_amount: int) -> set: Returns: set: A set of UTXOs that fulfill the amount, or empty set if not possible. """ - # TODO: return set of UTXOs that fulfill the target_amount, or empty set if not possible. - pass + selected = set() + running_total = 0 + for utxo in sorted(self.utxos, key=lambda x: x[2]): + selected.add(utxo) + running_total += utxo[2] + if running_total >= target_amount: + print(f"Found sufficient UTXOs: {selected}") + return selected + print(f"Could not find sufficient UTXOs for target {target_amount}") + return set() def get_total_utxo_count(self) -> int: @@ -251,30 +276,30 @@ def get_total_utxo_count(self) -> int: Returns the number of UTXOs in the set. Demonstrates `len()` on a set. """ - # TODO: Return the length of the utxos set - pass + return len(self.utxos) def is_subset_of(self, other_utxo_set: 'UTXOSet') -> bool: """ Checks if this UTXO set is a subset of another. Demonstrates set.issubset(). """ - # TODO: check if is subset and return the result. - pass + return self.utxos.issubset(other_utxo_set.utxos) def combine_utxos(self, other_utxo_set: 'UTXOSet') -> 'UTXOSet': """ Combines two UTXO sets """ - # TODO: Return `combined_set`. - pass + combined = UTXOSet() + combined.utxos = self.utxos.union(other_utxo_set.utxos) + return combined def find_common_utxos(self, other_utxo_set: 'UTXOSet') -> 'UTXOSet': """ Finds UTXOs common to two sets using set.intersection(). """ - # TODO: Get the intersection of the two sets and Return the common_set - pass + common = UTXOSet() + common.utxos = self.utxos.intersection(other_utxo_set.utxos) + return common def generate_block_headers( prev_block_hash: str, @@ -300,12 +325,27 @@ def generate_block_headers( dict: A dictionary representing a potential block header, including the current nonce. """ print(f"\n--- Generating Block Headers (using generator) ---") - # TODO: Use a `while` loop that continues as long as `attempts < max_attempts`. - # TODO: Inside the loop, create a dictionary `header_data` with keys like "version", - # "prev_block_hash", "merkle_root", "timestamp", "bits", and the current "nonce". - # TODO: Simulate a hash calculation (e.g., using `hashlib.sha256` on a string representation of `header_data`). - # TODO: Print the current attempt, nonce, and simulated hash prefix. - # TODO: Use `yield header_data` to return the current header without exiting the function. - # TODO: Increment `nonce` and `attempts`. - # TODO: Add a conditional print statement (e.g., every 100 attempts) to show progress. - pass \ No newline at end of file + nonce = start_nonce + attempts = 0 + while attempts < max_attempts: + header_data = { + "version": 1, + "prev_block_hash": prev_block_hash, + "merkle_root": merkle_root, + "timestamp": timestamp, + "bits": bits, + "nonce": nonce + } + + header_str = str(header_data) + simulated_hash = hashlib.sha256(header_str.encode()).hexdigest() + + print(f"Attempt {attempts + 1}: Nonce {nonce}, Hash: {simulated_hash[:8]}...") + + yield header_data + + nonce += 1 + attempts += 1 + + if attempts % 100 == 0 and attempts > 0: + print(f"... {attempts} attempts made ...") \ No newline at end of file