SoFunction
Updated on 2025-03-04

Using an auto-updating cache class in Python 3

This class automatically refreshes its cached data in the background, so you only need to call a method to get the data.

The auto-updating cache class

The following is the implementation of the AutoUpdatingCache class:

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        """
        Initialize the cache.

        :param update_function: A function that generates or updates the cached data.
        :param expiry_time: The cache update interval, in seconds.
        """
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        # Start the background thread that updates the cache
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)  # Check once every second

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

Instructions for use

Define a data generation function

First, define a function that generates or updates the cached data. This function can wrap any time-consuming operation, such as querying a database or computing a complex result.

import time

def generate_cache_data():
    # Simulate a time-consuming operation
    time.sleep(5)
    return {"value": "fresh data", "timestamp": time.time()}

Create an instance of the cache class

Pass the data generation function to the AutoUpdatingCache class and set the cache update interval.

cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

Get cached data

Call the get_data() method wherever you need the cached data.

data = cache.get_data()
print(data)
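
Because get_data() returns a placeholder string until the first update has finished, a caller that needs real data at startup may want to wait for it. The following is only a sketch of one way to do that. The names wait_for_data, INITIALIZING and fake_get_data are hypothetical and introduced for illustration; fake_get_data merely stands in for cache.get_data so the example runs on its own.

```python
import threading
import time

# Placeholder string that get_data() returns in this article while the cache is empty.
INITIALIZING = "Cache is initializing, please try again later."

def wait_for_data(get_data, timeout=10.0, poll_interval=0.1):
    """Poll get_data() until it returns something other than the placeholder.

    Hypothetical helper, not part of the article's class.
    Raises TimeoutError if no real data arrives within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        data = get_data()
        if data != INITIALIZING:
            return data
        if time.monotonic() >= deadline:
            raise TimeoutError("cache was not filled in time")
        time.sleep(poll_interval)

# Stand-in for cache.get_data: returns the placeholder until a timer fills it.
_store = {"data": INITIALIZING}

def fake_get_data():
    return _store["data"]

# Simulate the background update finishing after 0.3 seconds.
threading.Timer(0.3, lambda: _store.update(data={"value": "fresh data"})).start()

first = wait_for_data(fake_get_data, timeout=5.0)
print(first)
```

With the real class, the call would be wait_for_data(cache.get_data). Polling keeps the helper independent of the cache's internals, at the cost of a small delay of up to poll_interval.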

Complete example

Combine the above steps:

import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._start_background_update()

    def _start_background_update(self):
        self.update_thread = threading.Thread(target=self._update_cache_periodically)
        self.update_thread.daemon = True
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)

    def _update_cache(self):
        with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."

# Data generation function
def generate_cache_data():
    time.sleep(5)  # Simulate a time-consuming operation
    return {"value": "fresh data", "timestamp": time.time()}

# Create a cache instance
cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

# Simulate fetching the data
for _ in range(10):
    data = cache.get_data()
    print(data)
    time.sleep(10)

Code explanation

  • AutoUpdatingCache Class

    • __init__ method:
      • Initializes the cache and sets the data generation function and the cache update interval.
      • Starts the background thread that runs _update_cache_periodically.
    • _update_cache_periodically method:
      • Runs an infinite loop that checks once per second whether the cache needs to be updated.
      • If the time elapsed since the last update has reached or exceeded expiry_time, it calls _update_cache.
    • _update_cache method:
      • Uses update_function to update the cached data.
      • Uses a lock to ensure thread safety.
    • get_data method:
      • Returns the cached data.
      • If the cache is still empty (during initialization), returns a prompt message instead.
  • Data generation function

    • The generate_cache_data function simulates a time-consuming operation and generates new cached data.
  • Example of usage

    • Creates a cache instance and reads the data every 10 seconds in a loop to observe the cache being updated.
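
The background loop described above runs until the process exits. If you need to shut it down explicitly (in tests, for example), one possible variation is to replace the one-second sleep with a threading.Event wait. This is a sketch under assumptions, not the article's implementation: the class name StoppableAutoUpdatingCache, the check_interval parameter and the stop() method are all hypothetical additions, and get_data() here returns None instead of the prompt message to keep the example short.

```python
import threading
import time

class StoppableAutoUpdatingCache:
    """Variant of the periodic refresh loop that can be stopped (hypothetical name)."""

    def __init__(self, update_function, expiry_time=60, check_interval=1.0):
        self.update_function = update_function
        self.expiry_time = expiry_time
        self.check_interval = check_interval  # assumed parameter, not in the article
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()
        self._stop_event = threading.Event()
        self.update_thread = threading.Thread(
            target=self._update_cache_periodically, daemon=True
        )
        self.update_thread.start()

    def _update_cache_periodically(self):
        while True:
            if time.time() - self.last_updated >= self.expiry_time:
                self._update_cache()
            # Event.wait() returns True as soon as stop() sets the event;
            # otherwise it times out after check_interval and returns False.
            if self._stop_event.wait(self.check_interval):
                break

    def _update_cache(self):
        with self.lock:
            try:
                self.cache_data = self.update_function()
                self.last_updated = time.time()
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
        with self.lock:
            return self.cache_data  # None until the first update completes

    def stop(self, timeout=None):
        """Ask the background thread to exit and wait for it to finish."""
        self._stop_event.set()
        self.update_thread.join(timeout)

cache = StoppableAutoUpdatingCache(lambda: "fresh data", expiry_time=30, check_interval=0.05)
time.sleep(0.2)  # give the first refresh time to run
print(cache.get_data())
cache.stop(timeout=2)
print(cache.update_thread.is_alive())
```

Compared with time.sleep(1), waiting on the event lets the thread wake up immediately when stop() is called rather than finishing its current sleep first.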

Things to note

  • Thread Safety:

    • Use a lock to keep data access safe in a multi-threaded environment.
  • Exception handling:

    • When updating the cache, catch possible exceptions to prevent thread crashes.
  • Background thread:

    • Set the thread as a daemon thread (daemon=True) so that it ends automatically when the main program exits.
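
One detail related to thread safety: in the class as written, _update_cache holds the lock for the whole time update_function is running, so a get_data() call made during a slow update waits until that update finishes. If this matters for your use case, a possible alternative is to compute the new data outside the lock and hold the lock only for the assignment. The sketch below assumes a single updater thread, as in this article; the class name NonBlockingRefreshCache and the refresh() method are hypothetical.

```python
import threading
import time

class NonBlockingRefreshCache:
    """Sketch: compute new data outside the lock, then swap it in under the lock."""

    def __init__(self, update_function):
        self.update_function = update_function
        self.cache_data = None
        self.last_updated = 0
        self.lock = threading.Lock()

    def refresh(self):
        try:
            new_data = self.update_function()  # slow part, lock is NOT held
        except Exception as e:
            print(f"Error updating cache: {e}")
            return
        with self.lock:  # short critical section: just the assignment
            self.cache_data = new_data
            self.last_updated = time.time()

    def get_data(self):
        with self.lock:
            return self.cache_data

def slow_update():
    time.sleep(0.5)  # simulate a time-consuming operation
    return "fresh data"

cache = NonBlockingRefreshCache(slow_update)
cache.refresh()  # initial fill, done synchronously for the demo

worker = threading.Thread(target=cache.refresh, daemon=True)
worker.start()  # a slow refresh is now running in the background

start = time.monotonic()
data = cache.get_data()  # reads the previous value without waiting for the refresh
elapsed = time.monotonic() - start
worker.join()
print(data, f"(read took {elapsed:.3f}s)")
```

Readers see the previous value until the new one is swapped in, which is usually acceptable for a cache. With several updater threads, extra coordination would be needed so that an older result does not overwrite a newer one.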

Application scenarios

You can use this cache class in a web application, for example in a Sanic route:

from sanic import Sanic
from sanic.response import json

app = Sanic("CacheApp")

@app.route("/data")
async def get_cached_data(request):
    data = cache.get_data()
    return json({"data": data})

if __name__ == "__main__":
    # Make sure the cache is initialized before the application starts
    cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
    app.run(host="0.0.0.0", port=8000)

In this way, when a user visits the /data route, cached data is always available. The cache is refreshed automatically in the background, so requests do not time out while the cache is being updated.
