강좌게시판

제목 [상급] CodeIgniter + MQ, Parallel Processing 강좌
글쓴이 tpae 작성시각 2012/12/28 13:59:17
댓글 : 13 추천 : 0 스크랩 : 1 조회수 : 34597   RSS
 안녕하세요.

오랜만입니다. ^^

한국어가 부족하여 맞춤법이 틀려도 이해 해주세요. ^^;

이번 강좌는 MQ (Messaging Queue)에 관련하여 글을 올립니다. 

프로그래머로써 이런 문제들을 격으셨을겁니다.
    1. 대량 이메일, 이미지 리사이징, 또는 큰 작업이 들어가는 시점에 유져가 기다리는 시간이 오래 걸린다.
    2. API랑 연결하는 시간이 오래걸려서 유져가 기다리는 시간이 오래 걸린다.
    3. 프로세싱이 오래 걸리는 시간, 즉 유져가 프로세싱이 끝날때 까지 기달리는 시간이 오래 걸리면, 문제가 있을수도 있다.

오래걸리는 요소들을 백그라운드에서 처리하게 되면, 유져가 기다리는 시간이 단축뒬수 있습니다. 

간출여서 말하자면, Parallel Processing, 즉 동시에 같이 작업이 진행될수 있도록 하는 방식입니다.

방법은 여러가지가 있는데요, 가장 효과적이고 안전하게 생각하는것은 MQ 라고 생각합니다.

MQ는, 오래걸릴것 같은 작업들을 Queue에 추가하여, Queue에서 백그라운드 일꾼들에게 작업을 순서대로 진행 하게 하는 방식입니다. 시스템에 부담을 덜기위해 최대 일꾼수를 지정할수 있으며, 가능한대로 스케일링, 최적화가 가능한 시스템 입니다.

MQ란, 여러종류들이 있지만, 오늘 강좌에서는 MongoDB로 직접 만들어서 사용하는게 좋을듯 합니다. 이유는 MongoDB에서 쓰기, 읽기가 충분히 빠르기 때문에 다른 솔수션들을 추가로 배우지 않아도 돼니까요.

인기있는 MQ들:
 - RabbitMQ  http://www.rabbitmq.com/
 - Amazon Simple Queue Service http://aws.amazon.com/ko/sqs/

필요 요소:
 - CodeIgniter 2.x (CLI 지원되는 버젼)
 - 리눅스 시스템
 - supervisord - http://supervisord.org/
 - MongoDB
 - MongoDB 라이브러리 - https://github.com/alexbilbie/codeigniter-mongodb-library/tree/v2
 - PHP MongoDB Extension, mongo.so

-------------------------

Part I : Models

일단 Queue model을 만듭시다. 이름은 queue_model.php 입니다.
<?php

class Queue_model extends CI_Model {

    // 콜랙션 이름
    private $queue_collection = 'queues';

    function __construct() {
        parent::__construct();
        // 라이브러리 불러오기
        $this->load->library('mongolib');
    }

    /**
     * 인덱스 만들기, 만들려면 한번 실행을 해주세요.
     */
    public function initialize_index() {
        $this->mongolib->add_index($this->queue_collection, array('locked' => 'ASC'));
    }

    /**
     * 작업 추가하기
     */
    public function add_queue($data) {
        $insert_data = array(
            'data' => $data,
            'locked' => null,
            'locked_at' => null,
            'date' => new MongoDate()
        );
        return $this->mongolib->insert($this->queue_collection, $insert_data);
    }

    /**
     * Queue 사이즈 불러오기, 작업필요한지 확인
     */
    public function count() {
        return $this->mongolib->where(array('locked' => null))->count($this->queue_collection);
    }

