connection.cc
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <astxx/manager/connection.h>
00022 #include <astxx/manager/message.h>
00023 #include <astxx/manager/error.h>
00024 #include <boost/asio.hpp>
00025 #include <boost/lexical_cast.hpp>
00026 #include <boost/bind.hpp>
00027 #include <boost/ref.hpp>
00028
00029 namespace astxx {
00030 namespace manager {
00031 using boost::asio::ip::tcp;
00032 using boost::lexical_cast;
00033
00042 class response_waiter {
00043 public:
00047 response_waiter(manager::connection& connection)
00048 : connection(connection), response("") {
00049 }
00050
00054 void operator()(message::response r) {
00055 response = r;
00056 }
00057
00063 message::response wait() const {
00064 while (response == "") {
00065 connection.wait_response();
00066 connection.process_responses();
00067 }
00068 return response;
00069 }
00070 private:
00071 manager::connection& connection;
00072 message::response response;
00073 };
00074
00081 connection::connection(const std::string& host, unsigned short port) : socket(io_service) {
00082 connect(host, port);
00083 }
00084
00099 void connection::connect(const std::string& host, unsigned short port) {
00100
00101
00102 if (not host.empty()) {
00103 m_host = host;
00104 if (port) {
00105 m_port = lexical_cast<std::string>(port);
00106 }
00107 }
00108
00109 tcp::resolver resolver(io_service);
00110 tcp::resolver::query query(m_host, lexical_cast<std::string>(m_port));
00111
00112 tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
00113 tcp::resolver::iterator end;
00114
00115
00116
00117 boost::system::error_code error = boost::asio::error::host_not_found;
00118 while (error && endpoint_iterator != end) {
00119 socket.close();
00120 socket.connect(*endpoint_iterator++, error);
00121 }
00122
00123 if (error)
00124 throw boost::system::system_error(error);
00125
00126 std::string greeting_line = read_line();
00127
00128
00129 std::string::size_type i = greeting_line.find_last_of('/');
00130 if (i != std::string::npos) {
00131 m_name.assign(greeting_line, 0, i);
00132 if (++i != std::string::npos) {
00133 m_version.assign(greeting_line, i, std::string::npos);
00134 }
00135 }
00136 }
00137
00142 void connection::disconnect() {
00143 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
00144 socket.close();
00145 }
00146
00150 bool connection::is_connected() const {
00151 return socket.is_open();
00152 }
00153
00180 message::response connection::send_action(const basic_action& command) {
00181 response_waiter rw(*this);
00182 send_action_async(command, boost::ref(rw));
00183 return rw.wait();
00184 }
00185
00212 void connection::send_action_async(const basic_action& command, response_handler_t handler) {
00213
00214 message::action action = command.action();
00215 if (not command.action_id().empty()) {
00216 action["ActionID"] = command.action_id();
00217 }
00218
00219 boost::asio::write(socket, boost::asio::buffer(action.format()));
00220 response_handlers.push(handler);
00221 }
00222
00228 message::response connection::operator()(const basic_action& command) {
00229 return send_action(command);
00230 }
00231
00238 void connection::operator()(const basic_action& command, response_handler_t handler) {
00239 send_action_async(command, handler);
00240 }
00241
00249 std::pair<std::string, std::string> connection::parse_header(const std::string& header) {
00250 std::string key;
00251 std::string value;
00252
00253 if (header.empty()) {
00254 throw manager::empty_header();
00255 }
00256
00257 std::string::size_type i = header.find_first_of(':');
00258 if (i == std::string::npos) {
00259 throw manager::parse_error("missing ':' in header: " + header);
00260 }
00261
00262 key.assign(header, 0, i++);
00263
00264 if (i != std::string::npos) {
00265 if (header[i] == ' ') {
00266 ++i;
00267 }
00268 if (i != std::string::npos) {
00269 value.assign(header, i, std::string::npos);
00270 }
00271 }
00272 return std::make_pair(key, value);
00273 }
00274
00279 std::string connection::read_line() {
00280 std::string line;
00281
00282
00283 for (char c = 0, cc = 0; !(c == '\n' && cc == '\r'); line += c) {
00284 cc = c;
00285 boost::asio::read(socket, boost::asio::buffer(&c, 1), boost::asio::transfer_at_least(1));
00286 }
00287
00288
00289 std::string::size_type rn = line.find("\r\n");
00290 if (rn != std::string::npos) {
00291 line.erase(rn);
00292 }
00293
00294 return line;
00295 }
00296
00304 void connection::read_message() {
00305 std::pair<std::string, std::string> pair = parse_header(read_line());
00306 if (pair.first == "Event") {
00307 message::event event(pair.second);
00308 for (;;) {
00309 std::string line = read_line();
00310
00311
00312 if (line.empty()) {
00313 break;
00314 }
00315 event.insert(parse_header(line));
00316 }
00317
00318 events.push(event);
00319 }
00320 else if (pair.first == "Response") {
00321 message::response response(pair.second);
00322 for (;;) {
00323 std::string line = read_line();
00324
00325
00326 if (line.empty()) {
00327 break;
00328 }
00329
00330
00331 if (pair.second == "Follows") {
00332 std::string::size_type ec = line.find("--END COMMAND--");
00333
00334
00335
00336 if (ec != std::string::npos) {
00337 line.erase(ec);
00338 response.data = line;
00339 }
00340 else {
00341 response.insert(parse_header(line));
00342 }
00343
00344 } else {
00345 response.insert(parse_header(line));
00346 }
00347 }
00348
00349 responses.push(response);
00350 }
00351 else {
00352 throw manager::unknown_message(pair.first);
00353 }
00354 }
00355
00362 message::response connection::read_response() {
00363 wait_response();
00364 message::response response = responses.front();
00365 responses.pop();
00366 return response;
00367 }
00368
00375 message::event connection::read_event() {
00376 wait_event();
00377 message::event event = events.front();
00378 events.pop();
00379 return event;
00380 }
00381
00386 void connection::process_events() {
00387
00388
00389
00390
00391
00392
00393 while (not events.empty()) {
00394 message::event e = events.front();
00395 events.pop();
00396
00397 event_handlers_t::iterator i = event_handlers.find(e.main_header());
00398 if (i != event_handlers.end()) {
00399 (*i->second)(e);
00400 }
00401
00402
00403 i = event_handlers.find("");
00404 if (i != event_handlers.end()) {
00405 (*i->second)(e);
00406 }
00407 }
00408 }
00409
00415 void connection::wait_event() {
00416 while (events.empty()) {
00417 read_message();
00418 }
00419 }
00420
00427 void connection::process_responses() {
00428
00429
00430
00431
00432
00433 while (not response_handlers.empty() and not responses.empty()) {
00434 response_handler_t f = response_handlers.front();
00435 response_handlers.pop();
00436
00437 f(read_response());
00438 }
00439 }
00440
00446 void connection::wait_response() {
00447 while (responses.empty()) {
00448 read_message();
00449 }
00450 }
00451
00457 void connection::pump_messages() {
00458 while (socket.available()) {
00459 read_message();
00460 }
00461 }
00462
00487 boost::signals::connection connection::register_event(const std::string& e, boost::function<void (message::event)> f) {
00488 std::pair<event_handlers_t::iterator, bool> ii = event_handlers.insert(std::make_pair(e, boost::shared_ptr<boost::signal<void (message::event)> >(new boost::signal<void (message::event)>())));
00489 return ii.first->second->connect(f);
00490 }
00491
00492 }
00493 }
00494