Examples

Examples

While developing TaskFlow the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get started (ordered by perceived complexity):

To explore more of these examples please check out the examples directory in the TaskFlow source tree.

Note

If the examples provided are not satisfactory (or not up to your standards), contributions are welcome and very much appreciated to help improve them. The higher the quality and the clearer the examples are, the better and more useful they are for everyone.

Hello world

Note

Full source located at hello_world.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 from taskflow import engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow.patterns import unordered_flow as uf
16 from taskflow import task
17 
18 
19 # INTRO: This is the de facto hello world equivalent for taskflow; it shows how
20 # an overly simplistic workflow can be created that runs using different
21 # engines using different styles of execution (all can be used to run in
22 # parallel if a workflow is provided that is parallelizable).
23 
24 class PrinterTask(task.Task):
25     def __init__(self, name, show_name=True, inject=None):
26         super(PrinterTask, self).__init__(name, inject=inject)
27         self._show_name = show_name
28 
29     def execute(self, output):
30         if self._show_name:
31             print("%s: %s" % (self.name, output))
32         else:
33             print(output)
34 
35 
36 # This will be the work that we want done, which for this example is just to
37 # print 'hello world' (like a song) using different tasks and different
38 # execution models.
39 song = lf.Flow("beats")
40 
41 # Unordered flows, when run, can be run in parallel; and a chorus is everyone
42 # singing at once of course!
43 hi_chorus = uf.Flow('hello')
44 world_chorus = uf.Flow('world')
45 for (name, hello, world) in [('bob', 'hello', 'world'),
46                              ('joe', 'hellooo', 'worllllld'),
47                              ('sue', "helloooooo!", 'wooorllld!')]:
48     hi_chorus.add(PrinterTask("%s@hello" % name,
49                               # This will show up to the execute() method of
50                               # the task as the argument named 'output' (which
51                               # will allow us to print the character we want).
52                               inject={'output': hello}))
53     world_chorus.add(PrinterTask("%s@world" % name,
54                                  inject={'output': world}))
55 
56 # The composition starts with the conductor and then runs in sequence with
57 # the chorus running in parallel, but no matter what the 'hello' chorus must
58 # always run before the 'world' chorus (otherwise the world will fall apart).
59 song.add(PrinterTask("conductor@begin",
60                      show_name=False, inject={'output': "*ding*"}),
61          hi_chorus,
62          world_chorus,
63          PrinterTask("conductor@end",
64                      show_name=False, inject={'output': "*dong*"}))
65 
66 # Run in parallel using eventlet green threads...
67 try:
68     import eventlet as _eventlet  # noqa
69 except ImportError:
70     # No eventlet currently active, skip running with it...
71     pass
72 else:
73     print("-- Running in parallel using eventlet --")
74     e = engines.load(song, executor='greenthreaded', engine='parallel',
75                      max_workers=1)
76     e.run()
77 
78 
79 # Run in parallel using real threads...
80 print("-- Running in parallel using threads --")
81 e = engines.load(song, executor='threaded', engine='parallel',
82                  max_workers=1)
83 e.run()
84 
85 
86 # Run in parallel using external processes...
87 print("-- Running in parallel using processes --")
88 e = engines.load(song, executor='processes', engine='parallel',
89                  max_workers=1)
90 e.run()
91 
92 
93 # Run serially (aka, if the workflow could have been run in parallel, it will
94 # not be when ran in this mode)...
95 print("-- Running serially --")
96 e = engines.load(song, engine='serial')
97 e.run()
98 print("-- Statistics gathered --")
99 print(e.statistics)

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 self_dir = os.path.abspath(os.path.dirname(__file__))
 9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                        os.pardir,
11                                        os.pardir))
12 sys.path.insert(0, top_dir)
13 sys.path.insert(0, self_dir)
14 
15 from taskflow import engines
16 from taskflow.patterns import linear_flow
17 from taskflow import task
18 
19 # INTRO: This example shows how a task (in a linear/serial workflow) can
20 # produce an output that can be then consumed/used by a downstream task.
21 
22 
23 class TaskA(task.Task):
24     default_provides = 'a'
25 
26     def execute(self):
27         print("Executing '%s'" % (self.name))
28         return 'a'
29 
30 
31 class TaskB(task.Task):
32     def execute(self, a):
33         print("Executing '%s'" % (self.name))
34         print("Got input '%s'" % (a))
35 
36 
37 print("Constructing...")
38 wf = linear_flow.Flow("pass-from-to")
39 wf.add(TaskA('a'), TaskB('b'))
40 
41 print("Loading...")
42 e = engines.load(wf)
43 
44 print("Compiling...")
45 e.compile()
46 
47 print("Preparing...")
48 e.prepare()
49 
50 print("Running...")
51 e.run()
52 
53 print("Done...")

Using listeners

Note

Full source located at echo_listener.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.DEBUG)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 from taskflow import engines
14 from taskflow.listeners import logging as logging_listener
15 from taskflow.patterns import linear_flow as lf
16 from taskflow import task
17 
18 # INTRO: This example walks through a miniature workflow which will do a
19 # simple echo operation; during this execution a listener is associated with
20 # the engine to receive all notifications about what the flow has performed,
21 # this example dumps that output to the stdout for viewing (at debug level
22 # to show all the information which is possible).
23 
24 
25 class Echo(task.Task):
26     def execute(self):
27         print(self.name)
28 
29 
30 # Generate the work to be done (but don't do it yet).
31 wf = lf.Flow('abc')
32 wf.add(Echo('a'))
33 wf.add(Echo('b'))
34 wf.add(Echo('c'))
35 
36 # This will associate the listener with the engine (the listener
37 # will automatically register for notifications with the engine and deregister
38 # when the context is exited).
39 e = engines.load(wf)
40 with logging_listener.DynamicLoggingListener(e):
41     e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16 from taskflow.types import notifier
17 
18 ANY = notifier.Notifier.ANY
19 
20 # INTRO: In this example we create two tasks (this time as functions instead
21 # of task subclasses as in the simple_linear.py example), each of which ~calls~
22 # a given ~phone~ number (provided as a function input) in a linear fashion
23 # (one after the other).
24 #
25 # For a workflow which is serial this shows an extremely simple way
26 # of structuring your tasks (the code that does the work) into a linear
27 # sequence (the flow) and then passing the work off to an engine, with some
28 # initial data to be run in a reliable manner.
29 #
30 # This example shows a basic usage of the taskflow structures without involving
31 # the complexity of persistence. Using the structures that taskflow provides
32 # via tasks and flows makes it possible for you to easily at a later time
33 # hook in a persistence layer (and then gain the functionality that offers)
34 # when you decide the complexity of adding that layer in is 'worth it' for your
35 # application's usage pattern (which some applications may not need).
36 #
37 # It **also** adds on to the simple_linear.py example by adding a set of
38 # callback functions which the engine will call when a flow state transition
39 # or task state transition occurs. These types of functions are useful for
40 # updating task or flow progress, or for debugging, sending notifications to
41 # external systems, or for other yet unknown future usage that you may create!
42 
43 
44 def call_jim(context):
45     print("Calling jim.")
46     print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
47 
48 
49 def call_joe(context):
50     print("Calling joe.")
51     print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
52 
53 
54 def flow_watch(state, details):
55     print('Flow => %s' % state)
56 
57 
58 def task_watch(state, details):
59     print('Task %s => %s' % (details.get('task_name'), state))
60 
61 
62 # Wrap your functions into a task type that knows how to treat your functions
63 # as tasks. There was previous work done to just allow a function to be
64 # directly passed, but in python 3.0 there is no easy way to capture an
65 # instance method, so this wrapping approach was decided upon instead which
66 # can attach to instance methods (if that's desired).
67 flow = lf.Flow("Call-them")
68 flow.add(task.FunctorTask(execute=call_jim))
69 flow.add(task.FunctorTask(execute=call_joe))
70 
71 # Now load (but do not run) the flow using the provided initial data.
72 engine = taskflow.engines.load(flow, store={
73     'context': {
74         "joe_number": 444,
75         "jim_number": 555,
76     }
77 })
78 
79 # This is where we attach our callback functions to the 2 different
80 # notification objects that an engine exposes. The usage of ANY (Kleene star)
81 # here means that we want to be notified on all state changes, if you want to
82 # restrict to a specific state change, just register that instead.
83 engine.notifier.register(ANY, flow_watch)
84 engine.atom_notifier.register(ANY, task_watch)
85 
86 # And now run!
87 engine.run()

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 self_dir = os.path.abspath(os.path.dirname(__file__))
 9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                        os.pardir,
11                                        os.pardir))
12 sys.path.insert(0, top_dir)
13 sys.path.insert(0, self_dir)
14 
15 from taskflow import engines
16 from taskflow.patterns import linear_flow as lf
17 from taskflow import task
18 
19 # INTRO: in this example we create a dummy flow with a dummy task, and run
20 # it using an in-memory backend and pre/post run we dump out the contents
21 # of the in-memory backend's tree structure (which can be quite useful to
22 # look at for debugging or other analysis).
23 
24 
25 class PrintTask(task.Task):
26     def execute(self):
27         print("Running '%s'" % self.name)
28 
29 # Make a little flow and run it...
30 f = lf.Flow('root')
31 for alpha in ['a', 'b', 'c']:
32     f.add(PrintTask(alpha))
33 
34 e = engines.load(f)
35 e.compile()
36 e.prepare()
37 
38 # After prepare the storage layer + backend can now be accessed safely...
39 backend = e.storage.backend
40 
41 print("----------")
42 print("Before run")
43 print("----------")
44 print(backend.memory.pformat())
45 print("----------")
46 
47 e.run()
48 
49 print("---------")
50 print("After run")
51 print("---------")
52 for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
53     value = backend.memory[path]
54     if value:
55         print("%s -> %s" % (path, value))
56     else:
57         print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16 
17 # INTRO: In this example we create two tasks, each of which ~calls~ a given
18 # ~phone~ number (provided as a function input) in a linear fashion (one after
19 # the other). For a workflow which is serial this shows an extremely simple way
20 # of structuring your tasks (the code that does the work) into a linear
21 # sequence (the flow) and then passing the work off to an engine, with some
22 # initial data to be run in a reliable manner.
23 #
24 # NOTE(harlowja): This example shows a basic usage of the taskflow structures
25 # without involving the complexity of persistence. Using the structures that
26 # taskflow provides via tasks and flows makes it possible for you to easily at
27 # a later time hook in a persistence layer (and then gain the functionality
28 # that offers) when you decide the complexity of adding that layer in
29 # is 'worth it' for your application's usage pattern (which certain
30 # applications may not need).
31 
32 
33 class CallJim(task.Task):
34     def execute(self, jim_number, *args, **kwargs):
35         print("Calling jim %s." % jim_number)
36 
37 
38 class CallJoe(task.Task):
39     def execute(self, joe_number, *args, **kwargs):
40         print("Calling joe %s." % joe_number)
41 
42 
43 # Create your flow and associated tasks (the work to be done).
44 flow = lf.Flow('simple-linear').add(
45     CallJim(),
46     CallJoe()
47 )
48 
49 # Now run that flow using the provided initial data (store below).
50 taskflow.engines.run(flow, store=dict(joe_number=444,
51                                       jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import task
16 
17 # INTRO: In this example we create three tasks, each of which ~calls~ a given
18 # number (provided as a function input), one of those tasks *fails* calling a
19 # given number (the suzzie calling); this causes the workflow to enter the
20 # reverting process, which activates the revert methods of the previous two
21 # phone ~calls~.
22 #
23 # This simulated calling makes it appear like all three calls occur or all
24 # three don't occur (transaction-like capabilities). No persistence layer is
25 # used here so reverting and executing will *not* be tolerant of process
26 # failure.
27 
28 
29 class CallJim(task.Task):
30     def execute(self, jim_number, *args, **kwargs):
31         print("Calling jim %s." % jim_number)
32 
33     def revert(self, jim_number, *args, **kwargs):
34         print("Calling %s and apologizing." % jim_number)
35 
36 
37 class CallJoe(task.Task):
38     def execute(self, joe_number, *args, **kwargs):
39         print("Calling joe %s." % joe_number)
40 
41     def revert(self, joe_number, *args, **kwargs):
42         print("Calling %s and apologizing." % joe_number)
43 
44 
45 class CallSuzzie(task.Task):
46     def execute(self, suzzie_number, *args, **kwargs):
47         raise IOError("Suzzie not home right now.")
48 
49 
50 # Create your flow and associated tasks (the work to be done).
51 flow = lf.Flow('simple-linear').add(
52     CallJim(),
53     CallJoe(),
54     CallSuzzie()
55 )
56 
57 try:
58     # Now run that flow using the provided initial data (store below).
59     taskflow.engines.run(flow, store=dict(joe_number=444,
60                                           jim_number=555,
61                                           suzzie_number=666))
62 except Exception as e:
63     # NOTE(harlowja): This exception will be the exception that came out of the
64     # 'CallSuzzie' task instead of a different exception, this is useful since
65     # typically surrounding code wants to handle the original exception and not
66     # a wrapped or altered one.
67     #
68     # *WARNING* If this flow was multi-threaded and multiple active tasks threw
69     # exceptions then the above exception would be wrapped into a combined
70     # exception (the object has methods to iterate over the contained
71     # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
72     # how to deal with multiple tasks failing while running.
73     #
74     # You will also note that this is not a problem in this case since no
75     # parallelism is involved; this is ensured by the usage of a linear flow
76     # and the default engine type which is 'serial' vs being 'parallel'.
77     print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

  1 
  2 import logging
  3 import os
  4 import sys
  5 
  6 
  7 logging.basicConfig(level=logging.ERROR)
  8 
  9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 10                                        os.pardir,
 11                                        os.pardir))
 12 sys.path.insert(0, top_dir)
 13 
 14 
 15 import taskflow.engines
 16 from taskflow.patterns import graph_flow as gf
 17 from taskflow.patterns import linear_flow as lf
 18 from taskflow import task
 19 from taskflow.types import notifier
 20 
 21 ANY = notifier.Notifier.ANY
 22 
 23 import example_utils as eu  # noqa
 24 
 25 
 26 # INTRO: This example shows how a graph flow and linear flow can be used
 27 # together to execute dependent & non-dependent tasks by going through the
 28 # steps required to build a simplistic car (an assembly line if you will). It
 29 # also shows how raw functions can be wrapped into a task object instead of
 30 # being forced to use the more *heavy* task base class. This is useful in
 31 # scenarios where pre-existing code has functions that you easily want to
 32 # plug-in to taskflow, without requiring a large amount of code changes.
 33 
 34 
 35 def build_frame():
 36     return 'steel'
 37 
 38 
 39 def build_engine():
 40     return 'honda'
 41 
 42 
 43 def build_doors():
 44     return '2'
 45 
 46 
 47 def build_wheels():
 48     return '4'
 49 
 50 
 51 # These just return true to indicate success, they would in the real world
 52 # do more than just that.
 53 
 54 def install_engine(frame, engine):
 55     return True
 56 
 57 
 58 def install_doors(frame, windows_installed, doors):
 59     return True
 60 
 61 
 62 def install_windows(frame, doors):
 63     return True
 64 
 65 
 66 def install_wheels(frame, engine, engine_installed, wheels):
 67     return True
 68 
 69 
 70 def trash(**kwargs):
 71     eu.print_wrapped("Throwing away pieces of car!")
 72 
 73 
 74 def startup(**kwargs):
 75     # If you want to see the rollback function being activated try uncommenting
 76     # the following line.
 77     #
 78     # raise ValueError("Car not verified")
 79     return True
 80 
 81 
 82 def verify(spec, **kwargs):
 83     # If the car is not what we ordered throw away the car (trigger reversion).
 84     for key, value in kwargs.items():
 85         if spec[key] != value:
 86             raise Exception("Car doesn't match spec!")
 87     return True
 88 
 89 
 90 # These two functions connect into the state transition notification emission
 91 # points that the engine outputs, they can be used to log state transitions
 92 # that are occurring, or they can be used to suspend the engine (or perform
 93 # other useful activities).
 94 def flow_watch(state, details):
 95     print('Flow => %s' % state)
 96 
 97 
 98 def task_watch(state, details):
 99     print('Task %s => %s' % (details.get('task_name'), state))
