While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get started (ordered by perceived complexity):
To explore more of these examples please check out the examples directory in the TaskFlow source tree.
Note
If the examples provided are not satisfactory (or not up to your standards), contributions to help improve them are welcome and very much appreciated. The higher the quality and the clearer the examples are, the better and more useful they are for everyone.
Note
Full source located at hello_world.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 from taskflow import engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow.patterns import unordered_flow as uf
16 from taskflow import task
17
18
19 # INTRO: This is the de facto hello world equivalent for taskflow; it shows how
20 # an overly simplistic workflow can be created that runs using different
21 # engines using different styles of execution (all can be used to run in
22 # parallel if a workflow is provided that is parallelizable).
23
24 class PrinterTask(task.Task):
25 def __init__(self, name, show_name=True, inject=None):
26 super(PrinterTask, self).__init__(name, inject=inject)
27 self._show_name = show_name
28
29 def execute(self, output):
30 if self._show_name:
31 print("%s: %s" % (self.name, output))
32 else:
33 print(output)
34
35
36 # This will be the work that we want done, which for this example is just to
37 # print 'hello world' (like a song) using different tasks and different
38 # execution models.
39 song = lf.Flow("beats")
40
41 # Unordered flows, when run, can be run in parallel; and a chorus is everyone
42 # singing at once of course!
43 hi_chorus = uf.Flow('hello')
44 world_chorus = uf.Flow('world')
45 for (name, hello, world) in [('bob', 'hello', 'world'),
46 ('joe', 'hellooo', 'worllllld'),
47 ('sue', "helloooooo!", 'wooorllld!')]:
48 hi_chorus.add(PrinterTask("%s@hello" % name,
49 # This will show up to the execute() method of
50 # the task as the argument named 'output' (which
51 # will allow us to print the character we want).
52 inject={'output': hello}))
53 world_chorus.add(PrinterTask("%s@world" % name,
54 inject={'output': world}))
55
56 # The composition starts with the conductor and then runs in sequence with
57 # the chorus running in parallel, but no matter what the 'hello' chorus must
58 # always run before the 'world' chorus (otherwise the world will fall apart).
59 song.add(PrinterTask("conductor@begin",
60 show_name=False, inject={'output': "*ding*"}),
61 hi_chorus,
62 world_chorus,
63 PrinterTask("conductor@end",
64 show_name=False, inject={'output': "*dong*"}))
65
66 # Run in parallel using eventlet green threads...
67 try:
68 import eventlet as _eventlet # noqa
69 except ImportError:
70 # No eventlet currently active, skip running with it...
71 pass
72 else:
73 print("-- Running in parallel using eventlet --")
74 e = engines.load(song, executor='greenthreaded', engine='parallel',
75 max_workers=1)
76 e.run()
77
78
79 # Run in parallel using real threads...
80 print("-- Running in parallel using threads --")
81 e = engines.load(song, executor='threaded', engine='parallel',
82 max_workers=1)
83 e.run()
84
85
86 # Run in parallel using external processes...
87 print("-- Running in parallel using processes --")
88 e = engines.load(song, executor='processes', engine='parallel',
89 max_workers=1)
90 e.run()
91
92
93 # Run serially (aka, if the workflow could have been run in parallel, it will
94 # not be when run in this mode)...
95 print("-- Running serially --")
96 e = engines.load(song, engine='serial')
97 e.run()
98 print("-- Statistics gathered --")
99 print(e.statistics)
Note
Full source located at simple_linear_pass.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 self_dir = os.path.abspath(os.path.dirname(__file__))
9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12 sys.path.insert(0, top_dir)
13 sys.path.insert(0, self_dir)
14
15 from taskflow import engines
16 from taskflow.patterns import linear_flow
17 from taskflow import task
18
19 # INTRO: This example shows how a task (in a linear/serial workflow) can
20 # produce an output that can be then consumed/used by a downstream task.
21
22
23 class TaskA(task.Task):
24 default_provides = 'a'
25
26 def execute(self):
27 print("Executing '%s'" % (self.name))
28 return 'a'
29
30
31 class TaskB(task.Task):
32 def execute(self, a):
33 print("Executing '%s'" % (self.name))
34 print("Got input '%s'" % (a))
35
36
37 print("Constructing...")
38 wf = linear_flow.Flow("pass-from-to")
39 wf.add(TaskA('a'), TaskB('b'))
40
41 print("Loading...")
42 e = engines.load(wf)
43
44 print("Compiling...")
45 e.compile()
46
47 print("Preparing...")
48 e.prepare()
49
50 print("Running...")
51 e.run()
52
53 print("Done...")
Note
Full source located at echo_listener.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.DEBUG)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 from taskflow import engines
14 from taskflow.listeners import logging as logging_listener
15 from taskflow.patterns import linear_flow as lf
16 from taskflow import task
17
18 # INTRO: This example walks through a miniature workflow which will do a
19 # simple echo operation; during this execution a listener is associated with
20 # the engine to receive all notifications about what the flow has performed,
21 # this example dumps that output to the stdout for viewing (at debug level
22 # to show all the information which is possible).
23
24
25 class Echo(task.Task):
26 def execute(self):
27 print(self.name)
28
29
30 # Generate the work to be done (but don't do it yet).
31 wf = lf.Flow('abc')
32 wf.add(Echo('a'))
33 wf.add(Echo('b'))
34 wf.add(Echo('c'))
35
36 # This will associate the listener with the engine (the listener
37 # will automatically register for notifications with the engine and deregister
38 # when the context is exited).
39 e = engines.load(wf)
40 with logging_listener.DynamicLoggingListener(e):
41 e.run()
Note
Full source located at simple_linear_listening.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16 from taskflow.types import notifier
17
18 ANY = notifier.Notifier.ANY
19
20 # INTRO: In this example we create two tasks (this time as functions instead
21 # of task subclasses as in the simple_linear.py example), each of which ~calls~
22 # a given ~phone~ number (provided as a function input) in a linear fashion
23 # (one after the other).
24 #
25 # For a workflow which is serial this shows an extremely simple way
26 # of structuring your tasks (the code that does the work) into a linear
27 # sequence (the flow) and then passing the work off to an engine, with some
28 # initial data to be run in a reliable manner.
29 #
30 # This example shows a basic usage of the taskflow structures without involving
31 # the complexity of persistence. Using the structures that taskflow provides
32 # via tasks and flows makes it possible for you to easily at a later time
33 # hook in a persistence layer (and then gain the functionality that offers)
34 # when you decide the complexity of adding that layer in is 'worth it' for your
35 # application's usage pattern (which some applications may not need).
36 #
37 # It **also** adds on to the simple_linear.py example by adding a set of
38 # callback functions which the engine will call when a flow state transition
39 # or task state transition occurs. These types of functions are useful for
40 # updating task or flow progress, or for debugging, sending notifications to
41 # external systems, or for other yet unknown future usage that you may create!
42
43
44 def call_jim(context):
45 print("Calling jim.")
46 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
47
48
49 def call_joe(context):
50 print("Calling joe.")
51 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
52
53
54 def flow_watch(state, details):
55 print('Flow => %s' % state)
56
57
58 def task_watch(state, details):
59 print('Task %s => %s' % (details.get('task_name'), state))
60
61
62 # Wrap your functions into a task type that knows how to treat your functions
63 # as tasks. There was previous work done to just allow a function to be
64 # directly passed, but in python 3.0 there is no easy way to capture an
65 # instance method, so this wrapping approach was decided upon instead which
66 # can attach to instance methods (if that's desired).
67 flow = lf.Flow("Call-them")
68 flow.add(task.FunctorTask(execute=call_jim))
69 flow.add(task.FunctorTask(execute=call_joe))
70
71 # Now load (but do not run) the flow using the provided initial data.
72 engine = taskflow.engines.load(flow, store={
73 'context': {
74 "joe_number": 444,
75 "jim_number": 555,
76 }
77 })
78
79 # This is where we attach our callback functions to the 2 different
80 # notification objects that an engine exposes. The usage of ANY (Kleene star)
81 # here means that we want to be notified on all state changes, if you want to
82 # restrict to a specific state change, just register that instead.
83 engine.notifier.register(ANY, flow_watch)
84 engine.atom_notifier.register(ANY, task_watch)
85
86 # And now run!
87 engine.run()
Note
Full source located at dump_memory_backend.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 self_dir = os.path.abspath(os.path.dirname(__file__))
9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12 sys.path.insert(0, top_dir)
13 sys.path.insert(0, self_dir)
14
15 from taskflow import engines
16 from taskflow.patterns import linear_flow as lf
17 from taskflow import task
18
19 # INTRO: in this example we create a dummy flow with a dummy task, and run
20 # it using an in-memory backend and pre/post run we dump out the contents
21 # of the in-memory backend's tree structure (which can be quite useful to
22 # look at for debugging or other analysis).
23
24
25 class PrintTask(task.Task):
26 def execute(self):
27 print("Running '%s'" % self.name)
28
29 # Make a little flow and run it...
30 f = lf.Flow('root')
31 for alpha in ['a', 'b', 'c']:
32 f.add(PrintTask(alpha))
33
34 e = engines.load(f)
35 e.compile()
36 e.prepare()
37
38 # After prepare the storage layer + backend can now be accessed safely...
39 backend = e.storage.backend
40
41 print("----------")
42 print("Before run")
43 print("----------")
44 print(backend.memory.pformat())
45 print("----------")
46
47 e.run()
48
49 print("---------")
50 print("After run")
51 print("---------")
52 for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
53 value = backend.memory[path]
54 if value:
55 print("%s -> %s" % (path, value))
56 else:
57 print("%s" % (path))
Note
Full source located at simple_linear.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16
17 # INTRO: In this example we create two tasks, each of which ~calls~ a given
18 # ~phone~ number (provided as a function input) in a linear fashion (one after
19 # the other). For a workflow which is serial this shows an extremely simple way
20 # of structuring your tasks (the code that does the work) into a linear
21 # sequence (the flow) and then passing the work off to an engine, with some
22 # initial data to be run in a reliable manner.
23 #
24 # NOTE(harlowja): This example shows a basic usage of the taskflow structures
25 # without involving the complexity of persistence. Using the structures that
26 # taskflow provides via tasks and flows makes it possible for you to easily at
27 # a later time hook in a persistence layer (and then gain the functionality
28 # that offers) when you decide the complexity of adding that layer in
29 # is 'worth it' for your application's usage pattern (which certain
30 # applications may not need).
31
32
33 class CallJim(task.Task):
34 def execute(self, jim_number, *args, **kwargs):
35 print("Calling jim %s." % jim_number)
36
37
38 class CallJoe(task.Task):
39 def execute(self, joe_number, *args, **kwargs):
40 print("Calling joe %s." % joe_number)
41
42
43 # Create your flow and associated tasks (the work to be done).
44 flow = lf.Flow('simple-linear').add(
45 CallJim(),
46 CallJoe()
47 )
48
49 # Now run that flow using the provided initial data (store below).
50 taskflow.engines.run(flow, store=dict(joe_number=444,
51 jim_number=555))
Note
Full source located at reverting_linear.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16
17 # INTRO: In this example we create three tasks, each of which ~calls~ a given
18 # number (provided as a function input), one of those tasks *fails* calling a
19 # given number (the suzzie calling); this causes the workflow to enter the
20 # reverting process, which activates the revert methods of the previous two
21 # phone ~calls~.
22 #
23 # This simulated calling makes it appear like all three calls occur or all
24 # three don't occur (transaction-like capabilities). No persistence layer is
25 # used here so reverting and executing will *not* be tolerant of process
26 # failure.
27
28
29 class CallJim(task.Task):
30 def execute(self, jim_number, *args, **kwargs):
31 print("Calling jim %s." % jim_number)
32
33 def revert(self, jim_number, *args, **kwargs):
34 print("Calling %s and apologizing." % jim_number)
35
36
37 class CallJoe(task.Task):
38 def execute(self, joe_number, *args, **kwargs):
39 print("Calling joe %s." % joe_number)
40
41 def revert(self, joe_number, *args, **kwargs):
42 print("Calling %s and apologizing." % joe_number)
43
44
45 class CallSuzzie(task.Task):
46 def execute(self, suzzie_number, *args, **kwargs):
47 raise IOError("Suzzie not home right now.")
48
49
50 # Create your flow and associated tasks (the work to be done).
51 flow = lf.Flow('simple-linear').add(
52 CallJim(),
53 CallJoe(),
54 CallSuzzie()
55 )
56
57 try:
58 # Now run that flow using the provided initial data (store below).
59 taskflow.engines.run(flow, store=dict(joe_number=444,
60 jim_number=555,
61 suzzie_number=666))
62 except Exception as e:
63 # NOTE(harlowja): This exception will be the exception that came out of the
64 # 'CallSuzzie' task instead of a different exception, this is useful since
65 # typically surrounding code wants to handle the original exception and not
66 # a wrapped or altered one.
67 #
68 # *WARNING* If this flow was multi-threaded and multiple active tasks threw
69 # exceptions then the above exception would be wrapped into a combined
70 # exception (the object has methods to iterate over the contained
71 # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
72 # how to deal with multiple tasks failing while running.
73 #
74 # You will also note that this is not a problem in this case since no
75 # parallelism is involved; this is ensured by the usage of a linear flow
76 # and the default engine type which is 'serial' vs being 'parallel'.
77 print("Flow failed: %s" % e)
Note
Full source located at build_a_car.
1
2 import logging
3 import os
4 import sys
5
6
7 logging.basicConfig(level=logging.ERROR)
8
9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12 sys.path.insert(0, top_dir)
13
14
15 import taskflow.engines
16 from taskflow.patterns import graph_flow as gf
17 from taskflow.patterns import linear_flow as lf
18 from taskflow import task
19 from taskflow.types import notifier
20
21 ANY = notifier.Notifier.ANY
22
23 import example_utils as eu # noqa
24
25
26 # INTRO: This example shows how a graph flow and linear flow can be used
27 # together to execute dependent & non-dependent tasks by going through the
28 # steps required to build a simplistic car (an assembly line if you will). It
29 # also shows how raw functions can be wrapped into a task object instead of
30 # being forced to use the more *heavy* task base class. This is useful in
31 # scenarios where pre-existing code has functions that you easily want to
32 # plug-in to taskflow, without requiring a large amount of code changes.
33
34
35 def build_frame():
36 return 'steel'
37
38
39 def build_engine():
40 return 'honda'
41
42
43 def build_doors():
44 return '2'
45
46
47 def build_wheels():
48 return '4'
49
50
51 # These just return True to indicate success; in the real world they would
52 # do more than just that.
53
54 def install_engine(frame, engine):
55 return True
56
57
58 def install_doors(frame, windows_installed, doors):
59 return True
60
61
62 def install_windows(frame, doors):
63 return True
64
65
66 def install_wheels(frame, engine, engine_installed, wheels):
67 return True
68
69
70 def trash(**kwargs):
71 eu.print_wrapped("Throwing away pieces of car!")
72
73
74 def startup(**kwargs):
75 # If you want to see the rollback function being activated try uncommenting
76 # the following line.
77 #
78 # raise ValueError("Car not verified")
79 return True
80
81
82 def verify(spec, **kwargs):
83 # If the car is not what we ordered throw away the car (trigger reversion).
84 for key, value in kwargs.items():
85 if spec[key] != value:
86 raise Exception("Car doesn't match spec!")
87 return True
88
89
90 # These two functions connect into the state transition notification emission
91 # points that the engine outputs, they can be used to log state transitions
92 # that are occurring, or they can be used to suspend the engine (or perform
93 # other useful activities).
94 def flow_watch(state, details):
95 print('Flow => %s' % state)
96
97
98 def task_watch(state, details):
99 print('Task %s => %s' % (details.get('task_name'), state))
100
101
102 flow = lf.Flow("make-auto").add(
103 task.FunctorTask(startup, revert=trash, provides='ran'),
104 # A graph flow allows automatic dependency based ordering, the ordering
105 # is determined by analyzing the symbols required and provided and ordering
106 # execution based on a functioning order (if one exists).
107 gf.Flow("install-parts").add(
108 task.FunctorTask(build_frame, provides='frame'),
109 task.FunctorTask(build_engine, provides='engine'),
110 task.FunctorTask(build_doors, provides='doors'),
111 task.FunctorTask(build_wheels, provides='wheels'),
112 # These *_installed outputs allow for other tasks to depend on certain
113 # actions being performed (aka the components were installed), another
114 # way to do this is to link() the tasks manually instead of creating
115 # an 'artificial' data dependency that accomplishes the same goal the
116 # manual linking would result in.
117 task.FunctorTask(install_engine, provides='engine_installed'),
118 task.FunctorTask(install_doors, provides='doors_installed'),
119 task.FunctorTask(install_windows, provides='windows_installed'),
120 task.FunctorTask(install_wheels, provides='wheels_installed')),
121 task.FunctorTask(verify, requires=['frame',
122 'engine',
123 'doors',
124 'wheels',
125 'engine_installed',
126 'doors_installed',
127 'windows_installed',
128 'wheels_installed']))
129
130 # This dictionary will be provided to the tasks as a specification for what
131 # the tasks should produce, in this example this specification will influence
132 # what those tasks do and what output they create. Different tasks depend on
133 # different information from this specification, all of which will be provided
134 # automatically by the engine to those tasks.
135 spec = {
136 "frame": 'steel',
137 "engine": 'honda',
138 "doors": '2',
139 "wheels": '4',
140 # These are used to compare the result product, a car without the pieces
141 # installed is not a car after all.
142 "engine_installed": True,
143 "doors_installed": True,
144 "windows_installed": True,
145 "wheels_installed": True,
146 }
147
148
149 engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
150
151 # This registers all (ANY) state transitions to trigger a call to the
152 # flow_watch function for flow state transitions, and registers the
153 # same all (ANY) state transitions for task state transitions.
154 engine.notifier.register(ANY, flow_watch)
155 engine.atom_notifier.register(ANY, task_watch)
156
157 eu.print_wrapped("Building a car")
158 engine.run()
159
160 # Alter the specification and ensure that the reverting logic gets triggered
161 # since the build_doors function will still build a car with 2 doors only
162 # (not 5); this will cause the verification task to mark the car that is
163 # produced as not matching the desired spec.
164 spec['doors'] = 5
165
166 engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
167 engine.notifier.register(ANY, flow_watch)
168 engine.atom_notifier.register(ANY, task_watch)
169
170 eu.print_wrapped("Building a wrong car that doesn't match specification")
171 try:
172 engine.run()
173 except Exception as e:
174 eu.print_wrapped("Flow failed: %s" % e)
Note
Full source located at alphabet_soup.
1
2 import fractions
3 import functools
4 import logging
5 import os
6 import string
7 import sys
8 import time
9
10 logging.basicConfig(level=logging.ERROR)
11
12 self_dir = os.path.abspath(os.path.dirname(__file__))
13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16 sys.path.insert(0, top_dir)
17 sys.path.insert(0, self_dir)
18
19 from taskflow import engines
20 from taskflow import exceptions
21 from taskflow.patterns import linear_flow
22 from taskflow import task
23
24
25 # In this example we show how a simple linear set of tasks can be executed
26 # using local processes (and not threads or remote workers) with minimal (if
27 # any) modification to those tasks to make them safe to run in this mode.
28 #
29 # This is useful since it allows further scaling up your workflows when thread
30 # execution starts to become a bottleneck (which it can start to be due to the
31 # GIL in python). It also offers an intermediary scalable runner that can be
32 # used when the scale and/or setup of remote workers is not desirable.
33
34
35 def progress_printer(task, event_type, details):
36 # This callback, attached to each task will be called in the local
37 # process (not the child processes)...
38 progress = details.pop('progress')
39 progress = int(progress * 100.0)
40 print("Task '%s' reached %d%% completion" % (task.name, progress))
41
42
43 class AlphabetTask(task.Task):
44 # Delay (in seconds) between each progress part.
45 _DELAY = 0.1
46
47 # This task will run in X main stages (each with a different progress
48 # report that will be delivered back to the running process...). The
49 # initial 0% and 100% are triggered automatically by the engine when
50 # a task is started and finished (so that's why those are not emitted
51 # here).
52 _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
53
54 def execute(self):
55 for p in self._PROGRESS_PARTS:
56 self.update_progress(p)
57 time.sleep(self._DELAY)
58
59
60 print("Constructing...")
61 soup = linear_flow.Flow("alphabet-soup")
62 for letter in string.ascii_lowercase:
63 abc = AlphabetTask(letter)
64 abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
65 functools.partial(progress_printer, abc))
66 soup.add(abc)
67 try:
68 print("Loading...")
69 e = engines.load(soup, engine='parallel', executor='processes')
70 print("Compiling...")
71 e.compile()
72 print("Preparing...")
73 e.prepare()
74 print("Running...")
75 e.run()
76 print("Done: %s" % e.statistics)
77 except exceptions.NotImplementedError as e:
78 print(e)
Note
Full source located at timing_listener.
1
2 import logging
3 import os
4 import random
5 import sys
6 import time
7
8 logging.basicConfig(level=logging.ERROR)
9
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13 sys.path.insert(0, top_dir)
14
15 from taskflow import engines
16 from taskflow.listeners import timing
17 from taskflow.patterns import linear_flow as lf
18 from taskflow import task
19
20 # INTRO: in this example we will attach a listener to an engine
21 # and have variable run time tasks run and show how the listener will print
22 # out how long those tasks took (when they started and when they finished).
23 #
24 # This shows how timing metrics can be gathered (or attached onto an engine)
25 # after a workflow has been constructed, making it easy to gather metrics
26 # dynamically for situations where this kind of information is applicable (or
27 # even adding this information on at a later point in the future when your
28 # application starts to slow down).
29
30
31 class VariableTask(task.Task):
32 def __init__(self, name):
33 super(VariableTask, self).__init__(name)
34 self._sleepy_time = random.random()
35
36 def execute(self):
37 time.sleep(self._sleepy_time)
38
39
40 f = lf.Flow('root')
41 f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
42 e = engines.load(f)
43 with timing.PrintingDurationListener(e):
44 e.run()
Note
Full source located at distance_calculator.
1
2 import collections
3 import math
4 import os
5 import sys
6
7 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10 sys.path.insert(0, top_dir)
11
12 from taskflow import engines
13 from taskflow.patterns import linear_flow
14 from taskflow import task
15
16 # INTRO: This shows how to use a task's/atom's ability to take requirements from
17 # its execute function's default parameters and shows how to provide those
18 # via different methods when needed, to influence those parameters to (in
19 # this case) calculate the distance between two points in 2D space.
20
21 # A 2D point.
22 Point = collections.namedtuple("Point", "x,y")
23
24
25 def is_near(val, expected, tolerance=0.001):
26 # Floats don't really provide equality...
27 if val > (expected + tolerance):
28 return False
29 if val < (expected - tolerance):
30 return False
31 return True
32
33
34 class DistanceTask(task.Task):
35 # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
36
37 default_provides = 'distance'
38
39 def execute(self, a=Point(0, 0), b=Point(0, 0)):
40 return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
41
42
43 if __name__ == '__main__':
44 # For these we rely on the execute() methods points by default being
45 # at the origin (and we override it with store values when we want) at
46 # execution time (which then influences what is calculated).
47 any_distance = linear_flow.Flow("origin").add(DistanceTask())
48 results = engines.run(any_distance)
49 print(results)
50 print("%s is near-enough to %s: %s" % (results['distance'],
51 0.0,
52 is_near(results['distance'], 0.0)))
53
54 results = engines.run(any_distance, store={'a': Point(1, 1)})
55 print(results)
56 print("%s is near-enough to %s: %s" % (results['distance'],
57 1.4142,
58 is_near(results['distance'],
59 1.4142)))
60
61 results = engines.run(any_distance, store={'a': Point(10, 10)})
62 print(results)
63 print("%s is near-enough to %s: %s" % (results['distance'],
64 14.14199,
65 is_near(results['distance'],
66 14.14199)))
67
68 results = engines.run(any_distance,
69 store={'a': Point(5, 5), 'b': Point(10, 10)})
70 print(results)
71 print("%s is near-enough to %s: %s" % (results['distance'],
72 7.07106,
73 is_near(results['distance'],
74 7.07106)))
75
76 # For this we use the ability to override at task creation time the
77 # optional arguments so that we don't need to continue to send them
78 # in via the 'store' argument like in the above (and we fix the new
79 # starting point 'a' at (10, 10) instead of (0, 0))...
80
81 ten_distance = linear_flow.Flow("ten")
82 ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
83 results = engines.run(ten_distance, store={'b': Point(10, 10)})
84 print(results)
85 print("%s is near-enough to %s: %s" % (results['distance'],
86 0.0,
87 is_near(results['distance'], 0.0)))
88
89 results = engines.run(ten_distance)
90 print(results)
91 print("%s is near-enough to %s: %s" % (results['distance'],
92 14.14199,
93 is_near(results['distance'],
94 14.14199)))
Note
Full source located at parallel_table_multiply.
1
2 import csv
3 import logging
4 import os
5 import random
6 import sys
7
8 logging.basicConfig(level=logging.ERROR)
9
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13 sys.path.insert(0, top_dir)
14
15 import futurist
16 from six.moves import range as compat_range
17
18 from taskflow import engines
19 from taskflow.patterns import unordered_flow as uf
20 from taskflow import task
21
22 # INTRO: This example walks through a miniature workflow which does a parallel
23 # table modification where each row in the table gets adjusted by a thread, or
24 # green thread (if eventlet is available) in parallel and then the result
25 # is reformed into a new table and some verifications are performed on it
26 # to ensure everything went as expected.
27
28
29 MULTIPLER = 10
30
31
32 class RowMultiplier(task.Task):
33 """Performs a modification of an input row, creating an output row."""
34
35 def __init__(self, name, index, row, multiplier):
36 super(RowMultiplier, self).__init__(name=name)
37 self.index = index
38 self.multiplier = multiplier
39 self.row = row
40
41 def execute(self):
42 return [r * self.multiplier for r in self.row]
43
44
45 def make_flow(table):
46 # This creation will allow for parallel computation (since the flow here
47 # is specifically unordered; and when things are unordered they have
48 # no dependencies and when things have no dependencies they can just be
49 # run at the same time, limited in concurrency by the executor or max
50 # workers of that executor...)
51 f = uf.Flow("root")
52 for i, row in enumerate(table):
53 f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
54 # NOTE(harlowja): at this point nothing has run, the above is just
55 # defining what should be done (but not actually doing it) and associating
56 # any ordering dependencies that should be enforced (the flow pattern used
57 # forces this), the engine in the later main() function will actually
58 # perform this work...
59 return f
60
61
def main():
    """Multiply a table (read from CSV or randomly generated) in parallel.

    When exactly one command line argument is given it is read as a CSV
    file whose cells are parsed as floats (empty cells become 0.0);
    otherwise a random table with 1-100 rows and 1-100 columns is made.

    Returns 0 when one computed row exists for every input row, else 1.
    """
    if len(sys.argv) == 2:
        # The csv module wants a text-mode file opened with newline='' on
        # Python 3 and a binary-mode file on Python 2.
        if sys.version_info[0] >= 3:
            fh = open(sys.argv[1], 'r', newline='')
        else:
            fh = open(sys.argv[1], 'rb')
        with fh:
            tbl = [[float(r) if r else 0.0 for r in row]
                   for row in csv.reader(fh)]
    else:
        # Make some random table out of thin air...
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        tbl = [[random.random() for _j in compat_range(0, cols)]
               for _i in compat_range(0, rows)]

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        executor.shutdown()

    # Put the computed rows back into their original order; every task
    # remembers the index of the row it was created for, so sorting on that
    # index replaces a per-row search through the whole flow.
    computed_tbl = [e.storage.get(t.name)
                    for t in sorted(f, key=lambda t: t.index)]

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Note
Full source located at calculate_linear.
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16
17
18 # INTRO: In this example a linear flow is used to group four tasks to calculate
19 # a value. A single added task is used twice, showing how this can be done
20 # and the twice added task takes in different bound values. In the first case
21 # it uses default parameters ('x' and 'y') and in the second case arguments
22 # are bound with ('z', 'd') keys from the engines internal storage mechanism.
23 #
24 # A multiplier task uses a binding that another task also provides, but this
25 # example explicitly shows that 'z' parameter is bound with 'a' key
26 # This shows that if a task depends on a key named the same as a key provided
27 # from another task the name can be remapped to take the desired key from a
28 # different origin.
29
30
31 # This task provides some values as a result of execution; this can be
32 # useful when you want to provide values from a static set to other tasks that
33 # depend on those values existing before those tasks can run.
34 #
35 # NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
36 # that just provides those values on engine running by prepopulating the
37 # storage backend before your tasks are ran (which accomplishes a similar goal
38 # in a more uniform manner).
class Provider(task.Task):
    """Task whose result is the positional values given at construction."""

    def __init__(self, name, *args, **kwargs):
        # Remaining keyword arguments (for example ``provides``) are handed
        # to the base task unchanged.
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        values = self._provide
        return values
