Università di Pisa
Dipartimento di Informatica

Technical Report: TR-12-04

FastFlow tutorial

M. Aldinucci* M. Danelutto° M. Torquati°

* Dip. Informatica, Univ. Torino
° Dip. Informatica, Univ. Pisa

March 28, 2012

ADDRESS: Largo B. Pontecorvo 3, 56127 Pisa, Italy. TEL: +39 050 2212700 FAX: +39 050 2212726
FastFlow tutorial
M. Aldinucci* M. Danelutto° M. Torquati°
* Dip. Informatica, Univ. Torino
° Dip. Informatica, Univ. Pisa
March 28, 2012
Abstract
FastFlow is a structured parallel programming framework targeting
shared memory multicores. Its layered design and the optimized im-
plementation of the communication mechanisms used to implement the
FastFlow streaming networks provided to the application programmer as
algorithmic skeletons support the development of efficient fine grain parallel applications. FastFlow is available (open source) at SourceForge¹.
This work introduces FastFlow programming techniques and points out
the different ways used to parallelize existing C/C++ code using FastFlow
as a software accelerator. In short: this is a kind of tutorial on FastFlow.
1 Introduction
FastFlow is an algorithmic skeleton programming environment developed at the Dept. of Computer Science of the Universities of Pisa and Torino.
A number of different papers and technical reports discuss the different features of this programming environment, the kind of results achieved while parallelizing different applications, and the usage of FastFlow as a software accelerator, i.e. as a mechanism suitable to exploit unused cores of a multicore architecture to speed up the execution of sequential code.
This paper represents instead a tutorial aimed at instructing programmers
in the usage of the FastFlow skeletons and in the typical FastFlow programming
techniques.
Therefore, after recalling the FastFlow design principles in Sec. 2, in Sec. 3 we describe the (trivial) installation procedure. Then, in the sections that follow, we introduce the main features of the FastFlow programming framework. Other sections detail particular techniques related to FastFlow usage, namely: access to shared data (Sec. 7), FastFlow usage as an accelerator (Sec. 9) and the possibility to use FastFlow as a framework to experiment with new (w.r.t. the ones already

* Dip. Informatica, Univ. di Torino; ° Dip. Informatica, Univ. di Pisa
¹ http://sourceforge.net/projects/mc-fastflow/
[Figure: layered design for efficient applications on multicore and manycore. Top layer: Streaming network patterns (skeletons: pipeline, farm, divide&conquer, ...). Middle layer: Arbitrary streaming networks (lock-free SPSC, SPMC, MPSC, MPMC queues). Bottom layer: Simple streaming networks (lock-free SPSC queues and general threading model, e.g. Pthread).]

Figure 1: Layered FastFlow design
provided) skeletons (Sec. 8.3.1 and Sec. 12.1). Eventually, Sec. 13 gives a rough idea of the expected performance while running FastFlow programs and Sec. 14 outlines the main FastFlow RTS accessory routines.
2 Design principles
FastFlow² has been designed to provide programmers with efficient parallelism
exploitation patterns suitable to implement (fine grain) stream parallel applica-
tions. In particular, FastFlow has been designed
• to promote high-level parallel programming, and in particular skeletal
programming (i.e. pattern-based explicit parallel programming), and
• to promote efficient programming of applications for multi-core.
The whole programming framework has been incrementally developed according
to a layered design on top of the Pthread/C++ standard programming framework and targets shared memory multicore architectures (see Fig. 1).
A first layer, the Simple streaming networks layer, provides lock-free Sin-
gle Producers Single Consumer (SPSC) queues on top of the Pthread standard
threading model.
A second layer, the Arbitrary streaming networks layer, provides lock-
free implementations for Single Producer Multiple Consumer (SPMC), Multiple
Producer Single Consumer (MPSC) and Multiple Producer Multiple Consumer
(MPMC) queues on top of the SPSC implemented in the first layer.
² See also the FastFlow home page at http://mc-fastflow.sourceforge.net

