Python Data Science Jobs & Interviews
20.3K subscribers
188 photos
4 videos
25 files
326 links
Your go-to hub for Python and Data Science—featuring questions, answers, quizzes, and interview tips to sharpen your skills and boost your career in the data-driven world.

Admin: @Hussein_Sheikho
Download Telegram
Q: How can you implement a thread-safe, connection-pooling mechanism using Python's sqlite3 with concurrent.futures.ThreadPoolExecutor, while ensuring atomic transactions and handling database schema migrations dynamically? Provide a complete example with error handling and logging.

A:
import sqlite3
import threading
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
import os
import time

# Configure the root logger once at import time; INFO level makes the
# example's progress messages visible on the console.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Path of the SQLite database file handed to the connection pool in the
# example usage at the bottom of the file.
DB_PATH = "example.db"

# Initial schema. IF NOT EXISTS makes the script safe to re-apply; the UNIQUE
# constraint on email is what makes a duplicate insert raise
# sqlite3.IntegrityError.
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
"""

# Connection pool with threading
class DatabaseConnectionPool:
    """Lock-guarded pool of reusable SQLite connections.

    Idle connections are kept in a list protected by a ``threading.Lock``.
    ``max_connections`` caps how many *idle* connections are retained; it does
    not limit how many connections may be checked out at the same time.

    Args:
        db_path: Database path passed to ``sqlite3.connect``.
        max_connections: Maximum number of idle connections kept for reuse.
    """

    def __init__(self, db_path, max_connections=5):
        self.db_path = db_path
        self.max_connections = max_connections
        self._connections = []
        self._lock = threading.Lock()

    def get_connection(self):
        """Return an idle connection if one exists, otherwise open a new one."""
        with self._lock:
            if self._connections:
                return self._connections.pop()
        # Opened outside the lock so a slow connect does not block other
        # threads. check_same_thread=False is required because a pooled
        # connection is routinely reused by a thread other than the one that
        # created it; with the default (True) sqlite3 raises ProgrammingError
        # on such reuse. Each connection is held by one borrower at a time,
        # so it is never used concurrently.
        return sqlite3.connect(self.db_path, check_same_thread=False)

    def release_connection(self, conn):
        """Hand *conn* back to the pool, closing it if the pool is full."""
        with self._lock:
            if len(self._connections) < self.max_connections:
                self._connections.append(conn)
                return
        # Pool already holds max_connections idle connections.
        conn.close()

    def close_all(self):
        """Close every idle connection and empty the pool."""
        with self._lock:
            for conn in self._connections:
                conn.close()
            self._connections.clear()

@contextmanager
def get_db_connection(pool):
    """Borrow a connection from *pool* for the duration of a ``with`` block.

    Yields the connection. Committing is left to the caller. If the body
    raises an ``Exception``, the connection is rolled back, the error is
    logged and the same exception is re-raised. The connection is handed back
    to the pool in every case.

    Args:
        pool: Object providing ``get_connection()`` and
            ``release_connection(conn)``.
    """
    conn = pool.get_connection()
    try:
        yield conn
    except Exception as e:
        try:
            conn.rollback()
        except sqlite3.Error:
            # A failing rollback must not replace the error that triggered
            # it; record it and let the original exception propagate.
            logger.exception("Rollback failed")
        logger.error(f"Database error: {e}")
        raise
    finally:
        pool.release_connection(conn)

def execute_transaction(pool, query, params=None):
    """Run a single statement on a pooled connection and commit it.

    Args:
        pool: Connection pool to borrow from.
        query: SQL statement, optionally with placeholders.
        params: Values bound to the placeholders; a falsy value means none.
    """
    bound_values = params if params else ()
    with get_db_connection(pool) as connection:
        statement_cursor = connection.cursor()
        statement_cursor.execute(query, bound_values)
        connection.commit()

def create_user(pool, name, email):
    """Insert one row into ``users`` and log the outcome.

    Any ``sqlite3.IntegrityError`` raised by the insert is swallowed and
    reported as a duplicate-email warning; other exceptions propagate.
    """
    insert_sql = "INSERT INTO users (name, email) VALUES (?, ?)"
    try:
        execute_transaction(pool, insert_sql, (name, email))
    except sqlite3.IntegrityError:
        logger.warning(f"Email {email} already exists.")
        return
    logger.info(f"User {name} created.")

def fetch_users(pool):
    """Return all rows of ``users`` as a list of ``(id, name, email)`` rows."""
    with get_db_connection(pool) as connection:
        reader = connection.cursor()
        reader.execute("SELECT id, name, email FROM users")
        rows = reader.fetchall()
    return rows

def schema_migration(pool, new_schema):
    """Run *new_schema* as a SQL script on a pooled connection and commit.

    Args:
        pool: Connection pool to borrow from.
        new_schema: One or more SQL statements passed to ``executescript``.
    """
    with get_db_connection(pool) as connection:
        connection.cursor().executescript(new_schema)
        connection.commit()
        logger.info("Schema migration applied.")

# Example usage
if __name__ == "__main__":
    # Initialize pool
    pool = DatabaseConnectionPool(DB_PATH)

    # try/finally so pooled connections are closed even if a step below fails.
    try:
        # Apply schema
        schema_migration(pool, SCHEMA)

        # Simulate concurrent user creation. The addresses must be distinct:
        # email is UNIQUE, so identical values make the second insert fail.
        names_emails = [
            ("Alice", "alice@example.com"),
            ("Bob", "bob@example.com"),
        ]

        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [
                executor.submit(create_user, pool, name, email)
                for name, email in names_emails
            ]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # Top-level boundary: report the failed task and keep
                    # draining the remaining futures.
                    logger.error(f"Task failed: {e}")

        # Fetch results
        users = fetch_users(pool)
        logger.info(f"Users: {users}")
    finally:
        # Cleanup
        pool.close_all()

#Python #SQLite #Database #Multithreading #ThreadSafety #ConnectionPooling #AtomicTransactions #SchemaMigration #Concurrency #Programming #AdvancedPython

By: @DataScienceQ 🚀