alabuga/backend/app/services/python_mission.py
2025-09-29 11:59:25 -06:00

182 lines
5.7 KiB
Python

"""Логика миссии по Python."""
from __future__ import annotations
import subprocess
import sys
from datetime import datetime
from textwrap import dedent
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models.mission import Mission, MissionSubmission
from app.models.python import PythonChallenge, PythonSubmission, PythonUserProgress
from app.models.user import User
from app.schemas.python import PythonMissionState, PythonChallengeRead, PythonSubmissionRead
from app.services.mission import submit_mission
EVAL_TIMEOUT_SECONDS = 3
def _normalize_stdout(value: str | None) -> str:
if value is None:
return ""
return value.replace('\r\n', '\n').rstrip('\n').strip()
def get_progress(db: Session, user: User, mission: Mission) -> PythonUserProgress:
progress = (
db.query(PythonUserProgress)
.filter(PythonUserProgress.user_id == user.id, PythonUserProgress.mission_id == mission.id)
.first()
)
if not progress:
progress = PythonUserProgress(user_id=user.id, mission_id=mission.id, current_order=0)
db.add(progress)
db.commit()
db.refresh(progress)
return progress
def build_state(db: Session, user: User, mission: Mission) -> PythonMissionState:
progress = get_progress(db, user, mission)
challenges = (
db.query(PythonChallenge)
.filter(PythonChallenge.mission_id == mission.id)
.order_by(PythonChallenge.order)
.all()
)
total = len(challenges)
completed = progress.current_order
next_challenge: Optional[PythonChallenge] = None
last_code: Optional[str] = None
last_submissions: list[PythonSubmission] = []
if completed < total:
next_challenge = challenges[completed]
last_submission = (
db.query(PythonSubmission)
.filter(
PythonSubmission.progress_id == progress.id,
PythonSubmission.challenge_id == next_challenge.id,
)
.order_by(PythonSubmission.created_at.desc())
.first()
)
if last_submission:
last_code = last_submission.code
else:
last_submission = (
db.query(PythonSubmission)
.filter(PythonSubmission.progress_id == progress.id)
.order_by(PythonSubmission.created_at.desc())
.first()
)
if last_submission:
last_code = last_submission.code
last_submissions = (
db.query(PythonSubmission)
.filter(PythonSubmission.progress_id == progress.id)
.order_by(PythonSubmission.created_at.desc())
.limit(5)
.all()
)
return PythonMissionState(
mission_id=mission.id,
total_challenges=total,
completed_challenges=completed,
is_completed=completed >= total,
next_challenge=PythonChallengeRead.model_validate(next_challenge) if next_challenge else None,
last_submissions=[PythonSubmissionRead.model_validate(item) for item in last_submissions],
last_code=last_code,
)
def submit_code(db: Session, user: User, mission: Mission, challenge_id: int, code: str) -> PythonSubmission:
progress = get_progress(db, user, mission)
challenge = (
db.query(PythonChallenge)
.filter(PythonChallenge.id == challenge_id, PythonChallenge.mission_id == mission.id)
.first()
)
if not challenge:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Задание не найдено")
expected_order = progress.current_order + 1
if challenge.order != expected_order:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Сначала выполните предыдущее задание",
)
prepared_code = dedent(code)
try:
completed = subprocess.run(
[sys.executable, "-c", prepared_code],
input=(challenge.input_data or ""),
capture_output=True,
text=True,
timeout=EVAL_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Время выполнения превышено")
stdout = completed.stdout or ""
stderr = completed.stderr or ""
expected = _normalize_stdout(challenge.expected_output)
actual = _normalize_stdout(stdout)
is_passed = completed.returncode == 0 and actual == expected
submission = PythonSubmission(
progress_id=progress.id,
challenge_id=challenge.id,
code=prepared_code,
stdout=stdout,
stderr=stderr,
is_passed=is_passed,
)
db.add(submission)
if is_passed:
progress.current_order = challenge.order
total = (
db.query(PythonChallenge)
.filter(PythonChallenge.mission_id == mission.id)
.count()
)
if progress.current_order >= total:
progress.completed_at = datetime.utcnow()
ensure_mission_completed(db, user, mission)
db.commit()
db.refresh(submission)
db.refresh(progress)
return submission
def ensure_mission_completed(db: Session, user: User, mission: Mission) -> None:
existing_submission = (
db.query(MissionSubmission)
.filter(MissionSubmission.user_id == user.id, MissionSubmission.mission_id == mission.id)
.first()
)
if existing_submission and existing_submission.status.value == "approved":
return
submit_mission(
db=db,
user=user,
mission=mission,
comment="Задачи Python-миссии решены",
proof_url=None,
)