47
48
49 # This task adds two input variables and returns the result.
50 #
51 # Note that since this task does not have a revert() function (since addition
52 # is a stateless operation) there are no side-effects that this function needs
53 # to undo if some later operation fails.
class Adder(task.Task):
    """Task that returns the sum of its two inputs."""

    def execute(self, x, y):
        total = x + y
        return total
57
58
59 # This task multiplies an input variable by a multiplier and returns the
60 # result.
61 #
62 # Note that since this task does not have a revert() function (since
63 # multiplication is a stateless operation) there are no side-effects that
64 # this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    """Task that multiplies its ``z`` input by a factor fixed at creation."""

    def __init__(self, name, multiplier, provides=None, rebind=None):
        super(Multiplier, self).__init__(name=name, rebind=rebind,
                                         provides=provides)
        self._multiplier = multiplier

    def execute(self, z):
        product = z * self._multiplier
        return product
73
74
75 # Note here that the ordering is established so that the correct sequences
76 # of operations occurs where the adding and multiplying is done according
77 # to the expected and typical mathematical model. A graph flow could also be
78 # used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root')
flow.add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # The 'z' argument of execute() is deliberately *not* taken from the
    # 'z' value produced by add-1 above; the rebind makes it come from the
    # 'a' value produced by add-2 instead.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'}),
)

# Running returns all results (from all tasks); they are kept in an
# in-memory storage location that backs this engine since it is not
# configured with persistent storage.
results = taskflow.engines.run(flow)
print(results)
Source: graph_flow.py
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import graph_flow as gf
15 from taskflow.patterns import linear_flow as lf
16 from taskflow import task
17
18
19 # In this example there are complex *inferred* dependencies between tasks that
20 # are used to perform a simple set of linear equations.
21 #
22 # As you will see below the tasks just define what they require as input
23 # and produce as output (named values). Then the user doesn't care about
24 # ordering the tasks (in this case the tasks calculate pieces of the overall
25 # equation).
26 #
27 # As you will notice a graph flow resolves dependencies automatically using the
28 # tasks symbol requirements and provided symbol values and no ordering
29 # dependency has to be manually created.
30 #
31 # Also notice that flows of any types can be nested into a graph flow; showing
32 # that subflow dependencies (and associated ordering) will be inferred too.
33
34
class Adder(task.Task):
    """Task that returns the sum of its two inputs."""

    def execute(self, x, y):
        total = x + y
        return total
39
40
flow = gf.Flow('root')
flow.add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Initial variable inputs, handed to the engine as its starting storage.
store = dict(y1=1, y2=3, y3=5, y4=7, y5=9)

# These are the values each named result is expected to end up with.
unexpected = 0
expected = [
    ('x1', 4),
    ('x2', 12),
    ('x3', 16),
    ('x4', 21),
    ('x5', 20),
    ('x6', 41),
    ('x7', 82),
]

# Run the same flow with a serial and then a parallel engine; every value
# that differs from what is expected is reported on stderr and counted.
for engine_kind, report in (('serial', "Single threaded engine result %s"),
                            ('parallel', "Multi threaded engine result %s")):
    result = taskflow.engines.run(
        flow, engine=engine_kind, store=store)
    print(report % result)
    for (name, value) in expected:
        actual = result.get(name)
        if actual == value:
            continue
        sys.stderr.write("%s != %s\n" % (actual, value))
        unexpected += 1

# A non-zero exit code signals that at least one value was wrong.
if unexpected:
    sys.exit(1)
Note
Full source located at calculate_in_parallel
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow.patterns import unordered_flow as uf
16 from taskflow import task
17
18 # INTRO: These examples show how a linear flow and an unordered flow can be
19 # used together to execute calculations in parallel and then use the
20 # result for the next task/s. The adder task is used for all calculations
21 # and argument bindings are used to set correct parameters for each task.
22
23
24 # This task provides some values as a result of execution; this can be
25 # useful when you want to provide values from a static set to other tasks that
26 # depend on those values existing before those tasks can run.
27 #
28 # NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
29 # that provides those values on engine running by prepopulating the storage
30 # backend before your tasks are ran (which accomplishes a similar goal in a
31 # more uniform manner).
class Provider(task.Task):
    """Task whose result is the positional values given at construction."""

    def __init__(self, name, *args, **kwargs):
        # Remaining keyword arguments (for example ``provides``) are handed
        # to the base task unchanged.
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        values = self._provide
        return values
39
40
41 # This task adds two input variables and returns the result of that addition.
42 #
43 # Note that since this task does not have a revert() function (since addition
44 # is a stateless operation) there are no side-effects that this function needs
45 # to undo if some later operation fails.
class Adder(task.Task):
    """Task that returns the sum of its two inputs."""

    def execute(self, x, y):
        total = x + y
        return total
49
50
flow = lf.Flow('root')
flow.add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # The 2 adders live in an unordered flow since the order in which they
    # execute does not matter; a graph_flow pattern would be another way to
    # solve this, and it can also run in parallel (since the adders have no
    # ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']),
)


# Running returns all results (from all tasks); they are kept in an
# in-memory storage location that backs this engine since it is not
# configured with persistent storage.
result = taskflow.engines.run(flow, engine='parallel')
print(result)
Note
Full source located at create_parallel_volume
1
2 import contextlib
3 import logging
4 import os
5 import random
6 import sys
7 import time
8
9 logging.basicConfig(level=logging.ERROR)
10
11 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14 sys.path.insert(0, top_dir)
15
16 from oslo_utils import reflection
17
18 from taskflow import engines
19 from taskflow.listeners import printing
20 from taskflow.patterns import unordered_flow as uf
21 from taskflow import task
22
23 # INTRO: These examples show how unordered_flow can be used to create a large
24 # number of fake volumes in parallel (or serially, depending on a constant that
25 # can be easily changed).
26
27
@contextlib.contextmanager
def show_time(name):
    """Context manager that prints how long its ``with`` body took.

    :param name: label used in the printed report line.

    The report is printed even when the body raises, so a failing run
    still shows how long it ran before failing; the exception then
    propagates to the caller unchanged.
    """
    start = time.time()
    try:
        yield
    finally:
        end = time.time()
        print(" -- %s took %0.3f seconds" % (name, end - start))
34
35
36 # This affects how many volumes to create and how much time to *simulate*
37 # passing for that volume to be created.
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5

# This decides whether all the volumes are created in parallel or serially
# (in an undefined order, since an unordered flow is used). Note that
# ordering and parallelism are separate concepts: unordered items can still
# be run one after another. A typical use-case for offering both is to
# debug using the serial approach, while running at a larger scale would
# likely use the parallel approach.
#
# Switch this flag to see the overall time difference between the two.
SERIAL = False
engine = 'serial' if SERIAL else 'parallel'
56
57
class VolumeCreator(task.Task):
    """Task that pretends to create one volume by sleeping a random time."""

    def __init__(self, volume_id):
        # The task name is built from the class name plus the id of the
        # volume being created. A task's name uniquely identifies it in
        # storage, so the name must be relevant and identifiable if the
        # task is recreated for subsequent resumption (if applicable).
        #
        # UUIDs are *not* used since they vary for each created task and so
        # can not be tied back to a previous task's state on resumption. A
        # name based on the volume id is easily tied back to the original
        # task (so the volume create can be resumed/reverted) and is much
        # easier to use for audit and tracking purposes.
        task_name = "%s-%s" % (reflection.get_callable_name(self),
                               volume_id)
        super(VolumeCreator, self).__init__(name=task_name)
        self._volume_id = volume_id

    def execute(self):
        volume = self._volume_id
        print("Making volume %s" % (volume))
        # Simulated creation time, somewhere below MAX_CREATE_TIME seconds.
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % (volume))
81
82
83 # Assume there is no ordering dependency between volumes.
flow = uf.Flow("volume-maker")
for i in range(0, VOLUME_COUNT):
    volume_name = "vol-%s" % (i)
    flow.add(VolumeCreator(volume_id=volume_name))


# Report how long loading plus running the engine takes overall.
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine=engine)
    # While attached, the printing listener shows every state transition
    # the engine goes through as it runs the volume create tasks; the
    # context manager adds it on entry and removes it again on exit.
    with printing.PrintingListener(eng):
        eng.run()
Note
Full source located at simple_map_reduce
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 self_dir = os.path.abspath(os.path.dirname(__file__))
9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12 sys.path.insert(0, top_dir)
13 sys.path.insert(0, self_dir)
14
15 # INTRO: These examples show a simplistic map/reduce implementation where
16 # a set of mapper(s) will sum a series of input numbers (in parallel) and
17 # return their individual summed result. A reducer will then use those
18 # produced values and perform a final summation and this result will then be
19 # printed (and verified to ensure the calculation was as expected).
20
21 import six
22
23 from taskflow import engines
24 from taskflow.patterns import linear_flow
25 from taskflow.patterns import unordered_flow
26 from taskflow import task
27
28
class SumMapper(task.Task):
    """Mapper task that adds up the numbers it is given."""

    def execute(self, inputs):
        # Sums some set of provided inputs.
        subtotal = sum(inputs)
        return subtotal
33
34
class TotalReducer(task.Task):
    """Reducer task that adds up every mapper output it receives."""

    def execute(self, *args, **kwargs):
        # Only keyword arguments named 'reduction_*' are mapper outputs;
        # any other keyword argument passed in is left out of the total.
        return sum(v for (k, v) in six.iteritems(kwargs)
                   if k.startswith('reduction_'))
