import subprocess import shogi import random import time import os import copy import multiprocessing as mp from collections import defaultdict # ========================================== # ★設定(Dドライブ完全移行版) # ========================================== CPU_ENGINE_PATH = r"D:\Users\takas\YaneuraOu\YaneuraOu_NNUE_halfkp_256x2_32_32-V900Git_AVX2.exe" GPU_ENGINE_PATH = r"D:\Users\takas\FukauraOu\dlshogi_onnxruntime.exe" WORKER_COUNT = 10 # ★ 並列ワーカー数 TARGET_PROBLEMS = 100 # ★ 保存先をすべてDドライブに統一 SAVE_DIR = r"C:\Users\takas\OneDrive\生成詰将棋" HISTORY_FILE = r"D:\Users\takas\生成詰将棋\tsume_sac5_strict_history.txt" # ★5回捨駒専用の履歴ファイルに変更 # 金・銀の上限を2枚に減らし、粘着質な配置を防ぐ PIECE_LIMITS = { shogi.ROOK: 2, shogi.BISHOP: 2, shogi.GOLD: 2, shogi.SILVER: 2, shogi.KNIGHT: 4, shogi.LANCE: 4, shogi.PAWN: 18 } PROMOTE_MAP = { shogi.PROM_ROOK: shogi.ROOK, shogi.PROM_BISHOP: shogi.BISHOP, shogi.PROM_SILVER: shogi.SILVER, shogi.PROM_KNIGHT: shogi.KNIGHT, shogi.PROM_LANCE: shogi.LANCE, shogi.PROM_PAWN: shogi.PAWN } KANJI_NUMS = ["", "一", "二", "三", "四", "五", "六", "七", "八", "九", "十", "十一", "十二", "十三", "十四", "十五", "十六", "十七", "十八"] ZENKAKU_NUMS = ["", "1", "2", "3", "4", "5", "6", "7", "8", "9"] PIECE_KIF = { shogi.PAWN: "歩", shogi.LANCE: "香", shogi.KNIGHT: "桂", shogi.SILVER: "銀", shogi.GOLD: "金", shogi.BISHOP: "角", shogi.ROOK: "飛", shogi.KING: "玉", shogi.PROM_PAWN: "と", shogi.PROM_LANCE: "成香", shogi.PROM_KNIGHT: "成桂", shogi.PROM_SILVER: "成銀", shogi.PROM_BISHOP: "馬", shogi.PROM_ROOK: "竜" } KIF_BOARD_PIECE = { shogi.PAWN: "歩", shogi.LANCE: "香", shogi.KNIGHT: "桂", shogi.SILVER: "銀", shogi.GOLD: "金", shogi.BISHOP: "角", shogi.ROOK: "飛", shogi.KING: "玉", shogi.PROM_PAWN: "と", shogi.PROM_LANCE: "杏", shogi.PROM_KNIGHT: "圭", shogi.PROM_SILVER: "全", shogi.PROM_BISHOP: "馬", shogi.PROM_ROOK: "竜" } # ========================================== # クラス定義 # ========================================== class HistoryManager: def __init__(self, file_path, lock): self.file_path = file_path self.lock = lock os.makedirs(os.path.dirname(self.file_path), exist_ok=True) self.history = self._load_history() def _load_history(self): seen = set() with self.lock: if not os.path.exists(self.file_path): return seen try: with open(self.file_path, "r", encoding="utf-8") as f: for line in f: parts = line.strip().split() if parts: seen.add(parts[0]) except: pass return seen def is_duplicate(self, sfen): with self.lock: return sfen.split()[0] in self.history def add(self, sfen): with self.lock: self.history.add(sfen.split()[0]) try: with open(self.file_path, "a", encoding="utf-8") as f: f.write(sfen + "\n") except: pass class USISolver: def __init__(self, engine_path, name="Engine"): self.engine_path = engine_path self.name = name self.process = None self.boot_engine() def boot_engine(self): self.close() if not os.path.exists(self.engine_path): return try: self.process = subprocess.Popen( self.engine_path, cwd=os.path.dirname(self.engine_path), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', errors='replace', bufsize=1 ) self._send_raw("usi") if not self._wait_for("usiok", timeout=15): return self._send_raw("setoption name Threads value 2") self._send_raw("setoption name USI_Hash value 256") self._send_raw("isready") if not self._wait_for("readyok", timeout=15): return self._send_raw("usinewgame") except: self.close() def _send_raw(self, command): if self.process: try: self.process.stdin.write(command + "\n") self.process.stdin.flush() except: pass def _read_line(self): try: if self.process: return self.process.stdout.readline() except: pass return None def _wait_for(self, keyword, timeout=10): start = time.time() while True: line = self._read_line() if not line: break if keyword in line: return True if time.time() - start > timeout: return False return False def close(self): if self.process: try: self.process.stdin.close() self.process.terminate() self.process.wait(timeout=1) except: try: self.process.kill() except: pass self.process = None def solve(self, board, nodes=50000): if self.process is None: self.boot_engine() try: self._send_raw(f"position sfen {board.sfen()}") self._send_raw(f"go nodes {nodes}") except: self.boot_engine() return False, "Error", 0 start_time = time.time() pv_moves = [] mate_found = False while True: line = self._read_line() if not line: self.boot_engine() return False, "Lost", 0 if "mate " in line and ("score" in line or "info" in line) and "mate -" not in line: mate_found = True try: parts = line.split() if "pv" in parts: pv_moves = [m for m in parts[parts.index("pv")+1:] if m != "resign"] except: pass if "bestmove" in line: parts = line.strip().split() if len(parts) > 1: best_move = parts[1] if best_move == "resign" or not mate_found: return False, "NoMate", 0 moves_str = " ".join(pv_moves) if pv_moves else best_move steps = len(pv_moves) if pv_moves else 1 if steps % 2 == 0: return False, "Incomplete", 0 return True, moves_str, steps return False, "Unknown", 0 if time.time() - start_time > 10.0: self.boot_engine() return False, "Timeout", 0 return False, "Unknown", 0 def solve_strictly(self, board, estimated_steps): try: self._send_raw("usinewgame") self._send_raw(f"position sfen {board.sfen()}") self._send_raw(f"go nodes 5000000") except: return False, "Error", 0 start_time = time.time() pv1_moves = [] mate_found = False while True: line = self._read_line() if not line: break if "mate " in line and ("score" in line or "info" in line) and "mate -" not in line: mate_found = True try: parts = line.split() if "pv" in parts: pv1_moves = [m for m in parts[parts.index("pv")+1:] if m != "resign"] except: pass if "bestmove" in line: if not mate_found: return False, "NoMate", 0 if pv1_moves: steps = len(pv1_moves) if steps % 2 == 0: return False, "Incomplete", 0 return True, " ".join(pv1_moves), steps return False, "NoMate", 0 if time.time() - start_time > 60.0: return False, "Timeout", 0 return False, "Unknown", 0 def check_alt_moves(self, board, correct_move_usi): try: legal_moves = list(board.generate_legal_moves()) alt_moves = [m.usi() for m in legal_moves if m.usi() != correct_move_usi] if not alt_moves: return False, None self._send_raw("usinewgame") self._send_raw(f"position sfen {board.sfen()}") self._send_raw(f"go nodes 500000 searchmoves {' '.join(alt_moves)}") except: return False, None start_time = time.time() is_mate = False while True: line = self._read_line() if not line: break if "mate " in line and "mate -" not in line: is_mate = True if "bestmove" in line: parts = line.split() if len(parts) > 1 and parts[1] != "resign" and is_mate: return True, parts[1] return False, None if time.time() - start_time > 10.0: return False, None return False, None class GPUSolver(USISolver): def __init__(self, engine_path): super().__init__(engine_path, name="GPU(dlshogi)") # ========================================================================= # ★ 空間重視・疑似単玉・中段玉特化モデル (5回捨駒版) # ========================================================================= class PerfectSacrificeGenerator: def __init__(self, worker_id, shared_counter, lock, history_manager, target_problems): self.worker_id = worker_id self.wid_str = f"[W-{worker_id:02d}]" self.shared_counter = shared_counter self.lock = lock self.history_manager = history_manager self.target_problems = target_problems self.cpu_solver = USISolver(CPU_ENGINE_PATH, f"CPU-{worker_id}") self.gpu_solver = GPUSolver(GPU_ENGINE_PATH) def _get_piece_counts(self, board): counts = defaultdict(int) for sq in range(81): p = board.piece_at(sq) if p and p.piece_type != shogi.KING: pt = p.piece_type if pt in PROMOTE_MAP: pt = PROMOTE_MAP[pt] counts[pt] += 1 for pt, num in board.pieces_in_hand[shogi.BLACK].items(): counts[pt] += num for pt, num in board.pieces_in_hand[shogi.WHITE].items(): counts[pt] += num return counts def _is_valid_square(self, piece_type, color, sq): r = sq // 9 if color == shogi.BLACK: if piece_type in [shogi.PAWN, shogi.LANCE] and r == 0: return False if piece_type == shogi.KNIGHT and r <= 1: return False else: if piece_type in [shogi.PAWN, shogi.LANCE] and r == 8: return False if piece_type == shogi.KNIGHT and r >= 7: return False return True def _evaluate_fitness(self, steps, sac_count, has_surplus, sac_types): # ★ 変更: ただ捨てを増やすため、手数の上限を45手まで緩和 if steps > 45: return -99999 if steps % 2 == 0: return -99999 # ★ 変更: ただ捨てのスコアを圧倒的に高くし、最優先事項とする score = sac_count * 1000000 score += steps * 50 # 手数が長くなることも少し評価する for pt in sac_types: if pt in [shogi.ROOK, shogi.BISHOP, shogi.PROM_ROOK, shogi.PROM_BISHOP]: score += 100000 elif pt in [shogi.LANCE, shogi.KNIGHT, shogi.PROM_LANCE, shogi.PROM_KNIGHT]: score += 50000 elif pt in [shogi.GOLD, shogi.SILVER, shogi.PROM_SILVER]: score += 20000 if has_surplus: score -= 10 return score def _has_surplus(self, board, moves_str): moves = moves_str.split() sim_board = copy.deepcopy(board) try: for m_str in moves: sim_board.push(shogi.Move.from_usi(m_str)) return sum(sim_board.pieces_in_hand[shogi.BLACK].values()) > 0 except: return True def _analyze_sacrifices(self, board, moves_str): moves = moves_str.split() sim_board = copy.deepcopy(board) sac_count = 0 sac_types = [] try: for i in range(0, len(moves), 2): if i >= len(moves): break m_black = shogi.Move.from_usi(moves[i]) # ★追加: 先手が動かす「前」に、目標のマスが空(None)であるかを確認する target_piece_before_move = sim_board.piece_at(m_black.to_square) is_empty_square = (target_piece_before_move is None) moved_piece = sim_board.piece_at(m_black.from_square) if m_black.from_square is not None else None piece_type = moved_piece.piece_type if moved_piece else m_black.drop_piece_type sim_board.push(m_black) if i + 1 < len(moves): m_white = shogi.Move.from_usi(moves[i+1]) # ★変更: 同じマスに取り返されたか + 「元々が空きマスだったか」を条件に追加 if m_white.to_square == m_black.to_square and is_empty_square: sac_count += 1 sac_types.append(piece_type) sim_board.push(m_white) except: return 0, [] return sac_count, sac_types def run(self): try: retry = 0 while self.shared_counter.value < self.target_problems: retry += 1 if retry % 500 == 0: print(f"{self.wid_str} 候補を探索中... (シード生成試行: {retry}回)") seed_board = self._create_random_seed() solve_board = self._create_solve_board(seed_board) if self.history_manager.is_duplicate(solve_board.sfen()): continue is_mate, moves_str, steps = self.cpu_solver.solve(solve_board, nodes=8000) # ★ 変更: 5捨に合わせて上限を35手に引き上げ if not is_mate or steps > 35 or steps < 3 or steps % 2 == 0: continue print(f"{self.wid_str} [Seed発見] 試行{retry}回目: {steps}手詰 中段玉の原石を進化させます。") current_best = seed_board current_steps = steps current_sac, current_sac_types = self._analyze_sacrifices(solve_board, moves_str) current_surplus = self._has_surplus(solve_board, moves_str) current_score = self._evaluate_fitness(current_steps, current_sac, current_surplus, current_sac_types) stagnation_counter = 0 # 5回捨て駒のため進化の上限世代数を増やす for gen in range(1, 4001): if self.shared_counter.value >= self.target_problems: return child_board = self._mutate_with_hand(current_best, stagnation_counter) if not self._is_safe_board(child_board): continue solve_child = self._create_solve_board(child_board) is_mate_c, new_moves, new_steps = self.cpu_solver.solve(solve_child, nodes=20000) if is_mate_c and new_steps % 2 != 0: new_sac, new_sac_types = self._analyze_sacrifices(solve_child, new_moves) new_surplus = self._has_surplus(solve_child, new_moves) new_score = self._evaluate_fitness(new_steps, new_sac, new_surplus, new_sac_types) update = False if new_score > current_score: print(f"{self.wid_str} [進化] {current_steps}手({current_sac}捨) -> {new_steps}手({new_sac}捨) (Gen:{gen})") update = True elif new_score == current_score and current_score > 0: if random.random() < 0.2: update = True if update: current_best = child_board current_steps = new_steps current_sac = new_sac current_surplus = new_surplus current_score = new_score stagnation_counter = 0 # ★ 最終審査:最低11手以上、かつ5回捨て駒を要求 if current_steps >= 11 and current_sac >= 4 and not current_surplus: if self._finalize_check(current_best, current_steps): with self.lock: self.shared_counter.value += 1 count = self.shared_counter.value print(f"\n{self.wid_str} ★ 【累計 {count} 問目】の空間美あふれる5捨完全作を出力しました!次を探します...\n") if count >= self.target_problems: return break else: print(f"{self.wid_str} -> 最終審査落ち。種を破棄して次を探します。") break else: stagnation_counter += 1 else: stagnation_counter += 1 # ★ 変更: 難易度の高いただ捨てを発見するための忍耐力を限界突破させる limit = 300 if current_sac >= 1: limit = 800 if current_sac >= 2: limit = 2000 if current_sac >= 3: limit = 4000 if current_sac >= 4: limit = 8000 if stagnation_counter > limit: print(f"{self.wid_str} -> [撤退] 成長限界(捨駒{current_sac} / 停滞{stagnation_counter}回)。新しい原石を探します。") break except KeyboardInterrupt: pass finally: self.cpu_solver.close() self.gpu_solver.close() def _finalize_check(self, board, target_steps): print(f"{self.wid_str} ★ 条件クリア! 最終クリーニング&余詰審査を開始します...") cleaned_board = self._clean_board_pieces_fast(board, target_steps) final_solve_board = self._create_solve_board(cleaned_board) validator = USISolver(CPU_ENGINE_PATH, f"Validator-{self.worker_id}") is_mate, moves, steps = validator.solve_strictly(final_solve_board, target_steps) validator.close() if not is_mate or steps % 2 == 0: return False sac, _ = self._analyze_sacrifices(final_solve_board, moves) # ★ 変更: 5回未満の捨て駒なら破棄 if sac < 4: return False if not self._is_valid_tsume_strict_no_surplus(final_solve_board, moves): print(f"{self.wid_str} -> クリーニングで駒余りが消しきれませんでした。") return False checker = USISolver(CPU_ENGINE_PATH, f"Checker-{self.worker_id}") has_yozume, _ = checker.check_alt_moves(final_solve_board, moves.split()[0]) checker.close() if has_yozume: print(f"{self.wid_str} -> 余詰(別解)が発見されました。") return False current_sfen = final_solve_board.sfen() self.history_manager.add(current_sfen) with self.lock: print("\n========================================") print(f"{self.wid_str} ★ 厳密完全作:{steps}手詰 (捨て駒 {sac}回)") print("========================================") print(f"手順: {moves}") print("状態: 【完全作(余詰なし・中央玉・ルール厳守)】") print(f"SFEN: {current_sfen}") print("--- 盤面図 ---") print(shogi.Board(current_sfen)) print("========================================\n") self._save_kif_file(current_sfen, moves, steps) return True def _save_kif_file(self, sfen, moves_str, steps): os.makedirs(SAVE_DIR, exist_ok=True) timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{timestamp}_W{self.worker_id:02d}.kif" filepath = os.path.join(SAVE_DIR, filename) board = shogi.Board(sfen) lines = [] lines.append("手合割:その他") wh = [] for pt in [shogi.ROOK, shogi.BISHOP, shogi.GOLD, shogi.SILVER, shogi.KNIGHT, shogi.LANCE, shogi.PAWN]: c = board.pieces_in_hand[shogi.WHITE][pt] if c > 0: wh.append(PIECE_KIF[pt] + (KANJI_NUMS[c] if c > 1 else "")) lines.append("後手の持駒:" + (" ".join(wh) if wh else "なし")) lines.append("  9 8 7 6 5 4 3 2 1") lines.append("+---------------------------+") for r in range(9): row_str = "|" for f in range(9): sq = r * 9 + f p = board.piece_at(sq) if p is None: row_str += " ・" else: char = KIF_BOARD_PIECE[p.piece_type] row_str += ("v" if p.color == shogi.WHITE else " ") + char row_str += f"|{KANJI_NUMS[r+1]}" lines.append(row_str) lines.append("+---------------------------+") bh = [] for pt in [shogi.ROOK, shogi.BISHOP, shogi.GOLD, shogi.SILVER, shogi.KNIGHT, shogi.LANCE, shogi.PAWN]: c = board.pieces_in_hand[shogi.BLACK][pt] if c > 0: bh.append(PIECE_KIF[pt] + (KANJI_NUMS[c] if c > 1 else "")) lines.append("先手の持駒:" + (" ".join(bh) if bh else "なし")) lines.append("先手番") lines.append("手数----指手---------消費時間--") moves = moves_str.split() sim_board = shogi.Board(sfen) prev_to = None for i, m_str in enumerate(moves): m = shogi.Move.from_usi(m_str) if m.drop_piece_type is not None: to_col = 9 - (m.to_square % 9) to_row = (m.to_square // 9) + 1 p_str = PIECE_KIF[m.drop_piece_type] move_str = f"{ZENKAKU_NUMS[to_col]}{KANJI_NUMS[to_row]}{p_str}打" else: to_col = 9 - (m.to_square % 9) to_row = (m.to_square // 9) + 1 from_col = 9 - (m.from_square % 9) from_row = (m.from_square // 9) + 1 if prev_to == m.to_square: pos_str = "同 " else: pos_str = f"{ZENKAKU_NUMS[to_col]}{KANJI_NUMS[to_row]}" p = sim_board.piece_at(m.from_square) p_str = PIECE_KIF[p.piece_type] prom_str = "成" if m.promotion else "" move_str = f"{pos_str}{p_str}{prom_str}({from_col}{from_row})" lines.append(f"{i+1:4d} {move_str}") prev_to = m.to_square sim_board.push(m) try: with open(filepath, "w", encoding="cp932", errors="replace") as f: f.write("\n".join(lines) + "\n") print(f"{self.wid_str} -> 保存完了: {filename} を書き出しました!") except Exception as e: print(f"{self.wid_str} -> 保存エラー: {e}") def _create_solve_board(self, board): solve_board = copy.deepcopy(board) counts = self._get_piece_counts(board) REAL_PIECE_LIMITS = { shogi.ROOK: 2, shogi.BISHOP: 2, shogi.GOLD: 4, shogi.SILVER: 4, shogi.KNIGHT: 4, shogi.LANCE: 4, shogi.PAWN: 18 } for pt, total in REAL_PIECE_LIMITS.items(): remainder = total - counts[pt] if remainder > 0: solve_board.pieces_in_hand[shogi.WHITE][pt] = remainder return solve_board def _create_random_seed(self): board = shogi.Board() board.clear() kx = random.randint(2, 6) ky = 2 k_pos = ky * 9 + kx board.set_piece_at(k_pos, shogi.Piece(shogi.KING, shogi.WHITE)) corners = [0, 8, 72, 80] s_king_pos = max(corners, key=lambda sq: abs((sq%9)-kx) + abs((sq//9)-ky)) board.set_piece_at(s_king_pos, shogi.Piece(shogi.KING, shogi.BLACK)) adj_squares = [r*9+f for f in range(max(0, kx-1), min(9, kx+2)) for r in range(max(0, ky-1), min(9, ky+2)) if r*9+f != k_pos] random.shuffle(adj_squares) for sq in adj_squares[:random.randint(3, 5)]: pt = random.choices([shogi.PAWN, shogi.SILVER, shogi.GOLD], weights=[50, 25, 25])[0] if self._is_valid_square(pt, shogi.WHITE, sq) and self._get_piece_counts(board)[pt] < PIECE_LIMITS[pt]: board.set_piece_at(sq, shogi.Piece(pt, shogi.WHITE)) outer_squares = [r*9+f for f in range(max(0, kx-2), min(9, kx+3)) for r in range(max(0, ky-2), min(9, ky+3)) if r*9+f != k_pos and not board.piece_at(r*9+f) and r*9+f != s_king_pos] random.shuffle(outer_squares) for sq in outer_squares[:random.randint(1, 3)]: pt = random.choices([shogi.PAWN, shogi.LANCE, shogi.KNIGHT], weights=[60, 20, 20])[0] if self._is_valid_square(pt, shogi.WHITE, sq) and self._get_piece_counts(board)[pt] < PIECE_LIMITS[pt]: board.set_piece_at(sq, shogi.Piece(pt, shogi.WHITE)) b_zone = [sq for sq in range(81) if not board.piece_at(sq) and sq != s_king_pos and max(abs((sq%9)-kx), abs((sq//9)-ky)) >= 2] random.shuffle(b_zone) for _ in range(random.randint(2, 4)): if b_zone: pt = random.choices( [shogi.ROOK, shogi.BISHOP, shogi.LANCE, shogi.KNIGHT, shogi.GOLD, shogi.SILVER], weights=[25, 25, 20, 20, 5, 5] )[0] valid_sqs = [sq for sq in b_zone if self._is_valid_square(pt, shogi.BLACK, sq)] if valid_sqs and self._get_piece_counts(board)[pt] < PIECE_LIMITS[pt]: sq = random.choice(valid_sqs) board.set_piece_at(sq, shogi.Piece(pt, shogi.BLACK)) b_zone.remove(sq) for _ in range(random.randint(0, 1)): pt = random.choices( [shogi.LANCE, shogi.KNIGHT, shogi.GOLD, shogi.SILVER], weights=[40, 40, 10, 10] )[0] if self._get_piece_counts(board)[pt] < PIECE_LIMITS[pt]: board.pieces_in_hand[shogi.BLACK][pt] += 1 return board def _mutate_with_hand(self, parent_board, stagnation=0): board = copy.deepcopy(parent_board) base_mutations = random.choice([1, 2]) extra = stagnation // 30 num_mutations = min(base_mutations + extra, 5) for _ in range(num_mutations): action = random.choices( ["add_hand", "add_black_board", "add_sac_target", "add_white_wall", "pull_defender", "remove"], weights=[5, 25, 10, 20, 10, 30] )[0] k_sq = board.king_squares[shogi.WHITE] if action == "add_hand": counts = self._get_piece_counts(board) avail = [p for p in PIECE_LIMITS.keys() if counts[p] < PIECE_LIMITS[p]] if avail: pt = random.choices( [shogi.ROOK, shogi.BISHOP, shogi.LANCE, shogi.KNIGHT, shogi.GOLD, shogi.SILVER, shogi.PAWN], weights=[20, 20, 20, 20, 5, 5, 10] )[0] if pt in avail: board.pieces_in_hand[shogi.BLACK][pt] += 1 elif action == "add_black_board": counts = self._get_piece_counts(board) avail = [p for p in PIECE_LIMITS.keys() if counts[p] < PIECE_LIMITS[p]] if avail and k_sq is not None: kf, kr = k_sq % 9, k_sq // 9 zone = [sq for sq in range(81) if not board.piece_at(sq) and max(abs((sq%9)-kf), abs((sq//9)-kr)) >= 2] if zone: pt = random.choices( [shogi.ROOK, shogi.BISHOP, shogi.LANCE, shogi.KNIGHT, shogi.GOLD, shogi.SILVER], weights=[25, 25, 20, 20, 5, 5] )[0] valid_zone = [sq for sq in zone if self._is_valid_square(pt, shogi.BLACK, sq)] if pt in avail and valid_zone: board.set_piece_at(random.choice(valid_zone), shogi.Piece(pt, shogi.BLACK)) elif action == "add_sac_target": counts = self._get_piece_counts(board) avail = [p for p in [shogi.ROOK, shogi.BISHOP, shogi.LANCE, shogi.KNIGHT] if counts[p] < PIECE_LIMITS[p]] if avail and k_sq is not None: kf, kr = k_sq % 9, k_sq // 9 adj_zone = [r*9+f for f in range(max(0, kf-3), min(9, kf+4)) for r in range(max(0, kr-3), min(9, kr+4)) if not board.piece_at(r*9+f)] if adj_zone: pt = random.choice(avail) valid_zone = [sq for sq in adj_zone if self._is_valid_square(pt, shogi.BLACK, sq)] if valid_zone: board.set_piece_at(random.choice(valid_zone), shogi.Piece(pt, shogi.BLACK)) elif action == "add_white_wall": counts = self._get_piece_counts(board) avail = [p for p in [shogi.PAWN, shogi.LANCE, shogi.KNIGHT, shogi.SILVER] if counts[p] < PIECE_LIMITS[p]] if avail and k_sq is not None: kf, kr = k_sq % 9, k_sq // 9 zone = [r*9+f for f in range(max(0, kf-3), min(9, kf+4)) for r in range(max(0, kr-3), min(9, kr+4)) if not board.piece_at(r*9+f)] if zone: pt = random.choice(avail) valid_zone = [sq for sq in zone if self._is_valid_square(pt, shogi.WHITE, sq)] if valid_zone: board.set_piece_at(random.choice(valid_zone), shogi.Piece(pt, shogi.WHITE)) elif action == "pull_defender": if k_sq is not None: kf, kr = k_sq % 9, k_sq // 9 adj_zone = [r*9+f for f in range(max(0, kf-1), min(9, kf+2)) for r in range(max(0, kr-1), min(9, kr+2)) if r*9+f != k_sq] defenders = [sq for sq in adj_zone if board.piece_at(sq) and board.piece_at(sq).color == shogi.WHITE] if defenders: src = random.choice(defenders) p = board.piece_at(src) board.remove_piece_at(src) outer_zone = [r*9+f for f in range(max(0, kf-3), min(9, kf+4)) for r in range(max(0, kr-3), min(9, kr+4)) if max(abs(f-kf), abs(r-kr)) >= 2 and not board.piece_at(r*9+f)] valid_outer = [sq for sq in outer_zone if self._is_valid_square(p.piece_type, shogi.WHITE, sq)] if valid_outer: board.set_piece_at(random.choice(valid_outer), p) else: board.set_piece_at(src, p) elif action == "remove": targets = [("hand", pt) for pt, n in board.pieces_in_hand[shogi.BLACK].items() if n>0] + \ [("board", sq) for sq in range(81) if board.piece_at(sq) and board.piece_at(sq).color == shogi.WHITE and board.piece_at(sq).piece_type != shogi.KING] targets += [("board", sq) for sq in range(81) if board.piece_at(sq) and board.piece_at(sq).color == shogi.BLACK and board.piece_at(sq).piece_type != shogi.KING] if targets: t, v = random.choice(targets) if t=="hand": board.pieces_in_hand[shogi.BLACK][v] -= 1 else: board.remove_piece_at(v) return board def _is_safe_board(self, board): if not board.king_squares[shogi.BLACK] or not board.king_squares[shogi.WHITE]: return False for i in range(9): b_pawn = 0 w_pawn = 0 for r in range(9): p = board.piece_at(r*9+i) if p: if p.piece_type == shogi.PAWN: if p.color == shogi.BLACK: b_pawn += 1 else: w_pawn += 1 if p.color == shogi.BLACK: if p.piece_type in [shogi.PAWN, shogi.LANCE] and r == 0: return False if p.piece_type == shogi.KNIGHT and r <= 1: return False else: if p.piece_type in [shogi.PAWN, shogi.LANCE] and r == 8: return False if p.piece_type == shogi.KNIGHT and r >= 7: return False if b_pawn > 1 or w_pawn > 1: return False counts = self._get_piece_counts(board) for pt, limit in PIECE_LIMITS.items(): if counts[pt] > limit: return False return True # ★ 変更: クリーニングの足切りも5捨対応に変更 def _clean_board_pieces_fast(self, board, org_steps): protected = [] if board.king_squares[shogi.WHITE] is not None: protected.append(board.king_squares[shogi.WHITE]) if board.king_squares[shogi.BLACK] is not None: protected.append(board.king_squares[shogi.BLACK]) checker = USISolver(CPU_ENGINE_PATH, f"Cleaner-{self.worker_id}") hand = board.pieces_in_hand[shogi.BLACK] for pt in list(hand.keys()): while hand[pt] > 0: hand[pt] -= 1 temp = self._create_solve_board(board) is_mate, moves, steps = checker.solve(temp, nodes=100000) sac, _ = self._analyze_sacrifices(temp, moves) if not (is_mate and sac >= 5): hand[pt] += 1 break for sq in reversed(range(81)): p = board.piece_at(sq) if p and p.color == shogi.BLACK and sq not in protected: board.remove_piece_at(sq) temp = self._create_solve_board(board) is_mate, moves, steps = checker.solve(temp, nodes=100000) sac, _ = self._analyze_sacrifices(temp, moves) if not (is_mate and sac >= 5): board.set_piece_at(sq, p) checker.close() return board def _is_valid_tsume_strict_no_surplus(self, board, moves_str): try: temp = copy.deepcopy(board) moves = moves_str.split() if not moves: return False for i, m_str in enumerate(moves): m = shogi.Move.from_usi(m_str) if not temp.is_legal(m): return False temp.push(m) if i % 2 == 0 and not temp.is_check(): return False if not temp.is_checkmate(): return False if sum(temp.pieces_in_hand[shogi.BLACK].values()) > 0: return False return True except: return False # ========================================================================= # ★ ワーカーのタスク定義とメイン処理 # ========================================================================= def worker_task(worker_id, shared_counter, lock, target_problems): history_manager = HistoryManager(HISTORY_FILE, lock) gen = PerfectSacrificeGenerator(worker_id, shared_counter, lock, history_manager, target_problems) gen.run() if __name__ == "__main__": print(f"\n=== 芸術特化: 4回捨駒 (三段目玉・空間重視モデル・完全ルール厳守版) ===") manager = mp.Manager() lock = manager.Lock() shared_counter = manager.Value('i', 0) initial_history = HistoryManager(HISTORY_FILE, lock) print(f"・過去の作品数: {len(initial_history.history)} 件") print(f"・保存先ディレクトリ: {SAVE_DIR}") print(f"・ワーカー数: {WORKER_COUNT} (目標: {TARGET_PROBLEMS}問)\n") processes = [] for i in range(WORKER_COUNT): p = mp.Process(target=worker_task, args=(i + 1, shared_counter, lock, TARGET_PROBLEMS)) p.start() processes.append(p) for p in processes: p.join() print("\n★ 全ての探索が完了しました!")