Python의 call by reference

함수에 인자를 넘기는 방법에는 두가지가 있습니다. call by reference랑 call by value. Python에서는 이걸 제가 어떻게 받겠다 하고 임의로 정할수는 없구요. data type에 따라서 자동으로 결정이 됩니다. int, float, str, tuples등을 call by value로 넘어가서 해당 함수 안에서 지지고 볶아도 일마치고 나왔을때 넘겨줬던 인자는(변수에 담아서 호출했다치고) 호출하기 전에 저장했던값과 동일한 값을 가지고 있습니다. 이런 변하지 않는 성질을 가지는 data type에 속하는 애들을 통칭해서 immutable object라고 합니다. 변하지 않는다 이거죠. 그런데 list, disc, set같은 data type들은 call by reference로 넘어가서 호출받은 함수에서 내용을 변경하면 함수를 마치고 나왔을때 인자로 넘겨주었던 value가 변경되어있어요. 이렇게 호출받은애가 내용을 바꿨을때 함수 바깥쪽에서도 같이 바껴버리는 성질을 가진 data type들을 통칭해서 mutable object라고 합니다.

References:

  • https://code13.tistory.com/214

Python Metaclass

Metaclass를 설명하기에 앞서 Python에서의 class란 무엇인가를 먼저 좀 짚고 넘어갈게요. 보통 다른언어에서 class를 정의하면 그 자체로는 객체(instance)가 될수 없고 오직 ClassName()이렇게 괄호를 사용해서 구현했을때만 instance를 얻어낼수가 있자나요. 그런데 Python에서 class는요 smalltalk이라는 언어에서 그 개념을 가져왔는데요. 얘는요 class를 정의하는 순간, instance가 만들어집니다. 여기서 instance의 개념을 좀 정의하고 넘어갈 필요가 있는데요. 일반적으로 memory에 class구조에 해당하는 어떤 공간을 할당했다라고 하면 그걸 보통 객체, instance라고 하자나요? 그런데 Python에서는 instance의 개념이 좀 헷갈리는게 얘가 class를 선언하면서 바로 memory에 공간을 할당해버리기때문에 class자체가 instance같이 존재하게 되어버리는거에요. 그렇다고 해서 그러면 Python은 class를 instance구현없이 그냥 막바로 쓰느냐? 또 그건 아니에요. class는 class대로 memory에 모양을 갖춰 instnace처럼 존재하지만 이걸 그냥 막 여기저기서 써버리면 class가 더러워져버리니까 실제 코딩을 할때는 그렇기 안하고, class는 class자체로 순수하게 냅두고 개발자가 필요에 따라 class의 instance를 별도로 만들어서 그거가지고 지지고 볶도록 하는거죠. 그러니까 여기서 용어를 좀 정리를 할게요. 보통 일반적으로 class가 메모리에 할당이 된 상태를 instance의 형태로 존재한다고 말하는데, 우리는 그 상태를 두가지로 분류할수 있어요. 처음에 class가 선언되면서 class자체가 memory에 할당이 되면서 instance랑 똑같은 모양으로 선언되는 거랑, 실제로 개발자가 ClassName()를 호출해서 class를 변수에 할당해서 일반적으로 우리가 일컫는 instance를 만들어서 사용하는 그런 경우 이렇게요. 이게 좀 헷갈리니까 처음에 class를 선언할때 얘가 memory에 들어가 있는 상태는 일단 instance가 되었다고 표현하지 않을게요. instance라는 영어의 뜻자체는 class를 구현했을때 만든 복사본을 의미하는것에 더 가깝거든요. 그런데 우리가 class를 구현했을때 memory에 할당이 되는것을 알고 있기때문에 어쩌다 그 의미가 “class가 memory에 할당된 상태”라고 와전이 되버린거죠. 그래서 Python에서 class가 선언되면서 memory에 할당이 되는걸보고 사람들이 Python에서는 class가 선언되면서 instance의 형태를 갖는다라고 말하게 된거구요.

이제 그러면 metaclass에 대해서 얘기를 해볼게요. 위에서 설명드린 class와 instance에 대한 용어정리는요 metaclass를 이해하기 위해 꼭 필요한건 아니구요 metaclass와 class, 그리고 instance의 상관관계를 설명드릴때 부연잡지식으로 좋을거같아서 그냥 서론이 좀 길었던거구요. 그러면 본론으로 들어가서 metaclass가 뭐냐면요, class의 엄마같은 애에요. 그동안 class가 최고봉이고 오리지날 이라고 생각했는데 그 위에 뭐가 더 있었다니 좀 신기하게 생각하시는 분들이 계실것같은데요. 아래에 제가 보여드리는 예제를 보시면 이미 알고 계셨는데 그동안 간과하고 있었구나 하고 깨닫게 되실거에요. 일단 아래와 같이 클래스를 하나 선언할게요.

>>> class Foo:
...     pass

그리고 나서 이 클래스의 type이 몬가 하고 들여다보면요

>>> type(Foo)
<class 'type'>

type이 type이라고 나와요. class의 type은 type이네요. 이번에는 class말고 data type들의 type을 한번 확인해볼까요?

>>> type(int)
<class 'type'>
>>> type(list)
<class 'type'>
>>> type(tuple)
<class 'type'>

어라? data type의 type도 type입니다. 이게 Python3에서 새로 도입된 개념인데요. 기존에 분리되어있었던 type과 class의 영역을 type으로 통합해버린거에요. 그래서 class도 그냥 어떤 data type의 하나로 분류를 하게 된거죠.

옛날에는요 class랑 type이 서로 달랐어요. Python2에서는요 class의 type을 찍어보면 classobj라고 나와요.

>>> # Python 2
>>> class Foo:
...     pass
>>> type(Foo)
<type 'classobj'>

그런데 Python3이 되면서 class를 data type처럼 어떤 type의 한종류로 보고 type과 class의 개념을 혁신적으로 통합을 해버린거죠.

>>> class Foo:
...     pass
>>> type(Foo)
<type 'type'>

사실 이 개념은 Python 2.2부터 있어왔는데요 그때당시에는 이렇게 쓰겠다 하고 특별히 명시를 해주지 않으면 default가 classobj로 되어 버리기때문에 그때는 확실히 그렇게 쓰였다고 보기는 힘들구요 Python 3부터 이 방식이 default로 사용되었기때문에 있었던거지만 2.2에서보다는 3에서 더 혁신적이라고 할수 있죠

아직도 살짝 헷갈려하시는 분들을 위해서, 이번에는 그 차이점을 instance에서 한번 확인해볼까요? 아래의 코드는 Python 2에서 class를 instance로 구현해서 __class__와 type을 비교해본 코드입니다.

>>> # Python 2
>>> obj = Foo()
>>> obj.__class__
<class __main__.Foo at 0x000000000535CC48>
>>> type(obj)
<type 'instance'>

Python 2에서는 __class__를 찍어보면 나오는게 “<class”로 시작하고, type함수를 통해서 찍어보면 “<type”으로 시작하는 결과가 나옵니다. 이게 무슨의미 인가요? class는 class고 type은 type으로 Python 2에서는 이걸 전혀 다르게 핸들링을 하다가 Python 3에서 이걸 type과 class로 전부다 통합하고 두개의 개념을 계층으로 표현하도록 한거에요.

이해를 돕기위해서 Python 3에서는 같은 코드가 어떻게 보이는지 한번 확인해볼까요?

>>> obj = Foo()
>>> obj.__class__
<class '__main__.Foo'>
>>> type(obj)
<class '__main__.Foo'>

이번에는 두개다 “<class”로 시작하고 있죠? 이제 그럼 두개가 같은건가요?

>>> obj.__class__ is type(obj)
True

네 이제 __class__는 해당 instance가 어떤 type인지에 대한 정보를 가지고 있고, type함수의 결과와 동일합니다.

여기서 그러면 다른 data type의 instance들은 각각 어떤 type을 가지는지 궁금해지죠?

>>> type(int)
<class 'type'>
>>> type(3)
<class 'int'>

>>> type(list)
<class 'type'>
>>> type(['foo', 'bar', 'baz'])
<class 'list'>

>>> type(tuple)
<class 'type'>
>>> t = (1, 2, 3, 4, 5)
>>> type(t)
<class 'tuple'>

>>> class Foo:
...     pass
...
>>> type(Foo)
<class 'type'>
>>> type(Foo())
<class '__main__.Foo'>