45
46
def chunk_iter(chunk_size, upperbound):
    """Yields back chunk size pieces from zero to upperbound - 1.

    Every yielded chunk is a list of consecutive integers. All chunks hold
    ``chunk_size`` numbers except possibly the last one, which holds the
    remainder when ``upperbound`` is not a multiple of ``chunk_size`` (so
    no number in the range is ever left out).

    :raises ValueError: if ``chunk_size`` is smaller than one.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1, not %r"
                         % (chunk_size,))
    for start in range(0, upperbound, chunk_size):
        # min() trims the final chunk so it never runs past upperbound.
        yield list(range(start, min(start + chunk_size, upperbound)))
55
56
57 # Upper bound of numbers to sum for example purposes...
UPPER_BOUND = 10000

# How many mappers we want to have.
SPLIT = 10

# How big of a chunk we want to give each mapper.
CHUNK_SIZE = UPPER_BOUND // SPLIT

# This will be the workflow we will compose and run.
w = linear_flow.Flow("root")

# The mappers will run in parallel.
store = {}
provided = []
mappers = unordered_flow.Flow('map')
for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
    mapper_name = 'mapper_%s' % i
    reduction_name = "reduction_%s" % i
    # The chunk this mapper has to sum is stored under the mapper's name.
    store[mapper_name] = chunk
    # The reducer consumes every mapper output, so each output name is
    # recorded for it to require later on.
    provided.append(reduction_name)
    mappers.add(SumMapper(name=mapper_name,
                          provides=reduction_name,
                          rebind={'inputs': mapper_name}))
w.add(mappers)

# The reducer will run last (after all the mappers).
w.add(TotalReducer('reducer', requires=provided))

# Now go!
e = engines.load(w, engine='parallel', store=store, max_workers=4)
print("Running a parallel engine with options: %s" % e.options)
e.run()

# Now get the result the reducer created.
total = e.storage.get('reducer')
print("Calculated result = %s" % total)

# Calculate it manually to verify that it worked...
calc_total = sum(range(0, UPPER_BOUND))
if total != calc_total:
    sys.exit(1)
Note
Full source located at share_engine_thread
1
2 import logging
3 import os
4 import random
5 import sys
6 import time
7
8 logging.basicConfig(level=logging.ERROR)
9
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13 sys.path.insert(0, top_dir)
14
15 import futurist
16 import six
17
18 from taskflow import engines
19 from taskflow.patterns import unordered_flow as uf
20 from taskflow import task
21 from taskflow.utils import threading_utils as tu
22
23 # INTRO: in this example we create 2 dummy flow(s) with 2 dummy task(s) each,
24 # and run them using a shared thread pool executor to show how a single
25 # executor can be used with more than one engine (sharing the execution
26 # thread pool between them); this allows for saving resources and reusing
27 # threads in situations where this is beneficial.
28
29
class DelayedTask(task.Task):
    """Task that reports the thread it runs in and then sleeps a while."""

    def __init__(self, name):
        super(DelayedTask, self).__init__(name=name)
        # The delay (a fraction of a second) is picked once, up front.
        self._wait_for = random.random()

    def execute(self):
        thread_id = tu.get_ident()
        print("Running '%s' in thread '%s'" % (self.name, thread_id))
        time.sleep(self._wait_for)
38
39
f1 = uf.Flow("f1")
f1.add(DelayedTask("f1-1"))
f1.add(DelayedTask("f1-2"))

f2 = uf.Flow("f2")
f2.add(DelayedTask("f2-1"))
f2.add(DelayedTask("f2-2"))

# Run them all using the same futures (thread-pool based) executor...
with futurist.ThreadPoolExecutor() as ex:
    e1 = engines.load(f1, engine='parallel', executor=ex)
    e2 = engines.load(f2, engine='parallel', executor=ex)
    iters = [e1.run_iter(), e2.run_iter()]
    while iters:
        # Run a single 'step' of each iterator, forcing each engine to perform
        # some work, then yield, and repeat until each iterator is consumed
        # and there is no more engine work to be done.
        #
        # A fresh copy is walked on every pass so that finished iterators
        # can be removed from the source list and are never stepped again.
        for it in list(iters):
            try:
                six.next(it)
            except StopIteration:
                iters.remove(it)
Note
Full source located at fake_billing
1
2 import json
3 import logging
4 import os
5 import sys
6 import time
7
8 logging.basicConfig(level=logging.ERROR)
9
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13 sys.path.insert(0, top_dir)
14
15 from oslo_utils import uuidutils
16
17 from taskflow import engines
18 from taskflow.listeners import printing
19 from taskflow.patterns import graph_flow as gf
20 from taskflow.patterns import linear_flow as lf
21 from taskflow import task
22 from taskflow.utils import misc
23
24 # INTRO: This example walks through a miniature workflow which simulates
25 # the reception of an API request, creation of a database entry, driver
26 # activation (which invokes a 'fake' webservice) and final completion.
27 #
28 # This example also shows how a function/object (in this case the url sending)
29 # that occurs during driver activation can update the progress of a task
30 # without being aware of the internals of how to do this by associating a
31 # callback that the url sending can update as the sending progresses from 0.0%
32 # complete to 100% complete.
33
34
class DB(object):
    """Stand-in database handle that only prints the queries it is given."""

    def query(self, sql):
        message = "Querying with: %s" % (sql)
        print(message)
38
39
class UrlCaller(object):
    """Stand-in webservice client that simulates sending data slowly."""

    def __init__(self):
        # The pause before each 'sent' unit is _send_time / _chunks seconds.
        self._send_time = 0.5
        self._chunks = 25

    def send(self, url, data, status_cb=None):
        """Pretend to send ``data`` to ``url`` one element at a time.

        :param url: destination; not used by this fake sender.
        :param data: sized payload; one pause is made per element.
        :param status_cb: optional callable given the completed fraction
                          (greater than 0.0, up to 1.0) after each element.

        Nothing is reported when ``data`` is empty.
        """
        total = len(data)
        sleep_time = float(self._send_time) / self._chunks
        for i in range(0, total):
            time.sleep(sleep_time)
            # Element ``i`` has just been 'sent', so ``i + 1`` elements are
            # complete; this lets the final report reach 100%.
            if status_cb:
                status_cb(float(i + 1) / total)
53
54
55 # Since engines save the output of tasks to an optional persistent storage
56 # backend resources have to be dealt with in a slightly different manner since
57 # resources are transient and can *not* be persisted (or serialized). For tasks
58 # that require access to a set of resources it is a common pattern to provide
59 # an object (in this case this object) on construction of those tasks via the
60 # task constructor.
class ResourceFetcher(object):
    """Hands out shared resource handles, creating each on first access."""

    def __init__(self):
        self._url_handle = None
        self._db_handle = None

    @property
    def db_handle(self):
        # Created on first access and reused on every later access.
        handle = self._db_handle
        if handle is None:
            handle = self._db_handle = DB()
        return handle

    @property
    def url_handle(self):
        # Created on first access and reused on every later access.
        handle = self._url_handle
        if handle is None:
            handle = self._url_handle = UrlCaller()
        return handle
77
78
class ExtractInputRequest(task.Task):
    """Turns the incoming request object into a plain dictionary.

    The dictionary is provided to later tasks as ``parsed_request``.
    """

    def __init__(self, resources):
        super(ExtractInputRequest, self).__init__(provides="parsed_request")
        self._resources = resources

    def execute(self, request):
        user = request.user
        user_id = misc.as_int(request.id)
        # Every parsed request receives its own freshly generated id.
        request_id = uuidutils.generate_uuid()
        return {
            'user': user,
            'user_id': user_id,
            'request_id': request_id,
        }
90
91
class MakeDBEntry(task.Task):
    """Records the parsed request in the database; removes it on revert."""

    def __init__(self, resources):
        super(MakeDBEntry, self).__init__()
        self._resources = resources

    def execute(self, parsed_request):
        statement = "INSERT %s INTO mydb" % (parsed_request)
        self._resources.db_handle.query(statement)

    def revert(self, result, parsed_request):
        # Issue the query that undoes the insert made by execute().
        statement = "DELETE %s FROM mydb IF EXISTS" % (parsed_request)
        self._resources.db_handle.query(statement)
104
105
class ActivateDriver(task.Task):
    """Sends the billing data to a (fake) webservice.

    The url the data was sent to is provided to later tasks as ``sent_to``.
    """

    def __init__(self, resources):
        super(ActivateDriver, self).__init__(provides='sent_to')
        self._resources = resources
        self._url = "http://blahblah.com"

    def execute(self, parsed_request):
        target = self._url
        print("Sending billing data to %s" % (target))
        payload = json.dumps(parsed_request)
        # Our update_progress function (which the engine also 'binds' to)
        # is handed to the url sending helper as its status callback. This
        # ties the task progress to the sending progress, which is very
        # useful for downstream systems that want to know what a task is
        # doing at any time.
        sender = self._resources.url_handle
        sender.send(target, payload, status_cb=self.update_progress)
        return target

    def update_progress(self, progress, **kwargs):
        # Override the parent method to also print out the status.
        super(ActivateDriver, self).update_progress(progress, **kwargs)
        percent = progress * 100
        print("%s is %0.2f%% done" % (self.name, percent))
128
129
class DeclareSuccess(task.Task):
    """Final task; announces where the data ended up being sent."""

    def execute(self, sent_to):
        for line in ("Done!",
                     "All data processed and sent to %s" % (sent_to)):
            print(line)
134
135
class DummyUser(object):
    """Minimal request object carrying a user name and an id."""

    def __init__(self, user, id_):
        # Stored as ``id``; the parameter is ``id_`` so that the builtin
        # ``id`` is not shadowed.
        self.id = id_
        self.user = user
140
141
142 # Resources (db handles and similar) of course can *not* be persisted so we
143 # need to make sure that we pass this resource fetcher to the tasks constructor
144 # so that the tasks have access to any needed resources (the resources are
145 # lazily loaded so that they are only created when they are used).
resources = ResourceFetcher()
flow = lf.Flow("initialize-me")

# 1. First we extract the api request into a usable format.
flow.add(ExtractInputRequest(resources))
# 2. Then we go ahead and make a database entry for our request.
flow.add(MakeDBEntry(resources))

# 3. Then we activate our payment method and finally declare success.
sub_flow = gf.Flow("after-initialize")
sub_flow.add(ActivateDriver(resources), DeclareSuccess())
flow.add(sub_flow)

# The storage starts out holding the request object; prepopulating it lets
# the tasks that depend on the 'request' variable start processing (here
# that is the ExtractInputRequest task).
store = dict(request=DummyUser(user="bob", id_="1.35"))
eng = engines.load(flow, engine='serial', store=store)

# While attached, the printing listener shows every state transition the
# engine goes through as it runs the billing related tasks; the context
# manager adds it on entry and removes it again on exit.
with printing.PrintingListener(eng):
    eng.run()
Note
Full source located at resume_from_backend
1
2 import contextlib
3 import logging
4 import os
5 import sys
6
7 logging.basicConfig(level=logging.ERROR)
8
9 self_dir = os.path.abspath(os.path.dirname(__file__))
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13 sys.path.insert(0, top_dir)
14 sys.path.insert(0, self_dir)
15
16 from oslo_utils import uuidutils
17
18 import taskflow.engines
19 from taskflow.patterns import linear_flow as lf
20 from taskflow.persistence import models
21 from taskflow import task
22
23 import example_utils as eu # noqa
24
25 # INTRO: In this example linear_flow is used to group three tasks, one which
26 # will suspend the future work the engine may do. This suspended engine is then
27 # discarded and the workflow is reloaded from the persisted data and then the
28 # workflow is resumed from where it was suspended. This allows you to see how
29 # to start an engine, have a task stop the engine from doing future work (if
30 # a multi-threaded engine is being used, then the currently active work is not
31 # preempted) and then resume the work later.
32 #
33 # Usage:
34 #
35 # With a filesystem directory as backend
36 #
37 # python taskflow/examples/resume_from_backend.py
38 #
39 # With ZooKeeper as backend
40 #
41 # python taskflow/examples/resume_from_backend.py \
42 # zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
43
44
45 # UTILITY FUNCTIONS #########################################
46
47
def print_task_states(flowdetail, msg):
    """Print ``msg``, the flow state and one line per task in the flow."""
    eu.print_wrapped(msg)
    print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
    # The rows are sorted so that test validation doesn't get confused by
    # the order in which the flow detail happens to hold its items.
    rows = [(td.name, td.version, td.state, td.results)
            for td in flowdetail]
    rows.sort()
    for row in rows:
        print(" %s==%s: %s, result=%s" % row)
57
58
def find_flow_detail(backend, lb_id, fd_id):
    """Load a logbook from the backend and look up one of its flow details.

    :param backend: persistence backend to fetch a connection from.
    :param lb_id: identifier of the logbook to load.
    :param fd_id: identifier of the flow detail to find in that logbook.
    :returns: whatever the logbook's ``find()`` returns for ``fd_id``.
    """
    # The connection is closed again once the lookup is done (the same way
    # the logbook is saved elsewhere in this example) so that it is not
    # leaked.
    with contextlib.closing(backend.get_connection()) as conn:
        lb = conn.get_logbook(lb_id)
        return lb.find(fd_id)
63
64
65 # CREATE FLOW ###############################################
66
67
class InterruptTask(task.Task):
    """Task that suspends the module-level ``engine`` while it runs."""

    def execute(self):
        # DO NOT TRY THIS AT HOME
        #
        # ``engine`` is the global assigned further down in this script.
        engine.suspend()
72
73
class TestTask(task.Task):
    """Task that announces itself and returns the string 'ok'."""

    def execute(self):
        print('executing %s' % self)
        outcome = 'ok'
        return outcome
78
79
def flow_factory():
    """Create the linear flow: 'first', then 'boom', then 'second'."""
    created = lf.Flow('resume from backend example')
    created.add(
        TestTask(name='first'),
        InterruptTask(name='boom'),
        TestTask(name='second'))
    return created
85
86
87 # INITIALIZE PERSISTENCE ####################################
88
with eu.get_backend() as backend:

    # Create a place where the persistence information will be stored.
    book = models.LogBook("example")
    detail_uuid = uuidutils.generate_uuid()
    flow_detail = models.FlowDetail("resume from backend example",
                                    uuid=detail_uuid)
    book.add(flow_detail)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.save_logbook(book)

    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################

    flow = flow_factory()
    # NOTE: this must stay a module-level name called ``engine``, since the
    # InterruptTask above suspends whatever ``engine`` refers to.
    engine = taskflow.engines.load(flow, backend=backend, book=book,
                                   flow_detail=flow_detail)

    print_task_states(flow_detail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    engine.run()
    print_task_states(flow_detail, "After running")

    # RE-CREATE, RESUME, RUN ####################################

    eu.print_wrapped("Resuming and running again")

    # NOTE(harlowja): the flow detail is reloaded from the backend so that
    # the flow can be resumed from its suspended state; for that the right
    # flow detail has to be looked up in the logbook it was stored in.
    #
    # Re-loading could be avoided by just calling engine.run() again, but
    # this example shows how another process may unsuspend a given flow and
    # start it again, for situations where that is useful (say the process
    # running the above flow crashes).
    flow2 = flow_factory()
    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
    engine2 = taskflow.engines.load(flow2, book=book, backend=backend,
                                    flow_detail=flow_detail_2)
    engine2.run()
    print_task_states(flow_detail_2, "At the end")
Note
Full source located at resume_vm_boot
1
2 import contextlib
3 import hashlib
4 import logging
5 import os
6 import random
7 import sys
8 import time
9
10 logging.basicConfig(level=logging.ERROR)
11
12 self_dir = os.path.abspath(os.path.dirname(__file__))
13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16 sys.path.insert(0, top_dir)
17 sys.path.insert(0, self_dir)
18
19 import futurist
20 from oslo_utils import uuidutils
21
22 from taskflow import engines
23 from taskflow import exceptions as exc
24 from taskflow.patterns import graph_flow as gf
25 from taskflow.patterns import linear_flow as lf
26 from taskflow.persistence import models
27 from taskflow import task
28
29 import example_utils as eu # noqa
30
31 # INTRO: These examples show how a hierarchy of flows can be used to create a
32 # vm in a reliable & resumable manner using taskflow + a miniature version of
33 # what nova does while booting a vm.
34
35
@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Context manager that may pause for ``how_long`` seconds on exit.

    ``how_long`` is yielded as the ``with`` target. The pause (and the
    message announcing it) only happens when the script was started with
    at least one command line argument, and it happens whether or not the
    body raised.
    """
    try:
        yield how_long
    finally:
        user_args = sys.argv[1:]
        if user_args:
            # Only bother to do this if user input was provided.
            print("** Ctrl-c me please!!! **")
            time.sleep(how_long)
