I was once tasked with improving the database operations and overall performance of backend services that used MongoDB as their main database.
These services were part of a huge infrastructure where millions of messages flowed through queues and had to be processed according to each message's action. That meant a large number of database operations every second, plus additional checks during processing.
The service's processing layer used PyMongo to talk to MongoDB, and the service itself ran synchronously. Even though database operations were already performed in bulk, throughput still could not keep up with the incoming data.
The synchronous code made things even worse: execution blocked on every operation until its result came back, and only then moved on. That is a serious bottleneck in a system that needs to scale.
As a result, the queues kept overflowing, and every overflow put data at risk of being lost.
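To make the cost of blocking concrete, here is a small self-contained simulation in which `asyncio.sleep` stands in for a database round trip (no MongoDB involved; `fake_db_op` is a hypothetical helper). Awaiting ten 50 ms operations one after another mirrors the synchronous pattern and takes roughly half a second, while starting them together with `asyncio.gather` lets the waits overlap.

```python
import asyncio
import time


async def fake_db_op(delay: float) -> float:
    """Hypothetical stand-in for a database round trip: waits without using CPU."""
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await fake_db_op(delay)  # each call must finish before the next one starts
    return time.perf_counter() - start


async def run_concurrently(n: int, delay: float) -> float:
    start = time.perf_counter()
    # All waits are in flight at once, so total time is close to a single delay
    await asyncio.gather(*(fake_db_op(delay) for _ in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    seq = await run_sequentially(10, 0.05)
    conc = await run_concurrently(10, 0.05)
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
    return seq, conc


if __name__ == "__main__":
    asyncio.run(main())
```

Real database calls do not overlap this perfectly (the server and network have their own limits), but the shape of the difference is the same.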
The solution I implemented was a combination of:
Let's quickly go through the definitions of these items.
PyMongo is the official MongoDB driver for Python, providing a simple and intuitive way to interact with MongoDB databases. It is synchronous, meaning each database operation blocks your program until it completes, which can become a bottleneck in I/O-bound workloads.
Motor is the asynchronous Python driver for MongoDB. It is built on top of PyMongo and designed to work with Python's asyncio library, letting you perform non-blocking database operations. That makes it well suited to high-performance applications that need concurrency.
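Because Motor builds on PyMongo rather than reimplementing it, its asyncio integration, to my understanding, runs PyMongo's blocking calls on a thread pool and hands the results back as awaitables. The sketch below illustrates that general pattern with the standard library's `asyncio.to_thread` (Python 3.9+). It is not Motor's source code; `blocking_find` and `async_find` are hypothetical, and `time.sleep` stands in for a network round trip.

```python
import asyncio
import time


def blocking_find(doc_id: int) -> dict:
    """Hypothetical synchronous driver call; time.sleep simulates network I/O."""
    time.sleep(0.05)
    return {"_id": doc_id}


async def async_find(doc_id: int) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free
    return await asyncio.to_thread(blocking_find, doc_id)


async def main() -> tuple[list[dict], float]:
    start = time.perf_counter()
    # Five blocking calls run in parallel worker threads; gather keeps input order
    docs = await asyncio.gather(*(async_find(i) for i in range(5)))
    return list(docs), time.perf_counter() - start


if __name__ == "__main__":
    docs, elapsed = asyncio.run(main())
    print(docs, f"{elapsed:.2f}s")
```

The calling code only ever sees `await`, while the blocking work happens off the event loop thread.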
To illustrate the performance differences, I prepared a stress test using two scripts: one using Motor (asynchronous) and the other using PyMongo (synchronous). Both scripts performed the same task of reading and writing documents to MongoDB in batches.
Both scripts read 300k documents from the source collection and migrated them to the new target collection.
Asynchronous version (Motor):

```python
import asyncio
import logging
import time

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["new_collection"]


async def fetch_products(batch_size, last_id=None):
    # Keyset pagination: resume strictly after the last _id seen
    query = {'_id': {'$gt': last_id}} if last_id is not None else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products


async def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product
    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        # Read the cursor position before bulk_write_to_mongo replaces the _ids
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        # Run up to `concurrency_limit` inserts concurrently, then wait for all of them
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)


async def main():
    batch_size = 1000
    concurrency_limit = 10
    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    asyncio.run(main())
```
Synchronous version (PyMongo):

```python
import logging
import time

from bson import ObjectId
from pymongo import MongoClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'
TARGET_COLLECTION_NAME = 'new_collection'

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db[TARGET_COLLECTION_NAME]


def fetch_products(batch_size, last_id=None):
    # Keyset pagination: resume strictly after the last _id seen
    query = {'_id': {'$gt': last_id}} if last_id is not None else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = list(cursor)  # blocks until the whole batch has been read
    return products


def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product
    try:
        result = target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


def process_batches(batch_size):
    last_id = None
    while True:
        products = fetch_products(batch_size, last_id)
        if not products:
            break
        # Read the cursor position before bulk_write_to_mongo replaces the _ids
        last_id = products[-1]['_id']
        # One batch at a time: the next fetch waits for this insert to finish
        bulk_write_to_mongo(products)


def main():
    batch_size = 1000
    start_time = time.time()
    process_batches(batch_size)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    main()
```
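Both scripts page through the source collection with a range query on `_id` (often called keyset pagination) rather than `skip()`: each batch asks for documents whose `_id` is greater than the last one seen, sorted by `_id`, so the server can seek directly in the `_id` index instead of walking past documents it has already returned. Here is an in-memory sketch of the same loop, with plain integers standing in for ObjectIds; `fetch_page` and `paginate` are hypothetical names. Note the `last_id is None` check: with integer keys, a plain truthiness test would treat an `_id` of `0` as "no cursor yet" and restart from the beginning.

```python
from typing import Optional


def fetch_page(docs: list[dict], batch_size: int, last_id: Optional[int] = None) -> list[dict]:
    """In-memory analogue of find({'_id': {'$gt': last_id}}).sort('_id').limit(batch_size)."""
    matching = [d for d in docs if last_id is None or d["_id"] > last_id]
    matching.sort(key=lambda d: d["_id"])
    return matching[:batch_size]


def paginate(docs: list[dict], batch_size: int) -> list[list[dict]]:
    pages = []
    last_id = None
    while True:
        page = fetch_page(docs, batch_size, last_id)
        if not page:
            break
        pages.append(page)
        last_id = page[-1]["_id"]  # resume strictly after the last key seen
    return pages


if __name__ == "__main__":
    sample = [{"_id": i} for i in range(7)]
    print([[d["_id"] for d in p] for p in paginate(sample, 3)])
    # → [[0, 1, 2], [3, 4, 5], [6]]
```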
Execution Time of Migrating 300k documents:
The asynchronous script completed the task 6.11 seconds faster than the synchronous script.
While this might not seem like a significant difference for a single run, it becomes more pronounced in high-load scenarios or when processing large datasets continuously.
Improved Throughput: Asynchronous operations can handle more tasks concurrently, increasing overall throughput. This is especially beneficial in applications with high I/O operations, such as web servers handling multiple database queries simultaneously.
Non-Blocking I/O: Awaiting an asynchronous operation hands control back to the event loop instead of blocking the thread, so other tasks keep running while the database responds. The application spends less time sitting idle and stays responsive under load.
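Scalability: Asynchronous code scales better as the number of concurrent operations grows. For example, a web application using Motor can typically serve more simultaneous requests per process than an equivalent one using PyMongo, because a request waiting on the database does not tie up a whole thread.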
Resource Efficiency: Asynchronous operations can lead to more efficient use of system resources. For instance, the event loop in asyncio allows the application to switch between tasks, reducing idle times and improving overall efficiency.
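One detail worth noting about the Motor script: `process_batches` waits for a whole group of `concurrency_limit` inserts to finish before starting the next group, so a single slow `insert_many` can leave the other slots idle. A common alternative, and not what the original script does, is to bound concurrency with `asyncio.Semaphore`, which starts a new write as soon as any slot frees up. The sketch below simulates writes with `asyncio.sleep`; `fake_write` and `run_bounded` are hypothetical names, and batch `0` is deliberately made slow.

```python
import asyncio


async def fake_write(batch_no: int, delays: dict[int, float], log: list[int]) -> None:
    """Hypothetical stand-in for bulk_write_to_mongo: sleeps, then records completion."""
    await asyncio.sleep(delays.get(batch_no, 0.01))
    log.append(batch_no)


async def run_bounded(n_batches: int, limit: int, delays: dict[int, float]) -> tuple[list[int], int]:
    sem = asyncio.Semaphore(limit)
    log: list[int] = []
    in_flight = 0
    peak = 0

    async def worker(batch_no: int) -> None:
        nonlocal in_flight, peak
        async with sem:  # at most `limit` writes run at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                await fake_write(batch_no, delays, log)
            finally:
                in_flight -= 1

    await asyncio.gather(*(worker(i) for i in range(n_batches)))
    return log, peak


if __name__ == "__main__":
    order, peak = asyncio.run(run_bounded(20, 5, {0: 0.2}))
    print("completion order:", order, "peak in flight:", peak)
```

The slow batch finishes last while the other nineteen flow through the remaining slots. In a real migration you would probably acquire the semaphore before fetching each batch, so that fetched-but-unwritten batches do not pile up in memory.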
You can find the source code on the GitHub repository below:
The choice between Motor and PyMongo depends on the specific needs of your application. For applications that require high concurrency and efficient I/O handling, Motor and the asynchronous approach offer significant advantages. However, for simpler applications or scripts where ease of implementation is a priority, PyMongo's synchronous approach might be sufficient.
By leveraging asynchronous operations with Motor, you can build more scalable and performant applications, making it a worthwhile consideration for modern web development.