SoFunction
Updated on 2024-10-29

python implementation of opencv+scoket network real-time mapping

In this paper, we share the example of python realize opencv+scoket network real-time mapping specific code, for your reference, the specific content is as follows

Server Analysis:

1. first through the use of OpenCV on the server side to capture the video of each frame image

2. Compress these images into JPEG format, which reduces the size of the image and facilitates transmission.

3. Packaging and encoding for transmission at pre-negotiated resolution and frame rate

4. Use the server side to open port 8880, at this time the client connection, you can capture the server-side video in the client.

#server
import socket
import threading
import struct
import time
import cv2
import numpy

class Carame_Accept_Object:
  def __init__(self,S_addr_port=("",8880)):
    =(640,480)    #Resolution
    self.img_fps=15         # How many frames per second are transmitted
    self.addr_port=S_addr_port
    self.Set_Socket(self.addr_port)

  #Setup sockets
  def Set_Socket(self,S_addr_port):
    =(socket.AF_INET,socket.SOCK_STREAM)
    (socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #Port reusability
    (S_addr_port)
    (5)
    #print("the process work in the port:%d" % S_addr_port[1])


def check_option(object,client):
  # Decode by format, determine frame rate and resolution
  info=('lhh',(8))
  if info[0]>888:
    object.img_fps=int(info[0])-888     # Getting frames
    =list()
    # Get resolution
    [0]=info[1]
    [1]=info[2]
     = tuple()
    return 1
  else:
    return 0

def RT_Image(object,client,D_addr):
  if(check_option(object,client)==0):
    return
  camera=(0)                # Getting video from the camera
  img_param=[int(cv2.IMWRITE_JPEG_QUALITY),object.img_fps] #Set the transmission image format, frame rate
  while(1):
    (0.1)       #Postpone threads for 0.1s
    _,=() # Read every frame of the video

    =(,)   # Resize the image as required (resolution must be a tuple)
    _,img_encode=('.jpg',,img_param) # Generate images by format
    img_code=(img_encode)            # Convert to matrix
    object.img_data=img_code.tostring()           # Generate the appropriate string
    try:
      #Package and send images in the appropriate format
      (("lhh",len(object.img_data),[0],[1])+object.img_data)
    except:
      ()    # Resources released
      return

if __name__ == '__main__':
  camera=Carame_Accept_Object()
  while(1):
    client,D_addr=()
    clientThread=(None,target=RT_Image,args=(camera,client,D_addr,))
    ()

Client-side analysis:

1. When the client connects to the port, it first sends the resolution and number of frames that need to be negotiated so that the transmission "protocol" is consistent

2. The client uses threads to collect the images

3. Decode each received picture and play it out using OpenCV, which can realize real-time video transmission from both ends of C/S.

# Client
import socket
import cv2
import threading
import struct
import numpy

class Camera_Connect_Object:
  def __init__(self,D_addr_port=["",8880]):
    =[640,480]
    self.addr_port=D_addr_port
    =888+15         # Both parties determine the number of frames to transmit, (888) is the checksum value
    =0         #Picture playback interval
    self.img_fps=15         # How many frames per second are transmitted

  def Set_socket(self):
    =(socket.AF_INET,socket.SOCK_STREAM)
    (socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

  def Socket_Connect(self):
    self.Set_socket()
    (self.addr_port)
    print("IP is %s:%d" % (self.addr_port[0],self.addr_port[1]))

  def RT_Image(self):
    # Send frames and resolution packaged by format
    =self.addr_port[0]+" Camera"
    (("lhh", , [0], [1]))
    while(1):
      info=("lhh",(8))
      buf_size=info[0]          # Get the total length of the image read
      if buf_size:
        try:
          =b""        # represents the type of bytes
          temp_buf=
          while(buf_size):      # Read the length of each image
            temp_buf=(buf_size)
            buf_size-=len(temp_buf)
            +=temp_buf   #Fetching Pictures
            data = (, dtype='uint8')  # Convert to image matrix by uint8
             = (data, 1)         #Image decoding
            (, )          #Show Pictures
        except:
          pass;
        finally:
          if((10)==27):    # Refresh image every 10ms, press 'ESC' (27) to exit
            ()
            ()
            break

  def Get_Data(self,interval):
    showThread=(target=self.RT_Image)
    ()

if __name__ == '__main__':
  camera=Camera_Connect_Object()
  camera.addr_port[0]="Server's ip."
  camera.addr_port=tuple(camera.addr_port)
  camera.Socket_Connect()
  camera.Get_Data(

This is the whole content of this article.