kdrag.contrib.pcode.asmspec

Functions

assemble_and_check(filename[, langid, as_bin])

assemble_and_check_str(asm_str)

assemble_and_gen_vcs(filename[, langid, as_bin])

execute_insn(tracestate, ctx[, verbose])

execute_spec_stmts(stmts, tracestate, ctx[, ...])

run_all_paths(ctx, spec[, mem, verbose])

substitute_ghost(e, ghost_env)

Substitute ghost variables in an expression.

Classes

Always(expr)

AsmSpec(addrmap, list[SpecStmt]] =)

A specification of an assembly program.

Assert(label, addr, expr)

Assign(label, addr, name, expr)

Assume(label, addr, expr)

BoolStmt(label, addr, expr)

Cut(label, addr, expr)

Entry(label, addr, expr)

Exit(label, addr, expr)

Results(successes, failures)

TraceState(start, trace, state, ghost_env, ...)

VerificationCondition(start, trace, ...)

class kdrag.contrib.pcode.asmspec.Always(expr: z3.z3.BoolRef)

Bases: object

Parameters:

expr (BoolRef)

expr: BoolRef
class kdrag.contrib.pcode.asmspec.AsmSpec(addrmap: ~collections.defaultdict[int, list[SpecStmt]] = <factory>)

Bases: object

A specification of an assembly program. https://www.philipzucker.com/assembly_verify/ Registers are named by their corresponding names in pypcode. You can examine them by

`python import pypcode print(pypcode.Context("x86:LE:64:default").registers) `

Parameters:

addrmap (defaultdict[int, list[SpecStmt]])

addrmap: defaultdict[int, list[SpecStmt]]
classmethod of_file(filename: str, ctx: BinaryContext)
Parameters:
class kdrag.contrib.pcode.asmspec.Assert(label: str, addr: int, expr: BoolRef)

Bases: BoolStmt

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.Assign(label: str, addr: int, name: str, expr: z3.z3.ExprRef)

Bases: object

Parameters:
  • label (str)

  • addr (int)

  • name (str)

  • expr (ExprRef)

addr: int
expr: ExprRef
label: str
name: str
class kdrag.contrib.pcode.asmspec.Assume(label: str, addr: int, expr: BoolRef)

Bases: BoolStmt

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.BoolStmt(label: str, addr: int, expr: z3.z3.BoolRef)

Bases: object

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.Cut(label: str, addr: int, expr: BoolRef)

Bases: BoolStmt

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.Entry(label: str, addr: int, expr: BoolRef)

Bases: BoolStmt

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.Exit(label: str, addr: int, expr: BoolRef)

Bases: BoolStmt

Parameters:
  • label (str)

  • addr (int)

  • expr (BoolRef)

addr: int
expr: BoolRef
label: str
class kdrag.contrib.pcode.asmspec.Results(successes: list[str] = <factory>, failures: list[str] = <factory>)

Bases: object

Parameters:
  • successes (list[str])

  • failures (list[str])

failures: list[str]
successes: list[str]
class kdrag.contrib.pcode.asmspec.TraceState(start: SpecStmt, trace: list[int], state: kdrag.contrib.pcode.SimState, ghost_env: dict[str, z3.z3.ExprRef] = <factory>)

Bases: object

Parameters:
  • start (SpecStmt)

  • trace (list[int])

  • state (SimState)

  • ghost_env (dict[str, ExprRef])

ghost_env: dict[str, ExprRef]
start: SpecStmt
state: SimState
trace: list[int]
class kdrag.contrib.pcode.asmspec.VerificationCondition(start, trace, path_cond, memstate, ghost_env, cause)

Bases: NamedTuple

Parameters:
  • start (SpecStmt)

  • trace (list[int])

  • path_cond (list[BoolRef])

  • memstate (MemState)

  • ghost_env (dict[str, ExprRef])

  • cause (SpecStmt)

cause: SpecStmt

Alias for field number 5

count(value, /)

Return number of occurrences of value.

countermodel(ctx: BinaryContext, m: ModelRef) dict

Interpret a countermodel on the relevant constants

Parameters:
Return type:

dict

ghost_env: dict[str, ExprRef]

Alias for field number 4

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

memstate: MemState

Alias for field number 3

path_cond: list[BoolRef]

Alias for field number 2

start: SpecStmt

Alias for field number 0

trace: list[int]

Alias for field number 1

verify(ctx: BinaryContext, by=[]) Proof

Verify the verification condition using the given context.

Parameters:

ctx (BinaryContext)

Return type:

Proof

kdrag.contrib.pcode.asmspec.assemble_and_check(filename: str, langid='x86:LE:64:default', as_bin='as') Results
Parameters:

