Welcome to cooperative’s documentation!
Getting Started with cooperative
Write computationally expensive code that does not block, to go along with non-blocking I/O, without having to structure everything as callbacks.
batch_accumulate iterates over a generator in batches, yielding control between batches to the other iterators passed into twisted.internet.task.cooperate.
Examples
Write computation code to cooperate.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from functools import reduce  # reduce is not a builtin on Python 3
from operator import add
import sys

from twisted.internet import defer
from twisted.internet.task import react
from twisted.python import log

from cooperative import batch_accumulate


def expensive(number):
    # Yield 100000 values, logging progress at each quarter.
    log.msg("starting {}".format(number))
    for value in range(100000):
        if 25000 == value:
            log.msg("1/4 for {}".format(number))
        if 50000 == value:
            log.msg("1/2 for {}".format(number))
        if 75000 == value:
            log.msg("3/4 for {}".format(number))
        yield number * value / 3.0


@defer.inlineCallbacks
def do_some_expensive_things(number):
    """
    Perform one expensive computation cooperatively with any
    other iterator passed into twisted's cooperate, then
    use its result to pass into the second computation.

    :param number: Multiplier for the values the generator yields.
    :return: A Deferred that fires with the total of the second computation.
    """
    # Consume the generator 1000 iterations at a time.
    result = yield batch_accumulate(1000, expensive(number))
    total = reduce(add, result, 0)
    log.msg("first for {}: {}".format(number, total))

    result = yield batch_accumulate(1000, expensive(int(total/1e9)))
    total = reduce(add, result, 0)
    log.msg("second for {}: {}".format(number, total))
    defer.returnValue(total)


def main(reactor):
    d1 = do_some_expensive_things(54.0)
    d2 = do_some_expensive_things(42)
    d3 = do_some_expensive_things(10)
    d4 = do_some_expensive_things(34)

    # Enqueue events to simulate handling external events
    d5 = defer.Deferred().addCallback(log.msg)
    reactor.callLater(0.3, d5.callback, "########## simulated request 1 ############")

    d6 = defer.Deferred().addCallback(log.msg)
    reactor.callLater(0.5, d6.callback, "########## sim request 2 ############")

    d7 = defer.Deferred().addCallback(log.msg)
    reactor.callLater(1.0, d7.callback, "########## simulated request 3 ############")

    return defer.gatherResults([d1, d2, d3, d4, d5, d6, d7]).addCallback(log.msg)


if __name__ == "__main__":
    log.startLogging(sys.stdout)
    react(main, [])
cooperative Package
- class cooperative.ValueBucket
Bases: object
Produces a callable that accumulates, in order, every non-None value it is called with.
The contents may be read in place, or collected and drained to make room for new content.
- contents()
Returns: contents
- drain_contents()
Starts a new collection to accumulate future contents and returns all of the existing contents.
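The accumulate-and-drain behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the class name SketchBucket and its internals are hypothetical, and only the method names contents() and drain_contents() come from the documentation.

```python
from collections import deque


class SketchBucket(object):
    """Hypothetical stand-in for ValueBucket; not the library's code."""

    def __init__(self):
        self._contents = deque()

    def __call__(self, value):
        # Skip None so that steps with nothing to report are not stored.
        if value is not None:
            self._contents.append(value)

    def contents(self):
        return self._contents

    def drain_contents(self):
        # Swap in a fresh deque and hand back everything collected so far.
        drained, self._contents = self._contents, deque()
        return drained


bucket = SketchBucket()
for item in (1, None, 2, 3):
    bucket(item)

first = bucket.drain_contents()  # everything collected before the drain
bucket(4)                        # lands in the new, empty collection
```

Draining by swapping in a new deque means the caller keeps the old contents while later calls fill a separate collection.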
- cooperative.accumulate(a_generator, cooperator=None)
Start a Deferred whose callback argument is a deque of the values yielded from a_generator.
Parameters: a_generator – An iterator which yields some non-None values.
Returns: A Deferred whose next callback will be called with the yielded contents of the generator function.
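To illustrate what accumulating cooperatively means, here is a rough standard-library sketch. It does not use Twisted and does not return a Deferred; the helpers accumulate_step_by_step and run_round_robin are hypothetical names that stand in for the accumulating wrapper and for a cooperator that gives each task one step in turn.

```python
from collections import deque


def accumulate_step_by_step(a_generator, results):
    """Hypothetical helper: store each non-None value, then give up control."""
    for value in a_generator:
        if value is not None:
            results.append(value)
        yield  # a cooperator would switch to another task here


def run_round_robin(tasks):
    """Stand-in for a cooperator: advance each task one step in turn."""
    pending = deque(tasks)
    while pending:
        task = pending.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # this task is finished; drop it
        pending.append(task)


squares, letters = deque(), deque()
order = []  # records which source produced a value, in execution order


def squares_source():
    for n in range(3):
        order.append("squares")
        yield n * n


def letters_source():
    for ch in "ab":
        order.append("letters")
        yield ch


run_round_robin([
    accumulate_step_by_step(squares_source(), squares),
    accumulate_step_by_step(letters_source(), letters),
])
```

The order list shows the two sources alternating, while each deque still holds its own values in the order they were yielded.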
- cooperative.accumulation_handler(stopped_generator, spigot)
Drain the contents of the bucket from the spigot.
Parameters: - stopped_generator – Generator which has stopped.
- spigot – a Bucket.
Returns: The contents of the bucket.
- cooperative.batch_accumulate(max_batch_size, a_generator, cooperator=None)
Start a Deferred whose callback argument is a deque of the values yielded from a_generator, which is iterated over in batches of size max_batch_size.
It should be more efficient to iterate over the generator in batches and still provide enough speed for non-blocking execution.
Parameters: - max_batch_size – The number of iterations of the generator to consume at a time.
- a_generator – An iterator which yields some non-None values.
Returns: A Deferred whose next callback will be called with the yielded contents of the generator function.
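The batching idea can be sketched without Twisted. The helper in_batches below is a hypothetical name, not part of cooperative; it only shows how consuming up to max_batch_size items per step reduces the number of points at which control is handed back.

```python
from itertools import islice


def in_batches(max_batch_size, a_generator):
    """Hypothetical helper: consume up to max_batch_size items per step."""
    iterator = iter(a_generator)
    while True:
        batch = list(islice(iterator, max_batch_size))
        if not batch:
            return
        yield batch  # one yield per batch is one chance for others to run


# Ten values consumed four at a time give three steps instead of ten.
batches = list(in_batches(4, (n * 2 for n in range(10))))
```

A larger batch size means fewer switches between tasks, but each step holds control for longer, so the size is a trade-off between throughput and responsiveness.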
_meta Module
Subpackages
tests Package
test_cooperative Module
- class cooperative.tests.test_cooperative.Doer(own_reactor, own_cooperator)
Bases: object
- count = 0
- run(*args, **kwargs)
Cooperatively iterate over two iterators consecutively and return the result of the final one.
Returns:
- class cooperative.tests.test_cooperative.TestAccumulate(methodName='runTest')
Bases: twisted.trial._asynctest.TestCase
- test_accumulate(*args, **kwargs)
Ensure that within an inline callback function, an accumulate-wrapped generator yields the result of the output of the generator.
Returns:
- test_failure(*args, **kwargs)
Ensure that within an inline callback function, an accumulate-based function yields the result of its cooperative generator.
Since and_the_winner_is is designed to always log an error, ensure that one IndexError is logged.
Returns:
- test_multi_deux_batched(*args, **kwargs)
Ensure multiple inline callback functions will run cooperatively.
Ensure the result of gatherResults can be chained together in order.
Ensure cooperatively run generators will complete no matter the length.
Ensure the longest one will continue to iterate after the others run out of iterations.
Ensure those called with batch_accumulate will iterate over the generator in batches the size of max_size.
Returns:
- test_multi_deux_chain(*args, **kwargs)
Ensure multiple inline callback functions will run cooperatively.
Ensure the result of gatherResults can be chained together in order.
Ensure cooperatively run generators will complete no matter the length.
Ensure the longest one will continue to iterate after the others run out of iterations.
Returns:
- test_multi_winner(*args, **kwargs)
Ensure multiple inline callback functions will run cooperatively.
Returns:
- test_multi_winner_chain(*args, **kwargs)
Ensure multiple inline callback functions will run cooperatively.
Ensure the result of gatherResults can be chained together in order.
Returns:
- test_trice_winner(*args, **kwargs)
Ensure multiple inline callback functions will run cooperatively.
Returns:
- class cooperative.tests.test_cooperative.TestHandler(methodName='runTest')
Bases: twisted.trial._asynctest.TestCase
- test_accumulation_handler()
Ensure the return value of accumulation_handler is the contents of a Bucket instance with its contents drained.
Returns:
- class cooperative.tests.test_cooperative.TestOwnCooperator(methodName='runTest')
Bases: twisted.trial._asynctest.TestCase
- setUp()
Create a reactor and Cooperator that can be controlled.
Instantiate a Doer with the reactor and cooperator.
Create a LoopingCall and set its clock to the reactor.
Returns:
- tearDown()
- test_control_coop()
Ensure control of own cooperator.
Returns:
- cooperative.tests.test_cooperative.i_get_tenth_11(value)
Yield the tenth and eleventh item of value.
Parameters: value –
Returns:
- cooperative.tests.test_cooperative.run_some_with_error(*args, **kwargs)
Cooperatively iterate over two iterators consecutively, but the second one will always raise an IndexError, which is caught and logged, and a message is returned.
Returns:
- cooperative.tests.test_cooperative.run_some_without_error(*args, **kwargs)
Cooperatively iterate over two iterators consecutively and return the result of the final one.
Parameters: value – Any sequence.
Returns: