00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef __SESSION__
00020 #define __SESSION__
00021
00022 #include <boost/function.hpp>
00023 #include <boost/bind.hpp>
00024 #include <boost/enable_shared_from_this.hpp>
00025 #include <deque>
00026 #include <sync/xp/lock.h>
00027 #include <sync/xp/Synchronizer.h>
00028
00029
/**
 * Largest payload size, in bytes, that is accepted for a single incoming
 * packet (64 MiB).
 *
 * The expansion is parenthesised so the macro behaves as one value in any
 * surrounding expression (e.g. `x % MAX_PACKET_DATA_SIZE` or
 * `n / MAX_PACKET_DATA_SIZE`), instead of being re-associated with the
 * neighbouring operator.
 */
#define MAX_PACKET_DATA_SIZE (64*1024*1024)

// Forward declaration only; the class itself is defined elsewhere.
class TCPAccountHandler;
00033
00034 class Session : public Synchronizer, public boost::noncopyable, public boost::enable_shared_from_this<Session>
00035 {
00036 public:
00037 Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef)
00038 : Synchronizer(boost::bind(&Session::_signal, this)),
00039 socket(io_service),
00040 queue_protector(),
00041 m_ef(ef)
00042 {
00043 }
00044
00045 void connect(asio::ip::tcp::resolver::iterator& iterator)
00046 {
00047 socket.connect(*iterator);
00048 }
00049
00050
00051 asio::ip::tcp::socket& getSocket()
00052 {
00053 return socket;
00054 }
00055
00056 std::string getRemoteAddress()
00057 {
00058 return socket.remote_endpoint().address().to_string();
00059 }
00060
00061 unsigned short getRemotePort()
00062 {
00063 return socket.remote_endpoint().port();
00064 }
00065
00066 void push(int size, char* data)
00067 {
00068 {
00069 abicollab::scoped_lock lock(queue_protector);
00070 incoming.push_back( std::pair<int, char*>(size, data) );
00071 }
00072 Synchronizer::signal();
00073 }
00074
00075
00076
00077
00078 bool pop(int& size, char** data)
00079 {
00080 if (incoming.size() == 0)
00081 return false;
00082 {
00083 abicollab::scoped_lock lock(queue_protector);
00084 std::pair<int, char*> p = incoming.front();
00085 size = p.first;
00086 *data = p.second;
00087 incoming.pop_front();
00088 }
00089 return true;
00090 }
00091
00092 void asyncReadHeader()
00093 {
00094 UT_DEBUGMSG(("Session::asyncReadHeader()\n"));
00095 packet_data = 0;
00096 asio::async_read(socket,
00097 asio::buffer(&packet_size, 4),
00098 boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00099 }
00100
00101 void asyncWrite(int size, const char* data)
00102 {
00103
00104 bool writeInProgress = outgoing.size() > 0;
00105
00106
00107 char* store_data = reinterpret_cast<char*>(g_malloc(size));
00108 memcpy(store_data, data, size);
00109 outgoing.push_back(std::pair<int, char*>(size, store_data));
00110
00111 if (!writeInProgress)
00112 {
00113 packet_size_write = size;
00114 packet_data_write = store_data;
00115
00116 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00117 asio::async_write(socket,
00118 asio::buffer(&packet_size_write, 4),
00119 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00120 }
00121 }
00122
00123
00124
00125
00126 bool isConnected()
00127 {
00128 return socket.is_open();
00129 }
00130
00131 void disconnect()
00132 {
00133 UT_DEBUGMSG(("Session::disconnect()\n"));
00134 if (socket.is_open())
00135 {
00136 asio::error_code ecs;
00137 socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs);
00138 if (ecs) {
00139 UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str()));
00140 }
00141 asio::error_code ecc;
00142 socket.close(ecc);
00143 if (ecc) {
00144 UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str()));
00145 }
00146 }
00147 UT_DEBUGMSG(("Socket closed, signalling mainloop\n"));
00148 signal();
00149 }
00150
00151 private:
00152 void _signal()
00153 {
00154 UT_DEBUGMSG(("Session::_signal()\n"));
00155 m_ef(shared_from_this());
00156 }
00157
00158 void asyncReadHeaderHandler(const asio::error_code& error,
00159 std::size_t bytes_transferred)
00160 {
00161 if (error)
00162 {
00163 UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str()));
00164 disconnect();
00165 return;
00166 }
00167
00168 if (bytes_transferred != 4)
00169 {
00170 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00171 disconnect();
00172 return;
00173 }
00174
00175 if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE)
00176 {
00177 UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE));
00178 disconnect();
00179 return;
00180 }
00181
00182 UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size));
00183
00184 packet_data = reinterpret_cast<char*>(g_malloc(packet_size));
00185 asio::async_read(socket,
00186 asio::buffer(packet_data, packet_size),
00187 boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
00188 }
00189
00190 void asyncReadHandler(const asio::error_code& error,
00191 std::size_t bytes_transferred)
00192 {
00193 if (error)
00194 {
00195 UT_DEBUGMSG(("asyncReadHandler generic error\n"));
00196 disconnect();
00197 return;
00198 }
00199
00200 if (bytes_transferred != std::size_t(packet_size))
00201 {
00202 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
00203 disconnect();
00204 return;
00205 }
00206
00207 push(packet_size, packet_data);
00208
00209 asyncReadHeader();
00210 }
00211
00212 void asyncWriteHeaderHandler(const asio::error_code& ec)
00213 {
00214 UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n"));
00215 if (ec)
00216 {
00217 UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n"));
00218 disconnect();
00219 return;
00220 }
00221
00222
00223 asio::async_write(socket,
00224 asio::buffer(packet_data_write, packet_size_write),
00225 boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error));
00226 }
00227
00228 void asyncWriteHandler(const asio::error_code& ec)
00229 {
00230 UT_DEBUGMSG(("Session::asyncWriteHandler()\n"));
00231 FREEP(packet_data_write);
00232 if (ec)
00233 {
00234 UT_DEBUGMSG(("asyncWriteHandler generic error\n"));
00235 disconnect();
00236 return;
00237 }
00238
00239
00240 outgoing.pop_front();
00241 if (outgoing.size() > 0)
00242 {
00243 std::pair<int, char*> p = outgoing.front();
00244 packet_size_write = p.first;
00245 packet_data_write = p.second;
00246
00247 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
00248
00249 asio::async_write(socket,
00250 asio::buffer(&packet_size_write, 4),
00251 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
00252 }
00253 }
00254
00255 asio::ip::tcp::socket socket;
00256 abicollab::mutex queue_protector;
00257 std::deque< std::pair<int, char*> > incoming;
00258 std::deque< std::pair<int, char*> > outgoing;
00259
00260 int packet_size;
00261 char* packet_data;
00262
00263 int packet_size_write;
00264 char* packet_data_write;
00265
00266 boost::function<void (boost::shared_ptr<Session>)> m_ef;
00267 };
00268
00269 #endif