filename (str)

Return type:

Results

kdrag.contrib.pcode.asmspec.assemble_and_check_str(asm_str: str) Results
Parameters:

asm_str (str)

Return type:

Results

kdrag.contrib.pcode.asmspec.assemble_and_gen_vcs(filename: str, langid='x86:LE:64:default', as_bin='as') tuple[BinaryContext, list[VerificationCondition]]
Parameters:

filename (str)

Return type:

tuple[BinaryContext, list[VerificationCondition]]

kdrag.contrib.pcode.asmspec.execute_insn(tracestate: TraceState, ctx: BinaryContext, verbose=True) list[TraceState]
Parameters:
Return type:

list[TraceState]

kdrag.contrib.pcode.asmspec.execute_spec_stmts(stmts: list[SpecStmt], tracestate: TraceState, ctx: BinaryContext, verbose=True) tuple[TraceState | None, list[VerificationCondition]]
Parameters:
Return type:

tuple[TraceState | None, list[VerificationCondition]]

kdrag.contrib.pcode.asmspec.run_all_paths(ctx: BinaryContext, spec: AsmSpec, mem=None, verbose=True) list[VerificationCondition]
Parameters:
Return type:

list[VerificationCondition]

kdrag.contrib.pcode.asmspec.substitute_ghost(e: ExprRef, ghost_env: dict[str, ExprRef]) ExprRef

Substitute ghost variables in an expression.

Parameters:
  • e (ExprRef)

  • ghost_env (dict[str, ExprRef])

Return type:

ExprRef

import re
import kdrag.smt as smt
import kdrag as kd
import dataclasses
from dataclasses import dataclass
import kdrag.contrib.pcode as pcode
from collections import defaultdict
import json
import subprocess
from typing import NamedTuple, Optional

kd_macro = r"""
#precondition
.macro kd_entry label smt_bool
\label :
.endm

.macro kd_assert label smt_bool
\label :
.endm

.macro kd_assume label smt_bool
\label :
.endm

#postcondition
.macro kd_exit label smt_bool 
\label :
.endm

#invariant
.macro kd_cut label smt_bool
\label :
.endm 

.macro kd_always smt_bool
.endm

.macro kd_prelude smt_command
.endm

.macro kd_assign label name value
\label :
.endm



"""


@dataclass
class BoolStmt:
    label: str
    addr: int
    expr: smt.BoolRef

    def __repr__(self):
        return f"{self.__class__.__name__}({self.label}, {hex(self.addr)}, {self.expr})"


class Assert(BoolStmt): ...


class Assume(BoolStmt): ...


class Exit(BoolStmt): ...


class Entry(BoolStmt): ...


class Cut(BoolStmt): ...


@dataclass
class Assign:
    label: str
    addr: int
    name: str
    expr: smt.ExprRef


@dataclass
class Always:
    expr: smt.BoolRef


type SpecStmt = Entry | Assert | Assume | Exit | Cut | Assign