100 
101 
102 flow = lf.Flow("make-auto").add(
103     task.FunctorTask(startup, revert=trash, provides='ran'),
104     # A graph flow allows automatic dependency based ordering, the ordering
105     # is determined by analyzing the symbols required and provided and ordering
106     # execution based on a functioning order (if one exists).
107     gf.Flow("install-parts").add(
108         task.FunctorTask(build_frame, provides='frame'),
109         task.FunctorTask(build_engine, provides='engine'),
110         task.FunctorTask(build_doors, provides='doors'),
111         task.FunctorTask(build_wheels, provides='wheels'),
112         # These *_installed outputs allow for other tasks to depend on certain
113         # actions being performed (aka the components were installed), another
114         # way to do this is to link() the tasks manually instead of creating
115         # an 'artificial' data dependency that accomplishes the same goal the
116         # manual linking would result in.
117         task.FunctorTask(install_engine, provides='engine_installed'),
118         task.FunctorTask(install_doors, provides='doors_installed'),
119         task.FunctorTask(install_windows, provides='windows_installed'),
120         task.FunctorTask(install_wheels, provides='wheels_installed')),
121     task.FunctorTask(verify, requires=['frame',
122                                        'engine',
123                                        'doors',
124                                        'wheels',
125                                        'engine_installed',
126                                        'doors_installed',
127                                        'windows_installed',
128                                        'wheels_installed']))
129 
130 # This dictionary will be provided to the tasks as a specification for what
131 # the tasks should produce, in this example this specification will influence
132 # what those tasks do and what output they create. Different tasks depend on
133 # different information from this specification, all of which will be provided
134 # automatically by the engine to those tasks.
135 spec = {
136     "frame": 'steel',
137     "engine": 'honda',
138     "doors": '2',
139     "wheels": '4',
140     # These are used to compare the result product, a car without the pieces
141     # installed is not a car after all.
142     "engine_installed": True,
143     "doors_installed": True,
144     "windows_installed": True,
145     "wheels_installed": True,
146 }
147 
148 
149 engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
150 
151 # This registers all (ANY) state transitions to trigger a call to the
152 # flow_watch function for flow state transitions, and registers the
153 # same all (ANY) state transitions for task state transitions.
154 engine.notifier.register(ANY, flow_watch)
155 engine.atom_notifier.register(ANY, task_watch)
156 
157 eu.print_wrapped("Building a car")
158 engine.run()
159 
160 # Alter the specification and ensure that the reverting logic gets triggered:
161 # the build_doors function always produces '2' doors (not the 5 now requested),
162 # so the verification task will mark the car that is produced as not matching
163 # the desired spec.
164 spec['doors'] = 5
165 
166 engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
167 engine.notifier.register(ANY, flow_watch)
168 engine.atom_notifier.register(ANY, task_watch)
169 
170 eu.print_wrapped("Building a wrong car that doesn't match specification")
171 try:
172     engine.run()
173 except Exception as e:
174     eu.print_wrapped("Flow failed: %s" % e)

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

 1 
 2 import fractions
 3 import functools
 4 import logging
 5 import os
 6 import string
 7 import sys
 8 import time
 9 
10 logging.basicConfig(level=logging.ERROR)
11 
12 self_dir = os.path.abspath(os.path.dirname(__file__))
13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14                                        os.pardir,
15                                        os.pardir))
16 sys.path.insert(0, top_dir)
17 sys.path.insert(0, self_dir)
18 
19 from taskflow import engines
20 from taskflow import exceptions
21 from taskflow.patterns import linear_flow
22 from taskflow import task
23 
24 
25 # In this example we show how a simple linear set of tasks can be executed
26 # using local processes (and not threads or remote workers) with minimal (if
27 # any) modification to those tasks to make them safe to run in this mode.
28 #
29 # This is useful since it allows further scaling up your workflows when thread
30 # execution starts to become a bottleneck (which it can start to be due to the
31 # GIL in python). It also offers an intermediary scalable runner that can be
32 # used when the scale and/or setup of remote workers is not desirable.
33 
34 
35 def progress_printer(task, event_type, details):
36     # This callback, attached to each task will be called in the local
37     # process (not the child processes)...
38     progress = details.pop('progress')
39     progress = int(progress * 100.0)
40     print("Task '%s' reached %d%% completion" % (task.name, progress))
41 
42 
43 class AlphabetTask(task.Task):
44     # Delay (in seconds) between each progress part.
45     _DELAY = 0.1
46 
47     # This task will run in X main stages (each with a different progress
48     # report that will be delivered back to the running process...). The
49     # initial 0% and 100% are triggered automatically by the engine when
50     # a task is started and finished (so that's why those are not emitted
51     # here).
52     _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
53 
54     def execute(self):
55         for p in self._PROGRESS_PARTS:
56             self.update_progress(p)
57             time.sleep(self._DELAY)
58 
59 
60 print("Constructing...")
61 soup = linear_flow.Flow("alphabet-soup")
62 for letter in string.ascii_lowercase:
63     abc = AlphabetTask(letter)
64     abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
65                           functools.partial(progress_printer, abc))
66     soup.add(abc)
67 try:
68     print("Loading...")
69     e = engines.load(soup, engine='parallel', executor='processes')
70     print("Compiling...")
71     e.compile()
72     print("Preparing...")
73     e.prepare()
74     print("Running...")
75     e.run()
76     print("Done: %s" % e.statistics)
77 except exceptions.NotImplementedError as e:
78     print(e)

Watching execution timing

Note

Full source located at timing_listener.

 1 
 2 import logging
 3 import os
 4 import random
 5 import sys
 6 import time
 7 
 8 logging.basicConfig(level=logging.ERROR)
 9 
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11                                        os.pardir,
12                                        os.pardir))
13 sys.path.insert(0, top_dir)
14 
15 from taskflow import engines
16 from taskflow.listeners import timing
17 from taskflow.patterns import linear_flow as lf
18 from taskflow import task
19 
20 # INTRO: in this example we will attach a listener to an engine
21 # and have variable run time tasks run and show how the listener will print
22 # out how long those tasks took (when they started and when they finished).
23 #
24 # This shows how timing metrics can be gathered (or attached onto an engine)
25 # after a workflow has been constructed, making it easy to gather metrics
26 # dynamically for situations where this kind of information is applicable (or
27 # even adding this information on at a later point in the future when your
28 # application starts to slow down).
29 
30 
31 class VariableTask(task.Task):
32     def __init__(self, name):
33         super(VariableTask, self).__init__(name)
34         self._sleepy_time = random.random()
35 
36     def execute(self):
37         time.sleep(self._sleepy_time)
38 
39 
40 f = lf.Flow('root')
41 f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
42 e = engines.load(f)
43 with timing.PrintingDurationListener(e):
44     e.run()

Distance calculator

Note

Full source located at distance_calculator.

 1 
 2 import collections
 3 import math
 4 import os
 5 import sys
 6 
 7 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                        os.pardir,
 9                                        os.pardir))
10 sys.path.insert(0, top_dir)
11 
12 from taskflow import engines
13 from taskflow.patterns import linear_flow
14 from taskflow import task
15 
16 # INTRO: This shows how to use a task's/atom's ability to take requirements
17 # from its execute function's default parameters, and shows how to provide
18 # those via different methods when needed to influence those parameters (in
19 # this case, to calculate the distance between two points in 2D space).
20 
21 # A 2D point.
22 Point = collections.namedtuple("Point", "x,y")
23 
24 
25 def is_near(val, expected, tolerance=0.001):
26     # Floats don't really provide equality...
27     if val > (expected + tolerance):
28         return False
29     if val < (expected - tolerance):
30         return False
31     return True
32 
33 
34 class DistanceTask(task.Task):
35     # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
36 
37     default_provides = 'distance'
38 
39     def execute(self, a=Point(0, 0), b=Point(0, 0)):
40         return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
41 
42 
43 if __name__ == '__main__':
44     # For these we rely on the execute() method's points by default being
45     # at the origin (and we override it with store values when we want) at
46     # execution time (which then influences what is calculated).
47     any_distance = linear_flow.Flow("origin").add(DistanceTask())
48     results = engines.run(any_distance)
49     print(results)
50     print("%s is near-enough to %s: %s" % (results['distance'],
51                                            0.0,
52                                            is_near(results['distance'], 0.0)))
53 
54     results = engines.run(any_distance, store={'a': Point(1, 1)})
55     print(results)
56     print("%s is near-enough to %s: %s" % (results['distance'],
57                                            1.4142,
58                                            is_near(results['distance'],
59                                                    1.4142)))
60 
61     results = engines.run(any_distance, store={'a': Point(10, 10)})
62     print(results)
63     print("%s is near-enough to %s: %s" % (results['distance'],
64                                            14.14199,
65                                            is_near(results['distance'],
66                                                    14.14199)))
67 
68     results = engines.run(any_distance,
69                           store={'a': Point(5, 5), 'b': Point(10, 10)})
70     print(results)
71     print("%s is near-enough to %s: %s" % (results['distance'],
72                                            7.07106,
73                                            is_near(results['distance'],
74                                                    7.07106)))
75 
76     # For this we use the ability to override at task creation time the
77     # optional arguments so that we don't need to continue to send them
78     # in via the 'store' argument like in the above (and we fix the new
79     # starting point 'a' at (10, 10) instead of (0, 0))...
80 
81     ten_distance = linear_flow.Flow("ten")
82     ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
83     results = engines.run(ten_distance, store={'b': Point(10, 10)})
84     print(results)
85     print("%s is near-enough to %s: %s" % (results['distance'],
86                                            0.0,
87                                            is_near(results['distance'], 0.0)))
88 
89     results = engines.run(ten_distance)
90     print(results)
91     print("%s is near-enough to %s: %s" % (results['distance'],
92                                            14.14199,
93                                            is_near(results['distance'],
94                                                    14.14199)))

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply

  1 
  2 import csv
  3 import logging
  4 import os
  5 import random
  6 import sys
  7 
  8 logging.basicConfig(level=logging.ERROR)
  9 
 10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                        os.pardir,
 12                                        os.pardir))
 13 sys.path.insert(0, top_dir)
 14 
 15 import futurist
 16 from six.moves import range as compat_range
 17 
 18 from taskflow import engines
 19 from taskflow.patterns import unordered_flow as uf
 20 from taskflow import task
 21 
 22 # INTRO: This example walks through a miniature workflow which does a parallel
 23 # table modification where each row in the table gets adjusted by a thread, or
 24 # green thread (if eventlet is available) in parallel and then the result
 25 # is reformed into a new table and some verifications are performed on it
 26 # to ensure everything went as expected.
 27 
 28 
 29 MULTIPLER = 10
 30 
 31 
 32 class RowMultiplier(task.Task):
 33     """Performs a modification of an input row, creating a output row."""
 34 
 35     def __init__(self, name, index, row, multiplier):
 36         super(RowMultiplier, self).__init__(name=name)
 37         self.index = index
 38         self.multiplier = multiplier
 39         self.row = row
 40 
 41     def execute(self):
 42         return [r * self.multiplier for r in self.row]
 43 
 44 
 45 def make_flow(table):
 46     # This creation will allow for parallel computation (since the flow here
 47     # is specifically unordered; and when things are unordered they have
 48     # no dependencies and when things have no dependencies they can just be
 49     # ran at the same time, limited in concurrency by the executor or max
 50     # workers of that executor...)
 51     f = uf.Flow("root")
 52     for i, row in enumerate(table):
 53         f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
 54     # NOTE(harlowja): at this point nothing has ran, the above is just
 55     # defining what should be done (but not actually doing it) and associating
 56     # any ordering dependencies that should be enforced (the flow pattern used
 57     # forces this), the engine in the later main() function will actually
 58     # perform this work...
 59     return f
 60 
 61 
 62 def main():
 63     if len(sys.argv) == 2:
 64         tbl = []
 65         with open(sys.argv[1], 'rb') as fh:
 66             reader = csv.reader(fh)
 67             for row in reader:
 68                 tbl.append([float(r) if r else 0.0 for r in row])
 69     else:
 70         # Make some random table out of thin air...
 71         tbl = []
 72         cols = random.randint(1, 100)
 73         rows = random.randint(1, 100)
 74         for _i in compat_range(0, rows):
 75             row = []
 76             for _j in compat_range(0, cols):
 77                 row.append(random.random())
 78             tbl.append(row)
 79 
 80     # Generate the work to be done.
 81     f = make_flow(tbl)
 82 
 83     # Now run it (using the specified executor)...
 84     try:
 85         executor = futurist.GreenThreadPoolExecutor(max_workers=5)
 86     except RuntimeError:
 87         # No eventlet currently active, use real threads instead.
 88         executor = futurist.ThreadPoolExecutor(max_workers=5)
 89     try:
 90         e = engines.load(f, engine='parallel', executor=executor)
 91         for st in e.run_iter():
 92             print(st)
 93     finally:
 94         executor.shutdown()
 95 
 96     # Find the old rows and put them into place...
 97     #
 98     # TODO(harlowja): probably easier just to sort instead of search...
 99     computed_tbl = []