Eventually, the third layer, the Streaming Networks Patterns layer, provides common stream parallel patterns. The primitive patterns include pipelines and farms. Simple specialization of these patterns may be used to implement
more complex patterns, such as divide and conquer, map and reduce patterns.
Parallel application programmers are assumed to use FastFlow directly ex-
ploiting the parallel patterns available in the Streaming Network Patterns level.
In particular:
• defining sequential concurrent activities, by subclassing a proper FastFlow class, the ff_node class, and
• building complex stream parallel patterns by hierarchically composing se-
quential concurrent activities, pipeline patterns, farm patterns and their
"specialized" versions implementing more complex parallel patterns.
The ff_node sequential concurrent activity abstraction provides suitable ways to define a sequential activity processing data items appearing on a single input channel and delivering the related results onto a single output channel. Particular cases of ff_nodes may be simply implemented with no input channel or no
output channel. The former is used to install a concurrent activity generating
an output stream (e.g. from data items read from keyboard or from a disk file);
the latter to install a concurrent activity consuming an input stream (e.g. to
present results on a video or to store them on disk).
The pipeline pattern may be used to implement sequences of streaming networks S1 → ... → Sk, with stage Si receiving input from Si-1 and delivering outputs to Si+1. Each Si may be either a sequential activity or another parallel pattern. S1 must be a stream generator activity and Sk a stream consuming one.
The farm pattern models different embarrassingly (stream) parallel constructs. In its simplest form, it models a master/worker pattern with workers producing no stream data items. Rather, the workers consolidate results directly in memory. More complex forms including either an emitter, or a collector, or both an emitter and a collector implement more sophisticated patterns:
• by adding an emitter, the user may specify policies, different from the default round robin one, to schedule input tasks to the workers;
• by adding a collector, the user may use workers actually producing some output values, which are gathered and delivered to the farm output stream. Different policies may be implemented on the collector to gather data from the workers and deliver them to the output stream.
In addition, a feedback channel may be added to a farm, moving output results from the collector (or from the collection of workers, in case no collector is specified) back to the emitter input channel. The feedback channel may only be added to the farm/pipe at the root of the skeleton tree.
Specialized versions of the farm may be used to implement more complex
patterns, such as:
• divide and conquer, using a farm with feedback loop and proper stream
items tagging (input tasks, subtask results, results)
• MISD (multiple instruction single data, that is something computing f1(xi), ..., fk(xi) out of each xi appearing onto the input stream) pattern, using a farm with an emitter implementing a broadcast scheduling policy
• map, using an emitter partitioning an input collection and scheduling one
partition per worker, and a collector gathering sub-partitions results from
the workers and delivering a collection made out of all these results to the
output stream.
It is worth pointing out that while using plain pipeline and farms (with or
without emitters and collectors) actually can be classified as "using skeletons"
in a traditional skeleton based programming framework, the usage of specialized
versions of the farm streaming network can be more easily classified as "using
skeleton templates", as the base features of the FastFlow framework are used to
build new patterns, not provided as primitive skeletons³.
Concerning the usage of FastFlow to support parallel application develop-
ment on shared memory multicores, the framework provides two abstractions of
structured parallel computation:
• a "skeleton program abstraction" which is used to implement applications completely modelled according to the algorithmic skeleton concepts. When using this abstraction, the programmer writes a parallel application by providing the business logic code, wrapped into proper ff_node subclasses, a skeleton (composition) modelling the parallelism exploitation pattern of the application and a single command starting the skeleton computation and awaiting its termination.
• an "accelerator abstraction" which is used to parallelize (and therefore accelerate) only some parts of an existing application. In this case, the programmer provides a skeleton (composition) which is run on the "spare" cores of the architecture and implements a parallel version of the business logic to be accelerated, that is, the computation of a given f(x). The skeleton (composition) will have its own input and output channels. When an f(x) has actually to be computed within the application, rather than writing proper code to call the sequential f code, the programmer may insert code asynchronously "offloading" x to the accelerator skeleton. Later on, when the result of f(x) is to be used, some code "reading" the accelerator result may be used to retrieve the accelerator computed values.

