Merge 27981 via fix_p2p_stalling_pr27981

This commit is contained in:
Luke Dashjr 2023-11-15 23:49:11 +00:00
commit 0b6324946a
2 changed files with 30 additions and 37 deletions

View File

@ -841,7 +841,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
}
size_t CConnman::SocketSendData(CNode& node) const
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
@ -896,7 +896,7 @@ size_t CConnman::SocketSendData(CNode& node) const
assert(node.nSendSize == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
return nSentSize;
return {nSentSize, !node.vSendMsg.empty()};
}
/** Try to find a connection to evict when the node is full.
@ -1231,37 +1231,15 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
}
for (CNode* pnode : nodes) {
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
if (!pnode->m_sock) {
continue;
if (pnode->m_sock) {
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
events_per_sock.emplace(pnode->m_sock, Sock::Events{event});
}
Sock::Event requested{0};
if (select_send) {
requested = Sock::SEND;
} else if (select_recv) {
requested = Sock::RECV;
}
events_per_sock.emplace(pnode->m_sock, Sock::Events{requested});
}
return events_per_sock;
@ -1322,6 +1300,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
errorSet = it->second.occurred & Sock::ERR;
}
}
if (sendSet) {
// Send data
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) {
RecordBytesSent(bytes_sent);
// If both receiving and (non-optimistic) sending were possible, we first attempt
// sending. If that succeeds, but does not fully drain the send queue, do not
// attempt to receive. This avoids needlessly queueing data if the remote peer
// is slow at receiving data, by means of TCP flow control. We only do this when
// sending actually succeeded to make sure progress is always made; otherwise a
// deadlock would be possible when both sides have data to send, but neither is
// receiving.
if (data_left) recvSet = false;
}
}
if (recvSet || errorSet)
{
// typical socket buffer is 8K-64K
@ -1368,12 +1364,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
}
if (sendSet) {
// Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent);
}
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
@ -2898,7 +2888,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
// If write queue empty, attempt "optimistic write"
if (optimisticSend) nBytesSent = SocketSendData(*pnode);
bool data_left;
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}

View File

@ -991,7 +991,9 @@ private:
NodeId GetNewNodeId();
size_t SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
void DumpAddresses();
// Network stats