teoria del tutto
Banned
E' un codice per numeri primi che devo verificare, la mia vecchia teoria dei primi ho provato a darla in pasto a chat gpt:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
matriosca_v7.py — scanner CPU per Mersenne con pipeline deterministica e parallela.
Punti chiave:
- Nessun falso negativo: i prefiltri non escludono mai un candidato buono.
- MR-map soft (default): usa basi [5,7] o [7,5] quando r36 è informativo; strict opzionale.
- Trial factoring sicuro: wheel-30, MR deterministico a 64-bit su q, poi check pow(2,p,q).
- LL CPU ottimizzato; usa gmpy2 se disponibile.
- Multiprocessing (--workers) per saturare i core.
"""
from __future__ import annotations
import argparse, time, csv, os
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict
from concurrent.futures import ProcessPoolExecutor, as_completed
# ===== opzionale: gmpy2 =====
try:
import gmpy2
HAVE_GMP = True
except Exception:
gmpy2 = None
HAVE_GMP = False
# ===== Miller–Rabin deterministico < 2^64 =====
def is_probable_prime(n: int) -> bool:
if n < 2:
return False
small = (2,3,5,7,11,13,17,19,23,29,31,37)
for sp in small:
if n == sp:
return True
if n % sp == 0:
return n == sp
# scomponi n-1 = d*2^s
d = n - 1
s = 0
while d & 1 == 0:
d >>= 1; s += 1
# basi deterministiche per 64-bit
for a in (2,325,9375,28178,450775,9780504,1795265022):
if a % n == 0:
continue
x = pow(a, d, n)
if x == 1 or x == n-1:
continue
for _ in range(s-1):
x = (x * x) % n
if x == n-1:
break
else:
return False
return True
# ===== riduzione modulo M=2^p-1 (micro-opt) =====
def mersenne_mod(x: int, p: int) -> int:
if x < 0: x = -x
if p <= 0: return 0
M = (1 << p) - 1
if x <= M:
return 0 if x == M else x
while x.bit_length() > p:
x = (x & M) + (x >> p)
if x >= M:
x -= M
return x
# ===== Lucas–Lehmer =====
def lucas_lehmer_gmp(p: int) -> bool:
if p == 2: return True
if p < 2: return False
N = (gmpy2.mpz(1) << p) - 1
S = gmpy2.mpz(4)
for _ in range(p-2):
S = (S*S - 2) % N
return S == 0
def lucas_lehmer_cpu(p: int, *, block: int = 8) -> bool:
if HAVE_GMP:
return lucas_lehmer_gmp(p)
if p == 2: return True
if p < 2: return False
M = (1 << p) - 1
s = 4
iters = p - 2
if block <= 1: block = 1
full, tail = divmod(iters, block)
for _ in range(full):
for __ in range(block):
s = s * s - 2
s = (s & M) + (s >> p)
if s >= M: s -= M
for _ in range(tail):
s = s * s - 2
s = (s & M) + (s >> p)
if s >= M: s -= M
return s == 0
# ===== prefiltro esponente p (safe) =====
def exponent_filter(p: int) -> bool:
"""Ritorna True se p supera i filtri sicuri (p primo e p ≡ 1,5 (mod 6) per p>=7)."""
if p < 2:
return False
if p >= 7 and (p % 6) not in (1,5):
return False
# MR non dice mai "composto" per un primo -> zero falsi negativi
return is_probable_prime(p)
# ===== PRP forte su N=2^p-1 =====
def _strong_prp(N: int, a: int) -> bool:
if N < 3 or (N & 1) == 0:
return False
d, s = N - 1, 0
while d & 1 == 0:
d >>= 1; s += 1
x = pow(a, d, N)
if x == 1 or x == N-1:
return True
for _ in range(s-1):
x = (x * x) % N
if x == N-1:
return True
return False
def mr_map_info(p: int) -> Tuple[int, List[int]]:
"""Restituisce (r36, basi) — basi vuote se r36 non è informativo."""
if p < 5:
return (0, [])
r36 = (pow(2, p, 36) - 1) % 36
if r36 == 31:
return (r36, [5, 7])
if r36 in (1,7,13,19,23,29):
return (r36, [7, 5])
return (r36, [])
def mr_map(p: int, *, strict: bool = False) -> Tuple[bool, int, List[int]]:
"""
PRP su N=2^p-1 pilotato da r36.
- soft (strict=False): mai bloccare quando r36 non è informativo -> ritorna True.
- strict=True: se r36 è informativo e PRP fallisce → blocca (ritorna False).
Ritorna (ok, r36, bases).
"""
r36, bases = mr_map_info(p)
if not bases:
return True, r36, bases
N = (1 << p) - 1
ok = all(_strong_prp(N, a) for a in bases)
if strict:
return ok, r36, bases
return True, r36, bases # soft: non blocca mai
# ===== Trial factoring “safe” + wheel-30 + MR su q =====
def trial_factor_mersenne(
p: int,
limit: int = 500_000,
*,
require_q_mod8_pm1: bool = True,
) -> Optional[int]:
"""
Cerca q | (2^p-1) con q = 2*k*p + 1, q <= limit.
Filtri: q ≡ 1 o 7 (mod 8) opzionale; wheel-30; MR deterministico su q; poi test pow(2,p,q).
Ritorna q se divide; None altrimenti. Mai falsi negativi.
"""
if p < 3 or limit <= 3:
return None
step = 2 * p
q = step + 1
small = (3,5,7,11,13)
W30 = {1,7,11,13,17,19,23,29}
while q <= limit:
if require_q_mod8_pm1:
r8 = q & 7
if r8 != 1 and r8 != 7:
q += step; continue
if (q % 30) not in W30:
q += step; continue
composite = False
for sp in small:
if q % sp == 0 and q != sp:
composite = True; break
if not composite and is_probable_prime(q) and pow(2, p, q) == 1:
return q
q += step
return None
# ===== ranking pietre (25/35/49) — solo ordinamento =====
def col25(N: int) -> int: return (((N-25)//6)//5) % 5 if (N-25) % 6 == 0 else -1
def col35(N: int) -> int: return (((N-35)//6)//5) % 5 if (N-35) % 6 == 0 else -1
def col49(N: int) -> int: return (((N-49)//6)//7) % 5 if (N-49) % 6 == 0 else -1
def stone_score(p: int) -> int:
if p < 5: return 0
N = (1 << p) - 1
return int(col25(N) >= 0) + int(col35(N) >= 0) + int(col49(N) >= 0)
# ===== self-test / bench =====
KNOWN_P_PRIME = [2,3,5,7,13,17,19,31,61,89,107,127]
KNOWN_P_COMP = [11,23,29,37,41,43,47,53]
@dataclass
class SelfTestResult:
mod_ok: bool; ll_ok: bool; tf_ok: bool
def self_test(rand_cases: int = 800) -> SelfTestResult:
import random
rng = random.Random(1337)
# mersenne_mod
mod_ok = True
for _ in range(rand_cases):
p = rng.randrange(3, 61)
M = (1 << p) - 1
x = rng.getrandbits(2 * p)
if mersenne_mod(x, p) != (x % M):
mod_ok = False; break
# LL
ll_ok = all(lucas_lehmer_cpu(p) for p in KNOWN_P_PRIME) \
and all(not lucas_lehmer_cpu(p) for p in KNOWN_P_COMP)
# trial factoring (pochi check “noti”)
tf_ok = True
if trial_factor_mersenne(11, limit=200_000) not in (23, 89): tf_ok = False
if trial_factor_mersenne(23, limit=500_000) not in (47, 178481): tf_ok = False
return SelfTestResult(mod_ok, ll_ok, tf_ok)
def micro_bench(samples: int = 120, p_min: int = 3000, p_max: int = 8000, block: int = 8) -> float:
import random
rng = random.Random(42)
ps: List[int] = []
while len(ps) < samples:
p = rng.randrange(p_min | 1, (p_max | 1) + 1, 2)
if p >= 7 and (p % 6) not in (1, 5): continue
if not is_probable_prime(p): continue
ps.append(p)
t0 = time.perf_counter()
_ok = 0
for p in ps:
_ok += int(lucas_lehmer_cpu(p, block=block))
dt = time.perf_counter() - t0
return len(ps) / dt if dt > 0 else 0.0
def auto_block(cands=(8,12,16)) -> int:
best_b, best_pps = None, -1.0
for b in cands:
pps = micro_bench(block=b)
if pps > best_pps:
best_pps, best_b = pps, b
print(f"[auto-block] scelto block={best_b} (≈ {best_pps:.2f} p/s)")
return best_b or 8
# ===== worker parallelo =====
def process_one(p: int,
use_mr_map: bool,
mr_strict: bool,
tf_limit: int,
block: int) -> Dict:
t0 = time.perf_counter()
score = stone_score(p)
r36 = None; bases = []
mr_pass = True
if use_mr_map:
ok, r36, bases = mr_map(p, strict=mr_strict)
mr_pass = ok
if mr_strict and not ok:
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=False, tf_factor=0, prime=False, ms=int(dt*1000))
tf_fac = 0
if tf_limit > 0:
fac = trial_factor_mersenne(p, limit=tf_limit, require_q_mod8_pm1=True)
if fac is not None:
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=mr_pass, tf_factor=fac, prime=False, ms=int(dt*1000))
is_prime = lucas_lehmer_cpu(p, block=block)
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=mr_pass, tf_factor=tf_fac, prime=is_prime, ms=int(dt*1000))
# ===== CLI =====
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--self-test', action='store_true')
ap.add_argument('--bench', action='store_true')
ap.add_argument('--auto-block', action='store_true')
ap.add_argument('--start', type=int, default=0)
ap.add_argument('--stop', type=int, default=0)
ap.add_argument('--max-n', type=int, default=0)
ap.add_argument('--tf-limit', type=int, default=0)
ap.add_argument('--block', type=int, default=8)
ap.add_argument('--workers', type=int, default=os.cpu_count() or 1)
# MR-map soft/strict (retrocompat soppressa: qui usiamo solo --use-mr-map)
ap.add_argument('--use-mr-map', action='store_true',
help='Attiva il pre-MR su M_p (soft di default).')
ap.add_argument('--mr-strict', action='store_true',
help='Se attivo, MR-map può escludere p quando r36 è informativo e fallisce il PRP.')
ap.add_argument('--trace-map', action='store_true',
help='Stampa p,r36,basi,pass/fail MR-map.')
ap.add_argument('--rank-stone', action='store_true',
help='Ordina i candidati per punteggio 25/35/49.')
ap.add_argument('--out-csv', type=str, default='',
help='Scrivi risultati in CSV.')
ap.add_argument('--stop-on-first', action='store_true',
help='Ferma la scansione al primo LL-positivo.')
args = ap.parse_args()
if args.self_test:
res = self_test()
print(f"mod_ok={res.mod_ok} ll_ok={res.ll_ok} tf_ok={res.tf_ok}")
raise SystemExit(0 if (res.mod_ok and res.ll_ok and res.tf_ok) else 1)
if args.bench:
pps = micro_bench(block=args.block)
accel = " (gmpy2 ON)" if HAVE_GMP else ""
print(f"LL throughput: {pps:.2f} p/s{accel}")
raise SystemExit(0)
if args.auto_block:
args.block = auto_block()
if not (args.start and args.stop and args.start <= args.stop):
print("Usa --self-test / --bench / --auto-block oppure specifica --start e --stop.")
raise SystemExit(0)
# costruiamo la lista candidati (safe)
cands: List[int] = []
for p in range(max(3, args.start) | 1, (args.stop | 1) + 1, 2):
if not exponent_filter(p):
continue
cands.append(p)
if args.rank_stone:
cands.sort(key=stone_score, reverse=True)
# logging
writer = None; fh = None
if args.out_csv:
fh = open(args.out_csv, 'w', newline='')
writer = csv.writer(fh)
writer.writerow(['p','stone_score','r36','bases','mr_pass','tf_fact or','prime','ms'])
# parallelo
total = len(cands)
done = 0
found = 0
t0_all = time.perf_counter()
# funzione di submit
def submit_all(executor):
futs = []
for p in cands:
futs.append(executor.submit(process_one, p,
args.use_mr_map, args.mr_strict,
args.tf_limit, args.block))
return futs
if args.workers <= 1:
# sequenziale
for p in cands:
rec = process_one(p, args.use_mr_map, args.mr_strict, args.tf_limit, args.block)
done += 1
if args.trace_map and rec['r36'] is not None:
print(f"[map] p={rec['p']} r36={rec['r36']} bases={rec['bases']} pass={rec['mr_pass']}")
if writer:
writer.writerow([rec['p'], rec['score'], rec['r36'], rec['bases'],
rec['mr_pass'], rec['tf_factor'], rec['prime'], rec['ms']])
if rec['prime']:
found += 1
print(f"p={rec['p']} -> M_p PRIMO (in {rec['ms']} ms)")
if args.stop_on_first:
break
else:
with ProcessPoolExecutor(max_workers=args.workers) as ex:
for fut in as_completed(submit_all(ex)):
rec = fut.result()
done += 1
if args.trace_map and rec['r36'] is not None:
print(f"[map] p={rec['p']} r36={rec['r36']} bases={rec['bases']} pass={rec['mr_pass']}")
if writer:
writer.writerow([rec['p'], rec['score'], rec['r36'], rec['bases'],
rec['mr_pass'], rec['tf_factor'], rec['prime'], rec['ms']])
if rec['prime']:
found += 1
print(f"p={rec['p']} -> M_p PRIMO (in {rec['ms']} ms)")
if args.stop_on_first:
break
if fh: fh.close()
dt = time.perf_counter() - t0_all
pps = done / dt if dt > 0 else 0.0
print(f"\nProcessati {done}/{total} esponenti in {dt:.2f}s (~{pps:.2f} p/s). Primi trovati: {found}.")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
matriosca_v7.py — scanner CPU per Mersenne con pipeline deterministica e parallela.
Punti chiave:
- Nessun falso negativo: i prefiltri non escludono mai un candidato buono.
- MR-map soft (default): usa basi [5,7] o [7,5] quando r36 è informativo; strict opzionale.
- Trial factoring sicuro: wheel-30, MR deterministico a 64-bit su q, poi check pow(2,p,q).
- LL CPU ottimizzato; usa gmpy2 se disponibile.
- Multiprocessing (--workers) per saturare i core.
"""
from __future__ import annotations
import argparse, time, csv, os
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict
from concurrent.futures import ProcessPoolExecutor, as_completed
# ===== opzionale: gmpy2 =====
try:
import gmpy2
HAVE_GMP = True
except Exception:
gmpy2 = None
HAVE_GMP = False
# ===== Miller–Rabin deterministico < 2^64 =====
def is_probable_prime(n: int) -> bool:
if n < 2:
return False
small = (2,3,5,7,11,13,17,19,23,29,31,37)
for sp in small:
if n == sp:
return True
if n % sp == 0:
return n == sp
# scomponi n-1 = d*2^s
d = n - 1
s = 0
while d & 1 == 0:
d >>= 1; s += 1
# basi deterministiche per 64-bit
for a in (2,325,9375,28178,450775,9780504,1795265022):
if a % n == 0:
continue
x = pow(a, d, n)
if x == 1 or x == n-1:
continue
for _ in range(s-1):
x = (x * x) % n
if x == n-1:
break
else:
return False
return True
# ===== riduzione modulo M=2^p-1 (micro-opt) =====
def mersenne_mod(x: int, p: int) -> int:
if x < 0: x = -x
if p <= 0: return 0
M = (1 << p) - 1
if x <= M:
return 0 if x == M else x
while x.bit_length() > p:
x = (x & M) + (x >> p)
if x >= M:
x -= M
return x
# ===== Lucas–Lehmer =====
def lucas_lehmer_gmp(p: int) -> bool:
if p == 2: return True
if p < 2: return False
N = (gmpy2.mpz(1) << p) - 1
S = gmpy2.mpz(4)
for _ in range(p-2):
S = (S*S - 2) % N
return S == 0
def lucas_lehmer_cpu(p: int, *, block: int = 8) -> bool:
if HAVE_GMP:
return lucas_lehmer_gmp(p)
if p == 2: return True
if p < 2: return False
M = (1 << p) - 1
s = 4
iters = p - 2
if block <= 1: block = 1
full, tail = divmod(iters, block)
for _ in range(full):
for __ in range(block):
s = s * s - 2
s = (s & M) + (s >> p)
if s >= M: s -= M
for _ in range(tail):
s = s * s - 2
s = (s & M) + (s >> p)
if s >= M: s -= M
return s == 0
# ===== prefiltro esponente p (safe) =====
def exponent_filter(p: int) -> bool:
"""Ritorna True se p supera i filtri sicuri (p primo e p ≡ 1,5 (mod 6) per p>=7)."""
if p < 2:
return False
if p >= 7 and (p % 6) not in (1,5):
return False
# MR non dice mai "composto" per un primo -> zero falsi negativi
return is_probable_prime(p)
# ===== PRP forte su N=2^p-1 =====
def _strong_prp(N: int, a: int) -> bool:
if N < 3 or (N & 1) == 0:
return False
d, s = N - 1, 0
while d & 1 == 0:
d >>= 1; s += 1
x = pow(a, d, N)
if x == 1 or x == N-1:
return True
for _ in range(s-1):
x = (x * x) % N
if x == N-1:
return True
return False
def mr_map_info(p: int) -> Tuple[int, List[int]]:
"""Restituisce (r36, basi) — basi vuote se r36 non è informativo."""
if p < 5:
return (0, [])
r36 = (pow(2, p, 36) - 1) % 36
if r36 == 31:
return (r36, [5, 7])
if r36 in (1,7,13,19,23,29):
return (r36, [7, 5])
return (r36, [])
def mr_map(p: int, *, strict: bool = False) -> Tuple[bool, int, List[int]]:
"""
PRP su N=2^p-1 pilotato da r36.
- soft (strict=False): mai bloccare quando r36 non è informativo -> ritorna True.
- strict=True: se r36 è informativo e PRP fallisce → blocca (ritorna False).
Ritorna (ok, r36, bases).
"""
r36, bases = mr_map_info(p)
if not bases:
return True, r36, bases
N = (1 << p) - 1
ok = all(_strong_prp(N, a) for a in bases)
if strict:
return ok, r36, bases
return True, r36, bases # soft: non blocca mai
# ===== Trial factoring “safe” + wheel-30 + MR su q =====
def trial_factor_mersenne(
p: int,
limit: int = 500_000,
*,
require_q_mod8_pm1: bool = True,
) -> Optional[int]:
"""
Cerca q | (2^p-1) con q = 2*k*p + 1, q <= limit.
Filtri: q ≡ 1 o 7 (mod 8) opzionale; wheel-30; MR deterministico su q; poi test pow(2,p,q).
Ritorna q se divide; None altrimenti. Mai falsi negativi.
"""
if p < 3 or limit <= 3:
return None
step = 2 * p
q = step + 1
small = (3,5,7,11,13)
W30 = {1,7,11,13,17,19,23,29}
while q <= limit:
if require_q_mod8_pm1:
r8 = q & 7
if r8 != 1 and r8 != 7:
q += step; continue
if (q % 30) not in W30:
q += step; continue
composite = False
for sp in small:
if q % sp == 0 and q != sp:
composite = True; break
if not composite and is_probable_prime(q) and pow(2, p, q) == 1:
return q
q += step
return None
# ===== ranking pietre (25/35/49) — solo ordinamento =====
def col25(N: int) -> int: return (((N-25)//6)//5) % 5 if (N-25) % 6 == 0 else -1
def col35(N: int) -> int: return (((N-35)//6)//5) % 5 if (N-35) % 6 == 0 else -1
def col49(N: int) -> int: return (((N-49)//6)//7) % 5 if (N-49) % 6 == 0 else -1
def stone_score(p: int) -> int:
if p < 5: return 0
N = (1 << p) - 1
return int(col25(N) >= 0) + int(col35(N) >= 0) + int(col49(N) >= 0)
# ===== self-test / bench =====
KNOWN_P_PRIME = [2,3,5,7,13,17,19,31,61,89,107,127]
KNOWN_P_COMP = [11,23,29,37,41,43,47,53]
@dataclass
class SelfTestResult:
mod_ok: bool; ll_ok: bool; tf_ok: bool
def self_test(rand_cases: int = 800) -> SelfTestResult:
import random
rng = random.Random(1337)
# mersenne_mod
mod_ok = True
for _ in range(rand_cases):
p = rng.randrange(3, 61)
M = (1 << p) - 1
x = rng.getrandbits(2 * p)
if mersenne_mod(x, p) != (x % M):
mod_ok = False; break
# LL
ll_ok = all(lucas_lehmer_cpu(p) for p in KNOWN_P_PRIME) \
and all(not lucas_lehmer_cpu(p) for p in KNOWN_P_COMP)
# trial factoring (pochi check “noti”)
tf_ok = True
if trial_factor_mersenne(11, limit=200_000) not in (23, 89): tf_ok = False
if trial_factor_mersenne(23, limit=500_000) not in (47, 178481): tf_ok = False
return SelfTestResult(mod_ok, ll_ok, tf_ok)
def micro_bench(samples: int = 120, p_min: int = 3000, p_max: int = 8000, block: int = 8) -> float:
import random
rng = random.Random(42)
ps: List[int] = []
while len(ps) < samples:
p = rng.randrange(p_min | 1, (p_max | 1) + 1, 2)
if p >= 7 and (p % 6) not in (1, 5): continue
if not is_probable_prime(p): continue
ps.append(p)
t0 = time.perf_counter()
_ok = 0
for p in ps:
_ok += int(lucas_lehmer_cpu(p, block=block))
dt = time.perf_counter() - t0
return len(ps) / dt if dt > 0 else 0.0
def auto_block(cands=(8,12,16)) -> int:
best_b, best_pps = None, -1.0
for b in cands:
pps = micro_bench(block=b)
if pps > best_pps:
best_pps, best_b = pps, b
print(f"[auto-block] scelto block={best_b} (≈ {best_pps:.2f} p/s)")
return best_b or 8
# ===== worker parallelo =====
def process_one(p: int,
use_mr_map: bool,
mr_strict: bool,
tf_limit: int,
block: int) -> Dict:
t0 = time.perf_counter()
score = stone_score(p)
r36 = None; bases = []
mr_pass = True
if use_mr_map:
ok, r36, bases = mr_map(p, strict=mr_strict)
mr_pass = ok
if mr_strict and not ok:
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=False, tf_factor=0, prime=False, ms=int(dt*1000))
tf_fac = 0
if tf_limit > 0:
fac = trial_factor_mersenne(p, limit=tf_limit, require_q_mod8_pm1=True)
if fac is not None:
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=mr_pass, tf_factor=fac, prime=False, ms=int(dt*1000))
is_prime = lucas_lehmer_cpu(p, block=block)
dt = time.perf_counter() - t0
return dict(p=p, score=score, r36=r36, bases=bases,
mr_pass=mr_pass, tf_factor=tf_fac, prime=is_prime, ms=int(dt*1000))
# ===== CLI =====
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--self-test', action='store_true')
ap.add_argument('--bench', action='store_true')
ap.add_argument('--auto-block', action='store_true')
ap.add_argument('--start', type=int, default=0)
ap.add_argument('--stop', type=int, default=0)
ap.add_argument('--max-n', type=int, default=0)
ap.add_argument('--tf-limit', type=int, default=0)
ap.add_argument('--block', type=int, default=8)
ap.add_argument('--workers', type=int, default=os.cpu_count() or 1)
# MR-map soft/strict (retrocompat soppressa: qui usiamo solo --use-mr-map)
ap.add_argument('--use-mr-map', action='store_true',
help='Attiva il pre-MR su M_p (soft di default).')
ap.add_argument('--mr-strict', action='store_true',
help='Se attivo, MR-map può escludere p quando r36 è informativo e fallisce il PRP.')
ap.add_argument('--trace-map', action='store_true',
help='Stampa p,r36,basi,pass/fail MR-map.')
ap.add_argument('--rank-stone', action='store_true',
help='Ordina i candidati per punteggio 25/35/49.')
ap.add_argument('--out-csv', type=str, default='',
help='Scrivi risultati in CSV.')
ap.add_argument('--stop-on-first', action='store_true',
help='Ferma la scansione al primo LL-positivo.')
args = ap.parse_args()
if args.self_test:
res = self_test()
print(f"mod_ok={res.mod_ok} ll_ok={res.ll_ok} tf_ok={res.tf_ok}")
raise SystemExit(0 if (res.mod_ok and res.ll_ok and res.tf_ok) else 1)
if args.bench:
pps = micro_bench(block=args.block)
accel = " (gmpy2 ON)" if HAVE_GMP else ""
print(f"LL throughput: {pps:.2f} p/s{accel}")
raise SystemExit(0)
if args.auto_block:
args.block = auto_block()
if not (args.start and args.stop and args.start <= args.stop):
print("Usa --self-test / --bench / --auto-block oppure specifica --start e --stop.")
raise SystemExit(0)
# costruiamo la lista candidati (safe)
cands: List[int] = []
for p in range(max(3, args.start) | 1, (args.stop | 1) + 1, 2):
if not exponent_filter(p):
continue
cands.append(p)
if args.rank_stone:
cands.sort(key=stone_score, reverse=True)
# logging
writer = None; fh = None
if args.out_csv:
fh = open(args.out_csv, 'w', newline='')
writer = csv.writer(fh)
writer.writerow(['p','stone_score','r36','bases','mr_pass','tf_fact or','prime','ms'])
# parallelo
total = len(cands)
done = 0
found = 0
t0_all = time.perf_counter()
# funzione di submit
def submit_all(executor):
futs = []
for p in cands:
futs.append(executor.submit(process_one, p,
args.use_mr_map, args.mr_strict,
args.tf_limit, args.block))
return futs
if args.workers <= 1:
# sequenziale
for p in cands:
rec = process_one(p, args.use_mr_map, args.mr_strict, args.tf_limit, args.block)
done += 1
if args.trace_map and rec['r36'] is not None:
print(f"[map] p={rec['p']} r36={rec['r36']} bases={rec['bases']} pass={rec['mr_pass']}")
if writer:
writer.writerow([rec['p'], rec['score'], rec['r36'], rec['bases'],
rec['mr_pass'], rec['tf_factor'], rec['prime'], rec['ms']])
if rec['prime']:
found += 1
print(f"p={rec['p']} -> M_p PRIMO (in {rec['ms']} ms)")
if args.stop_on_first:
break
else:
with ProcessPoolExecutor(max_workers=args.workers) as ex:
for fut in as_completed(submit_all(ex)):
rec = fut.result()
done += 1
if args.trace_map and rec['r36'] is not None:
print(f"[map] p={rec['p']} r36={rec['r36']} bases={rec['bases']} pass={rec['mr_pass']}")
if writer:
writer.writerow([rec['p'], rec['score'], rec['r36'], rec['bases'],
rec['mr_pass'], rec['tf_factor'], rec['prime'], rec['ms']])
if rec['prime']:
found += 1
print(f"p={rec['p']} -> M_p PRIMO (in {rec['ms']} ms)")
if args.stop_on_first:
break
if fh: fh.close()
dt = time.perf_counter() - t0_all
pps = done / dt if dt > 0 else 0.0
print(f"\nProcessati {done}/{total} esponenti in {dt:.2f}s (~{pps:.2f} p/s). Primi trovati: {found}.")
if __name__ == "__main__":
main()

