connection.cc

00001 /* vim: set et sw=3 tw=0 fo=croqlaw cino=t0:
00002  * 
00003  * Astxx, the Asterisk C++ API and Utility Library.
00004  * Copyright (C) 2005-2007  Matthew A. Nicholson
00005  * Copyright (C) 2005-2007  Digium, Inc.
00006  * 
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License version 2.1 as published by the Free Software Foundation.
00010  * 
00011  * This library is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014  * Lesser General Public License for more details.
00015  * 
00016  * You should have received a copy of the GNU Lesser General Public
00017  * License along with this library; if not, write to the Free Software
00018  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00019  */
00020 
00021 #include <astxx/manager/connection.h>
00022 #include <astxx/manager/message.h>
00023 #include <astxx/manager/error.h>
00024 #include <boost/asio.hpp>
00025 #include <boost/lexical_cast.hpp>
00026 #include <boost/bind.hpp>
00027 #include <boost/ref.hpp>
00028 
00029 namespace astxx {
00030    namespace manager {
00031       using boost::asio::ip::tcp;
00032       using boost::lexical_cast;
00033 
00042       class response_waiter {
00043          public:
00047             response_waiter(manager::connection& connection)
00048                : connection(connection), response("") {
00049             }
00050 
00054             void operator()(message::response r) {
00055                response = r;
00056             }
00057 
00063             message::response wait() const {
00064                while (response == "") {
00065                   connection.wait_response();
00066                   connection.process_responses();
00067                }
00068                return response;
00069             }
00070          private:
00071             manager::connection& connection;
00072             message::response response;
00073       };
00074 
00081       connection::connection(const std::string& host, unsigned short port) : socket(io_service) {
00082          connect(host, port);
00083       }
00084 
00099       void connection::connect(const std::string& host, unsigned short port) {
00100          // update the internal host and port if they are not the default 
00101          // values
00102          if (not host.empty()) {
00103             m_host = host;
00104             if (port) {
00105                m_port = lexical_cast<std::string>(port);
00106             }
00107          }
00108 
00109          tcp::resolver resolver(io_service);
00110          tcp::resolver::query query(m_host, lexical_cast<std::string>(m_port));
00111 
00112          tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
00113          tcp::resolver::iterator end;
00114 
00115          // here we loop through all possible endpoints trying to connect to 
00116          // each as long as we failed to connect to the previous one
00117          boost::system::error_code error = boost::asio::error::host_not_found;
00118          while (error && endpoint_iterator != end) {
00119             socket.close();
00120             socket.connect(*endpoint_iterator++, error);
00121          }
00122 
00123          if (error)
00124             throw boost::system::system_error(error);
00125 
00126          std::string greeting_line = read_line();
00127          
00128          // split the version and the name
00129          std::string::size_type i = greeting_line.find_last_of('/');
00130          if (i != std::string::npos) {
00131             m_name.assign(greeting_line, 0, i);
00132             if (++i != std::string::npos) {
00133                m_version.assign(greeting_line, i, std::string::npos);
00134             }
00135          }
00136       }
00137 
00142       void connection::disconnect() {
00143          socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
00144          socket.close();
00145       }
00146 
00150       bool connection::is_connected() const {
00151          return socket.is_open();
00152       }
00153 
00180       message::response connection::send_action(const basic_action& command) {
00181          response_waiter rw(*this);
00182          send_action_async(command, boost::ref(rw));
00183          return rw.wait();
00184       }
00185 
00212       void connection::send_action_async(const basic_action& command, response_handler_t handler) {
00213 
00214          message::action action = command.action();
00215          if (not command.action_id().empty()) {
00216             action["ActionID"] = command.action_id();
00217          }
00218 
00219          boost::asio::write(socket, boost::asio::buffer(action.format()));
00220          response_handlers.push(handler);
00221       }
00222       
00228       message::response connection::operator()(const basic_action& command) {
00229          return send_action(command);
00230       }
00231       
00238       void connection::operator()(const basic_action& command, response_handler_t handler) {
00239          send_action_async(command, handler);
00240       }
00241       
00249       std::pair<std::string, std::string> connection::parse_header(const std::string& header) {
00250          std::string key;
00251          std::string value;
00252 
00253          if (header.empty()) {
00254             throw manager::empty_header();
00255          }
00256 
00257          std::string::size_type i = header.find_first_of(':');
00258          if (i == std::string::npos) {
00259             throw manager::parse_error("missing ':' in header: " + header);
00260          }
00261 
00262          key.assign(header, 0, i++);
00263          
00264          if (i != std::string::npos) {
00265             if (header[i] == ' ') {
00266                ++i;
00267             }
00268             if (i != std::string::npos) {
00269                value.assign(header, i, std::string::npos);
00270             }
00271          }
00272          return std::make_pair(key, value);
00273       }
00274 
00279       std::string connection::read_line() {
00280          std::string line;
00281 
00282          // loop reading one character at a time until we find a '\n'
00283          for (char c = 0, cc = 0; !(c == '\n' && cc == '\r'); line += c) {
00284             cc = c;
00285             boost::asio::read(socket, boost::asio::buffer(&c, 1), boost::asio::transfer_at_least(1));
00286          }
00287 
00288          // strip the \r\n
00289          std::string::size_type rn = line.find("\r\n");
00290          if (rn != std::string::npos) {
00291             line.erase(rn);
00292          }
00293 
00294          return line;
00295       }
00296 
00304       void connection::read_message() {
00305          std::pair<std::string, std::string> pair = parse_header(read_line());
00306          if (pair.first == "Event") {
00307             message::event event(pair.second);
00308             for (;;) {
00309                std::string line = read_line();
00310 
00311                // stop if we get a blank line
00312                if (line.empty()) {
00313                   break;
00314                }
00315                event.insert(parse_header(line));
00316             }
00317             // put the event in the queue
00318             events.push(event);
00319          }
00320          else if (pair.first == "Response") {
00321             message::response response(pair.second);
00322             for (;;) {
00323                std::string line = read_line();
00324 
00325                // stop if we get a blank line
00326                if (line.empty()) {
00327                   break;
00328                }
00329 
00330                // check for '--END COMMAND--' if necessary
00331                if (pair.second == "Follows") {
00332                   std::string::size_type ec = line.find("--END COMMAND--");
00333 
00334                   // strip the '--END COMMAND--', if found, otherwise this is a 
00335                   // normal header
00336                   if (ec != std::string::npos) {
00337                      line.erase(ec);
00338                      response.data = line;
00339                   }
00340                   else {
00341                      response.insert(parse_header(line));
00342                   }
00343 
00344                } else {
00345                   response.insert(parse_header(line));
00346                }
00347             }
00348             // put the resposne in the queue
00349             responses.push(response);
00350          }
00351          else {
00352             throw manager::unknown_message(pair.first);
00353          }
00354       }
00355       
00362       message::response connection::read_response() {
00363          wait_response();
00364          message::response response = responses.front();
00365          responses.pop();
00366          return response;
00367       }
00368       
00375       message::event connection::read_event() {
00376          wait_event();
00377          message::event event = events.front();
00378          events.pop();
00379          return event;
00380       }
00381 
00386       void connection::process_events() {
00387          /* Here we pop events off of the queue one by one because it is 
00388           * possible for our handlers to add events to the queue (by executing 
00389           * an action which adds events while waiting for a response).  By 
00390           * popping events off one by one, we avoid worring about our iterators 
00391           * becomming invalid.
00392           */
00393          while (not events.empty()) {
00394             message::event e = events.front();
00395             events.pop();
00396 
00397             event_handlers_t::iterator i = event_handlers.find(e.main_header());
00398             if (i != event_handlers.end()) {
00399                (*i->second)(e);
00400             }
00401 
00402             // execute the catch all
00403             i = event_handlers.find("");
00404             if (i != event_handlers.end()) {
00405                (*i->second)(e);
00406             }
00407          }
00408       }
00409 
00415       void connection::wait_event() {
00416          while (events.empty()) {
00417             read_message();
00418          }
00419       }
00420 
00427       void connection::process_responses() {
00428          /* Here we pop responses off of the queue one by one because it is 
00429           * possible for our handlers to add responses to the queue (by 
00430           * executing an action synchronously).  By popping events off one by 
00431           * one, we avoid worring about our iterators becomming invalid.
00432           */
00433          while (not response_handlers.empty() and not responses.empty()) {
00434             response_handler_t f = response_handlers.front();
00435             response_handlers.pop();
00436 
00437             f(read_response());
00438          }
00439       }
00440 
00446       void connection::wait_response() {
00447          while (responses.empty()) {
00448             read_message();
00449          }
00450       }
00451       
00457       void connection::pump_messages() {
00458          while (socket.available()) {
00459             read_message();
00460          }
00461       }
00462 
00487       boost::signals::connection connection::register_event(const std::string& e, boost::function<void (message::event)> f) {
00488          std::pair<event_handlers_t::iterator, bool> ii = event_handlers.insert(std::make_pair(e, boost::shared_ptr<boost::signal<void (message::event)> >(new boost::signal<void (message::event)>())));
00489          return ii.first->second->connect(f);
00490       }
00491 
00492    }
00493 }
00494 

Generated on Thu Jul 3 01:32:42 2008 for Astxx by  doxygen 1.5.6