This second abstraction fully implements the "minimal disruption" principle stated by Cole in his skeleton manifesto [12], as the programmer using the accelerator is only required to program a couple of offload/get_result primitives in place of the single ... = f(x) function call statement (see Sec. 9).
³ Although this may change in future FastFlow releases, this is the current situation as of FastFlow version 1.1.
3 Installation
Before entering the details of how FastFlow may be used to implement efficient stream parallel (and not only) programs on shared memory multicore architectures, let's have a look at how FastFlow may be installed⁴.
The installation process is trivial, actually:
1. first, you have to download the source code from SourceForge (http://sourceforge.net/projects/mc-fastflow/),
2. then you have to extract the files using a tar xzvf fastflow-XX.tgz command, and
3. eventually, you should use the top level directory resulting from the tar xzvf command as the argument of the -I flag of g++.
As an example, the currently available version (1.1) is hosted in a fastflow-1.1.0.tar.gz file. If you download it and extract the files to your home directory, you should compile FastFlow code using the flags g++ -I $HOME/fastflow-1.1.0 -lpthread in addition to any other flags needed to compile your specific code.
Sample makefiles are provided both within the fastflow-1.1.0/tests and the fastflow-1.1.0/examples directories in the source distribution.
4 Hello world in FastFlow
As in all programming framework tutorials, we start with a Hello world code. In order to implement our hello world program, we use the following code:
 1  #include <iostream>
 2  #include <ff/pipeline.hpp>
 3
 4  using namespace ff;
 5
 6  class Stage1: public ff_node {
 7  public:
 8    void * svc(void * task) {
 9      std::cout << "Hello world" << std::endl;
10      return NULL;
11    }
12  };
13
14  int main(int argc, char * argv[]) {
15
16
17    ff_pipeline pipe;
18    pipe.add_stage(new Stage1());
19
20    if (pipe.run_and_wait_end()<0) {
21      error("running pipeline\n");
22      return -1;
23    }
24
25    return 0;
26  }

helloworldSimple.cpp
Line 2 includes all that's needed to compile a FastFlow program just using a pipeline pattern and line 4 instructs the compiler to resolve names looking (also) at the ff namespace. Lines 6 to 13 host the application business logic code, wrapped into a class subclassing ff_node. The void * svc(void *) method⁵ wraps the body of the concurrent activity resulting from the wrapping. It is called every time the concurrent activity is given a new input stream data item. The input stream data item pointer is passed through the input void * parameter. The result of the single invocation of the concurrent activity body is passed back to the FastFlow runtime returning the void * result. In case a NULL is returned, the concurrent activity actually terminates itself. The application main only hosts code needed to set up the FastFlow streaming network and to start the skeleton (composition) computation: lines 17 and 18 declare a pipeline pattern (line 17) and insert a single stage (line 18) in the pipeline. Line 20 starts the computation of the skeleton program and awaits skeleton computation termination. In case of errors the run_and_wait_end() call will return a negative number (according to the Unix/Linux syscall conventions).
When the program is started, the FastFlow RTS starts the pipeline. In turn the first stage is started. As the first stage svc returns a NULL, the stage is terminated immediately afterwards by the FastFlow RTS.
If we compile and run the program, we get the following output:

ffsrc$ g++ -lpthread -I /home/marcod/Documents/Research/CodeProgramming/fastflow-1.1.0 helloworldSimple.cpp -o hello
ffsrc$ ./hello
Hello world
ffsrc$
There is nothing parallel here, however. The single pipeline stage is run just
once and there is nothing else, from the programmer viewpoint, running in
parallel. The graph of concurrent activities in this case is the following, trivial
one:
⁵ We use the term svc as a shortcut for "service".
A more interesting "HelloWorld" would have been to have a two stage pipeline where the first stage prints the "Hello" and the second one, after getting the results of the computation of the first one, prints "World". In order to implement this behaviour, we have to write two sequential concurrent activities and to use them as stages in a pipeline. Additionally, we have to send something out as a result from the first stage to the second stage. Let's assume we just send the string with the word to be printed. The code may be written as follows:
 1  #include <iostream>
 2  #include <ff/pipeline.hpp>
 3
 4  using namespace ff;
 5
 6  class Stage1: public ff_node {
 7  public:
 8
 9    void * svc(void * task) {
10      std::cout << "Hello " << std::endl;
11      char * p = (char *) calloc(sizeof(char),10);
12      strcpy(p, "World");
13      sleep(1);
14      return ((void *)p);
15    }
16  };
17
18  class Stage2: public ff_node {
19  public:
20
21    void * svc(void * task) {
22      std::cout << ((char *)task) << std::endl;
23      free(task);
24      return GO_ON;
25    }
26  };
27
28  int main(int argc, char * argv[]) {
29
30    ff_pipeline pipe;
31    pipe.add_stage(new Stage1());
32    pipe.add_stage(new Stage2());
33
34    if (pipe.run_and_wait_end()<0) {
35      error("running pipeline\n");
36      return -1;
37    }
38
39    return 0;
40  }

hello2stages.cpp
We define two sequential stages. The first one (lines 6-16) prints the "Hello" message, then allocates some memory buffer, stores the "World" message in the buffer and sends it to the output stream (return on line 14). The sleep on line 13 is here just to make the FastFlow scheduling of concurrent activities more evident. The second one (lines 18-26) just prints whatever it gets on the input stream (the data item stored after the void * task pointer of the svc header on line 21), frees the allocated memory and then returns a GO_ON mark, which is intended to be a value interpreted by the FastFlow framework as: "I finished processing the current task, I give you no result to be delivered onto the output stream, but please keep me alive ready to receive another input task". The main on lines 28-40 is almost identical to the one of the previous version but for the fact we add two stages to the pipeline pattern. Implicitly, this sets up a streaming network with Stage1 connected by a stream to Stage2. Items delivered on the output stream by Stage1 will be read on the input stream by Stage2. The concurrent activity graph is therefore:
If we compile and run the program, however, we get a kind of unexpected
result:
ffsrc$ g++ -lpthread -I /home/marcod/Documents/Research/CodeProgramming/fastflow-1.1.0 hello2stages.cpp -o hello2stages
ffsrc$ ./hello2stages
Hello
WorldHello
Hello World
Hello World
Hello World
^C
ffsrc$
First of all, the program keeps running, printing a "Hello World" every second. We in fact terminate the execution through a CONTROL-C. Second, the initial sequence of strings is a little bit strange⁶.
The "infinite run" is related to the way FastFlow implements concurrent activities. Each ff_node is run as many times as the number of input data items appearing onto its input stream, unless the svc method returns a NULL. Therefore, if the method returns either a task (pointer) to be delivered onto the concurrent activity output stream, or the GO_ON mark (no data output to the output stream but continue execution), it is re-executed as soon as there is some input available. The first stage, which has no associated input stream, is re-executed up to the moment it terminates the svc with a NULL. In order to have the program terminate, we therefore may use the following code for Stage1:
class Stage1: public ff_node {
public:

  Stage1() { first = (1==1); }

  void * svc(void * task) {
    if (first) {
      std::cout << "Hello " << std::endl;
      char * p = (char *) calloc(sizeof(char),10);
      strcpy(p, "World");
      sleep(1);
      first = 0;
      return ((void *)p);
    } else {
      return NULL;
    }
  }
private:
  int first;
};
If we compile and execute the program with this modified Stage1 stage, we'll get an output such as:

ffsrc$ g++ -lpthread -I /home/marcod/Documents/Research/CodeProgramming/fastflow-1.1.0 hello2terminate.cpp -o hello2terminate
ffsrc$ ./hello2terminate
Hello
World
ffsrc$
that is, the program terminates after a single run of the two stages. Now the question is: why has the second stage terminated, although its svc method return

⁶ and depending on the actual number of cores of your machine and on the kind of scheduler used in the operating system, the sequence may vary a little bit
value states that more work is to be done? The answer is in the stream semantics implemented by FastFlow. FastFlow streaming networks automatically manage end-of-streams. That is, as soon as an ff_node returns a NULL, implicitly declaring that it wants to terminate its output stream, the information is propagated to the node consuming the output stream. This node will therefore also terminate execution (without actually executing its svc method) and the end of stream will be propagated onto its output stream, if any. Therefore Stage2 terminates after the termination of Stage1.
The other problem, namely the appearance of the initial 2 "Hello" strings apparently related to just one "World" string, is related to the fact that FastFlow does not guarantee any scheduling semantics of the ff_node svc executions. The first stage delivers a string to the second stage, then it is executed again and again. The sleep inserted in the first stage prevents the accumulation of too many "Hello" strings on the output stream delivered to the second stage. If we remove the sleep statement, in fact, the output is rather different: we will see a large number of "Hello" strings followed by another large number of "World" strings. This is because the first stage is enabled to send as many data items on the output stream as the capacity of the SPSC queue used to implement the stream between the two stages.
5 Generating a stream
In order to get a better idea of how streams are managed within FastFlow, we slightly change our HelloWorld code in such a way that the first stage in the pipeline produces on the output stream n integer data items and then terminates. The second stage prints a "World -i-" message upon receiving each item i onto the input stream.
We already discussed the role of the return value of the svc method. Therefore a first version of this program may be implemented using as the Stage1 class the following code:
 1  #include <iostream>
 2  #include <ff/pipeline.hpp>
 3
 4  using namespace ff;
 5
 6  class Stage1: public ff_node {
 7  public:
 8
 9    Stage1(int n) {
10      streamlen = n;
11      current = 0;
12    }
13
14    void * svc(void * task) {
15      if (current < streamlen) {
16        current++;
17        std::cout << "Hello number " << current << " " << std::endl;
18        int * p = (int *) calloc(sizeof(int),1);
19        *p = current;
20        sleep(1);
21        return ((void *)p);
22      } else {
23        return NULL;
24      }
25    }
26  private:
27    int streamlen, current;
28  };
29
30  class Stage2: public ff_node {
31  public:
32
33    void * svc(void * task) {
34      int * i = (int *) task;
35      std::cout << "World -" << *i << "- " << std::endl;
36      free(task);
37      return GO_ON;
38    }
39  };
40
41  int main(int argc, char * argv[]) {
42
43    ff_pipeline pipe;
44    pipe.add_stage(new Stage1(atoi(argv[1])));
45    pipe.add_stage(new Stage2());
46
47    if (pipe.run_and_wait_end()<0) {
48      error("running pipeline\n");
49      return -1;
50    }
51
52    return 0;
53  }

helloStream.cpp
The output we get is the following one:

ffsrc$ g++ -lpthread -I /home/marcod/Documents/Research/CodeProgramming/fastflow-1.1.0 helloStream.cpp -o helloStream
ffsrc$ ./helloStream 5
Hello number 1
Hello number 2World -1-
Hello number World -32 -
World -3- Hello number 4
Hello number 5 World -4-
World -5-
ffsrc$
However, there is another way we can use to generate the stream, which is a little bit more "programmatic". FastFlow makes available an ff_send_out method in the ff_node class, which can be used to direct a data item onto the concurrent activity output stream, without actually using the svc return value.
In this case, we could have written the Stage1 as follows:
class Stage1: public ff_node {
public:

  Stage1(int n) {
    streamlen = n;
    current = 0;
  }

  void * svc(void * task) {
    while (current < streamlen) {
      current++;
      std::cout << "Hello number " << current << " " << std::endl;
      int * p = (int *) calloc(sizeof(int),1);
      *p = current;
      sleep(1);
      ff_send_out(p);
    }
    return NULL;
  }
private:
  int streamlen, current;
};
In this case, Stage1 is run just once (as it immediately returns a NULL). However, during the single run the svc while loop delivers the intended data items on the output stream through the ff_send_out method. In case the sends fill up the SPSC queue used to implement the stream, ff_send_out will block up to the moment Stage2 consumes some items and consequently frees space in the SPSC buffers.
6 More on ff_node

The ff_node class actually defines three distinct virtual methods:

public:
  virtual void * svc(void * task) = 0;
  virtual int    svc_init() { return 0; };
  virtual void   svc_end() {}
The first one defines the behaviour of the node while processing the input stream data items. The other two methods are automatically invoked once and for all by the FastFlow RTS when the concurrent activity represented by the node is started (svc_init) and right before it is terminated (svc_end).
These virtual methods may be overridden in the user supplied ff_node subclasses to implement initialization and finalization code, respectively. Actually, the svc method must be overridden, as it is defined as a pure virtual method.
We illustrate the usage of the two methods with another program computing the Sieve of Eratosthenes. The sieve uses a number of stages in a pipeline. Each stage stores the first integer it gets on the input stream. Then it cycles, passing onto the output stream only the input stream items which are not multiples of the stored integer. An initial stage injects into the pipeline the sequence of integers starting at 2, up to n. Upon completion, each stage has stored a prime number.
We can implement the Eratosthenes sieve with the following FastFlow program.
  1  #include <iostream>
  2  #include <ff/pipeline.hpp>
  3
  4  using namespace ff;
  5
  6  class Sieve: public ff_node {
  7  public:
  8
  9    Sieve() { filter = 0; }
 10
 11    void * svc(void * task) {
 12      unsigned int * t = (unsigned int *)task;
 13
 14      if (filter == 0) {
 15        filter = *t;
 16        return GO_ON;
 17      } else {
 18        if (*t % filter == 0)
 19          return GO_ON;
 20        else
 21          return task;
 22      }
 23    }
 24
 25    void svc_end() {
 26      std::cout << "Prime(" << filter << ")\n";
 27      return;
 28    }
 29
 30
 31  private:
 32    int filter;
 33  };
 34
 35  class Generate: public ff_node {
 36  public:
 37
 38    Generate(int n) {
 39      streamlen = n;
 40      task = 2;
 41      std::cout << "Generate object created" << std::endl;
 42      return;
 43    }
 44
 45
 46    int svc_init() {
 47      std::cout << "Sieve started. Generating a stream of " << streamlen <<
 48        " elements, starting with " << task << std::endl;
 49      return 0;
 50    }
 51
 52    void * svc(void * tt) {
 53      unsigned int * t = (unsigned int *)tt;
 54
 55      if (task < streamlen) {
 56        int * xi = (int *) calloc(1, sizeof(int));
 57        *xi = task++;
 58        return xi;
 59      } else {
 60        return NULL;
 61      }
 62    }
 63  private:
 64    int streamlen;
 65    int task;
 66  };
 67
 68  class Printer: public ff_node {
 69
 70    int svc_init() {
 71      std::cout << "Printer started " << std::endl;
 72      first = 0; return 0;
 73    }
 74
 75    void * svc(void *t) {
 76      int * xi = (int *) t;
 77      if (first == 0) {
 78        first = *xi;
 79      }
 80      return GO_ON;
 81    }
 82
 83    void svc_end() {
 84      std::cout << "Sieve terminating, prime numbers found up to " << first
 85        << std::endl;
 86    }
 87
 88  private:
 89    int first;
 90  };
 91
 92  int main(int argc, char * argv[]) {
 93    if (argc!=3) {
 94      std::cerr << "use: " << argv[0] << " nstages streamlen\n";
 95      return -1;
 96    }
 97
 98    ff_pipeline pipe;
 99    int nstages = atoi(argv[1]);
100    pipe.add_stage(new Generate(atoi(argv[2])));
101    for (int j=0; j<nstages; j++)
102      pipe.add_stage(new Sieve());
103    pipe.add_stage(new Printer());
104
105    ffTime(START_TIME);
106    if (pipe.run_and_wait_end()<0) {
107      error("running pipeline\n");
108      return -1;
109    }
110    ffTime(STOP_TIME);
111
112    std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n";
113    std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
114    pipe.ffStats(std::cerr);
115    return 0;
116  }

sieve.cpp
The Generate stage at lines 35-66 generates the integer stream, from 2 up to a value taken from the command line parameters. It uses an svc_init just to point out when the concurrent activity is started. The creation of the object used to represent the concurrent activity is instead evidenced by the message printed in the constructor.
The Sieve stage (lines 6-28) defines the generic pipeline stage. This stores the initial value got from the input stream on lines 14-16 and then goes on passing on the inputs not multiple of the stored value on lines 18-21. The svc_end method is executed right before terminating the concurrent activity and prints out the stored value, which happens to be the prime number found in that node.
The Printer stage is used as the last stage in the pipeline (the pipeline built on lines 98-103 in the program main) and just discards all the received values but the first one, which is kept to remember the point we reached storing prime numbers. It defines both an svc_init method (to print a message when the concurrent activity is started) and an svc_end method, which is used to print the first integer received, representing the upper bound (not included) of the sequence of prime numbers discovered with the pipeline stages.
The concurrent activity graph of the program is the following one:
The program output, when run with 7 Sieve stages on a stream from 2 to 30, is the following one:
ffsrc$ ./sieve 7 30
Generate object created
Printer started
Sieve started. Generating a stream of 30 elements, starting with 2
Prime(2)
Prime(3)
Prime(5)
Prime(7)
Prime(Prime(Sieve terminating, prime numbers found up to 1317)
)
19
Prime(11)
DONE, pipe time= 0.275 (ms)
DONE, total time= 25.568 (ms)
FastFlow trace not enabled
ffsrc$

showing that the prime numbers up to 19 (excluded) have been found.
7 Managing access to shared objects

Shared objects may be accessed within FastFlow programs using the classical pthread concurrency control mechanisms. The FastFlow program is actually a multithreaded code using the pthread library, in fact.
We demonstrate how access to shared objects may be ensured within a FastFlow program by forcing mutual exclusion in the access to the std::cout file descriptor. This will be used to get much nicer string output on the screen when running the Sieve program illustrated in the previous section.
In order to guarantee mutual exclusion on the shared std::cout descriptor we use a pthread_mutex_lock. The lock is declared and properly initialized as a static, global variable in the program (see code below, line 7). Then each one of the writes to the std::cout descriptor in the concurrent activities relative to the different stages of the pipeline is protected through pthread_mutex_lock / pthread_mutex_unlock "brackets" (see lines 29-31 in the code below, as an example).
  1  #include <iostream>
  2  #include <ff/pipeline.hpp>
  3  #include <pthread.h>
  4
  5  using namespace ff;
  6
  7  static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  8
  9  class Sieve: public ff_node {
 10  public:
 11
 12    Sieve() { filter = 0; }
 13
 14    void * svc(void * task) {
 15      unsigned int * t = (unsigned int *)task;
 16
 17      if (filter == 0) {
 18        filter = *t;
 19        return GO_ON;
 20      } else {
 21        if (*t % filter == 0)
 22          return GO_ON;
 23        else
 24          return task;
 25      }
 26    }
 27
 28    void svc_end() {
 29      pthread_mutex_lock(&lock);
 30      std::cout << "Prime(" << filter << ")\n";
 31      pthread_mutex_unlock(&lock);
 32      return;
 33    }
 34
 35  private:
 36    int filter;
 37  };
 38
 39  class Generate: public ff_node {
 40  public:
 41
 42    Generate(int n) {
 43      streamlen = n;
 44      task = 2;
 45      pthread_mutex_lock(&lock);
 46      std::cout << "Generate object created" << std::endl;
 47      pthread_mutex_unlock(&lock);
 48      return;
 49    }
 50
 51    int svc_init() {
 52      pthread_mutex_lock(&lock);
 53      std::cout << "Sieve started. Generating a stream of " << streamlen <<
 54        " elements, starting with " << task << std::endl;
 55      pthread_mutex_unlock(&lock);
 56      return 0;
 57    }
 58
 59    void * svc(void * tt) {
 60      unsigned int * t = (unsigned int *)tt;
 61
 62      if (task < streamlen) {
 63        int * xi = (int *) calloc(1, sizeof(int));
 64        *xi = task++;
 65        return xi;
 66      } else {
 67        return NULL;
 68      }
 69    }
 70  private:
 71    int streamlen;
 72    int task;
 73  };
 74
 75  class Printer: public ff_node {
 76
 77    int svc_init() {
 78      pthread_mutex_lock(&lock);
 79      std::cout << "Printer started " << std::endl;
 80      pthread_mutex_unlock(&lock);
 81      first = 0;
 82      return 0;
 83    }
 84
 85    void * svc(void *t) {
 86      int * xi = (int *) t;
 87      if (first == 0) {
 88        first = *xi;
 89      }
 90      return GO_ON;
 91    }
 92
 93    void svc_end() {
 94      pthread_mutex_lock(&lock);
 95      std::cout << "Sieve terminating, prime numbers found up to " << first
 96        << std::endl;
 97      pthread_mutex_unlock(&lock);
 98    }
 99
100  private:
101    int first;
102  };
103
104  int main(int argc, char * argv[]) {
105    if (argc!=3) {
106      std::cerr << "use: " << argv[0] << " nstages streamlen\n";
107      return -1;
108    }
109
110    ff_pipeline pipe;
111    int nstages = atoi(argv[1]);
112    pipe.add_stage(new Generate(atoi(argv[2])));
113    for (int j=0; j<nstages; j++)
114      pipe.add_stage(new Sieve());
115    pipe.add_stage(new Printer());
116
117    ffTime(START_TIME);
118    if (pipe.run_and_wait_end()<0) {
119      error("running pipeline\n");
120      return -1;
121    }
122    ffTime(STOP_TIME);
123
124    std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n";
125    std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
126    pipe.ffStats(std::cerr);
127    return 0;
128  }

sievelock.cpp
When running the program, we get a slightly different output than the one
we obtained when the usage of std::cout was not properly regulated:
ffsrc$ ./a.out 7 30
Generate object created
Printer started
Sieve started. Generating a stream of 30 elements, starting with 2
Prime(2)
Prime(5)
Prime(13)
Prime(11)
Prime(7)
Sieve terminating, prime numbers found up to 19
Prime(3)
Prime(17)
DONE, pipe time= 58.439 (ms)
DONE, total time= 64.473 (ms)
FastFlow trace not enabled
ffsrc$
The strings are now printed on clearly separated lines, although in an
apparently unordered sequence; this is due to the FastFlow scheduling of
the concurrent activities and to the way locks are implemented and managed
in the pthread library.
It is worth pointing out that:
• FastFlow ensures correct access sequences to the shared objects used to
implement the streaming networks (the graph of concurrent activities),
such as the SPSC queues used to implement the streams, as an example.
• FastFlow stream semantics guarantee correct sequencing of activation of
the concurrent activities modelled through ff_nodes and connected through
streams. The stream implementation actually ensures pure data flow semantics.
• Any access to any user-defined shared data structure must be protected
either with the primitive mechanisms provided by FastFlow (see Sec. 7) or
with the primitives provided by the pthread library.
8 More skeletons: the FastFlow farm
In the previous sections, we used only pipeline skeletons in the sample code.
Here we introduce the other primitive skeleton provided in FastFlow, namely
the farm skeleton.
The simplest way to define a farm skeleton in FastFlow is by declaring a
farm object and adding a vector of worker concurrent activities to the farm.
An excerpt of the needed code is the following:

#include <ff/farm.hpp>

using namespace ff;

int main(int argc, char * argv[]) {
    ff_farm<> myFarm;
    std::vector<ff_node *> w;
    for(int i=0; i<nworkers; ++i)
        w.push_back(new Worker);
    myFarm.add_workers(w);
This code basically defines a farm with nworkers workers processing the data
items appearing on the farm input stream and delivering results onto the farm
output stream. The scheduling policy used to send input tasks to the workers is
the default one, that is, round robin. Workers are implemented by the ff_node
Worker objects. These objects may represent sequential concurrent activities
as well as further skeletons, that is, either pipeline or farm instances.
However, this farm cannot be used alone: there is no way to provide an
input stream to a FastFlow streaming network other than having the first
component in the network generate the stream. To this purpose, FastFlow
supports two options:
• we can use the farm defined with a code similar to the one described
above as the second stage of a pipeline whose first stage generates the
input stream according to one of the techniques discussed in Sec. 5. This
means we will use the farm writing code such as:

ff_pipeline myPipe;

myPipe.add_stage(new GeneratorStage());
myPipe.add_stage(&myFarm);
• or we can provide the farm with an emitter and a collector, specialized
in such a way that they can be used to produce the input stream and consume
the output stream of the farm, respectively, while inheriting the default
scheduling and gathering policies.
The former case is simple: we only have to understand why adding the farm
to the pipeline as a pipeline stage works. This will be discussed in detail in
Sec. 10. The latter case is simple as well, but we discuss it through some more code.
8.1 Farm with emitter and collector
First, let us see what kind of objects we have to build to provide the farm
with an emitter and a collector. Both the emitter and the collector must be
supplied as ff_node subclass objects. If we implement the emitter just pro-
viding the svc method, the tasks delivered by the svc on the output stream,
either using ff_send_out or returning the proper pointer with the svc
return statement, will be dispatched to the available workers according to
the default round robin scheduling. An example of emitter node, generating
the stream of tasks eventually processed by the farm worker nodes, is the
following:
class Emitter: public ff_node {
public:
    Emitter(int n) {
        streamlen = n;
        task = 0;
    };

    void * svc(void *) {
        sleep(1);
        task++;
        int * t = new int(task);
        if (task<streamlen)
            return t;
        else
            return NULL;
    }

private:
    int streamlen;
    int task;
};
In this case, the node svc actually does not take into account any input
stream item (the input parameter name is omitted in the svc signature).
Rather, each time the node is activated, it returns a task to be computed
using the internal task counter. The task is directed to the "next" worker
by the FastFlow farm run time support.
Concerning the collector, we can also use an ff_node: in case the results
need further processing, they can be directed to the next node in the stream-
ing network using the mechanisms detailed in Sec. 5. Otherwise, they can be
processed within the svc method of the ff_node subclass.
As an example, a collector just printing the tasks/results it gets from
the workers may be programmed as follows:

class Collector: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        std::cout << "Collector got " << *t << std::endl;
        return GO_ON;
    }
};
With these classes defined, and assuming to have a worker defined by the
class:

class Worker: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        (*t)++;
        return task;
    }
};
we can define a program processing a stream of integers, increasing each one
of them with a farm, as follows:
int main(int argc, char * argv[]) {
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);

    ff_farm<> farm;

    Emitter E(streamlen);
    farm.add_emitter(&E);

    std::vector<ff_node *> w;
    for(int i=0; i<nworkers; ++i)
        w.push_back(new Worker);
    farm.add_workers(w);

    Collector C;
    farm.add_collector(&C);

    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    return 0;
}
The concurrent activity graph in this case is the following one:
When run with the first argument specifying the number of workers to be
used and the second one specifying the length of the input stream generated
in the emitter node, we get the expected output:
ffsrc$ ./a.out 2 10
Collector got 2
Collector got 3
Collector got 4
Collector got 5
Collector got 6
Collector got 7
Collector got 8
Collector got 9
Collector got 10
ffsrc$
8.2 Farm with no collector
We move on to consider a further case: a farm with an emitter but no collector.
Since there is no collector, the workers cannot deliver results downstream: all
the results computed by the workers must