    /**
     * 작업 실행, 가장 최근작업 부터 불러와서 여기서 작업을 실행 한다.
     */
    public function pop_queue() {

        // find and modify
        // 같은 작업이 반복되지 않을려면, 잠기지 않은 아이템들부터 불러온다.
        $query = array('locked' => null);
        
        // 찻았으면 잠근다.
        $update = array('$set' => array('locked' => true, 'locked_at' => new MongoDate()));
        
        // 최근 날짜부터
        $sort = array('date' => 1);

        // 실행
        $data = $this->mongolib->find_and_modify($this->queue_collection, $query, null, $update);

        // 성공 했을경우, 아이템을 삭제하고 작업을 실행한다.
        if ($data['ok']) {
            $queue_item = $data['value'];
            $this->mongolib->where('_id', $queue_item['_id'])->delete($this->queue_collection);
            
            // 작업 실행.
            var_dump($queue_item['data']);
        }
    }
}
간단하게 설명하자면, 작업을 추가하고, 작업을 빼는 함수들입니다. 

*주의* 인덱스를 안만드시면 속도에 문제가 있습니다. 경험상 최대 100배정도 느립니다. 인덱스를 꼭 만드세요.

Part II: Controllers

다음은 백그라운드에서 돌릴 일꾼을 만드는 일 입니다. Controller에서 Model을 접근하여 일거리를 찻은 다음, 찻았으면 일을 하게 하는 방식입니다.

Controller 이름은 queue, queue.php 입니다.
<?php if (!defined('BASEPATH')) exit('No direct script access allowed');

class queue extends CI_Controller {

    protected $phone;

    function __construct() {
        parent::__construct();
        $this->load->model('queue_model');
    }
    
    /**
     * 대몬으로 실행시킬 일꾼.
     */
    public function daemon() {
        while (1) {
            $cur_count = $this->queue_model->count();
            if ($cur_count > 0) {
                // 작업이 있으면 일을 시작해라.
                $this->queue_model->pop_queue();
            } else {
                // 없으면 잠시 기달림.
                sleep(rand(3, 7));
            }
        }
    }
}

Part III: Supervisord

Supervisord를 설치 합니다.

Supervisord에서 mysite.conf 만듭니다. 일단, 일꾼은 3명으로 지정하겠습니다. (numprocs=3).
[program:mysite]
command = php /var/www/index.php queue daemon
process_name=%(program_name)s_mysite%(process_num)02d
numprocs=3
autorestart=true
autostart=true
만들고 나서 Supervisord 실행!

Part IV: Finish

일 추가 할려면, queue_model 불러오셔서 $this->queue_model->add_queue($data) 하시면 됩니다.

일 시작 할려면, pop_queue 함수 안에서 불로온 $data 변수를 가지고 일을 실행하시면 됩니다. 만약 대량 이메일 경우, $data에서 불러오는 이메일 리스트를, 여기서 돌리시면 됩니다. 이미지 프로세싱도 같은 형식입니다.

numprocs=3 지정이 되어 있다면, 한꺼번에 3명에 일꾼들이 동시에 작업하게 됩니다. 동시에 작업을 하게 되면 완성된 parallel processing을 느끼실수 있을겁니다.



수고하세요. ^^
태그 parallel processing,codeigniter,MQ
 다음글 codeigniter에 tank_auth 적용하기 (5)
 이전글 성능테스트툴 jMeter사용법 (11)

댓글

니삼 / 2012/12/28 14:25:36 / 추천 0
 좋은자료 감사합니다
변종원(웅파) / 2012/12/28 16:35:29 / 추천 0
좋은 정보 감사합니다. ^^
milosz / 2012/12/30 05:55:00 / 추천 0
정말 유용한 정보네요 ^^ 좋은 글 감사드립니다!
한대승(불의회상) / 2012/12/31 23:23:55 / 추천 0
좋은 정보 감사 합니다.

이 팁을 곧 사용하게 될거 같습니다. ^^
최용운 / 2013/01/03 15:10:09 / 추천 0
고급강좌는 올만이네요... 많은 도움이 되었습니다.
들국화 / 2013/01/08 14:36:44 / 추천 0
메일보낼때나 말씀하신데로 오래 걸리는 작업에는 좋을듯 하네요.
만약 프로세스 여러개 해서 두개이상의 프로세스가 잡을 점유해서 이중으로 메일이 간다거나 하는경우는 없었나요?
mongoDB는 안써 봤는데 read lock이 걸리지 않으면 다중으로 보낼수도 있을것 같아서.... ^^

