from typing import Dict, List, Union, Optional, Callable
import random
import math
from time import perf_counter
import re
import os
import datetime
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from cshogi import *
from cshogi.usi import Engine
from cshogi import CSA
from cshogi import PGN
from cshogi.elo import Elo
try:
is_jupyter = get_ipython().__class__.__name__ != 'TerminalInteractiveShell'
if is_jupyter:
from IPython.display import SVG, display
except NameError:
is_jupyter = False
re_usi_info = re.compile('^.*score (cp|mate) ([+\-0-9]+).*pv (.*)$')
[docs]
def to_score(m):
if m[1] == 'cp':
score = int(m[2])
elif m[1] == 'mate':
if m[2][0] == '-':
score = -100000
else:
score = 100000
return score
[docs]
def usi_info_to_score(info):
m = re_usi_info.match(info)
if m is None:
return None
return to_score(m)
[docs]
def main(engine1: str, engine2: str, options1: Dict = {}, options2: Dict = {}, names: Optional[List[str]] = None,
games: int = 1, resign: Optional[int] = None, mate_win: bool = False,
byoyomi: Optional[Union[int, List[int]]] = None, time: Optional[Union[int, List[int]]] = None, inc: Optional[Union[int, List[int]]] = None,
draw: int = 256, ponder: bool = False, no_swap: bool = False,
opening: Optional[str] = None, opening_moves: int = 24, opening_seed: Optional[int] = None, opening_index: Optional[int] = None,
keep_process: bool = False,
csa: Optional[str] = None, multi_csa: bool = False, pgn: Optional[str] = None, no_pgn_moves: bool = False,
is_display: bool = False, debug: bool = False,
print_summary: bool = True, callback: Optional[Callable] = None) -> Dict:
"""Executes a series of games between two USI engines.
:param engine1: Path to the first USI engine.
:param engine2: Path to the second USI engine.
:param options1: Optional engine options for engine1.
:param options2: Optional engine options for engine2.
:param names: Optional names for the engines.
:param games: Number of games to play.
:param resign: Resignation threshold. If a score falls below the negative of this value, the engine resigns.
:param mate_win: Whether a checkmate is considered a win.
:param byoyomi: Byoyomi time in milliseconds. It can be different for each engine.
:param time: Time control in milliseconds. It can be different for each engine.
:param inc: Increment time in milliseconds per move. It can be different for each engine.
:param draw: Number of moves before a draw is declared.
:param ponder: Whether to use pondering.
:param no_swap: Whether to disable engine swapping.
:param opening: Path to a file containing opening positions.
:param opening_moves: Number of opening moves to play.
:param opening_seed: Seed for random shuffling of openings.
:param opening_index: Specific index of an opening to use.
:param keep_process: Whether to keep the engine process running after completion.
:param csa: Optional path for CSA file export.
:param multi_csa: Whether to use multi-CSA format.
:param pgn: Optional path for PGN file export.
:param no_pgn_moves: Whether to omit PGN move listing.
:param is_display: Whether to display the game board.
:param debug: Whether to enable debugging mode.
:param print_summary: Whether to print the summary after the match.
:param callback: Optional callback function executed after each game.
:return: Dictionary containing game results and statistics.
"""
engine1 = Engine(engine1, connect=False)
engine2 = Engine(engine2, connect=False)
# byoyomi
if type(byoyomi) in (list, tuple):
if len(byoyomi) >= 2:
byoyomi1, byoyomi2 = byoyomi
else:
byoyomi1 = byoyomi2 = byoyomi[0]
else:
byoyomi1 = byoyomi2 = byoyomi
# time
if type(time) in (list, tuple):
if len(time) >= 2:
time1, time2 = time
else:
time1 = time2 = time[0]
else:
time1 = time2 = time
# inc
if type(inc) in (list, tuple):
if len(inc) >= 2:
inc1, inc2 = inc
else:
inc1 = inc2 = inc[0]
else:
inc1 = inc2 = inc
# ponder
if ponder:
executor = ThreadPoolExecutor(max_workers=2)
# debug
if debug:
class Listener:
def __init__(self, id):
self.id = id
self.info = self.bestmove = ''
def __call__(self, line):
print(self.id + ':' + line)
self.info = self.bestmove
self.bestmove = line
listener1 = Listener('1')
listener2 = Listener('2')
else:
class Listener:
def __init__(self):
self.info = self.bestmove = ''
def __call__(self, line):
self.info = self.bestmove
self.bestmove = line
listener1 = listener2 = Listener()
# CSA
if csa and multi_csa:
csa_exporter = CSA.Exporter(csa, append=True)
# PGN
if pgn:
pgn_exporter = PGN.Exporter(pgn, append=True)
# 初期局面読み込み
if opening:
opening_list = []
with open(opening) as f:
for line in f:
moves = line.strip()[15:]
if moves:
opening_list.append(moves.split(' '))
else:
opening_list.append([])
# インデックス指定
if opening_index is not None:
opening_list = [opening_list[opening_index]]
else:
# シャッフル
if opening_seed is not None:
random.seed(opening_seed)
random.shuffle(opening_list)
board = Board()
engine1_won = [0, 0, 0, 0, 0, 0]
engine2_won = [0, 0, 0, 0, 0, 0]
draw_count = 0
WIN_DRAW = 2
for n in range(games):
# 先後入れ替え
if no_swap or n % 2 == 0:
engines_order = (engine1, engine2)
options_order = (options1, options2)
listeners_order = (listener1, listener2)
byoyomi_order = (byoyomi1, byoyomi2)
btime = time1
wtime = time2
binc = inc1
winc = inc2
else:
engines_order = (engine2, engine1)
options_order = (options2, options1)
listeners_order = (listener2, listener1)
byoyomi_order = (byoyomi2, byoyomi1)
btime = time2
wtime = time1
binc = inc2
winc = inc1
# 接続とエンジン設定
for engine, options, listener in zip(engines_order, options_order, listeners_order):
if engine.proc is None:
engine.connect(listener=listener)
if ponder:
engine.setoption('USI_Ponder', 'true', listener=listener)
for name, value in options.items():
engine.setoption(name, value, listener=listener)
engine.isready(listener=listener)
if names:
if names[0]: engine1.name = names[0]
if names[1]: engine2.name = names[1]
print('{} vs {} start.'.format(engines_order[0].name, engines_order[1].name))
# 初期局設定
board.reset()
moves = []
usi_moves = []
repetition_hash = defaultdict(int)
if csa:
engine_names = [engine.name for engine in engines_order]
if not multi_csa:
csa_exporter = CSA.Exporter(os.path.join(csa, '+'.join(engine_names) + '+' + datetime.datetime.now().strftime('%Y%m%d%H%M%S') + '.csa'), append=False)
csa_exporter.info(board, engine_names, version='V2')
if opening:
for move_usi in opening_list[n // 2 % len(opening_list)]:
move = board.push_usi(move_usi)
if csa:
csa_exporter.move(move)
moves.append(move)
usi_moves.append(move_usi)
repetition_hash[board.zobrist_hash()] += 1
if board.move_number > opening_moves:
break
# 盤面表示
if is_display:
print('開始局面')
if is_jupyter:
display(SVG(board.to_svg()))
else:
print(board)
# 新規ゲーム
for engine, listener in zip(engines_order, listeners_order):
engine.usinewgame(listener=listener)
# 対局
is_game_over = False
is_nyugyoku = False
is_illegal = False
is_repetition_win = False
is_repetition_lose = False
is_fourfold_repetition = False
is_timeup = False
remain_time = [btime, wtime]
inc_time = (binc, winc)
bestmove = None
pondermove = None
pre_pondermove = None
feature = None
pre_feature = None
while not is_game_over:
engine_index = (board.move_number - 1) % 2
engine = engines_order[engine_index]
listener = listeners_order[engine_index]
byoyomi = byoyomi_order[engine_index]
# 持将棋
if board.move_number > draw:
is_game_over = True
break
# ponder
ponderhit = False
if ponder:
if pre_pondermove:
if bestmove == pre_pondermove:
ponderhit = True
pre_pondermove = pondermove
start_time = perf_counter()
engine.ponderhit(listener=listener)
bestmove, pondermove = pre_feature.result()
elapsed_time = perf_counter() - start_time
else:
pre_pondermove = pondermove
engine.stop(listener=listener)
pre_feature.result()
else:
pre_pondermove = pondermove
pre_feature = feature
if not ponderhit:
# position
engine.position(usi_moves, listener=listener)
start_time = perf_counter()
# go
bestmove, pondermove = engine.go(byoyomi=byoyomi, btime=remain_time[BLACK], wtime=remain_time[WHITE], binc=binc, winc=winc, listener=listener)
elapsed_time = perf_counter() - start_time
if remain_time[board.turn] is not None:
if inc_time[board.turn] is not None:
remain_time[board.turn] += inc_time[board.turn]
remain_time[board.turn] -= math.ceil(elapsed_time * 1000)
if remain_time[board.turn] < 0:
# 1秒未満は切れ負けにしない
if remain_time[board.turn] > -1000:
remain_time[board.turn] = 0
else:
# 時間切れ負け
is_timeup = True
is_game_over = True
break
# ponder
if ponder and pondermove:
engine.position(usi_moves + [bestmove, pondermove], listener=listener)
feature = executor.submit(engine.go, ponder=True, byoyomi=byoyomi, btime=remain_time[BLACK], wtime=remain_time[WHITE], binc=binc, winc=winc, listener=listener)
score = usi_info_to_score(listener.info)
# 投了閾値
if resign is not None:
if score is not None and score <= -resign:
# 投了
is_game_over = True
break
# 詰みを見つけたら終了
if mate_win:
if score is not None and score == 100000:
move = board.move_from_usi(bestmove)
if csa:
csa_exporter.move(move, time=int(elapsed_time), comment=usi_info_to_csa_comment(board, listener.info))
board.push(move)
is_game_over = True
break
if bestmove == 'resign':
# 投了
is_game_over = True
break
elif bestmove == 'win':
# 入玉勝ち宣言
is_nyugyoku = True
is_game_over = True
break
else:
move = board.move_from_usi(bestmove)
if board.is_legal(move):
if csa:
csa_exporter.move(move, time=int(elapsed_time), comment=usi_info_to_csa_comment(board, listener.info))
board.push(move)
moves.append(move)
usi_moves.append(bestmove)
key = board.zobrist_hash()
repetition_hash[key] += 1
# 千日手
if repetition_hash[key] == 4:
# 連続王手
is_draw = board.is_draw()
if is_draw == REPETITION_WIN:
is_repetition_win = True
is_game_over = True
break
elif is_draw == REPETITION_LOSE:
is_repetition_lose = True
is_game_over = True
break
is_fourfold_repetition = True
is_game_over = True
break
else:
is_illegal = True
is_game_over = True
break
# 盤面表示
if is_display:
print('{}手目'.format(len(usi_moves)))
if is_jupyter:
display(SVG(board.to_svg(move)))
else:
print(board)
# 終局判定
if board.is_game_over():
is_game_over = True
break
# ゲーム終了
for engine, listener in zip(engines_order, listeners_order):
engine.gameover(listener=listener)
# エンジン終了
if not keep_process:
for engine, listener in zip(engines_order, listeners_order):
engine.quit(listener=listener)
# 結果出力
if not board.is_game_over() and board.move_number > draw:
win = WIN_DRAW
print('まで{}手で持将棋'.format(board.move_number - 1))
csa_endgame = '%JISHOGI'
elif is_fourfold_repetition:
win = WIN_DRAW
print('まで{}手で千日手'.format(board.move_number - 1))
csa_endgame = '%SENNICHITE'
elif is_nyugyoku:
win = board.turn
print('まで{}手で入玉宣言'.format(board.move_number - 1))
csa_endgame = '%KACHI'
elif is_illegal:
win = opponent(board.turn)
print('まで{}手で{}の反則負け'.format(board.move_number - 1, '先手' if win == WHITE else '後手'))
csa_endgame = '%ILLEGAL_MOVE'
elif is_repetition_win:
win = board.turn
print('まで{}手で{}の反則勝ち'.format(board.move_number - 1, '先手' if win == BLACK else '後手'))
csa_endgame = '%+ILLEGAL_ACTION' if board.turn == WHITE else '%-ILLEGAL_ACTION'
elif is_repetition_lose:
win = opponent(board.turn)
print('まで{}手で{}の反則負け'.format(board.move_number - 1, '先手' if win == WHITE else '後手'))
csa_endgame = 'ILLEGAL_MOVE'
elif is_timeup:
win = opponent(board.turn)
print('まで{}手で{}の切れ負け'.format(board.move_number - 1, '先手' if win == WHITE else '後手'))
csa_endgame = '%TIME_UP'
else:
win = opponent(board.turn)
print('まで{}手で{}の勝ち'.format(board.move_number - 1, '先手' if win == BLACK else '後手'))
csa_endgame = '%TORYO'
# 勝敗カウント
if engines_order[0] is engine1:
engine1_turn = 0
engine2_turn = 1
else:
engine1_turn = 1
engine2_turn = 0
if win == WIN_DRAW:
draw_count += 1
engine1_won[4 + engine1_turn] += 1
engine2_won[4 + engine2_turn] += 1
elif engine1_turn == 0 and win == BLACK or engine1_turn == 1 and win == WHITE:
engine1_won[engine1_turn] += 1
engine2_won[2 + engine2_turn] += 1
else:
engine2_won[engine2_turn] += 1
engine1_won[2 + engine1_turn] += 1
black_won = engine1_won[0] + engine2_won[0]
white_won = engine1_won[1] + engine2_won[1]
engine1_won_sum = engine1_won[0] + engine1_won[1]
engine2_won_sum = engine2_won[0] + engine2_won[1]
total_count = engine1_won_sum + engine2_won_sum + draw_count
# 勝敗状況表示
if print_summary:
print('{} of {} games finished.'.format(n + 1, games))
print('{} vs {}: {}-{}-{} ({:.1f}%)'.format(
engine1.name, engine2.name, engine1_won_sum, engine2_won_sum, draw_count,
(engine1_won_sum + draw_count / 2) / total_count * 100))
print('Black vs White: {}-{}-{} ({:.1f}%)'.format(
black_won, white_won, draw_count,
(black_won + draw_count / 2) / total_count * 100))
print('{} playing Black: {}-{}-{} ({:.1f}%)'.format(
engine1.name,
engine1_won[0], engine1_won[2], engine1_won[4],
(engine1_won[0] + engine1_won[4] / 2) / (engine1_won[0] + engine1_won[2] + engine1_won[4]) * 100))
print('{} playing White: {}-{}-{} ({:.1f}%)'.format(
engine1.name,
engine1_won[1], engine1_won[3], engine1_won[5],
(engine1_won[1] + engine1_won[5] / 2) / (engine1_won[1] + engine1_won[3] + engine1_won[5]) * 100 if not no_swap and n > 0 else 0))
print('{} playing Black: {}-{}-{} ({:.1f}%)'.format(
engine2.name,
engine2_won[0], engine2_won[2], engine2_won[4],
(engine2_won[0] + engine2_won[4] / 2) / (engine2_won[0] + engine2_won[2] + engine2_won[4]) * 100 if not no_swap and n > 0 else 0))
print('{} playing White: {}-{}-{} ({:.1f}%)'.format(
engine2.name,
engine2_won[1], engine2_won[3], engine2_won[5],
(engine2_won[1] + engine2_won[5] / 2) / (engine2_won[1] + engine2_won[3] + engine2_won[5]) * 100))
elo = Elo(engine1_won_sum, engine2_won_sum, draw_count)
if engine1_won_sum > 0 and engine2_won_sum > 0:
try:
error_margin = elo.error_margin()
except ValueError:
error_margin = math.nan
print('Elo difference: {:.1f} +/- {:.1f}, LOS: {:.1f} %, DrawRatio: {:.1f} %'.format(
elo.diff(), error_margin, elo.los(), elo.draw_ratio()))
# CSA
if csa:
csa_exporter.endgame(csa_endgame)
# PGN
if pgn:
if win == BLACK:
result = BLACK_WIN
elif win == WHITE:
result = WHITE_WIN
else:
result = DRAW
pgn_exporter.tag_pair([engine.name for engine in engines_order], result, round=n+1)
if not no_pgn_moves:
pgn_exporter.movetext(moves)
if callback:
is_continue = callback({
'engine1_name': engine1.name,
'engine2_name': engine2.name,
'engine1_won': engine1_won_sum,
'engine2_won': engine2_won_sum,
'black_won': black_won,
'white_won': white_won,
'draw': draw_count,
'total': total_count,
})
if not is_continue:
break
# CSA
if csa:
csa_exporter.close()
# PGN
if pgn:
pgn_exporter.close()
# エンジン終了
if keep_process:
for engine, listener in zip(engines_order, listeners_order):
engine.quit(listener=listener)
return {
'engine1_name': engine1.name,
'engine2_name': engine2.name,
'engine1_won': engine1_won_sum,
'engine2_won': engine2_won_sum,
'black_won': black_won,
'white_won': white_won,
'draw': draw_count,
'total': total_count,
}
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('engine1')
parser.add_argument('engine2')
parser.add_argument('engine3', nargs='?')
parser.add_argument('--options1', type=str, default='')
parser.add_argument('--options2', type=str, default='')
parser.add_argument('--options3', type=str, default='')
parser.add_argument('--name1', type=str)
parser.add_argument('--name2', type=str)
parser.add_argument('--name3', type=str)
parser.add_argument('--games', type=int, default=1)
parser.add_argument('--resign', type=int)
parser.add_argument('--mate-win', action='store_true')
parser.add_argument('--byoyomi', type=int, nargs='+')
parser.add_argument('--time', type=int, nargs='+')
parser.add_argument('--inc', type=int, nargs='+')
parser.add_argument('--draw', type=int, default=256)
parser.add_argument('--ponder', action='store_true')
parser.add_argument('--no-swap', action='store_true')
parser.add_argument('--opening', type=str)
parser.add_argument('--opening-moves', type=int, default=24)
parser.add_argument('--opening-seed', type=int)
parser.add_argument('--opening-index', type=int)
parser.add_argument('--keep-process', action='store_true')
parser.add_argument('--csa', type=str)
parser.add_argument('--multi-csa', action='store_true')
parser.add_argument('--pgn', type=str)
parser.add_argument('--no-pgn-moves', action='store_true')
parser.add_argument('--display', action='store_true')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
if args.csa is not None and not args.multi_csa:
os.makedirs(args.csa, exist_ok=True)
options_list = [{}, {}, {}]
for i, kvs in enumerate([options.split(',') for options in (args.options1, args.options2, args.options3)]):
if len(kvs) == 1 and kvs[0] == '':
continue
for kv_str in kvs:
kv = kv_str.split(':', 1)
if len(kv) != 2:
raise ValueError('options{}'.format(i + 1))
options_list[i][kv[0]] = kv[1]
if args.engine3 is None:
# 1 on 1 matches
main(args.engine1, args.engine2,
options_list[0], options_list[1],
[args.name1, args.name2],
args.games, args.resign, args.mate_win,
args.byoyomi, args.time, args.inc,
args.draw, args.ponder, args.no_swap, args.opening, args.opening_moves, args.opening_seed, args.opening_index,
args.keep_process,
args.csa, args.multi_csa,
args.pgn, args.no_pgn_moves,
args.display, args.debug)
else:
# league matches
engines = (args.engine1, args.engine2, args.engine3)
names = (args.name1, args.name2, args.name3)
if args.byoyomi:
if len(args.byoyomi) == 3:
byoyomis = (args.byoyomi[0], args.byoyomi[1], args.byoyomi[2])
else:
byoyomis = (args.byoyomi[0], args.byoyomi[0], args.byoyomi[0])
else:
byoyomis = (None, None, None)
if args.time:
if len(args.time) == 3:
times = (args.time[0], args.time[1], args.time[2])
else:
times = (args.time[0], args.time[0], args.time[0])
else:
times = (None, None, None)
if args.inc:
if len(args.inc) == 3:
incs = (args.inc[0], args.inc[1], args.inc[2])
else:
incs = (args.inc[0], args.inc[0], args.inc[0])
else:
incs = (None, None, None)
combinations = ((0, 1), (0, 2), (1, 2))
results = [
{ 'engine1_won': 0, 'engine2_won': 0, 'draw': 0, 'total': 0 },
{ 'engine1_won': 0, 'engine2_won': 0, 'draw': 0, 'total': 0 },
{ 'engine1_won': 0, 'engine2_won': 0, 'draw': 0, 'total': 0 }]
# 初期局面
if args.opening:
# インデックス指定
if args.opening_index is not None:
opening_index_list = [args.opening_index]
else:
with open(args.opening) as f:
opening_index_list = list(range(len(f.readlines())))
# シャッフル
if args.opening_seed is not None:
random.seed(args.opening_seed)
random.shuffle(opening_index_list)
for n in range(0, args.games, 2):
if args.opening:
opening_index = opening_index_list[n // 2 % len(opening_index_list)]
else:
opening_index = None
for i, (a, b) in enumerate(combinations):
# 先後入れ替えて1回ずつ対局
result = main(engines[a], engines[b],
options_list[a], options_list[b],
[names[a], names[b]],
2, args.resign, args.mate_win,
(byoyomis[a], byoyomis[b]), (times[a], times[b]), (incs[a], incs[b]),
args.draw, args.ponder, False, args.opening, args.opening_moves, None, opening_index,
args.keep_process,
args.csa, args.multi_csa,
args.pgn, args.no_pgn_moves,
args.display, args.debug,
print_summary=False)
results[i]['engine1_name'] = result['engine1_name']
results[i]['engine2_name'] = result['engine2_name']
results[i]['engine1_won'] += result['engine1_won']
results[i]['engine2_won'] += result['engine2_won']
results[i]['draw'] += result['draw']
results[i]['total'] += result['total']
# 勝敗状況表示
print('{} of {} games finished.'.format(n + 2, args.games))
for i, (a, b) in enumerate(combinations):
print('{} vs {}: {}-{}-{} ({:.1f}%)'.format(
results[i]['engine1_name'], results[i]['engine2_name'],
results[i]['engine1_won'], results[i]['engine2_won'], results[i]['draw'],
(results[i]['engine1_won'] + results[i]['draw'] / 2) / results[i]['total'] * 100))