Background and requirements
The core job of a timer is to trigger a callback function periodically, while also supporting operations such as start, stop, and status checks. In multithreaded or asynchronous programs, the timer is also expected to:
- Support asynchronous operation so that it does not block the main thread;
- Share a single event loop across all timers to save resources;
- Manage the timer's life cycle gracefully;
- Provide a simple, easy-to-use interface.
To this end, the Timer class below combines asyncio and threading to implement an efficient timer.
Code
Complete code
import asyncio
import threading
import time


class Timer:
    # Shared by all Timer instances: one event loop running in one daemon thread
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        # Create the shared loop on first use, or recreate it if its thread has ended
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()

    @classmethod
    def _run_loop(cls, loop):
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        except Exception as e:
            print(f"Event loop exception: {e}")
        finally:
            loop.close()

    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
                cls._loop.call_soon_threadsafe(cls._loop.stop)
                # Don't use join, because the daemon thread ends automatically when the main thread exits

    def __init__(self):
        self.is_running = False
        # Set from the caller's thread and polled by the coroutine, so a threading.Event is used
        self._stop_event = threading.Event()
        self._task = None

    async def _timer_loop(self, interval, callback):
        try:
            while not self._stop_event.is_set():
                await asyncio.sleep(interval)
                if not self._stop_event.is_set():
                    # Run the callback in the default thread pool so it cannot block the loop
                    await asyncio.get_running_loop().run_in_executor(None, callback)
        except asyncio.CancelledError:
            pass  # Ignore normal cancellation
        except Exception as e:
            print(f"Timer loop exception: {e}")
        finally:
            self.is_running = False
            with Timer._lock:
                Timer._running_timers -= 1
            Timer._shutdown()

    def start(self, interval, callback):
        if not self.is_running:
            Timer._ensure_loop()
            self.is_running = True
            self._stop_event.clear()
            # Count the timer before scheduling it, under the shared lock
            with Timer._lock:
                Timer._running_timers += 1
            self._task = asyncio.run_coroutine_threadsafe(
                self._timer_loop(interval, callback), Timer._loop
            )
            # print(f"Timer started, runs every {interval} seconds")

    def stop(self):
        if self.is_running:
            self._stop_event.set()
            if self._task:
                Timer._loop.call_soon_threadsafe(self._task.cancel)
            self.is_running = False
            # print("Timer stopped")

    def is_active(self):
        return self.is_running


# Usage example
def callback1():
    print(f"Callback 1 triggered: {time.strftime('%H:%M:%S')}")


def callback2():
    print(f"Callback 2 triggered: {time.strftime('%H:%M:%S')}")


if __name__ == "__main__":
    timer1 = Timer()
    timer2 = Timer()
    timer1.start(2, callback1)
    timer2.start(3, callback2)
    try:
        time.sleep(100)
        timer1.stop()
        time.sleep(2)
        timer2.stop()
    except KeyboardInterrupt:
        timer1.stop()
        timer2.stop()
    finally:
        # Make sure to clean up when the program exits
        Timer._shutdown()
1. Implementation of singleton event loop
To avoid creating a separate event loop for each timer, the Timer class uses class variables and class methods to manage a single global event loop:
class Timer:
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0

    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()
- _loop: stores the global asyncio event loop.
- _thread: runs the event loop in a separate daemon thread so that the main thread is not blocked.
- _lock: a thread lock that keeps event loop creation thread-safe in a multithreaded environment.
- _ensure_loop: creates the event loop when needed, or reuses the existing one, so that there is only one global loop.
The thread is a daemon thread (daemon=True), so the program does not need to shut it down explicitly on exit, which simplifies resource cleanup.
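The lazy, lock-guarded creation described above can be tried in isolation. The following is a minimal sketch, not the article's class: the module-level names ensure_loop and current_thread_name are hypothetical and exist only for this illustration. It checks that repeated calls return the same loop and that submitted coroutines run on the background thread rather than the caller's.

```python
import asyncio
import threading

# Hypothetical module-level state, standing in for the class variables
_loop = None
_thread = None
_lock = threading.Lock()


def ensure_loop():
    """Return the shared loop, starting its daemon thread on first use."""
    global _loop, _thread
    with _lock:
        if _loop is None or _thread is None or not _thread.is_alive():
            _loop = asyncio.new_event_loop()
            _thread = threading.Thread(target=_loop.run_forever, daemon=True)
            _thread.start()
        return _loop


async def current_thread_name():
    # Reports which thread actually executes the coroutine
    return threading.current_thread().name


loop_a = ensure_loop()
loop_b = ensure_loop()
same_loop = loop_a is loop_b

# Submit from the main thread; the coroutine runs on the loop's thread
worker_name = asyncio.run_coroutine_threadsafe(
    current_thread_name(), loop_a
).result(timeout=5)
runs_off_main_thread = worker_name != threading.current_thread().name

# Stop the loop so the sketch leaves nothing running
loop_a.call_soon_threadsafe(loop_a.stop)
```

Holding the lock around both the check and the creation is what prevents two threads from each creating a loop when they call ensure_loop at the same moment.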
2. Running and closing of event loops
The logic for running the event loop is encapsulated in _run_loop:
@classmethod
def _run_loop(cls, loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except Exception as e:
        print(f"Event loop exception: {e}")
    finally:
        # Release the loop's resources once it has stopped
        loop.close()
- run_forever: keeps the event loop running until it is stopped from outside.
- Exception handling: catches errors raised while the loop runs and prints them for easier debugging.
- finally: closes the loop once it has stopped running, so that its resources are released properly.
Shutdown is controlled by the _shutdown method:
@classmethod
def _shutdown(cls):
    with cls._lock:
        if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
            cls._loop.call_soon_threadsafe(cls._loop.stop)
Once all timers have stopped (_running_timers == 0), the event loop is stopped safely: loop.stop is scheduled through call_soon_threadsafe, so it runs on the loop's own thread.
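To see why the stop request goes through call_soon_threadsafe, here is a small standalone sketch (the names run_loop, stopped and closed are illustrative only). It starts a loop in a background thread, asks it to stop from the main thread, and confirms that run_forever returns and the finally block closes the loop. The join is used here only so the result can be observed; the article's class deliberately does not join.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()


def run_loop():
    try:
        loop.run_forever()
    finally:
        # Reached once loop.stop() has taken effect
        loop.close()


thread = threading.Thread(target=run_loop, daemon=True)
thread.start()

# loop.stop() is not thread-safe when called directly from another thread,
# so it is handed to the loop thread as a scheduled callback
loop.call_soon_threadsafe(loop.stop)

# Wait for the loop thread to finish so the outcome can be checked
thread.join(timeout=5)
stopped = not thread.is_alive()
closed = loop.is_closed()
```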
3. Timer core logic
Each Timer instance manages one independent timing task:
def __init__(self):
    self.is_running = False
    # Set from the caller's thread and polled by the coroutine
    self._stop_event = threading.Event()
    self._task = None

async def _timer_loop(self, interval, callback):
    try:
        while not self._stop_event.is_set():
            await asyncio.sleep(interval)
            if not self._stop_event.is_set():
                await asyncio.get_running_loop().run_in_executor(None, callback)
    except asyncio.CancelledError:
        pass  # Ignore normal cancellation
    finally:
        self.is_running = False
        with Timer._lock:
            Timer._running_timers -= 1
        Timer._shutdown()
- _stop_event: a threading.Event object used to signal the timer to stop; it is set from the caller's thread and polled by the coroutine.
- _timer_loop: an asynchronous coroutine that runs the callback function callback once every interval seconds.
- run_in_executor: runs the callback in the default thread pool so that it does not block the event loop.
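The effect of run_in_executor can be observed with a short experiment. This is a sketch under assumptions: blocking_callback and heartbeat are hypothetical stand-ins for a slow user callback and for other work sharing the loop. While the callback sleeps in a worker thread, the heartbeat coroutine keeps ticking, which would not happen if the callback were called directly inside the coroutine.

```python
import asyncio
import time


def blocking_callback():
    # Stands in for slow, synchronous user code
    time.sleep(0.3)
    return "done"


async def heartbeat(ticks):
    # Can only tick while the event loop is free to run it
    while True:
        await asyncio.sleep(0.05)
        ticks.append(time.monotonic())


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, blocking_callback)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, len(ticks)


result, tick_count = asyncio.run(main())
```

The trade-off is that callbacks now run on pool threads, so any state they share with the rest of the program needs its own synchronization.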
4. Start and Stop
The start and stop methods are the main interfaces for users:
def start(self, interval, callback):
    if not self.is_running:
        Timer._ensure_loop()
        self.is_running = True
        self._stop_event.clear()
        # Count the timer before scheduling it, under the shared lock
        with Timer._lock:
            Timer._running_timers += 1
        self._task = asyncio.run_coroutine_threadsafe(
            self._timer_loop(interval, callback), Timer._loop
        )

def stop(self):
    if self.is_running:
        self._stop_event.set()
        if self._task:
            Timer._loop.call_soon_threadsafe(self._task.cancel)
        self.is_running = False
- start: starts the timer, makes sure the event loop is available, and records the number of running timers.
- stop: stops the timer by setting _stop_event and cancelling the task.
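The object that start stores in _task is worth a closer look: asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future, not an asyncio task, and cancelling it propagates to the coroutine as asyncio.CancelledError. The sketch below illustrates this outside the Timer class; periodic, started and cancelled_inside are hypothetical names used only here.

```python
import asyncio
import concurrent.futures
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

started = threading.Event()
cancelled_inside = threading.Event()


async def periodic(interval):
    started.set()
    try:
        while True:
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # Cancellation arrives here, at the pending await
        cancelled_inside.set()
        raise


future = asyncio.run_coroutine_threadsafe(periodic(0.05), loop)
is_concurrent_future = isinstance(future, concurrent.futures.Future)

# Wait until the coroutine is actually running before cancelling it
started.wait(timeout=5)
future.cancel()
saw_cancel = cancelled_inside.wait(timeout=5)
was_cancelled = future.cancelled()

# Stop the loop so the sketch leaves nothing running
loop.call_soon_threadsafe(loop.stop)
```

One consequence, visible in the sketch, is that a coroutine cancelled before it has started never enters its try block, which is why the example waits on started first.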
5. Usage example
Here is a simple usage example:
def callback1():
    print(f"Callback 1 triggered: {time.strftime('%H:%M:%S')}")

def callback2():
    print(f"Callback 2 triggered: {time.strftime('%H:%M:%S')}")

timer1 = Timer()
timer2 = Timer()
timer1.start(2, callback1)  # Trigger every 2 seconds
timer2.start(3, callback2)  # Trigger every 3 seconds
time.sleep(10)  # Run for 10 seconds
timer1.stop()
timer2.stop()
The output may look like this:
Callback 1 triggered: 14:30:02
Callback 2 triggered: 14:30:03
Callback 1 triggered: 14:30:04
Callback 1 triggered: 14:30:06
Callback 2 triggered: 14:30:06
...
Design Highlights
- Combining asynchronous and multithreaded programming: asyncio and threading together provide a non-blocking timer suitable for high-concurrency scenarios.
- Efficient resource use: a single global event loop avoids the overhead of creating a loop for every timer.
- Elegant life cycle management: the daemon thread and the automatic shutdown mechanism simplify resource cleanup.
- Thread safety: a lock keeps the shared state stable in multithreaded environments.
Applicable scenarios
- Periodic task scheduling, such as data refresh and status check.
- Timed monitoring in background services.
- Timer requirements in games or real-time applications.
Summary
This asynchronous timer implementation combines Python's asynchronous programming and multithreading features to provide a lightweight, flexible solution that works for both simple scripts and complex background services. If you need a reliable timer, try this implementation, or optimize it further to suit your needs.