제가 기억을 상기시켜드리기 위해서 아까 찎어봤던 각 data type의 type을 한번에 정리해봤어요. 보시면, int라는 data type의 type을 확인해봤을때는 type라는 class이고 이걸 구현한 정수값의 type을 확인해보면 int라는 class라고 나옵니다. 일단 주목하셔야하는 부분이요. 이제 더이상 “<type”으로 시작하는 애는 없어요. 이제 Python에서 모든 자료구조는 class입니다. 그리고 계층구조가 보이시나요? class를 포함한 data type들은 type이라는 class로 만들어져 있고, 그 class들을 구현해서 만들어진 instance들은 각각의 class이름을 가지죠. int, list, tuple 혹은 __main__.Foo와 같은 class이름을 type으로 가져요.

여기 metaclass뭔지 배우러왔는데 자꾸 딴소리만하고 도대체 metaclass가 몬가요? 여기서 보시는 type이라는 class가 바로 metaclass입니다. type이라는 큰 덩어리에서 int, list, tuple등 각 class들이 파생되는거죠.

다시 정리해볼게요. Python에서 모든것들은 class로 이루어져있습니다. int, list, tuple 또는 MyClass같은 class들은 이제 더이상 그냥 기존에 우리가 생각하고 있던 class가 아니고 type이라는 class에서 파생된 class라고 할수 있죠. 아니, 파생되었다기보다 type이라는 스타일의 class들이라고 하는게 더 정확한 표현이 될것 같아요. 1,2,3들이 int라는 스타일의 instance들이 되는것 처럼 말이죠. 객체를 만든다고 할때 우리 “구현”한다라고 표현하는데, 구현이 무슨 의미인지는 다 알고 계시죠? 의상디자이너가 패턴을 그리고, 누군가 패턴가지고 옷을 만들면 이때, 패턴을 옷으로 구현하다라고 하죠? 건축가가 설계도를 그리고, 누군가 그 설계도를 가지고 건물을 지으면 그것도 “구현”한게 된거에요.

위의 그림을 보시면요 instance는 class를 구현한거에요. 그리고 class는 metaclass를 구현한겁니다. 예를 들어보자면 아래와 같은 그림들이 나오겠죠?

대충 metaclass가 어떤 것인지 흐릿하게 감이 오시죠? 다음시간에 metaclass와 상속의 차이점에 대한것을 공부하고, 그 후에 custom metaclass을 직접 만들어보시면 그 개념이 더확실해지지 않을까 기대해봅니다. 여기까지 읽어주셔서 감사하고요 다음 강의도 꼭 읽어주세요. 그럼 좋은 하루 되세요.

References:

Docker 명령어

실행되고 있는 Docker컨테이너를 보고 싶다면

docker ps
docker ps --all

컨테이너가 이유없이 죽거나 하면 로그를 보자

docker-compose logs 서비스명

모든 서비스를 종료하고 싶다면

docker-compose down

사용하지 않고 있는 Docker container를 삭제하고 싶다면

docker container prune

컨테이너 안으로 들어가고 싶다면

# 우선 현재 컨테이너 목록을 보고, 접속할 컨테이너의 이름을 획득합니다
docker ps

# 아래 명령어를 사용하여 획득한 이름의 컨테이너에 접속합니다
docker exec -it 컨테이너이름 bash
docker exec -it 컨테이너이름 sh

# 컨테이너 안에서 필요한 명령어를 실행한다
root@a2018e567fc7:/# mysql -h127.0.0.1 -p
Enter password: ****
mysql> use mydatabase;

docker attach

Shell로 접근

# 컨테이너 실행
$ docker run --name test -d -it debian
Unable to find image 'debian:latest' locally
latest: Pulling from library/debian
e4c3d3e4f7b0: Pull complete
Digest: sha256:8414aa82208bc42...6b8a8c4e7b5c84ca2d04bb244
Status: Downloaded newer image for debian:latest
811ddfb7e514c6cddcfefd14f...2ac6251d45abaf159a8a77c3b9f

# 아이디 획득
$ docker ps
CONTAINER ID    IMAGE    COMMAND    CREATED
811ddfb7e514    debian   ...        1 mins ago

# 아래 명령어를 통해 shell에 접근
$ docker attach 811ddfb7e514
root@811ddfb7e514:/# 

로그가져오기

# docker-compose.yaml에 stdin_open과 tty를 true로 설정한다.
    test_container:
        ...
        stdin_open: true
        tty: true

# 컨테이너 실행
$ docker-compose run test_container

$ docker ps
CONTAINER ID    IMAGE    COMMAND    CREATED
811ddfb7e514    debian   ...        1 mins ago

# 로그보기
docker attach 811ddfb7e514

Debug

debugging tests

# sh모드로 컨테이너 실행
$ docker-compose run --entrypoint sh test_container
/opt/docker/app $

# 디버깅코드 삽입
/opt/docker/app $ vi ./test_cases.py
...
import pdb; pdb.set_trace()
...

# 테스트 실행
/opt/docker/app $ pytest ./test_cases.py

debugging app during the test

it basically attaches your shell to the running process in the container, so if you

  1. increase the value of harakiri in uwsgi.ini
  2. add stdin_open: true and tty: true to the container in docker-compose.yaml
  3. add import pdb; pdb.set_trace() in the code
  4. run the app
  5. execute docker attach with the app container id
  6. run the test container
$ /myapp/uwsgi.ini
harakiri            = 10000000

$ vi docker-compose.yaml
    myapp:
        ...
        stdin_open: true
        tty: true

$ vi /myapp/app.py
...
import pdb; pdb.set_trace()
...

$ docker-compose run myapp

$ docker ps
CONTAINER ID    IMAGE    COMMAND    CREATED
811ddfb7e514    myapp    ...        1 mins ago

$ docker attach 811ddfb7e514

# in another terminal:
$ docker-compose run mytest

when you run the test, the attached container will be stopped by the request.

run tests inside the container

$ docker-compose run --entrypoint sh test_container
/opt/docker/app $ ./run_tests.sh

Container에 없는 Tool은 api-get으로 설치

apt-get update
apt-get install vim

Daily English: 02/21/2020

Sentences

Ah the other failure passed when I retried, so it may have been a fluke

i’ll be keeping a close eye on things and adjusting to keep things stable to the best of my ability

Let me know when things normalize, I will run it at a slower pace

Hm; I think the rest of the UI is pretty intuitive. One thing that might be cool is to have little optional breakout sessions where people with similar job functions brainstorm goals! I know I’ve been having trouble coming up with actionable, measurable goals for more nebulous ideas I have and a working sesh would be fun

kicking off deletion in stg

So I think with the new monitor metrics we could probably make it more resilient to blips with the median

Vocabulary

  • sluggish: slow moving or inactive
  • decommissioned: 전역, 퇴역하다
  • founding
  • momentum
  • summoned
  • instability
  • intuitive
  • breakout sessions

How do you take your coffee?

How do you take your coffee?

I take it black

I take it with sugar and cream

Two sugars are good (two sppoonful of sugar)

How many ketchups?

How many sugars?

2 salt packets

can I get 2 pepper packets

can I get two pieces of ginger?

can I get two cloves of garlic (마늘 두쪽)

I need 4 onions

two rocks는 돌맹이

two stones은 큰바위

gravel[그레뷀] 자갈(모래알보다는 크지만, 돌맹이보다는 작은돌. 셀수없음)

How much gravel do you have now?

How many pieces of gravel do you have now?

강의는 아래 링크를 눌러서 소피반선생님의 유투브강의를 들어주세요.
정말 훌륭한 선생님입니다. 구독, 좋아요 꼭 눌러주시는것도 잊지 마시구요.
Source: https://youtu.be/rQ8kTPMk5pA

Async IO코드를 Unit test하기

제가 얼마전에 asyncio를 이용해서 동시다발적으로 task를 진행하는 코드를 짰는데요. Python에서 제공하는 unittest랑 mock 라이브러리는요, asyncio를 테스트할수 있는 특별한 기능이 없더라구요. 그래서 인터넷검색을 하던중에 마침 저랑 같은 고민을 하고 있는 사람이 있길래 그분이 소개한 내용을 여러분께 전달해드리려고 책상에 앉았습니다. 원본링크는 맨 아래에 걸어두었으니까 참고하세요.

