main.py
), com suporte completo a CRUD de usuários e produtos.SQLite
User
)Product
)mkdir fastapi_rest_api
cd fastapi_rest_api
main.py
touch main.py
python -m venv venv
venv\Scripts\activate # Windows
source venv/bin/activate # Linux/macOS
pip install fastapi uvicorn sqlalchemy pydantic
main.py
main.py
:from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr
from typing import List
from sqlalchemy import Column, Integer, String, Float, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base, Session
import os
# ======= CONFIGURAÇÃO DO BANCO DE DADOS =======
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ======= MODELOS SQLALCHEMY =======
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
price = Column(Float)
# Criar tabelas no primeiro uso
if not os.path.exists("test.db"):
Base.metadata.create_all(bind=engine)
# ======= ESQUEMAS Pydantic =======
class UserCreate(BaseModel):
name: str
email: EmailStr
class UserResponse(UserCreate):
id: int
class Config:
from_attributes = True
class ProductCreate(BaseModel):
name: str
price: float
class ProductResponse(ProductCreate):
id: int
class Config:
from_attributes = True
# ======= APLICAÇÃO FastAPI =======
app = FastAPI(
title="API de Gerenciamento (SQLite + FastAPI)",
description="CRUD completo de usuários e produtos com banco de dados SQLite",
version="1.0.0"
)
# ======= ROTAS: USERS =======
@app.post("/users", response_model=UserResponse)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
return user
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user: UserCreate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
db_user.name = user.name
db_user.email = user.email
db.commit()
db.refresh(db_user)
return db_user
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
db.delete(user)
db.commit()
return {"message": "Usuário removido com sucesso"}
@app.get("/users", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
# ======= ROTAS: PRODUCTS =======
@app.post("/products", response_model=ProductResponse)
def create_product(product: ProductCreate, db: Session = Depends(get_db)):
db_product = Product(name=product.name, price=product.price)
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
@app.get("/products/{product_id}", response_model=ProductResponse)
def get_product(product_id: int, db: Session = Depends(get_db)):
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
return product
@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(product_id: int, product: ProductCreate, db: Session = Depends(get_db)):
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
db_product.name = product.name
db_product.price = product.price
db.commit()
db.refresh(db_product)
return db_product
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
db.delete(product)
db.commit()
return {"message": "Produto removido com sucesso"}
@app.get("/products", response_model=List[ProductResponse])
def list_products(db: Session = Depends(get_db)):
return db.query(Product).all()
uvicorn main:app --reload
User
)Product
)fastapi_app/
├── main.py # Ponto de entrada da aplicação
├── database.py # Conexão e configuração do banco
├── models.py # Modelos SQLAlchemy
├── schemas.py # Schemas Pydantic
├── create_db.py # Script para criar as tabelas
├── test.db # Banco SQLite (gerado automaticamente)
└── requirements.txt # Lista de dependências
mkdir fastapi_app
cd fastapi_app
python -m venv venv
venv\Scripts\activate # Windows
source venv/bin/activate # Linux/macOS
pip install fastapi uvicorn sqlalchemy pydantic
pip freeze > requirements.txt
type nul > main.py
type nul > database.py
type nul > models.py
type nul > schemas.py
type nul > create_db.py
touch main.py database.py models.py schemas.py create_db.py
database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
models.py
from sqlalchemy import Column, Integer, String, Float
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
price = Column(Float)
schemas.py
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
name: str
email: EmailStr
class UserResponse(UserCreate):
id: int
class Config:
from_attributes = True
class ProductCreate(BaseModel):
name: str
price: float
class ProductResponse(ProductCreate):
id: int
class Config:
from_attributes = True
create_db.py
from database import Base, engine
from models import User, Product
Base.metadata.create_all(bind=engine)
python create_db.py
main.py
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from typing import List
from database import get_db
from models import User, Product
from schemas import UserCreate, UserResponse, ProductCreate, ProductResponse
app = FastAPI(
title="API de Gerenciamento",
description="CRUD de usuários e produtos com FastAPI e SQLite",
version="1.0.0"
)
# ==== USERS ====
@app.post("/users", response_model=UserResponse)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
return user
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user: UserCreate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
db_user.name = user.name
db_user.email = user.email
db.commit()
db.refresh(db_user)
return db_user
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuário não encontrado")
db.delete(user)
db.commit()
return {"message": "Usuário removido com sucesso"}
@app.get("/users", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
# ==== PRODUCTS ====
@app.post("/products", response_model=ProductResponse)
def create_product(product: ProductCreate, db: Session = Depends(get_db)):
db_product = Product(name=product.name, price=product.price)
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
@app.get("/products/{product_id}", response_model=ProductResponse)
def get_product(product_id: int, db: Session = Depends(get_db)):
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
return product
@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(product_id: int, product: ProductCreate, db: Session = Depends(get_db)):
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
db_product.name = product.name
db_product.price = product.price
db.commit()
db.refresh(db_product)
return db_product
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Produto não encontrado")
db.delete(product)
db.commit()
return {"message": "Produto removido com sucesso"}
@app.get("/products", response_model=List[ProductResponse])
def list_products(db: Session = Depends(get_db)):
return db.query(Product).all()
uvicorn main:app --reload
from fastapi import FastAPI
from sqlalchemy.orm import Session
from pydantic import BaseModel
class UserResponse(BaseModel):
name: str
age: int
@app.get("/user/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session):
user = db.query(UserModel).filter(UserModel.id == user_id).first()
return user
{
"name": "João",
"age": 30
}