I spent some time before making Datalogs that translated into SQL. https://www.philipzucker.com/tiny-sqlite-datalog/

There are advantages. SQL engines are very well engineered and commonly available. SQLite and DuckDB are a pretty great one-two punch.

A new twist on how to do this occurred to me that seems very clean compared to my previous methods.

Basically, the relational algebra style of SQL actually meshes with manipulating the Datalog body environments (sets of named variable bindings) better than it meshes with the base Datalog relations themselves (edge, path, etc).

As a cheeky bonus at the end, you can do semi-naive by using the dual number https://en.wikipedia.org/wiki/Dual_number trick from forward mode automatic differentiation over these environment relations.

You can tinker on this post yourself on colab here https://colab.research.google.com/github/philzook58/philzook58.github.io/blob/master/pynb/2025-08-26-compose_datalog.ipynb

The Environment As a Relation

The core bits of SELECT-FROM-WHERE SQL, select-project-join relational algebra expressions, and datalog bodies are all expressing roughly the same thing: conjunctive queries.

I rather like the syntax of a Datalog. It presents as both operational and logical.

It is also pretty close to a SQL query, but it takes jussst a little work because of a few incompatibilities.

In SQL:

  • column names are properties of the tables like myrow.name
  • rows can be locally named in FROM mytable as myrow

In Datalog:

  • The columns are referred to positionally
  • entries of rows are bound to variables edge(1,X)
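To make the mismatch concrete, here is a small sketch of the Datalog body edge(1,X), edge(X,Y) written by hand as SQL and run through sqlite3. The column names x0 and x1 and the row aliases e1 and e2 are my assumptions for illustration; they match the positional naming convention used later in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edge (x0, x1)")
conn.executemany("INSERT INTO edge VALUES (?, ?)", [(1, 2), (2, 3), (3, 4)])

# Datalog body:  edge(1, X), edge(X, Y)
# SQL names the rows (e1, e2). The Datalog variables X and Y only show up
# as equality constraints and as output column aliases.
sql = """
SELECT e1.x1 AS X, e2.x1 AS Y
FROM edge AS e1, edge AS e2
WHERE e1.x0 = 1        -- constant in the first slot
  AND e1.x1 = e2.x0    -- both occurrences of X must agree
"""
rows = conn.execute(sql).fetchall()
print(rows)
```

The translation work is all in turning repeated variables and constants into WHERE constraints, which is exactly the bookkeeping the rest of the post automates.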

If you write a Datalog interpreter, like most interpreters, you’ll thread through an environment mapping variable names to values type Env = dict[Var, Value]. Datalog interpreters are branching in the sense that to bind a variable there are many choices (which row in the table to use). Roughly, an interpreter of a query has the signature interp : expr -> database -> env -> set[env].

It is a basic trick, which I vaguely associate with finally tagless, that when you have an expression datatype you can remove the middleman of the AST and instead just use combinators, which are basically versions of the interpreter partially applied to the expressions: interp myexp : database -> env -> set[env].
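As a rough sketch of both styles, here is an AST-based interp that threads the environment, followed by combinators that are just interp partially applied. The names V, Atom, And, atom and both are hypothetical and exist only for this example, and I return a list rather than a set because Python dicts are not hashable.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class V:            # a Datalog variable (hypothetical name)
    name: str

@dataclass(frozen=True)
class Atom:         # AST node for rel(arg0, arg1, ...)
    rel: str
    args: tuple

@dataclass(frozen=True)
class And:          # AST node for a conjunction of two bodies
    left: object
    right: object

def interp(expr, db, env):
    """Return every extension of env that satisfies expr in db."""
    if isinstance(expr, And):
        return [e2 for e1 in interp(expr.left, db, env)
                   for e2 in interp(expr.right, db, e1)]
    out = []
    for row in db[expr.rel]:
        new = dict(env)
        for arg, val in zip(expr.args, row):
            if isinstance(arg, V):
                if new.setdefault(arg, val) != val:
                    break   # variable already bound to a different value
            elif arg != val:
                break       # constant mismatch
        else:
            out.append(new)
    return out

# Combinators: partially applied interpreters, with no AST handed to the user.
def atom(rel, *args):
    return lambda db, env: interp(Atom(rel, args), db, env)

def both(q1, q2):
    return lambda db, env: [e2 for e1 in q1(db, env) for e2 in q2(db, e1)]

X, Y = V("X"), V("Y")
toy_db = {"edge": {(1, 2), (2, 3)}}
two_step = both(atom("edge", 1, X), atom("edge", X, Y))
print(two_step(toy_db, {}))
```

Both forms compute the same environments; the combinator form just never shows the user an expression tree.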

You don’t actually have to thread that env through though.

Really the semantics of a base expression like path(X,Y) : db -> set[env] is a function from the database to a set of environments. You can combine them by performing inner joins on these primitive pieces, joining by variable names in the environments. SQL rocks at inner joins.

As a toy model of these ideas, this is a naive combinator based implementation in Python.

Here I make a combinator reldecl that pulls a named table out of the database and filters on the args. This step converts from a base relation to an environment.

from dataclasses import dataclass
from typing import Callable

@dataclass(frozen=True)
class Var():
    name : str

# Var is the class above, so there is no separate `type Var = str` alias
type Env = dict[Var, object]
type DB = dict[str, set[tuple[object, ...]]]
type Query = Callable[[DB], list[Env]]

def reldecl(name: str) -> Callable[..., Query]:
    def rel(*args) -> Query:
        def res(db: DB) -> list[Env]:
            R = db[name]
            envs = []
            for row in R:
                env = {}
                for arg, val in zip(args, row):
                    if isinstance(arg, Var):
                        if arg in env and env[arg] != val:
                            break
                        env[arg] = val
                    else:
                        if arg != val:
                            break
                else:
                    envs.append(env)
            return envs
        return res
    return rel


Some usage examples. We make a database and get all the edge entries through a query

db = {"edge" : {(1,2), (2,3), (3,4), (2,1), (3,3)}, 
      "color" : {("red",), ("blue",), ("green",)}}
edge = reldecl("edge")
color = reldecl("color")
x,y,z = Var("x"), Var("y"), Var("z")
edge(x,y)(db)
[{Var(name='x'): 2, Var(name='y'): 3},
 {Var(name='x'): 1, Var(name='y'): 2},
 {Var(name='x'): 3, Var(name='y'): 3},
 {Var(name='x'): 2, Var(name='y'): 1},
 {Var(name='x'): 3, Var(name='y'): 4}]

All color query

color(x)(db)
[{Var(name='x'): 'red'}, {Var(name='x'): 'blue'}, {Var(name='x'): 'green'}]

Is "red" in the color table?

color("red")(db) # succeeds
[{}]

Is "black" in the color table?

color("black")(db) # fails
[]

All self edges

edge(x, x)(db)
[{Var(name='x'): 3}]

Now we want to be able to conjoin subqueries. As before, this is an inner join on the variable names shared between environments, which is what SQL is good at and what we’ll get to next. Here, it is done in the most naive way via a nested loop join https://en.wikipedia.org/wiki/Nested_loop_join

def conj(*queries: Query) -> Query:
    def res(db : DB) -> list[Env]:
        envs = [{}]
        for query in queries:
            R = query(db)
            newenvs = []        
            for env1 in R:
                for env2 in envs:
                    if all(env1[k] == env2[k] for k in set(env1.keys()) & set(env2.keys())):
                        newenvs.append(env1 | env2)
            envs = newenvs
        return envs
    return res

conj(edge(x, y), edge(y, x))(db)
[{Var(name='y'): 1, Var(name='x'): 2},
 {Var(name='y'): 3, Var(name='x'): 3},
 {Var(name='y'): 2, Var(name='x'): 1}]
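The nested loop compares every pair of environments. For contrast, here is a sketch of a hash join, which is a different technique from what conj does: it buckets one side by the shared variables and probes with the other. The function name hash_join is hypothetical, and the sketch assumes every environment within one list binds the same set of variables, which holds for the output of a single query.

```python
from collections import defaultdict

def hash_join(envs1: list[dict], envs2: list[dict]) -> list[dict]:
    """Join two lists of environments on the variables they share."""
    if not envs1 or not envs2:
        return []
    # Assumption: all environments in a list bind the same variables.
    shared = list(set(envs1[0]) & set(envs2[0]))
    buckets = defaultdict(list)
    for e in envs1:
        buckets[tuple(e[k] for k in shared)].append(e)
    # Probe the buckets instead of scanning all of envs1 for each env in envs2.
    return [e1 | e2 for e2 in envs2
                    for e1 in buckets.get(tuple(e2[k] for k in shared), [])]

left = [{"x": 1, "y": 2}, {"x": 2, "y": 3}]
right = [{"y": 2, "z": 5}, {"y": 9, "z": 0}]
print(hash_join(left, right))
```

A SQL engine picks among strategies like this for you, which is a large part of the appeal of handing the join over to it.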

Heads of rules are semantically distinct from the relation expressions that appear in the body of a rule. The head actually mutates the database given an environment. The environment in particular ought to be derived from a query, giving the semantic signature type Action = Query -> DB -> DB. You could of course also just mutate the db rather than return a new version purely functionally.

type Action = Callable[[Query, DB], DB]

def headdecl(name : str):
    def rel(*args) -> Action:
        def res(query : Query, db : DB) -> DB:
            envs = query(db)
            Rold = db.get(name, set())
            Rnew = {tuple(env[arg] if isinstance(arg,Var) else arg for arg in args) for env in envs}
            return {**db, **{name : Rnew | Rold}}
        return res
    return rel

edgeh = headdecl("edge")
# edge(x,y) :- edge(y,x), edge(x,x)
edgeh(x, y)(conj(edge(y, x), edge(x,x)), db)

{'edge': {(1, 2), (2, 1), (2, 3), (3, 2), (3, 3), (3, 4)},
 'color': {('blue',), ('green',), ('red',)}}

SQLizing

We can push this thing around in sort of a staged metaprogramming style https://okmij.org/ftp/ML/MetaOCaml.html to instead produce code (db -> set[env]), the code in particular being strings of SQL.

In the following, the simplicity of what is happening is largely being obscured by the nastiness of constructing SQL strings but it is basically the same as the above. I typically construct my froms, wheres, and selects in python lists and then join them together into fstrings.
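The list-then-join habit looks roughly like the sketch below. The helper name build_select is hypothetical and is not used by the classes that follow; it only shows the pattern in isolation.

```python
def build_select(selects: list[str], froms: list[str], wheres: list[str]) -> str:
    """Assemble a SELECT statement from its three lists of fragments."""
    sql = f"SELECT {', '.join(selects)}\nFROM {', '.join(froms)}"
    if wheres:  # an empty WHERE clause is a syntax error, so omit it
        sql += f"\nWHERE {' AND '.join(wheres)}"
    return sql

query = build_select(
    selects=["e1.x0 AS x", "e2.x1 AS z"],
    froms=["edge AS e1", "edge AS e2"],
    wheres=["e1.x1 = e2.x0"],
)
print(query)
```

Keeping the three lists separate until the last moment makes it easy to append one more constraint or one more table without re-parsing a string.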

I chose to wrap things in classes in order to get nice operator overloading path(x,z) <= edge(x,y) & path(y,z). It was too cute to not do. This does bulk out the implementation compared to the combinator style I did above.

EnvRel is spiritually a code (db -> set env)

from dataclasses import dataclass

type SQL = str

@dataclass(frozen=True)
class Var():
    name : str

@dataclass
class EnvRel():
    """
    A relation representing the datalog environment. It has named columns representing
    the datalog variables currently bound.
    For example, in
    path("x", "y") :- edge("x", "y")
    the body environment has the columns x and y.
    """
    cols : set[Var] # reverse these two.
    code : str
    def __and__(self, other: "EnvRel") -> "EnvRel":
        # Does an inner join. The analog of `conj` from before.
        cols = self.cols | other.cols
        on_clause = " AND ".join(f"t1.{c.name} = t2.{c.name}" for c in (self.cols & other.cols))
        if on_clause != "":
            on_clause = "WHERE " + on_clause
        selects = [f"t1.{c.name} AS {c.name}" for c in self.cols] + [f"t2.{c.name} AS {c.name}" for c in other.cols if c not in self.cols]
        code = f"SELECT {", ".join(selects)}\nFROM ({self.code}) AS t1,\n({other.code}) AS t2\n{on_clause}"
        return EnvRel(cols, code)
    def __or__(self, other : "EnvRel") -> "EnvRel":
        assert self.cols == other.cols, "Can only union relations with same columns"
        return EnvRel(self.cols, f"SELECT {", ".join([c.name for c in self.cols])}\n FROM ({self.code}\nUNION {other.code})")


x,y,z,w = Var("x"), Var("y"), Var("z"), Var("w")

Like before, a lot of the action actually happens in converting from the actual applied base relation decl to an environment containing the results of the query.

A nice syntactic trick is to use myrel(x) for actions and myrel[x] for queries. I believe I first saw this in SLOG https://arxiv.org/abs/2411.14330 https://arxiv.org/abs/2211.11573 or maybe Relational AI’s thing?

from dataclasses import dataclass

@dataclass
class RelDecl:
    """
    RelDecl is the unapplied function symbol like edge/2 or path/2
    """
    name : str
    arity : int
    def create(self) -> SQL:
        """
        SQL command to create corresponding table with primary key on all columns x0,x1,x2, ... .
        """
        cols = [f"x{i}" for i in range(self.arity)]
        return f"CREATE TABLE IF NOT EXISTS {self.name} ({', '.join(cols)}, PRIMARY KEY ({', '.join(cols)}))"
    def __call__(self, *args) -> "Head":
        """
        rel(...) notation is used in the head. They represent facts to be inserted into the database
        """
        return Head(self, args)
    def __getitem__(self, args) -> "EnvRel":
        """
        rel[...] notation is used in the body. They represent queries on the database.

        This is where the action happens converting the base names into the environment
        """
        # min_arg is a kind of deterministic union find. Pick minimum index as canonical
        min_arg = {a : min(j for j,b in enumerate(args) if a == b) for a in args if isinstance(a, Var)}
        wheres = [f"x{i} = x{min_arg[a]}" for (i, a) in enumerate(args) if isinstance(a, Var) and min_arg[a] != i]
        # constant projections
        wheres +=  [f"x{n} = {str(arg)}" for n,arg in enumerate(args) if not isinstance(arg, Var)]
        if wheres:
            wheres = "WHERE " + " AND ".join(wheres)
        else:
            wheres = ""
        cols = set(a for a in args if isinstance(a, Var))
        if not min_arg:
            selects = "NULL"
        else:
            selects = ", ".join(f"x{j} AS {a.name}" for a,j in min_arg.items())
            #selects = ", ".join(f"x{n} as {arg}" for n,arg in enumerate(args) if isinstance(arg, str))
        return EnvRel(cols, f"SELECT {selects}\nFROM {self.name}\n{wheres}")

It is easiest to see how this really works by looking at particular examples of the output SQL.

edge = RelDecl("edge", 2)
path = RelDecl("path", 2)

print(edge[x,y].code)

SELECT x0 AS x, x1 AS y
FROM edge

Here is a constant query

print(edge[2,1].code)
SELECT NULL
FROM edge
WHERE x0 = 2 AND x1 = 1

A slightly projective query

print(edge[x,1].code)
SELECT x0 AS x
FROM edge
WHERE x1 = 1

Actually running it.

import sqlite3

db = sqlite3.connect(":memory:")
db.execute(edge.create())
db.execute(path.create())
db.execute("INSERT INTO edge (x0, x1) VALUES (1,2), (2,3), (3,4), (4,5), (5,5)")
db.execute(edge[x,y].code).fetchall()
[(1, 2), (2, 3), (3, 4), (4, 5), (5, 5)]
db.execute(edge[x,x].code).fetchall()
[(5,)]
db.execute((edge[x,y] & edge[y,z]).code).fetchall()
[(2, 1, 3), (3, 2, 4), (4, 3, 5), (5, 4, 5), (5, 5, 5)]
db.execute(edge[x,3].code).fetchall()
[(2,)]
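One caveat: constants are rendered with str(arg), which is fine for the integers used here, but a string constant such as "red" would come out unquoted and SQL would read it as a column name. A minimal sketch of a quoting helper follows; sql_literal is a hypothetical name and is not part of the code above, and it only covers a few Python types.

```python
import sqlite3

def sql_literal(value: object) -> str:
    """Render a Python constant as a SQL literal (None, bool, int, float, str only)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"   # double any embedded quotes
    raise TypeError(f"unsupported constant: {value!r}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE color (x0, PRIMARY KEY (x0))")
conn.executemany("INSERT INTO color VALUES (?)", [("red",), ("blue",)])
hits = conn.execute(f"SELECT NULL FROM color WHERE x0 = {sql_literal('red')}").fetchall()
print(hits)   # one row if 'red' is present
```

Bound parameters (the ? placeholders of sqlite3) would be the more robust route, at the cost of threading a parameter list alongside the generated SQL string.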

I made an auxiliary class for applied RelDecl so that I can have nice <= notation. <= is lower precedence than &, which is the precedence I want for datalog rules.

@dataclass
class Head:
    """
    This is the applied function symbol to arguments e.g. rel(1,2,3). Mainly I did this to get <= for the heads of rules.
    """
    r : RelDecl
    args : list[object]
    def fact(self):
        assert all(not isinstance(a, Var) for a in self.args)
        return f"INSERT OR IGNORE INTO {self.r.name}\nVALUES ({', '.join(str(a) for a in self.args)})"
    def __le__(self, body : EnvRel) -> SQL:
        selects = [a.name if isinstance(a, Var) else str(a) for a in self.args]
        return f"INSERT OR IGNORE INTO {self.r.name}\nSELECT {",".join(selects)}\nFROM ({body.code})"

Here is SQL resulting from the transitive closure query

print(path(x,y) <= edge[x,y])
INSERT OR IGNORE INTO path
SELECT x,y
FROM (SELECT x0 AS x, x1 AS y
FROM edge
)
db.execute(path(x,y) <= edge[x,y])
db.execute(path[x,y].code).fetchall()
[(1, 2), (2, 3), (3, 4), (4, 5), (5, 5)]
print(path(x,z) <= edge[x,y] & path[y,z])
INSERT OR IGNORE INTO path
SELECT x,z
FROM (SELECT t1.y AS y, t1.x AS x, t2.z AS z
FROM (SELECT x0 AS x, x1 AS y
FROM edge
) AS t1,
(SELECT x0 AS y, x1 AS z
FROM path
) AS t2
WHERE t1.y = t2.y)
db.execute(path(x,z) <= edge[x,y] & path[y,z])
db.execute(path[x,y].code).fetchall()
[(1, 2), (2, 3), (3, 4), (4, 5), (5, 5), (1, 3), (2, 4), (3, 5)]

Fixpoints

Ok, but one of the points of datalog is that it has fixpoints. It runs the mutually dependent rules until they produce nothing new. These are nice for doing things like the transitive closure of a relation or performing program analyses.

def naive_fix(db, rels, stmts, limit=4):
    for rel in rels:
        db.execute(rel.create())
    for stmt in stmts:
        if isinstance(stmt, Head):
            db.execute(stmt.fact())
    n = 0
    while True:
        n += 1
        for stmt in stmts:
            if isinstance(stmt, str):
                db.execute(stmt)
        if n > limit:
            return
db.execute("DELETE FROM edge")
db.execute("DELETE FROM path")
path = RelDecl("path", 2)
naive_fix(db, [edge, path], [
    edge(1,2),
    edge(2,3),
    path(x,y) <= edge[x,y],
    path(x,z) <= edge[x,y] & path[y,z]
])
db.execute(path[x,y].code).fetchall()
[(1, 2), (2, 3), (1, 3)]
print(path(x,z) <= edge[x,y] & path[y,z])
INSERT OR IGNORE INTO path
SELECT x,z
FROM (SELECT t1.y AS y, t1.x AS x, t2.z AS z
FROM (SELECT x0 AS x, x1 AS y
FROM edge
) AS t1,
(SELECT x0 AS y, x1 AS z
FROM path
) AS t2
WHERE t1.y = t2.y)
edge(x,y) <= edge[x,y]
db.execute(edge(4,5).fact())
db.execute(edge(y,x) <= edge[x,y])
db.execute(edge[x,y].code).fetchall()
[(1, 2), (2, 3), (4, 5), (2, 1), (3, 2), (5, 4)]
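Note that naive_fix stops after limit rounds rather than when nothing changes. One hedged way to detect the actual fixpoint is to compare row counts before and after each round. The helper name run_to_fixpoint is hypothetical, and the rule strings are handwritten in roughly the shape that the <= operator generates.

```python
import sqlite3

def run_to_fixpoint(conn, tables: list[str], rules: list[str], max_rounds: int = 1000) -> int:
    """Run INSERT OR IGNORE rules until no table grows; return the rounds used."""
    def sizes():
        return [conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] for t in tables]
    for round_no in range(1, max_rounds + 1):
        before = sizes()
        for rule in rules:
            conn.execute(rule)
        if sizes() == before:      # nothing new was derived: fixpoint reached
            return round_no
    raise RuntimeError("no fixpoint within max_rounds")

conn = sqlite3.connect(":memory:")
for t in ("edge", "path"):
    conn.execute(f"CREATE TABLE {t} (x0, x1, PRIMARY KEY (x0, x1))")
conn.executemany("INSERT INTO edge VALUES (?, ?)", [(1, 2), (2, 3), (3, 4)])
rules = [
    "INSERT OR IGNORE INTO path SELECT x0, x1 FROM edge",
    "INSERT OR IGNORE INTO path SELECT e.x0, p.x1 FROM edge AS e, path AS p WHERE e.x1 = p.x0",
]
rounds = run_to_fixpoint(conn, ["edge", "path"], rules)
closure = sorted(conn.execute("SELECT * FROM path").fetchall())
print(rounds, closure)
```

Since the rules only ever insert, an unchanged row count means an unchanged table, so the check is sound for this monotone setting.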

SemiNaive via Dual Relations

Seminaive evaluation works by the observation that any new fact must come from a query involving at least one new fact from the previous iteration.
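Stripped of SQL, the idea can be sketched on plain Python sets for the transitive closure rules. The function name seminaive_closure is hypothetical; edge never changes in this example, so only the path delta matters.

```python
def seminaive_closure(edge: set[tuple[int, int]]) -> set[tuple[int, int]]:
    """Transitive closure where each round joins only against the newest facts."""
    path = set(edge)     # path(x,y) :- edge(x,y)
    delta = set(edge)    # facts that were new in the previous round
    while delta:
        # path(x,z) :- edge(x,y), path(y,z), reading path only through delta
        derived = {(x, z) for (x, y) in edge for (y2, z) in delta if y == y2}
        delta = derived - path   # keep only the genuinely new facts
        path |= delta
    return path

print(sorted(seminaive_closure({(1, 2), (2, 3), (3, 4)})))
```

Each round touches only the frontier of new path facts, rather than re-deriving everything that was already known.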

We can carry through both the primary and the delta relation in a single bundle. This is the analog of automatic differentiation. Pretty neat huh!

https://stackoverflow.com/questions/47043937/what-is-the-difference-between-naive-and-semi-naive-evaluation
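For reference, this is the numeric version of the trick: a minimal dual number sketch where the second component carries the derivative. The class name Dual is mine; only addition and multiplication are implemented.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Dual:
    """A number a + b*eps with eps*eps = 0, as used in forward mode AD."""
    val: float
    eps: float
    def __add__(self, other: "Dual") -> "Dual":
        return Dual(self.val + other.val, self.eps + other.eps)
    def __mul__(self, other: "Dual") -> "Dual":
        # product rule: d(f*g) = df*g + f*dg
        return Dual(self.val * other.val, self.eps * other.val + self.val * other.eps)

x = Dual(3.0, 1.0)          # seed dx = 1
y = x * x + x               # f(x) = x^2 + x
print(y)                    # value f(3) = 12, derivative f'(3) = 7
```

Read multiplication as join and addition as union and the product rule becomes the delta rule for a conjunction: the new environments of A & B are dA & B unioned with A & dB.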

@dataclass
class DualEnv():
    env : EnvRel   # query over the full relations
    denv : EnvRel  # query that uses at least one delta relation

    def __and__(self, other):
        env = self.env & other.env
        denv = self.denv & other.env | self.env & other.denv
        return DualEnv(env, denv)

@dataclass
class DualDecl():
    r : RelDecl
    dr : RelDecl
    newr : RelDecl
    def __init__(self, name, arity):
        self.r = RelDecl(name, arity)
        self.dr = RelDecl("delta_" + name, arity)
        self.newr = RelDecl("new_" + name, arity)
    def __call__(self, *args) -> "DualHead":
        return DualHead(self.newr, args)
    def __getitem__(self, args) -> "DualEnv":
        return DualEnv(self.r[args], self.dr[args])

@dataclass
class DualHead(Head):
    def __le__(self, body : DualEnv) -> SQL:
        return super().__le__(body.denv)

edge = DualDecl("edge", 2)
path = DualDecl("path", 2)

print((edge[x,y] & path[y,z]).denv.code)

SELECT z, y, x
 FROM (SELECT t1.y AS y, t1.x AS x, t2.z AS z
FROM (SELECT x0 AS x, x1 AS y
FROM delta_edge
) AS t1,
(SELECT x0 AS y, x1 AS z
FROM path
) AS t2
WHERE t1.y = t2.y
UNION SELECT t1.y AS y, t1.x AS x, t2.z AS z
FROM (SELECT x0 AS x, x1 AS y
FROM edge
) AS t1,
(SELECT x0 AS y, x1 AS z
FROM delta_path
) AS t2
WHERE t1.y = t2.y)
db.execute(edge.r.create())
db.execute(edge.dr.create())
db.execute(path.r.create())
db.execute(path.dr.create())
db.execute(edge.dr(1,2).fact())
db.execute((edge[x,y] & path[y,z]).denv.code).fetchall()
[(3, 2, 1)]
print((edge[1,x] & path[x,3]).denv.code)
SELECT x
 FROM (SELECT t1.x AS x
FROM (SELECT x1 AS x
FROM delta_edge
WHERE x0 = 1) AS t1,
(SELECT x0 AS x
FROM path
WHERE x1 = 3) AS t2
WHERE t1.x = t2.x
UNION SELECT t1.x AS x
FROM (SELECT x1 AS x
FROM edge
WHERE x0 = 1) AS t1,
(SELECT x0 AS x
FROM delta_path
WHERE x1 = 3) AS t2
WHERE t1.x = t2.x)

Seminaive Fixpoint

The naive fixpoint loop can also be generalized to the dual relations.

def fix(db, rels : list[DualDecl], stmts, limit=None):
    for rel in rels:
        db.execute(rel.r.create())
        db.execute(rel.dr.create())
        db.execute(rel.newr.create())
        db.execute(f"DELETE FROM {rel.r.name}")
        db.execute(f"DELETE FROM {rel.dr.name}")
        db.execute(f"DELETE FROM {rel.newr.name}")
    # Do facts
    for stmt in stmts:
        if isinstance(stmt, Head):
            db.execute(stmt.fact())
    n = 0
    while True:
        n += 1
        for stmt in stmts:
            if isinstance(stmt, str):
                db.execute(stmt)
        for rel in rels:
            # Do the old swaparooni. Delete delta, delta := new - old, old += delta, new := {}
            db.execute(f"DELETE FROM {rel.dr.name}")
            db.execute(f"INSERT OR IGNORE INTO {rel.dr.name} SELECT * FROM {rel.newr.name} EXCEPT SELECT * FROM {rel.r.name}")
            db.execute(f"INSERT OR IGNORE INTO {rel.r.name} SELECT * FROM {rel.dr.name}")
            db.execute(f"DELETE FROM {rel.newr.name}")
            print(n, rel.dr.name, db.execute("SELECT * FROM " + rel.dr.name).fetchall())
        if (limit is not None and n > limit) or all(db.execute(f"SELECT * FROM {rel.dr.name} LIMIT 1").fetchone() is None for rel in rels):
            return

fix(db, [edge, path], [
    edge(1,2),
    edge(2,3),
    edge(3,4),
    path(x,y) <= edge[x,y],
    path(x,z) <= edge[x,y] & path[y,z]
], limit=10)
db.execute(path[x,y].env.code).fetchall()
1 delta_edge [(1, 2), (2, 3), (3, 4)]
1 delta_path []
2 delta_edge []
2 delta_path [(1, 2), (2, 3), (3, 4)]
3 delta_edge []
3 delta_path [(1, 3), (2, 4)]
4 delta_edge []
4 delta_path [(1, 4)]
5 delta_edge []
5 delta_path []





[(1, 2), (2, 3), (3, 4), (1, 3), (2, 4), (1, 4)]

Bits and Bobbles

A small z3 ast to sqlite compiler is here https://github.com/philzook58/knuckledragger/blob/main/kdrag/solvers/datalog.py .

Actually, the thing that I thought was fun was generalizing all of these to dual relations akin to dual numbers https://en.wikipedia.org/wiki/Dual_number to achieve seminaive.

I should also maybe switch to a form using CTEs rather than inlined nested select statements. That would make less repetition.
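A rough sketch of what the CTE form could look like: each sub-environment gets a name in a WITH clause and the final SELECT refers to those names. The helper name join_with_ctes is hypothetical and the SQL fragments are handwritten for illustration.

```python
import sqlite3

def join_with_ctes(parts: list[tuple[str, str]], final_select: str) -> str:
    """parts is a list of (cte_name, sql) pairs; final_select may refer to the names."""
    ctes = ",\n".join(f"{name} AS ({sql})" for name, sql in parts)
    return f"WITH {ctes}\n{final_select}"

sql = join_with_ctes(
    [("t1", "SELECT x0 AS x, x1 AS y FROM edge"),
     ("t2", "SELECT x0 AS y, x1 AS z FROM edge")],
    "SELECT t1.x AS x, t2.z AS z FROM t1, t2 WHERE t1.y = t2.y",
)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edge (x0, x1)")
conn.executemany("INSERT INTO edge VALUES (?, ?)", [(1, 2), (2, 3)])
result = conn.execute(sql).fetchall()
print(sql)
print(result)
```

In the dual version, the same sub-environment appears in both halves of the UNION, so naming it once is where the savings would come from.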

CHR and graph rewriting on SQL would also be interesting

A thought occurred to me as I was revisiting some of this stuff while considering submitting to the minikanren workshop (which I did not).

What I did before was a somewhat light pass over an AST to determine which vars to ground at compile time.

I can probably throw garbage into SQL engines and get something ok out (?).

Datalog conjunctive queries have a different flavor than SQL style conjunctive queries. Seemingly, in datalog you give names to slots of the relations mytable(Myvar), whereas in SQL style you give alpha invariant names to entire rows SELECT myrow.col0 FROM mytable as myrow.

SQL and relational algebra tables have named columns

But a twist that I don’t know how to think about exactly is that the variable names in the environment can be column names.

SQL makes it easy to state relational algebra operations https://en.wikipedia.org/wiki/Relational_algebra like projection, selection, and renaming.
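For concreteness, here are the three operations as one-line SQL queries run through sqlite3. The table contents and the names src and dst are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edge (x0, x1)")
conn.executemany("INSERT INTO edge VALUES (?, ?)", [(1, 2), (2, 3), (3, 3)])

# projection: keep one column (DISTINCT restores set semantics)
projection = conn.execute("SELECT DISTINCT x1 FROM edge").fetchall()
# selection: keep only the rows that satisfy a condition
selection = conn.execute("SELECT x0, x1 FROM edge WHERE x0 = x1").fetchall()
# renaming: same rows, new column names
renaming = conn.execute("SELECT x0 AS src, x1 AS dst FROM edge")
new_names = [col[0] for col in renaming.description]
print(sorted(projection), selection, new_names)
```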

There might be a fun version of this post that works off of a relational algebra interface

from typing import Protocol
class RelAlg(Protocol):
    cols: list[str]  # column names
    def rename(self, src: str, dst: str) -> 'RelAlg': ...  # subst : list[tuple[str, str]]
    def proj(self, cols: list[str]) -> 'RelAlg': ...
    def join(self, other: 'RelAlg') -> 'RelAlg': ...
    def union(self, other: 'RelAlg') -> 'RelAlg': ...
    def intersect(self, other: 'RelAlg') -> 'RelAlg': ...
    def diff(self, other: 'RelAlg') -> 'RelAlg': ...
    def select(self, cond: str) -> 'RelAlg': ... # not str




import sqlite3
db = sqlite3.connect(":memory:")
db.execute(RelDecl("edge", 2).create())

edge = BaseRel({"x0","y0"}, "edge")
path = BaseRel({"x0","y0"}, "path")
db.execute(edge["y", "x"].code).fetchall()
db.execute(edge("y", "x", body=edge["x", "y"]))
db.execute("SELECT * FROM edge").fetchall()
#db.execute(path("x", "z", body=edge["x", "y"] & path["y", "z"]))
edge["x", "y"] & path["y", "z"]



This cell fails with NameError: name 'BaseRel' is not defined, since BaseRel was never defined in this notebook.


class EnvRel():
    # extra methods sketched for EnvRel; cols holds Var objects, so SQL uses c.name
    def project(self, cols: set[Var]) -> "EnvRel":
        assert cols <= self.cols
        select_clause = ", ".join(f"{c.name} AS {c.name}" for c in cols)
        return EnvRel(cols, f"SELECT {select_clause} FROM ({self.code})")
    def rename(self, renames: dict[Var,Var]) -> "EnvRel":
        new_cols = {renames.get(c,c) for c in self.cols}
        select_clause = ", ".join(f"{c.name} AS {renames.get(c,c).name}" for c in self.cols)
        return EnvRel(new_cols, f"SELECT {select_clause} FROM ({self.code})")


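def fix(db, rels : list[tuple[str,int]], stmts, limit=None):
    for rel,arity in rels:
        cols = ", ".join(f"x{i}" for i in range(arity))
        db.execute(f"CREATE TABLE IF NOT EXISTS {rel} ({cols}, PRIMARY KEY ({cols}))")
        db.execute(f"CREATE TABLE delta_{rel} ({cols}, PRIMARY KEY ({cols}))")
        db.execute(f"CREATE TABLE new_{rel} ({cols}, PRIMARY KEY ({cols}))")
        db.execute(f"INSERT INTO delta_{rel} SELECT * FROM {rel}")
    n = 0
    while True:
        n += 1
        for stmt in stmts:
            db.execute(stmt)
        for rel,arity in rels:
            # delta := new - old, old += delta, new := {}
            db.execute(f"DELETE FROM delta_{rel}")
            db.execute(f"INSERT INTO delta_{rel} SELECT * FROM new_{rel} EXCEPT SELECT * FROM {rel}")
            db.execute(f"INSERT OR IGNORE INTO {rel} SELECT * FROM delta_{rel}")
            db.execute(f"DELETE FROM new_{rel}")
        # stop at the limit or once every delta is empty
        done = all(db.execute(f"SELECT COUNT(*) FROM delta_{rel}").fetchone()[0] == 0 for rel,_ in rels)
        if (limit is not None and n > limit) or done:
            # cleanup
            for rel,_ in rels:
                db.execute(f"DROP TABLE delta_{rel}")
                db.execute(f"DROP TABLE new_{rel}")
            return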
            

class Head():
    def __init__(self, rel, vs):
        self.rel = rel
        self.vs = vs
    def __le__(self, body):
        return f"INSERT INTO {self.rel} SELECT {','.join(body.vs)} FROM {body.froms} WHERE {' AND '.join(body.conds)}"

class Rel():
    def __init__(self, rel, vs):
        self.rel = rel
        self.vs = vs

# type goaly = (int,db) -> list[env]

type Env = dict
type goal = Callable[[DB, Env], list[Env]]

class Var():
    name: str

def conj(*args : Env): ...  # body never written


class DatalogRel():
    data : set[tuple[object,...]]
    def __call__(self, *args): ...  # body never written




from dataclasses import dataclass

type SQL = str

@dataclass
class SqlRel():
    cols : set[str] # reverse these two.
    code : str
    def __and__(self, other: "EnvRel") -> "EnvRel":
        on = self.cols & other.cols
        assert on, f"Cannot join on no columns {self.cols} {other.cols}"
        on_clause = " AND ".join(f"self.{c} = other.{c}" for c in on)
        return SqlRel(self.cols | other.cols, f"({self.code}) INNER JOIN ({other.code}) ON {on_clause}")
    
    def project(self, cols: set[str]) -> "SqlRel":
        assert cols <= self.cols
        select_clause = ", ".join(f"{c} AS {c}" for c in cols)
        return SqlRel(cols, f"SELECT {select_clause} FROM ({self.code})")
    def rename(self, renames: dict[str,str]) -> "SqlRel":
        new_cols = {renames.get(c,c) for c in self.cols}
        select_clause = ", ".join(f"{c} AS {renames.get(c,c)}" for c in self.cols)
        return SqlRel(new_cols, f"SELECT {select_clause} FROM ({self.code})")
    #def __getitem__(self, cols : set[str]) -> "SqlRel":
    #    return self.project(cols)

@dataclass
class EnvRel(SqlRel): ...



    

edgexy = SqlRel({"x","y"}, "SELECT x0 AS x, y0 as y FROM edge")
pathyz = SqlRel({"y","z"}, "SELECT x0 AS y, y0 as z FROM path")

import sqlite3
db = sqlite3.connect(":memory:")
def dict_factory(cursor, row):
    fields = [column[0] for column in cursor.description]
    return {key: value for key, value in zip(fields, row)}
db.row_factory = dict_factory#sqlite3.Row
db.execute("CREATE TABLE edge (x0 INT, y0 INT, UNIQUE(x0, y0))")
db.execute("CREATE TABLE path (x0 INT, y0 INT, UNIQUE(x0, y0))")
db.execute("INSERT INTO edge VALUES (1,2), (2,3), (3,4)")
(edgexy & pathyz).project({"x", "z"})

db.execute(edgexy.project({"x"}).code).fetchall()

db.execute(edgexy.rename({"x" : "z"}).code).fetchall()
db.execute(f"INSERT OR IGNORE INTO path {edgexy.code}").fetchall()
db.execute(f"INSERT OR IGNORE INTO path {edgexy.code}").fetchall()
db.execute("SELECT * FROM path").fetchall()
#db.execute(f"INSERT OR IGNORE INTO path { (edgexy & pathyz).project({'x','z'}).code}").fetchall()
db.execute("SELECT * FROM path").fetchall()


[{'x0': 1, 'y0': 2}, {'x0': 2, 'y0': 3}, {'x0': 3, 'y0': 4}]

SqlRel(cols={'y', 'z', 'x'}, code='(SELECT x0 AS x, y0 AS y FROM (edge)) INNER JOIN (SELECT x0 AS y, y0 AS z FROM (path)) ON self.y = other.y')
from typing import Protocol

class RelAlg(Protocol):
    cols: list[str]  # column names
    def rename(self, src: str, dst: str) -> 'RelAlg': ...  # subst : list[tuple[str, str]]
    def proj(self, cols: list[str]) -> 'RelAlg': ...
    def join(self, other: 'RelAlg') -> 'RelAlg': ...
    def union(self, other: 'RelAlg') -> 'RelAlg': ...
    def intersect(self, other: 'RelAlg') -> 'RelAlg': ...
    def diff(self, other: 'RelAlg') -> 'RelAlg': ...
    def select(self, cond: str) -> 'RelAlg': ... # not str

class SQLRel(RelAlg):
    cols : list[str]
    code : str

class PyRel(RelAlg):
    rel : "frozenset[frozendict]"  # frozendict is not in the standard library
    
class PolarsRel(RelAlg):
    pass

class DualRel(RelAlg):
    R: RelAlg
    dR: RelAlg






Lazy Trie Join. The simplest version just pushes completely.

Minikanren has uniformity.

https://dodisturb.me/posts/2018-12-25-The-Essence-of-Datalog.html https://stackoverflow.com/questions/28467011/what-are-the-main-technical-differences-between-prolog-and-minikanren-with-resp

https://www.cambridge.org/core/journals/journal-of-functional-programming/article/sql-to-c-compiler-in-500-lines-of-code/C38B40C78B6A9C55232D4A850587FC64

class Var():
    ind : int
#type Var = int
type Env = dict[int, object]
#type rel = Callable[[...], list[Env]]

edge_table = {(1,2), (2,3)}
def edge(x,y):
    def res(db):
        if isinstance(x, Var):
            return
            
def rel(name, *args):
    def res(db):
        R = db.get(name, {})
        for row in R: ...

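def merge_env(e1,e2):
    # copy the larger environment and fold the smaller one into it
    if len(e1) <= len(e2):
        e1, e2 = e2, e1
    res = e1.copy()
    for k,v in e2.items():
        if k in res:
            if res[k] != v:
                return None  # conflicting bindings: no merged environment
        res[k] = v
    return res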

#def bind(v, name):

def forall(A, f):
    def res(db):
        for a in db[A]:
            yield f(a)

def forall(f):
    def res(counter, db):
        return f(counter)(counter+1, db)
    return res




    #for (n,a) in sorted(enumerate(args), key=lambda z: (z[1],z[0])):





def prod(x,y): #  seminaive  ((x*y), x*dy + y * dx)
    def res(db):
        for a in x(db):
            for b in y(db):
                env = merge_env(a,b)
                if env is not None:  # skip pairs whose shared variables disagree
                    yield env
    return res






import sqlite3


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE edge (x0 INTEGER, x1 INTEGER)")
db.executemany("INSERT INTO edge VALUES (?, ?)", [(1,2),(2,3)])
db.execute("CREATE TABLE path (x0 INTEGER, x1 INTEGER)")
db.execute("INSERT INTO path SELECT x as x0, y as x1 FROM (SELECT x0 as x, x1 as y FROM edge)")
db.execute("SELECT * FROM path").fetchall()
[(1, 2), (2, 3)]
class Rel():
    def __init__(self, sql, freevars=()):
        self.sql = sql                  # SQL text of this subquery
        self.freevars = list(freevars)  # variables bound by this subquery
    def __mul__(self, other):
        return Rel(f"({self.sql} JOIN {other.sql})", self.freevars + other.freevars)

class BaseRelation():
    def __init__(self, name, db):
        self.name = name
        self.db = db
    def __call__(self, *args, **kwargs):
        # positional args become columns x0, x1, ...; keyword args name columns directly
        args = {**{"x" + str(n) : a for n,a in enumerate(args)}, **kwargs}
        wheres = ["TRUE"]
        env = {}
        for k,v in args.items():
            if isinstance(v, Var):
                if v.ind in env:
                    wheres.append(f"{k} = {env[v.ind]}")
                else:
                    env[v.ind] = k
            else:
                wheres.append(f"{k} = {repr(v)}")
        selects = ", ".join(f"{v} AS {k}" for k,v in env.items())
        return Rel(
            f"(SELECT {selects} FROM {self.name} WHERE {' AND '.join(wheres)})",
            env.keys())
    def __getitem__(self, key): ...

I can translate datalog into relational algebra of the body. Then I can directly interpret the relational algebra, do optimizations finally tagless style, or interpret into SQL.

The key is that the semantics of the body is the relation of “environments”.

Using dual numbers “edge”, “delta_edge” UNION

The initial translation from base relations to an environment form is the only complex part.

Using Lazy Trie Join if clean somehow

class Trie():
    # colname : [object]
    # rename : [object]
    # level 0 = “foo”, level 1 = “bar” … etc.
    trie : dict[dict[dict]]

def intersect(t1,t2)
    if t.level[0] == t.level[1]:
    else:
        if t1.level[0] < t2.level[1]:
            t2,t1 = t1,t2
        return {k : for k,v in t1.trie}

class RelAlg(Protocol):
    def rename(self, **kwargs): ...
    def project(self, *args): ...
    def join(self, other): ...

class RelAlgCode():
    sql : str
class RelAlgSet(): ...







def rel(R):
    def res(*args, **kwargs):
        args = {**{"x" + str(n) : a for n,a in enumerate(args)}, **kwargs}
        Q = set()
        for row in R:
            env = {}
            for k,v in args.items():
                if isinstance(v, Var):
                    if v in env:
                        if env[v] != row[k]:
                            break
                    else:
                        env[v] = row[k]
                else:
                    if row[k] != v:
                        break
            else:
                Q.add(frozendict(env))
        return Q
    return res

@dataclass(frozen=True)
class Var():
    ind : str
x,y,z = Var("x"), Var("y"), Var("z")

def rel(name):
    def res(*args, **kwargs):
        args = {**{"x" + str(n) : a for n,a in enumerate(args)}, **kwargs}
        wheres = ["TRUE"]
        env = {}
        for k,v in args.items():
            if isinstance(v, Var):
                if v in env:
                    wheres.append(f"{k} = {env[v]}")
                else:
                    env[v] = k
            else:
                wheres.append(f"{k} = {repr(v)}")
        return f"""SELECT DISTINCT {", " .join([f"{v} AS {k.ind}" for k,v in env.items()] + ["NULL"])} \
                FROM {name} \
                WHERE {" AND ".join(wheres)}"""
    return res
def conj(*args):
    return ", ".join("(" + arg + ")" for arg in args)
    #return " INNER JOIN ".join("(" + arg + ")" for arg in args)
#def 

edge = rel("edge")
edge(1,2)
#db.execute("SELECT NULL, x0 as a FROM edge where true and x1 = 2").fetchall()
#db.execute(edge(3,4)).fetchall()

import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE edge (x0 INTEGER, x1 INTEGER, PRIMARY KEY (x0, x1))")
db.executemany("INSERT OR IGNORE INTO edge VALUES (?, ?)", [(1,2),(2,3),(2,4)])
db.execute("CREATE TABLE path (x0 INTEGER, x1 INTEGER)")
db.execute("INSERT INTO path SELECT x as x0, y as x1 FROM (SELECT x0 as x, x1 as y FROM edge)")
db.execute("SELECT * FROM path").fetchall()
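The statements above load only the base case `path(x,y) :- edge(x,y)`. A naive way to finish the transitive closure is to rerun the recursive rule `path(x,z) :- edge(x,y), path(y,z)` until the row count stops growing. The sketch below is a plain naive fixpoint, not semi-naive. It uses its own connection, named `demo` here, and adds a primary key on `path` so that `INSERT OR IGNORE` deduplicates; both are choices made for this example.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE edge (x0 INTEGER, x1 INTEGER, PRIMARY KEY (x0, x1))")
demo.execute("CREATE TABLE path (x0 INTEGER, x1 INTEGER, PRIMARY KEY (x0, x1))")
demo.executemany("INSERT OR IGNORE INTO edge VALUES (?, ?)", [(1, 2), (2, 3), (2, 4)])

def count_paths():
    return demo.execute("SELECT COUNT(*) FROM path").fetchone()[0]

# Base case: path(x, y) :- edge(x, y)
demo.execute("INSERT OR IGNORE INTO path SELECT x0, x1 FROM edge")

# Recursive case: path(x, z) :- edge(x, y), path(y, z), repeated to a fixpoint
while True:
    before = count_paths()
    demo.execute("""INSERT OR IGNORE INTO path
                    SELECT edge.x0, path.x1 FROM edge, path
                    WHERE edge.x1 = path.x0""")
    if count_paths() == before:
        break

paths = sorted(demo.execute("SELECT * FROM path").fetchall())
```

Every iteration rejoins the whole `path` table, which is exactly the redundancy that semi-naive evaluation is meant to remove.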

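# conj is a cross product, so nothing forces the two x columns to agree.
# On this data they happen to coincide (both are 2).
db.execute("SELECT * FROM " + conj(edge(1, x), edge(x,3))).fetchall()
#conj(edge(1, x), edge(x,3))
#edge(1, x)
#db.execute("SELECT * FROM edge").fetchall()

[(2, None, 2, None)]

def insert(name):
    def res(*args, **kwargs):
        def res2(body):
            # Use a new name: assigning to `args` here would shadow the outer `args`.
            cols = {**{"x" + str(n) : a for n,a in enumerate(args)}, **kwargs}
            # Head variables refer to body columns by name; constants are inlined.
            selects = ", ".join([f"{v.ind if isinstance(v, Var) else repr(v)} AS {k}" for k,v in cols.items()])
            return f"INSERT OR IGNORE INTO {name} SELECT {selects} FROM ({body})"
        return res2
    return res
# path(x, y) :- edge(x, y)
insert("path")(x, y)(edge(x, y))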
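Because the comma-separated `conj` above is a cross product, a shared variable such as `x` is never forced to agree across atoms. One possible repair, sketched here as an assumption and not necessarily where the post ends up, is to name each subquery's columns after its variables and combine them with `NATURAL JOIN`, which equates same-named columns. The names `rel_nj`, `conj_nj`, `demo` and the constant `unit_` column (which replaces the `NULL` padding so that atoms without variables still join) are all hypothetical.

```python
import sqlite3
from dataclasses import dataclass

@dataclass(frozen=True)
class Var:
    ind: str

def rel_nj(name):
    def res(*args):
        wheres, env = ["TRUE"], {}
        for n, a in enumerate(args):
            col = f"x{n}"
            if isinstance(a, Var):
                if a in env:
                    wheres.append(f"{col} = {env[a]}")  # repeated variable
                else:
                    env[a] = col
            else:
                wheres.append(f"{col} = {a!r}")  # constant argument
        # unit_ is always 1, so NATURAL JOIN on it never filters anything out
        cols = [f"{c} AS {v.ind}" for v, c in env.items()] + ["1 AS unit_"]
        return f"SELECT DISTINCT {', '.join(cols)} FROM {name} WHERE {' AND '.join(wheres)}"
    return res

def conj_nj(*bodies):
    # NATURAL JOIN equates every column name the subqueries share,
    # i.e. every shared Datalog variable
    return " NATURAL JOIN ".join(f"({b})" for b in bodies)

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE edge (x0 INTEGER, x1 INTEGER, PRIMARY KEY (x0, x1))")
demo.executemany("INSERT OR IGNORE INTO edge VALUES (?, ?)", [(1, 2), (2, 3), (2, 4)])

x, y, z = Var("x"), Var("y"), Var("z")
edge_nj = rel_nj("edge")
query = "SELECT x, y, z FROM " + conj_nj(edge_nj(x, y), edge_nj(y, z))
rows = sorted(demo.execute(query).fetchall())
```

With this encoding the variable names do the work that positional columns cannot, which matches the idea of treating the environment itself as the relation.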
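# Earlier idea, kept as a note. Annotations are quoted because `key` and `trie` are not defined types.
class State():
    env : "dict[int, key]"
    data : "trie"

from dataclasses import dataclass
from itertools import count

# SQL version: the environment maps each variable to the column it is bound to.
@dataclass
class State():
    env : dict[int, str]
    froms : list[str]
    constrs : list[str]

    def __mul__(self, other):
        # Conjunction: merge environments, concatenate FROMs, equate columns of shared variables.
        return State({**self.env, **other.env},
            self.froms + other.froms,
            self.constrs + other.constrs + [f"{self.env[k]} = {other.env[k]}" for k in self.env if k in other.env])

fresh = count()  # source of fresh row aliases
def rel(name, *args):
    # The original left the environment blank; this completion is an assumption.
    # It treats every argument as a distinct variable bound to column x0, x1, ...
    row = f"{name}{next(fresh)}"
    env = {a : f"{row}.x{n}" for n, a in enumerate(args)}
    return State(env, [f"{name} as {row}"], [])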

from dataclasses import dataclass
from collections import defaultdict
@dataclass(frozen=True)  # frozen so that Var is hashable and can be used as a dict key
class Var():
    ind:int
    def __le__(self, other):
        if isinstance(other, Var):
            return self.ind <= other.ind
        return True
    def __lt__(self, other):
        if isinstance(other, Var):
            return self.ind < other.ind
        return True

def trie(R, *args):
    def worker(S, nargs):
        if len(S) == 0:
            return None
        if len(nargs) == 0:
            return () if S else None
        else:
            (n, a) = nargs[0]
            if isinstance(a, Var):
                q = defaultdict(set)
                for v in S:
                    q[v[n]].add(v)
                return (n, {k : worker(v, nargs[1:]) for k, v in q.items()})
            else:
                return worker({v for v in S if v[n] == a}, nargs[1:])
    return worker(R, sorted(enumerate(args), key=lambda z: (z[1], z[0])))
edge = {(1,2), (2,3), (2,1)}
trie(edge, Var(0), 1)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[8], line 32
     30     return worker(R, sorted(enumerate(args), key=lambda z: (z[1], z[0])))
     31 edge = {(1,2), (2,3), (2,1)}
---> 32 trie(edge, Var(0), 1)

Cell In[8], line 30, in trie(R, *args)
     28         else:
     29             return worker({v for v in S if v[n] == a}, nargs[1:])
---> 30 return worker(R, sorted(enumerate(args), key=lambda z: (z[1], z[0])))

TypeError: '<' not supported between instances of 'int' and 'Var'

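The sort fails because the key tuples mix `int` literals with `Var`s. To evaluate `1 < Var(0)`, Python tries `int.__lt__` and then the reflected `Var.__gt__`, and `Var` only defines `__lt__` and `__le__`. One way around this is to sort on a key built only from plain integers. The sketch below assumes that; the name `order_key` and the literals-first ordering are illustrative choices, not something taken from the cell above.

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass(frozen=True)
class Var:
    ind: int

def order_key(item):
    # item is (column position, pattern argument). Literals sort first,
    # then variables by index; every component is a plain int.
    n, a = item
    return (1, a.ind, n) if isinstance(a, Var) else (0, 0, n)

def trie(R, *args):
    def worker(S, nargs):
        if len(S) == 0:
            return None
        if len(nargs) == 0:
            return ()
        (n, a) = nargs[0]
        if isinstance(a, Var):
            # Variable: branch on the distinct values found in column n
            q = defaultdict(set)
            for v in S:
                q[v[n]].add(v)
            return (n, {k: worker(v, nargs[1:]) for k, v in q.items()})
        # Literal: keep only the rows that match it
        return worker({v for v in S if v[n] == a}, nargs[1:])
    return worker(R, sorted(enumerate(args), key=order_key))

edge = {(1, 2), (2, 3), (2, 1)}
result = trie(edge, Var(0), 1)
```

Putting literals first means rows are filtered before any branching happens, so the trie only ever contains rows that can match.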
def trie(R,n):
    if n == 0:
        return ()
    elif n == 1:
        return {k[0] : () for k in R}
    else:
        q = defaultdict(set)
        for row in R:
            if len(row) >= 2:
                q[row[0]].add(row[1:])

        return {k : trie(v,n-1) for k, v in q.items()}

trie(edge, 2)



{2: {1: (), 3: ()}, 1: {2: ()}}
def trie(R):
    root = {}
    for row in R:
        node = root
        for v in row:
            node = node.setdefault(v, {})
    return root

trie(edge)
{2: {3: {}, 1: {}}, 1: {2: {}}}
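Nested-dict tries like this one make intersection easy to state: walk both tries together and keep only the keys present in both at every level. The following is a minimal sketch that assumes both tries have the same depth and column order. This `intersect` is an illustrative version and not a completion of the unfinished one earlier.

```python
def trie(R):
    root = {}
    for row in R:
        node = root
        for v in row:
            node = node.setdefault(v, {})
    return root

def intersect(t1, t2):
    # Iterate over the smaller level and probe the larger one
    if len(t2) < len(t1):
        t1, t2 = t2, t1
    out = {}
    for k, sub1 in t1.items():
        sub2 = t2.get(k)
        if sub2 is None:
            continue
        if not sub1 and not sub2:
            # Both are leaves: the full row is present in both tries
            out[k] = {}
        else:
            sub = intersect(sub1, sub2)
            if sub:
                # Keep the key only if some complete row survives below it
                out[k] = sub
    return out

a = {(1, 2), (2, 3), (2, 1)}
b = {(2, 3), (3, 4), (1, 5)}
common = intersect(trie(a), trie(b))
```

Iterating over the smaller side at each level is the simplest form of the trick that worst-case optimal joins refine with sorted iterators.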
def trie(R, args):
    root = {}
    for row in R:
        node = root
        for n, v in sorted(zip(args, row)):
            node = node.setdefault(v, {})
    return root

trie(edge, [0,1])
trie(edge, [1,0])
{3: {2: {}}, 2: {1: {}}, 1: {2: {}}}
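def trie(R, pat):
    # Match every row against `pat` (a list of Vars and literals) and index the
    # resulting environments as a trie ordered by variable index.
    # Assumes Var is hashable (a frozen dataclass).
    root = {}
    for row in R:
        env = {}
        for n, v in zip(pat, row):
            if isinstance(n, Var):
                if n in env:
                    if env[n] != v: # repeated variable must agree
                        break
                else:
                    env[n] = v
            else:
                if n != v: # literal pattern
                    break
        else:
            node = root
            for n, v in sorted(env.items(), key=lambda kv: kv[0].ind):
                node = node.setdefault(v, {})
    return root

trie(edge, [Var(0), Var(1)])
trie(edge, [Var(1), Var(0)])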
def run(db):
    for x in db.V:
        for y in db.V: