Python is great for quickly writing and testing concepts. Today, I had a task that required sending thousands of requests to a server. Each request took about 300 ms, so the time added up quickly.
So I looked into running these requests in parallel to cut the total run time. I'm not a Python guru; I work with Go and JavaScript a lot more. I had some idea of how this would be done in Go, where multitasking is quite easy.
The solution I worked out is very similar to how I would go about it in Go. I used a queue that is fed the parameters for the requests, and started up X workers that each run a loop: take one item off the queue, run the HTTP request, process the result, then check the queue for the next item, and so on.
The number of workers determines how many simultaneous connections to the server there will be.
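To see the effect of the worker count in isolation, here is a minimal sketch in which the HTTP call is replaced by a short sleep. The names peak_concurrency and fake_request are made up for this illustration; the sketch only tracks how many tasks are in flight at the same time.

```python
import queue
import threading
import time


def peak_concurrency(num_workers: int, num_tasks: int) -> int:
    """Run num_tasks fake requests on num_workers threads and return the peak number in flight."""
    tasks: queue.Queue = queue.Queue()
    lock = threading.Lock()
    active = 0
    peak = 0

    def fake_request() -> None:
        # hypothetical stand-in for the real HTTP request
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # pretend to wait for the server
        with lock:
            active -= 1

    def worker() -> None:
        while True:
            try:
                tasks.get_nowait()
            except queue.Empty:
                return  # queue drained, so this worker exits
            fake_request()
            tasks.task_done()

    # fill the queue up front, then let the workers drain it
    for _ in range(num_tasks):
        tasks.put(None)
    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    return peak


if __name__ == "__main__":
    print(peak_concurrency(num_workers=4, num_tasks=40))
```

The reported peak can never exceed num_workers, which is why the worker count is the knob that controls how hard you hit the server.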
So here's a bare-bones sample that you can adapt to your needs:
"""Sample file for a multithreading model for parallel processing
"""
import queue, threading
from typing import Callable, Any
import urllib.request
import json


def runParallel(workerFn: Callable[[Any], None], feedFn: Callable[[Callable[[Any], None]], None], threads: int = 5, timeout: float = 1):
    """runParallel: runs a function using the specified number of threads, in parallel, until the feed is finished
    Parameters:
    - `workerFn` the function that processes the tasks generated by the `feedFn`. It receives one parameter, usually a dictionary, with the necessary information to perform its task
    - `feedFn` the function that generates the feed for the workers. It receives an `addTask` parameter, which is a function that takes one parameter: the data that will be passed to the worker
    - `threads` how many workers will run in parallel. Default=5
    - `timeout` if a worker doesn't receive a task within the given number of seconds, then the worker quits. Default is 1 second. This is done in order to clean up after the feed has been processed. If the feed can pause for longer than this, increase the timeout, otherwise workers may quit early.
    """
    # bounded queue: addTask blocks when the workers fall behind the feed
    Q = queue.Queue(threads * 2)

    def worker():
        while True:
            # if no more tasks are coming within the given timeout period then exit the loop
            # done as a cleanup step
            try:
                task = Q.get(timeout=timeout)
            except queue.Empty:
                break
            # catch any exception to prevent the worker loop from crashing because of a failed task
            try:
                workerFn(task)
            except Exception as e:
                print(e)
            # mark the task as handled (even if it failed) so that Q.join() can return
            Q.task_done()

    # spin up the worker threads
    for _ in range(threads):
        threading.Thread(target=worker, daemon=True).start()
    # run the feed in the calling thread, then wait until every queued task is processed
    feedFn(Q.put)
    Q.join()
    return None


def main():
    baseURL = "http://postman-echo.com/get"

    def workerFn(task):
        # get the info from the task that's needed to generate the request
        prodID = task['prodID']
        # build the request and execute it
        hdr = {'X-API-KEY': '123456'}
        req = urllib.request.Request(f"{baseURL}?prodID={prodID}", headers=hdr)
        # parse the JSON response (uses the json import at the top)
        res = json.load(urllib.request.urlopen(req))
        print("--------------------------")
        print(res)
        # process the result...

    def feedFn(addTask: Callable[[Any], None]):
        for prodID in range(100, 200):
            addTask({'prodID': prodID})
            # time.sleep(0.3)  # optional throttle; needs `import time`

    runParallel(workerFn, feedFn, 12)


if __name__ == "__main__":
    main()
I made the runParallel function completely generic. You can use it for any job that can be broken down into X workers running in parallel, fed by a feeder function. Its docstring is detailed and should make it simple to understand and use in your project.
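As an illustration of that generality, here is a sketch that reuses the same pattern for something other than HTTP: squaring numbers and collecting the results. It includes a condensed copy of runParallel so that it runs on its own. The results list, the lock around it, and the square and feed functions are my additions for this example, not part of the original sample.

```python
import queue
import threading


def runParallel(workerFn, feedFn, threads=5, timeout=1.0):
    # condensed copy of the function from the sample above
    Q = queue.Queue(threads * 2)

    def worker():
        while True:
            try:
                task = Q.get(timeout=timeout)
            except queue.Empty:
                break
            try:
                workerFn(task)
            except Exception as e:
                print(e)
            Q.task_done()

    for _ in range(threads):
        threading.Thread(target=worker, daemon=True).start()
    feedFn(Q.put)
    Q.join()


# shared state for the example: workers append here, guarded by a lock
results = []
results_lock = threading.Lock()


def square(task):
    # hypothetical worker: any per-task computation or I/O could go here
    value = task["n"] ** 2
    with results_lock:
        results.append(value)


def feed(addTask):
    # hypothetical feed: one task per number
    for n in range(20):
        addTask({"n": n})


runParallel(square, feed, threads=4)
# tasks finish in no particular order, so sort before using the results
print(sorted(results))
```

Because the workers run concurrently, the order in which results arrive is not guaranteed; if order matters, carry an index in each task and sort on it afterwards.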
I hope this will help some people. As always, constructive feedback is welcome!