Data Pipeline with Dependency Injection
A practical example from my MarketPipe project
Introduction
Dependency Injection (DI) is a software design pattern in which a class receives its dependencies from an external source rather than creating them itself.
This approach leads to more maintainable and testable code.
Today, we'll explore how DI is implemented in the MarketPipe project to manage dependencies.
Let's dive into a practical example using the DataProcessor class.
What is Dependency Injection?
Dependency Injection is a design pattern where an object's dependencies are provided by an external entity rather than constructed by the object itself. This approach promotes loose coupling and improves testability and maintainability.
The Problem with Hardcoded Dependencies
Without DI, our classes directly create their dependencies, leading to tightly coupled code that's hard to test and maintain. Let's see how MarketPipe leverages DI to overcome these issues.
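Before looking at MarketPipe itself, here is a minimal sketch of the difference. All class names below (StockApiClient, HardcodedProcessor, InjectedProcessor, FakeClient) are hypothetical and not part of MarketPipe. The first processor builds its own client, so a test has no way to replace it; the second receives the client through its constructor, so a fake can be passed in.

```python
class StockApiClient:
    """Hypothetical 'real' client; imagine it calls an external HTTP API."""

    def get_data(self):
        raise RuntimeError("network access not available in this sketch")


class HardcodedProcessor:
    def __init__(self):
        # The dependency is created inside the class: callers cannot swap it.
        self.api_client = StockApiClient()

    def get_data(self):
        return self.api_client.get_data()


class InjectedProcessor:
    def __init__(self, api_client):
        # The dependency is supplied from outside (constructor injection).
        self.api_client = api_client

    def get_data(self):
        return self.api_client.get_data()


class FakeClient:
    """Stand-in used in tests; returns canned data instead of calling an API."""

    def get_data(self):
        return [{"symbol": "AAPL", "price": 100.0}]


if __name__ == "__main__":
    print(InjectedProcessor(FakeClient()).get_data())
```

HardcodedProcessor can only ever talk to StockApiClient, while InjectedProcessor works with anything that has a get_data method.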
The MarketPipe Project
MarketPipe is a robust data processing pipeline designed to fetch data from several APIs and store it. It uses DI to manage dependencies like API clients and storage connectors.
The DataProcessor Class
The DataProcessor class is responsible for fetching data from different APIs and storing it. By injecting ApiClientFactory, Storage, and logger dependencies, we make the class more modular and testable.
Let's see it:
```python
import logging
from core.base_api import ApiClientFactory
from core.storage import Storage

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DataPipeline")


# DataProcessor class that uses DI for ApiClientFactory and Storage
class DataProcessor:
    def __init__(self, asset_type: str, api_client_factory: ApiClientFactory, db_connector: Storage, logger: logging.Logger):
        self.asset_type = asset_type
        self.api_client = api_client_factory.get_client(asset_type)
        self.logger = logger
        self.db_connector = db_connector

    # Method to get data from the API client
    def get_data(self):
        try:
            return self.api_client.get_data()
        except Exception as e:
            self.logger.error(f"Error getting {self.asset_type} data: {e}")
            raise

    # Method to store data in the database
    def store_data(self, data):
        try:
            if data is not None:
                self.db_connector.store_data(data, self.asset_type)
                self.logger.info(f"{self.asset_type.capitalize()} data stored successfully.")
            else:
                self.logger.warning(
                    f"No {self.asset_type} data retrieved. Nothing stored in the database."
                )
        except Exception as e:
            self.logger.error(f"Error storing {self.asset_type} data: {e}")
            raise
```
Initialization with DI
```python
def __init__(self, asset_type: str, api_client_factory: ApiClientFactory, db_connector: Storage, logger: logging.Logger):
    self.asset_type = asset_type
    self.api_client = api_client_factory.get_client(asset_type)
    self.logger = logger
    self.db_connector = db_connector
```
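Here, DataProcessor receives ApiClientFactory, Storage, and logger as dependencies. It never constructs any of them itself: the factory resolves the concrete API client for the given asset_type. This setup decouples the class from the instantiation details, making it easier to modify and test.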
Fetching Data
```python
def get_data(self):
    try:
        return self.api_client.get_data()
    except Exception as e:
        self.logger.error(f"Error getting {self.asset_type} data: {e}")
        raise
```
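The get_data method fetches data through the API client obtained from the injected factory. If the call fails, the error is logged together with the asset type and then re-raised, so the caller still sees the failure.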
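Because the factory and logger are injected, the error path of get_data can be exercised without any network access. This sketch uses unittest.mock.Mock objects in place of the real ApiClientFactory, Storage, and logger (an assumption for testing purposes); DataProcessor is repeated in trimmed form, keeping only __init__ and get_data, so the snippet runs on its own.

```python
from unittest.mock import Mock


class DataProcessor:
    # Trimmed copy of the article's class: only what get_data needs.
    def __init__(self, asset_type, api_client_factory, db_connector, logger):
        self.asset_type = asset_type
        self.api_client = api_client_factory.get_client(asset_type)
        self.logger = logger
        self.db_connector = db_connector

    def get_data(self):
        try:
            return self.api_client.get_data()
        except Exception as e:
            self.logger.error(f"Error getting {self.asset_type} data: {e}")
            raise


# The mocked factory hands back a client whose get_data() always fails.
failing_client = Mock()
failing_client.get_data.side_effect = RuntimeError("API down")
factory = Mock()
factory.get_client.return_value = failing_client
logger = Mock()

processor = DataProcessor("stock", factory, Mock(), logger)

raised = False
try:
    processor.get_data()
except RuntimeError:
    raised = True  # get_data logs the failure, then re-raises it

# The failure was logged with the asset type included.
logger.error.assert_called_once_with("Error getting stock data: API down")
```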
Storing Data
```python
def store_data(self, data):
    try:
        if data is not None:
            self.db_connector.store_data(data, self.asset_type)
            self.logger.info(f"{self.asset_type.capitalize()} data stored successfully.")
        else:
            self.logger.warning(
                f"No {self.asset_type} data retrieved. Nothing stored in the database."
            )
    except Exception as e:
        self.logger.error(f"Error storing {self.asset_type} data: {e}")
        raise
```
The store_data method passes the fetched data to the injected Storage instance. If data is None, it logs a warning and stores nothing; if the storage call itself fails, it logs the error and re-raises it.
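Both branches of store_data can be checked the same way. In this sketch a Mock stands in for Storage and another for the logger (assumptions for the test); the trimmed DataProcessor keeps only __init__ and store_data from the article's class.

```python
from unittest.mock import Mock


class DataProcessor:
    # Trimmed copy of the article's class: only what store_data needs.
    def __init__(self, asset_type, api_client_factory, db_connector, logger):
        self.asset_type = asset_type
        self.api_client = api_client_factory.get_client(asset_type)
        self.logger = logger
        self.db_connector = db_connector

    def store_data(self, data):
        try:
            if data is not None:
                self.db_connector.store_data(data, self.asset_type)
                self.logger.info(f"{self.asset_type.capitalize()} data stored successfully.")
            else:
                self.logger.warning(
                    f"No {self.asset_type} data retrieved. Nothing stored in the database."
                )
        except Exception as e:
            self.logger.error(f"Error storing {self.asset_type} data: {e}")
            raise


storage = Mock()
logger = Mock()
processor = DataProcessor("crypto", Mock(), storage, logger)

# Branch 1: data is present, so it is forwarded to the injected storage.
processor.store_data([{"symbol": "BTC"}])
storage.store_data.assert_called_once_with([{"symbol": "BTC"}], "crypto")
logger.info.assert_called_once_with("Crypto data stored successfully.")

# Branch 2: None means nothing was fetched; storage is not called again.
processor.store_data(None)
logger.warning.assert_called_once_with(
    "No crypto data retrieved. Nothing stored in the database."
)
```

Note that the check is `data is not None`, so an empty list would still be passed on to storage rather than triggering the warning.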
Practical Usage Example
Now, let's see the DataProcessor class in action:
```python
if __name__ == "__main__":
    # Create an instance of ApiClientFactory
    api_client_factory = ApiClientFactory(logger)
    # Create an instance of Storage
    storage_connector = Storage(logger)

    # Creating a DataProcessor for 'stock' client and processing data
    processor_stock = DataProcessor("stock", api_client_factory, storage_connector, logger)
    stock_data = processor_stock.get_data()
    processor_stock.store_data(stock_data)

    # Creating a DataProcessor for 'crypto' client and processing data
    processor_crypto = DataProcessor("crypto", api_client_factory, storage_connector, logger)
    crypto_data = processor_crypto.get_data()
    processor_crypto.store_data(crypto_data)
```
Set up Dependencies:
```python
api_client_factory = ApiClientFactory(logger)
storage_connector = Storage(logger)
```
We create instances of ApiClientFactory and Storage, which will be injected into the DataProcessor.
Create and Use DataProcessors:
```python
processor_stock = DataProcessor("stock", api_client_factory, storage_connector, logger)
stock_data = processor_stock.get_data()
processor_stock.store_data(stock_data)
```
We instantiate DataProcessor for different asset types (stock and crypto) and use them to fetch and store data. Both processors share the same factory and storage instances; only the asset_type differs.
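Since DataProcessor only relies on objects that expose get_client, get_data, and store_data, the same wiring can run end to end without real APIs or a database. FakeApiClientFactory, StaticClient, and InMemoryStorage below are hypothetical stand-ins, not MarketPipe classes, and the canned rows are made up; DataProcessor is a copy of the article's class so the snippet is self-contained.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DataPipeline")


class DataProcessor:
    # Copy of the article's class (same constructor and methods).
    def __init__(self, asset_type, api_client_factory, db_connector, logger):
        self.asset_type = asset_type
        self.api_client = api_client_factory.get_client(asset_type)
        self.logger = logger
        self.db_connector = db_connector

    def get_data(self):
        try:
            return self.api_client.get_data()
        except Exception as e:
            self.logger.error(f"Error getting {self.asset_type} data: {e}")
            raise

    def store_data(self, data):
        try:
            if data is not None:
                self.db_connector.store_data(data, self.asset_type)
                self.logger.info(f"{self.asset_type.capitalize()} data stored successfully.")
            else:
                self.logger.warning(
                    f"No {self.asset_type} data retrieved. Nothing stored in the database."
                )
        except Exception as e:
            self.logger.error(f"Error storing {self.asset_type} data: {e}")
            raise


class StaticClient:
    """Hypothetical client that returns canned rows instead of calling an API."""

    def __init__(self, rows):
        self.rows = rows

    def get_data(self):
        return self.rows


class FakeApiClientFactory:
    """Hypothetical stand-in for ApiClientFactory with canned data per asset type."""

    def __init__(self, data_by_type):
        self.data_by_type = data_by_type

    def get_client(self, asset_type):
        return StaticClient(self.data_by_type[asset_type])


class InMemoryStorage:
    """Hypothetical stand-in for Storage that keeps rows in a dict per asset type."""

    def __init__(self):
        self.tables = {}

    def store_data(self, data, asset_type):
        self.tables.setdefault(asset_type, []).extend(data)


factory = FakeApiClientFactory({
    "stock": [{"symbol": "AAPL", "price": 100.0}],
    "crypto": [{"symbol": "BTC", "price": 50000.0}],
})
storage = InMemoryStorage()

# Same flow as the article's __main__ block, with fakes injected instead.
for asset_type in ("stock", "crypto"):
    processor = DataProcessor(asset_type, factory, storage, logger)
    processor.store_data(processor.get_data())

print(storage.tables)
```

DataProcessor runs unmodified here; only the objects handed to its constructor changed.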
Benefits of Dependency Injection
Do you see it now?
Improved Testability: With DI, you can pass mocks or fakes in place of the real API clients and storage in unit tests, making the DataProcessor class straightforward to test.
Improved Maintainability: DI promotes loose coupling. If the implementation of ApiClientFactory or Storage changes, DataProcessor remains unaffected, as long as the methods it calls (get_client, get_data, and store_data) keep the same signatures.
Greater Flexibility: DI allows implementations to be selected at runtime. You can swap out dependencies without modifying the DataProcessor class.
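To illustrate the flexibility point, here is a sketch of choosing a storage backend at runtime. StorageBackend, InMemoryStorage, JsonLinesStorage, and make_storage are hypothetical names; the only assumption carried over from the article is the store_data(data, asset_type) call that DataProcessor makes, so either backend could be passed as its db_connector. typing.Protocol describes that expected shape without requiring inheritance.

```python
import json
import os
import tempfile
from typing import Protocol


class StorageBackend(Protocol):
    """Hypothetical interface: any object with this method can be injected."""

    def store_data(self, data, asset_type: str) -> None: ...


class InMemoryStorage:
    def __init__(self):
        self.rows = []

    def store_data(self, data, asset_type: str) -> None:
        # Keep (asset_type, row) pairs in a list; handy for tests.
        self.rows.extend((asset_type, row) for row in data)


class JsonLinesStorage:
    """Appends one JSON object per row to <directory>/<asset_type>.jsonl."""

    def __init__(self, directory: str):
        self.directory = directory

    def store_data(self, data, asset_type: str) -> None:
        path = os.path.join(self.directory, f"{asset_type}.jsonl")
        with open(path, "a", encoding="utf-8") as fh:
            for row in data:
                fh.write(json.dumps(row) + "\n")


def make_storage(kind: str, directory: str = ".") -> StorageBackend:
    # Pick the implementation at runtime, e.g. from a config value or CLI flag.
    if kind == "memory":
        return InMemoryStorage()
    if kind == "jsonl":
        return JsonLinesStorage(directory)
    raise ValueError(f"Unknown storage kind: {kind!r}")


if __name__ == "__main__":
    rows = [{"symbol": "AAPL", "price": 100.0}]
    with tempfile.TemporaryDirectory() as tmp:
        for kind in ("memory", "jsonl"):
            make_storage(kind, tmp).store_data(rows, "stock")
        with open(os.path.join(tmp, "stock.jsonl"), encoding="utf-8") as fh:
            print(fh.read().strip())
```

The selection logic lives in one place at the edge of the program, and the code that consumes the storage object never needs to know which backend it received.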
Conclusion
By leveraging Dependency Injection, MarketPipe achieves a flexible, maintainable, and testable architecture.
The DataProcessor class serves as an excellent example of how DI can be used to manage dependencies.
Embracing DI in your projects can lead to cleaner and more manageable codebases.
So, next time you're building a data pipeline or any complex system, remember the power of Dependency Injection!