client.py
from typing import List
import shutil
import datetime
import os
import subprocess
import tempfile
import json
from . import client_output
def format_command(cmd: List[str]) -> str:
# TODO the displayed command may not be 'shell' ready, for instance
# Michelson string parameters may requires additional quotes
color_code = '\033[34m'
endc = '\033[0m'
cmd_str = " ".join(cmd)
return f'{color_code}# {cmd_str}{endc}'
class Client:
"""Client to a Tezos node.
Manage the persistent client state and provides methods to call
tezos-client/tezos-admin-client commands, and return structured
representation of the client output.
The most generic method to call the client is `run`. It calls the client
with an arbitrary sequence of parameters and returns the stdout of the
command. `CalledProcessError` is raised if command fails.
Other commands such as `run_script`, `transfer`... are wrapper over `run`,
they set up the commands parameters and return a structured representation
of the client output.
TODO: - the set of methods isn't complete. To be added when needed... some
methods return client stdout instead of structured representation
- deal correctly with additional parameters in command wrappers
- this works for the current tests but should be more generic
"""
def __init__(self,
client_path: str,
admin_client_path: str,
host: str = '127.0.0.1',
base_dir: str = None,
rpc_port: int = 8732,
use_tls: int = False,
disable_disclaimer: bool = True):
"""
Args:
client (str): path to the client executable file
admin_client (str): path to the admin-client executable file
host (str): IP of the host
base_dir (str): path to the client dir. If None, a temp file is
created.
rpc_port (int): port of the server
use_tls (bool): use TLS
disable_disclaimer (bool): disable disclaimer
Returns:
A Client instance.
"""
assert os.path.isfile(client_path), f"{client_path} is not a file"
assert os.path.isfile(admin_client_path), (f"{admin_client_path} is "
f"not a file")
assert base_dir is None or os.path.isdir(base_dir), (f'{base_dir} not '
f'a dir')
self.host = host
self._disable_disclaimer = disable_disclaimer
self._is_tmp_dir = base_dir is None
if base_dir is None:
base_dir = tempfile.mkdtemp(prefix='tezos-client.')
assert base_dir
self.base_dir = base_dir
client = [client_path] + ['-base-dir', base_dir, '-addr', host,
'-port', str(rpc_port)]
admin_client = [admin_client_path, '-base-dir', base_dir, '-addr',
host, '-port', str(rpc_port)]
if use_tls:
client.append('-S')
admin_client.append('-S')
self._client = client
self._admin_client = admin_client
self.rpc_port = rpc_port
def run(self,
params: List[str],
admin: bool = False,
check: bool = True,
trace: bool = False) -> str:
"""Run an arbitrary command
Args:
params (list): list of parameters given to the tezos-client,
admin (bool): False to call tezos-client, True to call
tezos-admin-client
check (bool): raises an exception if client call fails
trace (bool): use '-l' option to trace RPCs
Returns:
stdout of client command.
The actual command will be displayed according to 'format_command'.
Client output (stdout, stderr) will be displayed unprocessed.
Fails with `CalledProcessError` if command fails
"""
client = self._admin_client if admin else self._client
trace_opt = ['-l'] if trace else []
cmd = client + trace_opt + params
print(format_command(cmd))
stdout = ""
new_env = os.environ.copy()
if self._disable_disclaimer:
new_env["TEZOS_CLIENT_UNSAFE_DISABLE_DISCLAIMER"] = "Y"
# in python3.7, cleaner to use capture_output=true, text=True
with subprocess.Popen(cmd,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
env=new_env) as process:
for line in process.stdout:
print(line, end='')
stdout += line
if check and process.returncode:
raise subprocess.CalledProcessError(process.returncode,
process.args)
return stdout
def rpc(self,
verb: str,
path: str,
data: dict = None,
params: List[str] = None) -> dict:
"""Run an arbitrary RPC command
Args:
verb (str): either `get`, `post` or `put`
path (str): rpc path
data (dict): json data for post
params (list): any additional parameters to pass to the client
Returns:
dict representing the json output, raise exception
if output isn't json.
See `run` for more details.
"""
assert verb in {'put', 'get', 'post'}
params = [] if params is None else params
params = params + ['rpc', verb, path]
if data is not None:
params = params + ['with', json.dumps(data)]
compl_pr = self.run(params)
return client_output.extract_rpc_answer(compl_pr)
def typecheck(self, contract: str) -> str:
assert os.path.isfile(contract), f'{contract} is not a file'
return self.run(['typecheck', 'script', contract])
def run_script(self,
contract: str,
storage: str,
inp: str,
amount: float = None) -> client_output.RunScriptResult:
assert os.path.isfile(contract), f'{contract} is not a file'
cmd = ['run', 'script', contract, 'on', 'storage', storage, 'and',
'input', inp]
if amount is not None:
cmd += ['-z', str(amount)]
return client_output.RunScriptResult(self.run(cmd))
def gen_key(self, alias: str, args: List[str] = None) -> str:
cmd = ['gen', 'keys', alias]
if args is None:
args = []
cmd += args
return self.run(cmd)
def import_secret_key(self, name: str, secret: str) -> str:
return self.run(['import', 'secret', 'key', name, secret])
def activate_protocol(self,
protocol: str,
parameter_file: str,
fitness: str = '1',
key: str = 'activator',
timestamp: str = None
) -> client_output.ActivationResult:
assert os.path.isfile(parameter_file), f'{parameter_file} not a file'
if timestamp is None:
utc_now = datetime.datetime.utcnow()
timestamp = utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
cmd = ['-block', 'genesis', 'activate', 'protocol', protocol, 'with',
'fitness', str(fitness), 'and', 'key', key, 'and', 'parameters',
parameter_file, '--timestamp', timestamp]
return client_output.ActivationResult(self.run(cmd))
def activate_protocol_json(self,
protocol: str,
parameters: dict,
fitness: str = '1',
key: str = 'activator',
timestamp: str = None
) -> client_output.ActivationResult:
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as params:
param_json = json.dumps(parameters)
params.write(param_json)
params.close()
return self.activate_protocol(protocol, params.name, fitness,
key, timestamp)
def show_voting_period(self) -> str:
return self.run(['show', 'voting', 'period'])
def ban_peer(self, port: int) -> dict:
return self.rpc('get', f'/network/points/127.0.0.1:{port}/ban')
def unban_peer(self, port: int) -> dict:
return self.rpc('get', f'/network/points/127.0.0.1:{port}/unban')
def trust_peer(self, port: int) -> dict:
return self.rpc('get', f'/network/points/127.0.0.1:{port}/trust')
def untrust_peer(self, port: int) -> dict:
return self.rpc('get', f'/network/points/127.0.0.1:{port}/untrust')
def endorse(self, account: str) -> client_output.EndorseResult:
res = self.run(['endorse', 'for', account])
return client_output.EndorseResult(res)
def bake(self,
account: str,
args: List[str] = None) -> client_output.BakeForResult:
cmd = ['bake', 'for', account]
if args is None:
args = []
cmd += args
return client_output.BakeForResult(self.run(cmd))
def originate(self,
contract_name: str,
amount: float,
sender: str,
contract: str,
args: List[str] = None) -> client_output.OriginationResult:
cmd = ['originate', 'contract', contract_name, 'transferring',
str(amount), 'from', sender, 'running', contract]
if args is None:
args = []
cmd += args
return client_output.OriginationResult(self.run(cmd))
def hash(self, data: str, typ: str) -> client_output.HashResult:
cmd = ['hash', 'data', data, 'of', 'type', typ]
return client_output.HashResult(self.run(cmd))
def pack(self, data: str, typ: str) -> str:
return self.hash(data, typ).packed
def sign(self, data: str, identity: str) -> str:
cmd = ['sign', 'bytes', data, 'for', identity]
return client_output.SignatureResult(self.run(cmd)).sig
def transfer(self,
amount: float,
account1: str,
account2: str,
args: List[str] = None) -> client_output.TransferResult:
cmd = ['transfer', str(amount), 'from', account1, 'to', account2]
if args is None:
args = []
cmd += args
res = self.run(cmd)
return client_output.TransferResult(res)
def set_delegate(self,
account1: str,
account2: str,
args: List[str] = None) -> client_output.TransferResult:
cmd = ['set', 'delegate', 'for', account1, 'to', account2]
if args is None:
args = []
cmd += args
res = self.run(cmd)
return client_output.SetDelegateResult(res)
def get_delegate(self,
account1: str,
args: List[str] = None) -> client_output.TransferResult:
cmd = ['get', 'delegate', 'for', account1]
if args is None:
args = []
cmd += args
res = self.run(cmd)
return client_output.GetDelegateResult(res).delegate
def withdraw_delegate(
self,
account1: str,
args: List[str] = None) -> client_output.TransferResult:
cmd = ['withdraw', 'delegate', 'from', account1]
if args is None:
args = []
cmd += args
res = self.run(cmd)
return res
def p2p_stat(self) -> str:
return self.run(['p2p', 'stat'], admin=True)
def get_balance(self, account) -> float:
res = self.run(['get', 'balance', 'for', account])
return client_output.extract_balance(res)
def get_mutez_balance(self, account) -> float:
res = self.run(['get', 'balance', 'for', account])
return int(client_output.extract_balance(res)*1000000)
def get_receipt(self,
operation: str,
args: List[str] = None) -> client_output.GetReceiptResult:
cmd = ['get', 'receipt', 'for', operation]
if args is None:
args = []
cmd += args
return client_output.GetReceiptResult(self.run(cmd))
def get_prevalidator(self) -> dict:
return self.rpc('get', '/workers/prevalidators')
def get_mempool(self) -> dict:
return self.rpc('get', '/chains/main/mempool/pending_operations')
def mempool_is_empty(self) -> bool:
rpc_res = self.rpc('get', '/chains/main/mempool/pending_operations')
return rpc_res['applied'] == [] and \
rpc_res['refused'] == [] and \
rpc_res['branch_refused'] == [] and \
rpc_res['branch_delayed'] == [] and \
rpc_res['unprocessed'] == []
def get_head(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head')
def get_block(self, block_hash) -> dict:
return self.rpc('get', f'/chains/main/blocks/{block_hash}')
def get_ballot_list(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head/votes/ballot_list')
def get_ballots(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head/votes/ballots')
def get_current_period_kind(self) -> dict:
return self.rpc('get',
'chains/main/blocks/head/votes/current_period_kind')
def get_current_proposal(self) -> dict:
return self.rpc('get',
'/chains/main/blocks/head/votes/current_proposal')
def get_current_quorum(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head/votes/current_quorum')
def get_listings(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head/votes/listings')
def get_proposals(self) -> dict:
return self.rpc('get', '/chains/main/blocks/head/votes/proposals')
def get_protocol(self, params: List[str] = None) -> str:
rpc_res = self.rpc('get', '/chains/main/blocks/head/metadata',
params=params)
return rpc_res['protocol']
def get_period_position(self) -> str:
rpc_res = self.rpc(
'get', '/chains/main/blocks/head/helpers/current_level?offset=1')
return rpc_res['voting_period_position']
def get_level(self, params: List[str] = None) -> int:
rpc_res = self.rpc('get', '/chains/main/blocks/head/header/shell',
params=params)
return int(rpc_res['level'])
def wait_for_inclusion(self,
operation_hash: str,
branch: str = None,
args=None) -> client_output.WaitForResult:
cmd = ['wait', 'for', operation_hash, 'to', 'be', 'included']
cmd += ['--check-previous', '2']
if branch is not None:
cmd += ['--branch', branch]
if args is None:
args = []
cmd += args
return client_output.WaitForResult(self.run(cmd))
def inject_protocol(self, proto) -> str:
return self.run(['inject', 'protocol', proto], admin=True)
def list_protocols(self) -> List[str]:
cmd = ['list', 'protocols']
return client_output.extract_protocols(self.run(cmd, admin=True))
def submit_proposals(self,
account: str,
protos: List[str]
) -> client_output.SubmitProposalsResult:
cmd = ['submit', 'proposals', 'for', account] + protos
return client_output.SubmitProposalsResult(self.run(cmd))
def submit_ballot(self,
account: str,
proto: str,
vote: str) -> str:
return self.run(['submit', 'ballot', 'for', account, proto, vote])
def bootstrapped(self) -> str:
return self.run(['bootstrapped'])
def cleanup(self) -> None:
"""Remove base dir, only if not provided by user."""
if self._is_tmp_dir:
shutil.rmtree(self.base_dir)