Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
_flow_graph_node_impl.h
Go to the documentation of this file.
1/*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB__flow_graph_node_impl_H
18#define __TBB__flow_graph_node_impl_H
19
20#ifndef __TBB_flow_graph_H
21#error Do not #include this internal file directly; use public TBB headers instead.
22#endif
23
25
27namespace internal {
28
32
33 template< typename T, typename A >
34 class function_input_queue : public item_buffer<T,A> {
35 public:
36 bool empty() const {
37 return this->buffer_empty();
38 }
39
40 const T& front() const {
41 return this->item_buffer<T, A>::front();
42 }
43
44 bool pop( T& t ) {
45 return this->pop_front( t );
46 }
47
48 void pop() {
49 this->destroy_front();
50 }
51
52 bool push( T& t ) {
53 return this->push_back( t );
54 }
55 };
56
58 // The only up-ref is apply_body_impl, which should implement the function
59 // call and any handling of the result.
60 template< typename Input, typename Policy, typename A, typename ImplType >
61 class function_input_base : public receiver<Input>, tbb::internal::no_assign {
63#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
64 , add_blt_pred, del_blt_pred,
65 blt_pred_cnt, blt_pred_cpy // create vector copies of preds and succs
66#endif
67 };
69
70 public:
71
73 typedef Input input_type;
79 "queueing and rejecting policies can't be specified simultaneously");
80
81#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
82 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
83 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
84#endif
85
90 , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(priority))
92 , forwarder_busy(false)
93 {
95 my_aggregator.initialize_handler(handler_type(this));
96 }
97
100 : receiver<Input>(), tbb::internal::no_assign()
102 , __TBB_FLOW_GRAPH_PRIORITY_ARG1(my_concurrency(0), my_priority(src.my_priority))
103 , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
104 {
106 my_aggregator.initialize_handler(handler_type(this));
107 }
108
110 // The queue is allocated by the constructor for {multi}function_node.
111 // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
112 // This would be an interface-breaking change.
114 if ( my_queue ) delete my_queue;
115 }
116
119 }
120
123 operation_type op_data(reg_pred);
124 op_data.r = &src;
125 my_aggregator.execute(&op_data);
126 return true;
127 }
128
131 operation_type op_data(rem_pred);
132 op_data.r = &src;
133 my_aggregator.execute(&op_data);
134 return true;
135 }
136
137#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
139 void internal_add_built_predecessor( predecessor_type &src) __TBB_override {
140 operation_type op_data(add_blt_pred);
141 op_data.r = &src;
142 my_aggregator.execute(&op_data);
143 }
144
146 void internal_delete_built_predecessor( predecessor_type &src) __TBB_override {
147 operation_type op_data(del_blt_pred);
148 op_data.r = &src;
149 my_aggregator.execute(&op_data);
150 }
151
152 size_t predecessor_count() __TBB_override {
153 operation_type op_data(blt_pred_cnt);
154 my_aggregator.execute(&op_data);
155 return op_data.cnt_val;
156 }
157
158 void copy_predecessors(predecessor_list_type &v) __TBB_override {
159 operation_type op_data(blt_pred_cpy);
160 op_data.predv = &v;
161 my_aggregator.execute(&op_data);
162 }
163
164 built_predecessors_type &built_predecessors() __TBB_override {
165 return my_predecessors.built_predecessors();
166 }
167#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
168
169 protected:
170
172 my_concurrency = 0;
173 if(my_queue) {
174 my_queue->reset();
175 }
177 forwarder_busy = false;
178 }
179
181 const size_t my_max_concurrency;
186
189 else
191 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
192 }
193
195 return my_graph_ref;
196 }
197
199 operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item
200 my_aggregator.execute(&op_data);
201 return op_data.bypass_t;
202 }
203
204 private:
205
207 friend class forward_task_bypass< class_type >;
208
209 class operation_type : public aggregated_operation< operation_type > {
210 public:
211 char type;
212 union {
215#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
216 size_t cnt_val;
217 predecessor_list_type *predv;
218#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
219 };
222 type(char(t)), elem(const_cast<input_type*>(&e)) {}
223 operation_type(op_type t) : type(char(t)), r(NULL) {}
224 };
225
230
232 task* new_task = NULL;
233 if(my_queue) {
234 if(!my_queue->empty()) {
236 new_task = create_body_task(my_queue->front());
237
238 my_queue->pop();
239 }
240 }
241 else {
242 input_type i;
245 new_task = create_body_task(i);
246 }
247 }
248 return new_task;
249 }
251 operation_type *tmp;
252 while (op_list) {
253 tmp = op_list;
254 op_list = op_list->next;
255 switch (tmp->type) {
256 case reg_pred:
257 my_predecessors.add(*(tmp->r));
259 if (!forwarder_busy) {
260 forwarder_busy = true;
262 }
263 break;
264 case rem_pred:
265 my_predecessors.remove(*(tmp->r));
267 break;
268 case app_body_bypass: {
269 tmp->bypass_t = NULL;
274
276 }
277 break;
278 case tryput_bypass: internal_try_put_task(tmp); break;
279 case try_fwd: internal_forward(tmp); break;
284 } else {
286 }
287 break;
288#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
289 case add_blt_pred: {
290 my_predecessors.internal_add_built_predecessor(*(tmp->r));
292 }
293 break;
294 case del_blt_pred:
295 my_predecessors.internal_delete_built_predecessor(*(tmp->r));
297 break;
298 case blt_pred_cnt:
299 tmp->cnt_val = my_predecessors.predecessor_count();
301 break;
302 case blt_pred_cpy:
303 my_predecessors.copy_predecessors( *(tmp->predv) );
305 break;
306#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
307 }
308 }
309 }
310
316 task * new_task = create_body_task(*(op->elem));
317 op->bypass_t = new_task;
319 } else if ( my_queue && my_queue->push(*(op->elem)) ) {
322 } else {
323 op->bypass_t = NULL;
325 }
326 }
327
330 op->bypass_t = NULL;
333 if(op->bypass_t)
335 else {
336 forwarder_busy = false;
338 }
339 }
340
342 operation_type op_data(t, tryput_bypass);
343 my_aggregator.execute(&op_data);
344 if( op_data.status == internal::SUCCEEDED ) {
345 return op_data.bypass_t;
346 }
347 return NULL;
348 }
349
351 if( my_max_concurrency == 0 ) {
352 return apply_body_bypass(t);
353 } else {
355 my_aggregator.execute(&check_op);
356 if( check_op.status == internal::SUCCEEDED ) {
357 return apply_body_bypass(t);
358 }
359 return internal_try_put_bypass(t);
360 }
361 }
362
364 if( my_max_concurrency == 0 ) {
365 return create_body_task(t);
366 } else {
367 return internal_try_put_bypass(t);
368 }
369 }
370
372 // then decides if more work is available
374 return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
375 }
376
378 inline task * create_body_task( const input_type &input ) {
380 new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
382 *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(input, my_priority))
383 : NULL;
384 }
385
388 operation_type op_data(try_fwd);
389 task* rval = NULL;
390 do {
391 op_data.status = WAIT;
392 my_aggregator.execute(&op_data);
393 if(op_data.status == SUCCEEDED) {
394 task* ttask = op_data.bypass_t;
395 __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
396 rval = combine_tasks(my_graph_ref, rval, ttask);
397 }
398 } while (op_data.status == SUCCEEDED);
399 return rval;
400 }
401
404 new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
406 : NULL;
407 }
408
410 inline void spawn_forward_task() {
412 if(tp) {
414 }
415 }
416 }; // function_input_base
417
419 // a type Output to its successors.
420 template< typename Input, typename Output, typename Policy, typename A>
421 class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
422 public:
423 typedef Input input_type;
424 typedef Output output_type;
429
430 // constructor
431 template<typename Body>
433 graph &g, size_t max_concurrency,
438 }
439
442 base_type(src),
443 my_body( src.my_init_body->clone() ),
444 my_init_body(src.my_init_body->clone() ) {
445 }
446
448 delete my_body;
449 delete my_init_body;
450 }
451
452 template< typename Body >
454 function_body_type &body_ref = *this->my_body;
455 return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
456 }
457
459 // There is an extra copied needed to capture the
460 // body execution without the try_put
462 output_type v = (*my_body)(i);
464 return v;
465 }
466
467 //TODO: consider moving into the base class
470#if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
471 task* successor_task = successors().try_put_task(v);
472#endif
473 task* postponed_task = NULL;
475 postponed_task = base_type::try_get_postponed_task(i);
476 __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
477 }
478#if TBB_DEPRECATED_MESSAGE_FLOW_ORDER
479 graph& g = base_type::my_graph_ref;
480 return combine_tasks(g, successor_task, postponed_task);
481#else
482 if( postponed_task ) {
483 // make the task available for other workers since we do not know successors'
484 // execution policy
486 }
487 task* successor_task = successors().try_put_task(v);
488#if _MSC_VER && !__INTEL_COMPILER
489#pragma warning (push)
490#pragma warning (disable: 4127) /* suppress conditional expression is constant */
491#endif
493#if _MSC_VER && !__INTEL_COMPILER
494#pragma warning (pop)
495#endif
496 if(!successor_task) {
497 // Return confirmative status since current
498 // node's body has been executed anyway
499 successor_task = SUCCESSFULLY_ENQUEUED;
500 }
501 }
502 return successor_task;
503#endif /* TBB_DEPRECATED_MESSAGE_FLOW_ORDER */
504 }
505
506 protected:
507
510 if(f & rf_reset_bodies) {
512 delete my_body;
513 my_body = tmp;
514 }
515 }
516
520
521 }; // function_input
522
523
524 // helper templates to clear the successor edges of the output ports of an multifunction_node
525 template<int N> struct clear_element {
526 template<typename P> static void clear_this(P &p) {
527 (void)tbb::flow::get<N-1>(p).successors().clear();
529 }
530 template<typename P> static bool this_empty(P &p) {
531 if(tbb::flow::get<N-1>(p).successors().empty())
533 return false;
534 }
535 };
536
537 template<> struct clear_element<1> {
538 template<typename P> static void clear_this(P &p) {
539 (void)tbb::flow::get<0>(p).successors().clear();
540 }
541 template<typename P> static bool this_empty(P &p) {
542 return tbb::flow::get<0>(p).successors().empty();
543 }
544 };
545
546#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
547 // helper templates to extract the output ports of an multifunction_node from graph
548 template<int N> struct extract_element {
549 template<typename P> static void extract_this(P &p) {
550 (void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
551 extract_element<N-1>::extract_this(p);
552 }
553 };
554
555 template<> struct extract_element<1> {
556 template<typename P> static void extract_this(P &p) {
557 (void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
558 }
559 };
560#endif
561
562 template <typename OutputTuple>
564#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
565 template <typename... Args>
566 static OutputTuple call(graph& g, const tbb::flow::tuple<Args...>&) {
567 return OutputTuple(Args(g)...);
568 }
569#else // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
570 template <typename T1>
571 static OutputTuple call(graph& g, const tbb::flow::tuple<T1>&) {
572 return OutputTuple(T1(g));
573 }
574
575 template <typename T1, typename T2>
576 static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2>&) {
577 return OutputTuple(T1(g), T2(g));
578 }
579
580 template <typename T1, typename T2, typename T3>
581 static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3>&) {
582 return OutputTuple(T1(g), T2(g), T3(g));
583 }
584
585 template <typename T1, typename T2, typename T3, typename T4>
586 static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4>&) {
587 return OutputTuple(T1(g), T2(g), T3(g), T4(g));
588 }
589
590 template <typename T1, typename T2, typename T3, typename T4, typename T5>
591 static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5>&) {
592 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g));
593 }
594#if __TBB_VARIADIC_MAX >= 6
595 template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
596 static OutputTuple call(graph& g, const tbb::flow::tuple<T1, T2, T3, T4, T5, T6>&) {
597 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g));
598 }
599#endif
600#if __TBB_VARIADIC_MAX >= 7
601 template <typename T1, typename T2, typename T3, typename T4,
602 typename T5, typename T6, typename T7>
603 static OutputTuple call(graph& g,
604 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7>&) {
605 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g));
606 }
607#endif
608#if __TBB_VARIADIC_MAX >= 8
609 template <typename T1, typename T2, typename T3, typename T4,
610 typename T5, typename T6, typename T7, typename T8>
611 static OutputTuple call(graph& g,
612 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8>&) {
613 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g));
614 }
615#endif
616#if __TBB_VARIADIC_MAX >= 9
617 template <typename T1, typename T2, typename T3, typename T4,
618 typename T5, typename T6, typename T7, typename T8, typename T9>
619 static OutputTuple call(graph& g,
620 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9>&) {
621 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g));
622 }
623#endif
624#if __TBB_VARIADIC_MAX >= 9
625 template <typename T1, typename T2, typename T3, typename T4, typename T5,
626 typename T6, typename T7, typename T8, typename T9, typename T10>
627 static OutputTuple call(graph& g,
628 const tbb::flow::tuple<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>&) {
629 return OutputTuple(T1(g), T2(g), T3(g), T4(g), T5(g), T6(g), T7(g), T8(g), T9(g), T10(g));
630 }
631#endif
632#endif // __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
633 }; // struct init_output_ports
634
636 // and has a tuple of output ports specified.
637 template< typename Input, typename OutputPortSet, typename Policy, typename A>
638 class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
639 public:
641 typedef Input input_type;
642 typedef OutputPortSet output_ports_type;
647
648 // constructor
649 template<typename Body>
656 }
657
660 base_type(src),
661 my_body( src.my_init_body->clone() ),
662 my_init_body(src.my_init_body->clone() ),
664 }
665
667 delete my_body;
668 delete my_init_body;
669 }
670
671 template< typename Body >
673 multifunction_body_type &body_ref = *this->my_body;
674 return *static_cast<Body*>(dynamic_cast< internal::multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
675 }
676
677 // for multifunction nodes we do not have a single successor as such. So we just tell
678 // the task we were successful.
679 //TODO: consider moving common parts with implementation in function_input into separate function
682 (*my_body)(i, my_output_ports);
684 task* ttask = NULL;
687 }
688 return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
689 }
690
692
693 protected:
694#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
695 void extract() {
696 extract_element<N>::extract_this(my_output_ports);
697 }
698#endif
699
703 if(f & rf_reset_bodies) {
705 delete my_body;
706 my_body = tmp;
707 }
708 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
709 }
710
714
715 }; // multifunction_input
716
717 // template to refer to an output port of a multifunction_node
718 template<size_t N, typename MOP>
720 return tbb::flow::get<N>(op.output_ports());
721 }
722
723 inline void check_task_and_spawn(graph& g, task* t) {
724 if (t && t != SUCCESSFULLY_ENQUEUED) {
726 }
727 }
728
729 // helper structs for split_node
730 template<int N>
732 template<typename T, typename P>
733 static task* emit_this(graph& g, const T &t, P &p) {
734 // TODO: consider to collect all the tasks in task_list and spawn them all at once
735 task* last_task = tbb::flow::get<N-1>(p).try_put_task(tbb::flow::get<N-1>(t));
736 check_task_and_spawn(g, last_task);
738 }
739 };
740
741 template<>
742 struct emit_element<1> {
743 template<typename T, typename P>
744 static task* emit_this(graph& g, const T &t, P &p) {
745 task* last_task = tbb::flow::get<0>(p).try_put_task(tbb::flow::get<0>(t));
746 check_task_and_spawn(g, last_task);
748 }
749 };
750
752 template< typename Output, typename Policy>
753 class continue_input : public continue_receiver {
754 public:
755
757 typedef continue_msg input_type;
758
760 typedef Output output_type;
763
764 template< typename Body >
766 : continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(/*number_of_predecessors=*/0, priority))
767 , my_graph_ref(g)
770 { }
771
772 template< typename Body >
773 continue_input( graph &g, int number_of_predecessors,
775 ) : continue_receiver( __TBB_FLOW_GRAPH_PRIORITY_ARG1(number_of_predecessors, priority) )
776 , my_graph_ref(g)
779 { }
780
781 continue_input( const continue_input& src ) : continue_receiver(src),
783 my_body( src.my_init_body->clone() ),
784 my_init_body( src.my_init_body->clone() ) {}
785
787 delete my_body;
788 delete my_init_body;
789 }
790
791 template< typename Body >
793 function_body_type &body_ref = *my_body;
794 return dynamic_cast< internal::function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
795 }
796
798 continue_receiver::reset_receiver(f);
799 if(f & rf_reset_bodies) {
801 delete my_body;
802 my_body = tmp;
803 }
804 }
805
806 protected:
807
811
813
814 friend class apply_body_task_bypass< class_type, continue_msg >;
815
818 // There is an extra copied needed to capture the
819 // body execution without the try_put
821 output_type v = (*my_body)( continue_msg() );
823 return successors().try_put_task( v );
824 }
825
828 return NULL;
829 }
830#if _MSC_VER && !__INTEL_COMPILER
831#pragma warning (push)
832#pragma warning (disable: 4127) /* suppress conditional expression is constant */
833#endif
835#if _MSC_VER && !__INTEL_COMPILER
836#pragma warning (pop)
837#endif
838 return apply_body_bypass( continue_msg() );
839 }
840 else {
841 return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
843 *this, __TBB_FLOW_GRAPH_PRIORITY_ARG1(continue_msg(), my_priority) );
844 }
845 }
846
848 return my_graph_ref;
849 }
850 }; // continue_input
851
853 template< typename Output >
854 class function_output : public sender<Output> {
855 public:
856
857 template<int N> friend struct clear_element;
858 typedef Output output_type;
859 typedef typename sender<output_type>::successor_type successor_type;
861#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
862 typedef typename sender<output_type>::built_successors_type built_successors_type;
863 typedef typename sender<output_type>::successor_list_type successor_list_type;
864#endif
865
869 }
870
874 return true;
875 }
876
880 return true;
881 }
882
883#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
884 built_successors_type &built_successors() __TBB_override { return successors().built_successors(); }
885
886
887 void internal_add_built_successor( successor_type &r) __TBB_override {
888 successors().internal_add_built_successor( r );
889 }
890
891 void internal_delete_built_successor( successor_type &r) __TBB_override {
892 successors().internal_delete_built_successor( r );
893 }
894
895 size_t successor_count() __TBB_override {
896 return successors().successor_count();
897 }
898
899 void copy_successors( successor_list_type &v) __TBB_override {
900 successors().copy_successors(v);
901 }
902#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
903
904 // for multifunction_node. The function_body that implements
905 // the node will have an input and an output tuple of ports. To put
906 // an item to a successor, the body should
907 //
908 // get<I>(output_ports).try_put(output_value);
909 //
910 // if task pointer is returned will always spawn and return true, else
911 // return value will be bool returned from successors.try_put.
912 task *try_put_task(const output_type &i) { // not a virtual method in this class
913 return my_successors.try_put_task(i);
914 }
915
917
918 graph& graph_reference() const { return my_graph_ref; }
919 protected:
922 }; // function_output
923
924 template< typename Output >
925 class multifunction_output : public function_output<Output> {
926 public:
927 typedef Output output_type;
930
933
934 bool try_put(const output_type &i) {
935 task *res = try_put_task(i);
936 if(!res) return false;
937 if(res != SUCCESSFULLY_ENQUEUED) {
938 FLOW_SPAWN(*res); // TODO: Spawn task inside arena
939 }
940 return true;
941 }
942
944
945 protected:
946
948 return my_successors.try_put_task(i);
949 }
950
951 template <int N> friend struct emit_element;
952
953 }; // multifunction_output
954
955//composite_node
956#if __TBB_FLOW_GRAPH_CPP11_FEATURES
957 template<typename CompositeType>
958 void add_nodes_impl(CompositeType*, bool) {}
959
960 template< typename CompositeType, typename NodeType1, typename... NodeTypes >
961 void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
962 void *addr = const_cast<NodeType1 *>(&n1);
963
964 fgt_alias_port(c_node, addr, visible);
965 add_nodes_impl(c_node, visible, n...);
966 }
967#endif
968
969} // internal
970
971#endif // __TBB__flow_graph_node_impl_H
#define FLOW_SPAWN(a)
Definition: flow_graph.h:65
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
void * addr
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void const char const char int ITT_FORMAT __itt_group_sync p
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
The graph class.
static void fgt_begin_body(void *)
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
static void fgt_alias_port(void *, void *, bool)
static void fgt_end_body(void *)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:713
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:199
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
bool is_graph_active(tbb::flow::interface10::graph &g)
void add_nodes_impl(CompositeType *, bool)
void check_task_and_spawn(graph &g, task *t)
tbb::flow::tuple_element< N, typenameMOP::output_ports_type >::type & output_port(MOP &op)
unsigned int node_priority_t
static tbb::task *const SUCCESSFULLY_ENQUEUED
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
Definition: task_arena.h:490
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:370
uintptr_t status
Zero value means "wait" status, all other values are "user" specified values and are defined into the...
allocator_traits< Alloc >::template rebind_alloc< T >::other type
A functor that takes an Input and generates an Output.
virtual function_body * clone()=0
the leaf for function_body
function_body that takes an Input and a set of output ports
virtual multifunction_body * clone()=0
leaf for multifunction. OutputSet can be a std::tuple or a vector.
A task that calls a node's forward_task function.
A task that calls a node's apply_body_bypass function, passing in an input of type Input.
void set_owner(successor_type *owner)
void register_successor(successor_type &r)
void set_owner(owner_type *owner)
void remove_successor(successor_type &r)
A cache of successors that are broadcast to.
task * try_put_task(const T &t) __TBB_override
const item_type & front() const
Input and scheduling for a function node that takes a type Input as input.
void reset_receiver(reset_flags f) __TBB_override
aggregator< handler_type, operation_type > my_aggregator
task * try_get_postponed_task(const input_type &i)
internal::aggregating_functor< class_type, operation_type > handler_type
task * forward_task()
This is executed by an enqueued task, the "forwarder".
void spawn_forward_task()
Spawns a task that calls forward()
function_input_queue< input_type, A > input_queue_type
virtual ~function_input_base()
Destructor.
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
Input input_type
The input type of this receiver.
predecessor_cache< input_type, null_mutex > predecessor_cache_type
predecessor_cache< input_type, null_mutex > my_predecessors
void reset_function_input_base(reset_flags f)
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
task * create_body_task(const input_type &input)
allocates a task to apply a body
void handle_operations(operation_type *op_list)
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
__TBB_STATIC_ASSERT(!((internal::has_policy< queueing, Policy >::value) &&(internal::has_policy< rejecting, Policy >::value)), "queueing and rejecting policies can't be specified simultaneously")
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
function_input_base(const function_input_base &src)
Copy constructor.
graph & graph_reference() const __TBB_override
receiver< input_type >::predecessor_type predecessor_type
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
function_input_base< Input, Policy, A, ImplType > class_type
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
tbb::internal::allocator_rebind< A, input_queue_type >::type queue_allocator_type
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
task * internal_try_put_bypass(const input_type &t)
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
Implements methods for a function node that takes a type Input as input and sends.
task * apply_body_impl_bypass(const input_type &i)
virtual broadcast_cache< output_type > & successors()=0
void reset_function_input(reset_flags f)
function_input(const function_input &src)
Copy constructor.
function_input< Input, Output, Policy, A > my_class
function_input(graph &g, size_t max_concurrency,)
function_input_base< Input, Policy, A, my_class > base_type
function_body< input_type, output_type > function_body_type
function_input_queue< input_type, A > input_queue_type
function_body_type * my_body
function_body_type * my_init_body
output_type apply_body_impl(const input_type &i)
static OutputTuple call(graph &g, const tbb::flow::tuple< Args... > &)
Implements methods for a function node that takes a type Input as input.
function_input_queue< input_type, A > input_queue_type
function_input_base< Input, Policy, A, my_class > base_type
multifunction_body< input_type, output_ports_type > multifunction_body_type
multifunction_body_type * my_init_body
multifunction_input(graph &g, size_t max_concurrency,)
task * apply_body_impl_bypass(const input_type &i)
multifunction_body_type * my_body
multifunction_input< Input, OutputPortSet, Policy, A > my_class
multifunction_input(const multifunction_input &src)
Copy constructor.
static task * emit_this(graph &g, const T &t, P &p)
static task * emit_this(graph &g, const T &t, P &p)
Implements methods for an executable node that takes continue_msg as input.
Output output_type
The output type of this receiver.
void reset_receiver(reset_flags f) __TBB_override
function_body_type * my_init_body
continue_msg input_type
The input type of this receiver.
continue_input< output_type, Policy > class_type
graph & graph_reference() const __TBB_override
function_body< input_type, output_type > function_body_type
continue_input(const continue_input &src)
task * apply_body_bypass(input_type)
Applies the body to the provided input.
function_body_type * my_body
task * execute() __TBB_override
continue_input(graph &g, int number_of_predecessors,)
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
virtual broadcast_cache< output_type > & successors()=0
Implements methods for both executable and function nodes that puts Output to its successors.
function_output(const function_output &other)
broadcast_cache< output_type > broadcast_cache_type
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
task * try_put_task(const output_type &i)
broadcast_cache_type & successors()
broadcast_cache_type my_successors
sender< output_type >::successor_type successor_type
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
multifunction_output(const multifunction_output &other)
task * try_put_task(const output_type &i)
function_output< output_type > base_type
bool try_put(const output_type &i)
Base class for user-defined tasks.
Definition: task.h:615
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.