Skip to content

Commit

Permalink
work in progress: solver decision proagator for gui
Browse files Browse the repository at this point in the history
  • Loading branch information
hweichelt committed May 25, 2024
1 parent c129cc1 commit a06ce44
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 7 deletions.
2 changes: 2 additions & 0 deletions src/clingexplaid/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import sys

import clingo
from clingo.application import clingo_main

from .cli.clingo_app import ClingoExplaidApp
from .cli.textual_gui import textual_main
from .propagators import SolverDecisionPropagator

RUN_TEXTUAL_GUI = False

Expand Down
53 changes: 46 additions & 7 deletions src/clingexplaid/cli/textual_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import re
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import clingo
from textual import on
from textual.app import App, ComposeResult
from textual.containers import HorizontalScroll, Vertical, VerticalScroll
Expand All @@ -28,6 +29,8 @@
Tree,
)

from ..propagators import SolverDecisionPropagator

ACTIVE_CLASS = "active"


Expand Down Expand Up @@ -370,6 +373,20 @@ def action_exit(self) -> None:
"""
self.exit(0)

async def on_model(self, model):
tree = self.query_one(SolverTreeView).solve_tree
tree.root.add(f"MODEL {' '.join([str(a) for a in model])}")
await asyncio.sleep(0.1)

def on_propagate(self, decisions):
tree = self.query_one(SolverTreeView).solve_tree
decisions_string = " -> ".join([str(d) for d in decisions])
tree.root.add(decisions_string)

def on_undo(self):
tree = self.query_one(SolverTreeView).solve_tree
tree.root.add("UNDO")

async def action_solve(self) -> None:
"""
Action to exit the textual application
Expand All @@ -378,13 +395,35 @@ async def action_solve(self) -> None:
solve_button = self.query_one("#solve-button")
tree = tree_view.solve_tree
tree.reset(tree.root.label)
tree_view.add_class("loading")
# deactivate solve button
solve_button.disabled = True
await asyncio.sleep(2)
solve_button.disabled = False
tree_view.remove_class("loading")
tree.root.add("TEST")
# tree_view.add_class("loading")
# # deactivate solve button
# solve_button.disabled = True
# await asyncio.sleep(2)
# solve_button.disabled = False
# tree_view.remove_class("loading")

sdp = SolverDecisionPropagator(
callback_propagate=self.on_propagate,
callback_undo=self.on_undo,
)
ctl = clingo.Control("0")
ctl.register_propagator(sdp)
ctl.add("base", [], "1{a;b}. x:-not b.")
ctl.ground([("base", [])])

exhausted = False
with ctl.solve(yield_=True) as solver_handle:
while not exhausted:
result = solver_handle.get()
if result.satisfiable:
model = solver_handle.model()
if model is None:
break
await self.on_model(model.symbols(atoms=True))
exhausted = result.exhausted
if not exhausted:
solver_handle.resume()
tree.root.add("END")


def flatten_list(ls: Optional[List[List[Any]]]) -> List:
Expand Down
2 changes: 2 additions & 0 deletions src/clingexplaid/propagators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from typing import List

from .propagator_decision_order import DecisionOrderPropagator
from .propagator_solver_decisions import SolverDecisionPropagator

DecisionLevel = List[int]
DecisionLevelList = List[DecisionLevel]

__all__ = [
"DecisionOrderPropagator",
"SolverDecisionPropagator",
]
125 changes: 125 additions & 0 deletions src/clingexplaid/propagators/propagator_solver_decisions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
Propagator Module: Decision Order
"""

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union

import clingo
from clingo import Propagator

INTERNAL_STRING = "INTERNAL"


@dataclass
class Decision:
positive: bool
literal: int
symbol: Optional[clingo.Symbol]

def __str__(self) -> str:
symbol_string = str(self.symbol) if self.symbol is not None else INTERNAL_STRING
return f"({[" - ", " + "][self.positive]}{symbol_string})"


class SolverDecisionPropagator(Propagator):
"""
Propagator for showing the Solver Decisions of clingo
"""

def __init__(
self,
signatures: Optional[Set[Tuple[str, int]]] = None,
callback_propagate: Optional[Callable] = None,
callback_undo: Optional[Callable] = None,
):
# pylint: disable=missing-function-docstring
self.literal_symbol_lookup: Dict[int, clingo.Symbol] = {}
self.signatures = signatures if signatures is not None else set()

self.callback_propagate: Callable = callback_propagate if callback_propagate is not None else lambda x: None
self.callback_undo: Callable = callback_undo if callback_undo is not None else lambda x: None

def init(self, init: clingo.PropagateInit) -> None:
"""
Method to initialize the Decision Order Propagator. Here the literals are added to the Propagator's watch list.
"""
for atom in init.symbolic_atoms:
program_literal = atom.literal
solver_literal = init.solver_literal(program_literal)
self.literal_symbol_lookup[solver_literal] = atom.symbol

for atom in init.symbolic_atoms:
if len(self.signatures) > 0 and not any(atom.match(name=s, arity=a) for s, a in self.signatures):
continue
symbolic_atom = init.symbolic_atoms[atom.symbol]
if symbolic_atom is None:
continue # nocoverage
query_program_literal = symbolic_atom.literal
query_solver_literal = init.solver_literal(query_program_literal)
init.add_watch(query_solver_literal)
init.add_watch(-query_solver_literal)

def propagate(self, control: clingo.PropagateControl, changes: Sequence[int]) -> None:
"""
Propagate method the is called when one the registered literals is propagated by clasp. Here useful information
about the decision progress is recorded to be visualized later.
"""
# pylint: disable=unused-argument
decisions, entailments = self.get_decisions(control.assignment)

literal_sequence = []
for d in decisions:
literal_sequence.append(d)
if d in entailments:
literal_sequence.append(list(entailments[d]))
decision_sequence = self.literal_to_decision_sequence(literal_sequence)

self.callback_propagate(decision_sequence)

def undo(self, thread_id: int, assignment: clingo.Assignment, changes: Sequence[int]) -> None:
"""
This function is called when one of the solvers decisions is undone.
"""
# pylint: disable=unused-argument

self.callback_undo()

def literal_to_decision(self, literal: int) -> Decision:
is_positive = literal >= 0
symbol = self.literal_symbol_lookup.get(abs(literal))
return Decision(literal=abs(literal), positive=is_positive, symbol=symbol)

def literal_to_decision_sequence(
self, literal_sequence: List[Union[int, List[int]]]
) -> List[Union[Decision, List[Decision]]]:
new_decision_sequence = []
for element in literal_sequence:
if isinstance(element, int):
new_decision_sequence.append(self.literal_to_decision(element))
elif isinstance(element, list):
new_decision_sequence.append([self.literal_to_decision(literal) for literal in element])
return new_decision_sequence

@staticmethod
def get_decisions(assignment: clingo.Assignment) -> Tuple[List[int], Dict[int, List[int]]]:
"""
Helper function to extract a list of decisions and entailments from a clingo propagator assignment.
"""
level = 0
decisions = []
entailments = {}
try:
while True:
decision = assignment.decision(level)
decisions.append(decision)

trail = assignment.trail
level_offset_start = trail.begin(level)
level_offset_end = trail.end(level)
level_offset_diff = level_offset_end - level_offset_start
if level_offset_diff > 1:
entailments[decision] = trail[(level_offset_start + 1) : level_offset_end]
level += 1
except RuntimeError:
return decisions, entailments

0 comments on commit a06ce44

Please sign in to comment.