100     for i in compat_range(0, len(tbl)):
101         for t in f:
102             if t.index == i:
103                 computed_tbl.append(e.storage.get(t.name))
104 
105     # Do some basic validation (which causes the return code of this process
106     # to be different if things were not as expected...)
107     if len(computed_tbl) != len(tbl):
108         return 1
109     else:
110         return 0
111 
112 
113 if __name__ == "__main__":
114     sys.exit(main())

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

  1 
  2 import logging
  3 import os
  4 import sys
  5 
  6 logging.basicConfig(level=logging.ERROR)
  7 
  8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                        os.pardir,
 10                                        os.pardir))
 11 sys.path.insert(0, top_dir)
 12 
 13 import taskflow.engines
 14 from taskflow.patterns import linear_flow as lf
 15 from taskflow import task
 16 
 17 
 18 # INTRO: In this example a linear flow is used to group four tasks to calculate
 19 # a value. A single added task is used twice, showing how this can be done
 20 # and the twice added task takes in different bound values. In the first case
 21 # it uses default parameters ('x' and 'y') and in the second case arguments
 22 # are bound with ('z', 'd') keys from the engines internal storage mechanism.
 23 #
 24 # A multiplier task uses a binding that another task also provides, but this
 25 # example explicitly shows that 'z' parameter is bound with 'a' key
 26 # This shows that if a task depends on a key named the same as a key provided
 27 # from another task the name can be remapped to take the desired key from a
 28 # different origin.
 29 
 30 
 31 # This task provides some values as a result of execution, this can be
 32 # useful when you want to provide values from a static set to other tasks that
 33 # depend on those values existing before those tasks can run.
 34 #
 35 # NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
 36 # that just provides those values on engine running by prepopulating the
 37 # storage backend before your tasks are ran (which accomplishes a similar goal
 38 # in a more uniform manner).
 39 class Provider(task.Task):
 40 
 41     def __init__(self, name, *args, **kwargs):
 42         super(Provider, self).__init__(name=name, **kwargs)
 43         self._provide = args
 44 
 45     def execute(self):
 46         return self._provide
 47 
 48 
 49 # This task adds two input variables and returns the result.
 50 #
 51 # Note that since this task does not have a revert() function (since addition
 52 # is a stateless operation) there are no side-effects that this function needs
 53 # to undo if some later operation fails.
 54 class Adder(task.Task):
 55     def execute(self, x, y):
 56         return x + y
 57 
 58 
 59 # This task multiplies an input variable by a multiplier and returns the
 60 # result.
 61 #
 62 # Note that since this task does not have a revert() function (since
 63 # multiplication is a stateless operation) there are no side-effects that
 64 # this function needs to undo if some later operation fails.
 65 class Multiplier(task.Task):
 66     def __init__(self, name, multiplier, provides=None, rebind=None):
 67         super(Multiplier, self).__init__(name=name, provides=provides,
 68                                          rebind=rebind)
 69         self._multiplier = multiplier
 70 
 71     def execute(self, z):
 72         return z * self._multiplier
 73 
 74 
 75 # Note here that the ordering is established so that the correct sequences
 76 # of operations occurs where the adding and multiplying is done according
 77 # to the expected and typical mathematical model. A graph flow could also be
 78 # used here to automatically infer & ensure the correct ordering.
 79 flow = lf.Flow('root').add(
 80     # Provide the initial values for other tasks to depend on.
 81     #
 82     # x = 2, y = 3, d = 5
 83     Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
 84     # z = x+y = 5
 85     Adder("add-1", provides='z'),
 86     # a = z+d = 10
 87     Adder("add-2", provides='a', rebind=['z', 'd']),
 88     # Calculate 'r = a*3 = 30'
 89     #
 90     # Note here that the 'z' argument of the execute() function will not be
 91     # bound to the 'z' variable provided by the above 'add-1' task but
 92     # instead the 'z' argument will be taken from the 'a' variable provided
 93     # by the second add-2 listed above.
 94     Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
 95 )
 96 
 97 # The result here will be all results (from all tasks) which is stored in an
 98 # in-memory storage location that backs this engine since it is not configured
 99 # with persistent storage.
100 results = taskflow.engines.run(flow)
101 print(results)

Linear equation solver (inferred dependencies)

Source: graph_flow.py

  1 
  2 import logging
  3 import os
  4 import sys
  5 
  6 logging.basicConfig(level=logging.ERROR)
  7 
  8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                        os.pardir,
 10                                        os.pardir))
 11 sys.path.insert(0, top_dir)
 12 
 13 import taskflow.engines
 14 from taskflow.patterns import graph_flow as gf
 15 from taskflow.patterns import linear_flow as lf
 16 from taskflow import task
 17 
 18 
 19 # In this example there are complex *inferred* dependencies between tasks that
 20 # are used to perform a simple set of linear equations.
 21 #
 22 # As you will see below the tasks just define what they require as input
 23 # and produce as output (named values). Then the user doesn't care about
 24 # ordering the tasks (in this case the tasks calculate pieces of the overall
 25 # equation).
 26 #
 27 # As you will notice a graph flow resolves dependencies automatically using the
 28 # tasks symbol requirements and provided symbol values and no ordering
 29 # dependency has to be manually created.
 30 #
 31 # Also notice that flows of any types can be nested into a graph flow; showing
 32 # that subflow dependencies (and associated ordering) will be inferred too.
 33 
 34 
 35 class Adder(task.Task):
 36 
 37     def execute(self, x, y):
 38         return x + y
 39 
 40 
 41 flow = gf.Flow('root').add(
 42     lf.Flow('nested_linear').add(
 43         # x2 = y3+y4 = 12
 44         Adder("add2", provides='x2', rebind=['y3', 'y4']),
 45         # x1 = y1+y2 = 4
 46         Adder("add1", provides='x1', rebind=['y1', 'y2'])
 47     ),
 48     # x5 = x1+x3 = 20
 49     Adder("add5", provides='x5', rebind=['x1', 'x3']),
 50     # x3 = x1+x2 = 16
 51     Adder("add3", provides='x3', rebind=['x1', 'x2']),
 52     # x4 = x2+y5 = 21
 53     Adder("add4", provides='x4', rebind=['x2', 'y5']),
 54     # x6 = x5+x4 = 41
 55     Adder("add6", provides='x6', rebind=['x5', 'x4']),
 56     # x7 = x6+x6 = 82
 57     Adder("add7", provides='x7', rebind=['x6', 'x6']))
 58 
 59 # Provide the initial variable inputs using a storage dictionary.
 60 store = {
 61     "y1": 1,
 62     "y2": 3,
 63     "y3": 5,
 64     "y4": 7,
 65     "y5": 9,
 66 }
 67 
 68 # This is the expected values that should be created.
 69 unexpected = 0
 70 expected = [
 71     ('x1', 4),
 72     ('x2', 12),
 73     ('x3', 16),
 74     ('x4', 21),
 75     ('x5', 20),
 76     ('x6', 41),
 77     ('x7', 82),
 78 ]
 79 
 80 result = taskflow.engines.run(
 81     flow, engine='serial', store=store)
 82 
 83 print("Single threaded engine result %s" % result)
 84 for (name, value) in expected:
 85     actual = result.get(name)
 86     if actual != value:
 87         sys.stderr.write("%s != %s\n" % (actual, value))
 88         unexpected += 1
 89 
 90 result = taskflow.engines.run(
 91     flow, engine='parallel', store=store)
 92 
 93 print("Multi threaded engine result %s" % result)
 94 for (name, value) in expected:
 95     actual = result.get(name)
 96     if actual != value:
 97         sys.stderr.write("%s != %s\n" % (actual, value))
 98         unexpected += 1
 99 
