Python3: Why don’t 3 processes complete faster than 1?

In trying to understand Python’s multiprocessing better, I have adapted the code found here for a producer/consumer model. The producer will push integers onto a queue, and each consumer will take an integer off of the queue, convert it to a byte string, and encrypt it with a specified key. What I don’t understand is why running it with three consumers is only marginally faster than running it with one consumer. With one consumer, it completed in 63 seconds, and with three consumers it completed in 52 seconds. I expected it to scale linearly, so I thought three consumers could finish it in 20 seconds. Also, by monitoring the CPU usage with top, I noticed that the producer was using as much or more CPU than the consumers, which I don’t understand since the producer isn’t doing much compared to the consumers. Am I missing something to make this an effective multiprocessor application?
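To put numbers on the expectation above, here is a small worked calculation using only the two timings reported in the question. The Amdahl's-law step is an added assumption rather than something from the original post: it treats the run as one serial part plus one perfectly parallel part and solves for the serial fraction that would explain the observed speedup.

```python
# Timings reported in the question, in seconds
t_one = 63.0     # one consumer
t_three = 52.0   # three consumers
n = 3            # number of consumers

ideal = t_one / n              # what linear scaling would give
speedup = t_one / t_three      # what was actually observed
efficiency = speedup / n       # fraction of the ideal speedup achieved

# Amdahl's law (an assumed model): speedup = 1 / (s + (1 - s) / n)
# Solving for the serial fraction s:
serial_fraction = (1 / speedup - 1 / n) / (1 - 1 / n)

print(f"ideal time with {n} consumers: {ideal:.1f} s")
print(f"observed speedup: {speedup:.2f}x")
print(f"parallel efficiency: {efficiency:.0%}")
print(f"implied serial fraction: {serial_fraction:.0%}")
```

Under that model, roughly three quarters of the run behaves as if it were serial, which suggests the consumers' work is not where most of the time goes.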

import time
import os
import random
import binascii
from multiprocessing import Process, Queue, Lock
from Crypto.Cipher import AES # pip3 install PyCryptodome


# Producer function that places data on the Queue
def producer(queue, lock):

    # Synchronize access to the console
    with lock:
        print('Starting producer => {}'.format(os.getpid()))


    # Put integers 0 to 999999 (one million items) on the queue
    i = 0
    while i < 1000:
        for j in range(0,1000):
            queue.put(i*1000 + j)
        i += 1

        # Synchronize access to the console
        with lock:
            print('Producer finished putting {} items in queue'.format(i*1000))


    # Synchronize access to the console
    with lock:
        print('Producer {} exiting...'.format(os.getpid()))


# The consumer function takes data off of the Queue
def consumer(queue, lock, key):
    # Synchronize access to the console
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))

    rijn = AES.new(key, AES.MODE_ECB)

    # Run indefinitely
    while True:

        # If the queue is empty, queue.get() will block until the queue has data
        plaintext_int = queue.get()
        plaintext_bytes = plaintext_int.to_bytes(16, 'big')

        ciphertext = binascii.hexlify(rijn.encrypt(plaintext_bytes)).decode('utf-8')

if __name__ == '__main__':

    # Create the Queue object
    #queue = Queue(maxsize=10)
    queue = Queue()

    key = binascii.unhexlify('AAAABBBBCCCCDDDDEEEEFFFF00001111')

    # Create a lock object to synchronize resource access
    lock = Lock()

    producers = []
    consumers = []

    # Create our producer processes by passing the producer function and its arguments
    producers.append(Process(target=producer, args=(queue, lock)))

    # Create consumer processes
    n_consumers = 3
    for i in range(n_consumers): 
        p = Process(target=consumer, args=(queue, lock, key))

        # This is critical! The consumer function has an infinite loop, so it
        # never returns on its own. Marking it as a daemon lets the parent
        # terminate it when the parent exits.
        p.daemon = True
        consumers.append(p)

    # Start the producers and consumers
    # The Python VM will launch new independent processes for each Process object
    for p in producers:
        p.start()

    for c in consumers:
        c.start()

    # Like threading, we have a join() method that synchronizes our program
    for p in producers:
        p.join()

    print('Parent process exiting...')

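    # Note: empty() and qsize() are only approximate for a multiprocessing Queue,
    # and qsize() may raise NotImplementedError on some platforms (e.g. macOS)
    while not queue.empty():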
        print("Waiting for queue to empty.  Remaining items: {qsize}".format(qsize=queue.qsize()))
        time.sleep(1)
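The producer above calls queue.put() once per integer, so one million items means one million separate queue transfers. One possible variation, offered only as a sketch, is to group the integers into batches so that each queue operation carries many items. The helper name `batched_range` and the `batch_size` parameter are hypothetical and not part of the original code, and whether batching helps here depends on where the time actually goes.

```python
from itertools import islice


def batched_range(total, batch_size):
    """Yield lists of consecutive integers that together cover range(total)."""
    it = iter(range(total))
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# Small demonstration of the grouping
batches = list(batched_range(10, 4))
print(batches)

# Hypothetical use in the code above (not tested against the original program):
#   producer:  for batch in batched_range(1000000, 1000): queue.put(batch)
#   consumer:  for plaintext_int in queue.get(): ...encrypt as before...
```

With this shape, the consumer loop would iterate over each received list instead of handling a single integer per queue.get() call.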

EDIT

Here are a couple of benchmarks of the individual tasks.

Converting int to byte string:

python3 -m timeit -s 'plaintext_int = 326543' 'plaintext_int.to_bytes(16, "big")'
1000000 loops, best of 3: 0.504 usec per loop

Encrypting byte string:

python3 -m timeit -s 'from Crypto.Cipher import AES; import binascii; key=binascii.unhexlify("AAAABBBBCCCCDDDDEEEEFFFF00001111"); rijn = AES.new(key, AES.MODE_ECB); plaintext_int = 326543; plaintext_bytes=plaintext_int.to_bytes(16, "big")' 'binascii.hexlify(rijn.encrypt(plaintext_bytes)).decode("utf-8")'
1000000 loops, best of 3: 1.76 usec per loop
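As a rough cross-check, the two timeit results can be compared against the 63-second single-consumer run. This is a back-of-the-envelope estimate: it assumes the timeit figures carry over unchanged to the consumer loop and lumps everything else (queue transfer, process coordination, printing) into a single "overhead" number.

```python
# Per-item costs from the timeit runs above, in microseconds
to_bytes_usec = 0.504
encrypt_usec = 1.76

n_items = 1000000     # integers the producer puts on the queue
observed_s = 63.0     # reported wall time with one consumer

# Time the consumer work alone would take for all items
compute_s = (to_bytes_usec + encrypt_usec) * n_items / 1e6

# Everything not explained by the two benchmarked operations
overhead_s = observed_s - compute_s
overhead_per_item_usec = overhead_s / n_items * 1e6

print(f"benchmarked work: {compute_s:.2f} s")
print(f"unexplained time: {overhead_s:.2f} s")
print(f"unexplained time per item: {overhead_per_item_usec:.1f} usec")
```

By this estimate the conversion and encryption account for only a few seconds of the run, so adding consumers can speed up only a small part of the total.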