동기적인 코드 테스트하기

비동기적인 코드를 알려드리기에 앞서서 차이점을 설명드리기위해서 일단 동기적인 코드를 unit test하는걸 먼저 알고갈게요. 예를들어, 우리가 테스트해야하는 함수가 receive.py에 들어있는 receive()라는 함수라고 가정하고 아래코드를 봐주세요.

def receive(packet_type, packet_data):
    """Receive packet from the client."""
    if packet_type == 'PING':
        send_to_client("PONG", packet_data)
    elif packet_type == 'MESSAGE':
        response = trigger_event('message', packet_data)
        send_to_client('MESSAGE', response)
    else:
        raise ValueError('Invalid packer type')

def send_to_client(packet_type, packet_data):
    """Implementation of this function not show."""
    pass

def tirgger_event(event_name, event_data):
    """Implementation of this function not show."""
    pass

코드를 대충보시면 아시겠지만, 현재 이 코드는 어떤 메세지를 받아서 client에게 응답하는 메세지서버에요. client가 서버에 요청을 하면 receive를 호출하고요, 호출받은 packet의 type에 따라서 각각 send_to_client()를 호출해서 응답을 합니다. receive()를 unit test하는건 그리 어려워 보이지않아요. 그런데 send_to_client()는 응답 데이타를 외부 사용자에게 어떤 메세지를 전달하는 기능을 하는 함수라서, 외부 서비스에 의존해야하는데 아시다시피 unit test는 functional test와 달리 외부서비스에 의존하지 않고 테스트를 할수 있어야하자나요. 마찬가지로, trigger_event()도 어떤 이벤트를 발생시켜서 다른 시스템이 그 데이타를 받아서 처리하도록 하게하고 심지어 이 함수는 결과값을 반환하기까지 합니다. 그걸 받아서 client에 전달해야하는거죠. 다행히 여러가지 테스트 케이스중에 적어도 한개는 좀 쉬워보이네요. 바로 packet_type이 ‘PING’, ‘MESSAGE’ 이 두개중에 하나가 아니면 ValueError를 raise하도록 만들어져있자나요. 그러면 일단 쉬운거부터 만들면서 진행해볼게요:

import unittest
from receive import receive

class TestReceive(unittest.TestCase):
    def test_invalid_packet(self):
        self.assertRaises(ValueError, receive, 'FOO', 'data')

다른 두개는 실제로 외부서비스를 호출하지 않게 하기 위해서 mock을 이용해서 가짜 함수로 대체할게요.

import uniitest
from unittest import mock
from receive import receive

class TestReceive(unittest.TestCase):
    ...
    @mock.patch('receive.send_to_client')
    def test_ping(self, send_to_client):
        receive('PING', 'data')
        send_to_clinet.assert_called_once_with('PONG', 'data')

mock으로 함수를 대체하면 실제로 그 함수를 호출하지 않고 mocking된 가짜함수를 호출하게 되는거에요. 그리고 mocking된 가짜함수가 반환할 값들을 다양하게 조작하고 경우의 수를 만들어서 모든 케이스를 다 커버하도록 조작 하는거죠. mocking을 하기 위해서 위의 코드에서는 mock.patch데코레이터를 사용했어요. 그리고 외부로 서비스되는 테스트는 functional테스트에서 하도록 할건데요. 그부분은 다음에 다루도록 할게요. 이번시간에는 unit test에 집중해서 설명을 하도록 하겠습니다. 이렇게 patch를 한 function은 MagicMock으로 mocking이 되는데요. MagicMock이 어떻게 생겼는지 아래 코드에서 보여드릴게요:

>>> from unittest import mock
>>> f = mock.MagicMock()
>>> f()
<MagicMock name='mock()' id='4373365928'>
>>> f('hello', 'world')
<MagicMock name='mock()' id='4373365928'>
>>> f.some_method('foo')
<MagicMock name='mock.some_method()' id='4373489200'>

위에서 보시다시피 patch말고도 MagicMock을 직접 이용해서 mocking을 할수 있는데요 사실 patch 데코레이터안에서 MagicMock을 사용하고 있기 때문에 patch로 mocking을 하나, MagicMock으로 코딩을하나 결과는 비슷하다고 보시면 되요. 마지막 라인에서 보시면요, MagixMock으로 정의된 오브젝트에 서브함수를 정의해 넣고 있습니다. 이렇게 mocking을 당한 함수들은요 처리가 되는 내내 감시를 받게 됩니다. 아래와 같이 몇번 호출되었는지, 함수에 호출된 인자값이 제대로 잘 갔는지 그런거를 비교할수 있어요.

>>> f('hello', 'world')
<MagicMock name='mock()' id='4373526568'>
>>> f.assert_called_once_with('hello', 'world')
>>> f.assert_called_once_with('bye', 'world')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../unittest/mock.py", line 825, in assert_called_once_with
    return self.assert_called_with(*args, **kwargs)
  File ".../unittest/mock.py", line 814, in assert_called_with
    raise AssertionError(_error_message()) from cause
AssertionError: Expected call: mock('bye', 'world')
Actual call: mock('hello', 'world')

자 그러면 이제 packet_type이 ‘MESSAGE’인 경우의 테스트 코드를 한번 만들어 볼까요?

class TestReceive(unittest.TestCase):
    # ...
    @mock.patch('receive.trigger_event', return_value='my response')
    @mock.patch('receive.send_to_client')
    def test_message(self, send_to_client, trigger_event):
        receive('MESSAGE', 'data')
        trigger_event.assert_called_once_with('message', 'data')
        send_to_client.assert_called_once_with('MESSAGE', 'my response')

trigger_event랑 send_to_client를 mocking해야겠죠? 그리고 trigger_event에서 받은 값을 반환해야하니까 가짜 반환값을 ‘my_response’라는 문자열로 주었습니다. 그리고 receive를 호출했을때, 각 함수들이 정확한 함수의 인자들을 가지고 호출이 되었는지를 확인하는거죠.

AsyncIO 테스트

이제 앞서만든 동기적인 코드를 비동기적으로 변환해볼게요. async_receive.py라는 파일을 하나 만드시고요 아래 코드들 저장합니다.

async def receive(packet_type, packet_data):
    """Receive packet from the client."""
    if packet_type == 'PING':
        await send_to_client("PONG", packet_data)
    elif packet_type == 'MESSAGE':
        response = await trigger_event('message', packet_data)
        await send_to_client('MESSAGE', response)
    else:
        raise ValueError('Invalid packet type')

async def send_to_client(packet_type, packet_data):
    """Implementation of this function not shown."""
    pass

async def trigger_event(event_name, event_data):
    """Implementation of this function not shown."""
    pass

보시다시피, 3개 함수 모두다 async로 정의했구요, receivetrigger_eventsend_to_client를 호출할때는 await으로 기다리는 동안 다른 task들이 실행되도록 작성했어요.

자, 이제 unit test코드를 만들어 볼까요? 가장 우선적으로 receive함수를 테스트 해야겠죠? receive함수는 async로 선언이 되었기때문에 함수를 호출하면 함수가 실행되는게 아니구요, 대신에 coroutine이라는 object가 만들어집니다.

>>> from async_receive import receive
>>> receive('FOO', 'data')
<coroutine object receive at 0x10e5b32b0>

위의 코드를 보시면 나는 분명히 receive함수를 불렀는데 coroutine이라는 오브젝트를 반환했죠? 이것은 coroutine이라는 객체가 receive함수를 들고 event loop라는데 들어가서 CPU가 해당 coroutine의 작업을 처리해주기를 기다려야하기 때문이에요. FOO를 packet type으로 넘기면 ValueError가 나도록 만들었는데 에러는 안나고 이상한 객체만 보여주고, 왠지 coroutine에 등록을 하고 나면 뭔가 제어할수 있는 권한이 내 손을 떠나버린 느낌이 들죠? 그래서 함수를 실행하는걸 직접 제어하려면 event loop을 만들어서 그 안에서 실행되도록 해야해요.

>>> import asyncio
>>> asyncio.get_event_loop().run_until_complete(receive('FOO', 'data'))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File ".../async_receive.py", line 19, in receive
    raise ValueError('Invalid packet type')
ValueError: Invalid packet type

asyncio에서 제공하는 get_event_loop을 초기화해서 run_until_complete이라는 옵션으로 receive를 실행합니다. 그러면 이제 ValueError가 반환되는게 보이죠? 이걸 좀 간편하게 갖다 쓰기 위해서 저는 _run이라는 함수를 만들었어요.

