Log monitoring system
Nginx (log file) -> log_process (real-time read, parse, write) -> InfluxDB (storage) -> Grafana (front-end visualization)
InfluxDB is an open-source time-series database written in Go, focused on high-performance storage and querying of time-series data. It is widely used for storing system-monitoring data and real-time data in the IoT industry.
Currently popular TSDBs (time-series databases) include InfluxDB, TimescaleDB, and QuestDB:
- InfluxDB: a NoSQL-like experience, naturally suited to the tag-set data model
- TimescaleDB: compatible with PostgreSQL, better suited to IoT data
- QuestDB: supports the InfluxDB line protocol and the PostgreSQL wire protocol, but its ecosystem is relatively immature
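For reference, InfluxDB ingests points through its line protocol (measurement, comma-separated tags, fields, optional timestamp); the sample point below is purely illustrative:

```
cpu_usage,host=server01,region=us-west value=0.64 1434055562000000000
```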
Brief project introduction
This log system is a demo, but it can be used directly in a production environment: log_process reads the Nginx log under ./ in real time and writes the parsed results into InfluxDB.
```
log_process -path ./ -influxDsn http://127.0.0.1:8086@imooc@imoocpass@imooc@s
```
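A minimal sketch of how such a command line might be parsed; the flag names and default values here are assumptions inferred from the invocation above, not the project's actual code:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// assumed flags, mirroring the invocation above
	var path, influxDsn string
	flag.StringVar(&path, "path", "./", "path of the log file to read")
	flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@imooc@imoocpass@imooc@s", "influxdb data source name")
	flag.Parse()

	fmt.Println("reading:", path, "writing to:", influxDsn)
}
```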
Common concurrency models
- Asynchronous, non-blocking models (Nginx, libevent, NodeJS) solve the C10K problem, but the cost is high complexity and a large number of callback functions
- Coroutines (Go, Erlang, Lua): write asynchronous code as if it were linear; under the hood they are lightweight threads
- Concurrent execution: go foo() starts the function in a new goroutine
- msg := <-c receives from a channel; this is how multiple goroutines communicate
- select reads from multiple channels; when several are ready, one of them is chosen at random for consumption (see the sketch after this list)
- Concurrency: a scheduler interleaves tasks so they appear to run at the same time; it is possible even on a single-core CPU (logical simultaneity) and is friendly to IO-intensive workloads
- Parallelism: tasks genuinely run at the same time, which requires multiple cores
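A minimal, self-contained sketch of these primitives; the channel names and messages are invented for illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan string)
	c2 := make(chan string)

	// go starts each function in its own goroutine
	go func() { c1 <- "from goroutine 1" }()
	go func() { c2 <- "from goroutine 2" }()

	// select blocks until one case is ready; if several are ready
	// at once, it picks one of them at random
	for i := 0; i < 2; i++ {
		select {
		case msg := <-c1:
			fmt.Println(msg)
		case msg := <-c2:
			fmt.Println(msg)
		case <-time.After(time.Second):
			fmt.Println("timeout")
		}
	}
}
```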
In Go this is handled with concurrent execution: with three different goroutines, one responsible for loading, one for transport, and one for processing, the program runs concurrently and each task stays simple. The same idea applies to the log system: split reading, parsing, and writing into separate small modules, give each module its own goroutine, and let them exchange data through channels. Whether those goroutines are scheduled onto one CPU or spread across several is left to the runtime.
Go ships with its own scheduler, and each go func() is an independent unit of work. The scheduler allocates one logical processor per available physical processor and runs those independent units on the logical processors.
The number of logical processors can also be set explicitly; for example, runtime.GOMAXPROCS(1) assigns a single logical processor to the scheduler.
The more physical processors a server has, the more logical processors the scheduler gets, and the faster the program can run.
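A small illustrative snippet (not from the project) for inspecting and setting the logical-processor count:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("physical cores available:", runtime.NumCPU())

	// give the scheduler one logical processor per available core
	runtime.GOMAXPROCS(runtime.NumCPU())

	// GOMAXPROCS(0) reports the current setting without changing it
	fmt.Println("logical processors:", runtime.GOMAXPROCS(0))
}
```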
System architecture
The skeleton below is a basic, procedural pseudo-version of the log processor. It has a shortcoming at each end: both where it reads from and where it writes to are hardcoded, so it needs to be opened up and extended with interface methods.
```go
package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into: reading, parsing, and writing
 */
type LogProcess struct {
	path        string      // path of the log file to read
	influxDBDsn string      // influx data source
	rc          chan string // read module -> process module
	wc          chan string // process module -> influxdb writer
}

// Methods use a pointer receiver: the struct is large and copying it
// would be wasteful (performance optimization)
func (l *LogProcess) ReadFromFile() {
	// file reading module
	line := "message"
	l.rc <- line
}

func (l *LogProcess) Process() {
	// log parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

func (l *LogProcess) writeToInfluxDB() {
	fmt.Println(<-l.wc)
}

func main() {
	// lp is a reference type
	lp := &LogProcess{
		path:        "./tmp/",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}

	// three goroutines, one per module; the channels defined above
	// carry data from Process on to the influxdb writer
	go lp.ReadFromFile()
	go lp.Process()
	go lp.writeToInfluxDB()

	time.Sleep(2 * time.Second)
}
```
Optimizing input and output with interface constraints
```go
package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan string // read module -> process module
	wc    chan string // process module -> influxdb writer
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// log parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	fmt.Println(<-wc)
}

type Read interface {
	read(rc chan string)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan string) {
	// read module
	line := "message"
	rc <- line
}

func main() {
	// inject the concrete reader and writer through the interfaces
	r := &ReadFromFile{
		path: "./tmp/",
	}
	w := &WriteToInfluxDB{influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(2 * time.Second)
}
```
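Because LogProcess now depends only on the Read and Writer interfaces, other implementations can be injected without touching Process. As a hypothetical illustration (not part of the original code), a stub writer dropped into the program above would let you exercise the pipeline without a running InfluxDB:

```go
// Hypothetical stub satisfying the Writer interface defined above;
// inject it via LogProcess{..., write: &StdoutWriter{}} for testing.
type StdoutWriter struct{}

func (s *StdoutWriter) writer(wc chan string) {
	for line := range wc {
		fmt.Println("stdout writer:", line)
	}
}
```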
Read module implementation
Reading resumes line by line from the cursor position of the previous read, so the whole file does not have to be re-read every time.
```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

/**
 * The log analysis system is divided into: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan []byte // read module -> process module
	wc    chan string // process module -> influxdb writer
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// log parsing module
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	// another way to drain the wc channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time system: read line by line, starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}

	// seek to the end of the file before reading (whence 2 == io.SeekEnd)
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file; the next log line
			// has not been written yet, so wait and retry
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}
		// strip the trailing '\n'
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\",
	}
	w := &WriteToInfluxDB{influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(100 * time.Second)
}
```
Log parsing module
- Read each line of data from the read channel
- Extract the required monitoring data with a regular expression
- Write the data to InfluxDB
```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)

/**
 * The log analysis system is divided into: reading, parsing, and writing
 */
type LogProcess struct {
	rc    chan []byte   // read module -> process module
	wc    chan *Message // process module -> influxdb writer
	read  Read
	write Writer
}

// Message holds the fields of one parsed log line
type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
	// parse each line with a regular expression; the pattern has
	// 13 capture groups matching this project's nginx log format
	r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)

	loc, _ := time.LoadLocation("Asia/Shanghai")

	// file parsing module
	for v := range l.rc {
		// ret[0] is the whole match, ret[1..13] the capture groups
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 14 {
			log.Println("FindStringSubmatch fail:", string(v))
			continue
		}

		message := &Message{}

		// ret[4] is the time_local field
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location

		// ret[8] is the bytes-sent field; convert string to int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[8])
		}
		message.BytesSent = atoi

		l.wc <- message
	}
}

type Writer interface {
	writer(wc chan *Message)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// another way to drain the wc channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time system: read line by line, starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}

	// seek to the end of the file before reading (whence 2 == io.SeekEnd)
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file; wait for the next log line
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\",
	}
	w := &WriteToInfluxDB{influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(100 * time.Second)
}
```
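For reference, a hypothetical nginx log line in the shape the regular expression above expects (remote addr, two placeholders, time_local, scheme, request, status, bytes sent, referer, user agent, x-forwarded-for, upstream time, request time); the values are made up:

```
172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" - 1.005
```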
That is all for this article on implementing a log monitoring system in Go. For more on Go log monitoring, please search my earlier articles or keep browsing the related articles below, and I hope you will continue to support me!