45
46
class PrintText(task.Task):
    """Just inserts some text print outs in a workflow."""

    def __init__(self, print_what, no_slow=False):
        # The task name is built from the first 8 hex digits of the md5 of
        # the text to print.
        digest = hashlib.md5(print_what.encode('utf-8')).hexdigest()
        content_hash = digest[0:8]
        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
        self._no_slow = no_slow
        self._text = print_what

    def execute(self):
        if not self._no_slow:
            with slow_down():
                eu.print_wrapped(self._text)
            return
        eu.print_wrapped(self._text)
61
62
class DefineVMSpec(task.Task):
    """Defines a vm specification to be."""

    def __init__(self, name):
        super(DefineVMSpec, self).__init__(name=name, provides='vm_spec')

    def execute(self):
        # A fixed specification, provided to later tasks as ``vm_spec``.
        spec = dict(type='kvm', disks=2, vcpu=1, ips=1, volumes=3)
        return spec
76
77
class LocateImages(task.Task):
    """Locates where the vm images are."""

    def __init__(self, name):
        super(LocateImages, self).__init__(name=name,
                                           provides='image_locations')

    def execute(self, vm_spec):
        # One (source url -> local path) entry per disk in the vm spec.
        disk_count = vm_spec['disks']
        return {
            "http://www.yahoo.com/images/%s" % (n): "/tmp/%s.img" % (n)
            for n in range(0, disk_count)
        }
90
91
class DownloadImages(task.Task):
    """Pretends to download every image; provides 'download_paths'."""

    def __init__(self, name):
        super(DownloadImages, self).__init__(name=name,
                                             provides='download_paths')

    def execute(self, image_locations):
        for source, destination in image_locations.items():
            with slow_down(1):
                print("Downloading from %s => %s" % (source, destination))
        # The sorted local paths are what later tasks consume.
        paths = list(image_locations.values())
        paths.sort()
        return paths
103
104
class CreateNetworkTpl(task.Task):
    """Renders one network settings file body per allocated ip."""

    # Template filled in with (interface index, ip address).
    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super(CreateNetworkTpl, self).__init__(name=name,
                                               provides='network_settings')

    def execute(self, ips):
        return [self.SYSCONFIG_CONTENTS % (index, address)
                for index, address in enumerate(ips)]
121
122
class AllocateIP(task.Task):
    """Picks a random 192.168.0.x address for every ip the vm asks for."""

    def __init__(self, name):
        super(AllocateIP, self).__init__(name=name, provides='ips')

    def execute(self, vm_spec):
        # A spec without an 'ips' entry gets no addresses at all.
        wanted = vm_spec.get('ips', 0)
        return ["192.168.0.%s" % (random.randint(1, 254))
                for _unused in range(wanted)]
133
134
class WriteNetworkSettings(task.Task):
    """Pretends to write every network setting into every image."""

    def execute(self, download_paths, network_settings):
        for mount_index, image_path in enumerate(download_paths):
            with slow_down(1):
                print("Mounting %s to /tmp/%s" % (image_path, mount_index))
            # Each mounted image gets one ifcfg file per network setting.
            for eth_index, contents in enumerate(network_settings):
                filename = ("/tmp/etc/sysconfig/network-scripts/ifcfg-eth%s"
                            % eth_index)
                with slow_down(1):
                    print("Writing to %s" % filename)
                    print(contents)
147
148
class BootVM(task.Task):
    """Announces the vm boot and prints the spec it was created from."""

    def execute(self, vm_spec):
        print("Starting vm!")
        with slow_down(1):
            created_message = "Created: %s" % vm_spec
            print(created_message)
155
156
class AllocateVolumes(task.Task):
    """Returns one /dev/vdaN device name per volume in the vm spec."""

    def execute(self, vm_spec):
        allocated = []
        # Device numbering starts at 1 (/dev/vda1, /dev/vda2, ...).
        for number in range(1, vm_spec['volumes'] + 1):
            with slow_down(1):
                device = "/dev/vda%s" % number
                allocated.append(device)
                print("Allocated volume %s" % device)
        return allocated
166
167
class FormatVolumes(task.Task):
    """Pretends to format each of the given volumes."""

    def execute(self, volumes):
        for volume in volumes:
            print("Formatting volume %s" % volume)
            # No real work happens, the pause stands in for the format.
            with slow_down(1):
                pass
            print("Formatted volume %s" % volume)
176
177
def create_flow():
    """Builds the hierarchy of flows that boots a vm (mini-nova)."""
    # This does all the image stuff.
    img_maker = gf.Flow("img-maker").add(
        LocateImages("locate_images"),
        DownloadImages("download_images"),
    )
    # This does all the network stuff.
    net_maker = gf.Flow("net-maker").add(
        AllocateIP("get_my_ips"),
        CreateNetworkTpl("fetch_net_settings"),
        WriteNetworkSettings("write_net_settings"),
    )
    # This does all the volume stuff.
    volume_maker = gf.Flow("volume-maker").add(
        AllocateVolumes("allocate_my_volumes", provides='volumes'),
        FormatVolumes("volume_formatter"),
    )
    vm_maker = lf.Flow('vm-maker').add(
        # First create a specification for the final vm to-be.
        DefineVMSpec("define_spec"),
        img_maker,
        net_maker,
        volume_maker,
        # Finally boot it all.
        BootVM("boot-it"),
    )
    # Setup the set of things to do, wrapped in start/finish print outs.
    return lf.Flow("root").add(
        PrintText("Starting vm creation.", no_slow=True),
        vm_maker,
        # Ya it worked!
        PrintText("Finished vm create.", no_slow=True),
        PrintText("Instance is running!", no_slow=True))
208
eu.print_wrapped("Initializing")

# Setup the persistence & resumption layer.
with eu.get_backend() as backend:

    # Try to find a previously passed in tracking id (it is expected to
    # look like '<book uuid>+<flow uuid>'), anything else is ignored.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
        if not uuidutils.is_uuid_like(book_id):
            book_id = None
        if not uuidutils.is_uuid_like(flow_id):
            flow_id = None
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    # Set up how we want our engine to run, serial, parallel...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet installed, just let the default be used instead.
        executor = None

    # Create/fetch a logbook that will track the workflows work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
        # Try to find in a prior logbook and flow detail...
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
                flow_detail = book.find(flow_id)
            except exc.NotFound:
                pass
    if flow_detail is None:
        # Nothing to resume. This also covers the case where the logbook
        # was found but the flow detail was not; testing only for 'both
        # missing' would send that case to load_from_detail() with nothing
        # to load from.
        if book is None:
            book = models.LogBook("vm-boot")
            with contextlib.closing(backend.get_connection()) as conn:
                conn.save_logbook(book)
        engine = engines.load_from_factory(create_flow,
                                           backend=backend, book=book,
                                           engine='parallel',
                                           executor=executor)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   engine.storage.flow_uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # Attempt to load from a previously partially completed flow.
        engine = engines.load_from_detail(flow_detail, backend=backend,
                                          engine='parallel', executor=executor)

    # Make me my vm please!
    eu.print_wrapped('Running')
    engine.run()
