• Main Page
  • Related Pages
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

Session.h

Go to the documentation of this file.
00001 /* Copyright (C) 2007,2008 by Marc Maurer <uwog@uwog.net>
00002  *
00003  * This program is free software; you can redistribute it and/or
00004  * modify it under the terms of the GNU General Public License
00005  * as published by the Free Software Foundation; either version 2
00006  * of the License, or (at your option) any later version.
00007  *
00008  * This program is distributed in the hope that it will be useful,
00009  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00010  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011  * GNU General Public License for more details.
00012  *
00013  * You should have received a copy of the GNU General Public License
00014  * along with this program; if not, write to the Free Software
00015  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00016  * 02110-1301 USA.
00017  */
00018 
00019 #ifndef __SESSION__
00020 #define __SESSION__
00021 
00022 #include <boost/function.hpp>
00023 #include <boost/bind.hpp>
00024 #include <boost/enable_shared_from_this.hpp>
00025 #include <boost/noncopyable.hpp>
00026 #include <deque>
00027 #include <sync/xp/lock.h>
00028 #include <sync/xp/Synchronizer.h>
00029 
// Upper bound (in bytes) accepted for a single packet body.
// 64MB seems reasonable enough for now...
// The expression is parenthesized so that it keeps its value when the macro
// is expanded inside a larger expression (e.g. `x % MAX_PACKET_DATA_SIZE`).
#define MAX_PACKET_DATA_SIZE (64*1024*1024)
00032 
00033 class TCPAccountHandler;
00034 
00035 class Session : public Synchronizer, public boost::noncopyable, public boost::enable_shared_from_this<Session>
00036 {
00037 public:
00038     Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef)
00039         : Synchronizer(boost::bind(&Session::_signal, this)),
00040         socket(io_service),
00041         queue_protector(),
00042         m_ef(ef)
00043     {
00044     }
00045 
00046     void connect(asio::ip::tcp::resolver::iterator& iterator)
00047     {
00048         socket.connect(*iterator);
00049     }
00050 
00051     // TODO: don't expose this
00052     asio::ip::tcp::socket& getSocket()
00053     {
00054         return socket;
00055     }
00056 
00057     std::string getRemoteAddress()
00058     {
00059         return socket.remote_endpoint().address().to_string();
00060     }
00061 
00062     unsigned short getRemotePort()
00063     {
00064         return socket.remote_endpoint().port();
00065     }
00066 
00067     void push(int size, char* data)
00068     {
00069         {
00070             abicollab::scoped_lock lock(queue_protector);
00071             incoming.push_back( std::pair<int, char*>(size, data) );
00072         }
00073         Synchronizer::signal();
00074     }
00075 
00076     /*
00077         Only called fron the abiword main loop
00078     */
00079     bool pop(int& size, char** data)
00080     {
00081         if (incoming.size() == 0)
00082             return false;
00083         {
00084             abicollab::scoped_lock lock(queue_protector);
00085             std::pair<int, char*> p = incoming.front();
00086             size = p.first;
00087             *data = p.second;
00088             incoming.pop_front();
00089         }
00090         return true;
00091     }
00092 
00093     void asyncReadHeader()
00094     {
00095         UT_DEBUGMSG(("Session::asyncReadHeader()\n"));
00096         packet_data = 0; // just to be sure we'll never touch a datablock we might have read before
00097         asio::async_read(socket,
00098             asio::buffer(&packet_size, 4),
00099             boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00100     }
00101 
00102     void asyncWrite(int size, const char* data)
00103     {
00104         // TODO: this is a race condition, mutex this
00105         bool writeInProgress = outgoing.size() > 0;
00106 
00107         // FIXME: inefficient memory copy
00108         char* store_data = reinterpret_cast<char*>(g_malloc(size));
00109         memcpy(store_data, data, size);
00110         outgoing.push_back(std::pair<int, char*>(size, store_data));
00111 
00112         if (!writeInProgress)
00113         {
00114             packet_size_write = size;
00115             packet_data_write = store_data;
00116 
00117             UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00118             asio::async_write(socket,
00119                 asio::buffer(&packet_size_write, 4),
00120                 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00121         }
00122     }
00123 
00124     /*
00125         Only called fron the abiword main loop
00126     */
00127     bool isConnected()
00128     {
00129         return socket.is_open();
00130     }
00131 
00132     void disconnect()
00133     {
00134         UT_DEBUGMSG(("Session::disconnect()\n"));
00135         if (socket.is_open())
00136         {
00137             asio::error_code ecs;
00138             socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs);
00139             if (ecs) {
00140                 UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str()));
00141             }
00142             asio::error_code ecc;
00143             socket.close(ecc);
00144             if (ecc) {
00145                 UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str()));
00146             }
00147         }
00148         UT_DEBUGMSG(("Socket closed, signalling mainloop\n"));
00149         signal();
00150     }
00151 
00152 private:
00153     void _signal()
00154     {
00155         UT_DEBUGMSG(("Session::_signal()\n"));
00156         m_ef(shared_from_this());
00157     }
00158 
00159     void asyncReadHeaderHandler(const asio::error_code& error,
00160         std::size_t bytes_transferred)
00161     {
00162         if (error)
00163         {
00164             UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str()));
00165             disconnect();
00166             return;
00167         }
00168 
00169         if (bytes_transferred != 4)
00170         {
00171             UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00172             disconnect(); // TODO: should not happen, handle this
00173             return;
00174         }
00175 
00176         if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE)
00177         {
00178             UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE));
00179             disconnect();
00180             return;
00181         }
00182 
00183         UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size));
00184         // now continue reading the packet data
00185         packet_data = reinterpret_cast<char*>(g_malloc(packet_size));
00186         asio::async_read(socket,
00187             asio::buffer(packet_data, packet_size),
00188             boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00189     }
00190 
00191     void asyncReadHandler(const asio::error_code& error,
00192         std::size_t bytes_transferred)
00193     {
00194         if (error)
00195         {
00196             UT_DEBUGMSG(("asyncReadHandler generic error\n"));
00197             disconnect();
00198             return;
00199         }
00200 
00201         if (bytes_transferred != std::size_t(packet_size))
00202         {
00203             UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00204             disconnect(); // TODO: should not happen, handle this
00205             return;
00206         }
00207 
00208         push(packet_size, packet_data);
00209         // start over for a new packet
00210         asyncReadHeader();
00211     }
00212 
00213     void asyncWriteHeaderHandler(const asio::error_code& ec)
00214     {
00215         UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n"));
00216         if (ec)
00217         {
00218             UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n"));
00219             disconnect();
00220             return;
00221         }
00222 
00223         // write the packet body
00224         asio::async_write(socket,
00225             asio::buffer(packet_data_write, packet_size_write),
00226             boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error));
00227     }
00228 
00229     void asyncWriteHandler(const asio::error_code& ec)
00230     {
00231         UT_DEBUGMSG(("Session::asyncWriteHandler()\n"));
00232         FREEP(packet_data_write);
00233         if (ec)
00234         {
00235             UT_DEBUGMSG(("asyncWriteHandler generic error\n"));
00236             disconnect();
00237             return;
00238         }
00239 
00240         // TODO: this is a race condition, mutex this
00241         outgoing.pop_front();
00242         if (outgoing.size() > 0)
00243         {
00244             std::pair<int, char*> p = outgoing.front();
00245             packet_size_write = p.first;
00246             packet_data_write = p.second;
00247 
00248             UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00249 
00250             asio::async_write(socket,
00251                 asio::buffer(&packet_size_write, 4),
00252                 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00253         }
00254     }
00255 
00256     asio::ip::tcp::socket                   socket;
00257     abicollab::mutex                        queue_protector;
00258     std::deque< std::pair<int, char*> >     incoming;
00259     std::deque< std::pair<int, char*> >     outgoing;
00260 
00261     int                                     packet_size; // state needed for async reads
00262     char*                                   packet_data; // state needed for async reads
00263 
00264     int                                     packet_size_write; // state needed for async writes
00265     char*                                   packet_data_write; // state needed for async writes
00266 
00267     boost::function<void (boost::shared_ptr<Session>)>      m_ef;
00268 };
00269 
00270 #endif /* __SESSION__ */

Generated on Sun Feb 14 2021 for AbiWord by  doxygen 1.7.1