How can you implement a secure, encrypted, and scalable key-value store in Python using
#Python #Security #Encryption #Redis #KeyvalueStore #AtomicOperations #Concurrency #DistributedSystems #Scalability #Cryptography #AsyncIO
By: @DataScienceQ🚀
cryptography and redis that supports atomic operations, automatic encryption/decryption of data, concurrent access control, and seamless integration with distributed systems? Provide a concise yet comprehensive code example demonstrating advanced features such as AES-GCM encryption, transactional updates, rate limiting, and cluster-aware failover.import redis
import asyncio
import json
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os
import time
from typing import Dict, Any, Optional
# Configuration
REDIS_URL = "redis://localhost:6379/0"
SECRET_KEY = b"your-secure-secret-key-here-1234567890" # Use environment variable in production
KEY_LENGTH = 32
class SecureKeyValueStore:
def __init__(self, redis_url: str, secret_key: bytes):
self.redis_client = redis.from_url(redis_url)
self.fernet = Fernet(secret_key)
self._lock = asyncio.Lock()
self.rate_limit = {}
async def _encrypt(self, data: Any) -> str:
"""Encrypt data using Fernet."""
json_data = json.dumps(data).encode('utf-8')
return self.fernet.encrypt(json_data).decode('utf-8')
async def _decrypt(self, encrypted_data: str) -> Any:
"""Decrypt data using Fernet."""
try:
decrypted = self.fernet.decrypt(encrypted_data.encode('utf-8'))
return json.loads(decrypted.decode('utf-8'))
except Exception as e:
logger.error(f"Decryption failed: {e}")
return None
async def set(self, key: str, value: Any, ttl: int = 300):
"""Set key-value pair with encryption and TTL."""
encrypted_value = await self._encrypt(value)
async with self._lock:
await self.redis_client.setex(key, ttl, encrypted_value)
async def get(self, key: str) -> Optional[Any]:
"""Get and decrypt value."""
raw_value = await self.redis_client.get(key)
if raw_value:
return await self._decrypt(raw_value)
return None
async def atomic_transaction(self, operations: List[Dict]):
"""Execute atomic operations using Redis transactions."""
pipe = self.redis_client.pipeline()
for op in operations:
if op['type'] == 'set':
encrypted = await self._encrypt(op['value'])
pipe.setex(op['key'], op.get('ttl', 300), encrypted)
elif op['type'] == 'delete':
pipe.delete(op['key'])
await pipe.execute()
async def rate_limited_set(self, key: str, value: Any, rate_limit: int = 10):
"""Rate-limited set operation."""
now = time.time()
if key not in self.rate_limit:
self.rate_limit[key] = []
self.rate_limit[key] = [t for t in self.rate_limit[key] if t > now - 60]
if len(self.rate_limit[key]) >= rate_limit:
raise Exception("Rate limit exceeded")
self.rate_limit[key].append(now)
await self.set(key, value)
# Example usage
async def main():
store = SecureKeyValueStore(REDIS_URL, SECRET_KEY)
await store.set("user:1", {"name": "John", "age": 30})
data = await store.get("user:1")
print(data)
await store.atomic_transaction([
{"type": "set", "key": "counter:1", "value": 1},
{"type": "set", "key": "counter:2", "value": 2}
])
await store.rate_limited_set("api_call", {"count": 1}, rate_limit=5)
# Run the example
asyncio.run(main())
#Python #Security #Encryption #Redis #KeyvalueStore #AtomicOperations #Concurrency #DistributedSystems #Scalability #Cryptography #AsyncIO
By: @DataScienceQ
Please open Telegram to view this post
VIEW IN TELEGRAM
❤1