import asyncio

def _run(coro):
    return asyncio.get_event_loop().run_until_complete(coro)

이 함수를 사용하면 테스트 코드가 좀더 깔끔해지죠

import unittest
from unittest import mock
from async_receive import receive

class TestReceive(unittest.TestCase):
    def test_invalid_packet(self):
        self.assertRaises(ValueError, _run, receive('FOO', 'data'))

ValueError를 반환하는 경우는 이렇게 처리가 가능하지만, 나머지 두가지 경우는 이거보다 좀더 복잡한데요. ‘PING’의 경우에는 send_to_client()를 mocking 해야되요. 위에 동기식 테스트에서 했던것처럼 말이에요. 그런데 여기서 비동기에서의 문제점이 하나 있는데요, receive함수에서 send_to_client를 호출할때 await으로 호출하고 있죠? 그런데 mock 객체는 await으로 실행될수가 없어요.

mock 객체를 사용할수 없다면, 도대체 어떻게해야만 send_to_client를 mocking할수 있을까요? send_to_client함수는 async함수니까 그냥 호출이 되면 아까 보여드린대로 coroutine 객체를 반환할거에요. 당연히 unit test중에 coroutine이 반환되는건 아무도 원치 않을거에요. 어차피 그 안에 함수는 우리가 제어할수도 없으니까요. 그렇다면 우리가 원하는건 바로 coroutine을 MagicMock을 이용해서 mocking하는 겁니다. 그러면 바로 우리가 원하는 대로 함수를 호출할수가 있어요.

갑자기 두통이 오시나요? 사실 저도 이거 고민하면서 두통이 생겼었어요. 결국 제가 찾은 방법은 또다른 helper함수를 통해서 coroutine을 mocking하자는 거였죠.

def AsyncMock(*args, **kwargs):
    m = mock.MagicMock(*args, **kwargs)

    async def mock_coro(*args, **kwargs):
        return m(*args, **kwargs)

    mock_coro.mock = m
    return mock_coro

위의 함수를 차근차근 한번 살펴볼게요. 함수 중간에 보시면 비동기로 선언되어있는 mock_coro라는 내부함수가 있죠? 얘가 AsyncMock에 넘겨진 함수의 인자를 고스란히 받게 됩니다. 맨 마지막 줄에 보시면 AsyncMock함수는 이 내부함수 mock_coro를 반환하게 되는데요. 제가 방금 위에서 말씀드린대로 실제로 돌아가는 모양새에 맞게 비동기함수를 mocking해야하는데요, 바로 이 mock_coro()함수가 그 역할을 해주는거에요. 이게 비동기로 선언이 되어있기때문에 가능한일입니다.

이 coroutine이 실행될때, 우리는 MagicMock객체를 만들고 싶은거자나요. 위의 AsyncMock()이 호출되면 가장먼저 m이 초기화되면서, 이 mocking된 object를 만드는거죠. MagicMock이 함수인자를 다 받아줄수 있어서 AsyncMock으로 받은 모든 인자를 m에 죄다 넘겨줄수가 있어요. 이때 넘겨주는 인자에는 return_value같은거도 다 넘어간답니다. 이렇게 선언된 m은 어떻게 활용되느냐, 바로 mock_coro함수에서 이걸 객체로 구현하는거에요. 바로 이때 모든 함수의 인자들이 실제 호출한 함수로 전달이 되는 순간인거죠.

정리를 좀 하자면요,
1) 일반 함수를 하나 선언하고
2) 내부변수 m에 MagicMock을 생성합니다. 이때, 일반 함수에서 받은 모든 인자를 MagicMock에 넘겨주어 mocking함수를 m에 할당합니다.
3) 내부에 비동기 함수를 하나 만듭니다. 그 함수는 호출될때 부모함수가 받은 모든 인자를 받게 됩니다. 이 내부함수는 방금 MagicMock으로 선언한 m을 호출한 결과를 반환합니다. 이때 일반 함수에서 받은 모든 인자를 그래도 m에 넘겨주도록합니다.
4) 내부함수를 바깥에서 호출할수 있게 함수자체를 return하려고 하는데 그전에 방금 만든 MigicMock을 할당한 m을 내부함수에 mock이라는 객체를 하나 붙여서 내부비동기함수.mock()이라고 호출하면 호출되도록 callable함수를 함수.mock()에 저장합니다.

이제 우리에게 마지막으로 필요한게 한가지 남았는데 바로 테스트코드에서 이 m객체를 사용하도록 만드는 방법이 필요해요. 이 객체가 함수 바깥에서도 접근해서 사용하도록 만들려면, 이걸 넘겨주는 뭔가가 있어야겠죠? 그래서 만든게 mock_coro함수인거죠. 지금 제가 하는게 이상하다고 생각하시는 분들이 계실까봐 잠시 설명을 드리자면요, Python에서 함수들은 전부 Object입니다. 그래서 여러분이 내부함수를 선언하시면요 그게 바로 해당 Object의 custom attribute이 되는거에요. 클래스로 치면 내부 메쏘드 정도라고 생각하시면 될것 같아요.

백문이 불여일견이라고요, 이렇게 우왕좌왕 설명을 드리는것보다 코드로 정리해서 직접 눈으로 확인시켜드리는게 훨씬 효과적일거 같다는 생각이 드네요. 아래 코드를 봐주세요.

>>> import asyncio
>>> from test_async_receive import AsyncMock
>>> f = AsyncMock(return_value='hello!')
>>> f('foo', 'bar')
<coroutine object AsyncMock.<locals>.mock_coro at 0x10ef84ca8>
>>> asyncio.get_event_loop().run_until_complete(f('foo', 'bar'))
'hello!'
>>> f.mock.assert_called_once_with('foo', 'bar')
>>> f.mock.assert_called_once_with('foo')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../unittest/mock.py", line 825, in assert_called_once_with
    return self.assert_called_with(*args, **kwargs)
  File ".../unittest/mock.py", line 814, in assert_called_with
    raise AssertionError(_error_message()) from cause
AssertionError: Expected call: mock('foo')
Actual call: mock('foo', 'bar')

위의 예제에서 보시다시피요, async로 선언된 mock_coro를 반환하는 AsyncMock함수(=Object)를 invoke해서 f에 assign했죠? 만약에 제가 함수 f()를 invoke한다면, coroutine을 하나 갖게 되겠죠? 지금 우리가 만든게 실제 async함수가 동작하는 원리랑 똑같이 작동하도록 만든거에요.

f(‘foo’, ‘bar’)가 loop를 돌고 있다면, 기존에 mock에 설정해둔 return value를 받아오게 되는거죠. 아까 AsyncMock선언한 코드를 보시면요, 내부함수를 반환하기 전에 MagicMock으로 만든 m을 mock라는 변수에다 할당합니다. 그러니까 그 coroutine이 실제 holding하고 있는 task 즉, 그안에 MagicMock에 접근을 하려면 f가 아니라 f.mock으로 접근을 해야겠죠. 코드는 엄청 짧은데 정말 복잡한 코드에요. 근데 이게 동작은 정말 잘해요.

아래에 ‘PING’의 경우를 unit test해본 거에요.

class TestReceive(unittest.TestCase):
    # ...
    @mock.patch('async_receive.send_to_client', new=AsyncMock())
    def test_ping(self):
        _run(receive('PING', 'data'))
        from async_receive import send_to_client
        send_to_client.mock.assert_called_once_with('PONG', 'data')

여기에서 send_to_client를 AsyncMock으로 mocking합니다. 그게 비동기니까 send_to_client는 반환값이 아닌 coroutine을 반환할거니까 그 모양새를 비슷하게 만들어 놓은 AsyncMock를 구현해서 바꿔치기 하는거죠. 그리고 receive를 할때는 에러든 뭐든 일단 볼수 있게, 직접 호출하지 않고, Mock에서 제공하는 event_loop을 초기화해서 사용할수 있도록 _run()함수를 호출합니다.

아래는 ‘MESSAGE’의 경우를 테스트해본 결과인데요, 어딘가 뭔가 조금더 복잡해 진듯 하네요.

