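Asynchronous Data Collection with Python Twisted
For large-scale data collection, the main alternative to multithreading is asynchronous I/O. This post uses the Twisted framework to collect data asynchronously; the original article is at: http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html .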
Async Batching with Twisted: A Walkthrough
Example 1: Just a DeferredList
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

def listCallback(results):
    # results is a list of (success, result) tuples, one per deferred
    print(results)

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d2 = getPage('http://yahoo.com')
    dl = DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
This is one of the simplest examples you’ll ever see for a deferred list in action. Get two deferreds (the getPage function returns a deferred) and use them to create a deferred list. Add callbacks to the list, garnish with a lemon.
Example 2: Simple Result Manipulation
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

def listCallback(results):
    # Each entry pairs a success flag with that deferred's result
    for isSuccess, content in results:
        print("Successful? %s" % isSuccess)
        print("Content Length: %s" % len(content))

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d2 = getPage('http://yahoo.com')
    dl = DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
We make things a little more interesting in this example by doing some processing on the results. For this to make sense, just remember that a callback gets passed the result when the deferred action completes. If we look up the API documentation for DeferredList, we see that it returns a list of (success, result) tuples, where success is a Boolean and result is the result of a deferred that was put in the list (remember, we’ve got two layers of deferreds here!).
Example 3: Page Callbacks Too
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

def pageCallback(result):
    # The return value replaces the page content as this deferred's result
    return len(result)

def listCallback(result):
    print(result)

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d1.addCallback(pageCallback)
    d2 = getPage('http://yahoo.com')
    d2.addCallback(pageCallback)
    dl = DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
Here, we mix things up a little bit. Instead of doing processing on all the results at once (in the deferred list callback), we’re processing them when the page callbacks fire. Our processing here is just a simple example of getting the length of the getPage deferred result: the HTML content of the page at the given URL.
Example 4: Results with More Structure
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

def pageCallback(result):
    # Keep the length and the first 10 characters of the page
    data = {
        'length': len(result),
        'content': result[:10],
    }
    return data

def listCallback(result):
    for isSuccess, data in result:
        if isSuccess:
            print("Call to server succeeded with data %s" % str(data))

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d1.addCallback(pageCallback)
    d2 = getPage('http://yahoo.com')
    d2.addCallback(pageCallback)
    dl = DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
A follow-up to the last example, here we put the data in which we are interested into a dictionary. We don’t end up pulling any of the data out of the dictionary; we just stringify it and print it to stdout.
Example 5: Passing Values to Callbacks
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

def pageCallback(result, url):
    data = {
        'length': len(result),
        'content': result[:10],
        'url': url,
    }
    return data

def getPageData(url):
    d = getPage(url)
    # Extra arguments are passed to the callback after the result
    d.addCallback(pageCallback, url)
    return d

def listCallback(result):
    for isSuccess, data in result:
        if isSuccess:
            print("Call to %s succeeded with data %s" % (data['url'], str(data)))

def finish(ign):
    reactor.stop()

def test():
    d1 = getPageData('http://www.google.com')
    d2 = getPageData('http://yahoo.com')
    dl = DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
After all this playing, we start asking ourselves more serious questions, like: “I want to decide which values show up in my callbacks” or “Some information that is available here, isn’t available there. How do I get it there?” This is how 🙂 Just pass the parameters you want to your callback. They’ll be tacked on after the result (as you can see from the function signatures).
In this example, we needed to create our own deferred-returning function, one that wraps the getPage function so that we can also pass the URL on to the callback.
Example 6: Adding Some Error Checking
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet.defer import DeferredList

urls = [
    'http://yahoo.com',
    'http://www.google.com',
    'http://www.google.com/MicrosoftRules.html',
    'http://bogusdomain.com',
    ]

def pageCallback(result, url):
    data = {
        'length': len(result),
        'content': result[:10],
        'url': url,
    }
    return data

def pageErrback(error, url):
    # Returning a plain dict (not a Failure) marks the error as handled
    return {
        'msg': error.getErrorMessage(),
        'err': error,
        'url': url,
    }

def getPageData(url):
    d = getPage(url, timeout=5)
    d.addCallback(pageCallback, url)
    d.addErrback(pageErrback, url)
    return d

def listCallback(result):
    for ignore, data in result:
        if 'err' in data:
            print("Call to %s failed with data %s" % (data['url'], str(data)))
        else:
            print("Call to %s succeeded with data %s" % (data['url'], str(data)))

def finish(ign):
    reactor.stop()

def test():
    deferreds = []
    for url in urls:
        d = getPageData(url)
        deferreds.append(d)
    dl = DeferredList(deferreds, consumeErrors=True)
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
As we get closer to building real applications, we start getting concerned about things like catching/anticipating errors. We haven’t added any errbacks to the deferred list, but we have added one to each page deferred, right after the page callback. We’ve added more URLs and put them in a list to ease the pains of duplicate code. As you can see, two of the URLs should return errors: one a 404, and the other should be a domain not resolving (we’ll see this as a timeout).
Example 7: Batching with DeferredSemaphore
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer

maxRun = 1

urls = [
    'http://twistedmatrix.com',
    'http://twistedsoftwarefoundation.org',
    'http://yahoo.com',
    'http://www.google.com',
    ]

def listCallback(results):
    for isSuccess, result in results:
        print(len(result))

def finish(ign):
    reactor.stop()

def test():
    deferreds = []
    # At most maxRun calls to getPage are in progress at any time
    sem = defer.DeferredSemaphore(maxRun)
    for url in urls:
        d = sem.run(getPage, url)
        deferreds.append(d)
    dl = defer.DeferredList(deferreds)
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()
These last two examples are for more advanced use cases. As soon as the reactor starts, deferreds that are ready start “firing”: their “jobs” start running. What if we’ve got 500 deferreds in a list? Well, they all start processing. As you can imagine, this is an easy way to run an accidental DoS against a friendly service. Not cool.
For situations like this, what we want is a way to run only so many deferreds at a time. This is a great use for the deferred semaphore. Over repeated runs of the example above, the content lengths of the four pages were returned after about 2.5 seconds. With the example rewritten to use just the deferred list (no deferred semaphore), the content lengths were returned after about 1.2 seconds. The extra time comes from forcing (for the sake of the example) only one deferred to run at a time, which is obviously not what you want for a highly concurrent task 😉
Note that with no code changes other than setting maxRun to 4, the time to get the content lengths is about the same, averaging 1.3 seconds for me (there’s a little more overhead involved when using the deferred semaphore).
One last subtle note (in anticipation of the next example): the for loop creates all the deferreds at once; the deferred semaphore simply limits how many get run at a time.
Example 8: Throttling with Cooperator
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer, task

maxRun = 2

urls = [
    'http://twistedmatrix.com',
    'http://twistedsoftwarefoundation.org',
    'http://yahoo.com',
    'http://www.google.com',
    ]

def pageCallback(result):
    print(len(result))
    return result

def doWork():
    # Generator: getPage is only called when the next item is requested
    for url in urls:
        d = getPage(url)
        d.addCallback(pageCallback)
        yield d

def finish(ign):
    reactor.stop()

def test():
    deferreds = []
    coop = task.Cooperator()
    work = doWork()
    # maxRun workers all pull from the same generator
    for i in range(maxRun):
        d = coop.coiterate(work)
        deferreds.append(d)
    dl = defer.DeferredList(deferreds)
    dl.addCallback(finish)

test()
reactor.run()
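I am not yet at the level of studying the Twisted framework in depth, but I am recording this here so that I can come back to it when I need it.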
- Author: shisekong
- Link: https://blog.361way.com/python-twisted-async-gather/3919.html
- License: This work is under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. Kindly fulfill the requirements of the aforementioned License when adapting or creating a derivative of this work.