randolf.ca  1.00
Randolf Richardson's C++ classes
Loading...
Searching...
No Matches
rsocket_mux
1#pragma once
2
#include <cstddef>
#include <stdexcept>

//#include <randolf/rex>
//#include <randolf/rsocket>
#include <randolf/rsocket_mux_fds>
6
7namespace randolf {
8
9 /*======================================================================*//**
10 @brief
11 The rsocket multiplexer extends the functionality of the @ref rsocket class
12 primarily for handling multiple sockets that are beyond the scope of the
13 single-socket-focused nature of rsocket.
14
15 Functionality of @c select(), @c poll(), and related socket functions are
16 implemented within this separate @ref rsocket_mux class partly because these
17 specializations are beyond the scope of what the @ref rsocket class is
18 intended for.
19
20 @par Threads
21 This class is threadsafe.
22 @author Randolf Richardson
23 @version 1.00
24 @par History
25 2022-Dec-24 v1.00 Initial version
26 *///=========================================================================
27 class rsocket_mux {
28
29 private:
30 // --------------------------------------------------------------------------
31 // Internal variables.
32 // --------------------------------------------------------------------------
33 std::mutex __mutex; // Used to ensure thread safety
34 std::set<rsocket_mux_fds> __r; // Set of rsocket objects
35
36 public:
37 /*======================================================================*//**
38 @brief
39 Instantiate an rsocket multiplexer with no @ref rsocket objects.
40
41 @see insert()
42 *///=========================================================================
43 rsocket_mux() noexcept {}; // -x- constructor rsocket -x-
44
45 private:
46 /*======================================================================*//**
47 @brief
48 Internal method that is used by the @ref insert(rsocket*) method.
49
50 @warning
51 This method is not threadsafe.
52 *///=========================================================================
53 void __insert(
54 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
55 const int fd_sets,
56 /// Pointer to instantiated rsocket object
57 rsocket* r) noexcept {
58 if (r == nullptr) return;
59 __r.insert(rsocket_mux_fds{r, fd_sets});
60 }; // -x- void __insert -x-
61
62 /*======================================================================*//**
63 @brief
64 Internal method that is used by the @ref insert(rsocket*, ...) method.
65
66 @warning
67 This method is not threadsafe.
68 *///=========================================================================
69 template<class R, class... Rs> void __insert(
70 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
71 const int fd_sets,
72 /// Pointer to instantiated rsocket object
73 R r,
74 /// Variadic arguments (any quantity of instantiated rsocket objects)
75 Rs... rs) noexcept {
76 __insert(fd_sets, r);
77 __insert(fd_sets, rs...);
78 }; // -x- void __insert -x-
79
80 public:
81 /*======================================================================*//**
82 @brief
83 Instantiate an rsocket multiplexer with one @ref rsocket object.
84
85 The following fd_sets correspond with each respective POSIX select() fd_set,
86 which can be combined to include the specified rsocket in multiple fd_sets:
87 - @c POLLIN = readfds
88 - @c POLLOUT = writefds
89 - @c POLLERR = exceptfds
90
91 @see insert()
92 *///=========================================================================
93 rsocket_mux(
94 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
95 const int fd_sets,
96 /// Pointer to instantiated rsocket object
97 rsocket* r) noexcept {
98 if (r == nullptr) return;
99 __insert(fd_sets, r);
100 }; // -x- constructor rsocket -x-
101
102 /*======================================================================*//**
103 @brief
104 Instantiate an rsocket multiplexer with any number of @ref rsocket objects.
105
106 The following fd_sets correspond with each respective POSIX select() fd_set,
107 which can be combined to include the specified rsocket in multiple fd_sets:
108 - @c POLLIN = readfds
109 - @c POLLOUT = writefds
110 - @c POLLERR = exceptfds
111
112 @see insert()
113 *///=========================================================================
114 template<class R, class... Rs> rsocket_mux(
115 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
116 const int fd_sets,
117 /// Pointer to instantiated rsocket object
118 R r,
119 /// Variadic arguments (any quantity of instantiated rsocket objects)
120 Rs... rs) noexcept {
121 __insert(fd_sets, r);
122 __insert(fd_sets, rs...);
123 }; // -x- constructor rsocket -x-
124
125 /*======================================================================*//**
126 @brief
127 Remove an rsocket object from the underlying std::set.
128
129 @par Threads
130 This method is threadsafe.
131 *///=========================================================================
132 void erase(
133 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
134 const int fd_sets,
135 /// Pointer to instantiated rsocket object
136 rsocket* r) noexcept {
137 if (r == nullptr) return;
138 const std::lock_guard<std::mutex> lock(__mutex);
139 __r.erase(rsocket_mux_fds{r, fd_sets});
140 }; // -x- void erase -x-
141
142 /*======================================================================*//**
143 @brief
144 Remove an rsocket object from the underlying std::set.
145
146 @par Threads
147 This method is threadsafe.
148 *///=========================================================================
149 void erase(
150 /// Structure that is comprised of an rsocket object and its fd_set flags
151 rsocket_mux_fds rmf) {
152 const std::lock_guard<std::mutex> lock(__mutex);
153 __r.erase(rmf);
154 }; // -x- void erase -x-
155
156 /*======================================================================*//**
157 @brief
158 Add one rsocket object to the underlying std::set.
159
160 @par Threads
161 This method is threadsafe.
162 *///=========================================================================
163 void insert(
164 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
165 const int fd_sets,
166 /// Pointer to instantiated rsocket object
167 rsocket* r) {
168 const std::lock_guard<std::mutex> lock(__mutex);
169 __insert(fd_sets, r);
170 }; // -x- void insert -x-
171
172 /*======================================================================*//**
173 @brief
174 Add one or more rsocket objects to the underlying std::set.
175
176 @par Threads
177 This method is threadsafe.
178 *///=========================================================================
179 template<class R, class... Rs> void insert(
180 /// Must be at least one of @c POLLIN, @c POLLOUT, and @c POLLERR
181 const int fd_sets,
182 /// Pointer to instantiated rsocket object
183 R r,
184 /// Variadic arguments (any quantity of instantiated rsocket objects)
185 Rs... rs) {
186 const std::lock_guard<std::mutex> lock(__mutex);
187 __insert(fd_sets, r);
188 __insert(fd_sets, rs...);
189 }; // -x- void insert -x-
190
191 /*======================================================================*//**
192 @brief
193 Use the poll() method on the internal set of instantiated rsocket objects.
194
195 @par Threads
196 This method is not threadsafe. Calls to insert() and erase() during this
197 blocking operation will almost certainly yield unpredictable results.
198
199 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
200 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
201 part of the user address space
202 @throws randolf::rex::xEINTR Interrupted by a signal
203 @throws randolf::rex::xENOMEM Insufficient memory
204 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
205 doesn't refer to a socket
206 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
207 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
208 is a highly improbable chance that a timeout could still occur if the
209 data is read from any of the sockets in this rsocket_mux collection
210 by another thread before the `recv(..., MSG_PEEK)` call)
211
212 @returns An std::vector<socket_mux_fds> that holds only those sockets that
213 were selected
214 *///=========================================================================
215 std::vector<rsocket_mux_fds> poll(
216 /// Number of milliseconds to wait
217 const int timeout = 0,
218 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
219 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
220
221 // --------------------------------------------------------------------------
222 // Populate the FDS super-structures while tracking highest socket_fd handle
223 // in nfds (set is ordered, so highest socket_fd handle value will always be
224 // the last one tracked).
225 // --------------------------------------------------------------------------
226 pollfd fds[__r.size()];
227 nfds_t nfds = 0; // After loop, this variable will hold the size of the array
228 for (rsocket_mux_fds rmf : __r) {
229 fds[nfds++] = { rmf.r->socket_fd(), rmf.fd_sets, 0 };
230 } // -x- foreach __r -x-
231
232 // --------------------------------------------------------------------------
233 // Call the POSIX poll() function. Throw exceptions if there are errors or a
234 // timeout occurred (before allocating std::vector<rsocket_mux_fds>).
235 //
236 // Note: We increment nfds because the API calls for it being one more than
237 // the highest handle number.
238 // --------------------------------------------------------------------------
239 switch (::poll(fds,
240 nfds,
241 timeout)) {
242 case 0: if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
243 else return std::vector<rsocket_mux_fds>();
244 case -1: randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
245 } // -x- switch ::poll() -x-
246
247 // --------------------------------------------------------------------------
248 // Build the std::vector of revents. We're only checking for the socket
249 // handles that we've stored, so this will be a shorter loop since we're
250 // effectively only checking socket handles that are known to rsocket_mux
251 // instead of checking everything.
252 //
253 // This is a drastically more optimal approach, particuarly in systems that
254 // are handling [tens of] thousands of sockets, loops can become quite long
255 // as a result of testing every socket handle up to the highest numbered
256 // socket handle. We only test for handles that we want to, thus completely
257 // eliminating the need to loop through an entire range.
258 //
259 // For example: If you're running select() against socket handles that just
260 // so happen to be numbers 3, 228, and 5832, then we will only
261 // be checking for @c revents on these three socket handles
262 // instead of checking for events on the entire range of socket
263 // handles starting from 0 and performing 5833 unnecessary
264 // tests.
265 // --------------------------------------------------------------------------
266 std::vector<rsocket_mux_fds> revents;
267 nfds = 0; // Reset size/counter
268 for (rsocket_mux_fds rmf : __r) {
269 if (fds[nfds].revents != 0) revents.push_back({ rmf.r, fds[nfds].revents });
270 nfds++;
271 } // -x- foreach __r -x-
272
273 return revents;
274 }; // -x- std::vector<rsocket_mux_fds> poll -x-
275
276 /*======================================================================*//**
277 @brief
278 Use the ppoll() method on the internal set of instantiated rsocket objects.
279
280 @par Threads
281 This method is not threadsafe. Calls to insert() and erase() during this
282 blocking operation will almost certainly yield unpredictable results.
283
284 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
285 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
286 part of the user address space
287 @throws randolf::rex::xEINTR Interrupted by a signal
288 @throws randolf::rex::xENOMEM Insufficient memory
289 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
290 doesn't refer to a socket
291 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
292 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
293 is a highly improbable chance that a timeout could still occur if the
294 data is read from any of the sockets in this rsocket_mux collection
295 by another thread before the `recv(..., MSG_PEEK)` call)
296
297 @returns An std::vector<socket_mux_fds> that holds only those sockets that
298 were selected
299 *///=========================================================================
300 std::vector<rsocket_mux_fds> ppoll(
301 /// Pointer to a @c timespec structure (will not be updated)
302 const struct timespec* tmo_p = nullptr,
303 /// Signal mask
304 const sigset_t* sigmask = nullptr,
305 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
306 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
307
308 // --------------------------------------------------------------------------
309 // Populate the FDS super-structures while tracking highest socket_fd handle
310 // in nfds (set is ordered, so highest socket_fd handle value will always be
311 // the last one tracked).
312 // --------------------------------------------------------------------------
313 pollfd fds[__r.size()];
314 nfds_t nfds = 0; // After loop, this variable will hold the size of the array
315 for (rsocket_mux_fds rmf : __r) {
316 fds[nfds++] = { rmf.r->socket_fd(), rmf.fd_sets, 0 };
317 } // -x- foreach __r -x-
318
319 // --------------------------------------------------------------------------
320 // Call the POSIX poll() function. Throw exceptions if there are errors or a
321 // timeout occurred (before allocating std::vector<rsocket_mux_fds>).
322 //
323 // Note: We increment nfds because the API calls for it being one more than
324 // the highest handle number.
325 // --------------------------------------------------------------------------
326 switch (::ppoll(fds,
327 nfds,
328 tmo_p,
329 sigmask)) {
330 case 0: if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
331 else return std::vector<rsocket_mux_fds>();
332 case -1: randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
333 } // -x- switch ::poll() -x-
334
335 // --------------------------------------------------------------------------
336 // Build the std::vector of revents. We're only checking for the socket
337 // handles that we've stored, so this will be a shorter loop since we're
338 // effectively only checking socket handles that are known to rsocket_mux
339 // instead of checking everything.
340 //
341 // This is a drastically more optimal approach, particuarly in systems that
342 // are handling [tens of] thousands of sockets, loops can become quite long
343 // as a result of testing every socket handle up to the highest numbered
344 // socket handle. We only test for handles that we want to, thus completely
345 // eliminating the need to loop through an entire range.
346 //
347 // For example: If you're running select() against socket handles that just
348 // so happen to be numbers 3, 228, and 5832, then we will only
349 // be checking for @c revents on these three socket handles
350 // instead of checking for events on the entire range of socket
351 // handles starting from 0 and performing 5833 unnecessary
352 // tests.
353 // --------------------------------------------------------------------------
354 std::vector<rsocket_mux_fds> revents;
355 nfds = 0; // Reset size/counter
356 for (rsocket_mux_fds rmf : __r) {
357 if (fds[nfds].revents != 0) revents.push_back({ rmf.r, fds[nfds].revents });
358 nfds++;
359 } // -x- foreach __r -x-
360
361 return revents;
362 }; // -x- std::vector<rsocket_mux_fds> ppoll -x-
363
364 /*======================================================================*//**
365 @brief
366 Use the pselect() method on the internal set of instantiated rsocket objects.
367
368 @note
369 Performance optimization was a major consideration in the design of this
370 method, including only testing the readiness of socket handles that were
371 actually added to the underlying fd_set array(s).
372
373 The easier way to use this method is to only add one polling type (such as @c
374 POLLIN) to the encompassing rsocket_mux object, and then all rsocket_mux_fds
375 records returned will only be for that polling type (and then you won't have
376 to write additional code to confirm the polling type). However, @e easier
377 doesn't necessarily equate to @e better, and your needs and use case(s)
378 should normally be considered in determining which approach is the most
379 appropriate to write code for.
380
381 @warning
382 The underlying POSIX pselect() method is not efficient and is known to have a
383 tendency to use a significant amount of stack memory, particularly when
384 monitoring more sockets. The @ref poll() and @ref ppoll() methods serve as
385 improvements to these problems, and are generally known to be better options
386 for monitoring larger numbers of sockets.
387
388 @par Threads
389 This method is not threadsafe. Calls to insert() and erase() during this
390 blocking operation will almost certainly yield unpredictable results.
391
392 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
393 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
394 part of the user address space
395 @throws randolf::rex::xEINTR Interrupted by a signal
396 @throws randolf::rex::xENOMEM Insufficient memory
397 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
398 doesn't refer to a socket
399 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
400 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
401 is a highly improbable chance that a timeout could still occur if the
402 data is read from any of the sockets in this rsocket_mux collection
403 by another thread before the `recv(..., MSG_PEEK)` call)
404
405 @returns An std::vector<socket_mux_fds> that holds only those sockets that
406 were selected
407 @see poll()
408 @see ppoll()
409 @see select()
410 *///=========================================================================
411 std::vector<rsocket_mux_fds> pselect(
412 /// Pointer to a @c timespec structure (will not be updated)
413 const struct timespec* tmo_p = nullptr,
414 /// Signal mask
415 const sigset_t* sigmask = nullptr,
416 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
417 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
418 fd_set readfds, writefds, exceptfds; // Various fd_set arrays
419 int rfds = 0, wfds = 0, efds = 0; // Internal counters
420 int nfds = 0; // Socket handle tracker / Quantity counter
421 FD_ZERO(&readfds); // Initialize readfds fd_set
422 FD_ZERO(&writefds); // Initialize writefds fd_set
423 FD_ZERO(&exceptfds); // Initialize exceptfds fd_set
424
425 // --------------------------------------------------------------------------
426 // Populate the FDS super-structures while tracking highest socket_fd handle
427 // in nfds (set is ordered, so highest socket_fd handle value will always be
428 // the last one tracked).
429 // --------------------------------------------------------------------------
430 for (rsocket_mux_fds rmf : __r) {
431 if (POLLIN & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &readfds); rfds++; }
432 if (POLLOUT & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &writefds); wfds++; }
433 if (POLLERR & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &exceptfds); efds++; }
434 } // -x- foreach __r -x-
435
436 // --------------------------------------------------------------------------
437 // Call the POSIX pselect() function. Throw exceptions if there are errors
438 // or a timeout occurred (before allocating std::vector<rsocket_mux_fds>).
439 //
440 // Note: We increment nfds because the API calls for it being one more than
441 // the highest handle number.
442 // --------------------------------------------------------------------------
443 switch (::pselect(++nfds,
444 rfds == 0 ? nullptr : &readfds,
445 wfds == 0 ? nullptr : &writefds,
446 efds == 0 ? nullptr : &exceptfds,
447 tmo_p,
448 sigmask)) {
449 case 0: if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
450 else return std::vector<rsocket_mux_fds>();
451 case -1: randolf::rex::mk_exception("pselect() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
452 } // -x- switch ::pselect() -x-
453
454 // --------------------------------------------------------------------------
455 // Build the std::vector of revents. We're only checking for the socket
456 // handles that we've stored, so this will be a shorter loop since we're
457 // effectively only checking socket handles that are known to rsocket_mux
458 // instead of checking everything.
459 //
460 // This is a drastically more optimal approach, particuarly in systems that
461 // are handling [tens of] thousands of sockets, loops can become quite long
462 // as a result of testing every socket handle up to the highest numbered
463 // socket handle. We only test for handles that we want to, thus completely
464 // eliminating the need to loop through an entire range.
465 //
466 // For example: If you're running pselect() against socket handles that just
467 // so happen to be numbers 3, 228, and 5832, then we will only
468 // be checking for @c revents on these three socket handles
469 // instead of checking for events on the entire range of socket
470 // handles starting from 0 and performing 5833 unnecessary
471 // tests.
472 // --------------------------------------------------------------------------
473 std::vector<rsocket_mux_fds> revents;
474 static const int revent = POLLIN | POLLOUT | POLLERR;
475 for (rsocket_mux_fds rmf : __r) {
476 if (revent & rmf.fd_sets) {
477 if (FD_ISSET(rmf.r->socket_fd(), &readfds)
478 || FD_ISSET(rmf.r->socket_fd(), &writefds)
479 || FD_ISSET(rmf.r->socket_fd(), &exceptfds)) revents.push_back(rmf);
480 } // -x- if revent -x-
481 } // -x- foreach __r -x-
482
483 return revents;
484 }; // -x- std::vector<rsocket_mux_fds> pselect() -x-
485
486 /*======================================================================*//**
487 @brief
488 Use the select() method on the internal set of instantiated rsocket objects.
489
490 @note
491 Performance optimization was a major consideration in the design of this
492 method, including only testing the readiness of socket handles that were
493 actually added to the underlying fd_set array(s).
494
495 The easier way to use this method is to only add one polling type (such as @c
496 POLLIN) to the encompassing rsocket_mux object, and then all rsocket_mux_fds
497 records returned will only be for that polling type (and then you won't have
498 to write additional code to confirm the polling type). However, @e easier
499 doesn't necessarily equate to @e better, and your needs and use case(s)
500 should normally be considered in determining which approach is the most
501 appropriate to write code for.
502
503 @warning
504 The underlying POSIX select() method is not efficient and is known to have a
505 tendency to use a significant amount of stack memory, particularly when
506 monitoring more sockets. The @ref poll() and @ref ppoll() methods serve as
507 improvements to these problems, and are generally known to be better options
508 for monitoring larger numbers of sockets.
509
510 @par Threads
511 This method is not threadsafe. Calls to insert() and erase() during this
512 blocking operation will almost certainly yield unpredictable results.
513
514 @throws randolf::rex::xEAGAIN Failure to allocate internal system resources
515 @throws randolf::rex::xEFAULT Address structure/memory is not in a writable
516 part of the user address space
517 @throws randolf::rex::xEINTR Interrupted by a signal
518 @throws randolf::rex::xENOMEM Insufficient memory
519 @throws randolf::rex::xENOTSOCK Underlying socket file descriptor (handle)
520 doesn't refer to a socket
521 @throws randolf::rex::xETIMEDOUT Timeout period elapsed (even if the @c
522 TIMEOUT_BEHAVIOUR flag is @em not set to @c TIMEOUT_EXCEPTION, there
523 is a highly improbable chance that a timeout could still occur if the
524 data is read from any of the sockets in this rsocket_mux collection
525 by another thread before the `recv(..., MSG_PEEK)` call)
526
527 @returns An std::vector<socket_mux_fds> that holds only those sockets that
528 were selected
529 @see poll()
530 @see ppoll()
531 @see pselect()
532 *///=========================================================================
533 std::vector<rsocket_mux_fds> select(
534 /// Pointer to a @c timeval structure (may also be updated with duration remaining if defined)
535 struct timeval* tv = nullptr,
536 /// Timeout behaviour (see @ref rsocket::TIMEOUT_BEHAVIOUR for details)
537 const bool timeout_behaviour = rsocket::TIMEOUT_EXCEPTION) {
538 fd_set readfds, writefds, exceptfds; // Various fd_set arrays
539 int rfds = 0, wfds = 0, efds = 0; // Internal counters
540 int nfds = 0; // Socket handle tracker / Quantity counter
541 FD_ZERO(&readfds); // Initialize readfds fd_set
542 FD_ZERO(&writefds); // Initialize writefds fd_set
543 FD_ZERO(&exceptfds); // Initialize exceptfds fd_set
544
545 // --------------------------------------------------------------------------
546 // Populate the FDS super-structures while tracking highest socket_fd handle
547 // in nfds (set is ordered, so highest socket_fd handle value will always be
548 // the last one tracked).
549 // --------------------------------------------------------------------------
550 for (rsocket_mux_fds rmf : __r) {
551 if (POLLIN & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &readfds); rfds++; }
552 if (POLLOUT & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &writefds); wfds++; }
553 if (POLLERR & rmf.fd_sets) { FD_SET(nfds = rmf.r->socket_fd(), &exceptfds); efds++; }
554 } // -x- foreach __r -x-
555
556 // --------------------------------------------------------------------------
557 // Call the POSIX select() function. Throw exceptions if there are errors or
558 // a timeout occurred (before allocating std::vector<rsocket_mux_fds>).
559 //
560 // Note: We increment nfds because the API calls for it being one more than
561 // the highest handle number.
562 // --------------------------------------------------------------------------
563 switch (::select(++nfds,
564 rfds == 0 ? nullptr : &readfds,
565 wfds == 0 ? nullptr : &writefds,
566 efds == 0 ? nullptr : &exceptfds,
567 tv)) {
568 case 0: if (timeout_behaviour) throw randolf::rex::xETIMEDOUT("ETIMEDOUT");
569 else return std::vector<rsocket_mux_fds>();
570 case -1: randolf::rex::mk_exception("select() failed", 0, rex::rex::REX_FLAGS::REX_FIND_ERRNO | rex::rex::REX_FLAGS::REX_ALT_EAGAIN);
571 } // -x- switch ::select() -x-
572
573 // --------------------------------------------------------------------------
574 // Build the std::vector of revents. We're only checking for the socket
575 // handles that we've stored, so this will be a shorter loop since we're
576 // effectively only checking socket handles that are known to rsocket_mux
577 // instead of checking everything.
578 //
579 // This is a drastically more optimal approach, particuarly in systems that
580 // are handling [tens of] thousands of sockets, loops can become quite long
581 // as a result of testing every socket handle up to the highest numbered
582 // socket handle. We only test for handles that we want to, thus completely
583 // eliminating the need to loop through an entire range.
584 //
585 // For example: If you're running select() against socket handles that just
586 // so happen to be numbers 3, 228, and 5832, then we will only
587 // be checking for @c revents on these three socket handles
588 // instead of checking for events on the entire range of socket
589 // handles starting from 0 and performing 5833 unnecessary
590 // tests.
591 // --------------------------------------------------------------------------
592 std::vector<rsocket_mux_fds> revents;
593 static const int revent = POLLIN | POLLOUT | POLLERR;
594 for (rsocket_mux_fds rmf : __r) {
595 if (revent & rmf.fd_sets) {
596 if (FD_ISSET(rmf.r->socket_fd(), &readfds)
597 || FD_ISSET(rmf.r->socket_fd(), &writefds)
598 || FD_ISSET(rmf.r->socket_fd(), &exceptfds)) revents.push_back(rmf);
599 } // -x- if revent -x-
600 } // -x- foreach __r -x-
601
602 return revents;
603 }; // -x- std::vector<rsocket_mux_fds> select() -x-
604
605 /*======================================================================*//**
606 @brief
607 Find out how many rsocket entries are in this mux.
608
609 @par Threads
610 This method is threadsafe.
611 @returns Number of rsocket objects in the underlying std::set
612 *///=========================================================================
613 size_t size() noexcept {
614 const std::lock_guard<std::mutex> lock(__mutex);
615 return __r.size();
616 }; // -x- size_t size -x-
617
618 }; // -x- class rsocket_mux -x-
619
620}; // -x- namespace randolf -x-