streamsx.elasticsearch package

Elasticsearch integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Overview

Provides classes and functions to store tuple data as JSON documents in Elasticsearch indices for use with Streaming Analytics service on IBM Cloud and IBM Streams including IBM Cloud Pak for Data.

Credentials

Elasticsearch credentials are defined using a Streams application configuration or the Compose for Elasticsearch connection string.

Setup connection string

You can connect to your Elasticsearch cloud service with the connection string that is provided in the Overview tab of your service dashboard.

The connection string for the Elasticsearch cloud service can be applied with the credentials parameter:

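import streamsx.elasticsearch as es
# connection string from the Overview tab of your service dashboard
connection = 'https://<USER>:<PASSWORD>@<HOST>:<PORT>/'
s.for_each(es.Insert(credentials=connection, index_name='test-index-cloud'))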
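The connection string and the application configuration carry the same information. As a rough sketch, assuming a connection string of the form shown above and the property names listed for the application configuration below, the hypothetical helper `connection_string_to_properties` (not part of streamsx.elasticsearch) splits a connection string into those properties using only the Python standard library. It also rejects malformed strings, such as one with a duplicated scheme prefix.

```python
from urllib.parse import urlsplit, unquote


def connection_string_to_properties(connection):
    """Hypothetical helper: map an Elasticsearch connection string to the
    application configuration properties nodeList, userName, password,
    sslEnabled and sslTrustAllCertificates."""
    parts = urlsplit(connection)
    if parts.scheme not in ('http', 'https'):
        raise ValueError('expected an http or https connection string')
    if not (parts.hostname and parts.port and parts.username and parts.password):
        raise ValueError('connection string needs user, password, host and port')
    return {
        # <HOST>:<PORT> as expected by the nodeList property
        'nodeList': f'{parts.hostname}:{parts.port}',
        # user info may be percent-encoded in a URL, so decode it
        'userName': unquote(parts.username),
        'password': unquote(parts.password),
        # application configuration values are plain strings
        'sslEnabled': 'true' if parts.scheme == 'https' else 'false',
        # keep certificate verification on unless explicitly changed
        'sslTrustAllCertificates': 'false',
    }


props = connection_string_to_properties('https://admin:s3cret@es.example.com:30735/')
print(props['nodeList'])  # es.example.com:30735
```

The resulting dictionary lists the values you would enter as properties of the “es” application configuration; the host, user, and password shown are placeholders.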

Setup application configuration

By default, the credentials are read from an application configuration named es. To use a different application configuration, pass its name as the credentials parameter.

The default configuration is “es”. To create it:

  • Open the “Application Configuration” tab of the “Streams Console”.
  • On that page, select the sub tab “Application Configuration”.
  • Create a “New application configuration…” with the “Name” “es”; no description is necessary.
  • Set the following properties for your Elasticsearch database connection: “nodeList” (value <HOST>:<PORT>), “userName”, “password”, “sslEnabled” (value true|false), “sslTrustAllCertificates” (value true|false).
class streamsx.elasticsearch.Insert(credentials, index_name, bulk_size=1, **options)

Bases: streamsx.topology.composite.ForEach

Stores JSON documents in a specified index of an Elasticsearch database.

Example writing string messages to an index with credentials stored in an application configuration named ‘elasticsearch’:

import streamsx.elasticsearch as es
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(es.Insert(credentials='elasticsearch', index_name='sample-index-cloud'))

Example specifying a connection string as credentials and additional parameters as kwargs:

connection_string = 'https://ibm_cloud_USER:PASSWORD@XXXX.databases.appdomain.cloud:30735'
config = {
    'ssl_trust_all_certificates': True
}
s.for_each(es.Insert(credentials=connection_string, index_name='sample-index-cloud', **config))

Example with dynamic index name (part of the stream):

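from streamsx.topology.schema import StreamSchema

s = topo.source([('idx1','{"msg":"This is message number 1"}'), ('idx2','{"msg":"This is message number 2"}')])
schema = StreamSchema('tuple<rstring indexName, rstring document>')
# convert the Python tuples into structured tuples with the schema above
s = s.map(lambda x: x, schema=schema)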
config = {
    'ssl_trust_all_certificates': True,
    'index_name_attribute': 'indexName',
    'message_attribute': 'document'
}
s.for_each(es.Insert(credentials=connection_string, index_name=None, **config))
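To feed index_name_attribute and message_attribute, each tuple needs an index name and a JSON string. A minimal sketch, assuming time-based index names such as `logs-2024.05.17` suit your use case (an illustration, not a toolkit requirement): the hypothetical `to_index_tuple` function builds the `(indexName, document)` pair that matches the `tuple<rstring indexName, rstring document>` schema above.

```python
import json
from datetime import datetime, timezone


def to_index_tuple(record, prefix='logs'):
    """Hypothetical helper: return (indexName, document) for one record.

    The index name is derived from the record's UTC timestamp ('ts', in
    seconds since the epoch); the document is the record as a JSON string.
    """
    ts = datetime.fromtimestamp(record['ts'], tz=timezone.utc)
    index_name = f"{prefix}-{ts:%Y.%m.%d}"
    document = json.dumps(record, sort_keys=True)
    return (index_name, document)


print(to_index_tuple({'ts': 0, 'msg': 'This is message number 1'}))
# ('logs-1970.01.01', '{"msg": "This is message number 1", "ts": 0}')
```

Such a function should be usable in place of the identity lambda, for example `s.map(to_index_tuple, schema=schema)` on a stream of Python dictionaries. Keep the prefix lowercase, because Elasticsearch index names must be lowercase.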

New in version 1.3.

credentials

Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When set to None, the application configuration named es is used.

Type:str
index_name

Name of the Elasticsearch index that the documents are inserted into. If the index does not exist on the Elasticsearch server, the server creates it. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match your use case, for example an unsuitable mapping or number of shards or replicas.

Type:str
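Indices can be created up front with the Elasticsearch create index API (`PUT /<index>` with a JSON body of `settings` and `mappings`). The sketch below only builds such a request with the Python standard library and does not send it. It assumes Elasticsearch 7 or later, where mappings are typeless; the host, the shard and replica counts, and the field mapping are placeholders, and authentication is left out.

```python
import json
import urllib.request


def build_create_index_request(base_url, index_name, shards=1, replicas=1, properties=None):
    """Build (but do not send) a PUT request that creates an index
    with explicit settings and an optional field mapping."""
    body = {
        'settings': {
            'number_of_shards': shards,
            'number_of_replicas': replicas,
        },
        'mappings': {'properties': properties or {}},
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index_name}",
        data=json.dumps(body).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='PUT',
    )


req = build_create_index_request(
    'https://es.example.com:30735', 'sample-index-cloud',
    properties={'msg': {'type': 'text'}},
)
# urllib.request.urlopen(req) would send it; credentials are omitted here
print(req.get_method(), req.full_url)
# PUT https://es.example.com:30735/sample-index-cloud
```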
bulk_size

Size of the bulk to submit to Elasticsearch. The default value is 1.

Type:int
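bulk_size controls how many tuples are collected before they are submitted together. The toolkit's internals are not described on this page; purely as an illustration, the sketch below groups documents into chunks of bulk_size and renders each chunk in the newline-delimited format of the Elasticsearch `_bulk` API (an `index` action line followed by the document line). The function name `bulk_bodies` is hypothetical.

```python
import json


def bulk_bodies(index_name, documents, bulk_size=1):
    """Yield one _bulk request body (NDJSON) per chunk of bulk_size documents."""
    if bulk_size < 1:
        raise ValueError('bulk_size must be at least 1')
    for start in range(0, len(documents), bulk_size):
        lines = []
        for doc in documents[start:start + bulk_size]:
            # action/metadata line, then the document source line
            lines.append(json.dumps({'index': {'_index': index_name}}))
            lines.append(json.dumps(doc))
        # the _bulk API requires the body to end with a newline
        yield '\n'.join(lines) + '\n'


docs = [{'msg': 'Hello'}, {'msg': 'World!'}, {'msg': 'again'}]
for body in bulk_bodies('sample-index-cloud', docs, bulk_size=2):
    print(body)
```

With three documents and bulk_size=2 this yields two bodies, the second one partial. Going by the description that documents are stored when the bulk size is reached, a large bulk_size on a low-volume stream may delay writes; the default of 1 sends each document individually.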
options

The additional optional parameters as variable keyword arguments.

Type:kwargs
connection_timeout

The timeout for waiting on establishment of the TCP connection to the server node. Specified in milliseconds. The default value is 20000 (20 seconds).

Type:int
index_name_attribute

Name of the input stream attribute containing the name of the Elasticsearch index that the documents are inserted into.

Type:str
message_attribute

Name of the input stream attribute containing the JSON document. This parameter is not required when the input stream schema is CommonSchema.Json.

Type:str
populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • name – Name passed into for_each.
  • **options – Future options passed to for_each.
Returns:

Termination for this composite transformation of stream.

Return type:

Sink

read_timeout

The timeout for waiting for a REST response from the server node. Specified in milliseconds. The default value is 5000 (5 seconds).

Type:int
reconnection_policy_count

Specifies the number of reconnection attempts to the Elasticsearch server upon disconnection.

Type:int
ssl_debug

If set to ‘True’, SSL/TLS protocol debugging is enabled, all protocol data and information is logged to the console. The default is ‘False’.

Type:bool
ssl_trust_all_certificates

If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.

Type:bool
ssl_trust_store

Specifies the name of a file containing trusted certificates. This file is added to the application directory.

Type:str
ssl_trust_store_password

Specifies the password used to access the truststore file.

Type:str
ssl_verify_hostname

If set to False, the SSL/TLS layer will not verify the hostname in the server certificate against the actual name of the server host. WARNING: this is insecure and should only be used for debugging purposes. The default is True.

Type:bool
vm_arg

Arbitrary JVM arguments can be passed to the Streams operator.

Type:str
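The optional parameters above are passed to Insert as keyword arguments. A small sketch, assuming you want to catch misspelled option names before building the topology: `INSERT_OPTIONS` simply restates the option names documented on this page, and `check_insert_options` is a hypothetical helper, not part of the package. The timeout values are illustrative.

```python
# Option names documented for Insert (restated here by hand)
INSERT_OPTIONS = {
    'connection_timeout', 'index_name_attribute', 'message_attribute',
    'read_timeout', 'reconnection_policy_count', 'ssl_debug',
    'ssl_trust_all_certificates', 'ssl_trust_store',
    'ssl_trust_store_password', 'ssl_verify_hostname', 'vm_arg',
}


def check_insert_options(options):
    """Raise ValueError listing any keys that are not documented options."""
    unknown = sorted(set(options) - INSERT_OPTIONS)
    if unknown:
        raise ValueError(f'unknown Insert options: {unknown}')
    return options


config = check_insert_options({
    'connection_timeout': 30000,  # milliseconds
    'read_timeout': 10000,        # milliseconds
    'reconnection_policy_count': 5,
    'ssl_trust_all_certificates': False,
})
# s.for_each(es.Insert(credentials='es', index_name='sample-index-cloud', **config))
```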
streamsx.elasticsearch.download_toolkit(url=None, target_dir=None)

Downloads the latest Elasticsearch toolkit from GitHub.

Example for updating the Elasticsearch toolkit for your topology with the latest toolkit from GitHub:

import streamsx.elasticsearch as es
import streamsx.spl.toolkit
from streamsx.topology.topology import Topology

topology = Topology()
# download Elasticsearch toolkit from GitHub
elasticsearch_toolkit_location = es.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)

Example for updating the topology with a specific version of the Elasticsearch toolkit using a URL:

import streamsx.elasticsearch as es
import streamsx.spl.toolkit
from streamsx.topology.topology import Topology

topology = Topology()
url211 = 'https://github.com/IBMStreams/streamsx.elasticsearch/releases/download/v2.1.1/streamsx.elasticsearch.toolkits-2.1.1-20181204-0909.tgz'
elasticsearch_toolkit_location = es.download_toolkit(url=url211)
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)
Parameters:
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
  • target_dir (str) – The directory the toolkit is unpacked into. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
Returns:

the location of the downloaded Elasticsearch toolkit

Return type:

str
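The target_dir rule above can be pictured with the Python standard library. This sketch only mirrors the documented behaviour and is not the library's code: relative paths go under the system temporary directory, absolute paths are assumed to be used as given, and the location chosen when target_dir is None is left to the library, so the hypothetical `resolve_target_dir` returns None in that case.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Sketch of the documented target_dir rule for download_toolkit."""
    if target_dir is None:
        # the library picks its own location under the temp directory
        return None
    if os.path.isabs(target_dir):
        # assumption: absolute paths are used unchanged
        return target_dir
    # relative paths are appended to the system temporary directory
    return os.path.join(tempfile.gettempdir(), target_dir)
```

For example, on a Linux system with a default temporary directory, `es.download_toolkit(target_dir='es_toolkit')` would, per the description above, unpack the toolkit into /tmp/es_toolkit.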

Note

This function requires an outgoing Internet connection.

New in version 1.2.

streamsx.elasticsearch.bulk_insert(stream, index_name, bulk_size=1, message_attribute=None, credentials='es', ssl_trust_all_certificates=False, name=None)

Stores JSON documents in a specified index of an Elasticsearch database.

Ingests tuples and stores them in Elasticsearch as documents when the bulk size is reached. If the input is a streamsx.topology.schema.StreamSchema, each attribute in the input schema becomes a document attribute: the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value. When the input stream is CommonSchema.Json, JSON documents are written without conversion.
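The attribute-to-document mapping for structured streams can be illustrated in plain Python. In this sketch a `typing.NamedTuple` stands in for a tuple of a hypothetical schema `tuple<rstring id, int32 count>`, and `tuple_to_document` is a hypothetical helper; how the toolkit renders other SPL types, such as timestamps or lists, is not covered here.

```python
import json
from typing import NamedTuple


class Reading(NamedTuple):
    """Stand-in for a Streams tuple with attributes id and count."""
    id: str
    count: int


def tuple_to_document(tup):
    """Attribute names become JSON field names, attribute values the field values."""
    return json.dumps(tup._asdict())


print(tuple_to_document(Reading(id='sensor-1', count=3)))
# {"id": "sensor-1", "count": 3}
```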

Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Supports CommonSchema.Json in the input stream to store the JSON messages in Elasticsearch. Otherwise each attribute in the input schema becomes a document attribute: the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value.
  • index_name (str) – Name of the Elasticsearch index that the documents are inserted into. If the index does not exist on the Elasticsearch server, the server creates it. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match your use case, for example an unsuitable mapping or number of shards or replicas.
  • bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.
  • message_attribute (str) – Name of the input stream attribute containing the JSON document. Parameter is not required when input stream schema is CommonSchema.Json.
  • credentials (str) – Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When not set, the application configuration named es is used.
  • ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink

Deprecated since version 1.3.0: Use Insert instead.

streamsx.elasticsearch.bulk_insert_dynamic(stream, index_name_attribute, message_attribute, bulk_size=1, credentials='es', ssl_trust_all_certificates=False, name=None)

Stores JSON documents in a specified index of an Elasticsearch database. The index name is part of the input stream.

Ingests tuples and stores them in Elasticsearch as documents when bulk size is reached. The index name can change per tuple.

Example with dynamic index name passed with input stream attribute, where the input stream “sample_stream” is of type “sample_schema”:

import streamsx.elasticsearch as es
from streamsx.topology.schema import StreamSchema

sample_schema = StreamSchema('tuple<rstring indexName, rstring document>')
...
es.bulk_insert_dynamic(sample_stream, index_name_attribute='indexName', message_attribute='document')
Parameters:
  • stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Requires streamsx.topology.schema.StreamSchema (schema for a structured stream) as input.
  • index_name_attribute (str) – Name of the input stream attribute containing the name of the Elasticsearch index that the documents are inserted into.
  • message_attribute (str) – Name of the input stream attribute containing the JSON document.
  • bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.
  • credentials (str) – Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When not set, the application configuration named es is used.
  • ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Stream termination.

Return type:

streamsx.topology.topology.Sink

Deprecated since version 1.3.0: Use Insert instead.
