Building an Arbitrage Bot on Starknet: Part 2 - The bot

2024-08-19

Introduction

This article is part of a series on building an arbitrage bot between centralised (CEX) and decentralised (DEX) exchanges on Starknet.

In this article I will finally cover the bot implementation, explaining the interfaces, the communication with the exchanges and the Arbitrage class.

Project overview

The bot is written in Python 3.12 using asyncio. It makes extensive use of coroutines and of some of the other features offered by asyncio. If you do not know how asyncio works, it is worth reading an introduction to it first.

As I mentioned in my previous article, I decided to use poetry as the dependency manager. One of its features is creating the project skeleton when poetry new is executed. After adding the bot modules, the project folder looks like this:

.
├── Dockerfile
├── README.md
├── poetry.lock
├── pyproject.toml
├── starknet_arbitrage
│   ├── __main__.py
│   ├── arbitrage.py
│   ├── core
│   │   └── types.py
│   ├── exchange
│   │   ├── base.py
│   │   ├── cex
│   │   │   └── binance.py
│   │   └── dex
│   │       └── avnu.py
│   └── starknet.py
└── tests
    └── __init__.py
  • __main__.py loads the settings and starts the bot
  • arbitrage.py contains the logic of the bot
  • core defines common types
  • exchange handles the communication between the bot and the centralised and decentralised exchanges
  • starknet.py handles the communication between the bot and Starknet

Core

The core module defines types and logic shared by the whole project. Currently, it only contains the common types defined in core/types.py:

from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Token:
    name: str
    address: str = ""
    decimals: int = 18

    def __str__(self) -> str:
        return f"{type(self).__name__}(name={self.name})"


@dataclass
class Symbol:
    base: Token
    quote: Token

@dataclass
class Ticker:
    raw: dict
    bid: Decimal
    bid_amount: Decimal
    ask: Decimal
    ask_amount: Decimal


@dataclass
class Wallet:
    raw: dict
    token: Token
    amount: Decimal

    def __str__(self) -> str:
        return f"{type(self).__name__}(token={self.token}, amount={self.amount})"

    @staticmethod
    def empty(token: Token) -> Wallet:
        return Wallet({}, token, Decimal(0))


@dataclass
class Order:
    raw: dict
    symbol: Symbol
    amount: Decimal
    price: Decimal
    side: str

The class names are rather self-explanatory. The raw fields contain the original message sent by the exchange: they can be used for debugging or for extracting exchange-specific information, as required by AVNU.

Decimal objects provide support for fast, correctly rounded decimal floating-point arithmetic. Although they are slower than float, they are a good compromise for avoiding the rounding errors that binary floating-point numbers suffer from.
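A quick illustration with the standard library only: a sum that float gets slightly wrong comes out exact with Decimal, as long as the values are built from strings. A Decimal built directly from a float inherits the float's binary error, which is why converting through str(...) is the safer habit.

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly
float_sum = 0.1 + 0.2
print(float_sum)  # 0.30000000000000004

# Decimals built from strings keep the exact decimal value
decimal_sum = Decimal("0.1") + Decimal("0.2")
print(decimal_sum)  # 0.3

# Building a Decimal from a float copies the float's binary error
print(Decimal(0.1) == Decimal("0.1"))  # False
```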

Exchange communication

Although centralised and decentralised exchanges are completely different services, with different features and APIs, I want a unified interface so that the bot can treat them in the same way. We are only interested in:

  • subscribing to a ticker
  • sending market orders

Based on this, the Exchange interface is defined as:

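import abc
import asyncio
from decimal import Decimal

from starknet_arbitrage.core.types import Symbol


class Exchange(abc.ABC):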
    @abc.abstractmethod
    async def subscribe_ticker(self, symbol: Symbol, **_):
        """Subscribe to the ticker channel."""

    @abc.abstractmethod
    async def buy_market_order(self, symbol: Symbol, amount: Decimal, *args, **kwargs):
        """Insert a buy market order."""

    @abc.abstractmethod
    def receiver_queue(self) -> asyncio.Queue:
        """Return the queue containing the messages from the Exchange."""

    @abc.abstractmethod
    async def sell_market_order(self, symbol: Symbol, amount: Decimal, *args, **kwargs):
        """Insert a sell market order."""

The receiver_queue method returns a Queue containing the messages received from the exchange. Remember that the bot connects to the exchanges using websockets. There will be a continuous flow of messages, which will be parsed by the class and added to the receiver_queue.
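A minimal sketch of this contract, assuming a hypothetical FakeExchange that replays hard-coded prices instead of reading a websocket. Only subscribe_ticker and receiver_queue are reproduced, and symbol is left untyped for brevity. The producer side fills the queue from a background task, while the consumer only ever reads from the queue:

```python
import abc
import asyncio
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Ticker:
    raw: dict
    bid: Decimal
    bid_amount: Decimal
    ask: Decimal
    ask_amount: Decimal


class Exchange(abc.ABC):
    @abc.abstractmethod
    async def subscribe_ticker(self, symbol, **_):
        """Subscribe to the ticker channel."""

    @abc.abstractmethod
    def receiver_queue(self) -> asyncio.Queue:
        """Return the queue containing the messages from the Exchange."""


class FakeExchange(Exchange):
    """Hypothetical exchange that replays a fixed list of (bid, ask) prices."""

    def __init__(self, prices: list[tuple[str, str]]) -> None:
        self._prices = prices
        self._receiver_queue: asyncio.Queue = asyncio.Queue()
        self._tasks: set[asyncio.Task] = set()

    async def _handle_ticker(self):
        # Stand-in for the websocket loop: parse each message and enqueue it
        for bid, ask in self._prices:
            ticker = Ticker({}, Decimal(bid), Decimal(1), Decimal(ask), Decimal(1))
            await self._receiver_queue.put(ticker)

    async def subscribe_ticker(self, symbol, **_):
        task = asyncio.create_task(self._handle_ticker())
        self._tasks.add(task)  # keep a reference so the task is not collected

    def receiver_queue(self) -> asyncio.Queue:
        return self._receiver_queue


async def consume(n: int) -> list[Decimal]:
    exchange = FakeExchange([("100", "101"), ("100.5", "101.5")])
    await exchange.subscribe_ticker("ETH/USDT")
    queue = exchange.receiver_queue()
    # The consumer never talks to the exchange, it only reads the queue
    return [(await queue.get()).bid for _ in range(n)]


bids = asyncio.run(consume(2))
print(bids)  # [Decimal('100'), Decimal('100.5')]
```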

I decided to implement only two exchanges: Binance and AVNU. The latter is not a real DEX but an aggregator. However, it is useful because its API provides prices and quotes for all the DEXes it supports.

Binance

Binance needs no introduction: it is the centralised exchange with the highest volume and market share. The websocket is handled with ccxt, and the Binance class defined in exchange/cex/binance.py is a wrapper around the ccxt methods.

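import asyncio
from decimal import Decimal

import ccxt.pro

from starknet_arbitrage.core.types import Order, Symbol, Ticker, Wallet
from starknet_arbitrage.exchange.base import Exchange


class Binance(Exchange):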
    """Binance exchange."""

    def __init__(self, api_key: str, secret_key: str) -> None:
        self._exchange_handle = ccxt.pro.binance(
            {"apiKey": api_key, "secret": secret_key}
        )
        self._receiver_queue = asyncio.Queue()
        
    ...

As you can see, __init__ needs api_key and secret_key to authenticate on the authenticated channel and to send orders. The class is very simple: the bot calls subscribe_ticker, which starts a task running _handle_ticker; ccxt sends the request to subscribe to the ticker channel, and _handle_ticker turns each message into a Ticker and puts it on the receiver queue.

    async def _handle_ticker(self, symbol: str):
        """Handle ticker messages and connection."""
        while True:
            msg = await self._exchange_handle.watch_ticker(symbol)

            ticker = Ticker(
                msg,
                Decimal(msg["info"]["b"]),
                Decimal(msg["info"]["B"]),
                Decimal(msg["info"]["a"]),
                Decimal(msg["info"]["A"]),
            )

            await self._receiver_queue.put(ticker)

    async def subscribe_ticker(self, symbol: Symbol, **_):
        """Subscribe to the ticker."""
        exchange_symbol = f"{symbol.base.name}{symbol.quote.name}"
        asyncio.create_task(self._handle_ticker(exchange_symbol))

The bot also needs to subscribe to the authenticated channel, in the same way as _handle_ticker, to receive wallet updates.

The Binance websocket has one problem: it sends no initial snapshot of either the balances or the ticker. For the ticker this is not a big deal on active pairs, as an update arrives almost immediately. For the balances, however, it is an issue, because an update is only sent when they change. This means an initial request for the balances is required.

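    async def subscribe_balance(self, symbol: Symbol, **_):
        """Subscribe to the authenticated endpoint for balances.

        NOTE: Balances are filtered by `Symbol`.

        """
        exchange_symbol = f"{symbol.base.name}{symbol.quote.name}"
        # Initial snapshot: ccxt returns all balances, keyed by currency
        balance = await self._exchange_handle.fetch_balance()
        for token in (symbol.base, symbol.quote):
            free = balance.get(token.name, {}).get("free") or 0
            await self._receiver_queue.put(
                Wallet(balance, token, Decimal(str(free)))
            )

        asyncio.create_task(self._handle_balance(exchange_symbol))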

Sending a buy market order works as follows:

    async def buy_market_order(self, symbol: Symbol, amount: Decimal, *_, **__):
        """Insert a new buy market order."""
        exchange_symbol = f"{symbol.base.name}{symbol.quote.name}"

        # NOTE: `"test": True` asks Binance to validate the order without
        # executing it. Remove it to send real orders.
        order = await self._exchange_handle.create_order_ws(
            exchange_symbol, "market", "buy", float(amount), params={"test": True}
        )

        # ccxt returns the order as a dict; `Order` expects amount, then price
        await self._receiver_queue.put(
            Order(
                order,
                symbol,
                Decimal(str(order["amount"])),
                Decimal(str(order["price"])),
                "buy",
            )
        )

AVNU

As mentioned above, AVNU is not a real DEX but an aggregator. I use this protocol because its API provides prices and quotes from all supported exchanges: at the expense of higher fees, the bot can easily swap on 10 different DEXes.

There is only one problem with the API: it does not offer a websocket connection, so the class needs to simulate one. Remember that I want to hide this problem from Arbitrage: the bot expects to receive a stream of messages, not to make requests. The main idea is to create coroutines that make a request every N seconds. But first, let's define some useful constants for the class:

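import asyncio
import decimal
import logging
import typing
from decimal import Decimal

import aiohttp
import requests
from starknet_py.hash.selector import get_selector_from_name
from starknet_py.net.account.account import Account
from starknet_py.net.client_models import Call

from starknet_arbitrage.core.types import Order, Symbol, Ticker, Token, Wallet
from starknet_arbitrage.exchange.base import Exchange

# Assumed: the "bot" logger configured in __main__.py
logger = logging.getLogger("bot")

ETH = Token(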
    "ETH", "0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", 18
)


URLS = {
    "base": "https://starknet.api.avnu.fi",
    "quotes": "swap/v2/quotes",
    "prices": "swap/v2/prices",
    "sources": "swap/v2/sources",
    "build": "swap/v2/build",
}

ETH is a Token object representing ether on Starknet, while URLS is a dictionary containing the AVNU API base URL and its endpoints. AVNU is defined as:

class AVNU(Exchange):
    """AVNU exchange."""

    def __init__(self, account: Account, balance: Symbol) -> None:
        self._account = account
        # Fetch available dexes
        response = requests.get(f"{URLS['base']}/{URLS['sources']}")
        available_dexes = [
            dex["name"] for dex in response.json() if dex["type"] == "DEX"
        ]
        available_dexes.append("lastPrice")

        self._last_prices = {dex: Decimal("0") for dex in available_dexes}
        self._receiver_queue = asyncio.Queue()

        asyncio.create_task(self._fetch_balance(balance))

As you can see, the class starts by fetching the exchanges supported by AVNU and initialising self._last_prices. The request is synchronous because:

  • __init__ cannot be asynchronous: Python does not allow the class initialiser to be declared async
  • even though the request is blocking, it is fine to make it once during initialisation

Finally, it creates a task running _fetch_balance, which fetches the initial account balances. The method is defined as:

    async def _fetch_balance(self, symbol: typing.Optional[Symbol] = None):
        tokens = [ETH] if symbol is None else [symbol.base, symbol.quote]

        for token in tokens:
            logger.debug(f"Fetching {token.address} balance")
            amount = await self._account.get_balance(token.address)
            await self._receiver_queue.put(
                Wallet({"amount": amount}, token, Decimal(amount) / 10**token.decimals)
            )
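The blocking request in __init__ is acceptable here. If it ever became a problem, a common alternative (not what the bot does) is an async factory: __init__ only stores ready-made data, and an async classmethod performs the awaited I/O before constructing the object. PollingClient, create and _fetch_sources are hypothetical names, and the HTTP request is simulated:

```python
import asyncio


class PollingClient:
    """Hypothetical class that needs an awaited call before it is usable."""

    def __init__(self, sources: list[str]) -> None:
        # __init__ stays synchronous and only stores ready-made data
        self.sources = sources

    @classmethod
    async def create(cls) -> "PollingClient":
        # Do the I/O here, where `await` is allowed
        sources = await cls._fetch_sources()
        return cls(sources)

    @staticmethod
    async def _fetch_sources() -> list[str]:
        # Stand-in for a non-blocking HTTP request (e.g. with aiohttp)
        await asyncio.sleep(0)
        return ["dexA", "dexB"]


async def main() -> PollingClient:
    return await PollingClient.create()


client = asyncio.run(main())
print(client.sources)  # ['dexA', 'dexB']
```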

Let’s see how subscribe_ticker is implemented:

    async def subscribe_ticker(self, symbol: Symbol, amount: decimal.Decimal, **_):
        asyncio.create_task(self._handle_quotes(symbol, amount))
        
    async def _handle_quotes(
        self,
        symbol: Symbol,
        amount: decimal.Decimal,
    ):
        url = f"{URLS['base']}/{URLS['quotes']}"
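        # AVNU expects token amounts as hex strings of integer base units
        base_units = hex(int(amount * 10**symbol.base.decimals))
        # Quote for selling `amount` base tokens (used for the sell side)
        params_sell = {
            "sellAmount": base_units,
            "sellTokenAddress": symbol.base.address,
            "buyTokenAddress": symbol.quote.address,
        }
        # Quote for buying `amount` base tokens with quote tokens: the token
        # addresses are swapped with respect to `params_sell`.
        # ASSUMPTION: `/quotes` accepts `buyAmount` for exact-output quotes;
        # check the AVNU API documentation.
        params_buy = {
            "buyAmount": base_units,
            "sellTokenAddress": symbol.quote.address,
            "buyTokenAddress": symbol.base.address,
        }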

        async with aiohttp.ClientSession() as session:
            while True:
                logger.debug("Fetching quotes")
                # We need to make two requests because a call to `/quotes` also
                # sets the swap direction. Therefore, if we want to buy `base`
                # with `quote` we need the opposite direction (and a new `quoteId`)
                resp_sell, resp_buy = await asyncio.gather(
                    session.get(url, params=params_sell),
                    session.get(url, params=params_buy),
                )
                entries_sell, entries_buy = await asyncio.gather(
                    resp_sell.json(), resp_buy.json()
                )

                # `entries_sell` is a list with one element
                entry = entries_sell[0]

                quote_amount = decimal.Decimal(int(entry["buyAmount"], 16))
                quote_amount = quote_amount / (10**symbol.quote.decimals)
                price = quote_amount / amount

                if self._last_prices["lastPrice"] != price:
                    self._last_prices["lastPrice"] = price

                    entry["sellId"] = entry["quoteId"]
                    entry["buyId"] = entries_buy[0]["quoteId"]
                    ticker = Ticker(entry, price, amount, price, amount)

                    await self._receiver_queue.put(ticker)

                await asyncio.sleep(1)

Both methods are simple: the first starts a task running _handle_quotes, while the second does the actual work. Rather than the prices endpoint, it uses quotes, not only to get the best price but also to ask AVNU to prepare the swap for us. The endpoint builds the best route for the swap and assigns a unique quoteId to each request. The id is used later, when the class asks AVNU to build the swap transaction. Two requests are needed because the direction of the swap (which token is sold and which is bought) cannot be changed when the transaction is built.
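The polling idea behind _handle_quotes can be reduced to a small, generic sketch: a coroutine calls a request function every interval seconds and publishes only changed values to a queue, so consumers see a stream as if it came from a websocket. poll, fake_fetch and the price sequence are hypothetical stand-ins for the AVNU request:

```python
import asyncio
import itertools
from collections.abc import Awaitable, Callable
from decimal import Decimal


async def poll(
    fetch: Callable[[], Awaitable[Decimal]],
    queue: asyncio.Queue,
    interval: float,
) -> None:
    """Turn request/response calls into a stream of messages on `queue`."""
    last = None
    while True:
        price = await fetch()
        # Only publish changes, as a websocket would
        if price != last:
            last = price
            await queue.put(price)
        await asyncio.sleep(interval)


async def demo() -> list[Decimal]:
    # Hypothetical endpoint: returns 100, 100, 101, 101, 102, 102, ...
    prices = itertools.chain(["100", "100", "101", "101"], itertools.repeat("102"))

    async def fake_fetch() -> Decimal:
        return Decimal(next(prices))

    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(poll(fake_fetch, queue, interval=0.01))
    received = [await queue.get() for _ in range(3)]
    task.cancel()
    return received


received = asyncio.run(demo())
print(received)  # [Decimal('100'), Decimal('101'), Decimal('102')]
```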

The method continues by parsing the responses, calculating the price, storing both quote ids (as sellId and buyId) and adding the Ticker to the receiver_queue.
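A worked example of the unit conversions, with made-up numbers: AVNU amounts are hex strings of integer token units, so a human amount is scaled by 10**decimals before hex(), and the response is parsed back with int(value, 16). The 6 quote decimals and the buyAmount value in the fake response are assumptions for illustration:

```python
from decimal import Decimal

BASE_DECIMALS = 18  # ETH, as in the ETH constant
QUOTE_DECIMALS = 6  # assumed quote token with 6 decimals

amount = Decimal("0.5")  # base tokens to sell

# Request side: human amount -> integer units -> hex string
sell_amount = hex(int(amount * 10**BASE_DECIMALS))

# Hypothetical response entry: 1300.5 quote tokens for 0.5 base tokens
entry = {"buyAmount": hex(1_300_500_000), "quoteId": "hypothetical-id"}

# Response side: hex string -> integer units -> human amount -> price
quote_amount = Decimal(int(entry["buyAmount"], 16)) / 10**QUOTE_DECIMALS
price = quote_amount / amount
print(sell_amount, quote_amount, price)  # 0x6f05b59d3b20000 1300.5 2601
```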

The last methods are for sending the orders:

    async def buy_market_order(
        self,
        symbol: Symbol,
        amount: Decimal,
        ticker: Ticker,
        slippage: Decimal = Decimal("0.01"),
    ):
        """Insert a buy market order."""
        url = f"{URLS['base']}/{URLS['build']}"

        payload = {
            "slippage": str(slippage),
            "takerAddress": hex(self._account.address),
            # Use the quote computed for buying `base` (see `_handle_quotes`)
            "quoteId": ticker.raw["buyId"],
            "includeApprove": True,
        }

        # FIXME: Share session with `_handle_quotes`
        async with aiohttp.ClientSession() as session:
            response = await session.post(url, json=payload)
            resp_calls = await response.json()

            # `resp_calls` is a dict containing the `chainId` and `calls` keys.
            # A call is a dict containing `contractAddress`, `entrypoint` and
            # `calldata`. Values are hex strings, except `entrypoint` (a name)
            calls = [
                Call(
                    int(call["contractAddress"], 16),
                    get_selector_from_name(call["entrypoint"]),
                    [int(call_value, 16) for call_value in call["calldata"]],
                )
                for call in resp_calls["calls"]
            ]

            logger.info("sending order")
            transaction_hash = await self._account.execute_v3(calls, auto_estimate=True)
            await self._wait_txn(
                transaction_hash.transaction_hash, symbol, amount, ticker, "buy"
            )

At this stage, the build endpoint is used to build the swap transaction. In the future, I would like to remove AVNU and swap directly on the DEXes.

The quoteId is used to ask AVNU to build the swap and return the transaction calls. The response is formatted for starknet.js rather than for starknet.py: contractAddress and calldata are hex strings that need to be converted to int, while entrypoint is a human-readable function name that needs to be converted into its selector (i.e. from approve to 0x04270219d365d6b017231b52e92b3fb5d7c8378b05e9abc97724537a80e93b0f).

The final steps are to broadcast the transaction and wait for the sequencer to validate it. Once validated, the Order message is added to the receiver_queue and the new balances are fetched:

    async def _wait_txn(
        self,
        transaction_hash: int,
        symbol: Symbol,
        amount: Decimal,
        ticker: Ticker,
        side: str,
    ):
        """Wait until the transaction is confirmed, then create the order and update the balances."""
        await self._account.client.wait_for_tx(transaction_hash)

        order = Order(
            {"transaction_hash": transaction_hash},
            symbol,
            amount,
            # NOTE: `ask` and `bid` are the same
            ticker.ask,
            side,
        )
        await self._receiver_queue.put(order)

        asyncio.create_task(self._fetch_balance(symbol))

Arbitrage class

In the previous sections, I defined the Exchange interface and described how I implemented Binance and AVNU. In this section, I will describe the last component of the bot: the Arbitrage class, the most important one. It is responsible for determining if there is a profit and attempting to take it.

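import asyncio
import dataclasses
import logging
from decimal import Decimal
from typing import Any

from starknet_arbitrage.core.types import Order, Symbol, Ticker, Wallet
from starknet_arbitrage.exchange.base import Exchange

# Assumed: the "bot" logger configured in __main__.py
logger = logging.getLogger("bot")


class Arbitrage: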
    def __init__(
        self,
        exchanges: list[Exchange],
        symbol: Symbol,
        spread_threshold: Decimal,
        trade_amount: Decimal,
        min_trade_amount: Decimal,
    ):
        self._exchanges = exchanges
        self._symbol = symbol
        self._threshold = spread_threshold
        self._queue: asyncio.Queue[tuple[Any, Exchange]] = asyncio.Queue()
        self._trade_amount = trade_amount
        self._min_trade_amount = min_trade_amount

It requires a list of Exchange objects, the Symbol to trade, spread_threshold, the minimum spread for a trade to be considered profitable, and trade_amount and min_trade_amount, which define the range of the traded amount.

Observant readers will have noticed that Arbitrage requires a list of Exchange objects rather than a single CEX and a single DEX. The class can iterate over multiple exchanges and does not care whether they are CEXes or DEXes.

Arbitrage defines two utility methods, _initialize and _merge_queues.

    async def _initialize(self):
        for exchange in self._exchanges:
            await exchange.subscribe_ticker(
                self._symbol, amount=int(self._trade_amount)
            )
            queue = exchange.receiver_queue()
            asyncio.create_task(self._merge_queues(queue, exchange))

As the name suggests, _initialize is called to subscribe to the tickers and to merge the queues into a single one by calling _merge_queues:

    async def _merge_queues(self, queue: asyncio.Queue, ex: Exchange):
        while True:
            val = await queue.get()
            await self._queue.put((val, ex))
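The fan-in pattern of _merge_queues can be tried in isolation. In this standalone sketch, strings stand in for the Exchange objects and merge mirrors _merge_queues: every message ends up in a single queue, tagged with its origin.

```python
import asyncio


async def merge(source: asyncio.Queue, name: str, merged: asyncio.Queue) -> None:
    # Forward every message, tagged with the exchange it came from
    while True:
        msg = await source.get()
        await merged.put((msg, name))


async def demo() -> list[tuple[str, str]]:
    binance, avnu = asyncio.Queue(), asyncio.Queue()
    merged: asyncio.Queue = asyncio.Queue()
    tasks = [
        asyncio.create_task(merge(binance, "binance", merged)),
        asyncio.create_task(merge(avnu, "avnu", merged)),
    ]
    await binance.put("ticker-1")
    await avnu.put("ticker-2")
    out = [await merged.get() for _ in range(2)]
    for task in tasks:
        task.cancel()
    return sorted(out)


result = asyncio.run(demo())
print(result)  # [('ticker-1', 'binance'), ('ticker-2', 'avnu')]
```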

The functions for calculating the spread and the amount to be swapped are quite simple:

    def _calculate_spread(self, ask: Decimal, bid: Decimal) -> Decimal:
        numerator = bid - ask
        return numerator / bid
        
    def _calculate_amount(
        self,
        ask: Ticker,
        bid: Ticker,
        wallet_ask: Decimal,
        wallet_bid: Decimal,
    ) -> Decimal:
        """Return the amount we can trade."""
        amount = min(self._trade_amount, ask.ask_amount, bid.bid_amount)
        # Check wallets have enough balance
        # NOTE: `wallet_ask` is the quote token balance. Convert in base
        amount = min(amount, wallet_bid, wallet_ask / ask.ask)
        return amount

As you can see, _calculate_amount follows the formula described in Step 3: Sending orders. Remember that wallet_ask is the total amount of available quote tokens, so it must be converted into base tokens before it can be compared with the other amounts.
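A worked example with made-up numbers. The two functions below are standalone versions of _calculate_spread and _calculate_amount with explicit, hypothetical argument names, following the comment in _calculate_amount: the quote balance on the buying exchange is divided by the ask price to express it in base tokens.

```python
from decimal import Decimal


def calculate_spread(ask: Decimal, bid: Decimal) -> Decimal:
    # Relative gain of buying at `ask` and selling at `bid`
    return (bid - ask) / bid


def calculate_amount(
    trade_amount: Decimal,
    ask_price: Decimal,
    ask_amount: Decimal,
    bid_amount: Decimal,
    quote_on_ask_exchange: Decimal,
    base_on_bid_exchange: Decimal,
) -> Decimal:
    # Limited by the configured size and by the liquidity at both prices
    amount = min(trade_amount, ask_amount, bid_amount)
    # Limited by what we can pay for (quote -> base) and what we can sell
    return min(amount, base_on_bid_exchange, quote_on_ask_exchange / ask_price)


spread = calculate_spread(ask=Decimal("2500"), bid=Decimal("2525"))
amount = calculate_amount(
    trade_amount=Decimal("1"),
    ask_price=Decimal("2500"),
    ask_amount=Decimal("3"),
    bid_amount=Decimal("2"),
    quote_on_ask_exchange=Decimal("1250"),  # can only pay for 0.5 base
    base_on_bid_exchange=Decimal("0.8"),
)
print(spread, amount)
```

Here the spread is about 0.99%, and the traded amount is 0.5 because 1250 quote tokens at 2500 buy only 0.5 base tokens, less than every other limit.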

Finally, the last and most interesting method of the class: run. It waits for messages from the exchanges and decides to send orders when there is a profit.

    async def run(self):
        # Subscribe to the tickers and merge the queues
        await self._initialize()

        best_bid = Ticker({}, Decimal("-INFINITY"), Decimal(0), Decimal(0), Decimal(0))
        best_ask = Ticker({}, Decimal(0), Decimal(0), Decimal("INFINITY"), Decimal(0))
        exchange_ask = exchange_bid = self._exchanges[0]

        wallets = {
            exchange: {
                self._symbol.base.name: Wallet.empty(self._symbol.base),
                self._symbol.quote.name: Wallet.empty(self._symbol.quote),
            }
            for exchange in self._exchanges
        }

        while True:
            msg, exchange = await self._queue.get()

            match msg:
                case Wallet():
                    wallets[exchange][msg.token.name] = msg
                case Order():
                    logger.info(
                        f"{exchange} order executed. {msg.side}: {msg.amount} at {msg.price}"
                    )
                case Ticker():
                    # We want to buy at the lowest price
                    if msg.ask <= best_ask.ask:
                        best_ask = msg
                        exchange_ask = exchange

                    # We want to sell at the highest price
                    if msg.bid >= best_bid.bid:
                        best_bid = msg
                        exchange_bid = exchange

                    # Same exchange, skip
                    if exchange_bid == exchange_ask:
                        continue

                    spread = self._calculate_spread(
                        best_ask.ask,
                        best_bid.bid,
                    )
                    ba = dataclasses.replace(best_ask)
                    ba.raw = exchange_ask
                    bb = dataclasses.replace(best_bid)
                    bb.raw = exchange_bid

                    logger.debug(
                        f"spread: {spread} best ask: {ba.ask} best bid: {bb.bid}"
                    )
                    if spread >= self._threshold:
                        logger.info(f"{spread} above the threshold")
                        amount = self._calculate_amount(
                            best_ask,
                            best_bid,
                            # Buy on `exchange_ask` (pay in quote), sell on
                            # `exchange_bid` (deliver base)
                            wallets[exchange_ask][self._symbol.quote.name].amount,
                            wallets[exchange_bid][self._symbol.base.name].amount,
                        )

                        if amount < self._min_trade_amount:
                            continue

                        logger.debug(
                            f"{exchange_ask}: buy: {amount} price: {best_ask.ask}"
                        )
                        logger.debug(
                            f"{exchange_bid}: sell: {amount} price: {best_bid.bid}"
                        )

                        await asyncio.gather(
                            exchange_ask.buy_market_order(
                                self._symbol, amount, best_ask
                            ),
                            exchange_bid.sell_market_order(
                                self._symbol, amount, best_bid
                            ),
                        )
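Note that run only replaces best_ask and best_bid with better prices and never resets them: if an exchange raises its ask after having quoted the best one, the old, no longer available price can still be selected. One possible variant, shown as a sketch rather than the bot's code, keeps only the latest Ticker of each exchange and compares every pair of different exchanges. The simplified Ticker and the exchange names are hypothetical:

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass
class Ticker:
    bid: Decimal
    ask: Decimal


def best_opportunity(latest: dict[str, Ticker]) -> Optional[tuple[Decimal, str, str]]:
    """Return (spread, buy_exchange, sell_exchange), or None.

    Only the latest ticker of each exchange is considered, so a price that
    has been replaced can no longer be selected.
    """
    best = None
    for buy_on, buy in latest.items():
        for sell_on, sell in latest.items():
            if buy_on == sell_on:
                continue
            spread = (sell.bid - buy.ask) / sell.bid
            if best is None or spread > best[0]:
                best = (spread, buy_on, sell_on)
    return best


latest = {
    "binance": Ticker(bid=Decimal("2499"), ask=Decimal("2500")),
    "avnu": Ticker(bid=Decimal("2530"), ask=Decimal("2530")),
}
# A newer Binance ticker simply overwrites the old one
latest["binance"] = Ticker(bid=Decimal("2509"), ask=Decimal("2510"))
result = best_opportunity(latest)
print(result[1:])  # ('binance', 'avnu')
```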

Almost ready

The bot is almost ready. The missing pieces are loading the configuration and the main function. The configuration lives in a .env file and is read with the dotenv library. The Config class is responsible for loading and validating it.

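from decimal import Decimal

from starknet_arbitrage.core.types import Symbol, Token


class ValidationError(Exception):
    """Raised when the configuration is invalid."""


class Config:
    REQUIRED_KEYS = {
        "BASE_ADDR",
        "BASE_DECIMALS",
        "BASE",
        "QUOTE_ADDR",
        "QUOTE_DECIMALS",
        "QUOTE",
        "API_KEY",
        "SECRET_KEY",
        "ACCOUNT_ADDRESS",
        "SIGNER_KEY",
        "NODE_URL",
        "SPREAD_THRESHOLD",
        "MAX_AMOUNT_TRADE",
        "MIN_AMOUNT_TRADE",
    }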

    def __init__(self, config: dict) -> None:
        for key in self.REQUIRED_KEYS:
            if key not in config:
                raise ValidationError(f"Missing `{key}`")

        self.base = Token(
            config["BASE"], config["BASE_ADDR"], int(config["BASE_DECIMALS"])
        )
        self.quote = Token(
            config["QUOTE"], config["QUOTE_ADDR"], int(config["QUOTE_DECIMALS"])
        )
        self.symbol = Symbol(self.base, self.quote)

        self.api_key = config["API_KEY"]
        self.secret_key = config["SECRET_KEY"]

        self.node_url = config["NODE_URL"]
        self.account_address = config["ACCOUNT_ADDRESS"]
        self.signer_key = config["SIGNER_KEY"]

        self.spread_threshold = Decimal(config["SPREAD_THRESHOLD"])
        self.max_amount_trade = Decimal(config["MAX_AMOUNT_TRADE"])
        self.min_amount_trade = Decimal(config["MIN_AMOUNT_TRADE"])

Finally, the main function:

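import asyncio
import logging
import os

from dotenv import dotenv_values
from rich.logging import RichHandler

from starknet_arbitrage.arbitrage import Arbitrage
from starknet_arbitrage.exchange.cex.binance import Binance
from starknet_arbitrage.exchange.dex.avnu import AVNU
from starknet_arbitrage.starknet import Starknet


async def main():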
    # Bot settings
    logger = logging.getLogger("bot")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(RichHandler())
    # Load config
    config = {**dotenv_values(".env"), **os.environ}
    config = Config(config)

    # Exchange initialisation
    logger.debug("Connecting to starknet...")
    chain = Starknet(config.node_url)
    account = chain.get_account(config.account_address, config.signer_key)
    logger.debug("Connecting to AVNU...")
    avnu = AVNU(account, config.symbol)

    logger.debug("Connecting to binance...")
    binance = Binance(config.api_key, config.secret_key)

    # Run bot
    logger.debug("All setup, running bot...")
    bot = Arbitrage(
        [binance, avnu],
        config.symbol,
        config.spread_threshold,
        config.max_amount_trade,
        config.min_amount_trade,
    )
    await bot.run()


asyncio.run(main())
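The order of the mappings in {**dotenv_values(".env"), **os.environ} matters: when the same key appears in both, the later mapping wins, so a variable exported in the environment overrides the value in .env. A sketch with a plain dict standing in for the parsed .env file (the values are hypothetical):

```python
import os

# Values as `dotenv_values(".env")` might return them (hypothetical file)
file_values = {"SPREAD_THRESHOLD": "0.005", "NODE_URL": "http://localhost:5050"}

# Simulate an override exported in the shell
os.environ["SPREAD_THRESHOLD"] = "0.01"

# Later entries win when unpacking mappings, so the environment overrides .env
config = {**file_values, **os.environ}
print(config["SPREAD_THRESHOLD"])  # 0.01
```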

Future improvements

The bot described here is very basic and there is room for many improvements. Here are a few:

  • Clean up the code and build more robust components. For example, if a coroutine crashes, the bot continues to work without that coroutine. This is a big problem if the crashed coroutine is the function that parses the tickers.
  • Remove ccxt. While the library is great for prototyping, it introduces a lot of unnecessary requests and is slow. It can be replaced by calling the Binance API directly.
  • Replace AVNU. Although its API is great, the bot swaps at the best rates, while it should swap between the best and the worst prices.
  • Avoid using auto_estimate when signing the transaction.
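A first step toward the robustness mentioned in the first point: asyncio keeps only weak references to tasks created with asyncio.create_task, and an exception raised inside such a task is only reported ("Task exception was never retrieved") when the task object is destroyed. A minimal sketch, where spawn and background_tasks are hypothetical names, keeps strong references and logs crashes as soon as they happen; restarting the crashed coroutine would be the next step:

```python
import asyncio
import logging

logger = logging.getLogger("bot")
background_tasks: set[asyncio.Task] = set()


def _on_done(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        # Report the crash immediately instead of when the task is destroyed
        logger.error("background task %s crashed: %r", task.get_name(), exc)


def spawn(coro) -> asyncio.Task:
    """Create a task, keep a strong reference to it and log crashes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def broken_handler() -> None:
    raise ValueError("malformed ticker")


async def main() -> asyncio.Task:
    task = spawn(broken_handler())
    await asyncio.sleep(0.01)  # let the task run and its callback fire
    return task


task = asyncio.run(main())
print(task.done(), type(task.exception()).__name__)  # True ValueError
```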

As I mentioned in the previous articles of the series, there is a group to discuss MEV on Starknet. If you are interested in the topic, want to discuss it or have some interesting information, you can find me there. Feel free to join the group.