streamsx.elasticsearch package

Elasticsearch integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Overview

Provides classes and functions to store tuple data as JSON documents in Elasticsearch indices, for use with the Streaming Analytics service on IBM Cloud and with IBM Streams, including IBM Cloud Pak for Data.

Credentials

Elasticsearch credentials are defined using a Streams application configuration or the Compose for Elasticsearch connection string.

Setup connection string

You can connect to your Elasticsearch cloud service with the connection string that is provided in the Overview tab of your service dashboard.

The connection string for the Elasticsearch cloud service can be applied with the credentials parameter:

connection = 'https://<USER>:<PASSWORD>@<HOST>:<PORT>/'
s.for_each(es.Insert(credentials=connection, index_name='test-index-cloud'))

Setup application configuration

By default an application configuration named es is used for the credentials parameter. A different configuration name can be specified using the credentials parameter.

The default configuration is named “es”. It can be created as follows:

  • Using the “Application Configuration” tab of the “Streams Console”

  • Using the page selected by the sub tab “Application Configuration”

  • Create a “New application configuration…” using the “Name” “es”, no description is necessary

  • Set the following properties for your Elasticsearch database connection: “nodeList” (value <HOST>:<PORT>), “userName”, “password”, “sslEnabled” (value true|false), “sslTrustAllCertificates” (value true|false)
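
The connection string and the application configuration carry the same details. As a minimal sketch using only the standard library, the hypothetical helper connection_to_properties below splits a connection string into the property names listed above. The helper is not part of streamsx.elasticsearch, and deriving “sslEnabled” from the URL scheme is an assumption made for this illustration.

```python
from urllib.parse import urlsplit, unquote


def connection_to_properties(connection):
    """Split 'https://<USER>:<PASSWORD>@<HOST>:<PORT>/' into property values.

    Hypothetical helper for illustration; not part of streamsx.elasticsearch.
    """
    parts = urlsplit(connection)
    if parts.hostname is None or parts.port is None:
        raise ValueError('connection string must contain <HOST>:<PORT>')
    return {
        'nodeList': '{}:{}'.format(parts.hostname, parts.port),
        # Percent-encoded characters in the user info are decoded here.
        'userName': unquote(parts.username or ''),
        'password': unquote(parts.password or ''),
        # Assumption: an https scheme corresponds to sslEnabled=true.
        'sslEnabled': 'true' if parts.scheme == 'https' else 'false',
    }


props = connection_to_properties('https://admin:secret@es.example.com:9243/')
```

The resulting dictionary can serve as a checklist when the properties are entered in the Streams Console.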

class streamsx.elasticsearch.Insert(credentials, index_name, bulk_size=1, **options)

Bases: streamsx.topology.composite.ForEach

Stores JSON documents in a specified index of an Elasticsearch database.

Example writing string messages to an index, with credentials stored in an application configuration named ‘elasticsearch’:

import streamsx.elasticsearch as es
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(es.Insert(credentials='elasticsearch', index_name='sample-index-cloud'))

Example specifying a connection string as credentials and additional parameters as kwargs:

connection_string = 'https://ibm_cloud_USER:PASSWORD@XXXX.databases.appdomain.cloud:30735'
config = {
    'ssl_trust_all_certificates': True
}
s.for_each(es.Insert(credentials=connection_string, index_name='sample-index-cloud', **config))

Example with dynamic index name (part of the stream):

from streamsx.topology.schema import StreamSchema

s = topo.source([('idx1','{"msg":"This is message number 1"}'), ('idx2','{"msg":"This is message number 2"}')])
schema = StreamSchema('tuple<rstring indexName, rstring document>')
s = s.map(lambda x : x, schema=schema)
config = {
    'ssl_trust_all_certificates': True,
    'index_name_attribute': 'indexName',
    'message_attribute': 'document'
}
s.for_each(es.Insert(credentials=connection_string, index_name=None, **config))
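
In practice the (index name, document) pairs are rarely written by hand. As a small sketch using only the standard library, the hypothetical helper to_index_tuple serializes a Python dict with json.dumps and pairs it with an index name, in the attribute order of the schema above (indexName, document). The '<prefix>-<day>' naming scheme and the sample events are invented for illustration.

```python
import json


def to_index_tuple(event, prefix='events'):
    """Return (indexName, document) for one event dict.

    Hypothetical helper; the '<prefix>-<day>' naming is an example only.
    """
    index_name = '{}-{}'.format(prefix, event['day'])
    # Compact separators and sorted keys keep the JSON string deterministic.
    document = json.dumps(event, separators=(',', ':'), sort_keys=True)
    return (index_name, document)


events = [
    {'day': '2020-01-01', 'msg': 'This is message number 1'},
    {'day': '2020-01-02', 'msg': 'This is message number 2'},
]
tuples = [to_index_tuple(e) for e in events]
```

A list such as tuples could then be given to topo.source in place of the literal list in the example above.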

New in version 1.3.

credentials

Name of the application configuration containing the credentials as properties or the connection string for your Elasticsearch database. When set to None, the application configuration name es is used.

Type

str

index_name

Name of the Elasticsearch index into which the documents are inserted. If the index does not exist in the Elasticsearch server, it will be created by the server. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match the use case, for example an unsuitable mapping or number of shards or replicas.

Type

str

bulk_size

Size of the bulk to submit to Elasticsearch. The default value is 1.

Type

int

options

The additional optional parameters as variable keyword arguments.

Type

kwargs
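
To make the effect of bulk_size concrete, here is a conceptual sketch in plain Python. The class BulkBuffer is hypothetical and is not how the toolkit is implemented; it only illustrates the idea that documents are collected and handed over together once the configured size is reached.

```python
class BulkBuffer:
    """Collects documents and releases them in groups of ``bulk_size``."""

    def __init__(self, bulk_size=1):
        if bulk_size < 1:
            raise ValueError('bulk_size must be at least 1')
        self.bulk_size = bulk_size
        self._pending = []

    def add(self, document):
        """Buffer one document; return the full bulk once the size is reached, else None."""
        self._pending.append(document)
        if len(self._pending) < self.bulk_size:
            return None
        bulk, self._pending = self._pending, []
        return bulk


buffer = BulkBuffer(bulk_size=3)
results = [buffer.add('doc{}'.format(i)) for i in range(1, 5)]
```

In this sketch a bulk size of 1 hands over every document immediately, while a larger value holds documents back until the group is complete.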

property connection_timeout

The timeout for waiting on establishment of the TCP connection to the server node. Specified in milliseconds. The default value is 20000 (20 seconds).

Type

int

property index_name_attribute

Name of the input stream attribute containing the name of the Elasticsearch index into which the documents are inserted.

Type

str

property message_attribute

Name of the input stream attribute containing the JSON document. Parameter is not required when input stream schema is CommonSchema.Json.

Type

str

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property read_timeout

The timeout for waiting for a REST response from the server node. Specified in milliseconds. The default value is 5000 (5 seconds).

Type

int
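
Both timeouts are given in milliseconds, which is easy to get wrong when durations are kept in seconds elsewhere. The following is a small sketch, assuming the timeouts can be passed as keyword options in the same way as the options shown in the Insert examples; the helper name timeout_options is hypothetical.

```python
from datetime import timedelta


def timeout_options(connect=timedelta(seconds=20), read=timedelta(seconds=5)):
    """Convert timedelta values to millisecond integers.

    The defaults mirror the documented defaults (20000 ms and 5000 ms).
    """
    one_ms = timedelta(milliseconds=1)
    return {
        # Floor division of two timedeltas yields an exact integer.
        'connection_timeout': connect // one_ms,
        'read_timeout': read // one_ms,
    }


config = timeout_options(read=timedelta(seconds=30))
```

The resulting dictionary has the same shape as the config dictionaries used in the examples for Insert.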

property reconnection_policy_count

Specifies the number of reconnection attempts to the Elasticsearch server upon disconnection.

Type

int

property ssl_debug

If set to ‘True’, SSL/TLS protocol debugging is enabled and all protocol data and information is logged to the console. The default is ‘False’.

Type

bool

property ssl_trust_all_certificates

If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.

Type

bool

property ssl_trust_store

Specifies the name of a file containing trusted certificates. This file is added to the application directory.

Type

str

property ssl_trust_store_password

Specifies the password used to access the truststore file.

Type

str

property ssl_verify_hostname

If set to False, the SSL/TLS layer will not verify the hostname in the server certificate against the actual name of the server host. WARNING: this is insecure and should only be used for debugging purposes. The default is True.

Type

bool

property vm_arg

Arbitrary JVM arguments that are passed to the Streams operator.

Type

str
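
Because the optional parameters are passed as free-form keyword arguments, a misspelled name can be easy to miss. As a hedged sketch, the hypothetical helper check_options compares a config dictionary against the property names documented above before it is passed on. Whether Insert itself rejects or ignores unknown names is not stated here, so this check is purely a client-side convenience.

```python
# Property names taken from the Insert documentation above.
KNOWN_OPTIONS = frozenset([
    'connection_timeout', 'index_name_attribute', 'message_attribute',
    'read_timeout', 'reconnection_policy_count', 'ssl_debug',
    'ssl_trust_all_certificates', 'ssl_trust_store',
    'ssl_trust_store_password', 'ssl_verify_hostname', 'vm_arg',
])


def check_options(config):
    """Return config unchanged, or raise ValueError naming unknown keys."""
    unknown = sorted(set(config) - KNOWN_OPTIONS)
    if unknown:
        raise ValueError('unknown option(s): ' + ', '.join(unknown))
    return config


config = check_options({'ssl_trust_all_certificates': True, 'read_timeout': 10000})
```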

streamsx.elasticsearch.download_toolkit(url=None, target_dir=None)

Downloads the latest Elasticsearch toolkit from GitHub.

Example for updating the Elasticsearch toolkit for your topology with the latest toolkit from GitHub:

import streamsx.elasticsearch as es
import streamsx.spl.toolkit
# download Elasticsearch toolkit from GitHub
elasticsearch_toolkit_location = es.download_toolkit()
# add the toolkit to the topology (topology is an existing Topology instance)
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)

Example for updating the topology with a specific version of the Elasticsearch toolkit using a URL:

import streamsx.elasticsearch as es
import streamsx.spl.toolkit
url221 = 'https://github.com/IBMStreams/streamsx.elasticsearch/releases/download/v2.1.1/streamsx.elasticsearch.toolkits-2.1.1-20181204-0909.tgz'
elasticsearch_toolkit_location = es.download_toolkit(url=url221)
# topology is an existing Topology instance
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)
Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – Directory into which the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.

Returns

the location of the downloaded Elasticsearch toolkit

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 1.2.
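
The handling of a relative target_dir described for download_toolkit can be sketched with the standard library. The function resolve_target_dir is hypothetical and only mirrors the documented rule for relative paths; how the library chooses a location when target_dir is None is not specified here, so that case is left out.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Mirror the documented rule: relative paths land under the temp directory."""
    if os.path.isabs(target_dir):
        # Assumption: absolute paths are used as given.
        return target_dir
    return os.path.join(tempfile.gettempdir(), target_dir)


relative = resolve_target_dir('toolkits/elasticsearch')
absolute = resolve_target_dir(os.path.abspath('toolkits'))
```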

streamsx.elasticsearch.bulk_insert(stream, index_name, bulk_size=1, message_attribute=None, credentials='es', ssl_trust_all_certificates=False, name=None)

Stores JSON documents in a specified index of an Elasticsearch database.

Ingests tuples and stores them in Elasticsearch as documents when the bulk size is reached. If the input schema is a streamsx.topology.schema.StreamSchema, each attribute in the input schema becomes a document attribute: the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value. When the input stream is CommonSchema.Json, the JSON documents are written without conversion.
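
The attribute-to-document mapping for structured streams can be pictured with a few lines of plain Python. This is only an illustration of the described result, not the toolkit's conversion code; the Reading type and its values are invented sample data.

```python
import json
from collections import namedtuple

# Stand-in for a structured stream tuple with two attributes.
Reading = namedtuple('Reading', ['sensor', 'temperature'])


def tuple_to_document(tup):
    """Build a JSON document whose keys are the tuple's attribute names."""
    return json.dumps(tup._asdict())


document = tuple_to_document(Reading(sensor='s1', temperature=21.5))
```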

Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Supports CommonSchema.Json in the input stream to store the JSON messages in Elasticsearch. Otherwise, each attribute in the input schema becomes a document attribute: the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value.

  • index_name (str) – Name of the Elasticsearch index into which the documents are inserted. If the index does not exist in the Elasticsearch server, it will be created by the server. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match the use case, for example an unsuitable mapping or number of shards or replicas.

  • bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.

  • message_attribute (str) – Name of the input stream attribute containing the JSON document. Parameter is not required when input stream schema is CommonSchema.Json.

  • credentials (str) – Name of the application configuration containing the credentials as properties or the connection string for your Elasticsearch database. When not set, the application configuration name es is used.

  • ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Stream termination.

Return type

streamsx.topology.topology.Sink

Deprecated since version 1.3.0: Use Insert instead.

streamsx.elasticsearch.bulk_insert_dynamic(stream, index_name_attribute, message_attribute, bulk_size=1, credentials='es', ssl_trust_all_certificates=False, name=None)

Stores JSON documents in a specified index of an Elasticsearch database. The index name is part of the input stream.

Ingests tuples and stores them in Elasticsearch as documents when bulk size is reached. The index name can change per tuple.

Example with dynamic index name passed with input stream attribute, where the input stream “sample_stream” is of type “sample_schema”:

import streamsx.elasticsearch as es
from streamsx.topology.schema import StreamSchema

sample_schema = StreamSchema('tuple<rstring indexName, rstring document>')
...
es.bulk_insert_dynamic(sample_stream, index_name_attribute='indexName', message_attribute='document')
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Requires streamsx.topology.schema.StreamSchema (schema for a structured stream) as input.

  • index_name_attribute (str) – Name of the input stream attribute containing the name of the Elasticsearch index into which the documents are inserted.

  • message_attribute (str) – Name of the input stream attribute containing the JSON document.

  • bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.

  • credentials (str) – Name of the application configuration containing the credentials as properties or the connection string for your Elasticsearch database. When not set, the application configuration name es is used.

  • ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Stream termination.

Return type

streamsx.topology.topology.Sink

Deprecated since version 1.3.0: Use Insert instead.
