APScheduler란?
APScheduler는 Advanced Python Scheduler의 줄임말로 스케줄을 예약하여 실행할 수 있도록 돕는 파이썬 라이브러리이다. 특정 작업에 주기성을 스케쥴링하는 Airflow등이 있지만, 직관적이며 기존 어플리케이션에서 사용 가능한 큰 장점이있다. 기존 스케줄러 라이브러리와 가장 큰 차이점이자 장점은 while문과 같은 무한루프를 돌리지 않아도 코드를 비동기적으로 실행시킬 수 있다. 여기서 비 동기적이란 코드의 실행 순서에 맞춰 진행하는 것이 아닌 동시 다발적으로 실행하는 것을 말한다. 예를 들면, p2p 사이트에서 다운 받을때 하나씩 받는 것을 동기적, 다수를 한번에 받는것을 비동기적이라 할 수 있다.
본 포스트에서는 본인의 경험을 바탕으로 주요 내용만을 이야기하겠다. 자세한 내용은 apscheduler 공식 user guide에서 확인 가능하다.
✔ 스케줄러 종류
- BlockingScheduler : 하나의 프로세스에서만 동작하며 스케쥴링 된 작업이 실행될때 다른 작업은 일시중단된다.
- BackgroundScheduler : 백그라운드에서 동작하며 스케쥴링 된 작업이 실행될때 다른 작업도 할 수 있다.
✔ 실행방식
- Cron : 주기의 시간(time)에 실행. Date 및 Time 지정이 가능하다
- Interval : 일정 주기로 실행
- Date : 특정 날짜로 실행(사실상 Cron과 동일 함)
APScheduler 라이브러리 설치는 아래와 같이 1번줄 명령어로 수행하며, 설치 실패 시 2번줄과 같이 강제로 설치 가능하다.
1 2 | pip install APScheduler pip install --user APScheduler |
Blocking Scheduler
Blocking Scheduler의 사용 방법은 다음과 같다. 주의할 점은 sched.start()다음은 실행되지 않는다. Scheduler.job 함수의 자세한 내용은 참고문헌에서 확인 가능하다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | import time from apscheduler.schedulers.blocking import BlockingScheduler # 스케줄링할 작업을 함수로 정의 def job1(): return print(f'job1 : {time.strftime("%H:%M:%S")}') def job2(): return print(f'job2 : {time.strftime("%H:%M:%S")}') # 스케쥴러 sched = BlockingScheduler(timezone='Asia/Seoul') # 매 5초 주기로 실행 - interval sched.add_job(job1, 'interval', seconds=5, id='test_1') # 매분에 실행 - cron sched.add_job(job2, 'cron', second='0', id="test_2") print('sched before start') sched.start() print('sched after start') # Blocking 이기 때문에 실행 안됨 |
Background Scheduler
필자는 여러 공공 데이터의 API를 주기적으로 호출하기 위해 Background Scheduler를 유용하게 사용하였다. 자세한 Background Scheduler의 사용 방법은 다음과 같으며 Blocking Scheduler와 비교하기 위해 기존 코드를 수정하였다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | import time # from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.background import BackgroundScheduler # 스케줄링할 작업을 함수로 정의 def job1(): return print(f'job1 : {time.strftime("%H:%M:%S")}') def job2(): return print(f'job2 : {time.strftime("%H:%M:%S")}') # 스케쥴러 # sched = BlockingScheduler() sched = BackgroundScheduler(timezone='Asia/Seoul') # 매 10초 주기로 실행 - interval sched.add_job(job1, 'interval', seconds=10, id='test_1') # 매분에 실행 - cron sched.add_job(job2, 'cron', second='0', id="test_2") print('sched before start') sched.start() print('sched after start') # 살행 됨 while True: time.sleep(1) |
예상치 못한 자원 부족 에러
(1) misfire_grace_time
기존 Scheduler 라이브러리로 실행했을 경우 에러 발생시 종료되는 문제로 골머리를 쓰다 신처럼 강림한 APScheduler를 만난 후 내 삶은 편해졌다.
CPU 사용량이 많거나 메모리 부족으로 간혹 사용 가능한 Thread가 부족하여 에러가 발생하지만 시스템 기본 값에 의해 다시 실행된다. (기존 Scheduler 라이브러리와는 가장 큰 차이점인 것 같다)
Thread 부족 에러는 보통 add_job내 misfire_grace_time* 설정을 안했을 경우 종종 다음과 같은 에러가 발생하며, Job내 파라미터 설정시 사용자 정의 시간(또는 지연시간) 이후 Job이 정상 실행될 수 있도록 한다.
(*misfire_grace_time: 누락된 작업 실행 가능한 시간, 기본 값: undefined object)
Run time of job "job1 (trigger: interval[0:00:10], next run at: 2024-02-12 00:45:33 KST)" was missed by 0:00:01.233551
Thread의 자세한 개념은 참고문헌에서 확인 가능하다.
(2) max_instances
또 열심히 돌리보면 아래와 같은 경고도 발견할 수 있다.
WARNING:apscheduler.scheduler:Execution of job "job1 (trigger: interval[0:00:10], next run at: 2024-02-12 11:50:42 UTC)" skipped: maximum number of running instances reached (1)
인스턴스 오류가 발생하여 해당 Job을 수행할 수 없어서 다음 시간에 다시 실행 한다는 건데, 이러한 오류도 보기 싫으면 간단한 매개변수 추가로 문제를 해결할 수 있다.
1 | scheduler.add_job(job, 'interval', seconds=10, max_instances=2) |

댓글
댓글 쓰기