-
Notifications
You must be signed in to change notification settings - Fork 0
/
position_db.py
249 lines (212 loc) · 9.51 KB
/
position_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from contextlib import closing
import os
import sqlite3
from typing import Optional, Union
from base_types import BoardSize, PlayerToMove, TpsString
import symmetry_normalisator
from position_processor import PositionProcessor
from tak import GameState
class PositionDataBase(PositionProcessor):
    """SQLite-backed store for games, positions and the moves played from them.

    The database has three tables:

    * ``games`` - one row per stored game (players, result, komi, ratings, ...).
    * ``positions`` - one row per ``(tps, player_to_move)`` pair. ``moves`` is a
      ``;``-separated list of ``move,next_position_id`` entries.
    * ``game_position_xref`` - links games to the positions that occurred in them.

    Use the instance as a context manager; the connection is opened in
    ``__enter__`` and closed in ``__exit__``. Changes are only persisted when
    ``commit()`` is called.
    """

    _CREATE_TABLES_SQL = (
        """
        CREATE TABLE IF NOT EXISTS games (
            id integer PRIMARY KEY,
            playtak_id integer,
            size integer,
            white text NOT NULL,
            black text NOT NULL,
            result text NOT NULL,
            komi integer,
            rating_white integer DEFAULT 1000,
            rating_black integer DEFAULT 1000,
            date integer,
            tournament integer
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS positions (
            id integer PRIMARY KEY,
            tps text NOT NULL,
            player_to_move text NOT NULL,
            moves text
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS game_position_xref (
            id integer PRIMARY KEY,
            game_id integer,
            position_id integer,
            FOREIGN KEY (game_id) REFERENCES games(id),
            FOREIGN KEY (position_id) REFERENCES positions(id)
        );
        """,
    )

    _CREATE_INDEX_SQL = (
        "CREATE INDEX IF NOT EXISTS idx_xref_game_id ON game_position_xref (game_id);",
        "CREATE INDEX IF NOT EXISTS idx_xref_position_id ON game_position_xref (position_id);",
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_position_tps ON positions (tps, player_to_move);",
        "CREATE INDEX IF NOT EXISTS idx_games_white ON games (white);",
        "CREATE INDEX IF NOT EXISTS idx_games_black ON games (black);",
        "CREATE INDEX IF NOT EXISTS idx_games_rating_white ON games (rating_white);",
        "CREATE INDEX IF NOT EXISTS idx_games_rating_black ON games (rating_black);",
        "CREATE INDEX IF NOT EXISTS idx_games_komi ON games (komi);",
        "CREATE INDEX IF NOT EXISTS idx_games_date ON games (date);",
        "CREATE INDEX IF NOT EXISTS idx_games_tournament ON games (tournament);",
    )

    # Bound parameters instead of string formatting: no quoting problems, no injection.
    _SELECT_POSITION_SQL = (
        "SELECT * FROM positions WHERE tps = :tps AND player_to_move = :player_to_move;"
    )

    def __init__(self, db_file_name: str):
        """Remember the database file name; the connection is opened in ``__enter__``."""
        assert db_file_name
        self.conn: Optional[sqlite3.Connection] = None
        self.max_id = 0  # highest playtak_id found in the DB when it was opened
        self.db_file_name = db_file_name

    def __enter__(self):
        """Open the database, make sure the schema exists and read the highest playtak id.

        Schema creation is idempotent (``IF NOT EXISTS``), so it is run for new
        and pre-existing files alike; an existing but empty file therefore gets
        its tables as well. SQLite errors are printed and ``self`` is returned
        regardless, as callers expect.
        """
        try:
            self.conn = sqlite3.connect(self.db_file_name)
            # Rows must be addressable by column name for the rest of the class.
            self.conn.row_factory = sqlite3.Row
            with closing(self.conn.cursor()) as cur:
                for query in self._CREATE_TABLES_SQL + self._CREATE_INDEX_SQL:
                    cur.execute(query)
                cur.execute(
                    """
                    SELECT MAX(playtak_id) AS max_id, COUNT(ALL playtak_id) AS games_count FROM games;
                    """
                )
                row = cur.fetchone()
                # MAX() yields NULL on an empty table; keep max_id at 0 in that case.
                if row is not None and row['max_id'] is not None:
                    print("max game ID in loaded DB: ", row['max_id'])
                    print("number of games in loaded DB:", row['games_count'])
                    self.max_id = row['max_id']
        except sqlite3.Error as exc:
            print(exc)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the connection if one is open. Does not call ``commit()``."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def commit(self):
        """Commit the pending transaction on the open connection."""
        assert self.conn is not None
        self.conn.commit()

    def _get_or_create_position(self, curr: sqlite3.Cursor, tps: str, player_to_move):
        """Return the ``positions`` row for ``(tps, player_to_move)``, inserting it if missing."""
        params = {'tps': tps, 'player_to_move': player_to_move}
        curr.execute(self._SELECT_POSITION_SQL, params)
        row = curr.fetchone()
        if row is None:
            self.create_position_entry(tps, player_to_move)
            curr.execute(self._SELECT_POSITION_SQL, params)
            row = curr.fetchone()
        return row

    def add_position(
        self,
        game_id: int,
        move,
        result: str,
        tps: TpsString,
        next_tps: Union[TpsString, None],
        tak: GameState
    ) -> int:
        """Record that ``tps`` occurred in game ``game_id`` and, optionally, the move played from it.

        The position is looked up (or created) under its orientation as returned
        by ``symmetry_normalisator.get_tps_orientation`` and linked to the game
        in ``game_position_xref``. If ``move`` and ``next_tps`` are given, the
        move is transformed into that same orientation and appended to the
        position's ``moves`` column together with the id of the following
        position, unless that move is already recorded.

        ``move`` and ``next_tps`` must either both be set or both be unset.
        ``result`` is accepted for interface compatibility and not used here.

        Returns the orientation value reported for ``tps``.
        """
        assert self.conn is not None
        assert bool(next_tps) == bool(move)  # either none or both must be set

        # In the beginning of the game, on ply 2 and 3, white is placed consecutively
        color_to_place = tak.colour_to_play(tak.ply_counter - 1)
        color_to_place_next = tak.colour_to_play(tak.ply_counter)

        # normalize for symmetries
        tps_normalized, own_symmetry = symmetry_normalisator.get_tps_orientation(tps)

        with closing(self.conn.cursor()) as curr:
            position = self._get_or_create_position(curr, tps_normalized, color_to_place)
            position_id = position['id']

            # update the game-position crossreference table
            curr.execute(
                "INSERT INTO game_position_xref (game_id, position_id) VALUES (:game_id, :position_id);",
                {'game_id': game_id, 'position_id': position_id}
            )

            if next_tps is not None and move is not None:
                next_tps_normalized, _next_symmetry = symmetry_normalisator.get_tps_orientation(next_tps)
                next_position = self._get_or_create_position(
                    curr, next_tps_normalized, color_to_place_next
                )

                # orient the move the same way as the position it is played from
                move = symmetry_normalisator.transform_move(
                    move=move,
                    orientation=own_symmetry,
                    board_size=tak.size,
                )

                # ``moves`` may be '' (fresh row) or NULL (column is nullable)
                stored_moves = position['moves']
                entries = stored_moves.split(';') if stored_moves else []
                already_known = any(entry.split(',')[0] == move for entry in entries)

                # only touch the row when there is actually a new move to record
                if not already_known:
                    entries.append(','.join((move, str(next_position['id']))))
                    curr.execute(
                        "UPDATE positions SET moves=:position_moves WHERE id=:position_id",
                        {'position_moves': ';'.join(entries), 'position_id': position_id}
                    )

        return own_symmetry

    def dump(self):
        """Print the database as SQL text, one statement per line."""
        assert self.conn is not None
        for line in self.conn.iterdump():
            print(line)

    def add_game(
        self,
        size: BoardSize,
        playtak_id: int,
        white_name: str,
        black_name: str,
        result: str,
        komi: int,
        rating_white: int,
        rating_black: int,
        date: int,  # timestamp
        tournament: bool
    ) -> int:
        """Insert one row into ``games`` and return its ``id``.

        All values are passed as bound parameters, so player names containing
        quotes are stored verbatim and ``None`` values become SQL NULL.
        """
        assert self.conn is not None
        insert_game_data_sql = """
            INSERT INTO games (
                playtak_id, size, white, black, result,
                komi, rating_white, rating_black, date, tournament
            )
            VALUES (
                :playtak_id, :size, :white, :black, :result,
                :komi, :rating_white, :rating_black, :date, :tournament
            );
        """
        game_values = {
            'playtak_id': playtak_id,
            'size': size,
            'white': white_name,
            'black': black_name,
            'result': result,
            'komi': komi,
            'rating_white': rating_white,
            'rating_black': rating_black,
            'date': date,
            'tournament': tournament,
        }
        with closing(self.conn.cursor()) as curr:
            curr.execute(insert_game_data_sql, game_values)
            # ``id`` is an INTEGER PRIMARY KEY, i.e. the rowid of the inserted row.
            return curr.lastrowid

    def create_position_entry(self, tps: str, player_to_move: PlayerToMove):
        """Insert a new ``positions`` row for ``(tps, player_to_move)`` with no moves recorded."""
        assert self.conn is not None
        insert_position_data_sql = "INSERT INTO positions (tps, player_to_move, moves) VALUES (:tps, :player_to_move, '');"
        with closing(self.conn.cursor()) as curr:
            curr.execute(insert_position_data_sql, {'tps': tps, 'player_to_move': player_to_move})