Sending queued messages

Started by
5 comments, last by hplus0603 9 years, 4 months ago

Hi all.

I think I have worked out how to send many messages to a client by calling async_write in a loop.

I'm using a flag (boost::atomic_bool AtomicSendInprogress;) to trigger a send if there are no pending sends; otherwise the message gets queued in a lock-free queue. When the write-completion handler is called, I pop an item from the queue and, if that succeeds, I send it from within the completion callback.

Is this the correct way to do this?

Some code:


boost::atomic_bool AtomicSendInprogress;

// Every client holds a queue of messages, because you can't start a send while
// one is already in progress or the messages will be mixed together on the stream.
boost::lockfree::queue<cPacket> SendQueue;

//----------------------------------------------------------------------------
// If we are not sending, send the message now; otherwise the message goes into
// the queue and is sent when the previous message is complete.
//----------------------------------------------------------------------------
void deliver(cPacket &msg)
{
    // If no send is in progress we can start one; otherwise we are already sending.
    if(AtomicSendInprogress == false)
    {
        // Set the flag so we can't mix the stream; bad things happen.
        AtomicSendInprogress = true;

        // This class holds the buffer.
        async_write(boost::bind(&cSession::handle_write, this, boost::asio::placeholders::error, shared_from_this()),
                    msg);
    }//end empty queue
    else
    {
        // Just add the message to the queue; it is sent after we handle the current send.
        SendQueue.push(msg);
    }
}//end deliver

//----------------------------------------------------------------
// Handle a completed send
//----------------------------------------------------------------
void Handle_completion(...)
{
    // We have sent the data; now reset our flag.
    AtomicSendInprogress = false;

    cPacket packet;
    if(SendQueue.pop(packet))
    {
        // Send the next message.
        AtomicSendInprogress = true; // so we don't mix the stream with another packet
        async_write(boost::bind(&cSession::handle_write, this, boost::asio::placeholders::error, shared_from_this()),
                    packet);
    }//end sending data
}
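
One detail worth checking in deliver(): reading the flag and then setting it are two separate steps, so two threads could both see false and both start a send. A minimal sketch of a single-step test-and-set, using std::atomic from C++11 in place of boost::atomic_bool; the helper name try_begin_send is hypothetical and not part of the posted code:

```cpp
#include <atomic>

// Hypothetical helper: atomically sets the flag to true and reports whether
// this caller was the one that changed it from false to true.
// exchange() returns the previous value, so only one caller can "win".
inline bool try_begin_send(std::atomic<bool>& in_progress) {
    return !in_progress.exchange(true);
}
```

With a helper like this, deliver() would start a write only when try_begin_send() returns true and would queue the message otherwise. It does not by itself close the window in the completion handler between finding the queue empty and clearing the flag.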

The description sounds like it could work. I can't tell whether the code works, because the most important part, the handle_write callback, is not included.
Also, when you bind a member function, you can pass shared_from_this() in place of "this" as the object argument; you don't need to pass yourself along as a separate argument to keep the object alive.

boost::bind(&MyClass::my_callback, shared_from_this(), _1, _2);
I note that you don't pass along the "number of bytes written" to the callback, though. That's bad form, because there's no guarantee that all the bytes will have been successfully written, and handle_write() needs to deal with that possibility, unless you pass along a specific predicate that ensures that it re-tries until all is sent.
enum Bool { True, False, FileNotFound };
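
The partial-write concern applies to any call that may accept fewer bytes than it was given. A minimal sketch of the retry loop, assuming a hypothetical FakeSocket whose send_some() accepts at most a fixed number of bytes per call; both names are invented for illustration and are not Boost APIs:

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a socket: accepts at most `chunk` bytes per call,
// the way a send-style call may accept less than the full buffer.
struct FakeSocket {
    std::string received;
    std::size_t chunk;

    std::size_t send_some(const char* data, std::size_t len) {
        std::size_t n = std::min(len, chunk);
        received.append(data, n);
        return n;
    }
};

// Keep calling send_some() until every byte has been accepted.
// Returns the total number of bytes written.
template <typename Socket>
std::size_t write_all(Socket& sock, const std::string& msg) {
    std::size_t sent = 0;
    while (sent < msg.size()) {
        std::size_t n = sock.send_some(msg.data() + sent, msg.size() - sent);
        if (n == 0) break;  // no progress: treated as an error in this sketch
        sent += n;
    }
    return sent;
}
```

The byte count returned by each call is what drives the loop, which is why a completion handler for a partial-write API needs it as an argument.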

Hi.

Now I'm all confused. As I read it, the async_write callback handler is only invoked once your whole buffer has been sent.

Am I reading this wrong?

The async_write function is used to asynchronously write a certain number of bytes of data to a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:

  • All of the data in the supplied buffers has been written. That is, the bytes transferred is equal to the sum of the buffer sizes.
  • An error occurred.

Does that not mean that the whole buffer is sent? Likewise for read: the callback is not invoked until all the data is ready.

Here are the relevant parts of my connection class:


	//------------------------------------------------------------------
	// Asynchronously write a data structure to the socket.
	// This takes a cPacket, which is already serialized.
	//------------------------------------------------------------------
	template <typename Handler>
	void async_write(Handler handler, cPacket &t)
	{
		// Copy the message to the buffer so we don't lose it.
		t.GetData(outbound_data_);

		// Format the header.
		std::ostringstream header_stream;
		header_stream << std::setw(header_length)
		  << std::hex << outbound_data_.size();
		if (!header_stream || header_stream.str().size() != header_length)
		{
		  // Something went wrong, inform the caller.
		  boost::system::error_code error(boost::asio::error::invalid_argument);
		  socket_->get_io_service().post(boost::bind(handler, error));
		  return;
		}
		outbound_header_ = header_stream.str();

		// Write the serialized data to the socket. We use "gather-write" to send
		// both the header and the data in a single write operation.
		std::vector<boost::asio::const_buffer> buffers;
		buffers.push_back(boost::asio::buffer(outbound_header_));
		buffers.push_back(boost::asio::buffer(outbound_data_));
		boost::asio::async_write(*socket_, buffers, handler);
	}//end async_write
	///////////////////////////////////////////////////////////////////////////////////
	///////////////////////////////////////////////////////////////////////////////////



//------------------------------------------------------------------
	// Handle a completed read of a message header. The handler is passed using
	// a tuple since boost::bind seems to have trouble binding a function object
	// created using boost::bind as a parameter.
	//------------------------------------------------------------------
	template <typename Handler>
	void handle_read_header(const boost::system::error_code& e,
								cPacket &t, boost::tuple<Handler> handler)
	{
		if(e)
		{
			std::cout << "Error Client Read Header = " << e.message() << std::endl;

		  boost::get<0>(handler)(e);
		}
		else
		{
		  // Determine the length of the serialized data.
		  std::istringstream is(std::string(inbound_header_, header_length));
		  std::size_t inbound_data_size = 0;
		  if (!(is >> std::hex >> inbound_data_size))
		  {
			// Header doesn't seem to be valid. Inform the caller.
			boost::system::error_code error(boost::asio::error::invalid_argument);
			boost::get<0>(handler)(error);
			return;
		  }

		  // Start an asynchronous call to receive the data.
		  inbound_data_.resize(inbound_data_size);
		  void (cConnection::*f)(const boost::system::error_code&, cPacket&, boost::tuple<Handler>) = &cConnection::handle_read_data< Handler>;

		  boost::asio::async_read(*socket_, boost::asio::buffer(inbound_data_),
									boost::bind(f, this,
									boost::asio::placeholders::error, boost::ref(t),
									handler));
		}
	}//end handle_read_header
	/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////





/// Handle a completed read of message data.
	template <typename Handler>
	void handle_read_data(const boost::system::error_code& e,
							 cPacket &t, boost::tuple<Handler> handler)
	{
		if(e)
		{
			std::cout << "Error Client Read Data Block = " << e.message() << std::endl;
		  boost::get<0>(handler)(e);
		}
		else
		{
		  // Extract the data structure from the data just received.
		  try
		  {
			  std::cout << " Client Read Data Block Size = " << inbound_data_.size() << std::endl;

			std::string archive_data(&inbound_data_[0], inbound_data_.size());

			t.Set(archive_data);//keep it serialized
			//std::istringstream archive_stream(archive_data);

			//boost::archive::text_iarchive archive(archive_stream);
			//archive >> t;
		  }
		  catch (std::exception& e)
		  {
			// Unable to decode data.
			boost::system::error_code error(boost::asio::error::invalid_argument);
			boost::get<0>(handler)(error);
			return;
		  }

		  // Inform caller that data has been received ok.
		  boost::get<0>(handler)(e);
		}
	}//end handle_read_data
	/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////



The derived class has the handle_write:



  /// Handle completion of a write operation.
  void handle_write(const boost::system::error_code& e, cConnection_ptr client)
  {
    std::stringstream ss;
    //ss << "Server Handle Write Completed \n";

    //global_stream_lock.lock();
    //send the data to the client list box
    //SendMessage(GlobalList, (UINT)LB_ADDSTRING , 0, (LPARAM) ss.str().c_str());
    //global_stream_lock.unlock();
    std::string ip;
    std::string ipport;
    client->GetIPAddress(ip);
    client->GetIPAddressAndPort(ipport);

    if(!e)// && read_msg_.decode_header())
    {
      //std::cout << "Server Sent All DATA" << std::endl;
      ss.str("");
      ss.clear();
      ss << "SClient["<< ip << "]" << "Port[" << ipport << "]" << " Socket Has Sent All DATA";

      global_stream_lock.lock();
      // Send the data to the client list box.
      SendMessage(GlobalList, (UINT)LB_ADDSTRING , 0, (LPARAM) ss.str().c_str());
      global_stream_lock.unlock();

      // We have sent the data; now reset our flag.
      AtomicSendInprogress = false;
      cPacket packet;
      if(SendQueue.pop(packet))
      {
        // Send the next message.
        AtomicSendInprogress = true; // so we don't mix the stream with another packet

        async_write(boost::bind(&cSession::handle_write, this, boost::asio::placeholders::error, shared_from_this()),
                    packet);
      }//end sending data
    }
    else
    {
      // There could be other messages when the first one removes a client.

      // Remove this client from the lobby; it's a dead client.
      Lobby.leave(shared_from_this());
    }
  }
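
The framing used by async_write and handle_read_header above (a fixed-width hexadecimal length followed by the payload) can be sketched without sockets. This is an illustration only: header_length is assumed to be 8 here because the post never states its value, and encode_frame and decode_frame are hypothetical names.

```cpp
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>

// Assumed value for this sketch; the posted code does not show it.
const std::size_t header_length = 8;

// Build "<fixed-width hex length><payload>". Returns false if the length
// does not fit in header_length characters.
bool encode_frame(const std::string& payload, std::string& frame) {
    std::ostringstream header;
    header << std::setw(static_cast<int>(header_length))
           << std::hex << payload.size();
    if (!header || header.str().size() != header_length) return false;
    frame = header.str() + payload;
    return true;
}

// Parse the header at the start of `frame` and extract the payload.
// Returns false if the header is malformed or the frame is too short.
bool decode_frame(const std::string& frame, std::string& payload) {
    if (frame.size() < header_length) return false;
    std::istringstream is(frame.substr(0, header_length));
    std::size_t size = 0;
    if (!(is >> std::hex >> size)) return false;
    if (frame.size() - header_length < size) return false;
    payload = frame.substr(header_length, size);
    return true;
}
```

The header is space-padded on the left by std::setw, and the formatted read on the receiving side skips that padding before parsing the hex digits.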

You are right -- I was thinking of async_send(). async_write() will indeed keep going until all the bytes have been written, or an error occurs.
(This means that the operation could conceivably live for hours before it errored out if the other end stopped receiving data, which is why I don't like write() for sockets, and prefer send().)
enum Bool { True, False, FileNotFound };

Thanks. I think I'm now going to make a server and a chat app to see how it all works and get a feel for how the system behaves in a semi-real-world application.

This should reveal any problems and help further my knowledge of the subject. Thanks, all.

Hi. Just when you think it works, lol.

I found a bug, one that never popped up during testing but popped up when starting the chat app.

I'll explain it. The handle_write function was set out like this; I've marked the locations:


void Handle_Write(...)
{
    // We have sent the data; now reset our flag.
    AtomicSendInprogress = false; // THIS HERE BAD: can allow a message to be processed

    cPacket packet; // THIS HERE ALSO BAD: packet goes out of scope
    if(SendQueue.pop(packet))
    {
        // Send the next message.
        AtomicSendInprogress = true; // so we don't mix the stream with another packet
        async_write(boost::bind(&cSession::handle_write, this, boost::asio::placeholders::error, shared_from_this()),
                    packet);
    }//end sending data
}



It should look like this:

void Handle_Write(...)
{
    // InternalPacket is held until we are done.
    if(SendQueue.pop(InternalPacket))
    {
        // Send the next message.
        AtomicSendInprogress = true; // so we don't mix the stream with another packet
        async_write(boost::bind(&cSession::handle_write, this, boost::asio::placeholders::error, shared_from_this()),
                    InternalPacket);
    }//end sending data
    else
        AtomicSendInprogress = false; // safe
}

In the test I ran a loop sending 16 large messages and it never missed a beat; then in the chat app one player disconnected and it died. Now I think it's working.

How does one test a network app? Any pointers and tips?

The boost-typical way to do this is to use a boost::shared_ptr<> (in C++11, std::shared_ptr<>) to keep the necessary data structures alive for as long as the request is alive.
Another option is to keep the necessary data structures in the data structure you use for the client in question, and then make sure you don't generate more outstanding requests than what you have available data for. If you only ever have one write request per client outstanding, then you only ever need one currently active write buffer for that client.
enum Bool { True, False, FileNotFound };
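
The "one outstanding write per client" idea can be sketched as a small queue that owns each buffer until its write completes. This is a sketch under assumptions, not the poster's code: OutboundQueue and start_write are hypothetical names, std::string stands in for cPacket, and start_write represents whatever launches the real asynchronous write.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical per-client queue. The message being written stays at the front
// of the deque until its completion handler runs, so its buffer stays alive.
class OutboundQueue {
public:
    explicit OutboundQueue(std::function<void(const std::string&)> start_write)
        : start_write_(std::move(start_write)) {}

    // Queue a message; start a write only if nothing else is in flight.
    void deliver(std::string msg) {
        const std::string* next = nullptr;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(msg));
            if (pending_.size() == 1) next = &pending_.front();
        }
        if (next) start_write_(*next);
    }

    // Call from the write-completion handler of the message at the front.
    void on_write_complete() {
        const std::string* next = nullptr;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.pop_front();  // drop the buffer that was just sent
            if (!pending_.empty()) next = &pending_.front();
        }
        if (next) start_write_(*next);
    }

private:
    std::function<void(const std::string&)> start_write_;
    std::mutex mutex_;
    std::deque<std::string> pending_;
};
```

Here "a send is in progress" simply means the deque is non-empty, and that is decided under the same lock as the push and the pop, so there is no separate flag to get out of step with the queue. A mutex is used instead of a lock-free queue purely to keep the sketch short.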
