From 15cc3540a1b63cf4965b0880927564dbfbe6531d Mon Sep 17 00:00:00 2001 From: Salvatore Ingala <6681844+bigspider@users.noreply.github.com> Date: Sun, 10 Dec 2023 00:14:10 +0100 Subject: [PATCH] Compatibility of type annotations with older python versions --- examples/ram/ram.py | 6 ++- examples/ram/ram_contracts.py | 3 +- examples/vault/vault.py | 7 +-- examples/vault/vault_contracts.py | 5 +- matt/__init__.py | 82 +++++++++++++++---------------- matt/argtypes.py | 22 ++++----- matt/environment.py | 3 +- matt/merkle.py | 12 ++--- matt/utils.py | 6 +-- test_utils/__init__.py | 3 +- 10 files changed, 78 insertions(+), 71 deletions(-) diff --git a/examples/ram/ram.py b/examples/ram/ram.py index cd5ef92..85e8047 100644 --- a/examples/ram/ram.py +++ b/examples/ram/ram.py @@ -1,4 +1,5 @@ import argparse +from ast import Tuple import json import os @@ -6,6 +7,7 @@ import logging import shlex import traceback +from typing import Dict, List from dotenv import load_dotenv @@ -63,7 +65,7 @@ def get_completions(self, document, complete_event): rpc_port = os.getenv("RPC_PORT", 18443) -def parse_outputs(output_strings: list[str]) -> list[tuple[str, int]]: +def parse_outputs(output_strings: List[str]) -> List[Tuple[str, int]]: """Parses a list of strings in the form "address:amount" into a list of (address, amount) tuples. Args: @@ -250,7 +252,7 @@ def script_main(script_filename: str): environment = Environment(rpc, manager, None, None, False) # map from known ctv hashes to the corresponding template (used for withdrawals) - ctv_templates: dict[bytes, CTransaction] = {} + ctv_templates: Dict[bytes, CTransaction] = {} if args.script: diff --git a/examples/ram/ram_contracts.py b/examples/ram/ram_contracts.py index a9907eb..889c65b 100644 --- a/examples/ram/ram_contracts.py +++ b/examples/ram/ram_contracts.py @@ -1,3 +1,4 @@ +from typing import List from matt.argtypes import BytesType, MerkleProofType from matt.btctools.script import OP_CAT, OP_CHECKCONTRACTVERIFY, OP_DUP, OP_ELSE, OP_ENDIF, OP_EQUAL, OP_EQUALVERIFY, OP_FROMALTSTACK, OP_IF, OP_NOTIF, OP_PICK, OP_ROLL, OP_ROT, OP_SHA256, OP_SWAP, OP_TOALTSTACK, OP_TRUE, CScript from matt import CCV_FLAG_CHECK_INPUT, NUMS_KEY, ClauseOutput, StandardClause, StandardAugmentedP2TR @@ -5,7 +6,7 @@ from matt.merkle import is_power_of_2, floor_lg class RAM(StandardAugmentedP2TR): - def __init__(self, size: list[bytes]): + def __init__(self, size: List[bytes]): assert is_power_of_2(size) self.size = size diff --git a/examples/vault/vault.py b/examples/vault/vault.py index 016f29a..cdb8adf 100644 --- a/examples/vault/vault.py +++ b/examples/vault/vault.py @@ -10,6 +10,7 @@ import logging import shlex import traceback +from typing import Dict, List, Tuple from dotenv import load_dotenv @@ -81,7 +82,7 @@ def segwit_addr_to_scriptpubkey(addr: str) -> bytes: ]) -def parse_outputs(output_strings: list[str]) -> list[tuple[str, int]]: +def parse_outputs(output_strings: List[str]) -> List[Tuple[str, int]]: """Parses a list of strings in the form "address:amount" into a list of (address, amount) tuples. Args: @@ -160,7 +161,7 @@ def execute_command(input_line: str): if not isinstance(items_idx, list) or len(set(items_idx)) != len(items_idx): raise ValueError("Invalid items") - spending_vaults: list[ContractInstance] = [] + spending_vaults: List[ContractInstance] = [] for idx in items_idx: if idx >= len(manager.instances): raise ValueError(f"No such instance: {idx}") @@ -343,7 +344,7 @@ def script_main(script_filename: str): print(f"Vault address: {V.get_address()}\n") # map from known ctv hashes to the corresponding template (used for withdrawals) - ctv_templates: dict[bytes, CTransaction] = {} + ctv_templates: Dict[bytes, CTransaction] = {} if args.script: diff --git a/examples/vault/vault_contracts.py b/examples/vault/vault_contracts.py index b5bebcd..adbcb7a 100644 --- a/examples/vault/vault_contracts.py +++ b/examples/vault/vault_contracts.py @@ -1,10 +1,11 @@ +from typing import Optional from matt.argtypes import BytesType, IntType, SignerType from matt.btctools.script import OP_CHECKCONTRACTVERIFY, OP_CHECKSEQUENCEVERIFY, OP_CHECKSIG, OP_CHECKTEMPLATEVERIFY, OP_DROP, OP_DUP, OP_SWAP, OP_TRUE, CScript from matt import CCV_FLAG_CHECK_INPUT, CCV_FLAG_DEDUCT_OUTPUT_AMOUNT, NUMS_KEY, ClauseOutput, ClauseOutputAmountBehaviour, OpaqueP2TR, StandardClause, StandardP2TR, StandardAugmentedP2TR class Vault(StandardP2TR): - def __init__(self, alternate_pk: bytes | None, spend_delay: int, recover_pk: bytes, unvault_pk: bytes): + def __init__(self, alternate_pk: Optional[bytes], spend_delay: int, recover_pk: bytes, unvault_pk: bytes): assert (alternate_pk is None or len(alternate_pk) == 32) and len(recover_pk) == 32 and len(unvault_pk) self.alternate_pk = alternate_pk @@ -88,7 +89,7 @@ def __init__(self, alternate_pk: bytes | None, spend_delay: int, recover_pk: byt class Unvaulting(StandardAugmentedP2TR): - def __init__(self, alternate_pk: bytes | None, spend_delay: int, recover_pk: bytes): + def __init__(self, alternate_pk: Optional[bytes], spend_delay: int, recover_pk: bytes): assert (alternate_pk is None or len(alternate_pk) == 32) and len(recover_pk) == 32 self.alternate_pk = alternate_pk diff --git a/matt/__init__.py b/matt/__init__.py index 9ec8fd8..156fd99 100644 --- a/matt/__init__.py +++ b/matt/__init__.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from enum import Enum from io import BytesIO -from typing import Callable, Optional +from typing import Callable, Dict, List, Optional, Tuple, Union from .argtypes import ArgType, SignerType from .btctools import script, key @@ -32,9 +32,9 @@ class ClauseOutputAmountBehaviour(Enum): @dataclass class ClauseOutput: - n: None | int + n: Optional[int] next_contract: AbstractContract # only StandardP2TR and StandardAugmentedP2TR are supported so far - next_data: None | bytes = None # only meaningful if c is augmented + next_data: Optional[bytes] = None # only meaningful if c is augmented next_amount: ClauseOutputAmountBehaviour = ClauseOutputAmountBehaviour.PRESERVE_OUTPUT def __repr__(self): @@ -46,13 +46,13 @@ def __init__(self, name: str, script: CScript): self.name = name self.script = script - def stack_elements_from_args(self, args: dict) -> list[bytes]: + def stack_elements_from_args(self, args: dict) -> List[bytes]: raise NotImplementedError - def next_outputs(self, args: dict) -> list[ClauseOutput]: + def next_outputs(self, args: dict) -> List[ClauseOutput]: raise NotImplementedError - def args_from_stack_elements(self, elements: list[bytes]) -> dict: + def args_from_stack_elements(self, elements: List[bytes]) -> dict: raise NotImplementedError def __repr__(self): @@ -65,7 +65,7 @@ def __repr__(self): # Other types of generic treatable clauses could be defined (for example, a MiniscriptClause). # Moreover, it specifies a function that converts the arguments of the clause, to the data of the next output. class StandardClause(Clause): - def __init__(self, name: str, script: CScript, arg_specs: list[tuple[str, ArgType]], next_output_fn: Callable[[dict], list[ClauseOutput] | CTransaction] | None = None): + def __init__(self, name: str, script: CScript, arg_specs: List[Tuple[str, ArgType]], next_output_fn: Optional[Callable[[dict], Union[List[ClauseOutput], CTransaction]]] = None): super().__init__(name, script) self.arg_specs = arg_specs @@ -73,14 +73,14 @@ def __init__(self, name: str, script: CScript, arg_specs: list[tuple[str, ArgTyp - def next_outputs(self, args: dict) -> list[ClauseOutput] | CTransaction: + def next_outputs(self, args: dict) -> Union[List[ClauseOutput], CTransaction]: if self.next_outputs_fn is not None: return self.next_outputs_fn(args) else: return [] - def stack_elements_from_args(self, args: dict) -> list[bytes]: - result: list[bytes] = [] + def stack_elements_from_args(self, args: dict) -> List[bytes]: + result: List[bytes] = [] for arg_name, arg_cls in self.arg_specs: if arg_name not in args: raise ValueError(f"Missing argument: {arg_name}") @@ -90,7 +90,7 @@ def stack_elements_from_args(self, args: dict) -> list[bytes]: return result - def args_from_stack_elements(self, elements: list[bytes]) -> dict: + def args_from_stack_elements(self, elements: List[bytes]) -> dict: result = {} cur = 0 for arg_name, arg_cls in self.arg_specs: @@ -138,7 +138,7 @@ class P2TR(AbstractContract): A class representing a Pay-to-Taproot script. """ - def __init__(self, internal_pubkey: bytes, scripts: list[tuple[str, CScript]]): + def __init__(self, internal_pubkey: bytes, scripts: List[Tuple[str, CScript]]): assert len(internal_pubkey) == 32 self.internal_pubkey = internal_pubkey @@ -167,7 +167,7 @@ def __init__(self, naked_internal_pubkey: bytes): self.naked_internal_pubkey = naked_internal_pubkey - def get_scripts(self) -> list[tuple[str, CScript]]: + def get_scripts(self) -> List[Tuple[str, CScript]]: raise NotImplementedError("This must be implemented in subclasses") def get_taptree(self) -> bytes: @@ -190,15 +190,15 @@ class StandardP2TR(P2TR): A StandardP2TR where all the transitions are given by a StandardClause. """ - def __init__(self, internal_pubkey: bytes, clauses: list[StandardClause]): + def __init__(self, internal_pubkey: bytes, clauses: List[StandardClause]): super().__init__(internal_pubkey, list(map(lambda x: (x.name, x.script), clauses))) self.clauses = clauses self._clauses_dict = {clause.name: clause for clause in clauses} - def get_scripts(self) -> list[tuple[str, CScript]]: + def get_scripts(self) -> List[Tuple[str, CScript]]: return list(map(lambda clause: (clause.name, clause.script), self.clauses)) - def decode_wit_stack(self, stack_elems: list[bytes]) -> tuple[str, dict]: + def decode_wit_stack(self, stack_elems: List[bytes]) -> Tuple[str, dict]: leaf_hash = stack_elems[-2] clause_name = None @@ -220,15 +220,15 @@ class StandardAugmentedP2TR(AugmentedP2TR): An AugmentedP2TR where all the transitions are given by a StandardClause. """ - def __init__(self, naked_internal_pubkey: bytes, clauses: list[StandardClause]): + def __init__(self, naked_internal_pubkey: bytes, clauses: List[StandardClause]): super().__init__(naked_internal_pubkey) self.clauses = clauses self._clauses_dict = {clause.name: clause for clause in clauses} - def get_scripts(self) -> list[tuple[str, CScript]]: + def get_scripts(self) -> List[Tuple[str, CScript]]: return list(map(lambda clause: (clause.name, clause.script), self.clauses)) - def decode_wit_stack(self, data: bytes, stack_elems: list[bytes]) -> tuple[str, dict]: + def decode_wit_stack(self, data: bytes, stack_elems: List[bytes]) -> Tuple[str, dict]: leaf_hash = stack_elems[-2] clause_name = None @@ -252,7 +252,7 @@ def __repr__(self): # would include other info to help the signer decide (e.g.: the transaction) # There are no bad people here, though, so we keep it simple for now. class SchnorrSigner: - def __init__(self, keys: key.ExtendedKey | list[key.ExtendedKey]): + def __init__(self, keys: Union[key.ExtendedKey, List[key.ExtendedKey]]): if not isinstance(keys, list): keys = [keys] @@ -262,7 +262,7 @@ def __init__(self, keys: key.ExtendedKey | list[key.ExtendedKey]): self.keys = keys - def sign(self, msg: bytes, pubkey: bytes) -> bytes | None: + def sign(self, msg: bytes, pubkey: bytes) -> Optional[bytes]: if len(msg) != 32: raise ValueError("msg should be 32 bytes long") if len(pubkey) != 32: @@ -282,7 +282,7 @@ class ContractInstanceStatus(Enum): class ContractInstance: - def __init__(self, contract: StandardP2TR | StandardAugmentedP2TR): + def __init__(self, contract: Union[StandardP2TR, StandardAugmentedP2TR]): self.contract = contract self.data = None if not self.is_augm() else b'\0'*32 @@ -293,10 +293,10 @@ def __init__(self, contract: StandardP2TR | StandardAugmentedP2TR): self.last_height = 0 self.status = ContractInstanceStatus.ABSTRACT - self.outpoint: COutPoint | None = None - self.funding_tx: CTransaction | None = None + self.outpoint: Optional[COutPoint] = None + self.funding_tx: Optional[CTransaction] = None - self.spending_tx: CTransaction | None = None + self.spending_tx: Optional[CTransaction] = None self.spending_vin = None self.spending_clause = None @@ -323,7 +323,7 @@ def get_value(self) -> int: raise ValueError("contract not funded, or funding transaction unknown") return self.funding_tx.vout[self.outpoint.n].nValue - def decode_wit_stack(self, stack_elems: list[bytes]) -> tuple[str, dict]: + def decode_wit_stack(self, stack_elems: List[bytes]) -> Tuple[str, dict]: if self.is_augm(): return self.contract.decode_wit_stack(self.data, stack_elems) else: @@ -335,7 +335,7 @@ def __repr__(self): value = self.funding_tx.vout[self.outpoint.n].nValue return f"{self.__class__.__name__}(contract={self.contract}, data={self.data if self.data is None else self.data.hex()}, value={value}, status={self.status}, outpoint={self.outpoint})" - def __call__(self, clause_name: str, *, signer: Optional[SchnorrSigner] = None, outputs: list[CTxOut] = [], **kwargs) -> list['ContractInstance']: + def __call__(self, clause_name: str, *, signer: Optional[SchnorrSigner] = None, outputs: List[CTxOut] = [], **kwargs) -> List['ContractInstance']: if self.manager is None: raise ValueError("Direct invocation is only allowed after adding the instance to a ContractManager") @@ -346,13 +346,13 @@ def __call__(self, clause_name: str, *, signer: Optional[SchnorrSigner] = None, class ContractManager: - def __init__(self, contract_instances: list[ContractInstance], rpc: AuthServiceProxy, *, poll_interval: float = 1, mine_automatically: bool = False): + def __init__(self, contract_instances: List[ContractInstance], rpc: AuthServiceProxy, *, poll_interval: float = 1, mine_automatically: bool = False): self.instances = contract_instances self.mine_automatically = mine_automatically self.rpc = rpc self.poll_interval = poll_interval - def _check_instance(self, instance: ContractInstance, exp_statuses: None | ContractInstanceStatus | list[ContractInstanceStatus] = None): + def _check_instance(self, instance: ContractInstance, exp_statuses: Optional[Union[ContractInstanceStatus, List[ContractInstanceStatus]]] = None): if exp_statuses is not None: if isinstance(exp_statuses, ContractInstanceStatus): if instance.status != exp_statuses: @@ -371,7 +371,7 @@ def add_instance(self, instance: ContractInstance): instance.manager = self self.instances.append(instance) - def wait_for_outpoint(self, instance: ContractInstance, txid: str | None = None): + def wait_for_outpoint(self, instance: ContractInstance, txid: Optional[str] = None): self._check_instance(instance, exp_statuses=ContractInstanceStatus.ABSTRACT) if instance.is_augm(): if instance.data is None: @@ -394,15 +394,15 @@ def wait_for_outpoint(self, instance: ContractInstance, txid: str | None = None) def get_spend_tx( self, - spends: tuple[ContractInstance, str, dict] | list[tuple[ContractInstance, str, dict]], - output_amounts: dict[int, int] = {} - ) -> tuple[CTransaction, list[bytes]]: + spends: Union[Tuple[ContractInstance, str, dict], List[Tuple[ContractInstance, str, dict]]], + output_amounts: Dict[int, int] = {} + ) -> Tuple[CTransaction, List[bytes]]: if not isinstance(spends, list): spends = [spends] tx = CTransaction() tx.nVersion = 2 - outputs_map: dict[int, CTxOut] = {} + outputs_map: Dict[int, CTxOut] = {} tx.vin = [CTxIn(outpoint=instance.outpoint) for instance, _, _ in spends] @@ -464,7 +464,7 @@ def get_spend_tx( tx.vout = [outputs_map[i] for i in range(len(outputs_map))] # TODO: generalize for keypath spend? - sighashes: list[bytes] = [] + sighashes: List[bytes] = [] spent_utxos = [] # TODO: simplify @@ -501,11 +501,11 @@ def get_spend_wit(self, instance: ContractInstance, clause_name: str, wargs: dic ] return in_wit - def _mine_blocks(self, n_blocks: int = 1) -> list[str]: + def _mine_blocks(self, n_blocks: int = 1) -> List[str]: address = self.rpc.getnewaddress() return self.rpc.generatetoaddress(n_blocks, address) - def spend_and_wait(self, instances: ContractInstance | list[ContractInstance], tx: CTransaction) -> list[ContractInstance]: + def spend_and_wait(self, instances: Union[ContractInstance, List[ContractInstance]], tx: CTransaction) -> List[ContractInstance]: if isinstance(instances, ContractInstance): instances = [instances] @@ -520,11 +520,11 @@ def spend_and_wait(self, instances: ContractInstance | list[ContractInstance], t self._mine_blocks(1) return self.wait_for_spend(instances) - def wait_for_spend(self, instances: ContractInstance | list[ContractInstance]) -> list[ContractInstance]: + def wait_for_spend(self, instances: Union[ContractInstance, List[ContractInstance]]) -> List[ContractInstance]: if isinstance(instances, ContractInstance): instances = [instances] - out_contracts: dict[int, ContractInstance] = {} + out_contracts: Dict[int, ContractInstance] = {} for instance in instances: self._check_instance(instance, exp_statuses=ContractInstanceStatus.FUNDED) @@ -590,7 +590,7 @@ def wait_for_spend(self, instances: ContractInstance | list[ContractInstance]) - self.add_instance(instance) return result - def fund_instance(self, contract: StandardP2TR | StandardAugmentedP2TR, amount: int, data: Optional[bytes] = None) -> ContractInstance: + def fund_instance(self, contract: Union[StandardP2TR, StandardAugmentedP2TR], amount: int, data: Optional[bytes] = None) -> ContractInstance: """ Convenience method to create an instance of a contract, add it to the ContractManager, and send a transaction to fund it with a certain amount. @@ -609,7 +609,7 @@ def fund_instance(self, contract: StandardP2TR | StandardAugmentedP2TR, amount: self.wait_for_outpoint(instance, txid) return instance - def spend_instance(self, instance: ContractInstance, clause_name: str, args: dict, *, signer: Optional[SchnorrSigner], outputs: Optional[list[CTxOut]] = None) -> list[ContractInstance]: + def spend_instance(self, instance: ContractInstance, clause_name: str, args: dict, *, signer: Optional[SchnorrSigner], outputs: Optional[List[CTxOut]] = None) -> List[ContractInstance]: """ Creates and broadcasts a transaction that spends a contract instance using a specified clause and arguments. diff --git a/matt/argtypes.py b/matt/argtypes.py index 8a7f023..7e41022 100644 --- a/matt/argtypes.py +++ b/matt/argtypes.py @@ -1,4 +1,4 @@ -from typing import Any, Tuple +from typing import Any, List, Tuple from abc import ABC, abstractmethod from .merkle import MerkleProof @@ -13,7 +13,7 @@ class ArgType(ABC): contract clauses. Methods: - serialize_to_wit(self, value: Any) -> list[bytes]: + serialize_to_wit(self, value: Any) -> List[bytes]: Serializes the provided value into a format suitable for inclusion in a witness stack. This method must be overridden in subclasses. @@ -25,7 +25,7 @@ class ArgType(ABC): A list of one or more witness arguments, serialized in the format of the witness stack. - deserialize_from_wit(self, wit_stack: list[bytes]) -> Tuple[int, Any]: + deserialize_from_wit(self, wit_stack: List[bytes]) -> Tuple[int, Any]: Deserializes data from a witness stack into a Python object. This method must be overridden in subclasses. @@ -41,27 +41,27 @@ class ArgType(ABC): object depends on the subclass implementation. """ @abstractmethod - def serialize_to_wit(self, value: Any) -> list[bytes]: + def serialize_to_wit(self, value: Any) -> List[bytes]: raise NotImplementedError() @abstractmethod - def deserialize_from_wit(self, wit_stack: list[bytes]) -> Tuple[int, Any]: + def deserialize_from_wit(self, wit_stack: List[bytes]) -> Tuple[int, Any]: raise NotImplementedError() class IntType(ArgType): - def serialize_to_wit(self, value: int) -> list[bytes]: + def serialize_to_wit(self, value: int) -> List[bytes]: return [encode_wit_element(value)] - def deserialize_from_wit(self, wit_stack: list[bytes]) -> Tuple[int, int]: + def deserialize_from_wit(self, wit_stack: List[bytes]) -> Tuple[int, int]: return 1, vch2bn(wit_stack[0]) class BytesType(ArgType): - def serialize_to_wit(self, value: int) -> list[bytes]: + def serialize_to_wit(self, value: int) -> List[bytes]: return [encode_wit_element(value)] - def deserialize_from_wit(self, wit_stack: list[bytes]) -> Tuple[int, bytes]: + def deserialize_from_wit(self, wit_stack: List[bytes]) -> Tuple[int, bytes]: return 1, wit_stack[0] @@ -81,10 +81,10 @@ class MerkleProofType(ArgType): def __init__(self, depth: int): self.depth = depth - def serialize_to_wit(self, value: MerkleProof) -> list[bytes]: + def serialize_to_wit(self, value: MerkleProof) -> List[bytes]: return value.to_wit_stack() - def deserialize_from_wit(self, wit_stack: list[bytes]) -> Tuple[int, MerkleProof]: + def deserialize_from_wit(self, wit_stack: List[bytes]) -> Tuple[int, MerkleProof]: n_proof_elements = 2 * self.depth + 1 if len(wit_stack) < n_proof_elements: raise ValueError("Witness stack too short") diff --git a/matt/environment.py b/matt/environment.py index 0fc0830..2d12fee 100644 --- a/matt/environment.py +++ b/matt/environment.py @@ -1,4 +1,5 @@ +from typing import Optional from matt.btctools.auth_proxy import AuthServiceProxy from . import ContractManager @@ -11,7 +12,7 @@ def __init__(self, rpc: AuthServiceProxy, manager: ContractManager, host: str, p self.port = port self.interactive = interactive - def prompt(self, message: str | None = None): + def prompt(self, message: Optional[str] = None): if message is not None: print(message) if self.interactive: diff --git a/matt/merkle.py b/matt/merkle.py index 9c78f46..edcdf71 100644 --- a/matt/merkle.py +++ b/matt/merkle.py @@ -102,18 +102,18 @@ class MerkleProof: a certain element from the root, along with the value of the element itself. Attributes: - hashes (list[bytes]): A list of hashes (h_1, ..., h_n), each 32 bytes long. - directions (list[int]): A list of directions (d_1, ..., d_n), where each direction is either 0 (left) or 1 (right). + hashes (List[bytes]): A list of hashes (h_1, ..., h_n), each 32 bytes long. + directions (List[int]): A list of directions (d_1, ..., d_n), where each direction is either 0 (left) or 1 (right). x (bytes): An arbitrary bytes array. """ - def __init__(self, hashes: list[bytes], directions: list[int], x: bytes): + def __init__(self, hashes: List[bytes], directions: List[int], x: bytes): """ Initializes the MerkleProof with given hashes, directions, and an element x. Args: - hashes (list[bytes]): A list of 32-byte long hashes. - directions (list[int]): A list of directions, each being 0 or 1. + hashes (List[bytes]): A list of 32-byte long hashes. + directions (List[int]): A list of directions, each being 0 or 1. x (bytes): An arbitrary bytes array. """ if not all(isinstance(h, bytes) and len(h) == 32 for h in hashes): @@ -154,7 +154,7 @@ def to_wit_stack(self): """ return [encode_wit_element(t) for pair in zip(self.hashes, self.directions) for t in pair] + [self.x] - def from_wit_stack(wit_stack: list[bytes]): + def from_wit_stack(wit_stack: List[bytes]): """ Constructs a MerkleProof instance from a given witness stack. diff --git a/matt/utils.py b/matt/utils.py index 1fb3ebc..0669a57 100644 --- a/matt/utils.py +++ b/matt/utils.py @@ -1,5 +1,5 @@ from io import BytesIO -from typing import Optional, Tuple +from typing import List, Optional, Tuple, Union import time from .btctools.auth_proxy import AuthServiceProxy, JSONRPCException @@ -21,7 +21,7 @@ def vch2bn(s: bytes) -> int: return -v_abs if is_negative else v_abs -def encode_wit_element(x: bytes | int) -> bytes: +def encode_wit_element(x: Union[bytes, int]) -> bytes: if isinstance(x, int): return bn2vch(x) elif isinstance(x, bytes): @@ -175,7 +175,7 @@ def addr_to_script(addr: str) -> bytes: ]) -def make_ctv_template(outputs: list[(bytes|str, int)], *, nVersion: int = 2, nSequence: int = 0) -> CTransaction: +def make_ctv_template(outputs: List[(Union[bytes, str, int])], *, nVersion: int = 2, nSequence: int = 0) -> CTransaction: tmpl = CTransaction() tmpl.nVersion = nVersion tmpl.vin = [CTxIn(nSequence=nSequence)] diff --git a/test_utils/__init__.py b/test_utils/__init__.py index 3fc8c73..87dae75 100644 --- a/test_utils/__init__.py +++ b/test_utils/__init__.py @@ -1,7 +1,8 @@ +from typing import List from matt.btctools.auth_proxy import AuthServiceProxy -def mine_blocks(rpc: AuthServiceProxy, n_blocks: int) -> list[str]: +def mine_blocks(rpc: AuthServiceProxy, n_blocks: int) -> List[str]: address = rpc.getnewaddress() return rpc.generatetoaddress(n_blocks, address)