class TestReceive(unittest.TestCase):
    # ...
    @mock.patch('async_receive.send_to_client', new=AsyncMock())
    @mock.patch('async_receive.trigger_event', new=AsyncMock(return_value='my response'))
    def test_message(self):
        _run(receive('MESSAGE', 'data'))
        from async_receive import send_to_client, trigger_event
        trigger_event.mock.assert_called_once_with('message', 'data')
        send_to_client.mock.assert_called_once_with('MESSAGE', 'my response')

여기서는 message에 trigger_event가 반환한 값을 넘겨주어야하니까 AsyncMock을 호출할때 return_value도 함께 넘겨줍니다. 그러면 내부에 비동기로 선언한 함수를 호출할때도, 마찬가지로 해당 인자를 넘겨받아서 실행하게 됩니다.

아래는 필요한 함수와 코드들을 한번에 모아봤어요. 코드 전체다 보시고 싶으시면 https://github.com/miguelgrinberg/asyncio-testing.

async_receive.py

async def receive(packet_type, packet_data):
    """Receive packet from the client."""
    if packet_type == 'PING':
        await send_to_client("PONG", packet_data)
    elif packet_type == 'MESSAGE':
        response = await trigger_event('message', packet_data)
        await send_to_client('MESSAGE', response)
    else:
        raise ValueError('Invalid packet type')

async def send_to_client(packet_type, packet_data):
    """Implementation of this function not shown."""
    pass

async def trigger_event(event_name, event_data):
    """Implementation of this function not shown."""
    pass

test_async_receive.py

import unittest
from unittest import mock
from async_receive import receive
import asyncio

def _run(coro):
    return asyncio.get_event_loop().run_until_complete(coro)

def AsyncMock(*args, **kwargs):
    m = mock.MagicMock(*args, **kwargs)

    async def mock_coro(*args, **kwargs):
        return m(*args, **kwargs)

    mock_coro.mock = m
    return mock_coro

class TestReceive(unittest.TestCase):
    def test_invalid_packet(self):
        self.assertRaises(ValueError, _run, receive('FOO', 'data'))

    @mock.patch('async_receive.send_to_client', new=AsyncMock())
    def test_ping(self):
        _run(receive('PING', 'data'))
        from async_receive import send_to_client
        send_to_client.mock.assert_called_once_with('PONG', 'data')

    @mock.patch('async_receive.send_to_client', new=AsyncMock())
    @mock.patch('async_receive.trigger_event', new=AsyncMock(return_value='my response'))
    def test_message(self):
        _run(receive('MESSAGE', 'data'))
        from async_receive import send_to_client, trigger_event
        trigger_event.mock.assert_called_once_with('message', 'data')
        send_to_client.mock.assert_called_once_with('MESSAGE', 'my response')

테스트 결과

> pip install pytest
> pytest test_async_receive.py
======================================================================== test session starts ========================================================================
platform darwin -- Python 3.7.3, pytest-5.3.5, py-1.8.1, pluggy-0.13.1
rootdir: /Users/damazzang/Desktop
collected 3 items

test_async_receive.py ...                                                                                                                                     [100%]

========================================================================= 3 passed in 0.06s =========================================================================

부디 제 글이 조금이나마 비동기 테스트를 하시는데 도움이 되셨길 바랍니다. 혹시 번역이 부족하면 아래 original article을 참고하세요.

Source: https://blog.miguelgrinberg.com/post/unit-testing-asyncio-code

내 컴퓨터에 Docker에 들어있는 PostgreSQL에 접속하기

이 글은 Docker에 대한 전반지식과 Database에 대한 기본 지식이 있다는 전제하게 쓰여졌습니다. 그점 양해부탁드려요.

Docker가 있어서 PostgreSQL을 설치하고 관리하기가 참 쉬워진거 같아요. 그냥 아래 명령을 날리면 바로 Local에 설치, 실행이 되거든요.

$ docker run -d -p 5432:5432 --name my-postgres -e POSTGRES_PASSWORD=my-password my-user

위의 명령은 Docker container로 정의되어있는 PostgreSQL을 시작하고 -p <host_post>:<container_port> <- 이 부분에 의해서 컨테이너 안의 포트와 실제 내 컴퓨터의 포트가 서로 연동이 되는데요, 위에서는 같은 포트번호를 적어주었으니 PostgreSQL의 기본포트 5432를 컨테이너와 내 컴퓨터의 포트에 연결하겠다는 의미에요. 내 컴퓨터에서 5432포트를 이미 다른 프로그램이 쓰고 있는 경우에는 앞에 포트번호를 다른걸로 바꿔주면 됩니다.

그럼 이제 컨테이너 안으로 들어가서 PostgreSQL데이타베이스에 접속해볼까요? 일단 아래의 명령을 치면 PostgreSQL를 품고 있는 Docker컨테이너에 접속이 됩니다.

$ docker exec -it my-postgres bash

이제 방금 설치한 PostgreSQL에 접속해서 새로운 데이타베이스 mydb를 만들어볼게요.

root@cb9222b1f718:/# psql -U my-user
psql (10.3 (Debian 10.3-1.pgdg90+1))
Type "help" for help.
postgres=# CREATE DATABASE mydb;
CREATE DATABASE
postgres=#\q

이제 컨테이너에서 나와서 해당 데이타베이스에 접근할수 있어요

$ psql -h localhost -p 5432 -U my-user -W
Password for user my-user: ****

자 이제 그 안에 table도 만들고 하시면 됩니다. PostgreSQL명령어에 익숙하지 않으신 분들은 PgAdmin같은 Graphic UI 툴을 설치하셔서 편리하게 이용하셔도 좋을것 같아요. 그럼 오늘도 좋은 하루 되세요!

Source: https://medium.com/better-programming/connect-from-local-machine-to-postgresql-docker-container-f785f00461a7

Locust 기본설치와 확인

준비작업

현재(2020년6월17일) Locust최신버젼은 1.0.3입니다. 해당버젼은 파이썬 3.6이상에서만 설치가 가능합니다.

pip install locust

Load test를 수행할 웹서버가 있어야겠죠? Python 초간단 웹서버페이지를 참고하여 웹서버를 실행합니다. 로컬에 띄운 서버를 확인해주세요.

$ curl http://localhost:8000
Hello, world

Locustfile 만들기

locustfile.py은 locust에게 뭘 어떻게 하라고 작업들을 지시해놓는 파일입니다. 여기서 우리는 HttpLocust클래스를 상속받아서 웹서비스의 load test를 하는 locustfile을 만들거에요. locustfile.py에 대한 자세한 설명은 여기를 참조하세요.

from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
    wait_time = between(5, 9)

    @task
    def my_task(self):
        self.client.get("/")

위의 파일에는 / root page를 호출하라는 미션이 명시되어있습니다. 그러면 이 제 locust를 실행해볼까요? locust를 실행해도 바로 load test가 시작되는건 아니니까 안심하고 아래 command line 명령어를 실행하세요.

$ locust -f locustfile.py --host=http://localhost:8000
[2020-01-20 07:59:53,646] MyComputer/INFO/locust.main: Starting web monitor at http://*:8089
[2020-01-20 07:59:53,646] MyComputer/INFO/locust.main: Starting Locust 0.13.5

locust에게 수행할 미션이 적혀있는 파일의 위치를 알려주고, load test를 수행할 웹서비스의 host도 여기서 넘겨주도록 합니다. 그래야 로컬에서 테스트가 끝나면 별도의 코드 수정없이 바로 서버에서 실행할수 있으니까요.
위의 command를 실행하면 locust가 제공하는 Web UI에 접속할수 있습니다.
인터넷 브라우저를 하나 열어서 http://localhost:8089를 open하세요.

아래와 같은 화면이 보이신다면 실행에 성공하신거에요.

동시접속자수(Number of total users to simulate)는 1명으로 적을게요, 그리고, 접속빈도수(Hatch rate)은 1초로 할게요. 그러면 1명이 1초에 한번씩 지정한 Host, http://localhost:8000에 요청을 하는데요, 아까 우리는 TaskSet에 task를 한개밖에 안만들었자나요(my_task). 만약에 우리가 TaskSet에 task를 여러개 만들었다면, 돌아가면서 한번씩 실행하게 되겠지만, 현재는 한개밖에 없으니까 1명이 1초에 한번씩 MyTaskSet.my_task함수를 실행하게 될거에요. my_task에서는 root페이지를 호출하도록 명시해놓았으니까, 결국 1초에 한번씩 root페이지가 호출되겠죠? 어디한번 그렇게 되는지 확인해볼까요?

