Building Python Self-Healing Applications: 10 Proven Techniques for Zero Downtime

Nithin Bharadwaj

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Python has transformed how developers approach application reliability. Throughout my career building resilient systems, I've discovered that self-healing capabilities are essential for modern applications. Let me share what I've learned about implementing these techniques in Python.

Self-Healing Applications: The Foundation

Self-healing applications detect problems and recover without human intervention. This capability becomes increasingly important as systems grow in complexity. Python's expressiveness and rich ecosystem make it particularly well-suited for implementing these recovery mechanisms.

The core principle is simple: monitor continuously, detect deviations quickly, and recover automatically. The implementation, however, requires careful consideration of several techniques.
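That monitor-detect-recover cycle can be sketched in a few lines before we get into the individual techniques (all names here are illustrative, not from any specific library):

```python
import time
import logging

def run_self_healing_loop(check, recover, interval=5, max_cycles=None):
    """Continuously run a health check and trigger recovery on failure.

    check:   callable returning True when the component is healthy
    recover: callable that attempts to restore the component
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        if not check():  # detect a deviation
            logging.warning("Deviation detected, attempting recovery")
            recover()    # recover automatically
        cycles += 1
        time.sleep(interval)

# Illustrative usage: "heal" a flag that starts out unhealthy
state = {"healthy": False}
run_self_healing_loop(
    check=lambda: state["healthy"],
    recover=lambda: state.update(healthy=True),
    interval=0,
    max_cycles=2,
)
print(state["healthy"])  # True after the recovery step has run
```

Everything that follows elaborates on one of these three phases.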

Health Checks

Health checks form the backbone of any self-healing system. They provide insights into the operational status of your application and its dependencies.

I've found implementing a comprehensive health check system relatively straightforward in Python:

import time
import logging
from threading import Thread

class HealthMonitor:
    def __init__(self):
        self.services = {}
        self.running = False
        self._monitor_thread = None

    def register(self, name, check_func, interval=60, critical=True):
        self.services[name] = {
            "check": check_func,
            "status": "UNKNOWN",
            "last_check": 0,
            "interval": interval,
            "critical": critical
        }

    def check_service(self, name):
        service = self.services[name]
        now = time.time()

        if now - service["last_check"] > service["interval"]:
            try:
                service["check"]()
                service["status"] = "HEALTHY"
            except Exception as e:
                service["status"] = "UNHEALTHY"
                logging.error(f"Health check failed for {name}: {str(e)}")
            service["last_check"] = now

        return service["status"]

    def is_healthy(self):
        for name, service in self.services.items():
            if service["critical"] and self.check_service(name) != "HEALTHY":
                return False
        return True

    def start_monitoring(self):
        self.running = True
        self._monitor_thread = Thread(target=self._monitor_loop)
        self._monitor_thread.daemon = True
        self._monitor_thread.start()

    def _monitor_loop(self):
        while self.running:
            self.check_all()
            time.sleep(1)

    def check_all(self):
        return {name: self.check_service(name) for name in self.services}

This monitor can be used in a Flask application to expose health information:

from flask import Flask, jsonify

app = Flask(__name__)
health = HealthMonitor()

@app.route('/health')
def health_check():
    status = health.check_all()
    http_status = 200 if health.is_healthy() else 503
    return jsonify(status), http_status

# Register health checks
health.register("database", check_database_connection, interval=30)
health.register("redis_cache", check_redis_connection, interval=60)
health.register("payment_api", check_payment_api, interval=120)

# Start background monitoring
health.start_monitoring()
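The check functions themselves are plain callables that raise on failure. One way to build them, sketched here as a TCP-level reachability check (the host and port are placeholders for your own services):

```python
import socket

def make_tcp_check(host, port, timeout=2.0):
    """Build a health check that raises if a TCP connection cannot be opened."""
    def check():
        # create_connection raises OSError on refusal or timeout
        with socket.create_connection((host, port), timeout=timeout):
            pass
    return check

# e.g. health.register("database", make_tcp_check("db.internal", 5432), interval=30)
```

A TCP check only proves the port is reachable; for databases or caches you would typically layer an application-level check (a `SELECT 1` or a `PING`) on top of it.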

Retry Mechanisms

Network calls, database operations, and other interactions with external systems frequently fail temporarily. Implementing retry logic prevents these transient failures from affecting users.

I've found the tenacity library particularly effective for implementing retries:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((ConnectionError, TimeoutError))
)
def fetch_user_data(user_id):
    """Retrieve user data from the database with automatic retries."""
    connection = get_database_connection()
    # Parameterized query rather than string formatting, to avoid SQL injection
    return connection.query("SELECT * FROM users WHERE id = %s", (user_id,))

# Using the retry-enabled function
try:
    user_data = fetch_user_data(user_id)
    process_user_data(user_data)
except RetryError:
    # All retries failed
    log_error("Failed to fetch user data after multiple attempts")
    fallback_process()
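When pulling in tenacity isn't an option, the same exponential-backoff behavior can be approximated with a small decorator. This is a sketch, not a drop-in replacement (tenacity also offers jitter, async support, and richer stop/wait policies):

```python
import time
import functools

def retry_with_backoff(attempts=5, base_delay=1.0, max_delay=30.0,
                       exceptions=(ConnectionError, TimeoutError)):
    """Retry a function with exponential backoff on the given exception types."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # out of retries; let the caller fall back
                    time.sleep(delay)
                    delay = min(delay * 2, max_delay)
        return wrapper
    return decorator

@retry_with_backoff(attempts=3, base_delay=0.01)
def flaky(counter={"calls": 0}):
    counter["calls"] += 1
    if counter["calls"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(flaky())  # "ok" on the third attempt
```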

For more complex scenarios, I implement a circuit breaker pattern:

import time

class CircuitBreakerError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "CLOSED"  # CLOSED, OPEN, HALF-OPEN
        self.last_failure_time = 0

    def execute(self, func, *args, **kwargs):
        if self.state == "OPEN":
            # Check if recovery timeout has elapsed
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF-OPEN"
            else:
                raise CircuitBreakerError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)

            # Success in half-open state means we can reset
            if self.state == "HALF-OPEN":
                self.failure_count = 0
                self.state = "CLOSED"

            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold or self.state == "HALF-OPEN":
                self.state = "OPEN"

            raise

# Usage example
db_circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

def get_product(product_id):
    try:
        return db_circuit.execute(database.query, "SELECT * FROM products WHERE id = %s", (product_id,))
    except CircuitBreakerError:
        return get_cached_product(product_id)  # Fallback mechanism

Graceful Degradation

A key aspect of self-healing applications is the ability to maintain functionality even when some components fail. I implement this through feature flags and fallback mechanisms:

class FeatureUnavailableError(Exception):
    pass

class FeatureManager:
    def __init__(self):
        self.features = {}
        self.fallbacks = {}

    def register(self, feature_name, check_func, fallback_func=None):
        self.features[feature_name] = check_func
        if fallback_func:
            self.fallbacks[feature_name] = fallback_func

    def is_available(self, feature_name):
        if feature_name not in self.features:
            return False

        try:
            return self.features[feature_name]()
        except Exception:
            return False

    def use_feature(self, feature_name, feature_func, *args, **kwargs):
        if self.is_available(feature_name):
            return feature_func(*args, **kwargs)
        elif feature_name in self.fallbacks:
            return self.fallbacks[feature_name](*args, **kwargs)
        else:
            raise FeatureUnavailableError(f"Feature {feature_name} is unavailable")

# Usage example
features = FeatureManager()

# Register recommendation engine with a fallback
features.register(
    "recommendations",
    lambda: recommendation_service.is_healthy(),
    fallback_func=get_popular_products
)

# Using the feature with automatic fallback
def get_recommendations(user_id):
    return features.use_feature(
        "recommendations",
        recommendation_service.get_personalized_recommendations,
        user_id
    )
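When a central registry is more than you need, the same fallback idea can be packaged as a decorator. This is an illustrative sketch; the predicate and fallback names are hypothetical:

```python
import functools

def with_fallback(available, fallback):
    """Route calls to `fallback` when `available()` is False or the call raises.

    available: zero-argument health predicate
    fallback:  callable with the same signature as the wrapped function
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if available():
                try:
                    return func(*args, **kwargs)
                except Exception:
                    pass  # fall through to the degraded path
            return fallback(*args, **kwargs)
        return wrapper
    return decorator

@with_fallback(available=lambda: False,
               fallback=lambda user_id: ["popular-1", "popular-2"])
def personalized_recommendations(user_id):
    raise RuntimeError("service down")

print(personalized_recommendations(42))  # ['popular-1', 'popular-2']
```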

Process-Level Recovery

Sometimes application processes crash. I implement watchdog processes to detect and restart failed components:

import subprocess
import time
import logging

class ProcessWatchdog:
    def __init__(self, max_restarts=5, restart_interval=3600):
        self.processes = {}
        self.restart_counts = {}
        self.restart_times = {}
        self.max_restarts = max_restarts
        self.restart_interval = restart_interval

    def register(self, name, cmd, working_dir=None, env=None):
        self.processes[name] = {
            "cmd": cmd,
            "process": None,
            "working_dir": working_dir,
            "env": env
        }
        self.restart_counts[name] = 0
        self.restart_times[name] = []

    def start_all(self):
        for name in self.processes:
            self.start_process(name)

    def start_process(self, name):
        if name not in self.processes:
            raise ValueError(f"Process {name} not registered")

        proc_info = self.processes[name]

        # Check if we've restarted too many times
        now = time.time()
        self.restart_times[name] = [t for t in self.restart_times[name] 
                                   if now - t < self.restart_interval]

        if len(self.restart_times[name]) >= self.max_restarts:
            logging.error(f"Process {name} has restarted too many times. Not restarting.")
            return False

        try:
            process = subprocess.Popen(
                proc_info["cmd"],
                cwd=proc_info["working_dir"],
                env=proc_info["env"]
            )
            proc_info["process"] = process
            self.restart_times[name].append(now)
            return True
        except Exception as e:
            logging.error(f"Failed to start process {name}: {str(e)}")
            return False

    def check_and_restart(self):
        for name, proc_info in self.processes.items():
            process = proc_info["process"]

            if process is None:
                continue

            if process.poll() is not None:  # Process has terminated
                logging.warning(f"Process {name} terminated with code {process.returncode}")
                self.start_process(name)

    def run_monitor(self):
        try:
            self.start_all()

            while True:
                self.check_and_restart()
                time.sleep(5)
        except KeyboardInterrupt:
            self.terminate_all()

    def terminate_all(self):
        for name, proc_info in self.processes.items():
            process = proc_info["process"]
            if process is not None and process.poll() is None:
                process.terminate()

# Usage
if __name__ == "__main__":
    watchdog = ProcessWatchdog()

    watchdog.register(
        "web_server",
        ["python", "web_server.py"],
        working_dir="/app/server"
    )

    watchdog.register(
        "worker",
        ["python", "worker.py"],
        working_dir="/app/worker"
    )

    watchdog.run_monitor()
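The `poll()`-based liveness check at the heart of the watchdog can be exercised on its own with a short-lived child process:

```python
import subprocess
import sys
import time

# Spawn a child that exits almost immediately with a nonzero code
proc = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"])

# poll() returns None while the process runs, and its exit code once terminated
while proc.poll() is None:
    time.sleep(0.05)

print(proc.returncode)  # 3 -- a nonzero code is the watchdog's restart signal
```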

Automated Testing in Production

Testing in production might sound alarming, but it's essential for validating self-healing systems. I implement continuous verification through synthetic transactions:

import random
import threading
import time
import logging
import requests

class SyntheticTester:
    def __init__(self, app_base_url):
        self.app_url = app_base_url
        self.scenarios = []
        self.running = False
        self._test_thread = None

    def register_scenario(self, name, test_func, frequency=300):
        """Register a test scenario to run periodically

        Args:
            name: Scenario name
            test_func: Function that executes the test
            frequency: How often to run in seconds
        """
        self.scenarios.append({
            "name": name,
            "test": test_func,
            "frequency": frequency,
            "last_run": 0,
            "success_count": 0,
            "failure_count": 0
        })

    def run_scenario(self, scenario):
        """Execute a single test scenario"""
        try:
            scenario["test"](self.app_url)
            scenario["success_count"] += 1
            logging.info(f"Scenario {scenario['name']} completed successfully")
            return True
        except Exception as e:
            scenario["failure_count"] += 1
            logging.error(f"Scenario {scenario['name']} failed: {str(e)}")
            return False

    def start(self):
        """Start the background testing thread"""
        self.running = True
        self._test_thread = threading.Thread(target=self._test_loop)
        self._test_thread.daemon = True
        self._test_thread.start()

    def stop(self):
        """Stop the testing thread"""
        self.running = False
        if self._test_thread:
            self._test_thread.join(timeout=5)

    def _test_loop(self):
        """Main test execution loop"""
        while self.running:
            now = time.time()

            for scenario in self.scenarios:
                # Check if it's time to run this scenario
                if now - scenario["last_run"] >= scenario["frequency"]:
                    self.run_scenario(scenario)
                    scenario["last_run"] = now

            # Sleep a bit to avoid CPU hogging
            time.sleep(1)

    def get_status(self):
        """Return the current status of all test scenarios"""
        return {s["name"]: {
            "success_count": s["success_count"],
            "failure_count": s["failure_count"],
            "last_run": s["last_run"]
        } for s in self.scenarios}

# Example scenarios
def test_user_login(base_url):
    # Simulate user login flow
    session = requests.Session()

    # Get login page to capture CSRF token
    response = session.get(f"{base_url}/login")

    # Extract CSRF token (implementation depends on your app)
    csrf_token = extract_csrf_token(response.text)

    # Attempt login
    login_response = session.post(
        f"{base_url}/login",
        data={
            "username": "test_user",
            "password": "test_password",
            "csrf_token": csrf_token
        },
        allow_redirects=True
    )

    # Verify login was successful
    if "Welcome back" not in login_response.text:
        raise Exception("Login test failed")

def test_product_search(base_url):
    # Test the search functionality
    search_terms = ["laptop", "phone", "headphones", "keyboard"]
    term = random.choice(search_terms)

    response = requests.get(f"{base_url}/search?q={term}")

    if response.status_code != 200:
        raise Exception(f"Search failed with status {response.status_code}")

    if "Error" in response.text:
        raise Exception("Search returned error page")

# Usage
tester = SyntheticTester("https://example.com")
tester.register_scenario("user_login", test_user_login, frequency=600)  # Every 10 min
tester.register_scenario("product_search", test_product_search, frequency=300)  # Every 5 min
tester.start()

Chaos Engineering in Python

Testing how your system handles failures is critical. I implement controlled chaos to validate recovery mechanisms:

import os
import random
import time
import threading
import logging

class ChaosMonkey:
    def __init__(self, enabled=False):
        self.enabled = enabled
        self.targets = {}
        self.running = False
        self._thread = None

    def register_target(self, name, disrupt_func, recovery_func, frequency=3600):
        """Register a target for potential disruption

        Args:
            name: Target identifier
            disrupt_func: Function that causes disruption
            recovery_func: Function that recovers from disruption
            frequency: Average time between disruptions in seconds
        """
        self.targets[name] = {
            "disrupt": disrupt_func,
            "recover": recovery_func,
            "frequency": frequency,
            "last_disruption": 0,
            "disrupted": False
        }

    def start(self):
        """Start the chaos monkey"""
        if not self.enabled:
            logging.info("Chaos Monkey is disabled. Not starting.")
            return

        self.running = True
        self._thread = threading.Thread(target=self._chaos_loop)
        self._thread.daemon = True
        self._thread.start()
        logging.warning("Chaos Monkey has been activated!")

    def stop(self):
        """Stop the chaos monkey and recover all disrupted services"""
        self.running = False

        # Recover any disrupted services
        for name, target in self.targets.items():
            if target["disrupted"]:
                try:
                    target["recover"]()
                    target["disrupted"] = False
                    logging.info(f"Recovered {name} during Chaos Monkey shutdown")
                except Exception as e:
                    logging.error(f"Failed to recover {name}: {str(e)}")

        if self._thread:
            self._thread.join(timeout=5)

    def _chaos_loop(self):
        """Main chaos execution loop"""
        while self.running:
            now = time.time()

            for name, target in self.targets.items():
                # If already disrupted, maybe recover it
                if target["disrupted"]:
                    # 10% chance of recovery per check
                    if random.random() < 0.1:
                        try:
                            target["recover"]()
                            target["disrupted"] = False
                            logging.info(f"Chaos Monkey recovered {name}")
                        except Exception as e:
                            logging.error(f"Chaos Monkey failed to recover {name}: {str(e)}")

                # If not disrupted, maybe disrupt it
                else:
                    # Check if enough time has passed since last disruption
                    time_since_last = now - target["last_disruption"]

                    # Probability increases with time since last disruption
                    # but remains relatively low
                    probability = min(0.01, time_since_last / (target["frequency"] * 10))

                    if random.random() < probability:
                        try:
                            target["disrupt"]()
                            target["disrupted"] = True
                            target["last_disruption"] = now
                            logging.warning(f"Chaos Monkey disrupted {name}")
                        except Exception as e:
                            logging.error(f"Chaos Monkey failed to disrupt {name}: {str(e)}")

            # Sleep between checks
            time.sleep(10)

# Example disruptions for a web application
def disrupt_database_connection():
    # Simulate database connection problems
    # In production, this might:
    # - Add firewall rules to block DB access
    # - Change DB credentials temporarily
    # - Inject latency in DB connection
    db_settings.connection_pool.max_size = 1
    db_settings.connection_timeout = 0.5  # Unreasonably low

def recover_database_connection():
    # Reset the database settings to normal
    db_settings.connection_pool.max_size = 20
    db_settings.connection_timeout = 5.0

def disrupt_cache_service():
    # Simulate Redis cache failure
    redis_client.flushall()  # Clear all caches
    # Block outgoing connections to Redis
    os.system("iptables -A OUTPUT -p tcp --dport 6379 -j DROP")

def recover_cache_service():
    # Restore Redis connectivity
    os.system("iptables -D OUTPUT -p tcp --dport 6379 -j DROP")

# Usage
monkey = ChaosMonkey(enabled=os.environ.get("ENABLE_CHAOS", "False").lower() == "true")

# Only register targets in non-production environments
if not is_production_environment():
    monkey.register_target(
        "database", 
        disrupt_database_connection, 
        recover_database_connection,
        frequency=7200  # Every ~2 hours on average
    )

    monkey.register_target(
        "cache", 
        disrupt_cache_service, 
        recover_cache_service,
        frequency=3600  # Every ~1 hour on average
    )

# Start in a separate thread
monkey.start()
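A lighter-weight complement to a full chaos monkey is injecting failures at the call level, which exercises your retry and fallback paths without touching infrastructure. A sketch of a failure-injection decorator (the names are illustrative):

```python
import functools
import random

def inject_failures(rate=0.1, exc_type=ConnectionError):
    """Make a fraction of calls raise, to exercise retry and fallback paths."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if random.random() < rate:
                raise exc_type("injected failure")
            return func(*args, **kwargs)
        return wrapper
    return decorator

@inject_failures(rate=1.0)  # always fail, for a deterministic demo
def fetch_price(product_id):
    return 9.99

try:
    fetch_price(1)
except ConnectionError as e:
    print(e)  # injected failure
```

In practice you would gate the decorator behind the same environment flag as the chaos monkey, so production code paths are never wrapped.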

Event-Driven Recovery

For complex systems, I implement event-driven recovery patterns:

import time
import threading
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class Event:
    type: str
    source: str
    severity: str
    timestamp: float
    data: Dict[str, Any]

class EventBus:
    def __init__(self):
        self.subscribers = {}
        self._lock = threading.Lock()

    def subscribe(self, event_type, callback):
        with self._lock:
            if event_type not in self.subscribers:
                self.subscribers[event_type] = []
            self.subscribers[event_type].append(callback)

    def publish(self, event):
        # Create a copy of subscribers to avoid issues if handlers modify subscriptions
        handlers = []
        with self._lock:
            if event.type in self.subscribers:
                handlers = self.subscribers[event.type].copy()

        # Call each handler
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Error in event handler: {str(e)}")

class RecoveryManager:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        self.recovery_handlers = {}
        self.active_recoveries = {}

        # Subscribe to system events
        self.event_bus.subscribe("system.database.failure", self.handle_event)
        self.event_bus.subscribe("system.cache.failure", self.handle_event)
        self.event_bus.subscribe("system.api.failure", self.handle_event)

    def register_recovery(self, event_type, recovery_func):
        """Register a recovery function for a specific event type"""
        self.recovery_handlers[event_type] = recovery_func

    def handle_event(self, event):
        """Handle incoming system events"""
        if event.type not in self.recovery_handlers:
            print(f"No recovery handler for {event.type}")
            return

        # Check if recovery is already in progress
        recovery_key = f"{event.type}:{event.source}"
        if recovery_key in self.active_recoveries:
            print(f"Recovery already in progress for {recovery_key}")
            return

        # Mark recovery as active
        self.active_recoveries[recovery_key] = time.time()

        # Start recovery in a separate thread
        threading.Thread(
            target=self._execute_recovery,
            args=(event, recovery_key)
        ).start()

    def _execute_recovery(self, event, recovery_key):
        """Execute the recovery function and handle completion"""
        try:
            # Get the appropriate recovery handler
            handler = self.recovery_handlers[event.type]

            # Execute recovery
            print(f"Starting recovery for {recovery_key}")
            handler(event)

            # Publish success event
            self.event_bus.publish(Event(
                type=f"{event.type}.recovered",
                source="recovery_manager",
                severity="info",
                timestamp=time.time(),
                data={"original_event": event.data}
            ))

            print(f"Recovery completed for {recovery_key}")

        except Exception as e:
            # Publish failure event
            self.event_bus.publish(Event(
                type=f"{event.type}.recovery_failed",
                source="recovery_manager",
                severity="error",
                timestamp=time.time(),
                data={
                    "original_event": event.data,
                    "error": str(e)
                }
            ))

            print(f"Recovery failed for {recovery_key}: {str(e)}")

        finally:
            # Remove from active recoveries
            self.active_recoveries.pop(recovery_key, None)

# Example usage
event_bus = EventBus()
recovery = RecoveryManager(event_bus)

# Register recovery handlers
def recover_database(event):
    """Handle database failure recovery"""
    db_host = event.data.get("host")
    print(f"Attempting to recover database on {db_host}")

    # Check if primary DB server is available
    if ping_server(db_host):
        # Try to restart the database service
        restart_database_service(db_host)
    else:
        # Failover to replica
        activate_database_replica()

    # Wait for database to become available
    wait_for_database_connection(timeout=60)

    # Verify data integrity
    verify_database_integrity()

    print("Database recovery completed")

def recover_cache(event):
    """Handle cache failure recovery"""
    # Clear all cache entries to ensure consistency
    flush_cache()

    # Restart cache service if needed
    if not is_cache_service_running():
        restart_cache_service()

    # Wait for cache service to become available
    wait_for_cache_connection(timeout=30)

    # Pre-warm critical caches
    prewarm_critical_caches()

    print("Cache recovery completed")

# Register the recovery handlers
recovery.register_recovery("system.database.failure", recover_database)
recovery.register_recovery("system.cache.failure", recover_cache)

# Somewhere in monitoring code, events are published
def monitor_services():
    while True:
        try:
            # Check database
            if not is_database_healthy():
                event_bus.publish(Event(
                    type="system.database.failure",
                    source="monitor",
                    severity="critical",
                    timestamp=time.time(),
                    data={"host": db_config.host}
                ))

            # Check cache
            if not is_cache_healthy():
                event_bus.publish(Event(
                    type="system.cache.failure",
                    source="monitor",
                    severity="warning",
                    timestamp=time.time(),
                    data={"service": "redis"}
                ))

        except Exception as e:
            print(f"Error in monitoring: {str(e)}")

        time.sleep(30)  # Check every 30 seconds

Implementing a Complete Self-Healing System

Combining these techniques creates a comprehensive self-healing system. Here's how I integrate them:


import os
import threading
import time
import logging
import signal
import sys

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("self_healing.log")
    ]
)

class SelfHealingApp:
    def __init__(self, app_name):
        self.app_name = app_name
        self.health_monitor = HealthMonitor()
        self.feature_manager = FeatureManager()
        self.event_bus = EventBus()
        self.recovery_manager = RecoveryManager(self.event_bus)
        self.synthetic_tester = SyntheticTester(os.environ.get("APP_URL", "http://localhost:8000"))

        # Set up chaos monkey (only in testing environments)
        self.chaos_monkey = ChaosMonkey(enabled=os.environ.get("ENABLE_CHAOS", "False").lower() == "true")

        # Flag to coordinate shutdown
        self.running = True

        # Set up signal handlers
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)

    def handle_shutdown(self, sig, frame):
        """Handle shutdown signals gracefully"""
        logging.info("Shutdown signal received, stopping services...")
        self.running = False

    def setup_health_checks(self):
        """Set up application health checks"""
        # Database health
        self.health_monitor.register(
            "database", 
            check_database_connection,
            interval=30, 
            critical=True
        )

        # Cache health
        self.health_monitor.register(
            "cache", 
            check_cache_connection,
            interval=60, 
            critical=False
        )

        # External API health
        self.health_monitor.register(
            "payment_api", 
            check_payment_api,
            interval=120, 
            critical=True
        )

    def setup_feature_flags(self):
        """Set up feature flags with fallbacks"""
        # Recommendation engine with fallback
        self.feature_manager.register(
            "recommendations",
            lambda: self.health_monitor.check_service("recommendation_service") == "HEALTHY",
            fallback_func=get_popular_products
        )

        # Search functionality with fallback
        self.feature_manager.register(
            "advanced_search",
            lambda: self.health_monitor.check_service("search_service") == "HEALTHY",
            fallback_func=basic_search
        )

    def setup_recovery_handlers(self):
        """Set up event-driven recovery handlers"""
        # Database recovery
        self.recovery_manager.register_recovery(
            "system.database.failure", 
            recover_database
        )

        # Cache recovery
        self.recovery_manager.register_recovery(
            "system.cache.failure", 
            recover_cache
        )

        # API recovery
        self.recovery_manager.register_recovery(
            "system.api.failure", 
            recover_api_connection
        )

    def setup_synthetic_tests(self):
        """Set up synthetic tests for continuous validation"""
        # Test user login flow
        self.synthetic_tester.register_scenario(
            "user_login", 
            test_user_login,
            frequency=600  # Every 10 minutes
        )

        # Test product search
        self.synthetic_tester.register_scenario(
            "product_search", 
            test_product_search,
            frequency=300  # Every 5 minutes
        )

        # Test checkout process
        self.synthetic_tester.register_scenario(
            "checkout", 
            test_checkout_process,
            frequency=1800  # Every 30 minutes
        )

    def setup_chaos_experiments(self):
        """Set up chaos experiments (non-production only)"""
        if not is_production():
            # Database disruption
            self.chaos_monkey.register_target(
                "database", 
                disrupt_database_connection, 
                recover_database_connection,
                frequency=7200  # Every ~2 hours on average
            )

            # Cache disruption
            self.chaos_monkey.register_target(
                "cache", 
                disrupt_cache_service, 
                recover_cache_service,
                frequency=3600  # Every ~1 hour on average
            )

    def start(self):
        """Start all self-healing components"""
        logging.info(f"Starting {self.app_name} with self-healing capabilities")

        # Start health monitoring
        self.health_monitor.start_monitoring()
        logging.info("Health monitoring started")

        # Start synthetic testing


---
## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon. 

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**