@dataclass
class AsmSpec:
    """
    A specification of an assembly program.
    https://www.philipzucker.com/assembly_verify/
    Registers are named by their corresponding names in pypcode.
    You can examine them by

    ```python
    import pypcode
    print(pypcode.Context("x86:LE:64:default").registers)
    ```

    """

    addrmap: defaultdict[int, list[SpecStmt]] = dataclasses.field(
        default_factory=lambda: defaultdict(list)
    )

    @classmethod
    def of_file(cls, filename: str, ctx: pcode.BinaryContext):
        COMMASEP = r"\s*(?:,\s*)?"
        LABEL = r"([A-Za-z_.$][A-Za-z0-9_.$]*)"  # valid GAS label
        NAME = r"([A-Za-z_.$][A-Za-z0-9_.$]*)"  # valid GAS name
        EXPR = r'"([^"]+)"'  # quoted formula
        kd_prefix = re.compile(r"^\s*kd_")
        boolstmt_pattern = re.compile(
            rf"""^\s*(kd_assert|kd_assume|kd_exit|kd_entry|kd_cut)\s+{LABEL}{COMMASEP}{EXPR}\s*$""",
        )
        assign_pattern = re.compile(
            rf"""^\s*kd_assign\s+{LABEL}\s+{NAME}\s+{EXPR}\s*$""",
        )
        prelude_pattern = re.compile(rf"^\s*kd_prelude\s+{EXPR}\s*$")
        preludes = []
        decls = ctx._subst_decls.copy()
        spec = cls()

        def find_label(label: str) -> int:
            assert ctx.loader is not None, (
                "BinaryContext must be loaded before disassembling"
            )
            sym = ctx.loader.find_symbol(label)
            if sym is None:
                raise Exception(f"Symbol {label} not found in binary {ctx.filename}")
            return sym.rebased_addr

        with open(filename) as f:
            for lineno, line in enumerate(f.readlines()):
                if kd_prefix.match(
                    line
                ):  # A little bit of sanity checking that we don't miss any "kd_" line
                    if match := boolstmt_pattern.match(line):
                        cmd, label, expr = match.groups()
                        addr = find_label(label)
                        smt_string = "\n".join(preludes + ["(assert ", expr, ")"])
                        expr = smt.parse_smt2_string(smt_string, decls=decls)[0]
                        if cmd == "kd_entry":
                            spec.addrmap[addr].append(Entry(label, addr, expr))
                        elif cmd == "kd_assert":
                            spec.addrmap[addr].append(Assert(label, addr, expr))
                        elif cmd == "kd_assume":
                            spec.addrmap[addr].append(Assume(label, addr, expr))
                        elif cmd == "kd_exit":
                            spec.addrmap[addr].append(Exit(label, addr, expr))
                        elif cmd == "kd_cut":
                            spec.addrmap[addr].append(Cut(label, addr, expr))
                    elif match := assign_pattern.match(line):
                        label, name, expr = match.groups()
                        # Turn expression `expr` into dummy assertion `expr == expr` for parsing
                        addr = find_label(label)
                        smt_string = "\n".join(
                            preludes + ["(assert (=", expr, expr, "))"]
                        )
                        expr = smt.parse_smt2_string(smt_string, decls=decls)[0].arg(0)
                        spec.addrmap[addr].append(
                            Assign(
                                label,
                                addr,
                                name,
                                expr,
                            )
                        )
                    elif match := prelude_pattern.match(line):
                        expr = match.group(1)
                        preludes.append(expr)
                    else:
                        raise ValueError(
                            f"Invalid kd_ statement at line {lineno + 1}: {line}"
                        )

        return spec

    def __repr__(self):
        return json.dumps(
            dataclasses.asdict(self), indent=2, default=lambda b: b.sexpr()
        )


@dataclass
class TraceState:
    start: SpecStmt
    trace: list[int]  # list of addresses
    state: pcode.SimState
    ghost_env: dict[str, smt.ExprRef] = dataclasses.field(default_factory=dict)


class VerificationCondition(NamedTuple):
    start: SpecStmt
    trace: list[int]
    path_cond: list[smt.BoolRef]
    memstate: pcode.MemState
    ghost_env: dict[str, smt.ExprRef]
    cause: SpecStmt
    # pf : Optional[kd.Proof] = None

    def __repr__(self):
        return f"VC({self.start}, {[hex(addr) for addr in self.trace]}, {self.cause}, {self.ghost_env})"

    def verify(self, ctx: pcode.BinaryContext, by=[]) -> kd.Proof:
        """
        Verify the verification condition using the given context.
        """
        vc = smt.Implies(
            smt.And(*self.path_cond),
            ctx.substitute(
                self.memstate, substitute_ghost(self.cause.expr, self.ghost_env)
            ),
        )
        return kd.prove(vc, by=by)

    def countermodel(self, ctx: pcode.BinaryContext, m: smt.ModelRef) -> dict:
        """
        Interpret a countermodel on the relevant constants
        """
        # find all interesting selects
        interesting = set(
            c
            for c in kd.utils.consts(self.cause.expr)
            if not kd.utils.is_value(c) and not c.decl().name().startswith("ram")
        )
        todo = [self.cause.expr]
        while todo:
            t = todo.pop()
            if smt.is_select(t):
                interesting.add(t)
            elif smt.is_app(t):
                todo.extend(t.children())
        return {c: m.eval(ctx.substitute(self.memstate, c)) for c in interesting}


def substitute_ghost(e: smt.ExprRef, ghost_env: dict[str, smt.ExprRef]) -> smt.ExprRef:
    """
    Substitute ghost variables in an expression.
    """
    return smt.substitute(
        e, *[(smt.Const(name, v.sort()), v) for name, v in ghost_env.items()]
    )


