Spark mode
SparkDataset
SparkDataset data sets represent statistical tables in a Java application using Trevas in Spark mode.
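As a minimal sketch of the idea, a SparkDataset wraps any Spark Dataset<Row> (this assumes the module imported below and a SparkSession named spark, instantiated as in the next section; the CSV path is hypothetical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Any Spark data source works (CSV, Parquet, JDBC, ...)
Dataset<Row> df = spark.read().option("header", "true").csv("path/to/data.csv");
// Wrap the Spark data set so Trevas can handle it as a VTL data set
SparkDataset vtlDataset = new SparkDataset(df);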
Import Trevas Spark module
<dependency>
<groupId>fr.insee.trevas</groupId>
<artifactId>vtl-spark</artifactId>
<version>1.7.0</version>
</dependency>
Spark session
In order to execute VTL via Trevas in Spark mode, a Spark session must be instantiated.
The session can be:
- local (execution on the Java server)
- static (execution on a Spark instance installed on a server beforehand)
- dynamic (dynamic execution on a Kubernetes cluster)
SparkSession spark = SparkSession.builder().master("local").getOrCreate();
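For a static session, the builder targets an existing cluster instead. A sketch, assuming a standalone Spark master reachable at spark://some-host:7077 (the master URL and application name are hypothetical):

SparkSession spark = SparkSession.builder()
        .master("spark://some-host:7077") // hypothetical master URL
        .appName("trevas-vtl")            // hypothetical application name
        .getOrCreate();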
Example
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import javax.script.SimpleBindings;

// Trevas classes (package names per the fr.insee.trevas modules)
import fr.insee.vtl.model.InMemoryDataset;
import fr.insee.vtl.spark.SparkDataset;

// Obtain the Trevas VTL engine through the standard JSR-223 scripting API
ScriptEngine engine = new ScriptEngineManager().getEngineByName("vtl");
Bindings bindings = new SimpleBindings();
SparkDataset dataset = new SparkDataset(spark.read().parquet("folder_path"));
bindings.put("myDataset", dataset);
engine.getContext().setBindings(bindings, ScriptContext.ENGINE_SCOPE);
// Tell Trevas to use the Spark processing engine and give it the session
engine.put("$vtl.engine.processing_engine_names", "spark");
engine.put("$vtl.spark.session", spark);
String script = "res := myDataset[filter var3 > 6];";
try {
engine.eval(script);
} catch (ScriptException e) {
e.printStackTrace();
}
Bindings outputBindings = engine.getContext().getBindings(ScriptContext.ENGINE_SCOPE);
SparkDataset result = (SparkDataset) outputBindings.get("res");
// Force immediate evaluation: Spark is lazy, so materializing the data points triggers the whole computation (can be expensive!)
InMemoryDataset imResult = new InMemoryDataset(
result.getDataPoints(),
result.getDataStructure()
);
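The materialized result can then be inspected like any in-memory data set, using the same accessors as above:

// Print the structure (component names, types, roles) and the data points
System.out.println(imResult.getDataStructure());
imResult.getDataPoints().forEach(System.out::println);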
Distributed execution
Whether in static or dynamic mode, distributed execution requires that the executors instantiated by the master be able to resolve the VTL processing.
The Trevas jars must therefore be provided to the executors via the spark.jars option of the SparkConf object:
SparkSession.Builder sparkBuilder = SparkSession.builder();

SparkConf conf = new SparkConf();
conf.set("spark.jars", String.join(",",
        "/vtl-spark.jar",
        "/vtl-model.jar",
        "/vtl-parser.jar",
        "/vtl-engine.jar"));
sparkBuilder.config(conf);

// ...

SparkSession spark = sparkBuilder.getOrCreate();
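The jar locations follow Spark's usual URL schemes, so they do not have to be local files. A variant of the configuration above, assuming the jars have been published to HDFS (the paths are hypothetical):

conf.set("spark.jars", String.join(",",
        "hdfs:///jars/vtl-spark.jar",
        "hdfs:///jars/vtl-model.jar",
        "hdfs:///jars/vtl-parser.jar",
        "hdfs:///jars/vtl-engine.jar"));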
Execution in a Kubernetes cluster
Many options are detailed in the official Spark documentation.
Among these, one option is particularly important: the Docker image used by the executors, which must allow them to resolve the VTL processing.
A custom image, inseefrlab/spark-hadoop, is available and used below.
SparkSession.Builder sparkBuilder = SparkSession.builder();

SparkConf conf = new SparkConf();
conf.set("spark.kubernetes.container.image", "inseefrlab/spark-hadoop:trevas-0.4.7-spark-3.2.1-hadoop-3.3.1-postgresql-42.3.3-postgis-2021.1.0");
conf.set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent");
sparkBuilder.config(conf);
sparkBuilder.master("k8s://...");

// ...

SparkSession spark = sparkBuilder.getOrCreate();
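Further settings are typically required on a real cluster. A sketch with commonly used Spark-on-Kubernetes options; the namespace, service account name and executor count are deployment-specific assumptions:

conf.set("spark.kubernetes.namespace", "my-namespace");                        // hypothetical namespace
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark");  // hypothetical service account
conf.set("spark.executor.instances", "5");                                     // size for the expected workload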