check_distributor

fetch_clawback_dt(address) async

Tries to find the clawback instruction (IX) for a given Distributor address.

Ideally the clawback is the most recent transaction, but failed transactions can follow it,
and `get_signatures_for_address` returns those as well.

So we iterate over the whole page of signatures that `get_signatures_for_address` returns instead of checking only the first one.
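
The scan described above can be sketched without any RPC calls. This is a minimal illustration, not the project's implementation: `FakeInstruction`, `FakeTransaction`, `CLAWBACK_DATA`, and `find_clawback_time` are hypothetical stand-ins for the real response objects and for `CLAWBACK_IX_ENCODED`, and the page is assumed to be ordered newest first.

```python
from dataclasses import dataclass
from typing import List, Optional


# Hypothetical stand-ins for the RPC response objects (not the real solana-py types).
@dataclass
class FakeInstruction:
    data: str


@dataclass
class FakeTransaction:
    block_time: Optional[int]
    instructions: List[FakeInstruction]


# Placeholder for the encoded clawback instruction data (assumption).
CLAWBACK_DATA = "clawback"


def find_clawback_time(page: List[FakeTransaction]) -> Optional[int]:
    """Return the block time of the first transaction that holds a clawback instruction."""
    for tx in page:
        # Skip transactions without a usable block time.
        if not tx.block_time:
            continue
        if any(ix.data == CLAWBACK_DATA for ix in tx.instructions):
            return tx.block_time
    return None


# The clawback is not the newest entry: a later transaction precedes it in the page.
page = [
    FakeTransaction(1_700_000_300, [FakeInstruction("other")]),
    FakeTransaction(1_700_000_200, [FakeInstruction("clawback")]),
    FakeTransaction(1_700_000_100, [FakeInstruction("new_claim")]),
]
clawback_time = find_clawback_time(page)
```

Checking only `page[0]` would miss the clawback here, which is why the whole page is scanned.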

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | str | address of the Distributor | required |

Returns:

| Type | Description |
| --- | --- |
| datetime \| None | datetime of the clawback, parsed from `block_time` of the clawback transaction; None if no clawback instruction is found |
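
As a small illustration of how a `block_time` value maps to a datetime: `block_time` is a Unix timestamp in seconds, and `datetime.fromtimestamp` without a `tz` argument yields a naive datetime in the machine's local timezone. The timestamp below is an arbitrary example value, and the UTC variant is shown only for comparison; it is an assumption that callers might prefer it, not something the function does.

```python
from datetime import datetime, timezone

# Arbitrary example Unix timestamp in seconds (not taken from a real transaction).
block_time = 1_700_000_000

# Naive datetime in the local timezone of the machine running the code.
naive_local = datetime.fromtimestamp(block_time)

# Timezone-aware alternative that does not depend on the machine's settings.
aware_utc = datetime.fromtimestamp(block_time, tz=timezone.utc)

print(naive_local.tzinfo)     # None
print(aware_utc.isoformat())  # 2023-11-14T22:13:20+00:00
```

Comparing a naive value with an aware one raises `TypeError`, so callers should stick to one convention.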

Source code in app/scheduler/tasks/check_distributor.py
async def fetch_clawback_dt(address: str) -> datetime | None:
    """
    Tries to find the clawback instruction (IX) for a given Distributor address.

    Ideally the clawback is the most recent transaction, but failed transactions can
    follow it, and `get_signatures_for_address` returns those as well.

    So we iterate over the whole page of signatures that `get_signatures_for_address` returns.

    Args:
        address: address of the Distributor

    Returns:
        datetime of the clawback, parsed from `block_time` of the clawback transaction,
        or None if no clawback instruction is found
    """
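    async with AsyncClient(settings.SOLANA_RPC_URL) as client:
        # Fetch one page of signatures for the Distributor address.
        signatures = await client.get_signatures_for_address(Pubkey.from_string(address))
        for sig in signatures.value:
            tx = await client.get_transaction(sig.signature, max_supported_transaction_version=1)
            # Skip transactions that are missing or have no block time.
            if not tx.value or not tx.value.block_time:
                continue
            transaction = tx.value.transaction.transaction
            # Skip transaction representations that do not expose a message.
            if not hasattr(transaction, "message"):
                continue
            for ix in transaction.message.instructions:
                # Match the clawback instruction by its encoded data.
                if getattr(ix, "data", None) == CLAWBACK_IX_ENCODED:
                    return datetime.fromtimestamp(tx.value.block_time)
    # No clawback instruction was found on this page.
    return None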