강의를 마무리하는 시점에서 첨언을 드리자면, 이거는 AI아바타를 만든다기보다 그냥 이미지 여러개를 번갈아 돌리면서 TTS로 생성된 음성파일을 들려주는 수준의 강좌입니다. 뭔가 입모양이 움직이고 3D로 새로운 얼굴의 아바타가 생성되는 그런 마법같은 일은 생기지 않습니다. 강의 마지막부분에서 결과물에 상당히 실망할수도 있음을 미리 알려드립니다. 그래도 문자열을 음성으로 변환하거나, 이미지에서 얼굴부분만 가져오는 라이브러리등은 알아두면 좋을것 같아요.
안녕하세요. 얼마전 Synthesia라는 AI동영상 생성툴을 발견해서 테스트해보던중에 AI아바타를 만들수 있지는 않을까 하는 호기심에 찾아봤는데 Creating an AI Avatar라는 아티클을 발견해서 함께 시도해보면 어떨까 싶어서 강좌를 시작합니다. 저는 AI나 머신러닝등에는 지식이 전무해서 따라하면서도 설명이 살짝 부족할 수도 있어요. 그래도 코드를 보면서 최대한 열심히 설명을 해보도록하겠습니다.
이번시간에는 Python을 이용하여, 여러개의 이미지 속에서 얼굴만 모아서 하나의 이미지에 나란히 붙이는 코딩을 해보도록 하겠습니다. 아래와 같이 코딩에 필요한 라이브러리를 설치해주세요.
pip install numpy dlib opencv-python pillow
collect_faces.py라는 파일을 하나 생성합니다. 여기에서 사용한 라이브러리는 opencv-python의 cv2, dlib, pillow의 Image, 그리고 numpy입니다. 필요한 라이브러리를 코드에 포함시켜주세요.
import cv2
import dlib
from PIL import Image
import numpy as np
그리고 얼굴이 들어있는 이미지 3개를 준비해서 같은 폴더에 저장해주시고 해당 이미지명들을 배열안에 넣어서 아래와 같이 변수에 저장해주세요.
이미지에서 얼굴을 추출해서 반환하는 함수를 만들겠습니다. 일단 함수 안에서 사용할 얼굴인식 모듈은 코드가 무거우니까 함수 윗쪽에 한개만 선언해놓고 함수 안에서 계속 갖다 쓰는걸로 할게요. 인자로 이미지경로를 받고 cv2라이브러리로 해당이미지를 읽어서 dlib라이브러리가 제공하는 얼굴 검출 기능을 사용하여 이미지 안의 얼굴들을 떠냅니다. 이미지 안에 얼굴이 1개 이상 있는 경우를 생각해서 첫번째 얼굴을 반환하는걸로 할게요. 이때 검출한 얼굴 이미지는 칼라포멧이 BGR포멧입니다. cv2.cvtColor()함수에 변환필터 cv2.COLOR_BGR2RGB를 명시해서 BGR을 RGB포멧으로 변환해서 반환하도록 할게요.
detector = dlib.get_frontal_face_detector()
def extract_face (image_path):
image = cv2.imread(image_path)
faces = detector(image)
if len (faces) > 0 :
face = faces[ 0 ] # 발견된 첫 번째 얼굴 추출
x, y, w, h = face.left(), face.top(), face.width(), face.height()
face_image = image[y:y+h, x:x+w]
return cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
else :
return None
이제 생성한 함수를 호출하여 이미지 안의 얼굴들을 모아볼게요. 함수에서 반환받은 얼굴들을 모아 저장할 배열 faces를 하나 정의합니다. 그리고 이미지들을 돌면서 방금만든 extract_face()함수에 이미지경로를 넘겨주어 호출합니다. face가 null이 아니면 faces 배열에 넘겨받은 얼굴을 추가합니다.
faces = []
for path in image_paths:
face = extract_face(path)
if face is not None :
faces.append(face)
이번에는 배열에 모은 얼굴들을 하나로 합성해서 새로운 얼굴을 만들어 볼게요. numpy의 hstack()함수에 얼굴배열을 넘겨주어 하나로 통합하는 코드입니다.
if faces:
composite_face = np.hstack(faces)
composite_image = Image.fromarray(composite_face)
composite_image.save('composite_faces.jpg')
composite_image.show()
else :
print ( "이미지에서 얼굴이 감지되지 않았습니다." )
아래는 완성된 코드입니다.
import cv2
import dlib
from PIL import Image
import numpy as np
# dlib 얼굴 검출기 로드
detector = dlib.get_frontal_face_detector()
# 이미지에서 얼굴을 추출하는 함수
def extract_face(image_path):
image = cv2.imread(image_path)
faces = detector(image)
if len(faces) > 0:
face = faces[0] # 발견된 첫 번째 얼굴 추출
x, y, w, h = face.left(), face.top(), face.width(), face.height()
face_image = image[y:y+h, x:x+w]
return cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
else :
return None
# 이미지 경로
image_paths = ['image1.png', 'image2.png', 'image3.png']
# 얼굴을 추출하여 저장
faces = []
for path in image_paths:
face = extract_face(path)
if face is not None :
faces.append(face)
# 얼굴을 수평으로 쌓아 합성 이미지 생성
if faces:
composite_face = np.hstack(faces)
composite_image = Image.fromarray(composite_face)
composite_image.save('composite_faces.jpg')
composite_image.show()
else :
print ( "이미지에서 얼굴이 감지되지 않았습니다." )
안녕하세요. 이번시간에는 Google에서 제공하는 API를 이용하는 방법에 대해서 알아보도록 하겠습니다.
Create Project
Google의 API들을 사용하시려면 Google Cloud에 프로젝트를 생성하셔야합니다. Google Cloud의 APIs & Services대시보드를 열어주세요. 그리고 상단에 프로젝트 이름을 클릭하면 현재 프로젝트들이 보이는 팝업을 뜨는데요.
저는 기존에 있는 프로젝트가 2016년도에 만들어진 것들이라서 해당 프로젝트로 API에 접근하려고 보니까 API사용 quota가 다 소진되었다고 나와요. 하루 10,000번의 request가 가능한데 저 2번 요청했거든요. 근데 찾아보니까 옛날에 만들어진 프로젝트에서 나오는 오류라고 하더라구요. 그래서 저도 이번에 프로젝트를 새로 만들었습니다. 여기에서 상단에 NEW PROJECT버튼을 누르세요.
그러면 프로젝트 생성하는 양식이 나타납니다. 프로젝트 이름을 넣고 Create버튼을 눌러주세요.
그러면 해당 프로젝트의 대시보드가 눈앞에 펼쳐집니다.
Enable APIs
왼쪽 메뉴에서 Library를 누르세요. 그러면 사용가능한 모든 API를 보여줍니다.
앞으로 하나하나 다 사용해보겠지만 우선 YouTube Data API v3를 선택할게요. 밑으로 스크롤 내리다보면 YouTube섹션에 가장 처음 나오는 API인데 검색해도 나오니까 선택해주세요. 그러면 API에 대한 상세한 설명이 나오는데 여기에서 ENABLE버튼을 클릭해주세요. 다른 API도 필요하실때 라이브러리에서 Enable을 시켜주셔야 사용이 가능합니다.
Enable시키면 아래와 같이 API사용이 허용됩니다.
Authentication
Google의 API를 사용하려면 우선 인증키가 필요합니다. 인증을 하는 방법에는 간단하게 API Key를 사용하는 방법과 OAuth를 사용하는 방법 두가지가 있습니다. Google Cloud의 APIs & Services에 들어가면 아래 화면과 같이 Credential들을 확인하실 수 있습니다.
API Key
Create API Key
API Key를 생성하시려면 상단에 +Create Credentials을 클릭하면 API Key를 생성할지 OAuth Client ID를 생성할지를 물어봅니다. 여기서 API Key를 선택합니다.
그러면 다른거 안물어보고 바로 API Key를 하나 생성해서 보여줍니다.
위에 경고문에 이 API key를 갖고 뭐든지 할수 있으니 권한을 조정하시오라고 써있습니다. Edit API Key를 눌러서 허용범위를 좁혀볼게요. 접근할 수 있는 클라이언트는 제한을 두지 않았고, 접근가능한 API는 YouTube Data API v3만 허용하는 걸로 변경했습니다.
API Key를 이용한 API호출
그래도 구글인데 ?key=API_KEY를 쿼리스트링으로 암호화없이 그냥 보낸다고? 이건 진짜..큰일나는데…구글이 왜그랬지? 그밑에는 헤더에 넣어서 보내긴 하지만 헤더도 얼마든지 해킹이 가능한데..이건 진짜 너무 심했다…서비스가 너무 커서 예전 인증방식을 업그레이드 하는게 시간이 많이 걸렸나? 아직도 API_KEY방식을 지원하는 것도 좀 우습고..돈도 잘버는데 개발자들 팍팍 써서 싹 다 갈아엎어 버리지 이걸 그냥 이렇게 쓰냐….쯪….
APIs & Services > Credentials에 들어가시면 API Key목록이 보입니다. 오른쪽 점세개 누르시면 Delete API Key메뉴가 뜰거에요. 그거 누르세요.
진짜 지울거냐고 물어보고
DELETE버튼 누르면 텍스트상자에 DELETE라고 입력하라고 나와요.
DELETE라고 쓰고 밑에 DELETE버튼 누르면 API Key가 삭제됩니다. 사용하지 않는 API Key들은 전부 삭제하시는게 보안상 안전합니다.
OAuth
API Key를 매번 사용하는 것은 보안상 매우 취약하기 때문에 가급적 OAuth Client ID를 이용해서 토큰을 받아와서 그걸로 API를 호출하는 방식으로 하는것을 추천드립니다.
Create OAuth client ID
API Key와 마찬가지로 좌측 Credentials메뉴를 클릭하고 들어간 화면의 상단에 + CREATE CREDENTIALS을 클릭하면 아래와 같이 어떤 서비스에 OAuth를 사용할지를 물어봅니다. 필요에 따서 앱타입을 선택하시고요.
앱의 이름도 적어줍니다.
필요한 정보를 넣고 CREATE버튼을 클릭하면, 다음과 같이 Client ID와 Client Secret을 만들어 줍니다.
Scope정하기
OAuth를 사용하려면 접근가능한 범위를 지정해야합니다. Google Auth Platform > Data Access를 열어보시면 아래와 같은 화면이 뜹니다.
중간에 ADD OR REMOVE SCOPES를 클릭합니다. 그러면 어떤걸 접근을 풀어주고, 어떤걸 접근을 막을지 선택하는 곳입니다. 저는 YouTube Data API v3에서 읽는것만 일단 허용하도록 하겠습니다. 선택하고 나서 반드시 밑에 숨어있는 UPDATE버튼을 반드시 눌러주셔야합니다. 안그러면 저장이 되지 않습니다. 그리고 팝업을 닫고 Save버튼을 눌러서 변경된 Data Access내용들을 저장해주세요. Scope을 어떤걸 선택했는지는 따로 노트해두세요. 나중에 인증할때 필요합니다. 저는 테스트니까 안전하게 YouTube Data API v3의 .../auth/youtube.readonly만 선택하도록 할게요.
Redirect URI정하기
Google Auth Platform에서 좌측메뉴 Clients를 클릭하시면 현재 소유하고 있는 Client들의 목록이 나오는데요
이중에 사용하고 싶은 클라이언트를 선택하세요. 그러면 아래와 같이 클라이언트에 대한 상세정보가 나타납니다.
여기에서 + ADD URI버튼을 클릭해주세요. 그리고 나타난 텍스트상자에 인증코드를 받아서 돌아갈 주소를 넣어주세요. 그리고 SAVE버튼을 눌러서 저장합니다. 그리고 방금 저장한 Redirect URI는 인증코드를 받을 때 필요하니까 어딘가에 노트를 해두세요.
Test User추가하기
인증테스트를 하려면 앱을 출시하기 전에 접근을 할수 있는 사용자를 등록해야 테스트를 할수 있습니다. APIs & Services의 OAuth consent screen을 클릭하시면 테스트 사용자를 등록할 수 있습니다.
+ADD USERS버튼을 누르면 슬라이드팝업이 뜨는데 여기에 접근을 허용하고자 하는 이메일을 입력한 뒤 SAVE버튼을 클릭하세요. 그러면 아래와 같이 테스트사용자가 들어갑니다.
Access Token 받아오기
타 서비스의 OAuth인증과 마찬가지로 API토큰을 받아오기 위해서 일단 Code를 받아와야합니다. 여기에서 방금 선택한 Scope과 Redirect URL,그리고 Client ID를 다른 파라메터들과 함께 넘겨줍니다. 지금은 테스트하는거니까 코드로 안하고 그냥 브라우저 주소창에 호출할게요.
이제 토큰을 가지고 API를 호출해볼까요? Google API Manual에 가면 Authorization하는 부분이 있는데 거기에서 API를 실행할 수 있는 양식이 준비되어 있습니다.
텍스트상자중에 access_token을 찾아서 방금 받아온 Access Token을 입력하신 뒤에 맨 밑에 Execute버튼을 누르세요. 이때 API Key체크상자는 체크를 해지한뒤에 눌러주세요.
그러면 진행할 계정을 선택하라고 나옵니다. 사용하고자 했던 클라이언트를 만든 계정을 선택하세요.
계속하면 개인정도등이 공유된다고 경고메세지를 보여줍니다. Continue를 눌러주세요.
그러면 본인의 계정에 접속하여 갖가지 데이타를 추가, 수정, 삭제할 수 있는 권한을 이 클라이언트 앱에 할당 할지를 물어봅니다. Allow버튼을 눌러주세요.
위에서 Allow버튼을 누르면 팝업이 닫히고 APIs Explorer의 Execute버튼 밑으로 API결과를 보여줍니다.
API Key를 없애려는 구글의 노력
프로젝트를 생성하면 바로 APIs & Services 의 Enabled APIs & services로 트래픽현황을 볼수 있는데요. 상단에 CREATE CREDENTIALS이 큼지막하게 보입니다. 아마도 좌측메뉴를 눌러서 API Key를 생성하기 전에 OAuth로 인증을 받도록 유도하려는 의도인것 같습니다.
Credentials > Clients
APIs & Services > Credentials은 이제 사라지고 앞으로는 Google Auth Platform > Clients에서 관리가 될것이라고 공지가 뜨네요.
OAuth consent screen > Audience
APIs & Services > OAuth consent screen도 이제 Google Auth Platform > Audience로 옮겨간다고 합니다.
Create Credentials는 OAuth만 지원
위의 화면에서 CREATE CREDENTIALS버튼을 클릭해보면 아시겠지만 인증종류를 물어보지도 않고 그냥 OAuth로 정해서 생성하도록 구성되어있습니다.
Credential Type
OAuth Consent Screen
Scopes
OAuth Client ID
Your Credentials
몇가지 화면만 봐도 API Key는 앞으로 사라지게 될것 같네요.
마무리
여기까지 Google의 API에 접근해서 데이타를 가져오는 방법에 대해서 간략하게 설명드렸습니다. OAuth로 생성한 Client ID를 제대로 활용하려면 지정한 도메인에서 돌아가는 Web앱이나, iOS, 또는 안드로이드 앱이 있어야 합니다. 사용하려는 곳의 출처를 웹앱이라면 도메인으로 소유권을 증명하고, 모바일 앱은 앱스토어 아이디등으로 누가 어디서 사용하는지를 명확히 명시해야합니다. 아마도 API를 악용하는 경우를 방지하기 위해서 2차 3차 검증을 하는 것으로 사료됩니다. API를 이용할 수 있는 웹앱이 만들어지면 그때 코드로 호출하는 방법에 대해서 알아보는 시간을 갖도록하겠습니다. 수고하셨습니다. 좋은 밤되세요!
그러면 컴퓨터에 qpdf를 설치하신 후에 파이썬 패키지 pikepdf를 설치해주셔야합니다.
brew install qpdf
pip install pikepdf
brew install이 너무 오래걸리면 Ctrl+C로 취소하시고 brew update를 해주신 뒤에 다시 설치하시면 됩니다. 그래도 오래걸리면 그냥 기다리는 수밖에 없어요 ㅠㅜ
설치가 다 끝났으면 코드를 만들어 볼게요. 일단 이미지 3개를 1.png, 2.png, 3.png로 저장해주세요. 그리고 아래와 같이 코드를 써서 app.py로 저장합니다. 파이썬파일명은 아무거나 해도 되는데 img2pdf.py로는 하시면 안되요. 그러면 패키지 img2pdf를 가져오지 않고 내가 만든 파일에서 img2pdf기능을 찾으려고 하거든요.
import img2pdf
from PIL import Image
image_paths = ["1.png", "2.png", "3.png"]
output_pdf_path = "output.pdf"
try:
with open(output_pdf_path, "wb") as pdf_file:
pdf_file.write(img2pdf.convert(image_paths))
except Exceptioin as e:
raise PDFDocError(e)
실행해볼까요?
python app.py
제 PDF파일은 너무나도 완벽하게 잘 만들어졌습니다. 여러분들도 큰문제 없이 잘 변환이 되었기를 바래요. 그럼 다음시간에 만나요!
from sqlalchemy import create_engine
from sqlalchemy import text
uri = 'mysql://DB_USER:DB_PASS@DB_HOST/DB_NAME'
engine = create_engine(uri, echo=False)
코드를 실행하다보면 connection이 끊어질때가 있는데 그때마다 engine을 다시 연결해주시면 됩니다.
그리고 이전시간에 사용했던 테이블을 계속해서 사용할 건데요. 테이블이 데이타베이스에 없으신 분들은 아래 코드를 실행하여 생성해주시기 바랍니다.
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy import ForeignKey
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String(30), nullable=False)
fullname = mapped_column(String(30))
addresses = relationship("Address", back_populates="user")
def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
user_id = mapped_column(Integer, ForeignKey("user_account.id"))
email_address = mapped_column(String(30), nullable=True)
user = relationship("User", back_populates="addresses")
def __repr__(self) -> str:
return f"Address(id={self.id!r}, user_id={self.user_id!r}, email_address={self.email_address!r})"
# 테이블 생성
Base.metadata.create_all(engine)
# 생성한 테이블 삭제
Base.metadata.drop_all(engine)
select() SQL 표현식 구조
select문도 insert와 마찬가지로 statement을 먼저 만든후에 실행을 하게 되는데요. select()함수의 statement를 보시면 아래와 같습니다. 아래는 이름이 “spongebob”인 사용자의 레코드를 가져오는 코드입니다.
from sqlalchemy import select
stmt = select(User).where(User.name == "spongebob")
>>> print(stmt)
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name = :name_1
위의 User는 ORM으로 정의 되었지만 ORM없이 mapper()를 통하여 mapping을 하는 경우 Table()로 정의된 변수로 대체하여 실행해도 같은 결과가 나옵니다. 다만 Table객체는 칼럼이 .c안에 들어가 있으므로 Table.c.칼럼명 이렇게 해 주셔야 에러없이 실행이 됩니다. 결과는 ORM통해서 실행했을때와 완전 동일하게 나옵니다.
from sqlalchemy import select
stmt = select(user_table).where(user_table.c.name == "spongebob")
>>> print(stmt)
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name = :name_1
위와 같이 statement을 만들고나서 engine에 connect해서 해당 statement을 execute()함수로 실행을 시켜야 결과를 반환합니다.
with engine.connect() as conn:
for row in conn.execute(stmt):
print(row)
위와 같은 코드를 실행하면 아래와 같이 spongebob의 레코드를 볼수 있습니다. Model은 mapped_column이나 relationship또는 DeclarativeBase등을 사용하여 베이스를 만들었지만 사실상 ORM의 Session으로 실행하지 않으면 온전히 ORM을 사용한다고 볼수가 없습니다. 그런 의미에서 아래 나오는 결과는 쿼리의 결과이지 ORM이 넘겨주는 객체의 결과물은 아니에요.
(1, 'spongebob', 'Spongebob Squarepants')
ORM을 사용할때는 engine에서 connect를 하지 않고, Session을 통해서 쿼리를 하는데요.
from sqlalchemy.orm import Session
stmt = select(User).where(User.name == "spongebob")
with Session(engine) as session:
for obj_user in session.execute(stmt):
print(obj_user)
위와 같이 Session을 사용하는 경우 반환되는 레코드가 User모델객체이기 때문에 print를 했을때 위에서 선언한 User.__repr__()함수에서 정의한 대로 결과를 출력하니까 여기서 보여주지 않아도 되는 필드는 빼거나 보여주고 싶은 항목은 추가하시면 됩니다. 위의 ORM 코드를 실행하면 다음과 같습니다.
from sqlalchemy import create_engine
from sqlalchemy import text
uri = 'mysql://DB_USER:DB_PASS@DB_HOST/DB_NAME'
engine = create_engine(uri, echo=False)
코드를 실행하다보면 connection이 끊어질때가 있는데 그때마다 engine을 다시 연결해주시면 됩니다.
그리고 이전시간에 사용했던 테이블을 계속해서 사용할 건데요. 테이블이 데이타베이스에 없으신 분들은 아래 코드를 실행하여 생성해주시기 바랍니다.
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy import ForeignKey
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String(30), nullable=False)
fullname = mapped_column(String(30))
addresses = relationship("Address", back_populates="user")
def __repr__(self) -> str:
return f"User(id={self.id!r})"
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
user_id = mapped_column(Integer, ForeignKey("user_account.id"))
email_address = mapped_column(String(30), nullable=True)
user = relationship("User", back_populates="addresses")
def __repr__(self) -> str:
return f"Address(id={self.id!r})"
# 테이블 생성
Base.metadata.create_all(engine)
# 생성한 테이블 삭제
Base.metadata.drop_all(engine)
INSERT 문 사용하기
insert를 구현하는 가장 간단한 방법은 insert()함수를 사용하는 것입니다.
from sqlalchemy import insert
stmt = insert(User).values(name="spongebob", fullname="Spongebob Squarepants")
위에 코드는 insert를 실행한것은 아니구요. Insert를 하는 명령어를 stmt에 저장한 거에요. 출력해보면 다음과 같습니다.
>>> print(stmt)
INSERT INTO user_account (name, fullname) VALUES (:name, :fullname)
위의 statement을 컴파일하면 다음과 같이 넘겨주는 매개변수들을 parsing할 수 있습니다.
외국문서 보면 말이 거창해서 겁부터 나는데요. 알고 보면 너무 당연하고 상당히 쉬운 컨셉의 내용들이 참 많습니다. 여러분께 조금이나마 도움이 되는 문서가 되길 바랍니다. 본 문서는 SQLAlchemy매뉴얼의 SQLAlchemy 1.4 / 2.0 튜토리얼의 일부를 발췌하여 번역한 내용입니다.
Establishing Connectivity – the Engine
기본적으로 가지고 계신 DB에 연결을 우선 먼저 할게요.
from sqlalchemy import create_engine
from sqlalchemy import text
uri = 'mysql://DB_USER:DB_PASS@DB_HOST/DB_NAME'
engine = create_engine(uri, echo=False)
# 커넥션 테스트
with engine.connect() as conn:
result = conn.execute(text("select 'hello world'"))
print(result.all())
Working with Transactions and the DBAPI
연결이 잘 되었으면 테스트로 테이블을 하나 만들어 볼게요. DDL은 수동으로 커밋하도록 합니다.
with engine.connect() as conn:
conn.execute(text("CREATE TABLE some_table (x int, y int)"))
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 1, "y": 1}, {"x": 2, "y": 4}],
)
conn.commit()
아래는 자동커밋의 예제입니다. begin()함수를 사용하면 DML은 commit()함수의 호출없이도 자동으로 커밋이 되어 데이타가 삽입이됩니다. DDL은 여전히 수동으로 커밋을 해야한다네요.
with engine.begin() as conn:
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 6, "y": 8}, {"x": 9, "y": 10}],
)
아래는 데이타를 가져와서 속성이름으로 데이타에 접근하는 예제입니다.
with engine.connect() as conn:
result = conn.execute(text("SELECT x, y FROM some_table"))
for row in result:
print(f"x: {row.x} y: {row.y}")
for문에서 변수명을 지정해서 바로 쓸수 있게 할수 있지만, 하지만 두개 이상일때는 곤란해지기 때문에 잘 사용하지 않습니다.
with engine.connect() as conn:
result = conn.execute(text("SELECT x, y FROM some_table"))
for x, y in result:
print(f"x: {x} y: {y}")
아래는 배열방 번호로 접근하는 예제입니다. 하지만 우리가 코딩할때 배열방 번호를 일일이 기억하고 있을 수는 없지요.
with engine.connect() as conn:
result = conn.execute(text("SELECT x, y FROM some_table"))
for row in result:
print(f"x: {row[0]} y: {row[1]}")
그래서 사용하는 것이 mappings()입니다. 매핑은 필드명으로 접근하기 때문에 정확도가 높고 편리합니다.
with engine.connect() as conn:
result = conn.execute(text("select x, y from some_table"))
for dict_row in result.mappings():
x = dict_row["x"]
y = dict_row["y"]
print(f"x: {x} y: {y}")
Executing with an ORM Session
SQLAlchemy의 가장 상층에 존재하는 ORM(Object Relational Mapper)는 Python에서 데이터베이스를 사용하는데 필요한 많은 기능을 가지고 있는 종합선물세트 같은 존재입니다. 일부만 발췌하여 사용할 수도 있고, 종합적으로 사용할 수도 있습니다. 아래 그림은 ORM의 종속성을 보여주는 구성 계층도입니다.
아래는 ORM의 Session을 사용하여 쿼리를 해오는 예제입니다.
from sqlalchemy.orm import Session
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y")
with Session(engine) as session:
result = session.execute(stmt, {"y": 6})
for row in result:
print(f"x: {row.x} y: {row.y}")
ORM사용시에도 UPDATE를 할때는 commit()을 명시적으로 호출하여 변경된 내용을 데이타베이스에 적용합니다.
with Session(engine) as session:
result = session.execute(
text("UPDATE some_table SET y=:y WHERE x=:x"),
[{"x": 9, "y": 11}, {"x": 13, "y": 15}],
)
session.commit()
Working with Database Metadata
아래는 MetaData 선언하여 user테이블을 매핑하는 코드입니다. Table()에 칼럼정보를 저장하고, 모델에는 순수하게 필요한 기능만 나열한 뒤, mapper()함수로 둘을 연결해주면 클래스가 간결해지니까 코드가 훨씬 보기 수월해 지겠죠.
from sqlalchemy import Table, MetaData, Column, Integer, String, ForeignKey
from sqlalchemy.orm import mapper
metadata = MetaData()
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('fullname', String(50)),
Column('nickname', String(12))
)
class User(object):
def __init__(self, name, fullname, nickname):
self.name = name
self.fullname = fullname
self.nickname = nickname
# sqlalchemy.orm.mapper()는 현재 없어졌다네요
# 이 방법은 옛날 방법이니까 가급적 사용하지 마시고요
# 아래에 소개해 드리는 ORM으로 구현한 방법을 사용하시기 바랍니다.
mapper(User, user)
참고로 create_all()함수를 실행하면 위의 테이블을 생성 할수 있게 됩니다.
metadata.create_all(engin)
ORM의 DeclarativeBase
Base클래스를 만들어서 MetaData에 접근할수 있습니다.
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
이렇게 하면 MetaData를 따로 가져오지 않아도 필요한 객체에 접근할 수 있습니다.
>>> Base.metadata
MetaData()
>>> Base.registry
<sqlalchemy.orm.decl_api.registry object at 0x1069cf0e0>
아래는 Declarative를 이용하여 두개의 모델을 매핑하는 PEP484에 따른 가장 현대적인 유형의 코드입니다.
from typing import List
from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
fullname: Mapped[Optional[str]]
addresses: Mapped[List["Address"]] = relationship(back_populates="user")
def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
class Address(Base):
__tablename__ = "address"
id: Mapped[int] = mapped_column(primary_key=True)
email_address: Mapped[str]
user_id = mapped_column(ForeignKey("user_account.id"))
user: Mapped[User] = relationship(back_populates="addresses")
def __repr__(self) -> str:
return f"Address(id={self.id!r}, email_address={self.email_address!r})"
주석때문에 너무 정신없죠. 주석빼고 이렇게 써도 됩니다.
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy import ForeignKey
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String(30), nullable=False)
fullname = mapped_column(String(30))
addresses = relationship("Address", back_populates="user")
def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
user_id = mapped_column(Integer, ForeignKey("user_account.id"))
email_address = mapped_column(String(30), nullable=True)
user = relationship("User", back_populates="addresses")
def __repr__(self) -> str:
return f"Address(id={self.id!r}, user_id={self.user_id!r}, email_address={self.email_address!r})"
마찬가지로 이렇게 선언한 후에 create_all()함수를 호출하면 테이블이 생성됩니다.
Base.metadata.create_all(engine)
그런데 이렇게 선언을 다 해놓고, 옛날 방법으로 데이타를 불러온다면 말짱 도로묵입니다. 아래와 같이 암만 User를 ORM Base로 선언하고, ORM의 mapped_column()를 이용하여 칼럼을 선언했어도. engine.connect()를 통해서 statement을 실행하면 그 결과는 ORM이 아니라 그냥 옛날에 사용하던 방법으로 나오던 쿼리 결과일 뿐입니다.
from sqlalchemy import select
stmt = select(User).where(User.name == "spongebob")
with engine.connect() as conn:
for row in conn.execute(stmt):
print(row)
# engine.connect.execute()을 통해서 실행한 결과 그냥 데이타 Set임
(1, 'spongebob', 'Spongebob Squarepants')
ORM을 제대로 사용하시려면, 선언도 ORM으로 하시고 실행도 아래와 같이 ORM.Session으로 하셔야합니다.
from sqlalchemy.orm import Session
with Session(engine) as session:
row = session.execute(select(User)).first()
print(row)
# ORM으로 실행한 결과 모델클래스의 객체가 반환됨
(User(id=1, name='spongebob', fullname='Spongebob Squarepants'),)
다음 시간에 이어서 데이타를 입력하고 수정하고 검색하는 내용을 배워보도록 하겠습니다. 오늘은 여기까지 잘 따라오셨습니다. 쉬는 시간에는 꼭 쉬세요^^
from flask import Flask
app = Flask(__name__)
@app.route('/')
def home():
return '<h1>Flask REST API</h1>'
if __name__=="__main__":
app.run(debug=True)
그리고 해당 앱을 실행함으로써 Flask API서버를 실행합니다.
$ python api.py
* Serving Flask app 'api'
* Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
* Restarting with stat
* Debugger is active!
* Debugger PIN: 211-059-641
이제 데이타베이스를 연동해볼게요. api.py에 SQLAlchemy를 사용하겠다고 명시해주세요. 그리고 데이타베이스 위치를 명시해준 뒤에 아래와 같이 User모델을 추가해주세요.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
db = SQLAlchemy(app)
class UserModel(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(80), unique=True, nullable=False)
def __repr__(self):
return f"User(name = {self.name}, email = {self.name})"
...
이 파일에서 정의한 모델을 데이타베이스에 생성할 파일을 하나 만듭니다. create_db.py를 만들어서 아래와 같이 코딩해주세요. api.py에서 app과 db를 가져와서 해당 모델들을 생성해줍니다.
from api import app, db
with app.app_context():
db.create_all()
그리고 실행합니다.
python create_db.py
그러면 instance 폴더에 SQLite데이타베이스가 생성된것을 확인하실수 있습니다.
$ ls -al instance
total 16
drwxr-xr-x 3 slim staff 96 Nov 24 01:51 ./
drwxr-xr-x 17 slim staff 544 Nov 24 01:49 ../
-rw-r--r-- 1 slim staff 16384 Nov 24 01:51 database.db
이제 User테이블에서 데이타를 가져오는 코드를 추가해볼게요. import 맨 끝에 flask_restful에서 Resource, Api, reqparse를 사용하겠다고 명시합니다. 그리고 GET, POST, DELETE등의 RestfulAPI에 필요한 프로토콜들을 사용하기 위해서 Api()함수를 실행하여 api변수에 담습니다. 그리고 UserModel밑에 Users라는 API클래스를 하나 선언합니다. 그리고 그 안에 get이라는 이름의 함수를 선언하는데요, 맞습니다. RestfulAPI에서 GET으로 해당 route이 호출되었을때 어떤 값을 넘겨줄지를 여기에서 정의하는 거에요. 저는 여기서 일단 UserModel에서 쿼리를 해오는데 일단 다 가져오도록 합니다. 그리고 그 값을 return하면 Flask가 척척 알아서 결과를 출력할거에요. 이제 이 클래스를 호출할 route을 그 밑에 추가합니다. 저는 /users/로 했어요.
...
from flask_restful import Resource, Api
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
db = SQLAlchemy(app)
api = Api(app)
class UserModel(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(80), unique=True, nullable=False)
def __repr__(self):
return f"User(name = {self.name}, email = {self.name})"
class Users(Resource):
def get(self):
users = UserModel.query.all()
return users
api.add_resource(Users, '/users/')
...
현재 User테이블에 아무것도 안들어가 있으니까 당연히 빈 배열방을 넘겨주겠죠. 그런데 RestAPI결과 값은 보통 JSON형식으로 보여줍니다. 결과를 보여주는 포멧을 한번 바꿔볼게요. JSON형식의 결과를 미리 정의하기 위해서는 flask_restful패키지의 fields가 필요합니다.
그리고 나서 정의한 포멧을 GET함수에 추가해줄건데요. 이때 필요한 패키지가 또 있습니다. marshal_with라고 출력할 포멧을 지정할때 Flask에서 필요한 데코레이터입니다.
from flask_restful import Resource, Api, fields, marshal_with
데코레이터는요, 기존의 Object를 꾸미는 용도로 사용하는 건데요. 보통 Object위에 붙여서 기존의 Object형식은 그대로 사용하면서 해당 Object에 필요한 함수나 변수들을 추가로 넣을 수 있는 기능입니다. Users클래스의 get()함수 바로 위에 @으로 시작되는 코드를 아래와 같이 추가해주세요. marshal_with에게 userFields를 인자로 넘겨주어서, 출력형식으로는 아까 정의해둔 userFields를 사용하겠다고 명시합니다.
class Users(Resource):
@marshal_with(userFields)
def get(self):
users = UserModel.query.all()
return users
현재 우리가 가지고 있는 데이타가 없기때문에 사용자 정보를 추가하기 위해서는 POST로 데이타를 받아서 처리해주는 부분이 필요합니다. 사용자에게 데이타를 받기 위해서는 받을 인자들을 미리 정의해 놓아야하는데 이때 Flask에서 사용하는 패키지가 바로 reqparse입니다.
from flask_restful import Resource, Api, fields, marshal_with, reqparse
그리고 userFields를 선언하기 전에 아래와 같이 사용자인자를 미리 정의합니다. reqparse패키지의 RequestParser()함수를 사용하여 user_args객체를 선언하고 해당 객체에 add_argument()함수를 통해서 각 인자들의 세부사항을 적어줍니다. name이랑 email 이렇게 두개를 받을 건데요, type은 문자열, 그리고 필수입력사항으로 정의할게요. 이렇게 선언을 하면 HTTP Request가 들어왔을때 함께 들어온 이름, 여기서는 name과 email이 되겠죠. 해당 이름의 인자들을 Request데이타에서 받아서 친절하게 user_args에 저장을 해줍니다.
user_args = reqparse.RequestParser()
user_args.add_argument('name', type=str, required=True, help="Name cannot be blank")
user_args.add_argument('email', type=str, required=True, help="Email cannot be blank")
이제 Users클래스에 POST요청이 들어왔을때 처리해줄 post()함수를 선언합니다. 사용자가 입력할 데이타도 Body에서 JSON으로 전달받을거니까 여기서도 마찬가지로 @marshal_with(userFields)데코레이터를 사용해줍니다. 위에서 받아온 매개변수, user_args를 사용할건데요. reqparse.RequestParser()가 넘겨준 결과값은 Object이기때문에 편리한 사용을 위해서 해당 Object의 pase_args()함수를 호출해서 배열에 담아줄게요. 그리고 해당값으로 UserModel을 하나 생성하고, 그 모델을 db.session에 추가해줌으로써 Insert를 완료하게됩니다. 사용자를 추가한 후에는 사용자목록을 받아와서 보여주도록 할게요. 이때 반환하는 HTTP code는 사용자를 성공적으로 생성했다는 의미로 201을 넘겨줍니다.
결과를 확인해보려면, Postman이나 Thunder Client같은 클라이언트 툴이 필요한데요, 저는 Visual Studio Code에서 Thunder Client모듈을 설치해서 테스트를 해보도록 할게요. 프로토콜은 POST로 경로를 적어주고, Body에 저장할 이름과 이메일을 JSON으로 넘겨줍니다.
전송버튼을 누르면 201코드와 함께 위에서 코딩한 대로 사용자목록을 Response로 반환해줍니다.
이번에는 ID를 넘겨주어 한명의 User만 가져오는 기능을 넣어볼게요. Users클래스 밑에 User라는 리소스를 하나 더 만들거에요. 마찬가지로 결과를 JSON으로 결과를 보여줄거니까 marshal_with데코레이터 추가하고요, 프로토콜은 GET이 될테니까 get()함수에다 정의해줄게요. 이때 get()함수에 self이외에 id를 추가해서 사용자에게서 받은 id를 함수안에서 사용할 수 있게 같이 받습니다. 그리고 넘겨받은 id를 가지고 UserModel의 id에 넣어서 filtering을 합니다. 그럴일은 없겠지만 혹시 결과가 여러개 나오면 처음 나오는것만 하나 보여주도록합니다. 이때, 혹시 검색하는 사용자가 없다면 더이상 진행을 하지않고, abort함수를 사용해서 404를 반환하여 찾는 사람이 없다는 걸 알려줍니다. 리소스가 다 완성이 되면 api.add_resource()를 통해서 새로운 route를 추가하여 /users/경로에 정수형의 id가 붙어서 들어오면 User라는 리소스를 통해서 요청을 처리한다고 명시합니다.
from flask_restful import Resource, Api, fields, marshal_with, reqparse, abort
class User(Resource):
@marshal_with(userFields)
def get(self, id):
user = UserModel.query.filter_by(id=id).first()
if not user:
abort(404, "User not found")
return user
api.add_resource(Users, '/users/')
api.add_resource(User, '/users/<int:id>')
요청은 /users/id로 아래와 같이 하면 됩니다.
결과는 배열없이 단일 객체로 id가 1인 사용자를 반환합니다.
이번에는 기존의 데이타를 수정하는 경로를 PATCH프로토콜로 구현해볼게요. User클래스에 patch함수를 정의하고, JSON으로 데이타를 주고 받을 거니까 marshal_with 데코레이터를 추가합니다. Request URL에서 id를 받아오도록 함수의 인자에 명시하고, Body에 JSON으로 받아온 매개변수를 args에 배열로 저장합니다. id를 가지고 수정할 사용자를 찾아와서 user모델에 저장한뒤 해당 모델의 name과 email을 받아온 데이타로 대체합니다. 그리고나서 commit()를 하면 변경된 모델이 데이타베이스에 적용이됩니다.
@marshal_with(userFields)
def patch(self, id):
args = user_args.parse_args()
user = UserModel.query.filter_by(id=id).first()
if not user:
abort(404, "User not found")
user.name = args["name"]
user.email = args["email"]
db.session.commit()
return user
PATCH프로토콜을 테스트할때는 경로에 변경할 사용자의 ID를 명시하고 Body에 JSON포멧으로 코드에서 명시한 형식에 따라 수정할 데이타를 넘겨줍니다.
그러면 변경된 결과를 반환하죠.
마지막으로 DELETE프로토콜도 구현을 해볼게요. 마찬가지로 marshal_with데코레이터를 넣고, id를 넘겨받는 delete함수를 선언해서 해당 id로 사용자를 찾습니다. 그리고 db.session.delete()함수를 사용해서 user를 삭제한뒤 commit()하면 처리가 되는데 다 처리가 되면 사용자목록을 반환합니다.
@marshal_with(userFields)
def delete(self, id):
user = UserModel.query.filter_by(id=id).first()
if not user:
abort(404, "User not found")
db.session.delete(user)
db.session.commit()
users = UserModel.query.all()
return users
요청은 Body없이 DELETE프로토콜에 id를 넣은 경로로 호출을 합니다. 삭제를 위해서 제가 미리 id가 2인 사용자를 등록해놨어요. 한번 지워볼게요.
pip install Pillow
# 아니면 python -m pip install Pillow
아래 코드를 resize_image.py로 저장하세요
from PIL import Image
import os
file_extention = ".JPG"
input_dir = "./input" # Replace with your actual input directory
output_dir = "./output" # Replace with your desired output directory
def resize_image(input_dir, output_dir, new_width=800):
"""Resizes all JPG images in the input directory to the specified width and saves them in the output directory."""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for filename in os.listdir(input_dir):
if filename.endswith(file_extention):
filepath = os.path.join(input_dir, filename)
with Image.open(filepath) as img:
width, height = img.size
new_height = int(height * new_width / width)
resized_img = img.resize((new_width, new_height), Image.LANCZOS)
output_path = os.path.join(output_dir, filename)
resized_img.save(output_path)
print(f"Resized {filename} to {new_width}x{new_height}")
if __name__ == "__main__":
resize_image(input_dir, output_dir)
input이라는 폴더 하나 만드시고, 거기다가 JPG이미지 넣으세요. 혹시 확장자가 jpg소문자이면 코드에서 file_extention값을 소문자로 바꿔주세요. 저는 함수 호출할때 새로 만들 이미지의 가로크기를 지정해주도록 했는데 기본값은 800이거든요. 혹시 다른 사이즈 넣고 싶으시면 함수 호출할때 같이 넣어주시면 되요. 이제 코드를 실행할게요.
python3 resize_image.py
그러면 결과를 출력하면서 이미지를 리사이징합니다.
Resized IMG_0809.JPG to 800x600
Resized IMG_0821.JPG to 800x600
...