
A practical guide to handling parquet files in Golang.

Preamble

Parquet, a project supported by the Apache Foundation, is a binary, column-oriented storage file format. It supports several compression codecs and is widely used in data science and in big data environments such as the Hadoop ecosystem.

This article focuses on how Go generates and processes parquet files.

Creating Structures

First, create a struct to represent the data to be processed:

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time // won't be saved in the parquet file (it has no parquet tag)
}

Note the struct tags: they describe how each field is handled during parquet generation.

The parquet-go package is used to process the parquet data; the full list of supported tags can be found in its official repository.
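
As an illustration, tags can also mark a column as optional. Below is a minimal sketch (a hypothetical struct, not part of the program in this article) using the repetitiontype tag and the pointer mapping from the parquet-go conventions:

type account struct {
  Name string `parquet:"name=name, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  // a pointer field together with repetitiontype=OPTIONAL makes the
  // column nullable: a nil pointer is written as a parquet NULL
  Age *int32 `parquet:"name=age, type=INT32, repetitiontype=OPTIONAL"`
}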

Generating a parquet file

The full code for generating the parquet file is given below; each part is then explained separately:

package main

import (
  "fmt"
  "log"
  "time"
  "/bxcodec/faker/v3"
  "/xitongsys/parquet-go-source/local"
  "/xitongsys/parquet-go/parquet"
  "/xitongsys/parquet-go/reader"
  "/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time // won't be saved in the parquet file (it has no parquet tag)
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }
}

func generateParquet(data []*user) error {
  ("generating parquet file")
  fw, err := ("")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := (fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
   = parquet.CompressionCodec_GZIP
  defer ()
  for _, d := range data {
    if err = (d); err != nil {
      return err
    }
  }
  if err = (); err != nil {
    return err
  }
  return nil
}

Defining the struct was explained above; as a reminder, the Go types must be consistent with the parquet primitive types as documented:

Primitive Type        Go Type
BOOLEAN               bool
INT32                 int32
INT64                 int64
INT96 (deprecated)    string
FLOAT                 float32
DOUBLE                float64
BYTE_ARRAY            string
FIXED_LEN_BYTE_ARRAY  string
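
As a quick illustration of this mapping, here is a hypothetical struct (the field and column names are made up for this example) whose tags line up with the table above:

type sample struct {
  OK    bool    `parquet:"name=ok, type=BOOLEAN"`
  Small int32   `parquet:"name=small, type=INT32"`
  Big   int64   `parquet:"name=big, type=INT64"`
  Ratio float32 `parquet:"name=ratio, type=FLOAT"`
  Exact float64 `parquet:"name=exact, type=DOUBLE"`
  Raw   string  `parquet:"name=raw, type=BYTE_ARRAY"`
}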

The next step is to generate simulated data using the faker package, and then call err := generateParquet(data). The approximate logic of the method is:

  • First prepare the output file, and then construct pw, for writing parquet data, on top of the local output file:
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  // parameters: writer, an instance of the struct, and np (the parallel number)
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }

  // compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()

Then set the compression type and make sure to close the file with the defer operation. Next, start writing the data:

  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil

The loop writes the records one at a time, and pw.WriteStop() is called at the end to stop writing. After the file has been written successfully, the following describes how to read the parquet file back.

Reading a parquet file

First, here is how to read the whole file at once, which is mainly suitable for smaller files:

func readParquet() ([]*user, error) {
  fr, err := ("")
  if err != nil {
    return nil, err
  }

  pr, err := (fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

  u := make([]*user, recordNumber)
  if err = (&u); err != nil {
    return nil, err
  }
  ()
  ()
  return u, nil
}

The approximate process is as follows: first open the local file, then construct pr on top of it for reading the parquet data:

  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

Then define the target container u and read all the data at once:

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
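
Before moving on, here is a minimal way to exercise readParquet (a sketch assuming the file generated earlier; the expected values follow from Score having been set to float64(i)):

  users, err := readParquet()
  if err != nil {
    log.Fatal(err)
  }
  fmt.Println(len(users))                // 10000
  fmt.Println(users[0].Score)            // 0
  fmt.Println(users[len(users)-1].Score) // 9999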

However, loading a large number of records into memory at once may be problematic. This is an official documentation tip:

If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don’t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.

The idea is not to read the whole file into memory at once, which may result in OOM. In practice, it should be read in pages, as illustrated by the code below:

func readPartialParquet(pageSize, page int) ([]*user, error) {
	fr, err := local.NewLocalFileReader("output.parquet")
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = fr.Close()
	}()

	pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
	if err != nil {
		return nil, err
	}
	defer pr.ReadStop()

	//num := pr.GetNumRows()

	if err = pr.SkipRows(int64(pageSize * page)); err != nil {
		return nil, err
	}
	u := make([]*user, pageSize)
	if err = pr.Read(&u); err != nil {
		return nil, err
	}

	return u, nil
}

This function is not very different from the previous one, except that it takes two parameters specifying the page size and the page number. The key code skips over the records of the preceding pages:

  if err = pr.SkipRows(int64(pageSize * page)); err != nil {
    return nil, err
  }

The total number of rows can be obtained with pr.GetNumRows(); combined with the page size, this gives the total number of pages, and a loop over the pages then implements the paged query.
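
Putting this together, a paging loop could look like the following sketch (an illustrative helper that is not part of the original program; it assumes the output.parquet file from earlier and rounds the page count up so that a partial last page is still visited):

func readAllPages(pageSize int) error {
  // open the file once just to obtain the total number of rows
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return err
  }
  pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
  if err != nil {
    return err
  }
  total := pr.GetNumRows()
  pr.ReadStop()
  fr.Close()

  // total rows -> total pages, rounding up
  pages := int((total + int64(pageSize) - 1) / int64(pageSize))
  for page := 0; page < pages; page++ {
    users, err := readPartialParquet(pageSize, page)
    if err != nil {
      return err
    }
    fmt.Printf("page %d: %d records\n", page, len(users))
  }
  return nil
}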

Calculating a column average

Since parquet is a column-oriented storage format, the following demonstrates how to calculate the average of the Score column.

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  defer fr.Close()

  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  defer pr.ReadStop()

  num := pr.GetNumRows()
  data, _, _, err := pr.ReadColumnByPath("parquet_go_root\u0001score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return result / float64(num), nil
}

First open the file and call pr.GetNumRows() to get the total number of rows. Then address the column by its path: parquet_go_root is the root of the schema, and parquet-go uses \u0001 as the separator between path elements, so the full path of the score column is parquet_go_root\u0001score.
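
Since Score was filled with float64(i) for i from 0 to 9999, the expected average is (0 + 9999) / 2 = 4999.5, which makes the function easy to verify:

  avg, err := calcScoreAVG()
  if err != nil {
    log.Fatal(err)
  }
  fmt.Println(avg) // 4999.5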

Summary

This concludes this article on processing parquet files in Golang. For more content on handling parquet files in Go, please search my earlier articles or continue browsing the related articles below, and I hope you will continue to support me in the future!