Interfacing with the Pipeline Process
The HERE platform pipeline manages user pipelines on the runtime environment chosen for that pipeline (using either the Flink or Spark framework) associated with the executable Pipeline Version.
This article describes the parameters and configuration files passed by the pipeline to the Pipeline Version submitted as a job.
You can control the parameters and the content of configuration files by configuring the details of each Pipeline Version via the pipeline REST API, the OLP CLI, or interactively through the platform's Portal GUI.
Running Locally, the Same Interface
When running a pipeline locally, for example on a development machine, you have to mimic the interface described in this document by passing the parameters and files manually.
The HERE platform SDK uses Java and Scala Maven archetypes to create a basic pipeline project. This project contains sample files and scripts that simplify developing and running pipelines locally.
For local development, you can place pipeline-config.conf and pipeline-job.conf on the process classpath, or specify their locations (paths) on the development machine via the pipeline-config.file and pipeline-job.file system properties, as in the sketch below. The details in these files will change when moving to a production environment.
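For example, a local entry point might resolve each file from the corresponding system property when it is set and otherwise fall back to the classpath resource used in the platform runtime. This is a minimal sketch using the Typesafe Config library mentioned later in this document; the object and helper names are illustrative only:

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

object LocalConfigLoader {
  // Prefer an explicit file path from a system property (local development),
  // otherwise fall back to the classpath resource (platform runtime).
  def loadConfig(fileProperty: String, resourceName: String): Config =
    sys.props.get(fileProperty) match {
      case Some(path) => ConfigFactory.parseFile(new File(path)).resolve()
      case None       => ConfigFactory.parseResources(resourceName).resolve()
    }

  def main(args: Array[String]): Unit = {
    val pipelineConfig = loadConfig("pipeline-config.file", "pipeline-config.conf")
    val pipelineJob    = loadConfig("pipeline-job.file", "pipeline-job.conf")
    println(pipelineConfig.origin().description())
    println(pipelineJob.origin().description())
  }
}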
Entry and Exit Points
You specify the class that represents the entry point of the pipeline in the Pipeline Template. This class is used to select the entry point when a job is submitted to Spark or Flink. A new object is created by the JVM and execution starts at the main
method.
Stream processing pipeline applications don't usually terminate on their own, but they may be terminated. Batch processing pipelines terminate naturally when processing is complete. When the pipeline application terminates, it returns an exit code. If the exit code is 0, the application is considered to have terminated successfully. If the exit code is non-zero, the application is considered to have terminated due to errors, and the Pipeline service may report this event in the job description or may try to submit the job again.
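As a sketch of this contract, a minimal batch entry point might look as follows (Scala; the class name and the job body are placeholders, not part of the platform API):

// Hypothetical entry-point class; its fully qualified name would be
// registered in the Pipeline Template.
object MyBatchPipeline {
  def main(args: Array[String]): Unit = {
    try {
      // ... read pipeline-config.conf / pipeline-job.conf and run the Spark or Flink job ...
      sys.exit(0)   // exit code 0: terminated successfully
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sys.exit(1)   // non-zero exit code: terminated due to errors
    }
  }
}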
System Properties
The following JVM system properties are set by the Pipeline API when a pipeline is submitted as a new job. They can be obtained via the System.getProperties()
method, or the equivalent.
- olp.pipeline.id: Identifier of the Pipeline, as defined in the Pipeline API
- olp.pipeline.version.id: Identifier of the Pipeline Version, as defined in the Pipeline API
- olp.deployment.id: Identifier of the Job, as defined in the Pipeline API
- olp.realm: The customer realm
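For illustration, a pipeline might read these properties as follows (Scala; the object name is illustrative, and the properties are absent when running outside the platform, hence the Option wrapping):

object SystemPropertiesExample {
  def main(args: Array[String]): Unit = {
    // Properties set by the Pipeline API on the main user process.
    val pipelineId        = Option(System.getProperty("olp.pipeline.id"))
    val pipelineVersionId = Option(System.getProperty("olp.pipeline.version.id"))
    val deploymentId      = Option(System.getProperty("olp.deployment.id"))
    val realm             = Option(System.getProperty("olp.realm"))
    println(s"pipeline=$pipelineId version=$pipelineVersionId job=$deploymentId realm=$realm")
  }
}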
The platform also uses the following additional property paths:
- env.api.lookup.host
- akka.*
- here.platform.*
- com.here.*
In addition to these, other properties are set by the system to configure the runtime environment. These include Spark or Flink configuration parameters associated with the Pipeline Version configuration that you selected. The actual details are specific to the environment chosen and its version. Because such details may change, they are considered implementation-specific and left to your determination.
System properties specified in this section are visible from the main user process only. These system properties are not necessarily replicated to the JVMs that run in worker nodes of the cluster.
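Consequently, if a value such as olp.pipeline.id is needed inside distributed code, read it once in the main process and capture the resulting value in the closure. The Spark snippet below is purely illustrative of that pattern:

import org.apache.spark.sql.SparkSession

object DriverPropertyCapture {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("driver-property-capture").getOrCreate()

    // Read on the driver, where the property is guaranteed to be set.
    val pipelineId = Option(System.getProperty("olp.pipeline.id")).getOrElse("unknown")

    // The captured local value is serialized with the closure; the executors
    // never call System.getProperty themselves.
    val tagged = spark.sparkContext
      .parallelize(Seq("a", "b", "c"))
      .map(record => s"$pipelineId:$record")

    tagged.collect().foreach(println)
    spark.stop()
  }
}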
Pipeline Configuration
Configuration of the pipeline, as specified in the corresponding Pipeline Version, is passed via a file named pipeline-config.conf. This file is added to the classpath of the main user process.
Note
The format of the file is HOCON, a superset of JSON and Java properties. It can be parsed with the open-source Typesafe Config library from Lightbend.
If the pipeline is implemented using the Data Processing Library, parsing is handled automatically by the pipeline-runner package. This package also provides an application main to easily interface with the Pipeline service and to implement pipelines.
Example content of pipeline-config.conf:
pipeline.config {
  billing-tag = "test-billing-tag"
  output-catalog { hrn = "hrn:here:data:::example-output" }
  input-catalogs {
    test-input-1 { hrn = "hrn:here:data:::example1" }
    test-input-2 { hrn = "hrn:here:data:::example2" }
    test-input-3 { hrn = "hrn:here:data:::example3" }
  }
}
Where:
- billing-tag specifies an optional tag to group billing entries for the pipeline.
- output-catalog specifies the HRN that identifies the output catalog of the pipeline.
- input-catalogs specifies one or more input catalogs for the pipeline. For each input catalog, its fixed identifier is provided together with the HRN of the actual catalog.
Pipeline implementations may bind to and distinguish between multiple input catalogs via the fixed identifiers. Fixed identifiers are defined in a Pipeline Template. An HRN is defined for each Pipeline Version so that the same Pipeline Template may be reused in multiple setups.
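If the pipeline does not use the Data Processing Library, the file can be parsed directly with the Typesafe Config library mentioned in the note above. The following minimal sketch (Scala; object and variable names are illustrative) reads the billing tag, the output catalog HRN, and a map from fixed identifier to HRN for the input catalogs:

import com.typesafe.config.{ConfigFactory, ConfigObject}
import scala.jdk.CollectionConverters._

object PipelineConfigExample {
  def main(args: Array[String]): Unit = {
    // pipeline-config.conf is on the classpath of the main user process.
    val config = ConfigFactory.parseResources("pipeline-config.conf")
      .resolve()
      .getConfig("pipeline.config")

    val billingTag =
      if (config.hasPath("billing-tag")) Some(config.getString("billing-tag")) else None
    val outputCatalogHrn = config.getString("output-catalog.hrn")

    // Fixed identifier -> HRN, for every configured input catalog.
    val inputCatalogs: Map[String, String] =
      config.getConfig("input-catalogs").root().asScala.collect {
        case (id, obj: ConfigObject) => id -> obj.toConfig.getString("hrn")
      }.toMap

    println(s"billing-tag=$billingTag output=$outputCatalogHrn inputs=$inputCatalogs")
  }
}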
Batch Pipeline Job
Batch pipelines perform a specific job and then terminate. Stream pipelines don't perform any specific, time-constrained job and instead run continuously. This section applies to batch pipelines only.
Batch pipelines process one or more versioned layers of one or more input catalogs to produce one or more versioned layers in the single output catalog. Versioned layers form partitioned, consistent snapshots of datasets. When the pipeline is in the SCHEDULED state, the Pipeline API scheduler manages running the pipeline. It monitors the configured input catalogs and triggers a job submission to Spark when it detects that new data has been published in any of the input catalogs.
The job passed to the pipeline describes which version of each input catalog should be processed. These are usually the most recent versions of each catalog.
A simple, but correct, batch pipeline may fetch all the data stored in all the relevant input layers at the versions described in the job, perform some processing, and publish the result to its own output layers, generating a new version of the output catalog. This event, in turn, may trigger other batch pipelines that use that catalog as input. The job description always contains a version number for each input catalog, whether that catalog has changed or not.
Complete reprocessing of the input is usually a resource-intensive operation. The scheduler also keeps track of previous jobs and which version of each input catalog was used at that time. This additional information is also included in the job description (see since-version
parameter). Instead of reprocessing the whole input, a more advanced pipeline may process just what has changed since the previous run and simply update the output instead of regenerating it from scratch. Implementing such incremental processing is nontrivial. The Data Processing Library included in the HERE platform provides significant support to solve this problem so that users can focus on their business logic while the library takes care of the complexity of incremental processing.
The job description is passed via a file named pipeline-job.conf. This file is also in HOCON format and is added to the classpath of the main user process. The file is optional and is present only for batch processing pipelines.
Example content of pipeline-job.conf:
pipeline.job.catalog-versions {
  output-catalog { base-version = 42 }
  input-catalogs {
    test-input-1 {
      processing-type = "no_changes"
      version = 19
    }
    test-input-2 {
      processing-type = "changes"
      since-version = 70
      version = 75
    }
    test-input-3 {
      processing-type = "reprocess"
      version = 314159
    }
  }
}
Where:
- output-catalog specifies the base version of the output catalog, that is, the version on top of which the results of this run are published as a new version.
- input-catalogs specifies, for each fixed input catalog identifier, the catalog version to process. The processing-type indicates whether the catalog is unchanged since the previous run (no_changes), has changed since the version processed by the previous run (changes, with that earlier version given as since-version), or should be processed in full without reference to a previous run (reprocess).
Note
As of HERE platform Release 2.3, all of the parameters of the pipeline-job.conf
file are optional. If the details are not specified, the pipeline will, by default, pick the latest input catalog versions and reprocess.
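For pipelines that parse the job description themselves, the following sketch (Scala, using the Typesafe Config library; names are illustrative) reads the base version of the output catalog and dispatches on processing-type for each input catalog:

import com.typesafe.config.{ConfigFactory, ConfigObject}
import scala.jdk.CollectionConverters._

object PipelineJobExample {
  def main(args: Array[String]): Unit = {
    // pipeline-job.conf is on the classpath for batch pipelines only.
    val job = ConfigFactory.parseResources("pipeline-job.conf")
      .resolve()
      .getConfig("pipeline.job.catalog-versions")

    val outputBaseVersion = job.getLong("output-catalog.base-version")
    println(s"publishing on top of output catalog version $outputBaseVersion")

    job.getConfig("input-catalogs").root().asScala.foreach {
      case (id, obj: ConfigObject) =>
        val catalog = obj.toConfig
        val version = catalog.getLong("version")
        catalog.getString("processing-type") match {
          case "no_changes" =>
            println(s"$id: unchanged, still at version $version")
          case "changes" =>
            val since = catalog.getLong("since-version")
            println(s"$id: process changes after version $since, up to version $version")
          case "reprocess" =>
            println(s"$id: reprocess the whole catalog at version $version")
          case other =>
            println(s"$id: unknown processing-type '$other'")
        }
      case _ => // ignore anything that is not a catalog entry
    }
  }
}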
User Configuration
Both pipeline-config.conf and pipeline-job.conf are provided by the platform, which also defines their content. Users may make custom configuration available to their pipelines. For more information, see application.properties
on the config file reference page.
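Assuming the custom configuration is exposed as an application.properties resource on the runtime classpath (see the config file reference page for the authoritative mechanism), it could be read with the standard Properties API; the property key below is hypothetical:

import java.util.Properties

object UserConfigExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    val stream = getClass.getResourceAsStream("/application.properties")
    if (stream != null) {
      try props.load(stream) finally stream.close()
    }
    // "my.custom.flag" is a hypothetical key defined by the pipeline author.
    val myFlag = Option(props.getProperty("my.custom.flag"))
    println(s"my.custom.flag = $myFlag")
  }
}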
See Also