동시접속자 수와, 호출빈도를 1로 입력하고, Start swarming버튼을 눌러주세요. 그러면 command line에서는 아래와 같이 호출이 swarming이 시작되었다는 메세지가 추가됩니다.

$ locust -f locustfile.py --host=http://localhost:8000
...
[2020-01-20 08:11:29,282] Slim-MBP-15/INFO/locust.runners: Hatching and swarming 1 clients at the rate 1 clients/s...
[2020-01-20 08:11:30,288] Slim-MBP-15/INFO/locust.runners: All locusts hatched: RequestLocust: 1

그리고 Web UI에서는 요청이 얼마나 이루어지고 있는지 통계치를 계속 갱신하면서 보여주고요.

실제로 아까 우리가 실행한 웹서버가 요청을 받고 있는지 한번 볼까요?

$ python -m SimpleHTTPServer 8000
Serving HTTP on 0.0.0.0 port 8000 ...
127.0.0.1 - - [20/Jan/2020 08:11:29] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Jan/2020 08:11:30] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Jan/2020 08:11:31] "GET / HTTP/1.1" 200 -
...
127.0.0.1 - - [20/Jan/2020 08:11:52] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Jan/2020 08:11:53] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Jan/2020 08:11:54] "GET / HTTP/1.1" 200 -

웹서버에도 1초에 한번씩 access log가 추가되고 있어요. 호출이 잘 이루어지고 있네요. 이제 그만 stop해야겠어요. locust의 Web UI에서 우측상단의 빨간색 STOP버튼을 누르세요.

그러면 웹서버로의 요청이 중지됩니다.

일단 실행이 되는걸 보여드리기 위해서 Number of users를 1명 Hatch rate도 1로 심플하게 설정해서 보여드렸는데요, Hatch rate에 대한 설명을 조금더 드리자면요, Number of users를 1000명으로 설정하시면요, 처음부터 1000명이 바로 접속하게 할수도 있지만, Hatch rate이라는걸 설정해서 초당 몇명씩 증가하도록 설정할수도 있어요. 예를 들어 Number of users를 1000으로 하고, Hatch rate을 10으로 주면, 처음에는 10명으로 시작해서, 매 1초가 지날때마다 10명씩 증가해서 총 1000명이 될때까지 10명씩 계속 늘어나는거에요. 사용자의 증가율에 따른 퍼포먼스를 보고 싶은 경우에 Hatch rate를 이용하면 좋겠죠.


Source: http://dejavuqa.tistory.com/131

Python 초간단 웹서버

웹서비스를 하고 싶은 폴더를 만들어 들어갑니다.

$ cd ~/webroot

서비스하고 싶은 프로그램을 작성합니다.

$ vi index.html
Hello, World!

저는 index.html안에 Hello, world라는 문자열을 저장했습니다.
index.html이 저장된 폴더에서 PORT번호를 지정하여 웹서비스를 실행합니다.

$ python -m http.server

자 그러면 서버가 잘 돌아가는지 확인해볼까요?

$ curl http://localhost:8000
Hello, world

Hello, world라는 문자열을 웹서버로 부터 반환받았습니다.

SimpleHTTPServer가 없는 Python버전의 경우 아래의 명령어로 서버를 띄울수 있습니다.

$ python -m http.server 8000

Locustfile 작성하기

본문은 locustio, 즉 locust 1.0이하를 기준으로 작성되었습니다. 새로운 버젼에서는 호환이 되지 않음을 알려드립니다. 현재기준 최신버젼, locust 1.0.3을 소개한 문서를 참고하세요.

Locustfile은 일반 파이썬 프로그램 파일입니다. 특별히 다른점 하나는 최소 하나이상의 class가 정의 되어있어야한다는 점이에요. 앞으로 이 class를 locust class라고 부를게요. 이 class는 Locust라는 class를 상속받아서 만들어져야합니다.

Locust class

여러분이 만드실, 이 locust class는요, 하나의 사용자를 통해서 호출되어 지는 명령함수들을 모아놓은거라고 보시면 되는데요. 사실 Locust 실행전에 동시접속자수를 여러명으로 설정해서 traffic을 조정할수 있는데 굳이 하나의 사용자라는 용어를 사용한 이유는 바로, Locust가 이 class를 호출 하는 과정에서 사용자 한명당, 하나의객체(instance)를 만들어서 실행을 하기 때문에 그렇게 말한거에요. 자 그러면 이 Locust Class에 어떤 것들이 보통 정의가 되는지 알아보자구요.

여러분이 만드실, 이 locust class는요, 하나의 사용자를 통해서 호출되어 지는 명령함수들을 모아놓은거라고 보시면 되는데요. 사실 Locust 실행전에 동시접속자수를 여러명으로 설정해서 traffic을 조정할수 있는데 굳이 하나의 사용자라는 용어를 사용한 이유는 바로, Locust가 이 class를 호출 하는 과정에서 사용자 한명당, 하나의객체(instance)를 만들어서 실행을 하기 때문에 그렇게 말한거에요. 자 그러면 이 Locust Class에 어떤 것들이 보통 정의가 되는지 알아보자구요.

task_set

클래스 변수로 task_set이란게 있는데요. 얘는 TaskSet class를 가지고 있는 애에요. TaskSet은 앞으로 이야기할건데요, 간단하게 말하자면 사용자가 어떤 행동을 하도록 할것인지에 대한 자세한 내용들을 담은 클래스에요.

wait_time

wait_time은 Locust class의 클래스 함수에요. 이건요 task하나 하고나서 다음 task하기 전에 얼마나 기다릴지를 알려주는 함수인데요, Locust에 정의된 함수들 중에는 wait_time함수를 반환해 주는 애들도 있어요. 그중에 대표적인게 between이라는 함순데요. 얘는 task를 하나 실행하고나서 넘겨받은 min하고 max시간 사이에서 시간을 random으로 가져와서 다음 task시작하기 전에 그만큼 기다려주는 애에요. 시간을 정해서 wait_time으로 만들어주면 Locust class가 알아서 중간에 그만큼 쉬어줘요. 그밖에 기다려주는 함수로는 constantconstant_pacing이 있어요.

아래의 예제를 보시면, 각 user별로 task하나가 끝날때마다 5에서 15초를 random으로 기다리게 될거에요.

from locust import Locust, TaskSet, task, between

class MyTaskSet(TaskSet):
    @task
    def my_task(self):
        print("executing my_task")

class User(Locust):
    task_set = MyTaskSet
    wait_time = between(5, 15)

wait_time 함수는요 숫자를 반환하는데요, 바로 “초”단위 숫자입니다. 이 함수는 TaskSet 클래스에서도 정의할수 있는데요, 단 그 경우에는 해당 task에만 적용된답니다.

wait_time함수를 Locust나 TaskSet클래스에 직접 만들어서 구현할수도 있는데요, 아래에 처음에는 1초쉬고, 그다음엔 2초, 3초 이렇게 1초씩 늘어나면서 쉬도록 하는 wait_time함수를 만들어봤어요.

class MyLocust(Locust):
    task_set = MyTaskSet
    last_wait_time = 0
    def wait_time(self):
        self.last_wait_time += 1
        return self.last_wait_time

weight

만약에 locustfile.py안에 Locust 클래스가 하나 이상 정의가 되어있는데, 명령어를 실행할때 어떤걸 쓸지 locusts를 지정하지 않았다면, 기존에 정의된 클래수중에 하나를 random으로 하나 선택해서 실행하게 됩니다. 그게 싫으면 명령어를 날릴때 어떤 locusts를 사용할지 골라서 명시하면됩니다. 아래와 같이 하면 두개중에 하나만 랜덤으로 실행하겠죠?

$ locust -f locustfile.py WebUserLocust MobileUserLocust

그런데 여러분이 두개중에 어떤 특정 클래스를 좀더 자주 호출하고 싶다 그러면요. 그때 사용되어지는게 바로 Locust 클래스 변수, weight이에요. 아래의 예제를 실행하면 WebUserLocust가 MobileUserLocust보다 3배 더 많이 실행하도록 선택될거에요.

class WebUserLocust(Locust):
    weight = 3
    ...
class MobileUserLocust(Locust):
    weight = 1
    ...

host

