SoFunction
Updated on 2024-10-30

Python queues, inter-process communication, threading cases

process mutex lock

Multi-Process Simultaneous Residual Ticket Grabbing

# Running concurrently, efficient, but competing to write the same file, data written incorrectly
# The file reads {"ticket_num": 1}
import json
import time
from multiprocessing import Process
def search(user):
  with open('', 'r', encoding='utf-8') as f:
    dic = (f)
  print(f'subscribers{user}View Remaining Tickets,remain{("ticket_num")}...')
def buy(user):
  with open('', 'r', encoding='utf-8') as f:
    dic = (f)

  (0.1)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('', 'w', encoding='utf-8') as f:
      (dic, f)
    print(f'subscribers{user}Ticket Success!')

  else:
    print(f'subscribers{user}failed attempt to seize tickets')
def run(user):
  search(user)
  buy(user)
if __name__ == '__main__':
  for i in range(10): # Simulate 10 users grabbing tickets
    p = Process(target=run, args=(f'subscribers{i}', ))
    ()

Using locks to secure data

# The file reads {"ticket_num": 1}
import json
import time
from multiprocessing import Process, Lock
def search(user):
  with open('', 'r', encoding='utf-8') as f:
    dic = (f)
  print(f'subscribers{user}View Remaining Tickets,remain{("ticket_num")}...')
def buy(user):
  with open('', 'r', encoding='utf-8') as f:
    dic = (f)

  (0.2)
  if dic['ticket_num'] > 0:
    dic['ticket_num'] -= 1
    with open('', 'w', encoding='utf-8') as f:
      (dic, f)
    print(f'subscribers{user}Ticket Success!')

  else:
    print(f'subscribers{user}failed attempt to seize tickets')
def run(user, mutex):
  search(user)
  () # Locked
  buy(user)
  () # Release the lock
if __name__ == '__main__':
  # Call the Lock() class to get a lock object
  mutex = Lock()

  for i in range(10): # Simulate 10 users grabbing tickets
    p = Process(target=run, args=(f'subscribers{i}', mutex))
    ()

Process Mutual Exclusion Lock:

Making concurrency serial sacrifices execution efficiency and ensures data security

When the program is concurrent, it is necessary to modify the data using the

formation

The queue follows a first-in-first-out

Queue: Equivalent to a queue space in memory, can store multiple data, but the order of the data is from the first to go to the front of the queue.

() Add data

() Fetch data, following the first-in-first-out queue

q.get_nowait() gets the queue data, if there is none in the queue, an error is reported.

q.put_nowait add data, if the queue is full it will also report an error

() Check if the queue is full

() Check if the queue is empty

from multiprocessing import Queue

# Call the queue class to instantiate the queue object
q = Queue(5)  # 5 data stored in the queue

# put adds data, if the queue is full it gets stuck
(1)
print('Enter data 1')
(2)
print('Enter data 2')
(3)
print('Enter data 3')
(4)
print('Enter data 4')
(5)
print('Enter data 5')

# Check if the queue is full
print(())

# Add data, if the queue is full, it will also report an error.
q.put_nowait(6)

# () The data obtained follows a first-in-first-out (FIFO) basis
print(())
print(())
print(())
print(())
print(())
# print(())
print(q.get_nowait())  # Get the queue data. If there's no queue, it's an error.

# Determine if the queue is empty
print(())
(6)
print('Enter data 6')

interprocess communication

IPC(Inter-Process Communication)

Inter-process data is isolated from each other, if you want to realize inter-process communication, you can use the queue

from multiprocessing import Process, Queue
def task1(q):
  data = 'hello, how are you?'
  (data)
  print('Process 1 adding data to the queue')
def task2(q):
  print(())
  print('Process 2 gets data from the queue')
if __name__ == '__main__':
  q = Queue()

  p1 = Process(target=task1, args=(q, ))
  p2 = Process(target=task2, args=(q, ))
  ()
  ()
  print('Master process')

Producers and consumers

In the program, data is added to the queue by the queue producer and the consumer gets the data from the queue

from multiprocessing import Process, Queue
import time


# Producers
def producer(name, food, q):
  for i in range(10):
    data = food, i
    msg = f'subscribers{name}start producing{data}'
    print(msg)
    (data)
    (0.1)
# Consumers
def consumer(name, q):
  while True:
    data = ()
    if not data:
      break

    print(f'subscribers{name}Start eating.{data}')
if __name__ == '__main__':
  q = Queue()
  p1 = Process(target=producer, args=('neo', 'Pancakes', q))
  p2 = Process(target=producer, args=('wick', 'Meat Loaf', q))

  c1 = Process(target=consumer, args=('cwz', q))
  c2 = Process(target=consumer, args=('woods', q))

  ()
  ()
  
   = True
   = True
  ()
  ()
  print('Lord')

threading

The Concept of Threading

Processes and threads are virtual units

Processes: resource units

Thread: Unit of Execution

To start a process, there must be a thread, and the thread is the real executioner

Open the process:

  • Open a namespace that takes up a share of memory resources for each process started
  • It will come with its own thread

Open thread:

  • A process can have multiple threads open
  • Threads have much less overhead than processes

Note: Threads cannot be parallelized, threads can only be concurrent, processes can be parallelized

Two ways to create a thread

from threading import Thread
import time
# Thread creation method 1
def task():
  print('Thread open')
  (1)
  print('End of thread')

if __name__ == '__main__':
  t = Thread(target=task)
  ()
# Thread creation method 2
class MyThread(Thread):
  def run(self):
    print('Thread open...')
    (1)
    print('End of thread...')
if __name__ == '__main__':
  t = MyThread()
  ()

Methods of the thread object

from threading import Thread
from threading import current_thread
import time

def task():
  print(f'Thread open.{current_thread().name}')
  (1)
  print(f'End of thread{current_thread().name}')
if __name__ == '__main__':
  t = Thread(target=task)
  print(())
  #  = True
  ()
  print(())

thread mutex lock

Data is shared between threads

from threading import Thread
from threading import Lock
import time

mutex = Lock()
n = 100

def task(i):
  print(f'threading{i}activate (a plan)')
  global n
  ()
  temp = n
  (0.1)
  n = temp - 1
  print(n)
  ()
  
if __name__ == '__main__':
  t_l = []
  for i in range(100):
    t = Thread(target=task, args=(i, ))
    t_l.append(t)
    ()

  for t in t_l:
    ()

  print(n)

This is the whole content of this article.