import os
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import Column, Integer, String, Float, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker, declarative_base, Session
# ======= JWT SETTINGS =======
# The signing key is taken from the JWT_SECRET_KEY environment variable so a
# real secret never has to be committed to source control. An unset or empty
# variable falls back to the historical literal, which keeps existing local
# setups working but is NOT safe for production use.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "seu_segredo_supersecreto"
# Signature algorithm used for both encoding and decoding tokens.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ======= PASSWORD HASHING =======
# Shared passlib context used by verify_password and get_password_hash;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def verify_password(plain, hashed):
    """Check a plaintext password against a stored hash.

    Delegates to ``pwd_context.verify`` and returns whatever it returns.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def get_password_hash(password):
    """Hash a plaintext password for storage.

    Delegates to ``pwd_context.hash`` and returns the resulting hash.
    """
    digest = pwd_context.hash(password)
    return digest
# ======= JWT MODELS =======
# Response body of the /token login route.
class Token(BaseModel):
    # Encoded JWT produced by create_access_token.
    access_token: str
    # Token scheme; the login route always sends "bearer".
    token_type: str
# Claims pulled out of a decoded access token by get_current_user.
class TokenData(BaseModel):
    # Value of the token's "sub" claim; None when not supplied.
    email: Optional[str] = None
# Dependency that hands the raw token string to get_current_user;
# tokenUrl names the login route ("/token") declared further down.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
)
# ======= DATABASE =======
# SQLite file in the process's current working directory.
DATABASE_URL = "sqlite:///./test.db"

# connect_args is forwarded to the sqlite3 driver; check_same_thread=False
# lifts its restriction that a connection is only used by the creating thread.
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory: callers must flush and commit explicitly.
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)

# Declarative base that every ORM model in this module inherits from.
Base = declarative_base()
def get_db():
    """Yield a fresh database session and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the session
    is created from ``SessionLocal``, handed to the caller, and closed in
    the ``finally`` clause even when the caller raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# ======= SQLALCHEMY MODELS =======
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    # Unique per user; the login and token code look users up by this column.
    email = Column(String, unique=True, index=True)
    # Output of get_password_hash; the plaintext password is never stored.
    hashed_password = Column(String)
class Product(Base):
    """ORM model mapped to the ``products`` table."""

    __tablename__ = "products"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    price = Column(Float)
# Create any missing tables at import time. create_all() checks for each
# table before issuing CREATE TABLE (checkfirst defaults to True), so it is
# safe to run on every start. Guarding it on the existence of the database
# file would leave an existing-but-empty or outdated file without its tables.
Base.metadata.create_all(bind=engine)
# ======= SCHEMAS =======
# Request body accepted by the /register route.
class UserCreate(BaseModel):
    name: str
    email: EmailStr
    # Plaintext password; the register route hashes it before storing.
    password: str
# Public view of a user as returned by the API; it has no password field,
# so the stored hash is never serialized.
class UserResponse(BaseModel):
    # from_attributes lets the model be built from a User ORM instance.
    # Declared through model_config because the nested ``class Config`` form
    # is deprecated in pydantic v2.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    email: EmailStr
# Request body accepted by POST /products.
class ProductCreate(BaseModel):
    name: str
    price: float
# Product as returned by the API: the ProductCreate fields plus the
# database-assigned id.
class ProductResponse(ProductCreate):
    # from_attributes lets the model be built from a Product ORM instance.
    # Declared through model_config because the nested ``class Config`` form
    # is deprecated in pydantic v2.
    model_config = ConfigDict(from_attributes=True)

    id: int
# ======= AUTHENTICATION =======
def authenticate_user(db: Session, email: str, password: str):
    """Return the user matching *email* and *password*, or ``None``.

    ``None`` is returned both when no user has that email and when the
    password does not verify against the stored hash, so callers cannot
    tell the two failures apart.
    """
    candidate = db.query(User).filter(User.email == email).first()
    if not candidate:
        return None
    if not verify_password(password, candidate.hashed_password):
        return None
    return candidate
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Token lifetime. When ``None`` the token is valid for
            15 minutes. An explicit zero or negative delta is honoured and
            yields a token that is already expired.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None rather than using ``or``: timedelta(0) is falsy
    # and would otherwise be silently replaced by the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: Raw token string supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose email equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names an email with no matching user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token inválido",
        # A 401 response should tell the client which auth scheme to use.
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise JWTError, so only it sits in the try.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise credentials_exception from err

    email = payload.get("sub")
    if email is None:
        raise credentials_exception
    token_data = TokenData(email=email)

    user = db.query(User).filter(User.email == token_data.email).first()
    if user is None:
        raise credentials_exception
    return user
# ======= FastAPI APP =======
# Application object that every route below is registered on.
app = FastAPI(
    title="API com Segurança JWT",
)
# ======= ROUTE: REGISTRATION =======
# Creates a user from the submitted name, email and password and returns the
# stored row. Responds with 409 when the email is already registered, rather
# than letting the unique constraint on users.email surface as a 500.
@app.post("/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
    email_taken = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Email já cadastrado",
    )

    # Cheap pre-check so the common duplicate case skips the hashing work.
    if db.query(User).filter(User.email == user.email).first() is not None:
        raise email_taken

    db_user = User(
        name=user.name,
        email=user.email,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError as err:
        # A concurrent request inserted the same email between the check and
        # the commit; roll back so the session is usable again.
        db.rollback()
        raise email_taken from err
    # Reload the row so the database-generated id is populated.
    db.refresh(db_user)
    return db_user
# ======= ROUTE: LOGIN AND TOKEN =======
# Exchanges form credentials for an access token. The form's "username"
# field carries the user's email. Responds with 401 when the credentials do
# not match a stored user.
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenciais inválidas",
            # Same challenge header that get_current_user's 401 should carry.
            headers={"WWW-Authenticate": "Bearer"},
        )
    # The email goes into the "sub" claim, which get_current_user reads back.
    token = create_access_token(
        {"sub": user.email},
        timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": token, "token_type": "bearer"}
# ======= USER ROUTES (PROTECTED) =======
# Returns the user resolved from the request's token; all the work is done
# by the get_current_user dependency.
@app.get("/users/me", response_model=UserResponse)
def read_current_user(
    current_user: User = Depends(get_current_user),
):
    return current_user
# Returns every row of the users table. current_user is not read in the body;
# declaring it makes the route require a valid token.
@app.get("/users", response_model=List[UserResponse])
def list_users(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    all_users = db.query(User).all()
    return all_users
# ======= PRODUCT ROUTES (PROTECTED) =======
# Stores a new product and returns the saved row. current_user is not read in
# the body; declaring it makes the route require a valid token.
@app.post("/products", response_model=ProductResponse)
def create_product(
    product: ProductCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    new_product = Product(name=product.name, price=product.price)
    db.add(new_product)
    db.commit()
    # Reload the row so the database-generated id is populated.
    db.refresh(new_product)
    return new_product
# Returns every row of the products table. current_user is not read in the
# body; declaring it makes the route require a valid token.
@app.get("/products", response_model=List[ProductResponse])
def list_products(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    all_products = db.query(Product).all()
    return all_products