Skip to content

Commit

Permalink
Correcting shutdown/close usage; refactoring TcpServer (#2950)
Browse files Browse the repository at this point in the history
* Correcting shutdown/close usage; refactoring TcpServer

* Fixing Tcp

* Review and CI fixes

* ...and the other half of the fixes
  • Loading branch information
LeStarch authored Oct 16, 2024
1 parent c7e68c5 commit 9ed68fd
Show file tree
Hide file tree
Showing 26 changed files with 447 additions and 361 deletions.
63 changes: 32 additions & 31 deletions Drv/Ip/IpSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@

#ifdef TGT_OS_TYPE_VXWORKS
#include <socket.h>
#include <inetLib.h>
#include <fioLib.h>
#include <hostLib.h>
#include <ioLib.h>
#include <vxWorks.h>
#include <sockLib.h>
#include <fioLib.h>
#include <taskLib.h>
#include <sysLib.h>
#include <errnoLib.h>
#include <cstring>
#include <inetLib.h>
#include <fioLib.h>
#include <hostLib.h>
#include <ioLib.h>
#include <vxWorks.h>
#include <sockLib.h>
#include <fioLib.h>
#include <taskLib.h>
#include <sysLib.h>
#include <errnoLib.h>
#include <cstring>
#elif defined TGT_OS_TYPE_LINUX || TGT_OS_TYPE_DARWIN
#include <sys/socket.h>
#include <unistd.h>
Expand All @@ -52,7 +52,7 @@ IpSocket::IpSocket() : m_timeoutSeconds(0), m_timeoutMicroseconds(0), m_port(0)

SocketIpStatus IpSocket::configure(const char* const hostname, const U16 port, const U32 timeout_seconds, const U32 timeout_microseconds) {
FW_ASSERT(timeout_microseconds < 1000000, static_cast<FwAssertArgType>(timeout_microseconds));
FW_ASSERT(this->isValidPort(port));
FW_ASSERT(this->isValidPort(port), static_cast<FwAssertArgType>(port));
FW_ASSERT(hostname != nullptr);
this->m_timeoutSeconds = timeout_seconds;
this->m_timeoutMicroseconds = timeout_microseconds;
Expand All @@ -65,7 +65,7 @@ bool IpSocket::isValidPort(U16 port) {
return true;
}

SocketIpStatus IpSocket::setupTimeouts(NATIVE_INT_TYPE socketFd) {
SocketIpStatus IpSocket::setupTimeouts(PlatformIntType socketFd) {
// Get the IP address from host
#ifdef TGT_OS_TYPE_VXWORKS
// No timeouts set on Vxworks
Expand Down Expand Up @@ -103,38 +103,39 @@ SocketIpStatus IpSocket::addressToIp4(const char* address, void* ip4) {
return SOCK_SUCCESS;
}

void IpSocket::close(NATIVE_INT_TYPE fd) {
(void)::shutdown(fd, SHUT_RDWR);
(void)::close(fd);
}

void IpSocket::shutdown(NATIVE_INT_TYPE fd) {
this->close(fd);
void IpSocket::close(const SocketDescriptor& socketDescriptor) {
(void)::close(socketDescriptor.fd);
}

SocketIpStatus IpSocket::startup() {
// no op for non-server components
return SOCK_SUCCESS;
void IpSocket::shutdown(const SocketDescriptor& socketDescriptor) {
errno = 0;
PlatformIntType status = ::shutdown(socketDescriptor.fd, SHUT_RDWR);
// If shutdown fails, go straight to the hard-shutdown
if (status != 0) {
this->close(socketDescriptor);
}
}

SocketIpStatus IpSocket::open(NATIVE_INT_TYPE& fd) {
SocketIpStatus IpSocket::open(SocketDescriptor& socketDescriptor) {
SocketIpStatus status = SOCK_SUCCESS;
errno = 0;
// Open a TCP socket for incoming commands, and outgoing data if not using UDP
status = this->openProtocol(fd);
status = this->openProtocol(socketDescriptor);
if (status != SOCK_SUCCESS) {
FW_ASSERT(fd == -1); // Ensure we properly kept closed on error
socketDescriptor.fd = -1;
return status;
}
return status;
}

SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) {
SocketIpStatus IpSocket::send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) {
U32 total = 0;
I32 sent = 0;
// Attempt to send out data and retry as necessary
for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (total < size); i++) {
errno = 0;
// Send using my specific protocol
sent = this->sendProtocol(fd, data + total, size - total);
sent = this->sendProtocol(socketDescriptor, data + total, size - total);
// Error is EINTR or timeout just try again
if (((sent == -1) && (errno == EINTR)) || (sent == 0)) {
continue;
Expand All @@ -159,13 +160,13 @@ SocketIpStatus IpSocket::send(NATIVE_INT_TYPE fd, const U8* const data, const U3
return SOCK_SUCCESS;
}

SocketIpStatus IpSocket::recv(NATIVE_INT_TYPE fd, U8* data, U32& req_read) {
SocketIpStatus IpSocket::recv(const SocketDescriptor& socketDescriptor, U8* data, U32& req_read) {
I32 size = 0;

// Try to read until we fail to receive data
for (U32 i = 0; (i < SOCKET_MAX_ITERATIONS) && (size <= 0); i++) {
errno = 0;
// Attempt to recv out data
size = this->recvProtocol(fd, data, req_read);
size = this->recvProtocol(socketDescriptor, data, req_read);

// Nothing to be received
if ((size == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
Expand Down
63 changes: 32 additions & 31 deletions Drv/Ip/IpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
#include <Os/Mutex.hpp>

namespace Drv {

struct SocketDescriptor final {
PlatformIntType fd = -1; //!< Used for all sockets to track the communication file descriptor
PlatformIntType serverFd = -1; //!< Used for server sockets to track the listening file descriptor
};

/**
* \brief Status enumeration for socket return values
*/
Expand All @@ -36,7 +42,8 @@ enum SocketIpStatus {
SOCK_SEND_ERROR = -13, //!< Failed to send after configured retries
SOCK_NOT_STARTED = -14, //!< Socket has not been started
SOCK_FAILED_TO_READ_BACK_PORT = -15, //!< Failed to read back port from connection
SOCK_NO_DATA_AVAILABLE = -16 //!< No data available or read operation would block
SOCK_NO_DATA_AVAILABLE = -16, //!< No data available or read operation would block
SOCK_ANOTHER_THREAD_OPENING = -17 //!< Another thread is opening
};

/**
Expand Down Expand Up @@ -70,16 +77,6 @@ class IpSocket {
SocketIpStatus configure(const char* hostname, const U16 port, const U32 send_timeout_seconds,
const U32 send_timeout_microseconds);

/**
* \brief startup the socket, a no-op on unless this is server
*
* This will start-up the socket. In the case of most sockets, this is a no-op. On server sockets this binds to the
* server address and progresses through the `listen` step such that on `open` new clients may be accepted.
*
* \return status of startup
*/
virtual SocketIpStatus startup();

/**
* \brief open the IP socket for communications
*
Expand All @@ -95,10 +92,10 @@ class IpSocket {
*
* Note: delegates to openProtocol for protocol specific implementation
*
* \param fd: file descriptor to open
* \param socketDescriptor: socket descriptor to update with opened port
* \return status of open
*/
SocketIpStatus open(NATIVE_INT_TYPE& fd);
SocketIpStatus open(SocketDescriptor& socketDescriptor);
/**
* \brief send data out the IP socket from the given buffer
*
Expand All @@ -115,7 +112,7 @@ class IpSocket {
* \param size: size of data to send
* \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error
*/
SocketIpStatus send(NATIVE_INT_TYPE fd, const U8* const data, const U32 size);
SocketIpStatus send(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size);
/**
* \brief receive data from the IP socket from the given buffer
*
Expand All @@ -127,31 +124,35 @@ class IpSocket {
*
* Note: delegates to `recvProtocol` to send the data
*
* \param fd: file descriptor to recv from
* \param socketDescriptor: socket descriptor to recv from
* \param data: pointer to data to fill with received data
* \param size: maximum size of data buffer to fill
* \return status of the send, SOCK_DISCONNECTED to reopen, SOCK_SUCCESS on success, something else on error
*/
SocketIpStatus recv(NATIVE_INT_TYPE fd, U8* const data, U32& size);
SocketIpStatus recv(const SocketDescriptor& fd, U8* const data, U32& size);

/**
* \brief closes the socket
*
* Closes the socket opened by the open call. In this case of the TcpServer, this does NOT close server's listening
* port (call `shutdown`) but will close the active client connection.
* port but will close the active client connection.
*
* \param fd: file descriptor to close
* \param socketDescriptor: socket descriptor to close
*/
void close(NATIVE_INT_TYPE fd);
void close(const SocketDescriptor& socketDescriptor);

/**
* \brief shutdown the socket
*
* Closes the socket opened by the open call. In this case of the TcpServer, this does close server's listening
* port. This will shutdown all clients.
* Shuts down the socket opened by the open call. In this case of the TcpServer, this does shut down server's
* listening port, but rather shuts down the active client.
*
* A shut down begins the termination of communication. The underlying socket will coordinate a clean shutdown, and
* it is safe to close the socket once a recv with 0 size has returned or an appropriate timeout has been reached.
*
* \param fd: file descriptor to shutdown
* \param socketDescriptor: socket descriptor to shutdown
*/
virtual void shutdown(NATIVE_INT_TYPE fd);
void shutdown(const SocketDescriptor& socketDescriptor);

PROTECTED:
/**
Expand All @@ -168,10 +169,10 @@ class IpSocket {

/**
* \brief setup the socket timeout properties of the opened outgoing socket
* \param socketFd: file descriptor to setup
* \param socketDescriptor: socket descriptor to setup
* \return status of timeout setup
*/
SocketIpStatus setupTimeouts(NATIVE_INT_TYPE socketFd);
SocketIpStatus setupTimeouts(PlatformIntType socketFd);

/**
* \brief converts a given address in dot form x.x.x.x to an ip address. ONLY works for IPv4.
Expand All @@ -182,27 +183,27 @@ class IpSocket {
static SocketIpStatus addressToIp4(const char* address, void* ip4);
/**
* \brief Protocol specific open implementation, called from open.
* \param fd: (output) file descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \param socketDescriptor: (output) socket descriptor opened. Only valid on SOCK_SUCCESS. Otherwise will be invalid
* \return status of open
*/
virtual SocketIpStatus openProtocol(NATIVE_INT_TYPE& fd) = 0;
virtual SocketIpStatus openProtocol(SocketDescriptor& fd) = 0;
/**
* \brief Protocol specific implementation of send. Called directly with retry from send.
* \param fd: file descriptor to send to
* \param socketDescriptor: socket descriptor to send to
* \param data: data to send
* \param size: size of data to send
* \return: size of data sent, or -1 on error.
*/
virtual I32 sendProtocol(NATIVE_INT_TYPE fd, const U8* const data, const U32 size) = 0;
virtual I32 sendProtocol(const SocketDescriptor& socketDescriptor, const U8* const data, const U32 size) = 0;

/**
* \brief Protocol specific implementation of recv. Called directly with error handling from recv.
* \param fd: file descriptor to recv from
* \param socket: socket descriptor to recv from
* \param data: data pointer to fill
* \param size: size of data buffer
* \return: size of data received, or -1 on error.
*/
virtual I32 recvProtocol(NATIVE_INT_TYPE fd, U8* const data, const U32 size) = 0;
virtual I32 recvProtocol(const SocketDescriptor& socketDescriptor, U8* const data, const U32 size) = 0;

U32 m_timeoutSeconds;
U32 m_timeoutMicroseconds;
Expand Down
Loading

0 comments on commit 9ed68fd

Please sign in to comment.