streamsx.elasticsearch package
Elasticsearch integration for IBM Streams
For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:
Overview
Provides classes and functions to store tuple data as JSON documents in Elasticsearch indices, for use with the Streaming Analytics service on IBM Cloud and IBM Streams, including IBM Cloud Pak for Data.
Credentials
Elasticsearch credentials are defined using a Streams application configuration or the Compose for Elasticsearch connection string.
Set up connection string
You can connect to your Elasticsearch cloud service with the connection string that is provided in the Overview tab of your service dashboard.
The connection string for the Elasticsearch cloud service can be passed with the credentials parameter:
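import streamsx.elasticsearch as es
# connection string copied from the Overview tab of the service dashboard
connection = 'https://<USER>:<PASSWORD>@<HOST>:<PORT>/'
s.for_each(es.Insert(credentials=connection, index_name='test-index-cloud'))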
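If the connection string is assembled from separate values instead of being copied from the dashboard, the user name and password should not contain raw URL delimiters. The sketch below is a hypothetical helper, build_connection_string, that percent-encodes both before filling in the https://<USER>:<PASSWORD>@<HOST>:<PORT>/ pattern shown above. Whether the Elasticsearch operator decodes percent-encoded credentials is an assumption not confirmed by this documentation, so verify it against your service; for plain alphanumeric credentials the helper leaves the values unchanged.

```python
from urllib.parse import quote


def build_connection_string(user: str, password: str, host: str, port: int) -> str:
    """Assemble https://<USER>:<PASSWORD>@<HOST>:<PORT>/ (hypothetical helper).

    User name and password are percent-encoded so that characters such as
    '@', ':' or '/' are not mistaken for URL delimiters.
    """
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"https://{user_enc}:{password_enc}@{host}:{port}/"


if __name__ == "__main__":
    # A password containing '@' and ':' is encoded as %40 and %3A.
    print(build_connection_string("admin", "p@ss:word", "es.example.com", 30735))
```

The returned string is what would be passed as credentials=... to Insert.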
Set up application configuration
By default, an application configuration named es is used for the credentials parameter. A different configuration name can be specified using the credentials parameter.
The default configuration is “es”. To create it:
- Open the “Application Configuration” tab of the “Streams Console”.
- Select the sub tab “Application Configuration”.
- Create a “New application configuration…” with the “Name” “es”; no description is necessary.
- Set the following properties for your Elasticsearch database connection: “nodeList” (value <HOST>:<PORT>), “userName”, “password”, “sslEnabled” (value true|false), “sslTrustAllCertificates” (value true|false).
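To keep the property values consistent with the list above, they can be prepared in one place before they are entered in the Streams Console. The sketch below is a hypothetical helper that returns the five properties as a dictionary; it assumes only that application configuration properties are name/value strings, so the boolean flags are rendered as "true" or "false".

```python
def es_app_config_properties(host: str, port: int, user: str, password: str,
                             ssl_enabled: bool = True,
                             trust_all_certificates: bool = False) -> dict:
    """Return the properties for the "es" application configuration.

    Hypothetical helper: the property names are the ones listed above;
    booleans are rendered as the strings "true" / "false".
    """
    return {
        "nodeList": f"{host}:{port}",
        "userName": user,
        "password": password,
        "sslEnabled": str(ssl_enabled).lower(),
        "sslTrustAllCertificates": str(trust_all_certificates).lower(),
    }


if __name__ == "__main__":
    props = es_app_config_properties("es.example.com", 30735, "admin", "secret")
    for name, value in props.items():
        # Each pair is one property to enter in the Streams Console.
        print(f"{name} = {value}")
```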
-
class streamsx.elasticsearch.Insert(credentials, index_name, bulk_size=1, **options)
Bases: streamsx.topology.composite.ForEach
Stores JSON documents in a specified index of an Elasticsearch database.
Example writing string messages to an index, with credentials stored in an application configuration named ‘elasticsearch’:
import streamsx.elasticsearch as es
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(es.Insert(credentials='elasticsearch', index_name='sample-index-cloud'))
Example specifying a connection string as credentials and additional parameters as keyword arguments:
connection_string = 'https://ibm_cloud_USER:PASSWORD@XXXX.databases.appdomain.cloud:30735'
config = {
    'ssl_trust_all_certificates': True
}
s.for_each(es.Insert(credentials=connection_string, index_name='sample-index-cloud', **config))
Example with a dynamic index name taken from an attribute of the stream:
from streamsx.topology.schema import StreamSchema

s = topo.source([('idx1', '{"msg":"This is message number 1"}'), ('idx2', '{"msg":"This is message number 2"}')])
schema = StreamSchema('tuple<rstring indexName, rstring document>')
# convert the Python tuples into structured tuples with the schema above
s = s.map(lambda x: x, schema=schema)
config = {
    'ssl_trust_all_certificates': True,
    'index_name_attribute': 'indexName',
    'message_attribute': 'document'
}
s.for_each(es.Insert(credentials=connection_string, index_name=None, **config))
New in version 1.3.
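In the dynamic-index example the index names are hard-coded in the source tuples. In practice a map step usually derives them from the data. The sketch below is a hypothetical mapping function that turns a plain message into an (indexName, document) pair matching 'tuple<rstring indexName, rstring document>'. The daily index naming scheme <prefix>-YYYY.MM.DD is an assumption chosen for illustration, not something the toolkit requires.

```python
import json
from datetime import datetime, timezone
from typing import Optional, Tuple


def to_index_and_document(message: str, prefix: str = "messages",
                          now: Optional[datetime] = None) -> Tuple[str, str]:
    """Map a plain message to an (indexName, document) pair that matches
    'tuple<rstring indexName, rstring document>'.

    The daily index name <prefix>-YYYY.MM.DD is an illustrative assumption,
    not a naming rule required by the toolkit.
    """
    now = now or datetime.now(timezone.utc)
    index_name = f"{prefix}-{now:%Y.%m.%d}"
    document = json.dumps({"msg": message, "timestamp": now.isoformat()})
    return index_name, document


if __name__ == "__main__":
    print(to_index_and_document("This is message number 1"))
```

Applied to a stream of strings, this function would take the place of the identity lambda, for example s.map(to_index_and_document, schema=schema), with index_name_attribute='indexName' and message_attribute='document' passed to Insert as shown above.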
-
credentials
Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When set to None, the application configuration name es is used.
Type: str
-
index_name
Name of the Elasticsearch index that the documents are inserted into. If the index does not exist in the Elasticsearch server, it is created by the server. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match the use case, for example an unsuitable mapping or an unsuitable number of shards or replicas.
Type: str
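Creating the index beforehand means sending a create-index request with explicit settings and mappings. The sketch below only builds (and does not send) such a PUT /<index> request with Python's standard urllib. It assumes Elasticsearch 7.x or later, where mappings are typeless, and HTTP basic authentication; the helper name create_index_request and its defaults of one shard and one replica are illustrative choices, not recommendations from this documentation.

```python
import base64
import json
import urllib.request
from typing import Optional


def create_index_request(base_url: str, index: str, user: str, password: str,
                         shards: int = 1, replicas: int = 1,
                         properties: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a PUT /<index> request that creates an index
    with explicit settings and mappings before the Streams job writes to it.

    Assumes Elasticsearch 7.x or later (typeless mappings) and HTTP basic
    authentication; adjust for your server version.
    """
    body = {
        "settings": {"number_of_shards": shards, "number_of_replicas": replicas},
        "mappings": {"properties": properties or {}},
    }
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="PUT",
    )


if __name__ == "__main__":
    req = create_index_request("https://es.example.com:30735/", "sample-index-cloud",
                               "admin", "secret", properties={"msg": {"type": "text"}})
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
```

Passing the request to urllib.request.urlopen would send it; keeping construction separate makes the body easy to review before the index is created.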
-
bulk_size
Size of the bulk to submit to Elasticsearch. The default value is 1.
Type: int
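To picture what a bulk of documents looks like, the sketch below renders JSON documents in the newline-delimited body format of the Elasticsearch _bulk API (an "index" action line before each document, with a trailing newline) and splits a list of documents into groups of bulk_size. This is not the toolkit's code: how the operator builds its requests, and whether it flushes a partially filled bulk, is not stated in this documentation.

```python
import json
from typing import Iterable, List


def bulk_body(index: str, documents: Iterable[str]) -> str:
    """Render JSON documents as an Elasticsearch _bulk request body.

    Each document is preceded by an "index" action line; the body is
    newline-delimited JSON and must end with a newline.
    """
    lines = []
    for doc in documents:
        lines.append(json.dumps({"index": {"_index": index}}))
        # Re-serialize onto a single line; json.loads also rejects invalid JSON.
        lines.append(json.dumps(json.loads(doc)))
    return "\n".join(lines) + "\n"


def batches(items: List[str], bulk_size: int) -> List[List[str]]:
    """Split items into groups of at most bulk_size documents."""
    if bulk_size < 1:
        raise ValueError("bulk_size must be at least 1")
    return [items[i:i + bulk_size] for i in range(0, len(items), bulk_size)]


if __name__ == "__main__":
    docs = ['{"msg": "a"}', '{"msg": "b"}', '{"msg": "c"}']
    for group in batches(docs, 2):
        print(bulk_body("sample-index-cloud", group))
```

With the default bulk_size of 1, every document becomes its own group; larger values trade latency for fewer requests.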
-
options
The additional optional parameters as variable keyword arguments.
Type: kwargs
-
connection_timeout
Timeout, in milliseconds, for establishing the TCP connection to the server node. The default value is 20000 (20 seconds).
Type: int
-
index_name_attribute
Name of the input stream attribute containing the name of the Elasticsearch index that the documents are inserted into.
Type: str
-
message_attribute
Name of the input stream attribute containing the JSON document. This parameter is not required when the input stream schema is CommonSchema.Json.
Type: str
-
populate(topology, stream, name, **options) → streamsx.topology.topology.Sink
Populate the topology with this composite for each transformation.
Parameters:
- topology – Topology containing the composite map.
- stream – Stream to be transformed.
- name – Name passed into for_each.
- **options – Future options passed to for_each.
Returns: Termination for this composite transformation of stream.
Return type: streamsx.topology.topology.Sink
-
read_timeout
Timeout, in milliseconds, for waiting for a REST response from the server node. The default value is 5000 (5 seconds).
Type: int
-
reconnection_policy_count
Specifies the number of reconnection attempts to the Elasticsearch server upon disconnection.
Type: int
-
ssl_debug
If set to ‘True’, SSL/TLS protocol debugging is enabled and all protocol data and information is logged to the console. The default is ‘False’.
Type: bool
-
ssl_trust_all_certificates
If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.
Type: bool
-
ssl_trust_store
Specifies the name of a file containing trusted certificates. This file is added to the application directory.
Type: str
-
ssl_trust_store_password
Specifies the password used to access the truststore file.
Type: str
-
ssl_verify_hostname
If set to False, the SSL/TLS layer will not verify the hostname in the server certificate against the actual name of the server host. WARNING: this is insecure and should only be used for debugging purposes. The default is True.
Type: bool
-
vm_arg
Arbitrary JVM arguments that are passed to the Streams operator.
Type: str
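Because the optional parameters above reach Insert as keyword arguments, a misspelled name such as read_timeot is easy to overlook. The sketch below is a hypothetical pre-flight check: INSERT_OPTIONS and check_insert_options are not part of the package, and the table assumes the attribute list above is complete. It compares a config dict against the documented names and types before the dict is unpacked into Insert; timeouts stay in milliseconds as documented.

```python
# Optional keyword arguments of Insert and their documented types
# (hypothetical table built from the attribute list above).
INSERT_OPTIONS = {
    "connection_timeout": int,
    "index_name_attribute": str,
    "message_attribute": str,
    "read_timeout": int,
    "reconnection_policy_count": int,
    "ssl_debug": bool,
    "ssl_trust_all_certificates": bool,
    "ssl_trust_store": str,
    "ssl_trust_store_password": str,
    "ssl_verify_hostname": bool,
    "vm_arg": str,
}


def check_insert_options(options: dict) -> dict:
    """Reject unknown names and wrongly typed values; return options unchanged."""
    for key, value in options.items():
        expected = INSERT_OPTIONS.get(key)
        if expected is None:
            raise KeyError(f"unknown Insert option: {key!r}")
        # bool is a subclass of int, so reject True/False for integer options.
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            raise TypeError(f"{key} expects {expected.__name__}, got {type(value).__name__}")
    return options


if __name__ == "__main__":
    config = {
        "connection_timeout": 30000,  # 30 seconds
        "read_timeout": 10000,        # 10 seconds
        "ssl_verify_hostname": True,
    }
    print(check_insert_options(config))
```

In an application the checked dict would then be unpacked as before, for example s.for_each(es.Insert(credentials='es', index_name='sample-index-cloud', **check_insert_options(config))).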
-
-
streamsx.elasticsearch.download_toolkit(url=None, target_dir=None)
Downloads the latest Elasticsearch toolkit from GitHub.
Example for updating the Elasticsearch toolkit for your topology with the latest toolkit from GitHub:
import streamsx.elasticsearch as es
import streamsx.spl.toolkit

# download Elasticsearch toolkit from GitHub
elasticsearch_toolkit_location = es.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)
Example for updating the topology with a specific version of the Elasticsearch toolkit using a URL:
import streamsx.elasticsearch as es
import streamsx.spl.toolkit

url221 = 'https://github.com/IBMStreams/streamsx.elasticsearch/releases/download/v2.1.1/streamsx.elasticsearch.toolkits-2.1.1-20181204-0909.tgz'
elasticsearch_toolkit_location = es.download_toolkit(url=url221)
streamsx.spl.toolkit.add_toolkit(topology, elasticsearch_toolkit_location)
Parameters:
- url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
- target_dir (str) – The directory the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
Returns: the location of the downloaded Elasticsearch toolkit
Return type: str
Note
This function requires an outgoing Internet connection.
New in version 1.2.
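The target_dir rule determines where the toolkit ends up on disk. The sketch below restates it as a small function. It assumes that an absolute path is used unchanged (the text above only describes relative paths and None) and uses a hypothetical directory name for the None case, because the name download_toolkit actually picks is not documented here.

```python
import os
import tempfile
from typing import Optional


def resolve_target_dir(target_dir: Optional[str]) -> str:
    """Restate the documented target_dir rule.

    Relative paths are appended to the system temporary directory.
    Assumption: absolute paths are used unchanged. For None, the directory
    name below is a hypothetical placeholder for the location the real
    function chooses under the temporary directory.
    """
    tmp = tempfile.gettempdir()
    if target_dir is None:
        return os.path.join(tmp, "streamsx-elasticsearch-toolkit")  # hypothetical name
    if os.path.isabs(target_dir):
        return target_dir
    return os.path.join(tmp, target_dir)


if __name__ == "__main__":
    # On Unix/Linux this typically prints /tmp/es_toolkit.
    print(resolve_target_dir("es_toolkit"))
```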
-
streamsx.elasticsearch.bulk_insert(stream, index_name, bulk_size=1, message_attribute=None, credentials='es', ssl_trust_all_certificates=False, name=None)
Stores JSON documents in a specified index of an Elasticsearch database.
Ingests tuples and stores them in Elasticsearch as documents when the bulk size is reached. If the input is a streamsx.topology.schema.StreamSchema, each attribute in the input schema becomes a document attribute: the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value. When the input stream is CommonSchema.Json, JSON documents are written without conversion.
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Supports CommonSchema.Json in the input stream to store the JSON messages in Elasticsearch. Otherwise each attribute in the input schema becomes a document attribute; the name of the JSON attribute is the name of the Streams tuple attribute, and the value is taken from the attribute's value.
- index_name (str) – Name of the Elasticsearch index that the documents are inserted into. If the index does not exist in the Elasticsearch server, it is created by the server. However, you should create and configure indices yourself before using them, to avoid automatic creation with properties that do not match the use case, for example an unsuitable mapping or an unsuitable number of shards or replicas.
- bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.
- message_attribute (str) – Name of the input stream attribute containing the JSON document. Not required when the input stream schema is CommonSchema.Json.
- credentials (str) – Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When not set, the application configuration name es is used.
- ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.
- name (str) – Sink name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink
Deprecated since version 1.3.0: Use Insert instead.
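The structured-schema conversion described above (each tuple attribute becomes a JSON attribute with the same name and value) can be illustrated in plain Python. The sketch below is not the operator's implementation: attribute_names and tuple_to_document are hypothetical helpers, the schema parser only handles flat schemas without nested types such as map<...> or tuple<...>, and how the operator renders SPL-specific types (for example timestamps) is not covered.

```python
import json
import re


def attribute_names(schema: str) -> list:
    """Extract attribute names from a flat SPL schema string such as
    'tuple<rstring id, int32 count>'. Nested types containing commas
    (map<...>, tuple<...>) are not handled by this sketch."""
    match = re.fullmatch(r"\s*tuple<(.*)>\s*", schema)
    if match is None:
        raise ValueError(f"not a tuple schema: {schema!r}")
    return [decl.split()[-1] for decl in match.group(1).split(",")]


def tuple_to_document(schema: str, values: tuple) -> str:
    """Build one JSON document: each attribute name maps to its value."""
    names = attribute_names(schema)
    if len(names) != len(values):
        raise ValueError("number of values does not match the schema")
    return json.dumps(dict(zip(names, values)))


if __name__ == "__main__":
    print(tuple_to_document("tuple<rstring id, int32 count, float64 value>", ("a1", 3, 0.5)))
```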
-
streamsx.elasticsearch.bulk_insert_dynamic(stream, index_name_attribute, message_attribute, bulk_size=1, credentials='es', ssl_trust_all_certificates=False, name=None)
Stores JSON documents in a specified index of an Elasticsearch database. The index name is part of the input stream.
Ingests tuples and stores them in Elasticsearch as documents when the bulk size is reached. The index name can change per tuple.
Example with a dynamic index name passed in an input stream attribute, where the input stream “sample_stream” has the schema “sample_schema”:
import streamsx.elasticsearch as es
from streamsx.topology.schema import StreamSchema

sample_schema = StreamSchema('tuple<rstring indexName, rstring document>')
...
es.bulk_insert_dynamic(sample_stream, index_name_attribute='indexName', message_attribute='document')
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples stored in Elasticsearch as documents. Requires a streamsx.topology.schema.StreamSchema (schema for a structured stream) as input.
- index_name_attribute (str) – Name of the input stream attribute containing the name of the Elasticsearch index that the documents are inserted into.
- message_attribute (str) – Name of the input stream attribute containing the JSON document.
- bulk_size (int) – Size of the bulk to submit to Elasticsearch. The default value is 1.
- credentials (str) – Name of the application configuration containing the credentials as properties, or the connection string for your Elasticsearch database. When not set, the application configuration name es is used.
- ssl_trust_all_certificates (bool) – If set to ‘True’, the SSL/TLS layer will not verify the server certificate chain. The default is ‘False’. This parameter can be overridden by the application configuration.
- name (str) – Sink name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink
Deprecated since version 1.3.0: Use Insert instead.
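Migrating from the deprecated bulk_insert_dynamic to Insert is mostly a renaming of arguments, following the dynamic-index Insert example: index_name is set to None and the attribute names move into the keyword arguments. The sketch below is a hypothetical helper that performs this translation; the name argument is not part of the result because it maps to the name argument of for_each instead.

```python
def insert_kwargs_from_bulk_insert_dynamic(index_name_attribute: str,
                                           message_attribute: str,
                                           bulk_size: int = 1,
                                           credentials: str = "es",
                                           ssl_trust_all_certificates: bool = False) -> dict:
    """Translate bulk_insert_dynamic() arguments into Insert keyword arguments
    (hypothetical helper). index_name is None because the index is taken from
    the index_name_attribute of each tuple."""
    return {
        "credentials": credentials,
        "index_name": None,
        "bulk_size": bulk_size,
        "index_name_attribute": index_name_attribute,
        "message_attribute": message_attribute,
        "ssl_trust_all_certificates": ssl_trust_all_certificates,
    }


if __name__ == "__main__":
    # Old: es.bulk_insert_dynamic(sample_stream, index_name_attribute='indexName', message_attribute='document')
    # New: sample_stream.for_each(es.Insert(**kwargs))
    kwargs = insert_kwargs_from_bulk_insert_dynamic("indexName", "document")
    print(kwargs)
```

A sink name previously given as name=... would then be passed as sample_stream.for_each(es.Insert(**kwargs), name='es_sink').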