Skip to content

Commit

Permalink
reformatted codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
hweichelt committed Jul 24, 2023
1 parent 14204b7 commit 1fbcde8
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 151 deletions.
84 changes: 42 additions & 42 deletions clingexplaid/__main__.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
import time

import clingo

from clingexplaid.utils import get_solver_literal_lookup
from clingexplaid.utils.transformer import AssumptionTransformer
from clingexplaid.utils.muc import CoreComputer


sig = [
('dog', 1),
('mantis', 1),
('axolotl', 1),
('something_true', 0),
('snake', 1),
('test', 0)
]

at = AssumptionTransformer(sig)

files = ["temp/instance.lp", "temp/encoding.lp"]

ctl = clingo.Control()

transformed_program = ""

for f in files:
transformed_program += at.parse_file(f) + "\n"

ctl.add("base", [], transformed_program)

ctl.ground([("base", [])])

literal_lookup = get_solver_literal_lookup(ctl)

assumptions = at.get_assumptions(ctl)
print(assumptions, [str(literal_lookup[a]) for a in assumptions])

cc = CoreComputer(ctl, assumptions)

ctl.solve(assumptions=list(assumptions), on_core=cc.shrink)
print(cc.minimal)
# import time
#
# import clingo
#
# from clingexplaid.utils import get_solver_literal_lookup
# from clingexplaid.utils.transformer import AssumptionTransformer
# from clingexplaid.utils.muc import CoreComputer
#
#
# sig = [
# ('dog', 1),
# ('mantis', 1),
# ('axolotl', 1),
# ('something_true', 0),
# ('snake', 1),
# ('test', 0)
# ]
#
# at = AssumptionTransformer(sig)
#
# files = ["temp/instance.lp", "temp/encoding.lp"]
#
# ctl = clingo.Control()
#
# transformed_program = ""
#
# for f in files:
# transformed_program += at.parse_file(f) + "\n"
#
# ctl.add("base", [], transformed_program)
#
# ctl.ground([("base", [])])
#
# literal_lookup = get_solver_literal_lookup(ctl)
#
# assumptions = at.get_assumptions(ctl)
# print(assumptions, [str(literal_lookup[a]) for a in assumptions])
#
# cc = CoreComputer(ctl, assumptions)
#
# ctl.solve(assumptions=list(assumptions), on_core=cc.shrink)
# print(cc.minimal)
18 changes: 13 additions & 5 deletions clingexplaid/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
"""
Utilities
"""

from typing import Tuple, Dict, Set, Union
from collections.abc import Iterable

import clingo
from clingo.ast import ASTType
from typing import Tuple, Dict, Set, Union


SymbolSet = Set[clingo.Symbol]
LiteralSet = Set[Tuple[clingo.Symbol, bool]]
AssumptionSet = Set[Union[LiteralSet, int]]
Literal = Tuple[clingo.Symbol, bool]
LiteralSet = Set[Literal]
Assumption = Union[Literal, int]
AssumptionSet = Iterable[Assumption]