100 if unexpected:
101     sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow.patterns import unordered_flow as uf
16 from taskflow import task
17 
18 # INTRO: These examples show how a linear flow and an unordered flow can be
19 # used together to execute calculations in parallel and then use the
20 # result for the next task/s. The adder task is used for all calculations
21 # and argument bindings are used to set correct parameters for each task.
22 
23 
24 # This task provides some values as a result of execution, this can be
25 # useful when you want to provide values from a static set to other tasks that
26 # depend on those values existing before those tasks can run.
27 #
28 # NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
29 # that provides those values on engine running by prepopulating the storage
30 # backend before your tasks are ran (which accomplishes a similar goal in a
31 # more uniform manner).
32 class Provider(task.Task):
33     def __init__(self, name, *args, **kwargs):
34         super(Provider, self).__init__(name=name, **kwargs)
35         self._provide = args
36 
37     def execute(self):
38         return self._provide
39 
40 
41 # This task adds two input variables and returns the result of that addition.
42 #
43 # Note that since this task does not have a revert() function (since addition
44 # is a stateless operation) there are no side-effects that this function needs
45 # to undo if some later operation fails.
46 class Adder(task.Task):
47     def execute(self, x, y):
48         return x + y
49 
50 
51 flow = lf.Flow('root').add(
52     # Provide the initial values for other tasks to depend on.
53     #
54     # x1 = 2, y1 = 3, x2 = 5, y2 = 8
55     Provider("provide-adder", 2, 3, 5, 8,
56              provides=('x1', 'y1', 'x2', 'y2')),
57     # Note here that we define the flow that contains the 2 adders to be an
58     # unordered flow since the order in which these execute does not matter,
59     # another way to solve this would be to use a graph_flow pattern, which
60     # also can run in parallel (since they have no ordering dependencies).
61     uf.Flow('adders').add(
62         # Calculate 'z1 = x1+y1 = 5'
63         #
64         # Rebind here means that the execute() function x argument will be
65         # satisfied from a previous output named 'x1', and the y argument
66         # of execute() will be populated from the previous output named 'y1'
67         #
68         # The output (result of adding) will be mapped into a variable named
69         # 'z1' which can then be referred to and depended on by other tasks.
70         Adder(name="add", provides='z1', rebind=['x1', 'y1']),
71         # z2 = x2+y2 = 13
72         Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
73     ),
74     # r = z1+z2 = 18
75     Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
76 
77 
78 # The result here will be all results (from all tasks) which is stored in an
79 # in-memory storage location that backs this engine since it is not configured
80 # with persistent storage.
81 result = taskflow.engines.run(flow, engine='parallel')
82 print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume

 1 
 2 import contextlib
 3 import logging
 4 import os
 5 import random
 6 import sys
 7 import time
 8 
 9 logging.basicConfig(level=logging.ERROR)
10 
11 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12                                        os.pardir,
13                                        os.pardir))
14 sys.path.insert(0, top_dir)
15 
16 from oslo_utils import reflection
17 
18 from taskflow import engines
19 from taskflow.listeners import printing
20 from taskflow.patterns import unordered_flow as uf
21 from taskflow import task
22 
23 # INTRO: These examples show how unordered_flow can be used to create a large
24 # number of fake volumes in parallel (or serially, depending on a constant that
25 # can be easily changed).
26 
27 
28 @contextlib.contextmanager
29 def show_time(name):
30     start = time.time()
31     yield
32     end = time.time()
33     print(" -- %s took %0.3f seconds" % (name, end - start))
34 
35 
36 # This affects how many volumes to create and how much time to *simulate*
37 # passing for that volume to be created.
38 MAX_CREATE_TIME = 3
39 VOLUME_COUNT = 5
40 
41 # This will be used to determine if all the volumes are created in parallel
42 # or whether the volumes are created serially (in an undefined order since
43 # an unordered flow is used). Note that there is a disconnection between the
44 # ordering and the concept of parallelism (since unordered items can still be
45 # ran in a serial ordering). A typical use-case for offering both is to allow
46 # for debugging using a serial approach, while when running at a larger scale
47 # one would likely want to use the parallel approach.
48 #
49 # If you switch this flag from serial to parallel you can see the overall
50 # time difference that this causes.
51 SERIAL = False
52 if SERIAL:
53     engine = 'serial'
54 else:
55     engine = 'parallel'
56 
57 
58 class VolumeCreator(task.Task):
59     def __init__(self, volume_id):
60         # Note here that the volume name is composed of the name of the class
61         # along with the volume id that is being created, since a name of a
62         # task uniquely identifies that task in storage it is important that
63         # the name be relevant and identifiable if the task is recreated for
64         # subsequent resumption (if applicable).
65         #
66         # UUIDs are *not* used as they can not be tied back to a previous tasks
67         # state on resumption (since they are unique and will vary for each
68         # task that is created). A name based off the volume id that is to be
69         # created is more easily tied back to the original task so that the
70         # volume create can be resumed/reverted, and is much easier to use for
71         # audit and tracking purposes.
72         base_name = reflection.get_callable_name(self)
73         super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
74                                                             volume_id))
75         self._volume_id = volume_id
76 
77     def execute(self):
78         print("Making volume %s" % (self._volume_id))
79         time.sleep(random.random() * MAX_CREATE_TIME)
80         print("Finished making volume %s" % (self._volume_id))
81 
82 
83 # Assume there is no ordering dependency between volumes.
84 flow = uf.Flow("volume-maker")
85 for i in range(0, VOLUME_COUNT):
86     flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
87 
88 
89 # Show how much time the overall engine loading and running takes.
90 with show_time(name=flow.name.title()):
91     eng = engines.load(flow, engine=engine)
92     # This context manager automatically adds (and automatically removes) a
93     # helpful set of state transition notification printing helper utilities
94     # that show you exactly what transitions the engine is going through
95     # while running the various volume create tasks.
96     with printing.PrintingListener(eng):
97         eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce

  1 
  2 import logging
  3 import os
  4 import sys
  5 
  6 logging.basicConfig(level=logging.ERROR)
  7 
  8 self_dir = os.path.abspath(os.path.dirname(__file__))
  9 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 10                                        os.pardir,
 11                                        os.pardir))
 12 sys.path.insert(0, top_dir)
 13 sys.path.insert(0, self_dir)
 14 
 15 # INTRO: These examples show a simplistic map/reduce implementation where
 16 # a set of mapper(s) will sum a series of input numbers (in parallel) and
 17 # return their individual summed result. A reducer will then use those
 18 # produced values and perform a final summation and this result will then be
 19 # printed (and verified to ensure the calculation was as expected).
 20 
 21 import six
 22 
 23 from taskflow import engines
 24 from taskflow.patterns import linear_flow
 25 from taskflow.patterns import unordered_flow
 26 from taskflow import task
 27 
 28 
 29 class SumMapper(task.Task):
 30     def execute(self, inputs):
 31         # Sums some set of provided inputs.
 32         return sum(inputs)
 33 
 34 
 35 class TotalReducer(task.Task):
 36     def execute(self, *args, **kwargs):
 37         # Reduces all mapped summed outputs into a single value.
 38         total = 0
 39         for (k, v) in six.iteritems(kwargs):
 40             # If any other kwargs was passed in, we don't want to use those
 41             # in the calculation of the total...
 42             if k.startswith('reduction_'):
 43                 total += v
 44         return total
 45 
 46 
 47 def chunk_iter(chunk_size, upperbound):
 48     """Yields back chunk size pieces from zero to upperbound - 1."""
 49     chunk = []
 50     for i in range(0, upperbound):
 51         chunk.append(i)
 52         if len(chunk) == chunk_size:
 53             yield chunk
 54             chunk = []
 55 
 56 
 57 # Upper bound of numbers to sum for example purposes...
 58 UPPER_BOUND = 10000
 59 
 60 # How many mappers we want to have.
 61 SPLIT = 10
 62 
 63 # How big of a chunk we want to give each mapper.
 64 CHUNK_SIZE = UPPER_BOUND // SPLIT
 65 
 66 # This will be the workflow we will compose and run.
 67 w = linear_flow.Flow("root")
 68 
 69 # The mappers will run in parallel.
 70 store = {}
 71 provided = []
 72 mappers = unordered_flow.Flow('map')
 73 for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
 74     mapper_name = 'mapper_%s' % i
 75     # Give that mapper some information to compute.
 76     store[mapper_name] = chunk
 77     # The reducer uses all of the outputs of the mappers, so it needs
 78     # to be recorded that it needs access to them (under a specific name).
 79     provided.append("reduction_%s" % i)
 80     mappers.add(SumMapper(name=mapper_name,
 81                           rebind={'inputs': mapper_name},
 82                           provides=provided[-1]))
 83 w.add(mappers)
 84 
 85 # The reducer will run last (after all the mappers).
 86 w.add(TotalReducer('reducer', requires=provided))
 87 
 88 # Now go!
 89 e = engines.load(w, engine='parallel', store=store, max_workers=4)
 90 print("Running a parallel engine with options: %s" % e.options)
 91 e.run()
 92 
 93 # Now get the result the reducer created.
 94 total = e.storage.get('reducer')
 95 print("Calculated result = %s" % total)
 96 
 97 # Calculate it manually to verify that it worked...
 98 calc_total = sum(range(0, UPPER_BOUND))
 99 if calc_total != total:
100     sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread

 1 
 2 import logging
 3 import os
 4 import random
 5 import sys
 6 import time
 7 
 8 logging.basicConfig(level=logging.ERROR)
 9 
10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11                                        os.pardir,
12                                        os.pardir))
13 sys.path.insert(0, top_dir)
14 
15 import futurist
16 import six
17 
18 from taskflow import engines
19 from taskflow.patterns import unordered_flow as uf
20 from taskflow import task
21 from taskflow.utils import threading_utils as tu
22 
23 # INTRO: in this example we create 2 dummy flow(s) with 2 dummy task(s), and
24 # run it using a shared thread pool executor to show how a single executor can
25 # be used with more than one engine (sharing the execution thread pool between
26 # them); this allows for saving resources and reusing threads in situations
27 # where this is beneficial.
28 
29 
30 class DelayedTask(task.Task):
31     def __init__(self, name):
32         super(DelayedTask, self).__init__(name=name)
33         self._wait_for = random.random()
34 
35     def execute(self):
36         print("Running '%s' in thread '%s'" % (self.name, tu.get_ident()))
37         time.sleep(self._wait_for)
38 
39 
40 f1 = uf.Flow("f1")
41 f1.add(DelayedTask("f1-1"))
42 f1.add(DelayedTask("f1-2"))
43 
44 f2 = uf.Flow("f2")
45 f2.add(DelayedTask("f2-1"))
46 f2.add(DelayedTask("f2-2"))
47 
48 # Run them all using the same futures (thread-pool based) executor...
49 with futurist.ThreadPoolExecutor() as ex:
50     e1 = engines.load(f1, engine='parallel', executor=ex)
51     e2 = engines.load(f2, engine='parallel', executor=ex)
52     iters = [e1.run_iter(), e2.run_iter()]
53     # Iterate over a copy (so we can remove from the source list).
54     cloned_iters = list(iters)
55     while iters:
56         # Run a single 'step' of each iterator, forcing each engine to perform
57         # some work, then yield, and repeat until each iterator is consumed
58         # and there is no more engine work to be done.
59         for it in cloned_iters:
60             try:
61                 six.next(it)
62             except StopIteration:
63                 try:
64                     iters.remove(it)
65                 except ValueError:
66                     pass

Storing & emitting a bill

Note

Full source located at fake_billing

  1 
  2 import json
  3 import logging
  4 import os
  5 import sys
  6 import time
  7 
  8 logging.basicConfig(level=logging.ERROR)
  9 
 10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                        os.pardir,
 12                                        os.pardir))
 13 sys.path.insert(0, top_dir)
 14 
 15 from oslo_utils import uuidutils
 16 
 17 from taskflow import engines
 18 from taskflow.listeners import printing
 19 from taskflow.patterns import graph_flow as gf
 20 from taskflow.patterns import linear_flow as lf
 21 from taskflow import task
 22 from taskflow.utils import misc
 23 
 24 # INTRO: This example walks through a miniature workflow which simulates
 25 # the reception of an API request, creation of a database entry, driver
 26 # activation (which invokes a 'fake' webservice) and final completion.
 27 #
 28 # This example also shows how a function/object (in this case the url sending)
 29 # that occurs during driver activation can update the progress of a task
 30 # without being aware of the internals of how to do this by associating a
 31 # callback that the url sending can update as the sending progresses from 0.0%
 32 # complete to 100% complete.
 33 
 34 
 35 class DB(object):
 36     def query(self, sql):
 37         print("Querying with: %s" % (sql))
 38 
 39 
 40 class UrlCaller(object):
 41     def __init__(self):
 42         self._send_time = 0.5
 43         self._chunks = 25
 44 
 45     def send(self, url, data, status_cb=None):
 46         sleep_time = float(self._send_time) / self._chunks
 47         for i in range(0, len(data)):
 48             time.sleep(sleep_time)
 49             # As we send the data, each chunk we 'fake' send will progress
 50             # the sending progress that much further to 100%.
 51             if status_cb:
 52                 status_cb(float(i) / len(data))
 53 
 54 
 55 # Since engines save the output of tasks to a optional persistent storage
 56 # backend resources have to be dealt with in a slightly different manner since
 57 # resources are transient and can *not* be persisted (or serialized). For tasks
 58 # that require access to a set of resources it is a common pattern to provide
 59 # a object (in this case this object) on construction of those tasks via the
 60 # task constructor.
 61 class ResourceFetcher(object):
 62     def __init__(self):
 63         self._db_handle = None
 64         self._url_handle = None
 65 
 66     @property
 67     def db_handle(self):
 68         if self._db_handle is None:
 69             self._db_handle = DB()
 70         return self._db_handle
 71 
 72     @property
 73     def url_handle(self):
 74         if self._url_handle is None:
 75             self._url_handle = UrlCaller()
 76         return self._url_handle
 77 
 78 
 79 class ExtractInputRequest(task.Task):
 80     def __init__(self, resources):
 81         super(ExtractInputRequest, self).__init__(provides="parsed_request")
 82         self._resources = resources
 83 
 84     def execute(self, request):
 85         return {
 86             'user': request.user,
 87             'user_id': misc.as_int(request.id),
 88             'request_id': uuidutils.generate_uuid(),
 89         }
 90 
 91 
 92 class MakeDBEntry(task.Task):
 93     def __init__(self, resources):
 94         super(MakeDBEntry, self).__init__()
 95         self._resources = resources
 96 
 97     def execute(self, parsed_request):
 98         db_handle = self._resources.db_handle
 99         db_handle.query("INSERT %s INTO mydb" % (parsed_request))
100 
101     def revert(self, result, parsed_request):
102         db_handle = self._resources.db_handle
103         db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
104 
105 
106 class ActivateDriver(task.Task):
107     def __init__(self, resources):
108         super(ActivateDriver, self).__init__(provides='sent_to')
109         self._resources = resources
110         self._url = "http://blahblah.com"
111 
112     def execute(self, parsed_request):
113         print("Sending billing data to %s" % (self._url))
114         url_sender = self._resources.url_handle
115         # Note that here we attach our update_progress function (which is a
116         # function that the engine also 'binds' to) to the progress function
117         # that the url sending helper class uses. This allows the task progress
118         # to be tied to the url sending progress, which is very useful for
119         # downstream systems to be aware of what a task is doing at any time.
120         url_sender.send(self._url, json.dumps(parsed_request),
121                         status_cb=self.update_progress)
122         return self._url
123 
124     def update_progress(self, progress, **kwargs):
125         # Override the parent method to also print out the status.
126         super(ActivateDriver, self).update_progress(progress, **kwargs)
127         print("%s is %0.2f%% done" % (self.name, progress * 100))
128 
129 
class DeclareSuccess(task.Task):
    """Last task of the example; prints where the data ended up."""

    def execute(self, sent_to):
        print("Done!")
        summary = "All data processed and sent to %s" % (sent_to)
        print(summary)
134 
135 
class DummyUser(object):
    """Tiny stand-in request object exposing ``user`` and ``id``."""

    def __init__(self, user, id_):
        # ``id_`` has a trailing underscore only to avoid shadowing the
        # builtin; it is stored under the plain ``id`` attribute.
        self.user, self.id = user, id_
140 
141 
# Resources (db handles and similar) of course can *not* be persisted so we
# need to make sure that we pass this resource fetcher to the tasks constructor
# so that the tasks have access to any needed resources (the resources are
# lazily loaded so that they are only created when they are used).
resources = ResourceFetcher()
flow = lf.Flow("initialize-me")

# 1. First we extract the api request into a usable format.
# 2. Then we go ahead and make a database entry for our request.
flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))

# 3. Then we activate our payment method and finally declare success.
sub_flow = gf.Flow("after-initialize")
sub_flow.add(ActivateDriver(resources), DeclareSuccess())
flow.add(sub_flow)

# Initially populate the storage with the following request object;
# prepopulating this allows the tasks that depend on the 'request' variable
# to start processing (in this case this is the ExtractInputRequest task).
store = {
    'request': DummyUser(user="bob", id_="1.35"),
}
eng = engines.load(flow, engine='serial', store=store)

