SoFunction
Updated on 2025-03-03

GoLang log monitoring system implementation

Log monitoring system

Nginx (log file) -> log_process (reads, parses, and writes in real time) -> InfluxDB (storage) -> Grafana (front-end display)

InfluxDB is an open-source time-series database written in Go, focused on high-performance storage and querying of time-series data. It is widely used to store system monitoring data and real-time data in the IoT industry.

Currently popular TSDBs (time-series databases) include InfluxDB, TimescaleDB, and QuestDB. InfluxDB offers a NoSQL-like experience and is a natural fit for a tag-set data model. TimescaleDB is built on PostgreSQL, so it is fully compatible with it, and is well suited to IoT data. QuestDB supports both the InfluxDB line protocol and PostgreSQL, but its ecosystem is still comparatively weak.

Project brief introduction

This log system is a demo, but it can be used directly in a production environment. log_process reads the Nginx log under ./ and stores the parsed data in InfluxDB.

log_process -path ./ -influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s
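
The command line above could be handled roughly as follows. This is only a sketch: the flag names mirror the command as shown, and the meaning of the five "@"-separated DSN fields (address, user, password, database, precision) is an assumption, since the article does not spell it out. The influxConf type and parseDSN function are hypothetical helpers.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"strings"
)

// influxConf holds the pieces of the "@"-separated DSN.
// The field meanings are assumed, not taken from the article.
type influxConf struct {
	Addr, User, Password, Database, Precision string
}

// parseDSN splits the DSN on "@" and expects exactly five fields.
func parseDSN(dsn string) (influxConf, error) {
	parts := strings.Split(dsn, "@")
	if len(parts) != 5 {
		return influxConf{}, errors.New("dsn must have 5 '@'-separated fields")
	}
	return influxConf{
		Addr:      parts[0],
		User:      parts[1],
		Password:  parts[2],
		Database:  parts[3],
		Precision: parts[4],
	}, nil
}

func main() {
	path := flag.String("path", "./", "log file to read")
	dsn := flag.String("influxdsn", "http://127.0.0.1:8086@imooc@imoocpass@immoc@s", "influx data source")
	flag.Parse()

	conf, err := parseDSN(*dsn)
	if err != nil {
		fmt.Println("bad dsn:", err)
		return
	}
	fmt.Println("path:", *path, "addr:", conf.Addr, "db:", conf.Database, "precision:", conf.Precision)
}
```

A plain split like this breaks if the password itself contains "@", so a real tool would need a more robust format.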

Common concurrency models

  • Asynchronous non-blocking models (Nginx, libevent, Node.js) solve the C10k problem. Drawback: high complexity and a large number of callback functions.
  • Coroutines (Go, Erlang, Lua): code is written like ordinary sequential functions; they can be understood as lighter-weight threads.
  • go foo() runs a function concurrently.
  • msg := <-c: multiple goroutines communicate through channels.
  • select reads from multiple channels; when several are ready, one of them is chosen at random.
  • Concurrency: a scheduler interleaves tasks so that they appear to run at the same time. This is possible even on a single-core CPU (logical simultaneity) and suits IO-intensive work.
  • Parallelism: tasks are truly running at the same time.
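
The three primitives from the list above (go, channels, select) can be sketched together as follows. The function names produce and drain are made up for this illustration.

```go
package main

import "fmt"

// produce starts a goroutine that sends n labelled messages on its own
// channel and then closes it.
func produce(label string, n int) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- fmt.Sprintf("%s-%d", label, i)
		}
	}()
	return c
}

// drain reads from both channels with select until both are closed.
func drain(a, b <-chan string) []string {
	var got []string
	for a != nil || b != nil {
		select {
		case msg, ok := <-a:
			if !ok {
				a = nil // a nil channel is never selected again
				continue
			}
			got = append(got, msg)
		case msg, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			got = append(got, msg)
		}
	}
	return got
}

func main() {
	msgs := drain(produce("a", 3), produce("b", 3))
	fmt.Println(len(msgs), "messages received; order varies between runs:", msgs)
}
```

Messages from one producer keep their relative order, but how the two producers interleave depends on scheduling.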

Go programs are structured for concurrent execution. Picture three different goroutines: one responsible for loading, one for transporting, and one for processing. The program runs concurrently and each task has a single responsibility. The same idea applies to log handling: split reading, parsing, and writing into small modules, run each module in its own goroutine, and let them exchange data through channels. Whether these goroutines are scheduled on one CPU or spread across several depends on the system.
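
As a rough sketch of that idea, the following wires three goroutines together with two channels. The function name runPipeline and the use of sync.WaitGroup to wait for completion are choices made for this example (the article's own listings wait with time.Sleep instead), and upper-casing stands in for real parsing.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runPipeline connects a reader, a processor and a writer stage,
// each running in its own goroutine, and returns what the writer collected.
func runPipeline(lines []string) []string {
	rc := make(chan string) // reader -> processor
	wc := make(chan string) // processor -> writer
	var out []string
	var wg sync.WaitGroup
	wg.Add(1)

	go func() { // reader stage
		defer close(rc)
		for _, l := range lines {
			rc <- l
		}
	}()
	go func() { // processing stage
		defer close(wc)
		for l := range rc {
			wc <- strings.ToUpper(l)
		}
	}()
	go func() { // writer stage
		defer wg.Done()
		for l := range wc {
			out = append(out, l)
		}
	}()

	wg.Wait() // the writer finishes once wc is closed and drained
	return out
}

func main() {
	fmt.Println(runPipeline([]string{"first line", "second line"}))
	// prints: [FIRST LINE SECOND LINE]
}
```

Closing each channel when its producer is done lets the downstream range loops end on their own, so the stages shut down in order.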

Go has its own scheduler, and every go fun() call creates an independent unit of work. By default the scheduler allocates one logical processor for each available physical processor and runs these units of work on the logical processors.
Calling runtime.GOMAXPROCS(1) limits the scheduler to a single logical processor; a larger argument assigns it several.
The more physical processors a server has, the more logical processors the scheduler gets, which generally lets the program run faster.

System architecture

Below is a basic procedural skeleton of the log pipeline. It has two shortcomings: where the input is read from and where the parsed output is written to are both hard-coded, so the design needs to be made extensible through interfaces.

package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into three modules: reading, parsing, and writing
 */
type LogProcess struct {
	path        string      // path of the file to read
	influxDBDsn string      // influx data source
	rc          chan string // read module -> process module
	wc          chan string // process module -> influx write module
}

// Methods use a pointer receiver so the struct is not copied on every call.
func (l *LogProcess) ReadFromFile() {
	// File reading module
	line := "message"
	l.rc <- line
}

func (l *LogProcess) Process() {
	// File parsing module (upper-casing is a placeholder for real parsing)
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

func (l *LogProcess) writeToInfluxDB() {
	// Write module: print the result for now
	fmt.Println(<-l.wc)
}

func main() {
	// lp is a pointer to the LogProcess value
	lp := &LogProcess{
		path:        "./tmp/",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}
	// Run the three modules as goroutines
	go lp.ReadFromFile()
	go lp.Process()
	// The wc channel passes the Process output on to the InfluxDB writer
	go lp.writeToInfluxDB()
	// Give the goroutines time to finish before main exits
	time.Sleep(2 * time.Second)
}

Constraining input and output with interfaces

package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into three modules: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan string // read module -> process module
	wc    chan string // process module -> influx write module
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// File parsing module (upper-casing is a placeholder for real parsing)
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	// Write module: print the result for now
	fmt.Println(<-wc)
}

type Read interface {
	read(rc chan string)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan string) {
	// Read module
	line := "message"
	rc <- line
}

func main() {
	r := &ReadFromFile{
		path: "./tmp/",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	// The reader and writer are injected through the interface-typed fields
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// The interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(2 * time.Second)
}
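
One practical benefit of the interface-based design is that other implementations can be swapped in without touching LogProcess. The sketch below injects an in-memory reader and a collecting writer, which is handy for tests. The types sliceReader and collectWriter and the function run are hypothetical, and Process loops over the channel here so that more than one line can flow through.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type Read interface{ read(rc chan string) }
type Writer interface{ writer(wc chan string) }

// sliceReader is a hypothetical in-memory Read implementation.
type sliceReader struct{ lines []string }

func (s *sliceReader) read(rc chan string) {
	defer close(rc)
	for _, l := range s.lines {
		rc <- l
	}
}

// collectWriter is a hypothetical Writer that stores what it receives.
type collectWriter struct {
	got  []string
	done sync.WaitGroup
}

func (c *collectWriter) writer(wc chan string) {
	defer c.done.Done()
	for l := range wc {
		c.got = append(c.got, l)
	}
}

type LogProcess struct {
	rc, wc chan string
	read   Read
	write  Writer
}

// Process upper-cases every line as a stand-in for real parsing.
func (l *LogProcess) Process() {
	defer close(l.wc)
	for data := range l.rc {
		l.wc <- strings.ToUpper(data)
	}
}

// run pushes lines through the pipeline and returns what the writer saw.
func run(lines []string) []string {
	w := &collectWriter{}
	w.done.Add(1)
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  &sliceReader{lines: lines},
		write: w,
	}
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	w.done.Wait()
	return w.got
}

func main() {
	fmt.Println(run([]string{"get /a", "post /b"}))
	// prints: [GET /A POST /B]
}
```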

Read module implementation

Read line by line starting from the position where the previous read stopped, so the whole file does not have to be re-read every time.

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into three modules: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan []byte // read module -> process module
	wc    chan string // process module -> influx write module
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// File parsing module (upper-casing is a placeholder for real parsing)
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	// Another way to read the wc channel: range until it is closed
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// Real-time system: read line by line, starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}
	// Move the read position to the end of the file before reading
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// Reached the end of the file: no new log line yet, wait and retry
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}
		// Drop the trailing newline
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	// The reader and writer are injected through the interface-typed fields
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// The interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(100 * time.Second)
}
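
One detail worth noting about the read loop: when ReadBytes hits the end of the file in the middle of a line, it returns the bytes read so far together with io.EOF, and the loop above discards them. The sketch below keeps such partial data and joins it with what arrives later. The helper names nextLine and tailDemo and the temporary file are assumptions made for this example.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"os"
	"time"
)

// nextLine blocks until one complete line is available and returns it
// without the line ending. Partial data returned alongside io.EOF is kept
// and prepended to whatever is appended to the file later.
func nextLine(rd *bufio.Reader, poll time.Duration) ([]byte, error) {
	var pending []byte
	for {
		chunk, err := rd.ReadBytes('\n')
		pending = append(pending, chunk...)
		if err == io.EOF {
			time.Sleep(poll) // nothing more yet: wait and retry
			continue
		}
		if err != nil {
			return nil, err
		}
		return bytes.TrimRight(pending, "\r\n"), nil
	}
}

// tailDemo appends a line to a temp file in two pieces and returns the
// line as seen by a reader positioned at the end of the file.
func tailDemo() (string, error) {
	f, err := os.CreateTemp("", "access-*.log")
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name())
	defer f.Close()
	if _, err := f.WriteString("old line\n"); err != nil {
		return "", err
	}

	r, err := os.Open(f.Name())
	if err != nil {
		return "", err
	}
	defer r.Close()
	// Skip everything already in the file.
	if _, err := r.Seek(0, io.SeekEnd); err != nil {
		return "", err
	}

	go func() {
		f.WriteString("new ")
		time.Sleep(20 * time.Millisecond)
		f.WriteString("line\n")
	}()
	line, err := nextLine(bufio.NewReader(r), 5*time.Millisecond)
	return string(line), err
}

func main() {
	line, err := tailDemo()
	fmt.Printf("%q %v\n", line, err)
	// prints: "new line" <nil>
}
```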

Log parsing module

  • Read each line of data from the read channel
  • Extract the required monitoring data with a regular expression
  • Write the data to InfluxDB

package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)

/**
 * The log analysis system is divided into three modules: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan []byte   // read module -> process module
	wc    chan *Message // process module -> influx write module
	read  Read
	write Writer
}

// Message is the struct that a parsed log line is written into
type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
	// Parse the data with a regular expression. The full pattern needs 12
	// capture groups so that FindStringSubmatch returns 13 elements.
	r := regexp.MustCompile(`(\s*)`)
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		panic(fmt.Sprintf("LoadLocation error:%s\n", err.Error()))
	}
	// File parsing module
	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			log.Println("FindStringSubmatch fail:", string(v))
			continue
		}
		message := &Message{}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
			continue
		}
		message.TimeLocal = location
		// Convert the string to an int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[8])
			continue
		}
		message.BytesSent = atoi
		l.wc <- message
	}
}

type Writer interface {
	writer(wc chan *Message)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// Another way to read the wc channel: range until it is closed
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// Real-time system: read line by line, starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}
	// Move the read position to the end of the file before reading
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// Reached the end of the file: no new log line yet, wait and retry
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		// Drop the trailing newline
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	// The reader and writer are injected through the interface-typed fields
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}
	// The interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(100 * time.Second)
}
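
To illustrate the regular-expression step on its own, here is a minimal sketch that parses one access-log line into a reduced Message. The log layout, the pattern, and the sample line are assumptions made for this example; they are simpler than the 12-group format the listing above expects, so the submatch indexes differ.

```go
package main

import (
	"fmt"
	"net/url"
	"regexp"
	"strconv"
	"time"
)

// Message is a reduced version of the article's struct.
type Message struct {
	TimeLocal    time.Time
	BytesSent    int
	Path, Method string
	Status       string
}

// lineRe matches a hypothetical, simplified access-log layout:
// ip - - [time] "METHOD uri PROTO" status bytes
var lineRe = regexp.MustCompile(`^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) [^"]*" (\d{3}) (\d+)`)

// parseLine extracts the monitored fields from one log line.
func parseLine(line string) (*Message, error) {
	ret := lineRe.FindStringSubmatch(line)
	if ret == nil {
		return nil, fmt.Errorf("no match: %q", line)
	}
	t, err := time.Parse("02/Jan/2006:15:04:05 -0700", ret[2])
	if err != nil {
		return nil, err
	}
	// Keep only the path part of the request URI, without the query string.
	u, err := url.Parse(ret[4])
	if err != nil {
		return nil, err
	}
	n, err := strconv.Atoi(ret[6])
	if err != nil {
		return nil, err
	}
	return &Message{TimeLocal: t, BytesSent: n, Path: u.Path, Method: ret[3], Status: ret[5]}, nil
}

func main() {
	line := `127.0.0.1 - - [04/Mar/2018:13:49:52 +0000] "GET /foo?query=t HTTP/1.0" 200 2133`
	m, err := parseLine(line)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(m.Method, m.Path, m.Status, m.BytesSent)
	// prints: GET /foo 200 2133
}
```

Returning an error instead of logging inside the parser keeps the function easy to test; the caller can decide whether to log and skip the line.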