혹시 sleep(rand(3, 7)); 이렇케 처리하신 이유가?
한대승(불의회상) / 2013/01/08 16:12:20 / 추천 0
들국화// sleep(rand(3,7)) 을 한 이유는 해야 할 작업이 없는데 루프를 돌면서 서버 리소스 잡아먹는 시간을 최소화 하기 위해 사용한것으로 생각됩니다.
들국화 / 2013/01/09 10:57:14 / 추천 0
불의회상// sleep 상태도 디비를 억세스 하지 않을테지만 돌고 있는 상태일테고 저는 왜 rand함수를 썼는지 궁금 하더라구요. 구지 sleep 값을 다르게 할 필요가 있을지...  설정값 찾아보지 대기하는 옵션도 있는것 같던데..
메일링에 도입해 볼까 생각중인데... 혹시나 특별한 팁이 있나 해서요. ^^
한대승(불의회상) / 2013/01/09 13:07:31 / 추천 0
들국화// 3~7초 지난후에 디비를 다시 연결하는 비용이 더 들거 같다는 생각이 드는군요.. ^^
tpae / 2013/01/11 10:45:35 / 추천 0
rand 함수를 쓴 이유는 2가지가 있습니다. 

만약에 일꾼이 3명일 경우, supervisord 시작하게 되면 동시에 시작한다는 점이죠. 동시에 시작하고 같은 시간에 sleep을 하게 되는것을 방시하기 위해서 입니다. 일꾼 3이서 싱크률을 없애기 위한 변수 입니다. 

while(1) 루프를 돌리고 있고, MongoDB에서 변경된값 이벤트를 추가 할수 없기 때문에, 항상 제접속을 해다 한다는점이죠. 제접속을 하는 상황에서 최대한 리소스 낭비를 줄일려면 싱크률을 없애는게 중요하겠죠.
우왕아항 / 2013/01/11 11:33:08 / 추천 0
tpae//

우와~ 제가 작년에 고민하다 접은 문제의 대안인듯 한 글을 발견하니 기쁘네요.
일단 supervisord 에 대해 잘 몰라서.. 여러가지 질문좀 드려도 될까요. ^^;


1. supervisord가 프로세스(or데몬의 쓰레드화)를 관리해주는 데몬같은 건가요?

2. 위의 코드는 제가 파악하기로는 멀티쓰레드(or 멀티프로세싱)를 supervisord라는 데몬에 의존하여 구현한 서버 프로그램 정도로 보이는데요. 혹시 틀렸을 경우 지적 부탁드립니다.


3. rand를 준 이유가 저도 궁금합니다.
supervisord 데몬을 시작하면 각각의 쓰레드(or 프로세스)가 동시에 시작하고 같은 시간에 sleep을 하게 되는것을 방지하기 위함이라고 설명해주셨는데요,

아마 동시간에 실행하면 동일한 데이터를 가져오고, 그 때문에 매번 다시 셀렉트해서 락이 걸렸는지를 판단하는 작업이 비효율적이라 보았기 때문에 하신 것으로 파악됩니다.
재접속이라는게 db컨넥트가 아닌, 제가 이해한 형태가 맞나요?


4. 만약 위의 이유가 맞다면,
이미 큐가 쌓여있는 상태에서 supervisord를 실행하면, 최초의 한번은 결국 같은 문제 (동시에 시작)가 발생할 것으로 보이는데요. 이부분도 맞는것인지..


답변 부탁드립니다. :)
UYEONG / 2013/01/31 00:09:22 / 추천 0
우와 ! 감사합니다.
kirrie / 2013/03/04 15:47:04 / 추천 0
gearman이라는 오픈소스 프로젝트도 있습니다. 좋은 팁 잘 봤습니다.