# This context manager automatically adds (and automatically removes) a
# helpful set of state transition notification printing helper utilities
# that show you exactly what transitions the engine is going through
# while running the various billing related tasks.
with printing.PrintingListener(eng):
    eng.run()

Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1 
  2 import contextlib
  3 import logging
  4 import os
  5 import sys
  6 
  7 logging.basicConfig(level=logging.ERROR)
  8 
  9 self_dir = os.path.abspath(os.path.dirname(__file__))
 10 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                        os.pardir,
 12                                        os.pardir))
 13 sys.path.insert(0, top_dir)
 14 sys.path.insert(0, self_dir)
 15 
 16 from oslo_utils import uuidutils
 17 
 18 import taskflow.engines
 19 from taskflow.patterns import linear_flow as lf
 20 from taskflow.persistence import models
 21 from taskflow import task
 22 
 23 import example_utils as eu  # noqa
 24 
 25 # INTRO: In this example linear_flow is used to group three tasks, one which
  26 # will suspend the future work the engine may do. This suspended engine is then
 27 # discarded and the workflow is reloaded from the persisted data and then the
 28 # workflow is resumed from where it was suspended. This allows you to see how
 29 # to start an engine, have a task stop the engine from doing future work (if
 30 # a multi-threaded engine is being used, then the currently active work is not
 31 # preempted) and then resume the work later.
 32 #
 33 # Usage:
 34 #
 35 #   With a filesystem directory as backend
 36 #
 37 #     python taskflow/examples/resume_from_backend.py
 38 #
 39 #   With ZooKeeper as backend
 40 #
 41 #     python taskflow/examples/resume_from_backend.py \
 42 #       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 43 
 44 
 45 # UTILITY FUNCTIONS #########################################
 46 
 47 
 48 def print_task_states(flowdetail, msg):
 49     eu.print_wrapped(msg)
 50     print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
 51     # Sort by these so that our test validation doesn't get confused by the
 52     # order in which the items in the flow detail can be in.
 53     items = sorted((td.name, td.version, td.state, td.results)
 54                    for td in flowdetail)
 55     for item in items:
 56         print(" %s==%s: %s, result=%s" % item)
 57 
 58 
 59 def find_flow_detail(backend, lb_id, fd_id):
 60     conn = backend.get_connection()
 61     lb = conn.get_logbook(lb_id)
 62     return lb.find(fd_id)
 63 
 64 
 65 # CREATE FLOW ###############################################
 66 
 67 
class InterruptTask(task.Task):
    """Task that suspends the module level ``engine`` when it is executed."""

    def execute(self):
        # DO NOT TRY THIS AT HOME
        engine.suspend()
 72 
 73 
 74 class TestTask(task.Task):
 75     def execute(self):
 76         print('executing %s' % self)
 77         return 'ok'
 78 
 79 
 80 def flow_factory():
 81     return lf.Flow('resume from backend example').add(
 82         TestTask(name='first'),
 83         InterruptTask(name='boom'),
 84         TestTask(name='second'))
 85 
 86 
 87 # INITIALIZE PERSISTENCE ####################################
 88 
with eu.get_backend() as backend:

    # Create a place where the persistence information will be stored.
    book = models.LogBook("example")
    flow_detail = models.FlowDetail("resume from backend example",
                                    uuid=uuidutils.generate_uuid())
    book.add(flow_detail)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.save_logbook(book)

    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################

    flow = flow_factory()
    # NOTE: this has to be bound to the module level name 'engine' since
    # that is the name InterruptTask.execute() calls suspend() on.
    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
                                   book=book, backend=backend)

    print_task_states(flow_detail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    engine.run()
    print_task_states(flow_detail, "After running")

    # RE-CREATE, RESUME, RUN ####################################

    eu.print_wrapped("Resuming and running again")

    # NOTE(harlowja): reload the flow detail from backend, this will allow us
    # to resume the flow from its suspended state, but first we need to search
    # for the right flow details in the correct logbook where things are
    # stored.
    #
    # We could avoid re-loading the engine and just do engine.run() again, but
    # this example shows how another process may unsuspend a given flow and
    # start it again for situations where this is useful to do (say the process
    # running the above flow crashes).
    flow2 = flow_factory()
    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
    engine2 = taskflow.engines.load(flow2,
                                    flow_detail=flow_detail_2,
                                    backend=backend, book=book)
    engine2.run()
    print_task_states(flow_detail_2, "At the end")

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1 
  2 import contextlib
  3 import hashlib
  4 import logging
  5 import os
  6 import random
  7 import sys
  8 import time
  9 
 10 logging.basicConfig(level=logging.ERROR)
 11 
 12 self_dir = os.path.abspath(os.path.dirname(__file__))
 13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                        os.pardir,
 15                                        os.pardir))
 16 sys.path.insert(0, top_dir)
 17 sys.path.insert(0, self_dir)
 18 
 19 import futurist
 20 from oslo_utils import uuidutils
 21 
 22 from taskflow import engines
 23 from taskflow import exceptions as exc
 24 from taskflow.patterns import graph_flow as gf
 25 from taskflow.patterns import linear_flow as lf
 26 from taskflow.persistence import models
 27 from taskflow import task
 28 
 29 import example_utils as eu  # noqa
 30 
 31 # INTRO: These examples show how a hierarchy of flows can be used to create a
 32 # vm in a reliable & resumable manner using taskflow + a miniature version of
 33 # what nova does while booting a vm.
 34 
 35 
 36 @contextlib.contextmanager
 37 def slow_down(how_long=0.5):
 38     try:
 39         yield how_long
 40     finally:
 41         if len(sys.argv) > 1:
 42             # Only both to do this if user input provided.
 43             print("** Ctrl-c me please!!! **")
 44             time.sleep(how_long)
 45 
 46 
 47 class PrintText(task.Task):
 48     """Just inserts some text print outs in a workflow."""
 49     def __init__(self, print_what, no_slow=False):
 50         content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 51         super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 52         self._text = print_what
 53         self._no_slow = no_slow
 54 
 55     def execute(self):
 56         if self._no_slow:
 57             eu.print_wrapped(self._text)
 58         else:
 59             with slow_down():
 60                 eu.print_wrapped(self._text)
 61 
 62 
 63 class DefineVMSpec(task.Task):
 64     """Defines a vm specification to be."""
 65     def __init__(self, name):
 66         super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)
 67 
 68     def execute(self):
 69         return {
 70             'type': 'kvm',
 71             'disks': 2,
 72             'vcpu': 1,
 73             'ips': 1,
 74             'volumes': 3,
 75         }
 76 
 77 
 78 class LocateImages(task.Task):
 79     """Locates where the vm images are."""
 80     def __init__(self, name):
 81         super(LocateImages, self).__init__(provides='image_locations',
 82                                            name=name)
 83 
 84     def execute(self, vm_spec):
 85         image_locations = {}
 86         for i in range(0, vm_spec['disks']):
 87             url = "http://www.yahoo.com/images/%s" % (i)
 88             image_locations[url] = "/tmp/%s.img" % (i)
 89         return image_locations
 90 
 91 
 92 class DownloadImages(task.Task):
 93     """Downloads all the vm images."""
 94     def __init__(self, name):
 95         super(DownloadImages, self).__init__(provides='download_paths',
 96                                              name=name)
 97 
 98     def execute(self, image_locations):
 99         for src, loc in image_locations.items():
100             with slow_down(1):
101                 print("Downloading from %s => %s" % (src, loc))
102         return sorted(image_locations.values())
103 
104 
class CreateNetworkTpl(task.Task):
    """Generates the network settings file to be placed in the images."""
    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super(CreateNetworkTpl, self).__init__(provides='network_settings',
                                               name=name)

    def execute(self, ips):
        # One rendered template per ip; the ip's position in the list is
        # used as the ethernet device number.
        template = self.SYSCONFIG_CONTENTS
        return [template % (index, address)
                for index, address in enumerate(ips)]
121 
122 
class AllocateIP(task.Task):
    """Allocates the ips for the given vm."""

    def __init__(self, name):
        super(AllocateIP, self).__init__(provides='ips', name=name)

    def execute(self, vm_spec):
        # A specification without an 'ips' entry gets no addresses at all.
        wanted = vm_spec.get('ips', 0)
        return ["192.168.0.%s" % (random.randint(1, 254))
                for _unused in range(wanted)]
133 
134 
class WriteNetworkSettings(task.Task):
    """Writes all the network settings into the downloaded images."""

    def execute(self, download_paths, network_settings):
        scripts_dir = "/tmp/etc/sysconfig/network-scripts/"
        for mount_index, image_path in enumerate(download_paths):
            with slow_down(1):
                print("Mounting %s to /tmp/%s" % (image_path, mount_index))
            # Every image gets every one of the network settings.
            for nic_index, contents in enumerate(network_settings):
                filename = scripts_dir + "ifcfg-eth%s" % (nic_index)
                with slow_down(1):
                    print("Writing to %s" % (filename))
                    print(contents)
147 
148 
class BootVM(task.Task):
    """Fires off the vm boot operation."""

    def execute(self, vm_spec):
        print("Starting vm!")
        with slow_down(1):
            created = "Created: %s" % (vm_spec)
            print(created)
155 
156 
class AllocateVolumes(task.Task):
    """Allocates the volumes for the vm."""

    def execute(self, vm_spec):
        allocated = []
        # Volume devices are numbered starting from one.
        for number in range(1, vm_spec['volumes'] + 1):
            with slow_down(1):
                device = "/dev/vda%s" % (number)
                allocated.append(device)
                print("Allocated volume %s" % device)
        return allocated
166 
167 
class FormatVolumes(task.Task):
    """Formats the volumes for the vm."""

    def execute(self, volumes):
        for volume in volumes:
            started = "Formatting volume %s" % volume
            print(started)
            # Nothing is actually formatted; this only (maybe) pauses.
            with slow_down(1):
                pass
            finished = "Formatted volume %s" % volume
            print(finished)
176 
177 
def create_flow():
    """Assemble the hierarchy of flows (mini-nova) that 'creates' a vm."""
    # This does all the image stuff.
    img_maker = gf.Flow("img-maker").add(
        LocateImages("locate_images"),
        DownloadImages("download_images"),
    )
    # This does all the network stuff.
    net_maker = gf.Flow("net-maker").add(
        AllocateIP("get_my_ips"),
        CreateNetworkTpl("fetch_net_settings"),
        WriteNetworkSettings("write_net_settings"),
    )
    # This does all the volume stuff.
    volume_maker = gf.Flow("volume-maker").add(
        AllocateVolumes("allocate_my_volumes", provides='volumes'),
        FormatVolumes("volume_formatter"),
    )
    vm_maker = lf.Flow('vm-maker').add(
        # First create a specification for the final vm to-be.
        DefineVMSpec("define_spec"),
        img_maker,
        net_maker,
        volume_maker,
        # Finally boot it all.
        BootVM("boot-it"),
    )
    return lf.Flow("root").add(
        PrintText("Starting vm creation.", no_slow=True),
        vm_maker,
        # Ya it worked!
        PrintText("Finished vm create.", no_slow=True),
        PrintText("Instance is running!", no_slow=True))
208 
eu.print_wrapped("Initializing")

# Setup the persistence & resumption layer.
with eu.get_backend() as backend:

    # Try to find a previously passed in tracking id (expected to look like
    # '<book uuid>+<flow uuid>'); a part that is missing or that does not
    # look like a uuid is treated as if it was never provided.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
        if not uuidutils.is_uuid_like(book_id):
            book_id = None
        if not uuidutils.is_uuid_like(flow_id):
            flow_id = None
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    # Set up how we want our engine to run, serial, parallel...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet installed, just let the default be used instead.
        executor = None

    # Create/fetch a logbook that will track the workflows work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
        # Try to find in a prior logbook and flow detail...
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
                flow_detail = book.find(flow_id)
            except exc.NotFound:
                pass
    if book is None or flow_detail is None:
        # Either nothing was requested, the logbook does not exist, or the
        # logbook exists but does not contain the requested flow detail; in
        # all of these cases there is nothing to resume, so start a brand
        # new logbook and flow (instead of trying to load an engine from a
        # flow detail that was never found).
        book = models.LogBook("vm-boot")
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        engine = engines.load_from_factory(create_flow,
                                           backend=backend, book=book,
                                           engine='parallel',
                                           executor=executor)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   engine.storage.flow_uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # Attempt to load from a previously partially completed flow.
        engine = engines.load_from_detail(flow_detail, backend=backend,
                                          engine='parallel', executor=executor)

    # Make me my vm please!
    eu.print_wrapped('Running')
    engine.run()
262 
263 # How to use.
264 #
265 # 1. $ python me.py "sqlite:////tmp/nova.db"
266 # 2. ctrl-c before this finishes
267 # 3. Find the tracking id (search for 'Your tracking id is')
 268 # 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
269 # 5. Watch it pick up where it left off.
270 # 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1 
  2 import contextlib
  3 import hashlib
  4 import logging
  5 import os
  6 import random
  7 import sys
  8 import time
  9 
 10 logging.basicConfig(level=logging.ERROR)
 11 
 12 self_dir = os.path.abspath(os.path.dirname(__file__))
 13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                        os.pardir,
 15                                        os.pardir))
 16 sys.path.insert(0, top_dir)
 17 sys.path.insert(0, self_dir)
 18 
 19 from oslo_utils import uuidutils
 20 
 21 from taskflow import engines
 22 from taskflow.patterns import graph_flow as gf
 23 from taskflow.patterns import linear_flow as lf
 24 from taskflow.persistence import models
 25 from taskflow import task
 26 
 27 import example_utils  # noqa
 28 
 29 # INTRO: These examples show how a hierarchy of flows can be used to create a
 30 # pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 31 # version of what cinder does while creating a volume (very miniature).
 32 
 33 
 34 @contextlib.contextmanager
 35 def slow_down(how_long=0.5):
 36     try:
 37         yield how_long
 38     finally:
 39         print("** Ctrl-c me please!!! **")
 40         time.sleep(how_long)
 41 
 42 
 43 def find_flow_detail(backend, book_id, flow_id):
 44     # NOTE(harlowja): this is used to attempt to find a given logbook with
 45     # a given id and a given flow details inside that logbook, we need this
 46     # reference so that we can resume the correct flow (as a logbook tracks
 47     # flows and a flow detail tracks a individual flow).
 48     #
 49     # Without a reference to the logbook and the flow details in that logbook
 50     # we will not know exactly what we should resume and that would mean we
 51     # can't resume what we don't know.
 52     with contextlib.closing(backend.get_connection()) as conn:
 53         lb = conn.get_logbook(book_id)
 54         return lb.find(flow_id)
 55 
 56 
 57 class PrintText(task.Task):
 58     def __init__(self, print_what, no_slow=False):
 59         content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 60         super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 61         self._text = print_what
 62         self._no_slow = no_slow
 63 
 64     def execute(self):
 65         if self._no_slow:
 66             print("-" * (len(self._text)))
 67             print(self._text)
 68             print("-" * (len(self._text)))
 69         else:
 70             with slow_down():
 71                 print("-" * (len(self._text)))
 72                 print(self._text)
 73                 print("-" * (len(self._text)))
 74 
 75 
 76 class CreateSpecForVolumes(task.Task):
 77     def execute(self):
 78         volumes = []
 79         for i in range(0, random.randint(1, 10)):
 80             volumes.append({
 81                 'type': 'disk',
 82                 'location': "/dev/vda%s" % (i + 1),
 83             })
 84         return volumes
 85 
 86 
 87 class PrepareVolumes(task.Task):
 88     def execute(self, volume_specs):
 89         for v in volume_specs:
 90             with slow_down():
 91                 print("Dusting off your hard drive %s" % (v))
 92             with slow_down():
 93                 print("Taking a well deserved break.")
 94             print("Your drive %s has been certified." % (v))
 95 
 96 
