"""
Synapse-Base Main Search Engine
State-of-the-art alpha-beta with advanced enhancements
Research Implementation:
- Alpha-Beta with PVS (Principal Variation Search)
- Aspiration Windows
- Null Move Pruning
- Late Move Reductions (LMR)
- Quiescence Search with SEE
- Iterative Deepening
- Transposition Table with Zobrist
- Advanced Move Ordering
"""
import chess
import time
import logging
from typing import Optional, Tuple, List, Dict
from .evaluate import NeuralEvaluator
from .transposition import TranspositionTable, NodeType
from .move_ordering import MoveOrderer
from .time_manager import TimeManager
from .endgame import EndgameDetector
logger = logging.getLogger(__name__)
class SynapseEngine:
"""
State-of-the-art chess engine with neural evaluation
"""
# Search constants (tuned values from research)
MATE_SCORE = 100000
MAX_PLY = 100
# Null move pruning parameters
NULL_MOVE_REDUCTION = 2
NULL_MOVE_MIN_DEPTH = 3
# Late move reduction parameters
LMR_MIN_DEPTH = 3
LMR_MOVE_THRESHOLD = 4
# Aspiration window size
ASPIRATION_WINDOW = 50
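    # Note: all scores in this engine are centipawns (1 pawn = 100), so an
    # ASPIRATION_WINDOW of 50 is half a pawn on each side of the previous score.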
def __init__(self, model_path: str, num_threads: int = 2):
"""Initialize engine components"""
# Core components
self.evaluator = NeuralEvaluator(model_path, num_threads)
self.tt = TranspositionTable(size_mb=256) # 256MB TT
self.move_orderer = MoveOrderer()
self.time_manager = TimeManager()
self.endgame_detector = EndgameDetector()
# Search statistics
self.nodes_evaluated = 0
self.depth_reached = 0
self.sel_depth = 0 # Selective depth (quiescence)
self.principal_variation = []
logger.info("🎯 Synapse-Base Engine initialized")
logger.info(f" Model: {self.evaluator.get_model_size_mb():.2f} MB")
logger.info(f" TT Size: 256 MB")
def get_best_move(
self,
fen: str,
depth: int = 5,
time_limit: int = 5000
) -> Dict:
"""
Main search entry point
Args:
fen: Position in FEN notation
depth: Maximum search depth
time_limit: Time limit in milliseconds
Returns:
Dictionary with best_move, evaluation, stats
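        Example (illustrative output only; actual values depend on the model):
            >>> engine.get_best_move(chess.STARTING_FEN, depth=4)
            {'best_move': 'e2e4', 'evaluation': 0.25, 'depth_searched': 4, ...}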
"""
board = chess.Board(fen)
# Reset statistics
self.nodes_evaluated = 0
self.depth_reached = 0
self.sel_depth = 0
self.principal_variation = []
# Time management
time_limit_sec = time_limit / 1000.0
self.time_manager.start_search(time_limit_sec, time_limit_sec)
# Age history for new search
self.move_orderer.age_history(0.95)
self.tt.increment_age()
# Special cases
legal_moves = list(board.legal_moves)
if len(legal_moves) == 0:
            return self._no_legal_moves_result(board)
if len(legal_moves) == 1:
return self._single_move_result(board, legal_moves[0])
# Iterative deepening with aspiration windows
best_move = legal_moves[0]
best_score = float('-inf')
alpha = float('-inf')
beta = float('inf')
for current_depth in range(1, depth + 1):
# Time check
if self.time_manager.should_stop(current_depth):
break
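            # Aspiration windows bet that this iteration's score will land near
            # the previous one, so we search a narrow window around it; a fail
            # low/high outside the window triggers the full-width re-search below.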
# Aspiration window for depth >= 4
if current_depth >= 4 and abs(best_score) < self.MATE_SCORE - 1000:
alpha = best_score - self.ASPIRATION_WINDOW
beta = best_score + self.ASPIRATION_WINDOW
else:
alpha = float('-inf')
beta = float('inf')
# Search with aspiration window
score, move, pv = self._search_root(
board, current_depth, alpha, beta
)
# Handle aspiration window failures
if score <= alpha or score >= beta:
                # Re-search with the full window
score, move, pv = self._search_root(
board, current_depth, float('-inf'), float('inf')
)
# Update best move
if move:
best_move = move
best_score = score
self.depth_reached = current_depth
self.principal_variation = pv
                logger.info(
                    f"Depth {current_depth}: {move.uci()} "
                    f"({score / 100:+.2f}) | Nodes: {self.nodes_evaluated} | "
                    f"Time: {self.time_manager.elapsed():.2f}s"
                )
# Return result
return {
'best_move': best_move.uci(),
'evaluation': round(best_score / 100.0, 2), # Convert to pawns
'depth_searched': self.depth_reached,
'seldepth': self.sel_depth,
'nodes_evaluated': self.nodes_evaluated,
'time_taken': int(self.time_manager.elapsed() * 1000),
'pv': [m.uci() for m in self.principal_variation],
'nps': int(self.nodes_evaluated / max(self.time_manager.elapsed(), 0.001)),
'tt_stats': self.tt.get_stats(),
'move_ordering_stats': self.move_orderer.get_stats()
}
def _search_root(
self,
board: chess.Board,
depth: int,
alpha: float,
beta: float
) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
"""Root node search with PVS"""
legal_moves = list(board.legal_moves)
# TT probe
zobrist_key = self.tt.compute_zobrist_key(board)
tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
tt_move = tt_result[1] if tt_result else None
# Order moves
ordered_moves = self.move_orderer.order_moves(
board, legal_moves, depth, tt_move
)
best_move = ordered_moves[0]
best_score = float('-inf')
best_pv = []
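        # PVS at the root: search the first (presumed best) move with the full
        # window, then probe each later move with a null window; only a move
        # that beats alpha earns the expensive full-window re-search.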
        for i, move in enumerate(ordered_moves):
            board.push(move)
            if i == 0:
                # Full-window search for the first move (PV node)
                score, pv = self._pvs(
                    board, depth - 1, -beta, -alpha, True, 1
                )
                score = -score
            else:
                # Null-window (scout) search for the remaining moves
                score, _ = self._pvs(
                    board, depth - 1, -alpha - 1, -alpha, False, 1
                )
                score = -score
                # Re-search with the full window if the scout search failed high
                if alpha < score < beta:
                    score, pv = self._pvs(
                        board, depth - 1, -beta, -alpha, True, 1
                    )
                    score = -score
                else:
                    pv = []
board.pop()
# Update best
if score > best_score:
best_score = score
best_move = move
best_pv = [move] + pv
# Update alpha
if score > alpha:
alpha = score
# Time check
if self.time_manager.should_stop(depth):
break
# Store in TT
self.tt.store(
zobrist_key, depth, best_score,
NodeType.EXACT, best_move
)
return best_score, best_move, best_pv
    def _pvs(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        do_null: bool,
        ply: int
    ) -> Tuple[float, List[chess.Move]]:
        """
        Principal Variation Search (PVS) with alpha-beta
        Enhanced with:
        - Null move pruning
        - Late move reductions
        - Transposition table
        Args:
            ply: distance from the root, used for mate scoring and seldepth
        """
        self.nodes_evaluated += 1
        self.sel_depth = max(self.sel_depth, ply)
        # Mate distance pruning: no mate found from here can be shorter than
        # `ply` plies, so the window can be clamped accordingly
        alpha = max(alpha, -self.MATE_SCORE + ply)
        beta = min(beta, self.MATE_SCORE - ply - 1)
        if alpha >= beta:
            return alpha, []
# Draw detection
if board.is_repetition(2) or board.is_fifty_moves():
return 0, []
# TT probe
zobrist_key = self.tt.compute_zobrist_key(board)
tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
if tt_result and tt_result[0] is not None:
return tt_result[0], []
tt_move = tt_result[1] if tt_result else None
        # Quiescence search at leaf nodes
        if depth <= 0:
            return self._quiescence(board, alpha, beta, ply, 0), []
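        # Null move pruning intuition: hand the opponent a free extra move and
        # search to reduced depth; if the score still comes back >= beta, a
        # real move would almost certainly fail high too, so prune the node.
        # The guards below skip this when in check or with only pawns left,
        # where zugzwang makes the assumption unsound.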
# Null move pruning
if (do_null and
depth >= self.NULL_MOVE_MIN_DEPTH and
not board.is_check() and
self._has_non_pawn_material(board)):
board.push(chess.Move.null())
            score, _ = self._pvs(
                board, depth - 1 - self.NULL_MOVE_REDUCTION,
                -beta, -beta + 1, False, ply + 1
            )
score = -score
board.pop()
if score >= beta:
return beta, []
# Generate and order moves
legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                return -self.MATE_SCORE + ply, []  # Checkmated
            return 0, []  # Stalemate
ordered_moves = self.move_orderer.order_moves(
board, legal_moves, depth, tt_move
)
# Main search loop
best_score = float('-inf')
best_pv = []
node_type = NodeType.UPPER_BOUND
moves_searched = 0
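        # Late move reductions: moves ordered near the back rarely raise alpha,
        # so they are first searched one ply shallower; anything that surprises
        # us by beating alpha is re-searched at full depth below.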
        for move in ordered_moves:
            # Classify the move before pushing it; after board.push() these
            # predicates would be evaluated against the wrong position
            is_capture = board.is_capture(move)
            gives_check = board.gives_check(move)
            board.push(move)
            # Late move reductions
            reduction = 0
            if (moves_searched >= self.LMR_MOVE_THRESHOLD and
                    depth >= self.LMR_MIN_DEPTH and
                    not gives_check and
                    not is_capture):
                reduction = 1
            # PVS
            if moves_searched == 0:
                score, pv = self._pvs(
                    board, depth - 1, -beta, -alpha, True, ply + 1
                )
                score = -score
            else:
                # Reduced null-window search
                score, _ = self._pvs(
                    board, depth - 1 - reduction, -alpha - 1, -alpha, True, ply + 1
                )
                score = -score
                # Re-search at full depth and window if the scout beat alpha
                if alpha < score < beta:
                    score, pv = self._pvs(
                        board, depth - 1, -beta, -alpha, True, ply + 1
                    )
                    score = -score
                else:
                    pv = []
board.pop()
moves_searched += 1
            # Update best
            if score > best_score:
                best_score = score
                best_pv = [move] + pv
            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT
            if score >= beta:
                node_type = NodeType.LOWER_BOUND
                # Reward the quiet move that caused the beta cutoff
                if not is_capture:
                    self.move_orderer.update_history(move, depth, True)
                    self.move_orderer.update_killer_move(move, depth)
                break
# Store in TT
self.tt.store(zobrist_key, depth, best_score, node_type, best_pv[0] if best_pv else None)
return best_score, best_pv
    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        ply: int,
        qs_depth: int
    ) -> float:
        """
        Quiescence search to resolve tactical sequences
        Only searches captures and checks
        """
        self.nodes_evaluated += 1
        self.sel_depth = max(self.sel_depth, ply)
# Stand-pat evaluation
stand_pat = self.evaluator.evaluate_hybrid(board)
stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)
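        # Stand pat: the side to move can usually do at least as well as the
        # static evaluation by simply declining all captures, so the static
        # score serves as a lower bound here.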
if stand_pat >= beta:
return beta
if alpha < stand_pat:
alpha = stand_pat
# Depth limit for quiescence
if qs_depth >= 8:
return stand_pat
# Generate tactical moves
tactical_moves = [
move for move in board.legal_moves
if board.is_capture(move) or board.gives_check(move)
]
if not tactical_moves:
return stand_pat
# Order tactical moves
tactical_moves = self.move_orderer.order_moves(
board, tactical_moves, 0
)
for move in tactical_moves:
board.push(move)
            score = -self._quiescence(board, -beta, -alpha, ply + 1, qs_depth + 1)
board.pop()
if score >= beta:
return beta
if score > alpha:
alpha = score
return alpha
def _has_non_pawn_material(self, board: chess.Board) -> bool:
"""Check if side to move has non-pawn material"""
for piece_type in [chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN]:
if board.pieces(piece_type, board.turn):
return True
return False
    def _no_legal_moves_result(self, board: chess.Board) -> Dict:
        """Result when there are no legal moves (checkmate or stalemate)"""
        evaluation = -self.MATE_SCORE / 100.0 if board.is_check() else 0.0
        return {
            'best_move': '0000',
            'evaluation': round(evaluation, 2),
            'depth_searched': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': []
        }
def _single_move_result(self, board: chess.Board, move: chess.Move) -> Dict:
"""Result when only one legal move"""
eval_score = self.evaluator.evaluate_hybrid(board)
return {
'best_move': move.uci(),
'evaluation': round(eval_score / 100.0, 2),
'depth_searched': 0,
'nodes_evaluated': 1,
'time_taken': 0,
'pv': [move.uci()]
}
    def validate_fen(self, fen: str) -> bool:
        """Validate a FEN string"""
        try:
            chess.Board(fen)
            return True
        except ValueError:
            return False
def get_model_size(self) -> float:
"""Get model size in MB"""
return self.evaluator.get_model_size_mb()
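
if __name__ == '__main__':
    # Minimal usage sketch, not part of the engine API: runs a shallow search
    # from the starting position. The model path below is a placeholder, not a
    # file shipped with this repo; point it at a real evaluator checkpoint.
    logging.basicConfig(level=logging.INFO)
    engine = SynapseEngine('path/to/model')
    result = engine.get_best_move(chess.STARTING_FEN, depth=4, time_limit=3000)
    print(f"Best move: {result['best_move']} ({result['evaluation']:+.2f} pawns)")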