17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 #if __TBB_PREVIEW_STREAMING_NODE
30 template <
int N1,
int N2>
33 static const int size = N2 - N1 + 1;
40 template <
int N1,
int N2 = N1>
52 template <
int N1,
int N2>
57 template <
int N1,
int N2>
62 template <
typename... Args>
67 template <
typename T,
typename... Rest>
78 template<
typename Key>
84 template<
typename Key>
90 template<
typename Device,
typename Key>
103 template <
typename T>
108 template <
int N1,
int N2>
113 template <
int N1,
int N2>
118 template <
typename T>
123 template <
typename ...Args1>
126 template <
typename A1,
typename ...Args1>
128 static const size_t my_delta = 1;
130 template <
typename F,
typename Tuple,
typename ...Args2>
131 static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
134 template <
typename F,
typename Tuple,
typename ...Args2>
138 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
141 args2..., std::get<N1 + my_delta>(t));
143 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
148 template <
typename F,
typename Tuple,
int N1,
int N2,
typename ...Args2>
150 doit_impl(x, f, t,
fn(), args1..., args2...);
152 template <
typename F,
typename Tuple,
int N,
typename ...Args2>
154 doit_impl(x, f, t,
fn(), args1..., args2...);
160 template <
typename F,
typename Tuple,
typename ...Args2>
161 static void doit(F& f, Tuple&, Args2&... args2) {
167 template<
typename JP,
typename StreamFactory,
typename... Ports>
170 template <
typename T>
185 template<
typename StreamFactory,
typename KernelInputTuple,
typename =
void>
191 template <
typename ...Args>
193 factory.send_kernel( device, kernel, args... );
198 template<
typename StreamFactory,
typename KernelInputTuple>
207 struct range_wrapper {
209 virtual range_wrapper *clone()
const = 0;
213 struct range_value :
public range_wrapper {
223 return new range_value(my_value);
230 struct range_mapper :
public range_wrapper {
235 return get<N + 1>(ip).data(
false);
239 return new range_mapper<N>;
244 template <
typename ...Args>
246 __TBB_ASSERT(my_range_wrapper,
"Range is not set. Call set_range() before running streaming_node.");
247 factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
257 executor.my_range_wrapper = NULL;
261 if (my_range_wrapper)
delete my_range_wrapper;
265 my_range_wrapper =
new range_value(work_size);
269 my_range_wrapper =
new range_value(
std::move(work_size));
274 my_range_wrapper =
new range_mapper<N>;
279 my_range_wrapper =
new range_mapper<N>;
301 template<
typename... Args>
304 template<
typename... Ports,
typename JP,
typename StreamFactory>
306 :
public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
307 typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
318 typedef composite_node<input_tuple, output_tuple>
base_type;
333 return std::tie( internal::input_port<S>( my_indexer_node )... );
338 return std::tie( internal::output_port<S>( my_kernel_node )... );
351 make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
357 make_edge( my_indexer_node, my_device_selector_node );
358 make_edge( my_device_selector_node, my_join_node );
360 make_edge( my_join_node, my_kernel_node );
367 class device_selector_base {
369 virtual void operator()(
const indexer_node_output_type &v,
typename device_selector_node::output_ports_type &op ) = 0;
370 virtual device_selector_base *clone(
streaming_node &n )
const = 0;
374 template <
typename UserFunctor>
379 , my_user_functor( uf ), my_node(n), my_factory( f )
381 my_port_epoches.fill( 0 );
385 (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
387 || my_port_epoches[v.tag()] == 0,
"Epoch is changed when key matching is requested" );
391 return new device_selector( my_user_functor, n, my_factory );
394 typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(
size_t &,
const indexer_node_output_type &,
typename device_selector_node::output_ports_type &);
403 template <
typename T>
409 template <
typename T>
412 return key_from_message<key_type>( t );
417 typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
418 elem_type e = internal::cast_to<elem_type>( v );
420 my_factory.send_data( device, e );
421 get<N + 1>( op ).try_put( e );
424 template<
typename DevicePort >
427 if ( it == my_devices.end() ) {
429 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair(
key,
d ) );
432 my_node.notify_new_device(
d );
434 epoch_desc &e = it->second;
436 if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
454 class device_selector_body {
459 (*my_device_selector)(v, op);
472 virtual args_storage_base *clone()
const = 0;
477 : my_kernel( kernel ), my_factory( f )
481 :
tbb::
internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
488 template <
typename... Args>
489 class args_storage :
public args_storage_base {
495 const auto& t = get<N + 1>( ip );
496 auto &port = get<N>( op );
497 return port.try_put( t );
509 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
513 template <
typename... FnArgs>
525 : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
528 template <
typename... FnArgs>
530 my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
535 template<
typename FinalizeFn>
539 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(),
fn ) {}
543 template <
typename... FnArgs>
556 : my_factory(factory), my_device(device), my_fn(
fn) {}
558 template <
typename... FnArgs>
560 my_factory.finalize( my_device, my_fn, args... );
565 template<
typename FinalizeFn>
567 return run_finalize_func<FinalizeFn>( ip, factory,
fn );
573 : my_factory(factory), my_device(
d ) {}
575 template <
typename... FnArgs>
577 my_factory.send_data( my_device, args... );
586 : args_storage_base( kernel, f )
587 , my_args_pack( std::forward<Args>(args)... )
590 args_storage(
const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
592 args_storage(
const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
600 tbb::internal::call( run_kernel_func( ip, n, *
this ), const_args_pack );
603 graph& g = n.my_graph;
605 g.increment_wait_count();
610 tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
611 g.decrement_wait_count();
612 }), const_args_pack );
619 tbb::internal::call( send_func( this->my_factory,
d ), my_args_pack );
624 return new args_storage<Args...>( *this );
638 __TBB_ASSERT( (my_node.my_args_storage != NULL),
"No arguments storage" );
640 my_node.my_args_storage->enqueue( ip, op, my_node );
647 struct wrap_to_async {
651 template <
typename T>
656 template <
typename... Args>
659 return new args_storage<Args...>(storage, std::forward<Args>(args)...);
663 my_args_storage->send(
d );
666 template <
typename ...Args>
668 this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
672 template <
typename DeviceSelector>
675 , my_indexer_node( g )
676 , my_device_selector( new device_selector<DeviceSelector>(
d, *this, f ) )
677 , my_device_selector_node( g,
serial, device_selector_body( my_device_selector ) )
679 , my_kernel_node( g,
serial, kernel_body( *this ) )
681 , my_args_storage( make_args_storage( args_storage<>(kernel, f),
port_ref<0, NUM_INPUTS - 1>() ) )
683 base_type::set_external_ports( get_input_ports(), get_output_ports() );
689 , my_indexer_node( node.my_indexer_node )
690 , my_device_selector( node.my_device_selector->clone( *this ) )
691 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
692 , my_join_node( node.my_join_node )
693 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
694 , my_args_storage( node.my_args_storage->clone() )
696 base_type::set_external_ports( get_input_ports(), get_output_ports() );
702 , my_indexer_node( std::
move( node.my_indexer_node ) )
703 , my_device_selector( node.my_device_selector->clone(*this) )
704 , my_device_selector_node( node.my_graph,
serial, device_selector_body( my_device_selector ) )
705 , my_join_node( std::
move( node.my_join_node ) )
706 , my_kernel_node( node.my_graph,
serial, kernel_body( *this ) )
707 , my_args_storage( node.my_args_storage )
709 base_type::set_external_ports( get_input_ports(), get_output_ports() );
712 node.my_args_storage = NULL;
716 if ( my_args_storage )
delete my_args_storage;
717 if ( my_device_selector )
delete my_device_selector;
720 template <
typename... Args>
723 args_storage_base *
const new_args_storage = make_args_storage( *my_args_storage,
typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
724 delete my_args_storage;
725 my_args_storage = new_args_storage;
741 #endif // __TBB_PREVIEW_STREAMING_NODE
742 #endif // __TBB_flow_graph_streaming_H