# Setup the set of things to do (mini-cinder).
flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True))

# Setup the persistence & resumption layer.
with example_utils.get_backend() as backend:
    # The (optional) second command line argument is the tracking id, which
    # is expected to look like '<book id>+<flow id>'.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    if not all([book_id, flow_id]):
        # If no 'tracking id' (think a fedex or ups tracking id) is provided
        # then we create one by creating a logbook (where flow details are
        # stored) and creating a flow detail (where flow and task state is
        # stored). The combination of these 2 objects unique ids (uuids) allows
        # the users of taskflow to reassociate the workflows that were
        # potentially running (and which may have partially completed) back
        # with taskflow so that those workflows can be resumed (or reverted)
        # after a process/thread/engine has failed in some way.
        book = models.LogBook('resume-volume-create')
        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
        book.add(flow_detail)
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   flow_detail.uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        flow_detail = find_flow_detail(backend, book_id, flow_id)

    # Load and run.
    engine = engines.load(flow,
                          flow_detail=flow_detail,
                          backend=backend, engine='serial')
    engine.run()
140 
141 # How to use.
142 #
143 # 1. $ python me.py "sqlite:////tmp/cinder.db"
144 # 2. ctrl-c before this finishes
145 # 3. Find the tracking id (search for 'Your tracking id is')
146 # 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
147 # 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 import six
 7 
 8 logging.basicConfig(level=logging.ERROR)
 9 
10 self_dir = os.path.abspath(os.path.dirname(__file__))
11 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12                                        os.pardir,
13                                        os.pardir))
14 sys.path.insert(0, top_dir)
15 sys.path.insert(0, self_dir)
16 
17 
18 from taskflow import engines
19 from taskflow.patterns import linear_flow as lf
20 from taskflow import task
21 
22 
 23 # INTRO: This example shows how to run a set of flows at the same time, each
 24 # running in a different engine, using a single thread of control to iterate
 25 # over each engine (which causes that engine to advance to its next state
 26 # during each iteration).
27 
28 
class EchoTask(task.Task):
    """Task that prints its input and returns the following character."""

    def execute(self, value):
        print(value)
        code_point = ord(value)
        return chr(code_point + 1)
33 
34 
def make_alphabet_flow(i):
    """Build a linear flow with one echo task per letter from A to Z.

    Every task reads its ``value`` from the storage entry named after its
    own letter and (except for the last one) provides the entry named after
    the next letter.
    """
    flow = lf.Flow("alphabet_%s" % (i))
    first_code, last_code = ord('A'), ord('Z')
    for code in range(first_code, last_code + 1):
        letter = chr(code)
        task_kwargs = {
            'name': "echoer_%s" % letter,
            'rebind': {'value': letter},
        }
        if code != last_code:
            # Only 'Z' has no following letter to provide.
            task_kwargs['provides'] = chr(code + 1)
        flow.add(EchoTask(**task_kwargs))
    return flow
51 
52 
# Adjust this number to change how many engines/flows run at once.
flow_count = 1
flows = []
for i in range(0, flow_count):
    # Build each flow exactly once (building a second, identical flow just
    # to throw the first one away is wasted work).
    flows.append(make_alphabet_flow(i + 1))

# Get every engine ready to run and keep hold of its run iterator.
engine_iters = []
for f in flows:
    e = engines.load(f)
    e.compile()
    # Seed the storage entry that the first echo task reads from.
    e.storage.inject({'A': 'A'})
    e.prepare()
    engine_iters.append(e.run_iter())

# Advance the remaining engines one step at a time, in turn, until all of
# their iterators are exhausted.
while engine_iters:
    # Iterate over a copy since finished iterators get removed on the way.
    for it in list(engine_iters):
        try:
            print(six.next(it))
        except StopIteration:
            engine_iters.remove(it)

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1 
 2 import logging
 3 import os
 4 import sys
 5 
 6 logging.basicConfig(level=logging.ERROR)
 7 
 8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                        os.pardir,
10                                        os.pardir))
11 sys.path.insert(0, top_dir)
12 
13 import taskflow.engines
14 from taskflow.patterns import linear_flow as lf
15 from taskflow import retry
16 from taskflow import task
17 
 18 # INTRO: In this example we create a retry controller that receives a phone
 19 # directory and tries different phone numbers. The next task tries to call Jim
 20 # using the given number. If it is not Jim's number, the task raises an
 21 # exception and the retry controller takes the next number from the phone
 22 # directory and retries the call.
 23 #
 24 # This example shows a basic usage of retry controllers in a flow.
 25 # Retry controllers allow a failed subflow to be reverted and retried with new
 26 # parameters.
27 
28 
class CallJim(task.Task):
    """Task that 'calls' a number and fails unless that number is 555."""

    def execute(self, jim_number):
        print("Calling jim %s." % jim_number)
        reached_jim = jim_number == 555
        if reached_jim:
            print("Hello Jim!")
            return
        raise Exception("Wrong number!")

    def revert(self, jim_number, **kwargs):
        apology = "Wrong number, apologizing."
        print(apology)
39 
40 
# Create your flow and associated tasks (the work to be done).
#
# The retry controller is handed the 'phone_directory' entry from the store
# and provides 'jim_number', which is the argument name that the execute()
# method of CallJim asks for.
flow = lf.Flow('retrying-linear',
               retry=retry.ParameterizedForEach(
                   rebind=['phone_directory'],
                   provides='jim_number')).add(CallJim())

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1 
  2 import json
  3 import logging
  4 import os
  5 import sys
  6 import tempfile
  7 
  8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                        os.pardir,
 10                                        os.pardir))
 11 sys.path.insert(0, top_dir)
 12 
 13 from taskflow import engines
 14 from taskflow.engines.worker_based import worker
 15 from taskflow.patterns import linear_flow as lf
 16 from taskflow.tests import utils
 17 from taskflow.utils import threading_utils
 18 
 19 import example_utils  # noqa
 20 
 21 # INTRO: This example walks through a miniature workflow which shows how to
 22 # start up a number of workers (these workers will process task execution and
 23 # reversion requests using any provided input data) and then use an engine
 24 # that creates a set of *capable* tasks and flows (the engine can not create
 25 # tasks that the workers are not able to run, this will end in failure) that
 26 # those workers will run and then executes that workflow seamlessly using the
 27 # workers to perform the actual execution.
 28 #
 29 # NOTE(harlowja): this example simulates the expected larger number of workers
 30 # by using a set of threads (which in this example simulate the remote workers
 31 # that would typically be running on other external machines).
 32 
  33 # A filesystem can also be used as the queue transport (useful as a simple
  34 # transport type that does not involve setting up a larger mq system). If this
  35 # is false then the memory transport is used instead; both work in standalone
  36 # setups.
 37 USE_FILESYSTEM = False
 38 BASE_SHARED_CONF = {
 39     'exchange': 'taskflow',
 40 }
 41 
 42 # Until https://github.com/celery/kombu/issues/398 is resolved it is not
 43 # recommended to run many worker threads in this example due to the types
 44 # of errors mentioned in that issue.
 45 MEMORY_WORKERS = 2
 46 FILE_WORKERS = 1
 47 WORKER_CONF = {
 48     # These are the tasks the worker can execute, they *must* be importable,
 49     # typically this list is used to restrict what workers may execute to
 50     # a smaller set of *allowed* tasks that are known to be safe (one would
 51     # not want to allow all python code to be executed).
 52     'tasks': [
 53         'taskflow.tests.utils:TaskOneArgOneReturn',
 54         'taskflow.tests.utils:TaskMultiArgOneReturn'
 55     ],
 56 }
 57 
 58 
 59 def run(engine_options):
 60     flow = lf.Flow('simple-linear').add(
 61         utils.TaskOneArgOneReturn(provides='result1'),
 62         utils.TaskMultiArgOneReturn(provides='result2')
 63     )
 64     eng = engines.load(flow,
 65                        store=dict(x=111, y=222, z=333),
 66                        engine='worker-based', **engine_options)
 67     eng.run()
 68     return eng.storage.fetch_all()
 69 
 70 
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)

    # Only set when the filesystem transport is used (so that the temporary
    # directory can be removed at the end).
    tmp_path = None
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        shared_conf.update({
            'transport': 'filesystem',
            'transport_options': {
                'data_folder_in': tmp_path,
                'data_folder_out': tmp_path,
                'polling_interval': 0.1,
            },
        })
    else:
        worker_count = MEMORY_WORKERS
        shared_conf.update({
            'transport': 'memory',
            'transport_options': {
                'polling_interval': 0.1,
            },
        })
    # Copies are made so that the module level configuration dictionaries
    # are left untouched.
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_options = dict(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (worker_count))
        for i in range(0, worker_count):
            # Every worker gets its own topic (worker-1, worker-2, ...).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            # Remember the thread and how to stop its worker for cleanup.
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
        # This is done so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
        if tmp_path:
            example_utils.rm_path(tmp_path)

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1 
  2 import logging
  3 import os
  4 import string
  5 import sys
  6 import time
  7 
  8 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                        os.pardir,
 10                                        os.pardir))
 11 sys.path.insert(0, top_dir)
 12 
 13 from six.moves import range as compat_range
 14 
 15 from taskflow import engines
 16 from taskflow.engines.worker_based import worker
 17 from taskflow.patterns import linear_flow as lf
 18 from taskflow import task
 19 from taskflow.types import notifier
 20 from taskflow.utils import threading_utils
 21 
# Short alias for the notifier's ANY constant; run() below registers
# event_receiver under it (presumably this subscribes the callback to every
# event type -- confirm against the notifier documentation).
ANY = notifier.Notifier.ANY
 23 
 24 # INTRO: These examples show how to use a remote worker's event notification
 25 # attribute to proxy back task event notifications to the controlling process.
 26 #
 27 # In this case a simple set of events is triggered by a worker running a
 28 # task (simulated to be remote by using a kombu memory transport and threads).
 29 # Those events that the 'remote worker' produces will then be proxied back to
 30 # the task that the engine is running 'remotely', and then they will be emitted
 31 # back to the original callbacks that exist in the originating engine
 32 # process/thread. This creates a one-way *notification* channel that can
 33 # transparently be used in-process, outside-of-process using remote workers and
 34 # so-on that allows a task to signal to its controlling process some sort of
 35 # action that has occurred that the task may need to tell others about (for
 36 # example to trigger some type of response when the task reaches 50% done...).
 37 
 38 
 39 def event_receiver(event_type, details):
 40     """This is the callback that (in this example) doesn't do much..."""
 41     print("Recieved event '%s'" % event_type)
 42     print("Details = %s" % details)
 43 
 44 
 45 class EventReporter(task.Task):
 46     """This is the task that will be running 'remotely' (not really remote)."""
 47 
 48     EVENTS = tuple(string.ascii_uppercase)
 49     EVENT_DELAY = 0.1
 50 
 51     def execute(self):
 52         for i, e in enumerate(self.EVENTS):
 53             details = {
 54                 'leftover': self.EVENTS[i:],
 55             }
 56             self.notifier.notify(e, details)
 57             time.sleep(self.EVENT_DELAY)
 58 
 59 
# Transport settings merged into both the worker and the engine
# configuration by the __main__ section of this example.
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
    'transport': 'memory',
    'transport_options': {
        'polling_interval': 0.1,
    },
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 1
WORKER_CONF = {
    'tasks': [
        # Used to locate which tasks we can run (we don't want to allow
        # arbitrary code/tasks to be run by any worker since that would
        # open up a variety of vulnerabilities).
        '%s:EventReporter' % (__name__),
    ],
}
 80 
 81 
 82 def run(engine_options):
 83     reporter = EventReporter()
 84     reporter.notifier.register(ANY, event_receiver)
 85     flow = lf.Flow('event-reporter').add(reporter)
 86     eng = engines.load(flow, engine='worker-based', **engine_options)
 87     eng.run()
 88 
 89 
# Script entry point: start the in-process workers, run the flow through
# them and always stop the workers again (even when the run fails).
if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    # (dict(...) makes shallow copies so the module-level dicts do not gain
    # the 'topic'/'topics' keys that are added below.)
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    # Holds (thread, stop-callback) pairs for every worker that was started.
    workers = []

    # These topics will be used to request worker information on; those
    # workers will respond with their capabilities which the executing engine
    # will use to match pending tasks to a matched worker, this will cause
    # the task to be sent for execution, and the engine will wait until it
    # is finished (a response is received) and then the engine will either
    # continue with other tasks, do some retry/failure resolution logic or
    # stop (and potentially re-raise the remote workers failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % (MEMORY_WORKERS))
        for i in compat_range(0, MEMORY_WORKERS):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # NOTE(review): presumably blocks until the worker is ready to
            # receive requests -- confirm against the Worker documentation.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        # run() has no return statement, so 'result' is always None here.
        result = run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            # Stop the most recently started worker first, then wait for its
            # thread to exit.
            r, stopper = workers.pop()
            stopper()
            r.join()

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1 
  2 import logging
  3 import math
  4 import os
  5 import sys
  6 
  7 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  8                                        os.pardir,
  9                                        os.pardir))
 10 sys.path.insert(0, top_dir)
 11 
 12 from six.moves import range as compat_range
 13 
 14 from taskflow import engines
 15 from taskflow.engines.worker_based import worker
 16 from taskflow.patterns import unordered_flow as uf
 17 from taskflow import task
 18 from taskflow.utils import threading_utils
 19 
 20 # INTRO: This example walks through a workflow that will in parallel compute
 21 # a mandelbrot result set (using X 'remote' workers) and then combine their
 22 # results together to form a final mandelbrot fractal image. It shows a usage
 23 # of taskflow to perform a well-known embarrassingly parallel problem that has
 24 # the added benefit of also being an elegant visualization.
 25 #
 26 # NOTE(harlowja): this example simulates the expected larger number of workers
 27 # by using a set of threads (which in this example simulate the remote workers
 28 # that would typically be running on other external machines).
 29 #
 30 # NOTE(harlowja): to have it produce an image run (after installing pillow):
 31 #
 32 # $ python taskflow/examples/wbe_mandelbrot.py output.png
 33 
