
Tutorial on manipulating Elasticsearch data indexes using Python

Elasticsearch is a distributed, RESTful search and analytics server. Like Apache Solr, it is an indexing server built on Lucene, but in my view Elasticsearch has these advantages over Solr:

  • Lightweight: easy to install and start; after downloading, a single command runs it.
  • Schema free: you can submit JSON objects of any structure to the server, whereas Solr requires the index structure to be specified beforehand.
  • Multi-index support: passing a different index parameter creates another index file, whereas Solr needs separate configuration.
  • Distributed: SolrCloud is more complex to configure.

Environment Setup

Start Elasticsearch, which listens on port 9200; the returned JSON data can be viewed in a browser. Elasticsearch both accepts and returns data in JSON format.

>> bin/elasticsearch -f

Install the official Python API. After installing it on OS X, I hit some Python runtime errors caused by an outdated setuptools; removing and reinstalling setuptools fixed them.

>> pip install elasticsearch
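
A quick way to confirm that both the server and the client library work is to open a connection and request the cluster info, which returns the same JSON the browser shows on port 9200 (a minimal sketch assuming the default localhost:9200 node):

from elasticsearch import Elasticsearch

# connect to the default localhost:9200 node; pass a host string for a remote server
es = Elasticsearch()
# info() returns the same JSON document the browser shows at http://localhost:9200/
print(es.info())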

Indexing operations

To index a single document, either the create or the index method can be called; create fails if the id already exists, while index overwrites it.

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch()  # create a connection to localhost, or Elasticsearch("ip") for a remote node
es.index(index="test-index", doc_type="test-type", id=1,
  body={"any": "data", "timestamp": datetime.now()})
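
To verify that the document was stored, it can be read back by id with the get method (a minimal sketch against the same index and type as above):

from elasticsearch import Elasticsearch

es = Elasticsearch()
# get() returns the stored document wrapped in index metadata; _source holds the original body
res = es.get(index="test-index", doc_type="test-type", id=1)
print(res["_source"])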

Bulk indexing in Elasticsearch is done with the bulk command. The current Python API documentation has few examples of it, and I spent a lot of time reading the source code to figure out the submission format for bulk indexing.

from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch("10.18.13.3")
j = 0
count = int(df[0].count())  # df is a pandas DataFrame holding the crawled ticket data
actions = []
while (j < count):
  action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j + 1,
        "_source": {
              "crawaldate": df[0][j],
              "flight": df[1][j],
              "price": float(df[2][j]),
              "discount": float(df[3][j]),
              "date": df[4][j],
              "takeoff": df[5][j],
              "land": df[6][j],
              "source": df[7][j],
              "timestamp": datetime.now()}
        }
  actions.append(action)
  j += 1

  if (len(actions) == 500000):
    helpers.bulk(es, actions)
    del actions[0:len(actions)]

if (len(actions) > 0):
  helpers.bulk(es, actions)
  del actions[0:len(actions)]

Here I found that the Python API supports only a limited set of data types when serializing JSON: the numpy.int32 values in the original data had to be converted to plain Python int before they could be indexed. In addition, the bulk helpers currently submit 500 documents per request by default; when I raised this to 5000 or even 50000 for testing, some documents failed to index.
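
A minimal sketch of both points, assuming the values come from a NumPy/pandas column: cast NumPy scalars to native Python types before building the action, and pass chunk_size to helpers.bulk explicitly if you want a per-request batch size other than the default 500.

import numpy as np
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
price = np.int32(1299)   # a NumPy scalar as read from the DataFrame

# cast NumPy scalars to plain Python types so the JSON serializer accepts them
action = {
  "_index": "tickets-index",
  "_type": "tickets",
  "_id": 1,
  "_source": {"price": int(price)}
}

# chunk_size sets how many actions go into each bulk request (the library default is 500)
helpers.bulk(es, [action], chunk_size=5000)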

# source code
def streaming_bulk(client, actions, chunk_size=500, raise_on_error=False,
    expand_action_callback=expand_action, **kwargs):
  actions = map(expand_action_callback, actions)

  # if raise on error is set, we need to collect errors per chunk before raising them
  errors = []

  while True:
    chunk = islice(actions, chunk_size)
    bulk_actions = []
    for action, data in chunk:
      bulk_actions.append(action)
      if data is not None:
        bulk_actions.append(data)

    if not bulk_actions:
      return

def bulk(client, actions, stats_only=False, **kwargs):
  success, failed = 0, 0

  # list of errors to be collected if not stats_only
  errors = []

  for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-response pairs and detect failures
    if not ok:
      if not stats_only:
        errors.append(item)
      failed += 1
    else:
      success += 1

  return success, failed if stats_only else errors

For batch delete and update operations, the corresponding action documents take the following form; for updates, the doc node is mandatory.

{
  '_op_type': 'delete',
  '_index': 'index-name',
  '_type': 'document',
  '_id': 42,
}
{
  '_op_type': 'update',
  '_index': 'index-name',
  '_type': 'document',
  '_id': 42,
  'doc': {'question': 'The life, universe and everything.'}
}
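
These actions are submitted through helpers.bulk in the same way as indexing actions (a minimal sketch; the index name, type, and ids are placeholders):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
actions = [
  {"_op_type": "delete", "_index": "index-name", "_type": "document", "_id": 42},
  {"_op_type": "update", "_index": "index-name", "_type": "document", "_id": 43,
   "doc": {"question": "The life, universe and everything."}}
]
# bulk() returns the number of successful actions and a list of errors
success, errors = helpers.bulk(es, actions, raise_on_error=False)
print(success, errors)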

Common errors

  • SerializationError: JSON serialization failed, usually because a field value has an unsupported data type.
  • RequestError: the submitted data is not in the correct format.
  • ConflictError: the index id conflicts with an existing document.
  • TransportError: the connection could not be established.
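
All of these exceptions live in the client package and can be caught explicitly (a minimal sketch; the index, type, and body are placeholders):

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConflictError, RequestError, SerializationError, TransportError

es = Elasticsearch()
try:
  # create() refuses to overwrite an existing id, unlike index()
  es.create(index="test-index", doc_type="test-type", id=1, body={"any": "data"})
except ConflictError:
  print("a document with this id already exists")
except (SerializationError, RequestError, TransportError) as e:
  print("indexing failed:", e)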

Performance

[Figure: bulk write performance comparison, MongoDB vs. Elasticsearch]

The figure above compares storing the same data in MongoDB and in Elasticsearch. Although the servers and operations are not exactly the same, it shows that for bulk writes a database still has an advantage over an indexing server.

Elasticsearch's index files are chunked automatically, and reaching ten million documents had no effect on write speed. However, when the disk filled up, Elasticsearch hit a file-merge error and lost a large amount of data (more than one million documents in total). After client-side writes were stopped, the server could not recover on its own and had to be stopped manually. This is fatal in a production environment, especially with non-Java clients, where there seems to be no way to see the server-side Java exceptions on the client, so programmers have to be very careful with the server's return values.