/**
 * Message queue service
 * @example
 * -----------------------------------Create----------------------------------------
 * $array = array('a','b','c','d');
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $this->amqp_service->createMessageQueue($array);
 * -----------------------------------End-------------------------------------------
 *
 * -----------------------------------Get-------------------------------------------
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $message_queue = $this->amqp_service->getMessageQueue();
 * var_dump($message_queue);
 * -----------------------------------End-------------------------------------------
 */
 
class Amqp_service extends Base_service{

    /** @var AMQPConnection Broker connection, opened by the constructor. */
    public $conn;

    /** @var string Exchange name, set through setSaveType(). */
    public $exchange;

    /** @var string Queue name, set through setSaveType(). */
    public $queue;

    /** @var string Routing key, set through setSaveType(). */
    public $router;

    /**
     * Load the 'app_config' configuration and connect to the broker using
     * its 'amqp' entry.
     *
     * @throws Exception When the connection cannot be established.
     */
    public function __construct(){
        parent::__construct();

        // Read the system configuration.
        $this->load->config('app_config', TRUE);
        $app_config = $this->config->item('app_config');

        $this->connect($app_config['amqp']);
    }

    /**
     * Try to connect to the AMQP service.
     *
     * @param array $amqp_args Arguments handed as-is to the AMQPConnection constructor.
     * @throws Exception When the connection reports it is not connected.
     */
    private function connect($amqp_args)
    {
        $this->conn = new AMQPConnection($amqp_args);
        $this->conn->connect();

        if (!$this->conn->isConnected())
        {
            throw new Exception('Cannot connect to the broker.');
        }
    }

    /**
     * Choose where messages are stored: which exchange, queue and routing key
     * the create/get methods operate on.
     *
     * @param String $exchange_name Exchange name
     * @param String $queue_name    Queue name
     * @param String $router_name   Routing key
     */
    public function setSaveType($exchange_name, $queue_name, $router_name)
    {
        $this->exchange = $exchange_name;
        $this->queue    = $queue_name;
        $this->router   = $router_name;
    }

    /**
     * Declare the exchange and queue, bind them, and publish $array as one
     * JSON-encoded message inside a channel transaction.
     *
     * @param Array $array Payload to publish; it is JSON-encoded before sending.
     * @throws Exception When the payload cannot be JSON-encoded, or when
     *                   publishing fails (the transaction is rolled back first).
     */
    public function createMessageQueue($array)
    {
        // Encode up front so an unencodable payload fails before anything is
        // declared on the broker, instead of being published as an empty body.
        $message = json_encode($array);
        if ($message === false)
        {
            throw new Exception('Cannot encode the message as JSON (json error code ' . json_last_error() . ').');
        }

        // Create the exchange.
        $channel = new AMQPChannel($this->conn);
        $ex      = new AMQPExchange($channel);

        // Exchange name, type and flags.
        $ex->setName($this->exchange);
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        // NOTE(review): newer php-amqp releases appear to name this
        // declareExchange()/declareQueue() - confirm against the installed extension.
        $ex->declare();

        // Create the queue.
        $q = new AMQPQueue($channel);

        // Queue name and flags.
        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $q->declare();

        // Bind the queue to the exchange with the routing key.
        $q->bind($this->exchange, $this->router);

        // Publish the message transactionally.
        $channel->startTransaction();
        try
        {
            $ex->publish($message, $this->router);
            $channel->commitTransaction();
        }
        catch (Exception $e)
        {
            // Best-effort rollback: the channel may already be unusable, and
            // the original failure is the one the caller needs to see.
            try
            {
                $channel->rollbackTransaction();
            }
            catch (Exception $rollback_error)
            {
                // Deliberately ignored; $e is rethrown below.
            }

            throw $e;
        }

        //$this->conn->disconnect();
    }

    /**
     * Fetch one message from the configured queue and return its decoded
     * JSON body.
     *
     * @return mixed The decoded message body (arrays for JSON objects/lists);
     *               an empty array when no message is available or the body
     *               does not decode.
     * @throws Exception On any failure; the original exception is attached
     *                   as the previous exception.
     */
    public function getMessageQueue()
    {
        $arr = array();

        try
        {
            // Set the queue name and bind it to the exchange with the routing key.
            $channel = new AMQPChannel($this->conn);
            $q       = new AMQPQueue($channel);

            $q->setName($this->queue);
            $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
            $q->declare();
            $q->bind($this->exchange, $this->router);

            // Fetch a message, acknowledging it automatically.
            $messages = $q->get(AMQP_AUTOACK);

            if ($messages){
                $decoded = json_decode($messages->getBody(), true);

                // json_decode() yields null for an undecodable body; keep the
                // empty-array default so callers always get a consistent "nothing" value.
                if ($decoded !== null){
                    $arr = $decoded;
                }
            }
        }catch (Exception $e){
            // Same exception class and message as before, but chain the
            // original so its type and stack trace are not lost.
            throw new Exception($e->getMessage(), 0, $e);
        }
        //$this->conn->disconnect();

        return $arr;
    }

    /*
    // Disabled: blocking consumer.
    // NOTE(review): this disconnects before calling consume(); that would need
    // fixing before re-enabling.
    public function getAllMessageQueue()
    {
        // Set the queue name and bind it to the exchange with the routing key.
        $channel = new AMQPChannel($this->conn);
        $q       = new AMQPQueue($channel);

        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $q->declare();
        $q->bind($this->exchange, $this->router);
        $this->conn->disconnect();

        // Consume the queue in blocking mode.
        while(True){
            $q->consume('processMessage');
            //$q->consume('processMessage', AMQP_AUTOACK); // automatic ACK
        }
    }
    */

    /**
     * Close the broker connection when the service is destroyed.
     */
    public function __destruct()
    {
        // $conn is public and may have been unset or already closed; only
        // disconnect a live connection.
        if ($this->conn instanceof AMQPConnection && $this->conn->isConnected())
        {
            $this->conn->disconnect();
        }
    }
}
 
 
/**
 * 消费回调函数
 * 处理消息
 * @param Object $envelope
 * @param Object $queue
 */
/*
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg . '<br />';
     
    //手动发送ACK应答
    $queue->ack($envelope->getDeliveryTag());
}
*/