host는 URL prefix를 저장하는데 사용되는 클래스 변수에요. 보통 실행할때 Locust에서 제공하는 Web UI를 사용해서 설정하기도 하고요, command line으로 실행할때 --host옵션으로 받아다가 Locust.host에 저장하기도 해요.

이게 Locust클래스에 한번 셋팅이 되면요, –host옵션 안주고 다시 호출하거나, 아니면 Web UI에서 없애라는 요청이 올때까지 계속 URL앞에 prefix로 붙여서 작업을 수행합니다.

TaskSet class

만약에 Locust클래스가 한명의 사용자가 아니라, 동시에 여러명이 실행하도록 설정을 했다면, TaskSet이 locust의 두뇌를 대표한다고 볼수 있습니다. 각 Locust클래스는 반드시 task_set 에 가지고 있어야하고, 그값은 TaskSet클래스 여야합니다.

하나의 TaskSet클래스는 이름에서 알수있다시피, task들의 집합이라고 할수 있어요. 이 task들은 일반적으로 python callable함수들이고, 예를 들어 경매사이트를 Load test한다고 치면, 첫페이지를 로딩하고, 그다음에 물건을 찾고, 그다음에 bidding을 하는것과 같은거죠.

Load test가 시작이 되면, 각 instance들은 동시다발적 Locust클래스에 의해서 그들이 원하는 TaskSet을 실행하기 시작하는것이죠. 각 task를 실행하고 나서는, 아까 배웠듯이 Locust.wait_time을 실행해줘서 중간에 쉬는시간을 갖습니다. 기억나시죠? TaskSet.wait_time이 있으면 task수행후에 Locust.wait_time대신에 TaskSet.wait_time가 실행이 된다고 아까 말씀드렸잖아요. 그리고 다 기다리고 나면 다음 task을 찾아서 실행하고, 기다리고를 계속 반복합니다.

Declaring tasks

TastSet에 task들을 정의하는 가장 전형적인 방법이 바로 task 데코레이터를 이용하는 방법입니다. 아래 예제를 보시면요,

from locust import Locust, TaskSet,task
class MyTaskSet(TaskSet):
    @task
    def my_task(self):
        print("Locust instance (%r) executing my_task" % (self.locust))

class MyLocust(Locust):
    task_set = MyTaskSet

데코레이션 @task에 weight을 지정할수도 있습니다. 아래의 예제는 task2를 task1보다 두배 더 돌리는 코드입니다.

from locust import Locust, TaskSet, task
from locust.wait_time import between

class MyTaskSet(TaskSet):
    wait_time = between(5, 15)
    @task(3)
    def task(self):
       pass
    @task(6)
    def task2(self):
        pass

class MyLocust(Locust):
    task_set = MyTaskSet

tasks

사용희 편리성을 위해 @task 데코레이터를 이용해서 task를 정의하는게 편리하기도 하고, 가장 많이 사용되는 방법이기도 합니다. 또다른 방법으로는 TaskSet 클래스안에 tasks를 정의하는 방법도 있습니다. 사실 @task 데코레이터를 이용한 방법은 바로 TaskSet클래스안의 tasks를 가져오는거에요.

tasks변수의 값으로는 하나의 호출가능한 형태의 함수들을 모아놓은 list가 될수도 있구요, 또는 {callsble: int}형태의 dict가 들어갈수 도 있어요. task들 하나의 인자를 받는 호출가능한 형태의 함수입니다. TaskSet클래스의 instance가 그 task를 실행합니다. 아래에 간단한 예제를 봐주세요. 사실 이 locustfile은 아무것도 실행하지 않을거에요.

from locust import Locust, TastSet

def my_task(l):
    pass

class MyTaskSet(TaskSet):
    tasks = [my_task]

class MyLocust(Locust):
    task_set = MyTaskSet

만약 tasks가 list로 정의가 되었다면, 그 안의 각 task가 실행이 될것이구요, 어떤 것이 실행될지는 tasks에 의해서 random하게 선택될거에요. 근데 만약에 tasks가 dict로 정의가 되어있다면 (callable함수를 key로 갖고, 정수를 값으로 갖는 형태), 역시나 task는 random하게 선택이 되겠지만, 값으로 설정된 ratio에 의해 실행되는 빈도수가 달라집니다. 예를들어,

{my_task: 3, another_task: 1}

위와 같이 선언이 되었다고 하면, my_task가 another_task보다 3배정도 더 많이 실행되게 됩니다.

TaskSets can be nested

TaskSet에서 매우 중요한 속성중의 하나가 바로 TaskSet이 nested형태로 제공될수 있다는 점인데요, 사실 실제 웹사이트들은 단편적으로 만들어진게 아니라 단계별로 실행되어져야 하는 경우가 대부분입니다. 예를 한번 들어보자면,

  • 메인 사용자 행동
    • 첫페이지
    • 포럼페이지
      • 포럼 읽기
        • 답변하기
      • 새로운 포럼 작성
      • 다음 페이지 보기
    • 카테고리 페이지
      • 동영상 시청
      • 동영상 검색
    • 소개 페이지

위에서 보시다시피, 어떤 특정 task가 실행된 후에 그 다음 task를 명시해야할 경우가 있는데 그럴때 사용할수 있는 방법이 tasks변수의 호출할수 있는 함수넣을 자리에 함수대신 또다른 TaskSet을 넣는거에요.

class ForumPage(TaskSet):
    @task(20)
    def read_thread(self):
        pass

    @task(1)
    def new_thread(self):
        pass

    @task(5)
    def stop(self):
        self.interrupt()

class UserBehaviour(TaskSet):
    tasks = {ForumPage:10}

    @task
    def index(self):
        pass

위의 예제에서 보시면, UserBehaviour의 tasks가 task를 선택할때, 또다른 TaskSet인 ForumPage를 선택하게 되겠죠, 그러면 그때 ForumPage가 실행되고 다시 ForumPage.tasks가 그 안의 함수들을 실행하게 되는 형태가 되는거에요.

여기서 중요한거 한가지, 위의 코드에서 마지막에 보시면 interrupt라는 함수가 사용된 stop이라는 task가 있죠? 이게 뭐냐면요, 이제 ForumPage.tasks가 그 안의 함수들을 랜덤으로 돌아가면서 실행을 하다가, stop을 실행하게 되면, 그때 바로 ForumPage실행을 멈추고 UserBehaviour로 돌아가서 그 다음 task를 실행하라는 거에요. 만약에 ForumPage어디에도 interrupt()함수가 호출되는곳이 없다면, Locust는 ForumPage task들을 멈추지 않고 영원히 돌리게 될거에요. 그래서 대강 ForumPage에서 사용자들을 얼마정도 있다가 돌아가겠거니 싶을때, task 데코레이션에 각 함수들의 호출 빈도를 지정함으로써 어느정도 포럼에서 머물다가 나와서 다른데도 구경하고 하게끔 interrupt()를 불러주는게 중요해요.

그리고, nested형태의 TaskSet은 @task데코러에터를 이용해서 클래스 안에서 inline으로 구현할수도 있어요. 일단 task를 정의하듯이 그렇게 말에요.

class MyTaskSet(TaskSet):
    @task
    class SubTaskSet(TaskSet):
        @task
        def my_task(self):
            pass

Locust 객체나 TaskSet의 부모객체에 접근하기

TaskSet객체 안에는 locust라는 포인트랑 parent라는 포인트가 있는데요, 얘네들을 이용해서 부모객체나 TaskSet을 호출한 Locust객체에 용이하게 접근할수가 있어요.

TaskSequence클래스

TaskSequence클래스는 일종의 TaskSet이에요. 근데 얘는 특이한점이 task들이 빈도수에 따라 random으로 실행되는게 아니라, 순차적으로 실행을 한다는 점이 일반 TaskSet이랑은 다른점이에요. 구현은 아래와 같이 TaskSequence를 상속받아서 클래스를 정의하고 @seq_task로 각 task함수들을 정의합니다.

class MyTaskSequence(TaskSequence):
    @seq_task(1)
    def first_task(self):
        pass
    @seq_task(2)
    def second_task(self):
        pass
    @seq_task(3)
    @task(10)
    def third_task(self):
        pass

이렇게 정의하면, 위에서 부터 @seq_task순서대로 차례로 실행이 되는데, 추가로 세번째 함수에 @task데코레이터의 값이 10이니까 마지막 함수는 10번 실행합니다. 보시다시피, 하나의 함수에 @seq_task@task 막 섞어서 쓰실수 있으시구요. 또 TaskSets의 nested형태를 구현할때도 TaskSequences랑 위아래 막 섞어서 정의하실수 있으세요.

