SoFunction
Updated on 2025-04-20

Implement an elegant asynchronous timer with Python

Requirement background

The core function of the timer is to be able to trigger callback functions periodically, and it also needs to support operations such as start, stop and status check. In multithreaded or asynchronous programming scenarios, it is expected that the timer can:

  1. Supports asynchronous operations to avoid blocking the main thread;
  2. Single-in-time event loops to save resources;
  3. Gracefully manage the life cycle of the timer;
  4. Provides simple interface and is easy to use.

To this end, aTimerClass, combinedasyncioandthreading, realizes an efficient timer.

Code

Complete code

import asyncio
import threading
import time
import sys

class Timer:
    _loop = None
    _thread = None
    _lock = ()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = (
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()

    @classmethod
    def _run_loop(cls, loop):
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        except Exception as e:
            print(f"Event loop exception: {e}")
        finally:
            ()

    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
                cls._loop.call_soon_threadsafe(cls._loop.stop)
                # Don't use join, because the daemon thread will automatically end when the main thread exits
    def __init__(self):
        self.is_running = False
        self._stop_event = ()
        self._task = None

    async def _timer_loop(self, interval, callback):
        try:
            while not self._stop_event.is_set():
                await (interval)
                if not self._stop_event.is_set():
                    await asyncio.get_event_loop().run_in_executor(None, callback)
        except :
            pass  # Ignore when canceling normally        except Exception as e:
            print(f"Timer loop exception: {e}")
        finally:
            self.is_running = False
            Timer._running_timers -= 1
            Timer._shutdown()

    def start(self, interval, callback):
        if not self.is_running:
            Timer._ensure_loop()
            self.is_running = True
            self._stop_event.clear()
            self._task = asyncio.run_coroutine_threadsafe(
                self._timer_loop(interval, callback),
                Timer._loop
            )
            Timer._running_timers += 1
            # print(f"Timer is started, executed every {interval} seconds")
    def stop(self):
        if self.is_running:
            self._stop_event.set()
            if self._task:
                Timer._loop.call_soon_threadsafe(self._task.cancel)
            self.is_running = False
            # print("Timer stopped")
    def is_active(self):
        return self.is_running

#User Exampledef callback1():
    print(f"Callback1trigger: {('%H:%M:%S')}")

def callback2():
    print(f"Callback2trigger: {('%H:%M:%S')}")

if __name__ == "__main__":
    timer1 = Timer()
    timer2 = Timer()

    (2, callback1)
    (3, callback2)

    try:
        (100)
        ()
        (2)
        ()
    except KeyboardInterrupt:
        ()
        ()
    finally:
        # Make sure to clean up when the program exits        Timer._shutdown()

1. Implementation of singleton event loop

To avoid creating an independent event loop for each timer,TimerClass variables and class methods are used in the class to manage globally unique event loops:

class Timer:
    _loop = None
    _thread = None
    _lock = ()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = (
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()
  • _loop:Storing globalasyncioEvent loop.
  • _thread: Run the event loop in a separate daemon thread to avoid blocking the main thread.
  • _lock: Thread lock, ensuring thread safety when creating event loops in a multi-threaded environment.
  • _ensure_loop: Create or reuse event loops when needed, ensuring there is only one global loop.

Daemon thread (daemon=True) is designed so that the program does not need to explicitly close threads when exiting, simplifying resource cleaning.

2. Running and closing of event loops

The running logic of the event loop is encapsulated in_run_loopmiddle:

@classmethod
def _run_loop(cls, loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except Exception as e:
        print(f"Event loop exception: {e}")
    finally:
        ()
  • run_forever: Let the event loop continue to run until it is stopped externally.
  • Exception handling: Catch possible errors and print them for easy debugging.
  • finally: Ensure that resources are properly released when the loop is closed.

The closing logic is_shutdownMethod control:

@classmethod
def _shutdown(cls):
    with cls._lock:
        if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
            cls._loop.call_soon_threadsafe(cls._loop.stop)

When all timers are stopped (_running_timers == 0), the event loop will be safely stopped.

3. Timer core logic

EachTimerThe instance is responsible for managing an independent timing task:

def __init__(self):
    self.is_running = False
    self._stop_event = ()
    self._task = None

async def _timer_loop(self, interval, callback):
    try:
        while not self._stop_event.is_set():
            await (interval)
            if not self._stop_event.is_set():
                await asyncio.get_event_loop().run_in_executor(None, callback)
    except :
        pass  # Ignore when canceling normally    finally:
        self.is_running = False
        Timer._running_timers -= 1
        Timer._shutdown()
  • _stop_event:oneObject, used to control the stop of the timer.
  • _timer_loop: Asynchronous coroutine, every otherintervalExecute the callback function once in secondscallback
  • run_in_executor: Run the callback function in the default thread pool to avoid blocking event loops.

4. Start and Stop

The start and stop methods are the main interfaces for users:

def start(self, interval, callback):
    if not self.is_running:
        Timer._ensure_loop()
        self.is_running = True
        self._stop_event.clear()
        self._task = asyncio.run_coroutine_threadsafe(
            self._timer_loop(interval, callback),
            Timer._loop
        )
        Timer._running_timers += 1

def stop(self):
    if self.is_running:
        self._stop_event.set()
        if self._task:
            Timer._loop.call_soon_threadsafe(self._task.cancel)
        self.is_running = False
  • start: Start the timer, ensure the event loop is available, and record the number of timers in operation.
  • stop: By setting_stop_eventand cancel the task to stop the timer.

5. Use examples

Here is a simple example of usage:

def callback1():
    print(f"Callback1trigger: {('%H:%M:%S')}")

def callback2():
    print(f"Callback2trigger: {('%H:%M:%S')}")

timer1 = Timer()
timer2 = Timer()

(2, callback1)  # Trigger every 2 seconds(3, callback2)  # Trigger every 3 seconds
(10)  # Run for 10 seconds()
()

The output may be as follows:

Callback1trigger: 14:30:02
Callback2trigger: 14:30:03
Callback1trigger: 14:30:04
Callback1trigger: 14:30:06
Callback2trigger: 14:30:06
...

Design Highlights

  1. Combining asynchronous and multi-threaded:passasyncioandthreading, realizes a non-blocking timer, suitable for high concurrency scenarios.
  2. Efficient resource utilization: A globally unique event loop avoids the overhead of repeated creation.
  3. Elegant life cycle management: Daemon threads and automatic shutdown mechanisms simplify resource cleanup.
  4. Thread safety: Use lock mechanism to ensure stability in multi-threaded environments.

Applicable scenarios

  • Periodic task scheduling, such as data refresh and status check.
  • Timed monitoring in background services.
  • Timer requirements in games or real-time applications.

Summarize

This asynchronous timer implementation combines Python's asynchronous programming and multithreading features to provide a lightweight and flexible solution. Whether it is simple scripts or complex background services, it can do it. If you need a reliable timer, try this implementation, or further optimize it according to your needs!

The above is the detailed content of using Python to implement an elegant asynchronous timer. For more information about Python asynchronous timer, please pay attention to my other related articles!