262
263 # How to use.
264 #
265 # 1. $ python me.py "sqlite:////tmp/nova.db"
266 # 2. ctrl-c before this finishes
267 # 3. Find the tracking id (search for 'Your tracking id is')
268 # 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
269 # 5. Watch it pick up where it left off.
270 # 6. Profit!
Note
Full source located at resume_volume_create
1
2 import contextlib
3 import hashlib
4 import logging
5 import os
6 import random
7 import sys
8 import time
9
10 logging.basicConfig(level=logging.ERROR)
11
12 self_dir = os.path.abspath(os.path.dirname(__file__))
13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16 sys.path.insert(0, top_dir)
17 sys.path.insert(0, self_dir)
18
19 from oslo_utils import uuidutils
20
21 from taskflow import engines
22 from taskflow.patterns import graph_flow as gf
23 from taskflow.patterns import linear_flow as lf
24 from taskflow.persistence import models
25 from taskflow import task
26
27 import example_utils # noqa
28
29 # INTRO: These examples show how a hierarchy of flows can be used to create a
30 # pseudo-volume in a reliable & resumable manner using taskflow + a miniature
31 # version of what cinder does while creating a volume (very miniature).
32
33
@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Context manager that always pauses after its body has finished.

    Yields ``how_long``; on exit (even if the body raised) it prints a
    hint and then sleeps for ``how_long`` seconds.
    """
    try:
        yield how_long
    finally:
        hint = "** Ctrl-c me please!!! **"
        print(hint)
        time.sleep(how_long)
41
42
def find_flow_detail(backend, book_id, flow_id):
    """Looks up a flow detail inside of a logbook stored in the backend.

    The logbook with ``book_id`` is fetched over a connection that is
    closed afterwards, and the result of asking that logbook for
    ``flow_id`` is returned.
    """
    # NOTE(harlowja): we need this reference so that we can resume the
    # correct flow (as a logbook tracks flows and a flow detail tracks an
    # individual flow).
    #
    # Without a reference to the logbook and the flow details in that logbook
    # we will not know exactly what we should resume and that would mean we
    # can't resume what we don't know.
    with contextlib.closing(backend.get_connection()) as connection:
        return connection.get_logbook(book_id).find(flow_id)
55
56
class PrintText(task.Task):
    """Task that prints its text framed by two dashed lines."""

    def __init__(self, print_what, no_slow=False):
        # The task name is derived from the first 8 hex digits of the md5
        # of the text that will be printed.
        digest = hashlib.md5(print_what.encode('utf-8')).hexdigest()
        super(PrintText, self).__init__(name="Print: %s" % (digest[0:8]))
        self._text = print_what
        self._no_slow = no_slow

    def execute(self):
        # The dashed lines are as wide as the text itself.
        rule = "-" * (len(self._text))
        framed = (rule, self._text, rule)
        if self._no_slow:
            for line in framed:
                print(line)
        else:
            with slow_down():
                for line in framed:
                    print(line)
74
75
class CreateSpecForVolumes(task.Task):
    """Creates a random number (1 to 10) of disk volume specifications."""

    def execute(self):
        how_many = random.randint(1, 10)
        # Locations are numbered from 1 (/dev/vda1, /dev/vda2, ...).
        return [
            {
                'type': 'disk',
                'location': "/dev/vda%s" % (index + 1),
            }
            for index in range(how_many)
        ]
85
86
class PrepareVolumes(task.Task):
    """Pretends to prepare each of the given volume specifications."""

    def execute(self, volume_specs):
        for spec in volume_specs:
            with slow_down():
                print("Dusting off your hard drive %s" % spec)
            with slow_down():
                print("Taking a well deserved break.")
            certified_message = "Your drive %s has been certified." % spec
            print(certified_message)
95
96
# Setup the set of things to do (mini-cinder).
flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True))

# Setup the persistence & resumption layer.
with example_utils.get_backend() as backend:
    # A tracking id (when given) looks like '<book uuid>+<flow uuid>'.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = flow_id = None

    if all([book_id, flow_id]):
        # Resume what a previous run left behind.
        flow_detail = find_flow_detail(backend, book_id, flow_id)
    else:
        # If no 'tracking id' (think a fedex or ups tracking id) is provided
        # then we create one by creating a logbook (where flow details are
        # stored) and creating a flow detail (where flow and task state is
        # stored). The combination of these 2 objects unique ids (uuids)
        # allows the users of taskflow to reassociate the workflows that were
        # potentially running (and which may have partially completed) back
        # with taskflow so that those workflows can be resumed (or reverted)
        # after a process/thread/engine has failed in someway.
        book = models.LogBook('resume-volume-create')
        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
        book.add(flow_detail)
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   flow_detail.uuid))
        print("!! Please submit this on later runs for tracking purposes")

    # Load and run.
    engine = engines.load(flow,
                          backend=backend,
                          flow_detail=flow_detail,
                          engine='serial')
    engine.run()
140
141 # How to use.
142 #
143 # 1. $ python me.py "sqlite:////tmp/cinder.db"
144 # 2. ctrl-c before this finishes
145 # 3. Find the tracking id (search for 'Your tracking id is')
146 # 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
147 # 5. Profit!
Note
Full source located at run_by_iter
1
2 import logging
3 import os
4 import sys
5
6 import six
7
8 logging.basicConfig(level=logging.ERROR)
9
10 self_dir = os.path.abspath(os.path.dirname(__file__))
11 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14 sys.path.insert(0, top_dir)
15 sys.path.insert(0, self_dir)
16
17
18 from taskflow import engines
19 from taskflow.patterns import linear_flow as lf
20 from taskflow import task
21
22
23 # INTRO: This example shows how to run a set of engines at the same time, each
24 # running in different engines using a single thread of control to iterate over
25 # each engine (which causes that engine to advance to its next state during
26 # each iteration).
27
28
class EchoTask(task.Task):
    """Prints the given character and returns the one that follows it."""

    def execute(self, value):
        print(value)
        successor = chr(ord(value) + 1)
        return successor
33
34
def make_alphabet_flow(i):
    """Builds a linear flow with one echo task per letter from A to Z.

    Every task reads the symbol named after its own letter and (except for
    the last one) provides the symbol named after the next letter.
    """
    first_letter, last_letter = 'A', 'Z'
    alphabet_flow = lf.Flow("alphabet_%s" % i)
    for code in range(ord(first_letter), ord(last_letter) + 1):
        letter = chr(code)
        task_kwargs = {
            'name': "echoer_%s" % letter,
            'rebind': {'value': letter},
        }
        if letter != last_letter:
            # The final task has nobody left to hand a letter to.
            task_kwargs['provides'] = chr(code + 1)
        alphabet_flow.add(EchoTask(**task_kwargs))
    return alphabet_flow
51
52
# Adjust this number to change how many engines/flows run at once.
flow_count = 1
flows = []
for i in range(0, flow_count):
    # Build each flow exactly once (building it twice and throwing the
    # first copy away only wastes work).
    flows.append(make_alphabet_flow(i + 1))

# Get every engine ready and keep its (not yet advanced) run iterator.
engine_iters = []
for f in flows:
    e = engines.load(f)
    e.compile()
    # Seed the symbol the first echo task of the flow reads.
    e.storage.inject({'A': 'A'})
    e.prepare()
    engine_iters.append(e.run_iter())

# Advance every engine one step at a time, round-robin, until all of the
# iterators are exhausted.
while engine_iters:
    # Iterate over a copy since finished iterators get removed.
    for it in list(engine_iters):
        try:
            print(six.next(it))
        except StopIteration:
            engine_iters.remove(it)
Note
Full source located at retry_flow
1
2 import logging
3 import os
4 import sys
5
6 logging.basicConfig(level=logging.ERROR)
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import retry
16 from taskflow import task
17
18 # INTRO: In this example we create a retry controller that receives a phone
19 # directory and tries different phone numbers. The next task tries to call Jim
20 # using the given number. If it is not Jim's number, the task raises an
21 # exception and retry controller takes the next number from the phone
22 # directory and retries the call.
23 #
24 # This example shows a basic usage of retry controllers in a flow.
25 # Retry controllers allow reverting and retrying a failed subflow with new
26 # parameters.
27
28
class CallJim(task.Task):
    """Calls the given number and fails unless it is jim's (555)."""

    def execute(self, jim_number):
        print("Calling jim %s." % jim_number)
        # Any number other than 555 is a failure.
        if jim_number != 555:
            raise Exception("Wrong number!")
        print("Hello Jim!")

    def revert(self, jim_number, **kwargs):
        # Nothing to undo, just say sorry.
        print("Wrong number, apologizing.")
39
40
# Create your flow and associated tasks (the work to be done); the retry
# controller reads 'phone_directory' and provides 'jim_number'.
flow = lf.Flow(
    'retrying-linear',
    retry=retry.ParameterizedForEach(rebind=['phone_directory'],
                                     provides='jim_number'),
).add(CallJim())

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(
    flow,
    store={
        'phone_directory': [333, 444, 555, 666],
    },
)
Note
Full source located at wbe_simple_linear
1
2 import json
3 import logging
4 import os
5 import sys
6 import tempfile
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 from taskflow import engines
14 from taskflow.engines.worker_based import worker
15 from taskflow.patterns import linear_flow as lf
16 from taskflow.tests import utils
17 from taskflow.utils import threading_utils
18
19 import example_utils # noqa
20
21 # INTRO: This example walks through a miniature workflow which shows how to
22 # start up a number of workers (these workers will process task execution and
23 # reversion requests using any provided input data) and then use an engine
24 # that creates a set of *capable* tasks and flows (the engine can not create
25 # tasks that the workers are not able to run, this will end in failure) that
26 # those workers will run and then executes that workflow seamlessly using the
27 # workers to perform the actual execution.
28 #
29 # NOTE(harlowja): this example simulates the expected larger number of workers
30 # by using a set of threads (which in this example simulate the remote workers
31 # that would typically be running on other external machines).
32
33 # A filesystem can also be used as the queue transport (useful as simple
34 # transport type that does not involve setting up a larger mq system). If this
35 # is false then the memory transport is used instead, both work in standalone
36 # setups.
37 USE_FILESYSTEM = False
38 BASE_SHARED_CONF = {
39 'exchange': 'taskflow',
40 }
41
42 # Until https://github.com/celery/kombu/issues/398 is resolved it is not
43 # recommended to run many worker threads in this example due to the types
44 # of errors mentioned in that issue.
45 MEMORY_WORKERS = 2
46 FILE_WORKERS = 1
47 WORKER_CONF = {
48 # These are the tasks the worker can execute, they *must* be importable,
49 # typically this list is used to restrict what workers may execute to
50 # a smaller set of *allowed* tasks that are known to be safe (one would
51 # not want to allow all python code to be executed).
52 'tasks': [
53 'taskflow.tests.utils:TaskOneArgOneReturn',
54 'taskflow.tests.utils:TaskMultiArgOneReturn'
55 ],
56 }
57
58
def run(engine_options):
    """Runs a two task linear flow on a worker-based engine.

    ``engine_options`` are passed through to the engine as keyword
    arguments; everything found in the engine's storage afterwards is
    returned.
    """
    tasks = (
        utils.TaskOneArgOneReturn(provides='result1'),
        utils.TaskMultiArgOneReturn(provides='result2'),
    )
    flow = lf.Flow('simple-linear').add(*tasks)
    # The initial symbols made available to the flow.
    store = {'x': 111, 'y': 222, 'z': 333}
    eng = engines.load(flow, store=store,
                       engine='worker-based', **engine_options)
    eng.run()
    return eng.storage.fetch_all()
69
70
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Pick the transport (and how many workers to run on top of it).
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        transport = 'filesystem'
        transport_options = {
            'data_folder_in': tmp_path,
            'data_folder_out': tmp_path,
            'polling_interval': 0.1,
        }
    else:
        worker_count = MEMORY_WORKERS
        tmp_path = None
        transport = 'memory'
        transport_options = {
            'polling_interval': 0.1,
        }

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF,
                       transport=transport,
                       transport_options=transport_options)
    worker_conf = dict(WORKER_CONF, **shared_conf)
    engine_options = dict(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % worker_count)
        for worker_number in range(1, worker_count + 1):
            topic = 'worker-%s' % worker_number
            worker_conf['topic'] = topic
            worker_topics.append(topic)
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Do not continue until the worker has started up.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
        # This is done so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup (stop the most recently started worker first).
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
        if tmp_path:
            example_utils.rm_path(tmp_path)
Note
Full source located at wbe_event_sender
1
2 import logging
3 import os
4 import string
5 import sys
6 import time
7
8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11 sys.path.insert(0, top_dir)
12
13 from six.moves import range as compat_range
14
15 from taskflow import engines
16 from taskflow.engines.worker_based import worker
17 from taskflow.patterns import linear_flow as lf
18 from taskflow import task
19 from taskflow.types import notifier
20 from taskflow.utils import threading_utils
21
22 ANY = notifier.Notifier.ANY
23
24 # INTRO: These examples show how to use a remote worker's event notification
25 # attribute to proxy back task event notifications to the controlling process.
26 #
27 # In this case a simple set of events is triggered by a worker running a
28 # task (simulated to be remote by using a kombu memory transport and threads).
29 # Those events that the 'remote worker' produces will then be proxied back to
30 # the task that the engine is running 'remotely', and then they will be emitted
31 # back to the original callbacks that exist in the originating engine
32 # process/thread. This creates a one-way *notification* channel that can
33 # transparently be used in-process, outside-of-process using remote workers and
34 # so-on that allows tasks to signal to its controlling process some sort of
35 # action that has occurred that the task may need to tell others about (for
36 # example to trigger some type of response when the task reaches 50% done...).
37
38
def event_receiver(event_type, details):
    """Callback that prints the event type and details it was given."""
    report = (
        ("Recieved event '%s'", event_type),
        ("Details = %s", details),
    )
    for template, value in report:
        print(template % value)
43
44
class EventReporter(task.Task):
    """This is the task that will be running 'remotely' (not really remote)."""

    # One event per uppercase ascii letter.
    EVENTS = tuple(string.ascii_uppercase)
    # How long to pause (in seconds) after each event has been sent.
    EVENT_DELAY = 0.1

    def execute(self):
        for position, event in enumerate(self.EVENTS):
            # 'leftover' holds the current event and all that follow it.
            remaining = self.EVENTS[position:]
            self.notifier.notify(event, {'leftover': remaining})
            time.sleep(self.EVENT_DELAY)
58
59
60 BASE_SHARED_CONF = {
61 'exchange': 'taskflow',
62 'transport': 'memory',
63 'transport_options': {
64 'polling_interval': 0.1,
65 },
66 }
67
68 # Until https://github.com/celery/kombu/issues/398 is resolved it is not
69 # recommended to run many worker threads in this example due to the types
70 # of errors mentioned in that issue.
71 MEMORY_WORKERS = 1
72 WORKER_CONF = {
73 'tasks': [
74 # Used to locate which tasks we can run (we don't want to allow
75 # arbitrary code/tasks to be ran by any worker since that would
76 # open up a variety of vulnerabilities).
77 '%s:EventReporter' % (__name__),
78 ],
79 }
80
81
def run(engine_options):
    """Runs one event reporting task on a worker-based engine.

    The receiver callback is registered for any event the task emits;
    ``engine_options`` are passed through to the engine as keyword
    arguments.
    """
    reporter = EventReporter()
    reporter.notifier.register(ANY, event_receiver)
    eng = engines.load(lf.Flow('event-reporter').add(reporter),
                       engine='worker-based', **engine_options)
    eng.run()