Setups, Teardowns, on_start, 그리고 on_stop

Locust는 추가적으로 Locust 레벨에서는 setup그리고 teardown을 제공하고, TaskSet 레벨에서는 setup, teardown, on_start 그리고 on_stop을 제공합니다.

Setup과 Teardowns

Locust나 TaskSet의 setupteardown은 오직 한번만 실행이 됩니다. setup은 task들이 실행하기 전에 우선적으로 실행이 되고요, teardown은 모든 task가 전부다 실행되고 나가기 전에 마지막에 실행이 됩니다. 이 함수들을 재정의함으로써 작업이 실행하기 전에 준비작업을 할수도 있고, 작업을 최종적으로 종료하기 전에 실행하면서 어질러놨던거를 정리를 하고 나갈수도 있게 됩니다.

on_start와 on_stop

on_starton_stop 메써드는 TaskSet 클래스에서만 정의할수 있는데요. on_start는 어떤 사용자가 어떤 특정 TaskSet클래스를 실행했을때 호출되고, on_stop메써드는 TaskSet이 종료될때 실행이 됩니다.

각 함수가 호출되는 순서

아래는 위에서 설명한 함수들이 호출되는 순서입니다:

  1. Locust setup
  2. TaskSet setup
  3. TaskSet on_start
  4. TaskSet tasks….
  5. TaskSet on_stop
  6. TaskSet teardown
  7. Locust teardown

보통 setup이랑 teardown이 쌍으로 움직입니다.

HTTP요청하기

지금까지 task들을 어떻게 커버할지 Locust사용자 입장에서 계획만 짰자나요. 어떤 서비스의 실제 load test를 하려면 HTTP요청이 빠질수가 없죠. 이걸 손쉽게 하도록 생겨난것이 바로 HttpLocust라는 클래스에요. 이 클래스를 이용하면요, 각 instance가 HttpSession 클래스의 객체를 저장하는 client를 가지게 되고, 이것을 통해서 HTTP요청이 가능해지는거에요.

class HttpLocust

이 클래스의 instance하나가 HTTP 사용자 한명을 대변하고, 여러개의 instance를 통해 시스템을 공격하게 되는데 그게 바로 load test가 되는거죠.

이 사용자가 뭘할지는 해당 클래스안에 task_set라는 변수에 TaskSet을 상속받아 정의한 클래스를 할당함으로써 정의가 됩니다.

이 클래스는 client라는 변수를 만들고, 그 안에 session을 보유한 HTTP client를 갖고 있게 됩니다.

client=None

Locust가 초기화를 하면서 HttpSession의 instance를 만들어서 client에 저장합니다. 이 client는 쿠키저장도 가능하고, HTTP request간에 session도 유지도 합니다.

HttpLocust클래스를 상속해서 나만의 클래스를 정의할때, client를 이용해서 아래와 같이 HTTP요청을 만들수가 있어요.

from locust import HttpLocust, TaskSet, task, between

class MyTaskSet(TaskSet):
    @tast(2)
    def index(self):
        self.client.get("/")
    @task(1)
    def about(self):
        self.client.get("/about/")

class MyLocust(HttpLocust):
    task_set = MyTaskSet
    wait_time = between(5, 15)

위의 코드를 해석하자면, Locust클래스는 HttpLocust클래스를 상속받아 구현했기 때문에 session정보를 저장한 client를 가지게 되고요, task에서 URI를 호출하여 HTTP요청을 할수 있는데, index를 about보다 2배 정도 많이 호출하고, 각 task를 실행한 뒤에는 5초에서 15초정도 쉬어준뒤 다음 task를 실행합니다.

예리한 분들은 눈치채셨겠지만, HttpLocust는 부모 클래스로 정의가 되었는데 의외로 task에서 client에 접근할때 self.locust.client.get()이 아니라 self.client.get()으로 client에 접근하고 있어요. 이거는 사용자들의 편의를 위해서 self.client.get()를 실행하면 self.locust.client.get()가 호출되도록 뒷단에서 그렇게 만들어 놓은거에요.

HTTP client사용하기

HTTPLocust의 각 객체들은 HttpSession객체를 client에 저장하고 있어요. HttpSession클래스는요 사실 requests.Session의 서브클래스에요. 그래서 HTTP요청이 가능한건데요. HTTP요청이 get, post, put, delete, head, patch 그리고 options등의 다양한 형태로 요청이 되는데, 그 통계자료를 Locust에서 취합하게 되요. 하나의 HttpSession객체는 쿠키도 가질수 있고, 세션도 공유하기 때문에 웹사이트에서 로그로 사용될수도 있고, 요청간의 연관관계를 이용해서 뭔가 더 다양한 테스트를 할수 있겠죠. client는 위에서 말씀드린대로 TaskSet에서도 바로 접근하실수 있으십니다.

아래는 client를 통해서 GET으로 /about페이지에서 결과를 가져다가 화면에 보여주는 간단한 예제입니다.

response = self.client.get("/about")
print("Response status code:", response.status_code)
print("Response content:", response.text)

POST로 요청할때는 이렇게:

response = self.client.post("/login", {"username":"testuser", "password":"secret"})

Safe mode

HTTP client를 safe_mode로 돌리는 방법이 있습니다. 이게 뭐하는거냐면요, 만약에 어떤 요청이 connection에러가 났다거나, timeout났거나, 그밖에 어떤 에러상황에 처해서 처리에 실패를 했을때, 에러코드 대신에 그냥 빈 dummy response를 object에 넣어서 반환해주는 기능이에요. 해당 요청은 Locust통계에 실패로 보고가 되겠지만 사용자는 그냥 텅빈화면을 받아보게 되기 때문에 별다른 에러처리를 안해도 되는거죠. 대신에 response안에 status_code는 200이 아니라 0을 받게 될거에요.

요청의 성공, 실패 여부 조작하기

기본적으로 요청들은 처음에 failed상태로 요청을 시작했다가 요청이 끝나면 상태를 결과에 맞게 갱신해주게 되어있는데요. 대부분의 경우 이렇게 하면 만사오케이 거든요. 그런데 가끔은 테스트코드를 너무 막짜는 바람에 결과를 200만 받도록 했으면 좋겠는거에요. 예를 들어 404보다 큰 network status를 가지는 결과에 대해서 그냥 200으로 받고 싶은 경우에, 수동으로 결과를 조작하는 기능이 있습니다.

with self.client.get("/", catch_response=True) as response:
    if response.content != b"Success":
        response.failure("Got wrong response")

catch_response를 True로 요청을 하면 결과를 가져오는데 실패한 경우에 실패한 결과값 대신에 캐싱된 데이타를 보여주게 되는데 이렇게 하면, 아래 예제와 같이 최종 사용자에게 404를 넘겨주더라도 Locust통계에는 성공이라고 집계가 됩니다.

with self.client.get("/does_not_exist/", catch_response=True) as response:
    if response.status_code == 404:
        response.success()

URL과 파라메터별로 요청 묶기

어떤 특정 요청들을 묶어서 보고 싶을때, 요청에 name인자를 추가함으로써 통계를 볼때 해당 name으로 모아서 볼수가 있습니다.

# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):
    self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")

Common libraries

종종 사람들은 여러개의 locustfile들을 common libraries에 넣고 함께 공유하고 싶어 합니다. 그런 경우 우선 project root를 정의하고 모든 locustfile들은 프로젝트 root아래에 있게 하는것이 중요합니다.

아래는 단순한 형태의 구조입니다.

  • project root
    • commonlib_config.py
    • commonlib_auth.py
    • locustfile_web_app.py
    • locustfile_api.py
    • locustfile_ecommerce.py

위의 파일들이 서브구조를 가지면 좀더 깔끔하게 정리가 될것 같죠?

project root

  • __init__.py
  • common/
    • __init__.py
    • config.py
    • auth.py
  • locustfiles/
    • __init__.py
    • web_app.py
    • api.py
    • ecommerce.py

위의 구조로 설계를 한뒤에 아래와 같이 접근을 하실수있습니다

sys.path.append(os.getcwd())
import common.auth

Source: https://docs.locust.io/en/latest/writing-a-locustfile.html