def execute_spec_stmts(
    stmts: list[SpecStmt],
    tracestate: TraceState,
    ctx: pcode.BinaryContext,
    verbose=True,
) -> tuple[Optional[TraceState], list[VerificationCondition]]:
    trace, state, ghost_env = tracestate.trace, tracestate.state, tracestate.ghost_env
    vcs = []
    for stmt in stmts:
        if verbose:
            print("Executing SpecStmt:", stmt)
        if isinstance(stmt, Assume) or isinstance(stmt, Entry):
            # Add the assumption to the path condition
            state = dataclasses.replace(
                state,
                path_cond=state.path_cond + [stmt.expr],
            )
        elif (
            isinstance(stmt, Assert) or isinstance(stmt, Exit) or isinstance(stmt, Cut)
        ):
            vcs.append(
                VerificationCondition(
                    start=tracestate.start,
                    trace=trace.copy(),
                    path_cond=state.path_cond.copy(),
                    memstate=state.memstate,
                    cause=stmt,
                    ghost_env=ghost_env.copy(),
                )
            )
            if isinstance(stmt, Exit) or isinstance(stmt, Cut):
                # If this is an exit or cut, we end the execution of this path
                return None, vcs
        elif isinstance(stmt, Assign):
            # Assign a value to a variable in the ghost environment
            ghost_env = ghost_env.copy()
            e1 = substitute_ghost(
                stmt.expr, ghost_env
            )  # TODO simultaneous assignment would be better.
            ghost_env[stmt.name] = ctx.substitute(state.memstate, e1)
        else:
            raise Exception(
                f"Unexpected statement type {type(stmt)} in execute_specstmts"
            )
    return TraceState(
        start=tracestate.start, trace=trace, state=state, ghost_env=ghost_env
    ), vcs


def execute_insn(
    tracestate: TraceState, ctx: pcode.BinaryContext, verbose=True
) -> list[TraceState]:
    # TODO : copy trace less. if single result
    state0 = tracestate.state
    addr, pc = state0.pc
    if pc != 0:
        raise Exception(f"Unexpected program counter {pc} at address {hex(addr)}")
    return [
        TraceState(
            start=tracestate.start,
            trace=tracestate.trace + [state0.pc[0]],
            state=state,
            ghost_env=tracestate.ghost_env,
        )
        for state in ctx.sym_execute(
            state0.memstate,
            addr,
            path_cond=state0.path_cond,
            max_insns=1,
            verbose=verbose,
        )
    ]


def run_all_paths(
    ctx: pcode.BinaryContext, spec: AsmSpec, mem=None, verbose=True
) -> list[VerificationCondition]:
    if mem is None:
        mem = pcode.MemState.Const("mem", bits=ctx.bits)
    todo = []
    vcs = []
    # Initialize executions out of entry points and cuts
    for addr, specstmts in spec.addrmap.items():
        for n, stmt in enumerate(specstmts):
            if isinstance(stmt, Cut) or isinstance(stmt, Entry):
                precond = ctx.substitute(mem, stmt.expr)
                assert isinstance(precond, smt.BoolRef)
                tracestate = TraceState(
                    start=stmt,
                    trace=[],
                    state=pcode.SimState(mem, (addr, 0), [precond]),
                )
                tracestate, new_vcs = execute_spec_stmts(
                    specstmts[n + 1 :], tracestate, ctx
                )
                vcs.extend(new_vcs)
                if tracestate is not None:
                    todo.extend(execute_insn(tracestate, ctx, verbose=verbose))
    # Execute pre specstmts and instructions
    while todo:
        tracestate = todo.pop()
        addr = tracestate.state.pc[0]
        specstmts = spec.addrmap.get(addr, [])
        tracestate, new_vcs = execute_spec_stmts(specstmts, tracestate, ctx)
        vcs.extend(new_vcs)
        if tracestate is not None:
            todo.extend(execute_insn(tracestate, ctx, verbose=verbose))
    return vcs


@dataclass
class Results:
    successes: list[str] = dataclasses.field(default_factory=list)
    failures: list[str] = dataclasses.field(default_factory=list)


def assemble_and_gen_vcs(
    filename: str, langid="x86:LE:64:default", as_bin="as"
) -> tuple[pcode.BinaryContext, list[VerificationCondition]]:
    with open("/tmp/knuckle.s", "w") as f:
        f.write(kd_macro)
        f.flush()
    subprocess.run([as_bin, filename, "-o", "/tmp/kdrag_temp.o"], check=True)
    ctx = pcode.BinaryContext("/tmp/kdrag_temp.o", langid=langid)
    spec = AsmSpec.of_file(filename, ctx)
    return ctx, run_all_paths(ctx, spec)


def assemble_and_check(
    filename: str, langid="x86:LE:64:default", as_bin="as"
) -> Results:
    ctx, vcs = assemble_and_gen_vcs(filename, langid, as_bin)
    res = Results()
    for vc in vcs:
        try:
            vc.verify(ctx)
            res.successes.append(f"[✅] {vc}")
        except Exception as _e:
            res.failures.append(f"[❌] {vc}")
    return res


def assemble_and_check_str(asm_str: str) -> Results:
    with open("/tmp/kdrag_temp.s", "w") as f:
        f.write(asm_str)
        f.flush()
    return assemble_and_check("/tmp/kdrag_temp.s")