88
89
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    worker_conf = dict(WORKER_CONF, **BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    workers = []

    # These topics will be used to request worker information on; those
    # workers will respond with their capabilities which the executing engine
    # will use to match pending tasks to a matched worker, this will cause
    # the task to be sent for execution, and the engine will wait until it
    # is finished (a response is received) and then the engine will either
    # continue with other tasks, do some retry/failure resolution logic or
    # stop (and potentially re-raise the remote workers failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % MEMORY_WORKERS)
        for worker_number in compat_range(1, MEMORY_WORKERS + 1):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            topic = 'worker-%s' % worker_number
            worker_conf['topic'] = topic
            worker_topics.append(topic)
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Do not continue until the worker has started up.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup (stop the most recently started worker first).
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
Note
Full source located at wbe_mandelbrot
1
2 import logging
3 import math
4 import os
5 import sys
6
7 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10 sys.path.insert(0, top_dir)
11
12 from six.moves import range as compat_range
13
14 from taskflow import engines
15 from taskflow.engines.worker_based import worker
16 from taskflow.patterns import unordered_flow as uf
17 from taskflow import task
18 from taskflow.utils import threading_utils
19
20 # INTRO: This example walks through a workflow that will in parallel compute
21 # a mandelbrot result set (using X 'remote' workers) and then combine their
22 # results together to form a final mandelbrot fractal image. It shows a usage
23 # of taskflow to perform a well-known embarrassingly parallel problem that has
24 # the added benefit of also being an elegant visualization.
25 #
26 # NOTE(harlowja): this example simulates the expected larger number of workers
27 # by using a set of threads (which in this example simulate the remote workers
28 # that would typically be running on other external machines).
29 #
30 # NOTE(harlowja): to have it produce an image run (after installing pillow):
31 #
32 # $ python taskflow/examples/wbe_mandelbrot.py output.png
33
34 BASE_SHARED_CONF = {
35 'exchange': 'taskflow',
36 }
37 WORKERS = 2
38 WORKER_CONF = {
39 # These are the tasks the worker can execute, they *must* be importable,
40 # typically this list is used to restrict what workers may execute to
41 # a smaller set of *allowed* tasks that are known to be safe (one would
42 # not want to allow all python code to be executed).
43 'tasks': [
44 '%s:MandelCalculator' % (__name__),
45 ],
46 }
47 ENGINE_CONF = {
48 'engine': 'worker-based',
49 }
50
51 # Mandelbrot & image settings...
52 IMAGE_SIZE = (512, 512)
53 CHUNK_COUNT = 8
54 MAX_ITERATIONS = 25
55
56
class MandelCalculator(task.Task):
    def execute(self, image_config, mandelbrot_config, chunk):
        """Returns the escape iteration counts for one chunk of rows.

        For every row index from ``chunk[0]`` up to (not including)
        ``chunk[1]`` a list is produced that holds, for each column, the
        number of iterations that ran before the computation "escaped" (or
        the maximum number of iterations when it never did).
        """

        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
        #
        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
        def mandelbrot(x, y, max_iters):
            c = complex(x, y)
            z = 0.0j
            for i in compat_range(max_iters):
                z = z * z + c
                if (z.real * z.real + z.imag * z.imag) >= 4:
                    return i
            return max_iters

        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
        height, width = image_config['size']
        # How much of the complex plane a single pixel covers.
        pixel_size_x = (max_x - min_x) / width
        pixel_size_y = (max_y - min_y) / height

        first_row, end_row = chunk[0], chunk[1]
        block = []
        for y in compat_range(first_row, end_row):
            imag = min_y + y * pixel_size_y
            block.append([mandelbrot(min_x + x * pixel_size_x,
                                     imag, max_iters)
                          for x in compat_range(0, width)])
        return block
91
92
def calculate(engine_conf):
    """Computes the mandelbrot points by scattering chunks of rows.

    The image rows are split into CHUNK_COUNT chunks, one calculator task
    is created per chunk and the resulting flow is run using an engine
    loaded with ``engine_conf``.

    Returns a list of ``((x, y), color)`` tuples built from the gathered
    per-task results.
    """
    # Subdivide the work into X pieces, then request each worker to calculate
    # one of those chunks and then later we will write these chunks out to
    # an image bitmap file.

    # An unordered flow is used here since the mandelbrot calculation is an
    # example of an embarrassingly parallel computation that we can scatter
    # across as many workers as possible.
    flow = uf.Flow("mandelbrot")

    # These symbols will be automatically given to tasks as input to their
    # execute method, in this case these are constants used in the mandelbrot
    # calculation.
    store = {
        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        'image_config': {
            'size': IMAGE_SIZE,
        }
    }

    # We need the task names to be in the right order so that we can extract
    # the final results in the right order (we don't care about the order when
    # executing).
    task_names = []

    # Compose our workflow.
    height, _width = IMAGE_SIZE
    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
    for i in compat_range(0, CHUNK_COUNT):
        chunk_name = 'chunk_%s' % i
        task_name = "calculation_%s" % i
        # Break the calculation up into chunk size pieces; the chunk size is
        # rounded up so clamp to the image height, otherwise a height that
        # is not a multiple of the chunk count yields rows past the image.
        first_row = min(height, i * chunk_size)
        rows = [first_row, min(height, first_row + chunk_size)]
        flow.add(
            MandelCalculator(task_name,
                             # This ensures the storage symbol with name
                             # 'chunk_name' is sent into the tasks local
                             # symbol 'chunk'. This is how we give each
                             # calculator its own correct sequence of rows
                             # to work on.
                             rebind={'chunk': chunk_name}))
        store[chunk_name] = rows
        task_names.append(task_name)

    # Now execute it.
    #
    # NOTE(review): other examples pass engine='worker-based' plus keyword
    # options instead of an 'engine_conf' argument; confirm that the
    # installed taskflow still accepts 'engine_conf'.
    eng = engines.load(flow, store=store, engine_conf=engine_conf)
    eng.run()

    # Gather all the results and order them for further processing.
    gather = []
    for name in task_names:
        gather.extend(eng.storage.get(name))
    points = []
    for y, row in enumerate(gather):
        for x, color in enumerate(row):
            points.append(((x, y), color))
    return points
150
151
def write_image(results, output_filename=None):
    """Reports the gathered results and optionally saves them as an image.

    ``results`` is an iterable of ``((x, y), color)`` tuples. Nothing is
    written when no output filename is given; a RuntimeError is raised
    when a filename is given but Pillow can not be imported.
    """
    summary = ("Gathered %s results that represents a mandelbrot"
               " image (using %s chunks that are computed jointly"
               " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    print(summary)
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, value in results:
        color_max = max(value, color_max)

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', IMAGE_SIZE, "black")
    pixels = img.load()
    for (x, y), value in results:
        # Avoid dividing by zero when every point has a zero color.
        pixels[x, y] = (0 if color_max == 0
                        else int((float(value) / color_max) * 255.0))
    img.save(output_filename)
182
183
def create_fractal():
    """Starts the workers, computes the fractal and writes the image.

    The first command line argument (when given) is used as the output
    image filename.
    """
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF,
                       transport='memory',
                       transport_options={
                           'polling_interval': 0.1,
                       })

    output_filename = sys.argv[1] if len(sys.argv) >= 2 else None

    worker_conf = dict(WORKER_CONF, **shared_conf)
    engine_conf = dict(ENGINE_CONF, **shared_conf)
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % WORKERS)
        for worker_number in compat_range(1, WORKERS + 1):
            topic = 'calculator_%s' % worker_number
            worker_conf['topic'] = topic
            worker_topics.append(topic)
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Do not continue until the worker has started up.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup (stop the most recently started worker first).
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)


if __name__ == "__main__":
    create_fractal()
Note
Full source located at jobboard_produce_consume_colors
1
2 import collections
3 import contextlib
4 import logging
5 import os
6 import random
7 import sys
8 import threading
9 import time
10
11 logging.basicConfig(level=logging.ERROR)
12
13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16 sys.path.insert(0, top_dir)
17
18 import six
19 from six.moves import range as compat_range
20 from zake import fake_client
21
22 from taskflow import exceptions as excp
23 from taskflow.jobs import backends
24 from taskflow.utils import threading_utils
25
26 # In this example we show how a jobboard can be used to post work for other
27 # entities to work on. This example creates a set of jobs using one producer
28 # thread (typically this would be split across many machines) and then having
29 # other worker threads with their own jobboards select work using a given
30 # filter [red/blue] and then perform that work (consuming or abandoning
31 # the job after it has been completed or failed).
32
33 # Things to note:
34 # - No persistence layer is used (or logbook), just the job details are used
35 # to determine if a job should be selected by a worker or not.
36 # - This example runs in a single process (this is expected to be atypical
37 # but this example shows that it can be done if needed, for testing...)
38 # - The iterjobs(), claim(), consume()/abandon() worker workflow.
39 # - The post() producer workflow.
40
41 SHARED_CONF = {
42 'path': "/taskflow/jobs",
43 'board': 'zookeeper',
44 }
45
46 # How many workers and producers of work will be created (as threads).
47 PRODUCERS = 3
48 WORKERS = 5
49
50 # How many units of work each producer will create.
51 PRODUCER_UNITS = 10
52
53 # How many units of work are expected to be produced (used so workers can
54 # know when to stop running and shutdown; typically this would not be
55 # a fixed value but we have to limit this example's execution time to be
56 # less than infinity).
57 EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
58
59 # Delay between producing/consuming more work.
60 WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
61
62 # To ensure threads don't trample other threads output.
63 STDOUT_LOCK = threading.Lock()
64
65
def dispatch_work(job):
    """Stand-in for performing the work that a claimed job describes.

    The job itself is not inspected; a fixed one second pause takes the
    place of whatever real work would be done.
    """
    time.sleep(1.0)
69
70
def safe_print(name, message, prefix=""):
    """Print ``message`` attributed to ``name`` while holding STDOUT_LOCK.

    A non-empty ``prefix`` is written (followed by a space) ahead of the
    name; an empty prefix is left out entirely.
    """
    with STDOUT_LOCK:
        line = "%s: %s" % (name, message)
        if prefix:
            line = "%s %s" % (prefix, line)
        print(line)
77
78
def worker(ident, client, consumed):
    """Claim and complete jobs of a randomly chosen color until all are done.

    A personal board (named after this worker) is opened on the shared
    ``client`` so that everything works inside one process. Until
    ``consumed`` holds ``EXPECTED_UNITS`` entries, each pass picks a favorite
    color at random, walks the unclaimed jobs and tries to claim the ones
    whose ``color`` detail matches. A claimed job is dispatched, consumed
    and appended to ``consumed``; any exception raised while doing so causes
    the job to be abandoned instead. A summary of the counters is printed
    once the loop ends.
    """
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = consumed_jobs = abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                if job.details.get('color') != favorite_color:
                    # Not the color wanted this round, don't bother with it.
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    # The job is gone or could not be claimed; move on.
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                    continue
                # The claim succeeded, so now actually do the work.
                try:
                    dispatch_work(job)
                    board.consume(job, name)
                    safe_print(name, "'%s' [consumed]" % (job))
                    consumed_jobs += 1
                    consumed.append(job)
                except Exception:
                    board.abandon(job, name)
                    abandoned_jobs += 1
                    safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    totals = (claimed_jobs, consumed_jobs, abandoned_jobs)
    safe_print(name,
               "finished (claimed %s jobs, consumed %s jobs,"
               " abandoned %s jobs)" % totals,
               prefix=">>>")
118
119
def producer(ident, client):
    """Post ``PRODUCER_UNITS`` randomly colored jobs onto a personal board.

    The board is created on the shared ``client`` (so that it works in the
    same process); after each post the producer pauses for
    ``PRODUCER_DELAY`` seconds.
    """
    name = "P-%s" % (ident)
    safe_print(name, "started")
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for unit in compat_range(0, PRODUCER_UNITS):
            color = random.choice(['red', 'blue'])
            job = board.post("%s-%s" % (name, unit),
                             book=None, details={'color': color})
            safe_print(name, "'%s' [posted]" % (job))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")
136
137
def main():
    """Run the producer and worker threads, then verify the board is empty.

    Returns 0 when no jobs are left on the board and exactly
    ``EXPECTED_UNITS`` jobs were consumed, otherwise 1.
    """
    if six.PY3:
        # TODO(harlowja): Hack to make eventlet work right, remove when the
        # following is fixed: https://github.com/eventlet/eventlet/issues/230
        from taskflow.utils import eventlet_utils as _eu  # noqa
        try:
            import eventlet as _eventlet  # noqa
        except ImportError:
            pass
    with contextlib.closing(fake_client.FakeClient()) as c:
        threads = []
        for number in compat_range(0, PRODUCERS):
            maker = threading_utils.daemon_thread(producer, number + 1, c)
            threads.append(maker)
            maker.start()
        consumed = collections.deque()
        for number in compat_range(0, WORKERS):
            taker = threading_utils.daemon_thread(worker, number + 1, c,
                                                  consumed)
            threads.append(taker)
            taker.start()
        # Wait for everyone, most recently created thread first.
        while threads:
            threads.pop().join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            all_done = (board.job_count == 0
                        and len(consumed) == EXPECTED_UNITS)
            return 0 if all_done else 1


if __name__ == "__main__":
    sys.exit(main())
Note
Full source located at tox_conductor
1
2 import contextlib
3 import itertools
4 import logging
5 import os
6 import shutil
7 import socket
8 import sys
9 import tempfile
10 import threading
11 import time
12
13 logging.basicConfig(level=logging.ERROR)
14
15 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
16 os.pardir,
17 os.pardir))
18 sys.path.insert(0, top_dir)
19
20 from oslo_utils import timeutils
21 from oslo_utils import uuidutils
22 import six
23 from zake import fake_client
24
25 from taskflow.conductors import backends as conductors
26 from taskflow import engines
27 from taskflow.jobs import backends as boards
28 from taskflow.patterns import linear_flow
29 from taskflow.persistence import backends as persistence
30 from taskflow.persistence import models
31 from taskflow import task
32 from taskflow.utils import threading_utils
33
34 # INTRO: This example shows how a worker/producer can post desired work (jobs)
35 # to a jobboard and a conductor can consume that work (jobs) from that jobboard
36 # and execute those jobs in a reliable & async manner (for example, if the
37 # conductor were to crash then the job will be released back onto the jobboard
38 # and another conductor can attempt to finish it, from wherever that job last
39 # left off).
40 #
41 # In this example an in-memory jobboard (and in-memory storage) is created and
42 # used that simulates how this would be done at a larger scale (it is an
43 # example after all).
44
45 # Restrict how long this example runs for...
46 RUN_TIME = 5
47 REVIEW_CREATION_DELAY = 0.5
48 SCAN_DELAY = 0.1
49 NAME = "%s_%s" % (socket.getfqdn(), os.getpid())
50
51 # This won't really use zookeeper but will use a local version of it using
52 # the zake library that mimics an actual zookeeper cluster using threads and
53 # an in-memory data structure.
54 JOBBOARD_CONF = {
55 'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
56 }
57
58
class RunReview(task.Task):
    """A dummy task that clones the review and runs tox (by printing)."""

    def _clone_review(self, review, temp_dir):
        """Announce that ``review`` is being cloned into ``temp_dir``."""
        message = "Cloning review '%s' into %s" % (review['id'], temp_dir)
        print(message)

    def _run_tox(self, temp_dir):
        """Announce that tox is being run inside ``temp_dir``."""
        message = "Running tox in %s" % temp_dir
        print(message)

    def execute(self, review, temp_dir):
        """Clone the review and then run tox, in that order."""
        self._clone_review(review, temp_dir)
        self._run_tox(temp_dir)
