Skip to content

Instantly share code, notes, and snippets.

@Jayke770
Created May 9, 2023 08:56
Show Gist options
  • Select an option

  • Save Jayke770/fc554d1e51072830b70bfbd37d000cf3 to your computer and use it in GitHub Desktop.

Select an option

Save Jayke770/fc554d1e51072830b70bfbd37d000cf3 to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
import asyncio
import aiohttp
import base64
from ton.sync import TonlibClient
from tvm_valuetypes import serialize_tvm_stack
from tonsdk.contract.wallet import WalletVersionEnum, Wallets, mnemonic_to_wallet_key
from tonsdk.utils import bytes_to_b64str, from_nano, to_nano, TonCurrencyEnum, Address, b64str_to_bytes
from tonsdk.crypto import mnemonic_new
from tonsdk.provider import ToncenterClient, SyncTonlibClient, prepare_address, address_state
from tonsdk.boc import Cell
from tonsdk.contract.token.ft import JettonWallet
wallet_version = WalletVersionEnum.v3r1
workchain = 0
class AbstractTonClient(ABC):
@abstractmethod
def _run(self, to_run, *, single_query=True):
raise NotImplemented
def get_address_information(self, address: str,
currency_to_show: TonCurrencyEnum = TonCurrencyEnum.ton):
return self.get_addresses_information([address], currency_to_show)[0]
def get_addresses_information(self, addresses,
currency_to_show: TonCurrencyEnum = TonCurrencyEnum.ton):
if not addresses:
return []
tasks = []
for address in addresses:
address = prepare_address(address)
tasks.append(self.provider.raw_get_account_state(address))
results = self._run(tasks, single_query=False)
for result in results:
result["state"] = address_state(result)
if "balance" in result:
if int(result["balance"]) < 0:
result["balance"] = 0
else:
result["balance"] = from_nano(
int(result["balance"]), currency_to_show)
return results
def seqno(self, addr: str):
addr = prepare_address(addr)
result = self._run(self.provider.raw_run_method(addr, "seqno", []))
if 'stack' in result and ('@type' in result and result['@type'] == 'smc.runResult'):
result['stack'] = serialize_tvm_stack(result['stack'])
return result
def send_boc(self, boc: Cell):
return self._run(self.provider.raw_send_message(boc))
class TonCenterTonClient(AbstractTonClient):
def __init__(self):
self.loop = asyncio.get_event_loop()
self.provider = ToncenterClient(base_url="https://testnet.toncenter.com/api/v2/",
api_key="")
def _run(self, to_run, *, single_query=True):
try:
return self.loop.run_until_complete(
self.__execute(to_run, single_query))
except Exception:
raise
async def __execute(self, to_run, single_query):
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
if single_query:
to_run = [to_run]
tasks = []
for task in to_run:
tasks.append(task["func"](
session, *task["args"], **task["kwargs"]))
return await asyncio.gather(*tasks)
client = TonCenterTonClient()
def generateWallet():
wallet_mnemonics = mnemonic_new()
_mnemonics, _pub_k, _priv_k, wallet = Wallets.from_mnemonics(
wallet_mnemonics, wallet_version, workchain)
query = wallet.create_init_external_message()
base64_boc = bytes_to_b64str(query["message"].to_boc(False))
print(base64_boc)
# client.send_boc(base64_boc)
return ({"mnemonic": _mnemonics, "publicKey": _pub_k, "privateKey": _priv_k, "address": wallet.address.to_string(True, True, True)})
def getBalance(address):
wallet = client.get_address_information(address)
return wallet['balance']
def sendTon(mnemonics, receiver, amount):
_mnemonics, _pub_k, _priv_k, wallet = Wallets.from_mnemonics(
mnemonics=mnemonics, version=wallet_version, workchain=workchain)
walletInfo = client.get_address_information(
wallet.address.to_string(True, True, True))
tx = wallet.create_transfer_message(
to_addr=receiver,
amount=to_nano(amount, 'ton'),
seqno=0,
payload=JettonWallet().create_transfer_body(
Address(receiver),
to_nano(amount, "ton")
))
boc = bytes_to_b64str(tx['message'].to_boc(False))
client.send_boc(boc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment