3 ๋ถ„ ์†Œ์š”

Celery ์‹คํ—˜ ์‹œ์ž‘

Celery ์— ๋Œ€ํ•ด์„œ ์ดํ•ด๋Š” ๋˜์ง€๋งŒ, ์ •์ž‘ ์ ์šฉํ•˜๋ ค๊ณ  ๋ณด๋‹ˆ ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•˜๊ณ  ์–ด๋–ป๊ฒŒ ๊ณ ๋„ํ™” ํ•ด์•ผํ•˜๋Š” ๊ฑด์ง€ ๋ˆˆ ์•ž์ด ์บ„์บ„ํ–ˆ๋‹ค.

๋ฌด์—‡์ด๋“ ์ง€ ์‹œ์ž‘์ด ๋ฐ˜์ด๋‹ˆ๊นŒ, 1+1 ์ˆ˜์ค€์˜ ๊ฐ„๋‹จํ•œ ์‹คํ—˜์œผ๋กœ ์‹œ์ž‘ํ•ด๋ณด๋ ค๊ณ  ํ•œ๋‹ค.

๐Ÿ‘‰ย 1์ดˆ๋งˆ๋‹ค 1์”ฉ ๊ฐ’์„ ์ฆ๊ฐ€์‹œํ‚ค๊ณ , ๊ทธ ๊ฐ’์„ ํ™•์ธํ•˜๋Š” ์•ฑ์ด๋‹ค!

Django project & app

mkdir celery_test

cd celery_test
  • celery_test ๋ผ๋Š” ํด๋”๋ฅผ ๋งŒ๋“ค๊ณ , ๊ทธ ํด๋”๋กœ ์ ‘์†ํ•œ๋‹ค.
    • ๊ฐœ์ธ์ ์œผ๋กœ ํด๋” ์•„๋ž˜์— ํ”„๋กœ์ ํŠธ๋ฅผ config ๋ผ๋Š” ์ด๋ฆ„์œผ๋กœ ์ƒ์„ฑํ•˜๋Š” ๊ฒƒ์„ ์„ ํ˜ธํ•จ.
django-admin startproject config .
  • django project๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
python manage.py startapp api
  • api ๋ผ๋Š” ์ด๋ฆ„์˜ ์•ฑ์„ ์ƒ์„ฑํ•œ๋‹ค.

celery & redis install

python -m venv venv

source venv/bin/activate
  • ๊ฐ€์ƒํ™˜๊ฒฝ์„ ๋งŒ๋“ค๊ณ , ์‹คํ–‰ํ•œ๋‹ค.
pip install celery
  • django 3.1 ๋ถ€ํ„ฐ๋Š” ๊ธฐ๋ณธ์œผ๋กœ ์ง€์›๋œ๋‹ค๊ณ  ํ•œ๋‹ค.
pip install redis
  • redis๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์„ค์น˜ํ•ด์ค€๋‹ค.
    • ์™œ redis? โ†’ ์ง„์งœ ๋„ˆ๋ฌด ๊ฐ„๋‹จํ•œ ์‹คํ—˜์ด๋ผ์„œ ๋น„๊ต์  ์„ค์ •์ด ์‰ฌ์šด redis ์„ ํƒ

config/settings.py

INSTALLED_APPS = [

		...

		"api",
]

# Redis and Celery settings
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'

CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
  • ์ƒ์„ฑํ•œ ์•ฑ์„ ์ถ”๊ฐ€ํ•ด์ฃผ๊ณ 
  • redis ์„ธํŒ…์„ ์œ„ํ•ด ์ถ”๊ฐ€ํ•ด์ค€๋‹ค.

config/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

app.conf.beat_schedule = {
    'increment-every-second': {
        'task': 'api.tasks.increment_number',
        'schedule': 1.0,  # 1 second
    },
}
  • os.environ.setdefault('DJANGO_SETTINGS_MODULE', '{PROJECT_NAME}.settings'): ํ™˜๊ฒฝ๋ณ€์ˆ˜ ์„ธํŒ…์ด๋‹ค. โ†’ {PROJECT_NAME} ์—๋Š” ํ”„๋กœ์ ํŠธ์˜ ์ด๋ฆ„์ด ๋“ค์–ด๊ฐ€๋ฉด ๋œ๋‹ค. (๋‚œ config๊ฐ€ ๋“ค์–ด๊ฐ)
  • app = Celery('{PROJECT_NAME}'): config ๋ผ๋Š” ์ด๋ฆ„์œผ๋กœ Celery ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
  • ๊ทธ ์•„๋ž˜๋Š” ์…€๋Ÿฌ๋ฆฌ ๊ธฐ๋ณธ ์„ธํŒ…์ด๋‹ค.
  • app.conf.beat_schedule: ์…€๋Ÿฌ๋ฆฌ ๋น„ํŠธ ์„ค์ •์ด๋‹ค. ์‹ค์ œ ์Šค์ผ€์ค„๋ง์„ ๋Œ๋ฆด ์‹œ๊ฐ„์˜ ๊ธฐ์ค€์„ ์„ค์ •ํ•œ๋‹ค. ์—ฌ๊ธฐ์„œ๋Š” 1์ดˆ.

api/tasks.py

import redis
from celery import shared_task
from django.conf import settings

# Redis์— ์—ฐ๊ฒฐ
r = redis.Redis.from_url(settings.REDIS_URL)

@shared_task
def increment_number():
    # ํ˜„์žฌ ๊ฐ’์„ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค. ๊ฐ’์ด ์—†์œผ๋ฉด 0์œผ๋กœ ์ดˆ๊ธฐํ™”ํ•ฉ๋‹ˆ๋‹ค.
    current_value = r.get('current_number')
    if current_value is None:
        current_value = 0
    else:
        current_value = int(current_value)

    # ๊ฐ’์„ 1์”ฉ ์ฆ๊ฐ€์‹œํ‚ต๋‹ˆ๋‹ค.
    current_value += 1

    # Redis์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
    r.set('current_number', current_value)

    return current_value
  • r = redis.Redis.from_url(settings.REDIS_URL): redis์— ์—ฐ๊ฒฐํ•˜๋Š” ์„ค์ •์ด๋‹ค.
  • @shared_task: Celery์˜ ์ž‘์—…์ž„์„ ๋‚˜ํƒ€๋‚ธ๋‹ค. ์ด ์ž‘์—…์€ Worker๋กœ ์‹คํ–‰๋  ์ˆ˜ ์žˆ๋‹ค.
    • ์•„๋ž˜ ํ•จ์ˆ˜๋Š” task์˜ ์ด๋ฆ„์ด๊ฒ ์ง€
  • redis์— current_number๋ผ๋Š” ์ปฌ๋Ÿผ ๊ฐ’์„ ๊ฐ€์ ธ์˜จ๋‹ค.
  • ๋งŒ์•ฝ ์—†์œผ๋ฉด 0์œผ๋กœ ์ดˆ๊ธฐํ™”ํ•œ๋‹ค.
  • ๋ณธ๊ฒฉ์ ์ธ task ์‹œ์ž‘์œผ๋กœ, current_value๋ฅผ 1์”ฉ ์ฆ๊ฐ€ํ•œ๋‹ค.
  • ์ฆ๊ฐ€ํ•œ ๊ฐ’์„ r.set("current_number", current_value) ๋ฅผ ํ†ตํ•ด redis์— ์ €์žฅํ•œ๋‹ค.
  • ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํ„ดํ•œ๋‹ค.

api/views.py

from django.http import JsonResponse
import redis
from django.conf import settings

r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)

def show_hello(request):
    current_value = r.get('counter')
    if current_value is None:
        current_value = 0
    else:
        current_value = int(current_value)
    return JsonResponse({'counter': current_value})
  • r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB): redis ์—ฐ๊ฒฐ์„ ๋ถˆ๋Ÿฌ์˜จ๋‹ค.
    • ์•„๊นŒ๋ณด๋‹ค ๊ธธ์–ด์กŒ๋Š”๋ฐ, ๊ฒฐ๊ตญ ๋˜‘๊ฐ™์€๊ฑฐ์ž„. settings.py๋ฅผ ํ™•์ธํ•ด๋ณด์ž.
  • current_value = r.get('counter'): redis์—์„œ counter๋ผ๋Š” ํ‚ค์˜ ๊ฐ’์„ ๊ฐ€์ ธ์˜จ๋‹ค.
    • ์—ฌ๊ธฐ์„œ ์•Œ ์ˆ˜ ์žˆ๋Š”๊ฒŒ, tasks.py์—์„œ redis์— ๊ฐ’์„ ์ €์žฅํ•  ๋•Œ, {counter : current_number} ๋กœ ์ €์žฅํ–ˆ๊ฒ ๊ตฌ๋‚˜ ์‹ถ์€๊ฑฐ์ž„.
  • current_value ๊ฐ’์„ ๊ฐ€์ ธ์˜ค๊ณ , ์—†์œผ๋ฉด 0์œผ๋กœ ์ดˆ๊ธฐํ™”, ์žˆ์œผ๋ฉด int๋กœ ํ˜•๋ณ€ํ™˜ ํ›„ JSON ์‘๋‹ต์„ ๋‚ด๋ณด๋‚ธ๋‹ค.

์‹คํ–‰ํ•˜๊ธฐ

์ผ๋‹จ, ๊ต‰์žฅํžˆ ๋งŽ์€ ํ„ฐ๋ฏธ๋„์„ ์‚ฌ์šฉํ–ˆ๋‹ค..

๋ชจ๋“  ๋ช…๋ น์€ ๋ฃจํŠธ ๋””๋ ‰ํ† ๋ฆฌ(manage.py๊ฐ€ ์กด์žฌํ•˜๋Š” ์œ„์น˜) ์—์„œ ์ง„ํ–‰ํ•œ๋‹ค.

1. redis ์‹คํ–‰ํ•˜๊ธฐ

redis-server
  • ๋งŒ์•ฝ ํฌํŠธ๊ฐ€ ์‚ฌ์šฉ์ค‘์ด๋ฉด?
    • lsof -i :6379 โ†’ sudo kill -9 [PID]

2. redis cli ์‹คํ–‰ํ•˜๊ธฐ

redis-cli
  • ์ €์žฅ๋œ ๊ฐ’ ๋ณด๊ณ ์‹ถ์œผ๋ฉด?
    • GET current_number

3. Celery worker ์‹คํ–‰ํ•˜๊ธฐ

celery -A config worker --loglevel=info

4. Celery Beat ์‹คํ–‰ํ•˜๊ธฐ

celery -A config beat --loglevel=info

5. runserver

python manage.py runserver

6. api ์ ‘์†ํ•ด์„œ ํ™•์ธ

๋‚˜๋Š” url์„ http://127.0.0.1:8000/api/show_hello/ ๋กœ ์„ค์ •ํ–ˆ๋‹ค.

๊ถ๊ธˆํ•œ ๋‚ด์šฉ

api/[tasks.py](http://tasks.py)์—์„œ current_value๋ฅผ return ํ•˜๋Š”๋ฐ, api/views.py ์—์„œ redis๋ฅผ ์—ฐ๊ฒฐํ•˜๊ณ , ๊ฑฐ๊ธฐ์„œ ๊ฐ’์„ get ํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ, tasks.py์˜ ๊ฐ’์„ ๋ถˆ๋Ÿฌ์˜ค๋ฉด ๋ ๊นŒ?

์‚ฌ์‹ค ๋‹น์—ฐํžˆ ๋ผ์•ผํ•˜๋Š”๊ฑฐ๋ผ๊ณ  ์ƒ๊ฐํ•˜๊ธด ํ–ˆ์ง€๋งŒ ์ผ๋‹จ ํ…Œ์ŠคํŠธ ์ง„ํ–‰ํ–ˆ์Œ.

api/views.py

from django.http import JsonResponse
import redis
from django.conf import settings

# r = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB)

# def show_hello(request):
#     current_value = r.get('counter')
#     if current_value is None:
#         current_value = 0
#     else:
#         current_value = int(current_value)
#     return JsonResponse({'counter': current_value})

from .tasks import increment_number

def show_hello(request):
    current_value = increment_number()
    if current_value is None:
        current_value = 0
    else:
        current_value = int(current_value)
    return JsonResponse({'counter': current_value})
  • ์ด์ „ ๋‚ด์šฉ์€ ๊ทธ๋ƒฅ ์ผ๋‹จ ์ฃผ์„์ฒ˜๋ฆฌํ•˜๊ณ , increment_number ๋ฅผ importํ•ด์„œ ์‹คํ–‰ํ–ˆ๋‹ค.
  • ์ •์ƒ์ ์œผ๋กœ ์ˆซ์ž๊ฐ€ ์˜ฌ๋ผ๊ฐ„๋‹ค.

์ฐธ๊ณ ์ž๋ฃŒ

First steps with Django โ€” Celery 5.4.0 documentation

ํƒœ๊ทธ: ,

์นดํ…Œ๊ณ ๋ฆฌ:

์—…๋ฐ์ดํŠธ:

๋Œ“๊ธ€๋‚จ๊ธฐ๊ธฐ