# Settings copied into both the worker and the engine configuration by
# create_fractal().
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}
# How many worker threads create_fractal() starts.
WORKERS = 2
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        '%s:MandelCalculator' % (__name__),
    ],
}
ENGINE_CONF = {
    'engine': 'worker-based',
}

# Mandelbrot & image settings...
# NOTE: calculate() and MandelCalculator unpack this as (height, width).
IMAGE_SIZE = (512, 512)
# Number of MandelCalculator tasks the image rows are divided between.
CHUNK_COUNT = 8
# Iteration cap; passed as the last element of 'mandelbrot_config'.
MAX_ITERATIONS = 25
 55 
 56 
 57 class MandelCalculator(task.Task):
 58     def execute(self, image_config, mandelbrot_config, chunk):
 59         """Returns the number of iterations before the computation "escapes".
 60 
 61         Given the real and imaginary parts of a complex number, determine if it
 62         is a candidate for membership in the mandelbrot set given a fixed
 63         number of iterations.
 64         """
 65 
 66         # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 67         #
 68         # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 69         def mandelbrot(x, y, max_iters):
 70             c = complex(x, y)
 71             z = 0.0j
 72             for i in compat_range(max_iters):
 73                 z = z * z + c
 74                 if (z.real * z.real + z.imag * z.imag) >= 4:
 75                     return i
 76             return max_iters
 77 
 78         min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 79         height, width = image_config['size']
 80         pixel_size_x = (max_x - min_x) / width
 81         pixel_size_y = (max_y - min_y) / height
 82         block = []
 83         for y in compat_range(chunk[0], chunk[1]):
 84             row = []
 85             imag = min_y + y * pixel_size_y
 86             for x in compat_range(0, width):
 87                 real = min_x + x * pixel_size_x
 88                 row.append(mandelbrot(real, imag, max_iters))
 89             block.append(row)
 90         return block
 91 
 92 
 93 def calculate(engine_conf):
 94     # Subdivide the work into X pieces, then request each worker to calculate
 95     # one of those chunks and then later we will write these chunks out to
 96     # an image bitmap file.
 97 
 98     # And unordered flow is used here since the mandelbrot calculation is an
 99     # example of an embarrassingly parallel computation that we can scatter
100     # across as many workers as possible.
101     flow = uf.Flow("mandelbrot")
102 
103     # These symbols will be automatically given to tasks as input to their
104     # execute method, in this case these are constants used in the mandelbrot
105     # calculation.
106     store = {
107         'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
108         'image_config': {
109             'size': IMAGE_SIZE,
110         }
111     }
112 
113     # We need the task names to be in the right order so that we can extract
114     # the final results in the right order (we don't care about the order when
115     # executing).
116     task_names = []
117 
118     # Compose our workflow.
119     height, _width = IMAGE_SIZE
120     chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
121     for i in compat_range(0, CHUNK_COUNT):
122         chunk_name = 'chunk_%s' % i
123         task_name = "calculation_%s" % i
124         # Break the calculation up into chunk size pieces.
125         rows = [i * chunk_size, i * chunk_size + chunk_size]
126         flow.add(
127             MandelCalculator(task_name,
128                              # This ensures the storage symbol with name
129                              # 'chunk_name' is sent into the tasks local
130                              # symbol 'chunk'. This is how we give each
131                              # calculator its own correct sequence of rows
132                              # to work on.
133                              rebind={'chunk': chunk_name}))
134         store[chunk_name] = rows
135         task_names.append(task_name)
136 
137     # Now execute it.
138     eng = engines.load(flow, store=store, engine_conf=engine_conf)
139     eng.run()
140 
141     # Gather all the results and order them for further processing.
142     gather = []
143     for name in task_names:
144         gather.extend(eng.storage.get(name))
145     points = []
146     for y, row in enumerate(gather):
147         for x, color in enumerate(row):
148             points.append(((x, y), color))
149     return points
150 
151 
def write_image(results, output_filename=None):
    """Report on the gathered points and optionally save them as an image.

    :param results: iterable of ``((x, y), color)`` tuples
    :param output_filename: where to save the gray scale image; when falsey
                            only the summary line is printed
    :raises RuntimeError: if an image was requested but Pillow is missing
    """
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, color in results:
        color_max = max(color, color_max)

    # Use gray scale since we don't really have other colors.
    #
    # The rest of this example treats IMAGE_SIZE as (height, width) while
    # Pillow expects (width, height); swap them so that the (x, y) points
    # stay inside the image even when the image is not square.
    height, width = IMAGE_SIZE
    img = Image.new('L', (width, height), "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)
182 
183 
def create_fractal():
    """Start the worker threads, compute the fractal and write it out.

    The output filename is taken from ``sys.argv[1]`` when it is provided;
    otherwise no filename is passed on to ``write_image``.
    """
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)
    shared_conf.update({
        'transport': 'memory',
        'transport_options': {
            'polling_interval': 0.1,
        },
    })

    if len(sys.argv) >= 2:
        output_filename = sys.argv[1]
    else:
        output_filename = None

    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    # Holds (thread, stop-callback) pairs for every worker that was started.
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (WORKERS))
        for i in compat_range(0, WORKERS):
            # Each worker gets its own topic; the engine is told about all
            # of them below.
            worker_conf['topic'] = 'calculator_%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # NOTE(review): presumably blocks until the worker is ready to
            # receive requests -- confirm against the Worker documentation.
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    # Only reached when calculate() succeeded (an exception raised in the
    # try block propagates after the cleanup), so 'results' is always bound.
    print("Writing image...")
    write_image(results, output_filename=output_filename)
235 
236 
# Only compute the fractal when executed as a script (not when imported).
if __name__ == "__main__":
    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1 
  2 import collections
  3 import contextlib
  4 import logging
  5 import os
  6 import random
  7 import sys
  8 import threading
  9 import time
 10 
 11 logging.basicConfig(level=logging.ERROR)
 12 
 13 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                        os.pardir,
 15                                        os.pardir))
 16 sys.path.insert(0, top_dir)
 17 
 18 import six
 19 from six.moves import range as compat_range
 20 from zake import fake_client
 21 
 22 from taskflow import exceptions as excp
 23 from taskflow.jobs import backends
 24 from taskflow.utils import threading_utils
 25 
 26 # In this example we show how a jobboard can be used to post work for other
 27 # entities to work on. This example creates a set of jobs using one producer
 28 # thread (typically this would be split across many machines) and then having
 29 # other worker threads with their own jobboards select work using a given
 30 # filter [red/blue] and then perform that work (consuming or abandoning
 31 # the job after it has been completed or failed).
 32 
 33 # Things to note:
 34 # - No persistence layer is used (or logbook), just the job details are used
 35 #   to determine if a job should be selected by a worker or not.
 36 # - This example runs in a single process (this is expected to be atypical
 37 #   but this example shows that it can be done if needed, for testing...)
 38 # - The iterjobs(), claim(), consume()/abandon() worker workflow.
 39 # - The post() producer workflow.
 40 
# Jobboard configuration; every producer, worker and the final verifier
# is given its own copy of it.
SHARED_CONF = {
    'path': "/taskflow/jobs",
    'board': 'zookeeper',
}

# How many workers and producers of work will be created (as threads).
PRODUCERS = 3
WORKERS = 5

# How many units of work each producer will create.
PRODUCER_UNITS = 10

# How many units of work are expected to be produced (used so workers can
# know when to stop running and shutdown, typically this would not be a
# value but we have to limit this example's execution time to be less than
# infinity).
EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS

# Delay between producing/consuming more work.
WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)

# To ensure threads don't trample other threads output.
STDOUT_LOCK = threading.Lock()
 64 
 65 
 66 def dispatch_work(job):
 67     # This is where the jobs contained work *would* be done
 68     time.sleep(1.0)
 69 
 70 
 71 def safe_print(name, message, prefix=""):
 72     with STDOUT_LOCK:
 73         if prefix:
 74             print("%s %s: %s" % (prefix, name, message))
 75         else:
 76             print("%s: %s" % (name, message))
 77 
 78 
def worker(ident, client, consumed):
    """Claim and consume matching jobs until all expected units are done.

    :param ident: number used to build this worker's name ("W-<ident>")
    :param client: client object handed to the jobboard backend
    :param consumed: shared container that consumed jobs are appended to;
                     its length is what tells every worker when to stop
    """
    # Create a personal board (using the same client so that it works in
    # the same process) and start looking for jobs on the board that we want
    # to perform.
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = 0
    consumed_jobs = 0
    abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        # Keep scanning until the workers (together) consumed everything
        # the producers are expected to post.
        while len(consumed) != EXPECTED_UNITS:
            # A new color is picked for every scan of the board.
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                # See if we should even bother with it...
                if job.details.get('color') != favorite_color:
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    # The claim did not succeed; move on to the next job.
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                else:
                    try:
                        dispatch_work(job)
                        board.consume(job, name)
                        safe_print(name, "'%s' [consumed]" % (job))
                        consumed_jobs += 1
                        consumed.append(job)
                    except Exception:
                        # Any failure while working on (or consuming) the
                        # job causes it to be abandoned instead.
                        board.abandon(job, name)
                        abandoned_jobs += 1
                        safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    safe_print(name,
               "finished (claimed %s jobs, consumed %s jobs,"
               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
                                        abandoned_jobs), prefix=">>>")
118 
119 
def producer(ident, client):
    """Post PRODUCER_UNITS randomly colored jobs onto a personal board.

    :param ident: number used to build this producer's name ("P-<ident>")
    :param client: client object handed to the jobboard backend
    """
    # The board is personal but uses the shared client so that everything
    # works inside of the same process.
    name = "P-%s" % (ident)
    safe_print(name, "started")
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for unit in compat_range(0, PRODUCER_UNITS):
            job_name = "%s-%s" % (name, unit)
            color = random.choice(['red', 'blue'])
            posted = board.post(job_name, book=None,
                                details={'color': color})
            safe_print(name, "'%s' [posted]" % (posted))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")
136 
137 
def main():
    """Run the producer and worker threads and verify nothing is left over.

    :returns: 1 when jobs remain on the board (or the number of consumed
              jobs differs from ``EXPECTED_UNITS``), otherwise 0
    """
    if six.PY3:
        # TODO(harlowja): Hack to make eventlet work right, remove when the
        # following is fixed: https://github.com/eventlet/eventlet/issues/230
        from taskflow.utils import eventlet_utils as _eu  # noqa
        try:
            import eventlet as _eventlet  # noqa
        except ImportError:
            pass
    with contextlib.closing(fake_client.FakeClient()) as c:
        # Every thread that gets started (producers first, then workers).
        created = []
        for i in compat_range(0, PRODUCERS):
            p = threading_utils.daemon_thread(producer, i + 1, c)
            created.append(p)
            p.start()
        # Shared between all of the workers; they append consumed jobs here.
        consumed = collections.deque()
        for i in compat_range(0, WORKERS):
            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
            created.append(w)
            w.start()
        # Wait for all of the threads to finish.
        while created:
            t = created.pop()
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
                return 1
            return 0
168 
169 
# Use the value returned by main() as the process exit code.
if __name__ == "__main__":
    sys.exit(main())

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1 
  2 import contextlib
  3 import itertools
  4 import logging
  5 import os
  6 import shutil
  7 import socket
  8 import sys
  9 import tempfile
 10 import threading
 11 import time
 12 
 13 logging.basicConfig(level=logging.ERROR)
 14 
 15 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 16                                        os.pardir,
 17                                        os.pardir))
 18 sys.path.insert(0, top_dir)
 19 
 20 from oslo_utils import timeutils
 21 from oslo_utils import uuidutils
 22 import six
 23 from zake import fake_client
 24 
 25 from taskflow.conductors import backends as conductors
 26 from taskflow import engines
 27 from taskflow.jobs import backends as boards
 28 from taskflow.patterns import linear_flow
 29 from taskflow.persistence import backends as persistence
 30 from taskflow.persistence import models
 31 from taskflow import task
 32 from taskflow.utils import threading_utils
 33 
 34 # INTRO: This example shows how a worker/producer can post desired work (jobs)
 35 # to a jobboard and a conductor can consume that work (jobs) from that jobboard
 36 # and execute those jobs in a reliable & async manner (for example, if the
 37 # conductor were to crash then the job will be released back onto the jobboard
 38 # and another conductor can attempt to finish it, from wherever that job last
 39 # left off).
 40 #
 41 # In this example an in-memory jobboard (and in-memory storage) is created and
 42 # used that simulates how this would be done at a larger scale (it is an
 43 # example after all).
 44 
# Restrict how long this example runs for...
RUN_TIME = 5
# Seconds slept by the reviewer thread after each review it posts.
REVIEW_CREATION_DELAY = 0.5
# Handed to the conductor as its 'wait_timeout'.
SCAN_DELAY = 0.1
# Default name prefix for the reviewer and the conductor (host + pid).
NAME = "%s_%s" % (socket.getfqdn(), os.getpid())

