net: speed up select fdwatch lookups
This commit is contained in:
@@ -52,8 +52,10 @@ struct FdwatchSelectState
|
|||||||
fd_set rfd_set;
|
fd_set rfd_set;
|
||||||
fd_set wfd_set;
|
fd_set wfd_set;
|
||||||
socket_t* select_fds;
|
socket_t* select_fds;
|
||||||
|
int* fd_slot_by_fd;
|
||||||
int* select_rfdidx;
|
int* select_rfdidx;
|
||||||
int nselect_fds;
|
int nselect_fds;
|
||||||
|
int max_fd_plus_one;
|
||||||
fd_set working_rfd_set;
|
fd_set working_rfd_set;
|
||||||
fd_set working_wfd_set;
|
fd_set working_wfd_set;
|
||||||
void** fd_data;
|
void** fd_data;
|
||||||
@@ -343,11 +345,19 @@ static LPFDWATCH fdwatch_new_select(int nfiles)
|
|||||||
FD_ZERO(&fdw->state.working_wfd_set);
|
FD_ZERO(&fdw->state.working_wfd_set);
|
||||||
|
|
||||||
CREATE(fdw->state.select_fds, socket_t, fdw->descriptor_limit);
|
CREATE(fdw->state.select_fds, socket_t, fdw->descriptor_limit);
|
||||||
|
CREATE(fdw->state.fd_slot_by_fd, int, fdw->descriptor_limit);
|
||||||
CREATE(fdw->state.select_rfdidx, int, fdw->descriptor_limit);
|
CREATE(fdw->state.select_rfdidx, int, fdw->descriptor_limit);
|
||||||
CREATE(fdw->state.fd_rw, int, fdw->descriptor_limit);
|
CREATE(fdw->state.fd_rw, int, fdw->descriptor_limit);
|
||||||
CREATE(fdw->state.fd_data, void*, fdw->descriptor_limit);
|
CREATE(fdw->state.fd_data, void*, fdw->descriptor_limit);
|
||||||
|
|
||||||
|
for (int i = 0; i < fdw->descriptor_limit; ++i)
|
||||||
|
{
|
||||||
|
fdw->state.fd_slot_by_fd[i] = -1;
|
||||||
|
fdw->state.select_rfdidx[i] = -1;
|
||||||
|
}
|
||||||
|
|
||||||
fdw->state.nselect_fds = 0;
|
fdw->state.nselect_fds = 0;
|
||||||
|
fdw->state.max_fd_plus_one = 1;
|
||||||
return fdw;
|
return fdw;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -356,6 +366,7 @@ static void fdwatch_delete_select(LPFDWATCH fdw)
|
|||||||
free(fdw->state.fd_data);
|
free(fdw->state.fd_data);
|
||||||
free(fdw->state.fd_rw);
|
free(fdw->state.fd_rw);
|
||||||
free(fdw->state.select_fds);
|
free(fdw->state.select_fds);
|
||||||
|
free(fdw->state.fd_slot_by_fd);
|
||||||
free(fdw->state.select_rfdidx);
|
free(fdw->state.select_rfdidx);
|
||||||
free(fdw);
|
free(fdw);
|
||||||
|
|
||||||
@@ -366,26 +377,22 @@ static void fdwatch_delete_select(LPFDWATCH fdw)
|
|||||||
|
|
||||||
static int fdwatch_get_fdidx_select(LPFDWATCH fdw, socket_t fd)
|
static int fdwatch_get_fdidx_select(LPFDWATCH fdw, socket_t fd)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < fdw->state.nselect_fds; ++i)
|
if (fd < 0 || fd >= fdw->descriptor_limit)
|
||||||
{
|
return -1;
|
||||||
if (fdw->state.select_fds[i] == fd)
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
return fdw->state.fd_slot_by_fd[fd];
|
||||||
}
|
}
|
||||||
|
|
||||||
static int fdwatch_get_maxfd_select(LPFDWATCH fdw)
|
static void fdwatch_recompute_maxfd_select(LPFDWATCH fdw)
|
||||||
{
|
{
|
||||||
socket_t max_fd = 0;
|
fdw->state.max_fd_plus_one = 1;
|
||||||
|
|
||||||
for (int i = 0; i < fdw->state.nselect_fds; ++i)
|
for (int i = 0; i < fdw->state.nselect_fds; ++i)
|
||||||
{
|
{
|
||||||
if (fdw->state.select_fds[i] > max_fd)
|
const int candidate = static_cast<int>(fdw->state.select_fds[i]) + 1;
|
||||||
max_fd = fdw->state.select_fds[i];
|
if (candidate > fdw->state.max_fd_plus_one)
|
||||||
|
fdw->state.max_fd_plus_one = candidate;
|
||||||
}
|
}
|
||||||
|
|
||||||
return static_cast<int>(max_fd) + 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int fdwatch_check_fd_select(LPFDWATCH fdw, socket_t fd);
|
static int fdwatch_check_fd_select(LPFDWATCH fdw, socket_t fd);
|
||||||
@@ -407,16 +414,27 @@ static void fdwatch_clear_fd_select(LPFDWATCH fdw, socket_t fd)
|
|||||||
|
|
||||||
static void fdwatch_add_fd_select(LPFDWATCH fdw, socket_t fd, void* client_data, int rw, int oneshot)
|
static void fdwatch_add_fd_select(LPFDWATCH fdw, socket_t fd, void* client_data, int rw, int oneshot)
|
||||||
{
|
{
|
||||||
|
if (fd < 0 || fd >= fdw->descriptor_limit)
|
||||||
|
{
|
||||||
|
sys_err("fdwatch_add_fd_select: descriptor %d exceeds select backend limit %d", fd, fdw->descriptor_limit);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int idx = fdwatch_get_fdidx_select(fdw, fd);
|
int idx = fdwatch_get_fdidx_select(fdw, fd);
|
||||||
|
|
||||||
if (idx < 0)
|
if (idx < 0)
|
||||||
{
|
{
|
||||||
if (fdw->state.nselect_fds >= fdw->descriptor_limit)
|
if (fdw->state.nselect_fds >= fdw->descriptor_limit)
|
||||||
|
{
|
||||||
|
sys_err("fdwatch_add_fd_select: descriptor table full (%d)", fdw->descriptor_limit);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
idx = fdw->state.nselect_fds;
|
idx = fdw->state.nselect_fds;
|
||||||
fdw->state.select_fds[fdw->state.nselect_fds++] = fd;
|
fdw->state.select_fds[fdw->state.nselect_fds++] = fd;
|
||||||
|
fdw->state.fd_slot_by_fd[fd] = idx;
|
||||||
fdw->state.fd_rw[idx] = 0;
|
fdw->state.fd_rw[idx] = 0;
|
||||||
|
fdw->state.select_rfdidx[idx] = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fdw->state.fd_rw[idx] |= rw;
|
fdw->state.fd_rw[idx] |= rw;
|
||||||
@@ -431,6 +449,10 @@ static void fdwatch_add_fd_select(LPFDWATCH fdw, socket_t fd, void* client_data,
|
|||||||
|
|
||||||
if (rw & FDW_WRITE)
|
if (rw & FDW_WRITE)
|
||||||
FD_SET(fd, &fdw->state.wfd_set);
|
FD_SET(fd, &fdw->state.wfd_set);
|
||||||
|
|
||||||
|
const int fd_plus_one = static_cast<int>(fd) + 1;
|
||||||
|
if (fd_plus_one > fdw->state.max_fd_plus_one)
|
||||||
|
fdw->state.max_fd_plus_one = fd_plus_one;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void fdwatch_del_fd_select(LPFDWATCH fdw, socket_t fd)
|
static void fdwatch_del_fd_select(LPFDWATCH fdw, socket_t fd)
|
||||||
@@ -444,22 +466,35 @@ static void fdwatch_del_fd_select(LPFDWATCH fdw, socket_t fd)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
--fdw->state.nselect_fds;
|
--fdw->state.nselect_fds;
|
||||||
|
const int last_idx = fdw->state.nselect_fds;
|
||||||
|
const socket_t moved_fd = fdw->state.select_fds[last_idx];
|
||||||
|
|
||||||
fdw->state.select_fds[idx] = fdw->state.select_fds[fdw->state.nselect_fds];
|
if (idx != last_idx)
|
||||||
fdw->state.fd_data[idx] = fdw->state.fd_data[fdw->state.nselect_fds];
|
{
|
||||||
fdw->state.fd_rw[idx] = fdw->state.fd_rw[fdw->state.nselect_fds];
|
fdw->state.select_fds[idx] = moved_fd;
|
||||||
|
fdw->state.fd_data[idx] = fdw->state.fd_data[last_idx];
|
||||||
|
fdw->state.fd_rw[idx] = fdw->state.fd_rw[last_idx];
|
||||||
|
fdw->state.select_rfdidx[idx] = -1;
|
||||||
|
fdw->state.fd_slot_by_fd[moved_fd] = idx;
|
||||||
|
}
|
||||||
|
|
||||||
|
fdw->state.fd_slot_by_fd[fd] = -1;
|
||||||
|
fdw->state.select_rfdidx[last_idx] = -1;
|
||||||
|
|
||||||
FD_CLR(fd, &fdw->state.rfd_set);
|
FD_CLR(fd, &fdw->state.rfd_set);
|
||||||
FD_CLR(fd, &fdw->state.wfd_set);
|
FD_CLR(fd, &fdw->state.wfd_set);
|
||||||
FD_CLR(fd, &fdw->state.working_rfd_set);
|
FD_CLR(fd, &fdw->state.working_rfd_set);
|
||||||
FD_CLR(fd, &fdw->state.working_wfd_set);
|
FD_CLR(fd, &fdw->state.working_wfd_set);
|
||||||
|
|
||||||
|
if (static_cast<int>(fd) + 1 >= fdw->state.max_fd_plus_one)
|
||||||
|
fdwatch_recompute_maxfd_select(fdw);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int fdwatch_select(LPFDWATCH fdw, struct timeval* timeout)
|
static int fdwatch_select(LPFDWATCH fdw, struct timeval* timeout)
|
||||||
{
|
{
|
||||||
int r;
|
int r;
|
||||||
int event_idx = 0;
|
int event_idx = 0;
|
||||||
int max_fd = fdwatch_get_maxfd_select(fdw);
|
int max_fd = fdw->state.max_fd_plus_one;
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
|
|
||||||
fdw->state.working_rfd_set = fdw->state.rfd_set;
|
fdw->state.working_rfd_set = fdw->state.rfd_set;
|
||||||
|
|||||||
@@ -308,6 +308,46 @@ void TestFdwatchBackendMetadata()
|
|||||||
|
|
||||||
fdwatch_delete(fdw);
|
fdwatch_delete(fdw);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TestFdwatchSlotReuseAfterDelete()
|
||||||
|
{
|
||||||
|
int sockets_a[2] = { -1, -1 };
|
||||||
|
int sockets_b[2] = { -1, -1 };
|
||||||
|
Expect(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_a) == 0, "socketpair A failed");
|
||||||
|
Expect(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets_b) == 0, "socketpair B failed");
|
||||||
|
|
||||||
|
LPFDWATCH fdw = fdwatch_new(64);
|
||||||
|
Expect(fdw != nullptr, "fdwatch_new for slot reuse failed");
|
||||||
|
|
||||||
|
int marker_a = 11;
|
||||||
|
int marker_b = 22;
|
||||||
|
|
||||||
|
fdwatch_add_fd(fdw, sockets_a[1], &marker_a, FDW_READ, false);
|
||||||
|
fdwatch_add_fd(fdw, sockets_b[1], &marker_b, FDW_READ, false);
|
||||||
|
fdwatch_del_fd(fdw, sockets_a[1]);
|
||||||
|
|
||||||
|
const uint8_t byte = 0x51;
|
||||||
|
Expect(write(sockets_b[0], &byte, sizeof(byte)) == sizeof(byte), "socketpair B write failed");
|
||||||
|
|
||||||
|
timeval timeout {};
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 200000;
|
||||||
|
|
||||||
|
const int num_events = fdwatch(fdw, &timeout);
|
||||||
|
Expect(num_events == 1, "Expected one read event after slot reuse");
|
||||||
|
Expect(fdwatch_get_client_data(fdw, 0) == &marker_b, "Unexpected client data after slot reuse");
|
||||||
|
Expect(fdwatch_check_event(fdw, sockets_b[1], 0) == FDW_READ, "Expected FDW_READ after slot reuse");
|
||||||
|
|
||||||
|
uint8_t read_back = 0;
|
||||||
|
Expect(read(sockets_b[1], &read_back, sizeof(read_back)) == sizeof(read_back), "socketpair B read failed");
|
||||||
|
Expect(read_back == byte, "Read payload mismatch after slot reuse");
|
||||||
|
|
||||||
|
fdwatch_delete(fdw);
|
||||||
|
close(sockets_a[0]);
|
||||||
|
close(sockets_a[1]);
|
||||||
|
close(sockets_b[0]);
|
||||||
|
close(sockets_b[1]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int main()
|
int main()
|
||||||
@@ -319,6 +359,7 @@ int main()
|
|||||||
TestSocketAuthWireFlow();
|
TestSocketAuthWireFlow();
|
||||||
TestFdwatchBackendMetadata();
|
TestFdwatchBackendMetadata();
|
||||||
TestFdwatchReadAndOneshotWrite();
|
TestFdwatchReadAndOneshotWrite();
|
||||||
|
TestFdwatchSlotReuseAfterDelete();
|
||||||
std::cout << "metin smoke tests passed\n";
|
std::cout << "metin smoke tests passed\n";
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user