def match_ast_symbolic_atom_signature(ast_symbol: ASTType.SymbolicAtom, signature: Tuple[str, int]):
Expand All @@ -15,7 +23,7 @@ def match_ast_symbolic_atom_signature(ast_symbol: ASTType.SymbolicAtom, signatur
"""

symbol = str(ast_symbol.symbol)
name = symbol.split('(')[0]
name = symbol.split('(', maxsplit=1)[0]
arity = len(ast_symbol.symbol.arguments)

return all((signature[0] == name, signature[1] == arity))
Expand All @@ -26,7 +34,7 @@ def get_solver_literal_lookup(control: clingo.Control) -> Dict[int, clingo.Symbo
This function can be used to get a lookup dictionary to associate literal ids with their respective symbols for all
symbolic atoms of the program
"""
lookup = dict()
lookup = {}
for atom in control.symbolic_atoms:
lookup[atom.literal] = atom.symbol
return lookup
Expand Down
2 changes: 1 addition & 1 deletion clingexplaid/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ def set_handler(level, color):
set_handler(logging.DEBUG, "BLUE")
set_handler(logging.ERROR, "RED")

return logger
return logger
22 changes: 15 additions & 7 deletions clingexplaid/utils/muc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from typing import Set, Tuple, Optional
"""
Unsatisfiable Core Utilities
"""

from typing import Tuple, Optional, Set

import clingo

from . import get_solver_literal_lookup, AssumptionSet, SymbolSet
from . import get_solver_literal_lookup, AssumptionSet, SymbolSet, Assumption


class CoreComputer:
Expand All @@ -15,17 +19,17 @@ def __init__(self, control: clingo.Control, assumption_set: AssumptionSet):
self.control = control
self.assumption_set = assumption_set
self.literal_lookup = get_solver_literal_lookup(control=self.control)
self.minimal = None
self.minimal: Optional[AssumptionSet] = None

def _solve(self, assumptions: Optional[AssumptionSet] = None) -> Tuple[bool, SymbolSet, AssumptionSet]:
def _solve(self, assumptions: Optional[AssumptionSet] = None) -> Tuple[bool, SymbolSet, SymbolSet]:
"""
Internal function that is used to make the single solver calls for finding the minimal unsatisfiable core.
"""
if assumptions is None:
assumptions = self.assumption_set

with self.control.solve(assumptions=list(assumptions), yield_=True) as solve_handle:
satisfiable = solve_handle.get().satisfiable
with self.control.solve(assumptions=list(assumptions), yield_=True) as solve_handle: # type: ignore[union-attr]
satisfiable = bool(solve_handle.get().satisfiable)
model = solve_handle.model().symbols(atoms=True) if solve_handle.model() is not None else []
core = {self.literal_lookup[literal_id] for literal_id in solve_handle.core()}

Expand All @@ -51,7 +55,7 @@ def _compute_single_minimal(self, assumptions: Optional[AssumptionSet] = None) -
if satisfiable:
return set()

muc_members = set()
muc_members: Set[Assumption] = set()
working_set = set(assumptions)

for assumption in self.assumption_set:
Expand All @@ -70,6 +74,10 @@ def _compute_single_minimal(self, assumptions: Optional[AssumptionSet] = None) -
return muc_members

def shrink(self, assumptions: Optional[AssumptionSet] = None) -> None:
"""
This function applies the unsatisfiable core minimization (`self._compute_single_minimal`) on the assumptions
set `assumptions` and stores the resulting MUC inside `self.minimal`.
"""
self.minimal = self._compute_single_minimal(assumptions=assumptions)


Expand Down
2 changes: 1 addition & 1 deletion clingexplaid/utils/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ def get(levels, name):
parser.add_argument(
"--version", "-v", action="version", version=f"%(prog)s {VERSION}"
)
return parser
return parser
99 changes: 70 additions & 29 deletions clingexplaid/utils/transformer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""
Transformers for Explanation
"""

from pathlib import Path
from typing import Tuple, Optional, Union, Set, List

import clingo
from clingo.ast import Transformer, parse_string, ASTType

from clingo import ast as _ast
from typing import List, Tuple, Optional, Union, Set
from dataclasses import dataclass
from pathlib import Path

from clingexplaid.utils import match_ast_symbolic_atom_signature

Expand All @@ -17,7 +21,7 @@ class UntransformedException(Exception):
"""


class RuleIDTransformer(Transformer):
class RuleIDTransformer(_ast.Transformer):
"""
A Transformer that takes all the rules of a program and adds an atom with `self.rule_id_signature` in their bodys,
to make the original rule the generated them identifiable even after grounding. Additionally, a choice rule
Expand All @@ -30,7 +34,10 @@ def __init__(self, rule_id_signature: str = RULE_ID_SIGNATURE):
self.rule_id = 0
self.rule_id_signature = rule_id_signature

def visit_Rule(self, node):
def visit_Rule(self, node): # pylint: disable=C0103
"""
Adds a rule_id_signature(id) atom to the body of every rule that is visited.
"""
# add for each rule a theory atom (self.rule_id_signature) with the id as an argument
symbol = _ast.Function(
location=node.location,
Expand All @@ -50,41 +57,54 @@ def _get_number_of_rules(self):

def parse_string(self, string: str) -> str:
"""
Function that applies the transformation on the `program_string` it's called with and returns a
TransformerResult with the transformed program-string and the necessary assumptions for the
`self.rule_id_signature` atoms.
Function that applies the transformation to the `program_string` it's called with and returns the transformed
program string.
"""
self.rule_id = 1
out = []
parse_string(string, lambda stm: out.append((str(self(stm)))))
_ast.parse_string(string, lambda stm: out.append((str(self(stm)))))
out.append(f"{{_rule(1..{self._get_number_of_rules()})}}"
f" % Choice rule to allow all _rule atoms to become assumptions")

return "\n".join(out)

def parse_file(self, path: Union[str, Path]) -> str:
with open(path, "r") as f:
def parse_file(self, path: Union[str, Path], encoding: str = "utf-8") -> str:
"""
Parses the file at path and returns a string with the transformed program.
"""
with open(path, "r", encoding=encoding) as f:
return self.parse_string(f.read())

def get_assumptions(self, n_rules: Optional[int] = None) -> Set[Tuple[clingo.Symbol, bool]]:
"""
Returns the rule_id_signature assumptions depending on the number of rules contained in the transformed
program. Can only be called after parse_file has been executed before.
"""
if n_rules is None:
n_rules = self._get_number_of_rules()
return {(clingo.parse_term(f"{self.rule_id_signature}({rule_id})"), True) for rule_id in range(1, n_rules + 1)}


class AssumptionTransformer(Transformer):
class AssumptionTransformer(_ast.Transformer):
"""
A transformer that transforms facts that match with one of the signatures provided (no signatures means all facts)
into choice rules and also provides the according assumptions for them.
"""

def __init__(self, signatures: Set[Tuple[str, int]] = None):
def __init__(self, signatures: Optional[Set[Tuple[str, int]]] = None):
self.signatures = signatures if signatures is not None else set()
self.fact_rules = []
self.fact_rules: List[str] = []

def visit_Rule(self, node):
if node.head.ast_type != ASTType.Literal:
def visit_Rule(self, node): # pylint: disable=C0103
"""
Transforms head of a rule into a choice rule if it is a fact and adheres to the given signatures.
"""
if node.head.ast_type != _ast.ASTType.Literal:
return node
if node.body:
return node
has_matching_signature = any(
[match_ast_symbolic_atom_signature(node.head.atom, (name, arity)) for (name, arity) in self.signatures])
match_ast_symbolic_atom_signature(node.head.atom, (name, arity)) for (name, arity) in self.signatures)
# if signatures are defined only transform facts that match them, else transform all facts
if self.signatures and not has_matching_signature:
return node
Expand All @@ -103,15 +123,26 @@ def visit_Rule(self, node):
)

def parse_string(self, string: str) -> str:
"""
Function that applies the transformation to the `program_string` it's called with and returns the transformed
program string.
"""
out = []
parse_string(string, lambda stm: out.append((str(self(stm)))))
_ast.parse_string(string, lambda stm: out.append((str(self(stm)))))
return "\n".join(out)

def parse_file(self, path: Union[str, Path]) -> str:
with open(path, "r") as f:
def parse_file(self, path: Union[str, Path], encoding:str = "utf-8") -> str:
"""
Parses the file at path and returns a string with the transformed program.
"""
with open(path, "r", encoding=encoding) as f:
return self.parse_string(f.read())

def get_assumptions(self, control: clingo.Control) -> Set[int]:
"""
Returns the assumptions which were gathered during the transformation of the program. Has to be called after
a program has already been transformed.
"""
# Just taking the fact symbolic atoms of the control given doesn't work here since we anticipate that
# this control is ground on the already transformed program. This means that all facts are now choice rules
# which means we cannot detect them like this anymore.
Expand All @@ -124,21 +155,24 @@ def get_assumptions(self, control: clingo.Control) -> Set[int]:
fact_symbols = [sym.symbol for sym in fact_control.symbolic_atoms if sym.is_fact]

symbol_to_literal_lookup = {sym.symbol: sym.literal for sym in control.symbolic_atoms}
return {symbol_to_literal_lookup.get(sym) for sym in fact_symbols}
return {symbol_to_literal_lookup[sym] for sym in fact_symbols if sym in symbol_to_literal_lookup}


class ConstraintTransformer(Transformer):
class ConstraintTransformer(_ast.Transformer):
"""
A Transformer that takes all constraint rules and adds an atom to their head to avoid deriving false through them.
"""

def __init__(self, constraint_head_symbol: str):
self.constraint_head_symbol = constraint_head_symbol

def visit_Rule(self, node):
if node.head.ast_type != ASTType.Literal:
def visit_Rule(self, node): # pylint: disable=C0103
"""
Adds a constraint_head_symbol atom to the head of every constraint.
"""
if node.head.ast_type != _ast.ASTType.Literal:
return node
if node.head.atom.ast_type != ASTType.BooleanConstant:
if node.head.atom.ast_type != _ast.ASTType.BooleanConstant:
return node
if node.head.atom.value != 0:
return node
Expand All @@ -154,13 +188,20 @@ def visit_Rule(self, node):
return node.update(**self.visit_children(node))

def parse_string(self, string: str) -> str:
"""
Function that applies the transformation to the `program_string` it's called with and returns the transformed
program string.
"""
out = []
parse_string(string, lambda stm: out.append((str(self(stm)))))
_ast.parse_string(string, lambda stm: out.append((str(self(stm)))))

return "\n".join(out)

def parse_file(self, path: Union[str, Path]) -> str:
with open(path, "r") as f:
def parse_file(self, path: Union[str, Path], encoding:str = "utf-8") -> str:
"""
Parses the file at path and returns a string with the transformed program.
"""
with open(path, "r", encoding=encoding) as f:
return self.parse_string(f.read())


Expand Down
Loading

0 comments on commit 1fbcde8

Please sign in to comment.