# This won't really use zookeeper but will use a local version of it using
# the zake library that mimics an actual zookeeper cluster using threads and
# an in-memory data structure.
JOBBOARD_CONF = {
    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
}
 57 
 58 
 59 class RunReview(task.Task):
 60     # A dummy task that clones the review and runs tox...
 61 
 62     def _clone_review(self, review, temp_dir):
 63         print("Cloning review '%s' into %s" % (review['id'], temp_dir))
 64 
 65     def _run_tox(self, temp_dir):
 66         print("Running tox in %s" % temp_dir)
 67 
 68     def execute(self, review, temp_dir):
 69         self._clone_review(review, temp_dir)
 70         self._run_tox(temp_dir)
 71 
 72 
 73 class MakeTempDir(task.Task):
 74     # A task that creates and destroys a temporary dir (on failure).
 75     #
 76     # It provides the location of the temporary dir for other tasks to use
 77     # as they see fit.
 78 
 79     default_provides = 'temp_dir'
 80 
 81     def execute(self):
 82         return tempfile.mkdtemp()
 83 
 84     def revert(self, *args, **kwargs):
 85         temp_dir = kwargs.get(task.REVERT_RESULT)
 86         if temp_dir:
 87             shutil.rmtree(temp_dir)
 88 
 89 
 90 class CleanResources(task.Task):
 91     # A task that cleans up any workflow resources.
 92 
 93     def execute(self, temp_dir):
 94         print("Removing %s" % temp_dir)
 95         shutil.rmtree(temp_dir)
 96 
 97 
 98 def review_iter():
 99     """Makes reviews (never-ending iterator/generator)."""
100     review_id_gen = itertools.count(0)
101     while True:
102         review_id = six.next(review_id_gen)
103         review = {
104             'id': review_id,
105         }
106         yield review
107 
108 
109 # The reason this is at the module namespace level is important, since it must
110 # be accessible from a conductor dispatching an engine, if it was a lambda
111 # function for example, it would not be reimportable and the conductor would
112 # be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run.

    The returned linear flow makes a temporary directory, runs the review
    in it and finally cleans up.
    """
    steps = (
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner"),
    )
    workflow = linear_flow.Flow("tester")
    workflow.add(*steps)
    return workflow
122 
123 
def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix.

    :param client: client object handed to the jobboard
    :param saver: persistence backend used to save the logbooks (it is also
                  handed to the jobboard)
    :param name: prefix used to build the board and job names
    :returns: tuple of (unstarted thread, callable that tells it to stop)
    """
    real_name = "%s_reviewer" % name
    # Set (via the returned callback) to make the posting loop exit.
    no_more = threading.Event()
    jb = boards.fetch(real_name, JOBBOARD_CONF,
                      client=client, persistence=saver)

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail("flow_%s" % review_id,
                                   uuidutils.generate_uuid())
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
        #
        # These args and kwargs *can* be used to save any specific parameters
        # into the factory when it is being called to create the workflow
        # objects (typically used to tell a factory how to create a unique
        # workflow that represents this review).
        factory_args = ()
        factory_kwargs = {}
        engines.save_factory_details(detail, create_review_workflow,
                                     factory_args, factory_kwargs)
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
            return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        review_generator = review_iter()
        with contextlib.closing(jb):
            # The stop request is only checked between posts, so stopping
            # may take up to one creation delay to be noticed.
            while not no_more.is_set():
                review = six.next(review_generator)
                # The review is placed under the 'store' key of the job
                # details.
                details = {
                    'store': {
                        'review': review,
                    },
                }
                job_name = "%s_%s" % (real_name, review['id'])
                print("Posting review '%s'" % review['id'])
                jb.post(job_name,
                        book=make_save_book(saver, review['id']),
                        details=details)
                time.sleep(REVIEW_CREATION_DELAY)

    # Return the unstarted thread, and a callback that can be used to
    # shutdown that thread (to avoid running forever).
    return (threading_utils.daemon_thread(target=run), no_more.set)
176 
177 
def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix.

    :param client: client object handed to the jobboard
    :param saver: persistence backend handed to the jobboard
    :param name: jobboard name (and prefix of the conductor's name)
    :returns: tuple of (unstarted thread, callable that stops the conductor)
    """
    conductor_name = "%s_conductor" % name
    jb = boards.fetch(name, JOBBOARD_CONF,
                      client=client, persistence=saver)
    conductor = conductors.fetch("blocking", conductor_name, jb,
                                 engine='parallel', wait_timeout=SCAN_DELAY)

    def run():
        # The board has to be connected before the conductor can use it; it
        # is closed again once the conductor stops running.
        jb.connect()
        with contextlib.closing(jb):
            conductor.run()

    # The thread is handed back unstarted, together with the callback that
    # can be used to shut it down (to avoid running forever).
    runner = threading_utils.daemon_thread(target=run)
    return (runner, conductor.stop)
194 
195 
def main():
    """Run one reviewer and one conductor thread for ``RUN_TIME``."""
    # Need to share the same backend, so that data can be shared...
    persistence_conf = {
        'connection': 'memory',
    }
    saver = persistence.fetch(persistence_conf)
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    fc1 = fake_client.FakeClient()
    # Done like this to share the same client storage location so the correct
    # zookeeper features work across clients...
    fc2 = fake_client.FakeClient(storage=fc1.storage)
    # Each entity is a tuple of (unstarted thread, stop callback).
    entities = [
        generate_reviewer(fc1, saver),
        generate_conductor(fc2, saver),
    ]
    for t, stopper in entities:
        t.start()
    try:
        # Let the threads work until the allotted run time has expired.
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        # Stop in the reverse order of starting (conductor first, then the
        # reviewer), waiting for each thread to exit.
        for t, stopper in reversed(entities):
            stopper()
            t.join()
225 
226 
# Only run the example when executed as a script (not when imported).
if __name__ == '__main__':
    main()

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles

  1 
  2 import contextlib
  3 import functools
  4 import logging
  5 import os
  6 import sys
  7 import time
  8 import traceback
  9 
 10 from kazoo import client
 11 
 12 top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 13                                        os.pardir,
 14                                        os.pardir))
 15 sys.path.insert(0, top_dir)
 16 
 17 from taskflow.conductors import backends as conductor_backends
 18 from taskflow import engines
 19 from taskflow.jobs import backends as job_backends
 20 from taskflow import logging as taskflow_logging
 21 from taskflow.patterns import linear_flow as lf
 22 from taskflow.persistence import backends as persistence_backends
 23 from taskflow.persistence import models
 24 from taskflow import task
 25 
 26 from oslo_utils import timeutils
 27 from oslo_utils import uuidutils
 28 
 29 # Instructions!
 30 #
 31 # 1. Install zookeeper (or change host listed below)
 32 # 2. Download this example, place in file '99_bottles.py'
 33 # 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 34 # 4. Run `python 99_bottles.py c` a few times (in different shells)
 35 # 5. On demand kill previously listed processes created in (4) and watch
 36 #    the work resume on another process (and repeat)
 37 # 6. Keep enough workers alive to eventually finish the song (if desired).
 38 
 39 ME = os.getpid()
 40 ZK_HOST = "localhost:2181"
 41 JB_CONF = {
 42     'hosts': ZK_HOST,
 43     'board': 'zookeeper',
 44     'path': '/taskflow/99-bottles-demo',
 45 }
 46 PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 47 TAKE_DOWN_DELAY = 1.0
 48 PASS_AROUND_DELAY = 3.0
 49 HOW_MANY_BOTTLES = 99
 50 
 51 
 52 class TakeABottleDown(task.Task):
 53     def execute(self, bottles_left):
 54         sys.stdout.write('Take one down, ')
 55         sys.stdout.flush()
 56         time.sleep(TAKE_DOWN_DELAY)
 57         return bottles_left - 1
 58 
 59 
 60 class PassItAround(task.Task):
 61     def execute(self):
 62         sys.stdout.write('pass it around, ')
 63         sys.stdout.flush()
 64         time.sleep(PASS_AROUND_DELAY)
 65 
 66 
 67 class Conclusion(task.Task):
 68     def execute(self, bottles_left):
 69         sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
 70         sys.stdout.flush()
 71 
 72 
 73 def make_bottles(count):
 74     # This is the function that will be called to generate the workflow
 75     # and will also be called to regenerate it on resumption so that work
 76     # can continue from where it last left off...
 77 
 78     s = lf.Flow("bottle-song")
 79 
 80     take_bottle = TakeABottleDown("take-bottle-%s" % count,
 81                                   inject={'bottles_left': count},
 82                                   provides='bottles_left')
 83     pass_it = PassItAround("pass-%s-around" % count)
 84     next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 85     s.add(take_bottle, pass_it, next_bottles)
 86 
 87     for bottle in reversed(list(range(1, count))):
 88         take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
 89                                       provides='bottles_left')
 90         pass_it = PassItAround("pass-%s-around" % bottle)
 91         next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 92         s.add(take_bottle, pass_it, next_bottles)
 93 
 94     return s
 95 
 96 
 97 def run_conductor(only_run_once=False):
 98     # This continuously runs consumers until it is stopped via ctrl-c or
 99     # other kill signal...
100     event_watches = {}
101 
102     # This will be triggered by the conductor doing various activities
103     # with engines, and is quite nice to be able to see the various timing
104     # segments (which is useful for debugging, or watching, or figuring out
105     # where to optimize).
106     def on_conductor_event(cond, event, details):
107         print("Event '%s' has been received..." % event)
108         print("Details = %s" % details)
109         if event.endswith("_start"):
110             w = timeutils.StopWatch()
111             w.start()
112             base_event = event[0:-len("_start")]
113             event_watches[base_event] = w
114         if event.endswith("_end"):
115             base_event = event[0:-len("_end")]
116             try:
117                 w = event_watches.pop(base_event)
118                 w.stop()
119                 print("It took %0.3f seconds for event '%s' to finish"
120                       % (w.elapsed(), base_event))
121             except KeyError:
122                 pass
123         if event == 'running_end' and only_run_once:
124             cond.stop()
125 
126     print("Starting conductor with pid: %s" % ME)
127     my_name = "conductor-%s" % ME
128     persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
129     with contextlib.closing(persist_backend):
130         with contextlib.closing(persist_backend.get_connection()) as conn:
131             conn.upgrade()
132         job_backend = job_backends.fetch(my_name, JB_CONF,
133                                          persistence=persist_backend)
134         job_backend.connect()
135         with contextlib.closing(job_backend):
136             cond = conductor_backends.fetch('blocking', my_name, job_backend,
137                                             persistence=persist_backend)
138             on_conductor_event = functools.partial(on_conductor_event, cond)
139             cond.notifier.register(cond.notifier.ANY, on_conductor_event)
140             # Run forever, and kill -9 or ctrl-c me...
141             try:
142                 cond.run()
143             finally:
144                 cond.stop()
145                 cond.wait()
146 
147 
148 def run_poster():
149     # This just posts a single job and then ends...
150     print("Starting poster with pid: %s" % ME)
151     my_name = "poster-%s" % ME
152     persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
153     with contextlib.closing(persist_backend):
154         with contextlib.closing(persist_backend.get_connection()) as conn:
155             conn.upgrade()
156         job_backend = job_backends.fetch(my_name, JB_CONF,
157                                          persistence=persist_backend)
158         job_backend.connect()
159         with contextlib.closing(job_backend):
160             # Create information in the persistence backend about the
161             # unit of work we want to complete and the factory that
162             # can be called to create the tasks that the work unit needs
163             # in order to be completed.
164             lb = models.LogBook("post-from-%s" % my_name)
165             fd = models.FlowDetail("song-from-%s" % my_name,
166                                    uuidutils.generate_uuid())
167             lb.add(fd)
168             with contextlib.closing(persist_backend.get_connection()) as conn:
169                 conn.save_logbook(lb)
170             engines.save_factory_details(fd, make_bottles,
171                                          [HOW_MANY_BOTTLES], {},
172                                          backend=persist_backend)
173             # Post, and be done with it!
174             jb = job_backend.post("song-from-%s" % my_name, book=lb)
175             print("Posted: %s" % jb)
176             print("Goodbye...")
177 
178 
179 def main_local():
180     # Run locally; typically this is activated during unit testing, when
181     # all the examples are checked to make sure they still function correctly...
182     global TAKE_DOWN_DELAY
183     global PASS_AROUND_DELAY
184     global JB_CONF
185     # Make everything go much faster (so that this finishes quickly).
186     PASS_AROUND_DELAY = 0.01
187     TAKE_DOWN_DELAY = 0.01
188     JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
189     run_poster()
190     run_conductor(only_run_once=True)
191 
192 
193 def check_for_zookeeper(timeout=1):
194     sys.stderr.write("Testing for the existence of a zookeeper server...\n")
195     sys.stderr.write("Please wait....\n")
196     with contextlib.closing(client.KazooClient()) as test_client:
197         try:
198             test_client.start(timeout=timeout)
199         except test_client.handler.timeout_exception:
200             sys.stderr.write("Zookeeper is needed for running this example!\n")
201             traceback.print_exc()
202             return False
203         else:
204             test_client.stop()
205             return True
206 
207 
208 def main():
209     if not check_for_zookeeper():
210         return
211     if len(sys.argv) == 1:
212         main_local()
213     elif sys.argv[1] in ('p', 'c'):
214         if sys.argv[-1] == "v":
215             logging.basicConfig(level=taskflow_logging.TRACE)
216         else:
217             logging.basicConfig(level=logging.ERROR)
218         if sys.argv[1] == 'p':
219             run_poster()
220         else:
221             run_conductor()
222     else:
223         sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
224 
225 
226 if __name__ == '__main__':
227     main()
Creative Commons Attribution 3.0 License

Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.