randolf.ca  1.00
Randolf Richardson's C++ classes
Loading...
Searching...
No Matches
rsocket_mux
1#pragma once
2
3//#include <randolf/rex>
4//#include <randolf/rsocket>
5#include <randolf/rsocket_mux_fds>
6
7namespace randolf {
8
9 /*======================================================================*//**
10 @brief
11 The rsocket multiplexer extends the functionality of the @ref rsocket class
12 primarily for handling multiple sockets that are beyond the scope of the
13 single-socket-focused nature of rsocket.
14
15 Functionality of @c select(), @c poll(), and related socket functions are
16 implemented within this separate @ref rsocket_mux class partly because these
17 specializations are beyond the scope of what the @ref rsocket class is
18 intended for.
19
20 @par Threads
21 Only insert(), erase(), and size() are threadsafe; the polling methods are not.
22 @author Randolf Richardson
23 @version 1.00
24 @par History
25 - 2022-Dec-24 v1.00 Initial version
26 - 2025-Feb-03 v1.00 Increased use of references and pointers
27 *///=========================================================================
28 class rsocket_mux {
29
30 private:
31 // --------------------------------------------------------------------------
32 // Internal variables.
33 // --------------------------------------------------------------------------
34 std::mutex __mutex; // Used to implement thread safety
35 std::set<rsocket_mux_fds> __r; // Set of rsocket objects
36
37 public:
38 /*======================================================================*//**
39 @brief
40 Instantiate an rsocket multiplexer with no @ref rsocket objects.
41
42 @see insert()
43 *///=========================================================================
44 rsocket_mux() noexcept {} // -x- constructor rsocket -x-
45
46 private:
47 /*======================================================================*//**
48 @brief
49 Internal method that is used by the @ref insert(rsocket*) method.
50
51 @warning
52 This method is not threadsafe.
53 *///=========================================================================
54 void __insert(
55 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
56 const int fd_sets,
57 /// Pointer to instantiated rsocket object
58 rsocket* r) noexcept {
59 if (r == nullptr) return;
60 __r.insert(rsocket_mux_fds{r, fd_sets});
61 } // -x- void __insert -x-
62
63 /*======================================================================*//**
64 @brief
65 Internal method that is used by the @ref insert(rsocket*, ...) method.
66
67 @warning
68 This method is not threadsafe.
69 *///=========================================================================
70 template<class R, class... Rs> void __insert(
71 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
72 const int fd_sets,
73 /// Pointer to instantiated rsocket object
74 R r,
75 /// Variadic arguments (any quantity of instantiated rsocket objects)
76 Rs... rs) noexcept {
77 __insert(fd_sets, r);
78 __insert(fd_sets, rs...);
79 } // -x- void __insert -x-
80
81 public:
82 /*======================================================================*//**
83 @brief
84 Instantiate an rsocket multiplexer with one @ref rsocket object.
85
86 The following fd_sets correspond with each respective POSIX select() fd_set,
87 which can be combined to include the specified rsocket in multiple fd_sets:
88 - @c POLLIN = readfds
89 - @c POLLOUT = writefds
90 - @c POLLERR = exceptfds
91
92 @see insert()
93 *///=========================================================================
94 rsocket_mux(
95 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
96 const int fd_sets,
97 /// Pointer to instantiated rsocket object
98 rsocket* r) noexcept {
99 if (r == nullptr) return;
100 __insert(fd_sets, r);
101 } // -x- constructor rsocket -x-
102
103 /*======================================================================*//**
104 @brief
105 Instantiate an rsocket multiplexer with any number of @ref rsocket objects.
106
107 The following fd_sets correspond with each respective POSIX select() fd_set,
108 which can be combined to include the specified rsocket in multiple fd_sets:
109 - @c POLLIN = readfds
110 - @c POLLOUT = writefds
111 - @c POLLERR = exceptfds
112
113 @see insert()
114 *///=========================================================================
115 template<class R, class... Rs> rsocket_mux(
116 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
117 const int fd_sets,
118 /// Pointer to instantiated rsocket object
119 R r,
120 /// Variadic arguments (any quantity of instantiated rsocket objects)
121 Rs... rs) noexcept {
122 __insert(fd_sets, r);
123 __insert(fd_sets, rs...);
124 } // -x- constructor rsocket -x-
125
126 /*======================================================================*//**
127 @brief
128 Remove an rsocket object from the underlying std::set.
129
130 @par Threads
131 This method is threadsafe.
132 *///=========================================================================
133 rsocket_mux& erase(
134 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
135 const int fd_sets,
136 /// Pointer to instantiated rsocket object
137 rsocket* r) noexcept {
138 if (r == nullptr) return;
139 const std::lock_guard<std::mutex> lock(__mutex);
140 __r.erase(rsocket_mux_fds{r, fd_sets});
141 return *this;
142 } // -x- rsocket_mux& erase -x-
143
144 /*======================================================================*//**
145 @brief
146 Remove an rsocket object from the underlying std::set.
147
148 @par Threads
149 This method is threadsafe.
150 *///=========================================================================
151 rsocket_mux& erase(
152 /// Structure that is comprised of an rsocket object and its fd_set flags
153 rsocket_mux_fds rmf) {
154 const std::lock_guard<std::mutex> lock(__mutex);
155 __r.erase(rmf);
156 return *this;
157 } // -x- rsocket_mux& erase -x-
158
159 /*======================================================================*//**
160 @brief
161 Add one rsocket object to the underlying std::set.
162
163 @par Threads
164 This method is threadsafe.
165 *///=========================================================================
166 rsocket_mux& insert(
167 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
168 const int fd_sets,
169 /// Pointer to instantiated rsocket object
170 rsocket* r) {
171 const std::lock_guard<std::mutex> lock(__mutex);
172 __insert(fd_sets, r);
173 return *this;
174 } // -x- rsocket_mux& insert -x-
175
176 /*======================================================================*//**
177 @brief
178 Add one or more rsocket objects to the underlying std::set.
179
180 @par Threads
181 This method is threadsafe.
182 *///=========================================================================
183 template<class R, class... Rs> rsocket_mux& insert(
184 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
185 const int fd_sets,
186 /// Pointer to instantiated rsocket object
187 R r,
188 /// Variadic arguments (any quantity of instantiated rsocket objects)
189 Rs... rs) {
190 const std::lock_guard<std::mutex> lock(__mutex);
191 __insert(fd_sets, r);
192 __insert(fd_sets, rs...);
193 return *this;
194 } // -x- rsocket_mux& insert -x-
195
196 /*======================================================================*//**
197 @brief
198 Use the poll() method on the internal set of instantiated rsocket objects.
199
200 @par Threads
201 This method is not threadsafe. Calls to insert() and erase() during this
202 blocking operation will almost certainly yield unpredictable results.
203
204 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
205 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
206 part of the user address space
207 @throws randolf::rex::xEINTR Interrupted by a signal
208 @throws randolf::rex::xENOMEM Insufficient memory
209 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
210 doesn't refer to a socket
211 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
212 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
213 is a highly improbable chance that a timeout could still occur if the
214 data is read from any of the sockets in this rsocket_mux collection
215 by another thread before the `recv(..., MSG_PEEK)` call)
216
217 @returns A std::vector<rsocket_mux_fds> that holds only those sockets that
218 were selected
219 *///=========================================================================
220 std::vector<rsocket_mux_fds> poll(
221 /// Number of milliseconds to wait
222 const int timeout = 0,
223 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
224 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
225
226 // --------------------------------------------------------------------------
227 // Populate the FDS super-structures while tracking highest socket_fd handle
228 // in nfds (set is ordered, so highest socket_fd handle value will always be
229 // the last one tracked).
230 // --------------------------------------------------------------------------
231 pollfd fds[__r.size()];
232 nfds_t nfds = 0; // After loop, this variable will hold the size of the array
233 for (rsocket_mux_fds rmf : __r) {
234 fds[nfds++] = { rmf.r->get_socket_fd(), rmf.fd_sets, 0 };
235 } // -x- foreach __r -x-
236
237 // --------------------------------------------------------------------------
238 // Call the POSIX poll() function. Throw exceptions if there are errors or a
239 // timeout occurred (before allocating std::vector<rsocket_mux_fds>).
240 //
241 // Note: We increment nfds because the API calls for it being one more than
242 // the highest handle number.
243 // --------------------------------------------------------------------------
244 switch (::poll(fds,
245 nfds,
246 timeout)) {
247 case 0:
248 if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
249 return std::vector<rsocket_mux_fds>();
250 case -1:
251 randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
252 } // -x- switch ::poll() -x-
253
254 // --------------------------------------------------------------------------
255 // Build the std::vector of revents. We're only checking for the socket
256 // handles that we've stored, so this will be a shorter loop since we're
257 // effectively only checking socket handles that are known to rsocket_mux
258 // instead of checking everything.
259 //
260 // This is a drastically more optimal approach, particuarly in systems that
261 // are handling [tens of] thousands of sockets, loops can become quite long
262 // as a result of testing every socket handle up to the highest numbered
263 // socket handle. We only test for handles that we want to, thus completely
264 // eliminating the need to loop through an entire range.
265 //
266 // For example: If you're running select() against socket handles that just
267 // so happen to be numbers 3, 228, and 5832, then we will only
268 // be checking for @c revents on these three socket handles
269 // instead of checking for events on the entire range of socket
270 // handles starting from 0 and performing 5833 unnecessary
271 // tests.
272 // --------------------------------------------------------------------------
273 std::vector<rsocket_mux_fds> revents;
274 nfds = 0; // Reset size/counter
275 for (rsocket_mux_fds rmf : __r) {
276 if (fds[nfds].revents != 0) revents.push_back({ rmf.r, fds[nfds].revents });
277 nfds++;
278 } // -x- foreach __r -x-
279
280 return revents;
281 } // -x- std::vector<rsocket_mux_fds> poll -x-
282
283 /*======================================================================*//**
284 @brief
285 Use the ppoll() method on the internal set of instantiated rsocket objects.
286
287 @par Threads
288 This method is not threadsafe. Calls to insert() and erase() during this
289 blocking operation will almost certainly yield unpredictable results.
290
291 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
292 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
293 part of the user address space
294 @throws randolf::rex::xEINTR Interrupted by a signal
295 @throws randolf::rex::xENOMEM Insufficient memory
296 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
297 doesn't refer to a socket
298 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
299 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
300 is a highly improbable chance that a timeout could still occur if the
301 data is read from any of the sockets in this rsocket_mux collection
302 by another thread before the `recv(..., MSG_PEEK)` call)
303
304 @returns A std::vector<rsocket_mux_fds> that holds only those sockets that
305 were selected
306 *///=========================================================================
307 std::vector<rsocket_mux_fds> ppoll(
308 /// Pointer to a @c timespec structure (will not be updated)
309 const struct timespec* tmo_p = nullptr,
310 /// Signal mask
311 const sigset_t* sigmask = nullptr,
312 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
313 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
314
315 // --------------------------------------------------------------------------
316 // Populate the FDS super-structures while tracking highest socket_fd handle
317 // in nfds (set is ordered, so highest socket_fd handle value will always be
318 // the last one tracked).
319 // --------------------------------------------------------------------------
320 pollfd fds[__r.size()];
321 nfds_t nfds = 0; // After loop, this variable will hold the size of the array
322 for (rsocket_mux_fds rmf : __r) {
323 fds[nfds++] = { rmf.r->get_socket_fd(), rmf.fd_sets, 0 };
324 } // -x- foreach __r -x-
325
326 // --------------------------------------------------------------------------
327 // Call the POSIX poll() function. Throw exceptions if there are errors or a
328 // timeout occurred (before allocating std::vector<rsocket_mux_fds>).
329 //
330 // Note: We increment nfds because the API calls for it being one more than
331 // the highest handle number.
332 // --------------------------------------------------------------------------
333 switch (::ppoll(fds,
334 nfds,
335 tmo_p,
336 sigmask)) {
337 case 0:
338 if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
339 return std::vector<rsocket_mux_fds>();
340 case -1:
341 randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
342 } // -x- switch ::poll() -x-
343
344 // --------------------------------------------------------------------------
345 // Build the std::vector of revents. We're only checking for the socket
346 // handles that we've stored, so this will be a shorter loop since we're
347 // effectively only checking socket handles that are known to rsocket_mux
348 // instead of checking everything.
349 //
350 // This is a drastically more optimal approach, particuarly in systems that
351 // are handling [tens of] thousands of sockets, loops can become quite long
352 // as a result of testing every socket handle up to the highest numbered
353 // socket handle. We only test for handles that we want to, thus completely
354 // eliminating the need to loop through an entire range.
355 //
356 // For example: If you're running select() against socket handles that just
357 // so happen to be numbers 3, 228, and 5832, then we will only
358 // be checking for @c revents on these three socket handles
359 // instead of checking for events on the entire range of socket
360 // handles starting from 0 and performing 5833 unnecessary
361 // tests.
362 // --------------------------------------------------------------------------
363 std::vector<rsocket_mux_fds> revents;
364 nfds = 0; // Reset size/counter
365 for (rsocket_mux_fds rmf : __r) {
366 if (fds[nfds].revents != 0) revents.push_back({ rmf.r, fds[nfds].revents });
367 nfds++;
368 } // -x- foreach __r -x-
369
370 return revents;
371 } // -x- std::vector<rsocket_mux_fds> ppoll -x-
372
373 /*======================================================================*//**
374 @brief
375 Use the pselect() method on the internal set of instantiated rsocket objects.
376
377 @note
378 Performance optimization was a major consideration in the design of this
379 method, including only testing the readiness of socket handles that were
380 actually added to the underlying fd_set array(s).
381
382 The easier way to use this method is to only add one polling type (such as @c
383 POLLIN) to the encompassing rsocket_mux object, and then all rsocket_mux_fds
384 records returned will only be for that polling type (and then you won't have
385 to write additional code to confirm the polling type). However, @e easier
386 doesn't necessarily equate to @e better, and your needs and use case(s)
387 should normally be considered in determining which approach is the most
388 appropriate to write code for.
389
390 @warning
391 The underlying POSIX pselect() method is not efficient and is known to have a
392 tendency to use a significant amount of stack memory, particularly when
393 monitoring more sockets. The @ref poll() and @ref ppoll() methods serve as
394 improvements to these problems, and are generally known to be better options
395 for monitoring larger numbers of sockets.
396
397 @par Threads
398 This method is not threadsafe. Calls to insert() and erase() during this
399 blocking operation will almost certainly yield unpredictable results.
400
401 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
402 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
403 part of the user address space
404 @throws randolf::rex::xEINTR Interrupted by a signal
405 @throws randolf::rex::xENOMEM Insufficient memory
406 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
407 doesn't refer to a socket
408 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
409 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
410 is a highly improbable chance that a timeout could still occur if the
411 data is read from any of the sockets in this rsocket_mux collection
412 by another thread before the `recv(..., MSG_PEEK)` call)
413
414 @returns A std::vector<rsocket_mux_fds> that holds only those sockets that
415 were selected
416 @see poll()
417 @see ppoll()
418 @see select()
419 *///=========================================================================
420 std::vector<rsocket_mux_fds> pselect(
421 /// Pointer to a @c timespec structure (will not be updated)
422 const struct timespec* tmo_p = nullptr,
423 /// Signal mask
424 const sigset_t* sigmask = nullptr,
425 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
426 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
427 fd_set readfds, writefds, exceptfds; // Various fd_set arrays
428 int rfds = 0, wfds = 0, efds = 0; // Internal counters
429 int nfds = 0; // Socket handle tracker / Quantity counter
430 FD_ZERO(&readfds); // Initialize readfds fd_set
431 FD_ZERO(&writefds); // Initialize writefds fd_set
432 FD_ZERO(&exceptfds); // Initialize exceptfds fd_set
433
434 // --------------------------------------------------------------------------
435 // Populate the FDS super-structures while tracking highest socket_fd handle
436 // in nfds (set is ordered, so highest socket_fd handle value will always be
437 // the last one tracked).
438 // --------------------------------------------------------------------------
439 for (rsocket_mux_fds rmf : __r) {
440 if (POLLIN & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &readfds); rfds++; }
441 if (POLLOUT & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &writefds); wfds++; }
442 if (POLLERR & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &exceptfds); efds++; }
443 } // -x- foreach __r -x-
444
445 // --------------------------------------------------------------------------
446 // Call the POSIX pselect() function. Throw exceptions if there are errors
447 // or a timeout occurred (before allocating std::vector<rsocket_mux_fds>).
448 //
449 // Note: We increment nfds because the API calls for it being one more than
450 // the highest handle number.
451 // --------------------------------------------------------------------------
452 switch (::pselect(++nfds,
453 rfds == 0 ? nullptr : &readfds,
454 wfds == 0 ? nullptr : &writefds,
455 efds == 0 ? nullptr : &exceptfds,
456 tmo_p,
457 sigmask)) {
458 case 0:
459 if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
460 return std::vector<rsocket_mux_fds>();
461 case -1:
462 randolf::rex::mk_exception("pselect() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
463 } // -x- switch ::pselect() -x-
464
465 // --------------------------------------------------------------------------
466 // Build the std::vector of revents. We're only checking for the socket
467 // handles that we've stored, so this will be a shorter loop since we're
468 // effectively only checking socket handles that are known to rsocket_mux
469 // instead of checking everything.
470 //
471 // This is a drastically more optimal approach, particuarly in systems that
472 // are handling [tens of] thousands of sockets, loops can become quite long
473 // as a result of testing every socket handle up to the highest numbered
474 // socket handle. We only test for handles that we want to, thus completely
475 // eliminating the need to loop through an entire range.
476 //
477 // For example: If you're running pselect() against socket handles that just
478 // so happen to be numbers 3, 228, and 5832, then we will only
479 // be checking for @c revents on these three socket handles
480 // instead of checking for events on the entire range of socket
481 // handles starting from 0 and performing 5833 unnecessary
482 // tests.
483 // --------------------------------------------------------------------------
484 std::vector<rsocket_mux_fds> revents;
485 static const int revent = POLLIN | POLLOUT | POLLERR;
486 for (rsocket_mux_fds rmf : __r) {
487 if (revent & rmf.fd_sets) {
488 if (FD_ISSET(rmf.r->get_socket_fd(), &readfds)
489 || FD_ISSET(rmf.r->get_socket_fd(), &writefds)
490 || FD_ISSET(rmf.r->get_socket_fd(), &exceptfds)) revents.push_back(rmf);
491 } // -x- if revent -x-
492 } // -x- foreach __r -x-
493
494 return revents;
495 } // -x- std::vector<rsocket_mux_fds> pselect() -x-
496
497 /*======================================================================*//**
498 @brief
499 Use the select() method on the internal set of instantiated rsocket objects.
500
501 @note
502 Performance optimization was a major consideration in the design of this
503 method, including only testing the readiness of socket handles that were
504 actually added to the underlying fd_set array(s).
505
506 The easier way to use this method is to only add one polling type (such as @c
507 POLLIN) to the encompassing rsocket_mux object, and then all rsocket_mux_fds
508 records returned will only be for that polling type (and then you won't have
509 to write additional code to confirm the polling type). However, @e easier
510 doesn't necessarily equate to @e better, and your needs and use case(s)
511 should normally be considered in determining which approach is the most
512 appropriate to write code for.
513
514 @warning
515 The underlying POSIX select() method is not efficient and is known to have a
516 tendency to use a significant amount of stack memory, particularly when
517 monitoring more sockets. The @ref poll() and @ref ppoll() methods serve as
518 improvements to these problems, and are generally known to be better options
519 for monitoring larger numbers of sockets.
520
521 @par Threads
522 This method is not threadsafe. Calls to insert() and erase() during this
523 blocking operation will almost certainly yield unpredictable results.
524
525 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
526 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
527 part of the user address space
528 @throws randolf::rex::xEINTR Interrupted by a signal
529 @throws randolf::rex::xENOMEM Insufficient memory
530 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
531 doesn't refer to a socket
532 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
533 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
534 is a highly improbable chance that a timeout could still occur if the
535 data is read from any of the sockets in this rsocket_mux collection
536 by another thread before the `recv(..., MSG_PEEK)` call)
537
538 @returns A std::vector<rsocket_mux_fds> that holds only those sockets that
539 were selected
540 @see poll()
541 @see ppoll()
542 @see pselect()
543 *///=========================================================================
544 std::vector<rsocket_mux_fds> select(
545 /// Pointer to a @c timeval structure (may also be updated with duration remaining if defined)
546 struct timeval* tv = nullptr,
547 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
548 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
549 fd_set readfds, writefds, exceptfds; // Various fd_set arrays
550 int rfds = 0, wfds = 0, efds = 0; // Internal counters
551 int nfds = 0; // Socket handle tracker / Quantity counter
552 FD_ZERO(&readfds); // Initialize readfds fd_set
553 FD_ZERO(&writefds); // Initialize writefds fd_set
554 FD_ZERO(&exceptfds); // Initialize exceptfds fd_set
555
556 // --------------------------------------------------------------------------
557 // Populate the FDS super-structures while tracking highest socket_fd handle
558 // in nfds (set is ordered, so highest socket_fd handle value will always be
559 // the last one tracked).
560 // --------------------------------------------------------------------------
561 for (rsocket_mux_fds rmf : __r) {
562 if (POLLIN & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &readfds); rfds++; }
563 if (POLLOUT & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &writefds); wfds++; }
564 if (POLLERR & rmf.fd_sets) { FD_SET(nfds = rmf.r->get_socket_fd(), &exceptfds); efds++; }
565 } // -x- foreach __r -x-
566
567 // --------------------------------------------------------------------------
568 // Call the POSIX select() function. Throw exceptions if there are errors or
569 // a timeout occurred (before allocating std::vector<rsocket_mux_fds>).
570 //
571 // Note: We increment nfds because the API calls for it being one more than
572 // the highest handle number.
573 // --------------------------------------------------------------------------
574 switch (::select(++nfds,
575 rfds == 0 ? nullptr : &readfds,
576 wfds == 0 ? nullptr : &writefds,
577 efds == 0 ? nullptr : &exceptfds,
578 tv)) {
579 case 0:
580 if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
581 return std::vector<rsocket_mux_fds>();
582 case -1:
583 randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
584 } // -x- switch ::select() -x-
585
586 // --------------------------------------------------------------------------
587 // Build the std::vector of revents. We're only checking for the socket
588 // handles that we've stored, so this will be a shorter loop since we're
589 // effectively only checking socket handles that are known to rsocket_mux
590 // instead of checking everything.
591 //
592 // This is a drastically more optimal approach, particuarly in systems that
593 // are handling [tens of] thousands of sockets, loops can become quite long
594 // as a result of testing every socket handle up to the highest numbered
595 // socket handle. We only test for handles that we want to, thus completely
596 // eliminating the need to loop through an entire range.
597 //
598 // For example: If you're running select() against socket handles that just
599 // so happen to be numbers 3, 228, and 5832, then we will only
600 // be checking for @c revents on these three socket handles
601 // instead of checking for events on the entire range of socket
602 // handles starting from 0 and performing 5833 unnecessary
603 // tests.
604 // --------------------------------------------------------------------------
605 std::vector<rsocket_mux_fds> revents;
606 static const int revent = POLLIN | POLLOUT | POLLERR;
607 for (rsocket_mux_fds rmf : __r) {
608 if (revent & rmf.fd_sets) {
609 if (FD_ISSET(rmf.r->get_socket_fd(), &readfds)
610 || FD_ISSET(rmf.r->get_socket_fd(), &writefds)
611 || FD_ISSET(rmf.r->get_socket_fd(), &exceptfds)) revents.push_back(rmf);
612 } // -x- if revent -x-
613 } // -x- foreach __r -x-
614
615 return revents;
616 } // -x- std::vector<rsocket_mux_fds> select() -x-
617
618 /*======================================================================*//**
619 @brief
620 Find out how many rsocket entries are in this mux.
621
622 @par Threads
623 This method is threadsafe.
624 @returns Number of rsocket objects in the underlying std::set
625 *///=========================================================================
626 size_t size() noexcept {
627 const std::lock_guard<std::mutex> lock(__mutex);
628 return __r.size();
629 } // -x- size_t size -x-
630
631 }; // -x- class rsocket_mux -x-
632
633}; // -x- namespace randolf -x-