from typing import Dict, Optional
import math
from cshogi import Board, CSA, KIF, BLACK, WHITE, opponent, REPETITION_WIN, REPETITION_LOSE
from cshogi.usi import Engine
from cshogi.cli import usi_info_to_score, usi_info_to_csa_comment, re_usi_info
from flask import Flask, render_template, request
from wsgiref.simple_server import make_server
TURN_SYMBOLS = ('▲', '△')
CSA_TURN_SYMBOLS = {'+': '▲', '-': '△'}
[docs]class Human:
def __init__(self, human_input):
self.board = Board()
self.name = 'Human'
self.human_input = human_input
[docs] def usi(self, listener=None):
return []
[docs] def isready(self, listener=None):
pass
[docs] def usinewgame(self, listener=None):
pass
[docs] def position(self, moves=None, sfen="startpos", listener=None):
self.board.reset()
for move in moves:
self.board.push_usi(move)
[docs] def go(self, ponder=False, btime=None, wtime=None, byoyomi=None, binc=None, winc=None, nodes=None, listener=None):
import time
while True:
human_input = dict(self.human_input)
if human_input['number'] == self.board.move_number:
usi_move = human_input['move']
move = self.board.move_from_usi(usi_move)
if self.board.is_legal(move):
return usi_move, None
time.sleep(0.1)
[docs] def quit(self, listener=None):
pass
[docs]def usi_info_to_pv(board, info):
m = re_usi_info.match(info)
if m is None:
return None
# pv
pv = []
board2 = board.copy()
for usi_move in m[3].split(' '):
move = board2.move_from_usi(usi_move)
if not board2.is_legal(move):
break
pv.append(TURN_SYMBOLS[board2.turn] + KIF.move_to_kif(move, board2.peek()))
board2.push(move)
return ' '.join(pv)
[docs]def match(moves, engine1=None, engine2=None, options1={}, options2={}, names=None, byoyomi=None, time=None, inc=None, draw=256, human_input=None, csa=None):
from collections import defaultdict
from time import perf_counter
class Listener:
def __init__(self):
self.info = self.bestmove = ''
def __call__(self, line):
self.info = self.bestmove
self.bestmove = line
listener = Listener()
# CSA
if csa:
csa_exporter = CSA.Exporter(csa, append=False)
engines = []
for engine in (engine1, engine2):
if engine == 'human':
engines.append(Human(human_input))
else:
engines.append(Engine(engine, connect=True))
for engine, options in zip(engines, (options1, options2)):
for name, value in options.items():
engine.setoption(name, value, listener=listener)
engine.isready(listener=listener)
for i in range(2):
if names[i]:
engines[i].name = names[i]
else:
names[i] = engines[i].name
board = Board()
usi_moves = []
repetition_hash = defaultdict(int)
# 新規ゲーム
for engine in engines:
engine.usinewgame()
if csa:
csa_exporter.info(board, names, version='V2')
# 対局
is_game_over = False
is_nyugyoku = False
is_illegal = False
is_repetition_win = False
is_repetition_lose = False
is_fourfold_repetition = False
is_timeup = False
remain_time = [time, time]
while not is_game_over:
engine_index = (board.move_number - 1) % 2
engine = engines[engine_index]
# 持将棋
if board.move_number > draw:
is_game_over = True
break
# position
engine.position(usi_moves)
start_time = perf_counter()
# go
listener.info = ''
listener.bestmove = ''
bestmove, _ = engine.go(byoyomi=byoyomi, btime=remain_time[BLACK], wtime=remain_time[WHITE], binc=inc, winc=inc, listener=listener)
elapsed_time = perf_counter() - start_time
if remain_time[board.turn] is not None:
if inc_time[board.turn] is not None:
remain_time[board.turn] += inc_time[board.turn]
remain_time[board.turn] -= math.ceil(elapsed_time * 1000)
if bestmove == 'resign':
# 投了
is_game_over = True
break
elif bestmove == 'win':
# 入玉勝ち宣言
is_nyugyoku = True
is_game_over = True
break
else:
move = board.move_from_usi(bestmove)
if board.is_legal(move):
if csa:
csa_exporter.move(move, time=int(elapsed_time), comment=usi_info_to_csa_comment(board, listener.info))
score = usi_info_to_score(listener.info)
pv = usi_info_to_pv(board, listener.info)
moves.append({
'number': board.move_number,
'kif_move': TURN_SYMBOLS[(board.move_number + 1) % 2] + KIF.move_to_kif(move, board.peek()),
'time': math.ceil(elapsed_time),
'move': move,
'eval': (score * (1 if board.turn == BLACK else -1)) if score is not None else 'null',
'pv': pv if pv is not None else '',
})
board.push(move)
usi_moves.append(bestmove)
key = board.zobrist_hash()
repetition_hash[key] += 1
# 千日手
if repetition_hash[key] == 4:
# 連続王手
is_draw = board.is_draw()
if is_draw == REPETITION_WIN:
is_repetition_win = True
is_game_over = True
break
elif is_draw == REPETITION_LOSE:
is_repetition_lose = True
is_game_over = True
break
is_fourfold_repetition = True
is_game_over = True
break
else:
is_illegal = True
is_game_over = True
break
# 終局判定
if board.is_game_over():
is_game_over = True
break
# エンジン終了
for engine in engines:
engine.quit()
# 結果出力
if not board.is_game_over() and board.move_number > draw:
result = '持将棋'
csa_endgame = '%JISHOGI'
elif is_fourfold_repetition:
result = '千日手'
csa_endgame = '%SENNICHITE'
elif is_nyugyoku:
result = TURN_SYMBOLS[board.turn] + '入玉宣言'
csa_endgame = '%KACHI'
elif is_illegal:
win = opponent(board.turn)
result = '{}の反則負け'.format('先手' if win == WHITE else '後手')
csa_endgame = '%ILLEGAL_MOVE'
elif is_repetition_win:
win = board.turn
result = '{}の反則勝ち'.format('先手' if win == BLACK else '後手')
csa_endgame = '%+ILLEGAL_ACTION' if board.turn == WHITE else '%-ILLEGAL_ACTION'
elif is_repetition_lose:
win = opponent(board.turn)
result = '{}の反則負け'.format('先手' if win == WHITE else '後手')
csa_endgame = 'ILLEGAL_MOVE'
elif is_timeup:
win = opponent(board.turn)
result = '{}の切れ負け'.format('先手' if win == WHITE else '後手')
csa_endgame = '%TIME_UP'
else:
result = TURN_SYMBOLS[board.turn] + '投了'
csa_endgame = '%TORYO'
# CSA
if csa:
csa_exporter.endgame(csa_endgame)
moves.append({
'number': board.move_number,
'kif_move': result,
'time': 0,
'move': 0,
'eval': 'null',
'pv': '',
})
[docs]def run(engine1: Optional[str] = None, engine2: Optional[str] = None, options1: Dict = {}, options2: Dict = {}, name1: Optional[str] = None, name2: Optional[str] = None, byoyomi: Optional[int] = None, time: Optional[int] = None, inc: Optional[int] = None, draw: int = 256, csa: Optional[str] = None, host: str = 'localhost', port: int = 8000):
"""Initializes and runs a shogi match between two engines or replays a game from a given CSA file.
The match or replay is rendered using Flask and is accessible via a web interface.
:param engine1: Name or path of the first engine, or 'human' for human player. Default is None.
:param engine2: Name or path of the second engine, or 'human' for human player. Default is None.
:param options1: Configuration options for the first engine. Default is an empty dictionary.
:param options2: Configuration options for the second engine. Default is an empty dictionary.
:param name1: Optional name for the first player. Default is None.
:param name2: Optional name for the second player. Default is None.
:param byoyomi: Byoyomi time in milliseconds. Default is None.
:param time: Time control for the match in milliseconds. Default is None.
:param inc: Increment time for each move in milliseconds. Default is None.
:param draw: Number of moves before a draw is claimed. Default is 256.
:param csa: Path to a CSA file to replay a game. Default is None.
:param host: Hostname to bind the Flask server to. Default is 'localhost'.
:param port: Port number to bind the Flask server to. Default is 8000.
"""
is_match = 'false'
auto_update = 'false'
if engine1 and engine2:
from multiprocessing import Process, Manager
manager = Manager()
moves = manager.list()
human_input = manager.dict({ 'number': 0, 'move': None })
names = manager.list([name1, name2])
humans = [engine1 == 'human', engine2 == 'human']
match_proc = Process(target=match, args=[moves, engine1, engine2, options1, options2, names, byoyomi, time, inc, draw, human_input, csa])
match_proc.start()
is_match = 'true'
auto_update = 'true'
elif csa:
moves = []
kif = CSA.Parser.parse_file(csa)[0]
names = kif.names
board = Board(sfen=kif.sfen)
for i, (move, prev_move, time, comment) in enumerate(zip(kif.moves, [None] + kif.moves[:-1], kif.times, kif.comments)):
comment_items = comment.split(' ')
eval = 'null'
pv = []
assert board.is_legal(move)
board.push(move)
if len(comment_items) > 0 and comment_items[0] != '':
eval = int(comment_items[0])
if len(comment_items) > 1:
board2 = board.copy()
for csa in comment_items[1:]:
move2 = board2.move_from_csa(csa[1:])
if board2.is_legal(move2):
pv.append(CSA_TURN_SYMBOLS[csa[0]] + KIF.move_to_kif(move2))
board2.push(move2)
else:
pv = comment_items[1:]
break
moves.append({
'number': i + 1,
'kif_move': TURN_SYMBOLS[i % 2] + KIF.move_to_kif(move, prev_move),
'time': time,
'move': move,
'eval': eval,
'pv': ' '.join(pv),
})
moves.append({
'number': i + 2,
'kif_move': TURN_SYMBOLS[(i + 1) % 2] + CSA.JAPANESE_END_GAMES[kif.endgame],
'time': 0,
'move': 0,
'eval': 'null',
'pv': '',
})
app = Flask(__name__)
@app.route("/")
def init_board():
autoupdate = 'false'
human = ''
if is_match == 'true':
if match_proc.is_alive():
autoupdate = request.args.get('autoupdate', default=auto_update)
if humans[len(moves) % 2]:
human = 'black' if len(moves) % 2 == 0 else 'white'
return render_template('board.html', names=names, moves=moves, is_match=is_match, autoupdate=autoupdate, human=human)
@app.route("/update")
def update():
human = ''
if humans[len(moves) % 2]:
human = 'black' if len(moves) % 2 == 0 else 'white'
return { 'names': list(names), 'moves': list(moves), 'human':human }
@app.route("/move")
def human_move():
import time
human_input['move'] = request.args.get('move')
human_input['number'] = int(request.args.get('number', default=0))
time.sleep(0.2)
human = ''
if names[len(moves) % 2] == 'Human':
human = 'black' if len(moves) % 2 == 0 else 'white'
return { 'names': list(names), 'moves': list(moves), 'human':human }
server = make_server(host, port, app)
server.serve_forever()
[docs]def colab(engine1=None, engine2=None, options1={}, options2={}, name1=None, name2=None, byoyomi=None, time=None, inc=None, draw=256, csa=None):
from multiprocessing import Process
import portpicker
from google.colab import output
global proc
if 'proc' in globals():
proc.terminate()
proc.join()
port = portpicker.pick_unused_port()
proc = Process(target=run, args=(engine1, engine2, options1, options2, name1, name2, byoyomi, time, inc, draw, csa, 'localhost', port))
proc.start()
output.serve_kernel_port_as_iframe(port, height='680')
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('engine1')
parser.add_argument('engine2')
parser.add_argument('--options1', default='')
parser.add_argument('--options2', default='')
parser.add_argument('--name1')
parser.add_argument('--name2')
parser.add_argument('--byoyomi', type=int)
parser.add_argument('--time', type=int)
parser.add_argument('--inc', type=int)
parser.add_argument('--draw', type=int, default=256)
parser.add_argument('--csa')
parser.add_argument('--host', type=str, default='localhost')
parser.add_argument('--port', type=int, default=8000)
args = parser.parse_args()
options_list = [{}, {}]
for i, kvs in enumerate([options.split(',') for options in (args.options1, args.options2)]):
if len(kvs) == 1 and kvs[0] == '':
continue
for kv_str in kvs:
kv = kv_str.split(':', 1)
if len(kv) != 2:
raise ValueError('options{}'.format(i + 1))
options_list[i][kv[0]] = kv[1]
run(engine1=args.engine1, engine2=args.engine2,
options1=options_list[0], options2=options_list[1],
name1=args.name1, name2=args.name2,
byoyomi=args.byoyomi, time=args.time, inc=args.inc, draw=args.draw,
csa=args.csa, host=args.host, port=args.port)