71
72
class MakeTempDir(task.Task):
    """A task that creates and destroys a temporary dir (on failure).

    It provides the location of the temporary dir (under the name
    ``temp_dir``) for other tasks to use as they see fit.
    """

    default_provides = 'temp_dir'

    def execute(self):
        """Create a new temporary directory and return its path."""
        return tempfile.mkdtemp()

    def revert(self, *args, **kwargs):
        """Remove whatever is stored under the revert result, if truthy."""
        created = kwargs.get(task.REVERT_RESULT)
        if not created:
            return
        shutil.rmtree(created)
88
89
class CleanResources(task.Task):
    """A task that cleans up any workflow resources."""

    def execute(self, temp_dir):
        """Announce and then recursively delete ``temp_dir``."""
        message = "Removing %s" % temp_dir
        print(message)
        shutil.rmtree(temp_dir)
96
97
def review_iter():
    """Makes reviews (never-ending iterator/generator).

    Each review is a new dict with a single ``'id'`` key; ids start at zero
    and increase by one for every review yielded.
    """
    # Iterate the counter directly instead of pulling values out of it by
    # hand inside a ``while True`` loop.
    for review_id in itertools.count(0):
        yield {
            'id': review_id,
        }
107
108
109 # The reason this is at the module namespace level is important: it must
110 # be accessible from a conductor dispatching an engine. If it were a lambda
111 # function, for example, it would not be reimportable and the conductor would
112 # be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run.

    The returned linear flow makes a temporary directory, runs the review
    in it and finally cleans the directory up.
    """
    steps = (
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner"),
    )
    flow = linear_flow.Flow("tester")
    flow.add(*steps)
    return flow
122
123
def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix.

    Returns a tuple of the (unstarted) daemon thread and a callback that
    asks that thread to stop posting reviews.
    """
    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(real_name, JOBBOARD_CONF,
                      client=client, persistence=saver)

    def make_save_book(saver, review_id):
        """Record what we want to happen (sometime in the future)."""
        detail = models.FlowDetail("flow_%s" % review_id,
                                   uuidutils.generate_uuid())
        book = models.LogBook("book_%s" % review_id)
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
        #
        # The (here empty) args and kwargs *can* be used to save any specific
        # parameters into the factory when it is being called to create the
        # workflow objects (typically used to tell a factory how to create a
        # unique workflow that represents this review).
        engines.save_factory_details(detail, create_review_workflow,
                                     (), {})
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
        return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        reviews = review_iter()
        with contextlib.closing(jb):
            while not no_more.is_set():
                review = next(reviews)
                review_id = review['id']
                print("Posting review '%s'" % review_id)
                jb.post("%s_%s" % (real_name, review_id),
                        book=make_save_book(saver, review_id),
                        details={'store': {'review': review}})
                time.sleep(REVIEW_CREATION_DELAY)

    # Hand back the unstarted thread, and a callback that can be used to
    # shut that thread down (to avoid running forever).
    producer_thread = threading_utils.daemon_thread(target=run)
    return (producer_thread, no_more.set)
176
177
def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix.

    Returns a tuple of the (unstarted) daemon thread and the conductor's
    ``stop`` method.
    """
    board = boards.fetch(name, JOBBOARD_CONF,
                         client=client, persistence=saver)
    conductor_name = "%s_conductor" % name
    conductor = conductors.fetch("blocking", conductor_name, board,
                                 engine='parallel', wait_timeout=SCAN_DELAY)

    def run():
        """Connect the board and run the conductor until it is stopped."""
        board.connect()
        with contextlib.closing(board):
            conductor.run()

    # Hand back the unstarted thread, and a callback that can be used to
    # shut that thread down (to avoid running forever).
    conductor_thread = threading_utils.daemon_thread(target=run)
    return (conductor_thread, conductor.stop)
194
195
def main():
    """Run a reviewer and a conductor side by side for ``RUN_TIME`` seconds."""
    # Need to share the same backend, so that data can be shared...
    saver = persistence.fetch({'connection': 'memory'})
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    # Both clients share one storage location so that the correct zookeeper
    # features work across clients...
    reviewer_client = fake_client.FakeClient()
    conductor_client = fake_client.FakeClient(storage=reviewer_client.storage)
    entities = [
        generate_reviewer(reviewer_client, saver),
        generate_conductor(conductor_client, saver),
    ]
    for thread, _stopper in entities:
        thread.start()
    try:
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        # Shut everything down in the opposite order it was started in.
        for thread, stopper in reversed(entities):
            stopper()
            thread.join()


if __name__ == '__main__':
    main()
Note
Full source located at 99_bottles
1
2 import contextlib
3 import functools
4 import logging
5 import os
6 import sys
7 import time
8 import traceback
9
10 from kazoo import client
11
12 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
13 os.pardir,
14 os.pardir))
15 sys.path.insert(0, top_dir)
16
17 from taskflow.conductors import backends as conductor_backends
18 from taskflow import engines
19 from taskflow.jobs import backends as job_backends
20 from taskflow import logging as taskflow_logging
21 from taskflow.patterns import linear_flow as lf
22 from taskflow.persistence import backends as persistence_backends
23 from taskflow.persistence import models
24 from taskflow import task
25
26 from oslo_utils import timeutils
27 from oslo_utils import uuidutils
28
29 # Instructions!
30 #
31 # 1. Install zookeeper (or change host listed below)
32 # 2. Download this example, place in file '99_bottles.py'
33 # 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
34 # 4. Run `python 99_bottles.py c` a few times (in different shells)
35 # 5. On demand kill previously listed processes created in (4) and watch
36 # the work resume on another process (and repeat)
37 # 6. Keep enough workers alive to eventually finish the song (if desired).
38
39 ME = os.getpid()
40 ZK_HOST = "localhost:2181"
41 JB_CONF = {
42 'hosts': ZK_HOST,
43 'board': 'zookeeper',
44 'path': '/taskflow/99-bottles-demo',
45 }
46 PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
47 TAKE_DOWN_DELAY = 1.0
48 PASS_AROUND_DELAY = 3.0
49 HOW_MANY_BOTTLES = 99
50
51
class TakeABottleDown(task.Task):
    """Sings 'Take one down' and returns one bottle less than it was given."""

    def execute(self, bottles_left):
        out = sys.stdout
        out.write('Take one down, ')
        out.flush()
        time.sleep(TAKE_DOWN_DELAY)
        remaining = bottles_left - 1
        return remaining
58
59
class PassItAround(task.Task):
    """Sings 'pass it around' and then pauses for PASS_AROUND_DELAY."""

    def execute(self):
        out = sys.stdout
        out.write('pass it around, ')
        out.flush()
        time.sleep(PASS_AROUND_DELAY)
65
66
class Conclusion(task.Task):
    """Finishes a verse by announcing how many bottles are left."""

    def execute(self, bottles_left):
        line = '%s bottles of beer on the wall...\n' % bottles_left
        sys.stdout.write(line)
        sys.stdout.flush()
71
72
def make_bottles(count):
    """Build the linear flow that sings the song starting at ``count``.

    This is the function that will be called to generate the workflow
    and will also be called to regenerate it on resumption so that work
    can continue from where it last left off...
    """

    def verse(number, **take_kwargs):
        # The three tasks that make up the verse for bottle ``number``.
        return (
            TakeABottleDown("take-bottle-%s" % number,
                            provides='bottles_left', **take_kwargs),
            PassItAround("pass-%s-around" % number),
            Conclusion("next-bottles-%s" % (number - 1)),
        )

    song = lf.Flow("bottle-song")
    # Only the first verse has the starting bottle count injected; every
    # later verse takes 'bottles_left' from what the previous one provided.
    song.add(*verse(count, inject={'bottles_left': count}))
    for bottle in range(count - 1, 0, -1):
        song.add(*verse(bottle))
    return song
95
96
def run_conductor(only_run_once=False):
    """Run a blocking conductor that works on jobs from the jobboard.

    This continuously runs until it is stopped via ctrl-c or another kill
    signal; when ``only_run_once`` is true the conductor is instead stopped
    as soon as a ``running_end`` event is received.
    """
    # Stop watches of events that have started but not yet ended, keyed by
    # the event name with its "_start" suffix removed.
    event_watches = {}

    # This will be triggered by the conductor doing various activities
    # with engines, and is quite nice to be able to see the various timing
    # segments (which is useful for debugging, or watching, or figuring out
    # where to optimize).
    def on_conductor_event(cond, event, details):
        print("Event '%s' has been received..." % event)
        print("Details = %s" % details)
        if event.endswith("_start"):
            watch = timeutils.StopWatch()
            watch.start()
            event_watches[event[:-len("_start")]] = watch
        if event.endswith("_end"):
            base_event = event[:-len("_end")]
            try:
                watch = event_watches.pop(base_event)
                watch.stop()
                print("It took %0.3f seconds for event '%s' to finish"
                      % (watch.elapsed(), base_event))
            except KeyError:
                pass
        if event == 'running_end' and only_run_once:
            cond.stop()

    print("Starting conductor with pid: %s" % ME)
    my_name = "conductor-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_backend = job_backends.fetch(my_name, JB_CONF,
                                         persistence=persist_backend)
        job_backend.connect()
        with contextlib.closing(job_backend):
            cond = conductor_backends.fetch('blocking', my_name, job_backend,
                                            persistence=persist_backend)
            # Bind the conductor as the callback's first argument.
            listener = functools.partial(on_conductor_event, cond)
            cond.notifier.register(cond.notifier.ANY, listener)
            # Run forever, and kill -9 or ctrl-c me...
            try:
                cond.run()
            finally:
                cond.stop()
                cond.wait()
146
147
def run_poster():
    """Post a single song job onto the jobboard and then end."""
    print("Starting poster with pid: %s" % ME)
    my_name = "poster-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_backend = job_backends.fetch(my_name, JB_CONF,
                                         persistence=persist_backend)
        job_backend.connect()
        with contextlib.closing(job_backend):
            # Create information in the persistence backend about the
            # unit of work we want to complete and the factory that
            # can be called to create the tasks that the work unit needs
            # to be done.
            song_name = "song-from-%s" % my_name
            lb = models.LogBook("post-from-%s" % my_name)
            fd = models.FlowDetail(song_name, uuidutils.generate_uuid())
            lb.add(fd)
            with contextlib.closing(persist_backend.get_connection()) as conn:
                conn.save_logbook(lb)
            engines.save_factory_details(fd, make_bottles,
                                         [HOW_MANY_BOTTLES], {},
                                         backend=persist_backend)
            # Post, and be done with it!
            posted = job_backend.post(song_name, book=lb)
            print("Posted: %s" % posted)
            print("Goodbye...")
177
178
def main_local():
    """Post one song and run a single conductor over it, all in-process.

    This is the mode used when no arguments are given; it is typically
    activated during unit testing, when all the examples are checked to
    still function correctly.
    """
    global TAKE_DOWN_DELAY
    global PASS_AROUND_DELAY
    # Make everything go much faster (so that this finishes quickly).
    TAKE_DOWN_DELAY = PASS_AROUND_DELAY = 0.01
    # JB_CONF is only modified in place (never rebound), so it needs no
    # global declaration; the path gets a fresh uuid appended to it.
    JB_CONF['path'] = "%s-%s" % (JB_CONF['path'], uuidutils.generate_uuid())
    run_poster()
    run_conductor(only_run_once=True)
191
192
def check_for_zookeeper(timeout=1):
    """Report whether the zookeeper server at ``ZK_HOST`` can be reached.

    :param timeout: how long (in seconds) to wait for the connection
    :returns: ``True`` when a test client managed to start, ``False`` (after
              writing a message and traceback to stderr) when starting it
              timed out
    """
    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
    sys.stderr.write("Please wait....\n")
    # Probe the same host that the jobboard configuration uses, so that the
    # check stays meaningful when ZK_HOST is changed away from the default.
    probe = client.KazooClient(hosts=ZK_HOST)
    with contextlib.closing(probe) as test_client:
        try:
            test_client.start(timeout=timeout)
        except test_client.handler.timeout_exception:
            sys.stderr.write("Zookeeper is needed for running this example!\n")
            traceback.print_exc()
            return False
        else:
            test_client.stop()
            return True
206
207
def main():
    """Dispatch on the command line: no args, ``p`` (poster) or ``c``.

    Nothing is run when no zookeeper server could be reached. A trailing
    ``v`` argument switches logging from ERROR to TRACE for the ``p`` and
    ``c`` modes; any other first argument prints a usage line to stderr.
    """
    if not check_for_zookeeper():
        return
    if len(sys.argv) == 1:
        main_local()
        return
    mode = sys.argv[1]
    if mode not in ('p', 'c'):
        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
        return
    verbose = sys.argv[-1] == "v"
    if verbose:
        logging.basicConfig(level=taskflow_logging.TRACE)
    else:
        logging.basicConfig(level=logging.ERROR)
    runner = run_poster if mode == 'p' else run_conductor
    runner()


if __name__ == '__main__':
    main()
Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.