Spark mode

SparkDataset

SparkDataset represents statistical tables in a Java application that uses Trevas in Spark mode.

Import Trevas Spark module

<dependency>
<groupId>fr.insee.trevas</groupId>
<artifactId>vtl-spark</artifactId>
<version>1.7.0</version>
</dependency>

Spark session

In order to execute VTL via Trevas in Spark mode, a Spark session must be instantiated.

The session can be:

  • local (execution on the Java server)
  • static (execution on a Spark instance installed on a server beforehand)
  • dynamic (dynamic execution on a Kubernetes cluster)

SparkSession spark = SparkSession.builder().master("local").getOrCreate();
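
The three modes differ mainly in the master URL passed to the builder. The sketch below illustrates this with a hypothetical helper, `SparkMasterUrls`, which is not part of Trevas or Spark. It assumes that the static mode targets a Spark standalone master (`spark://host:port`) and that the dynamic mode uses the `k8s://` prefix followed by the Kubernetes API server address. The addresses are placeholders to adapt to the actual environment.

```java
// Hypothetical helper (not part of Trevas or Spark): maps a session mode to a master URL.
final class SparkMasterUrls {

    enum Mode { LOCAL, STATIC, DYNAMIC }

    // address is ignored in LOCAL mode; otherwise it is the host:port of the
    // standalone master (STATIC) or the Kubernetes API server address (DYNAMIC).
    static String masterUrl(Mode mode, String address) {
        switch (mode) {
            case LOCAL:
                return "local";
            case STATIC:
                // Assumption: the pre-installed Spark instance is a standalone master
                return "spark://" + address;
            case DYNAMIC:
                return "k8s://" + address;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Placeholder address, for illustration only
        System.out.println(masterUrl(Mode.LOCAL, null));
        System.out.println(masterUrl(Mode.STATIC, "spark-master.example.org:7077"));
    }
}
```

The returned string could then be passed to `SparkSession.builder().master(...)`.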

Example

ScriptEngine engine = new ScriptEngineManager().getEngineByName("vtl");

Bindings bindings = new SimpleBindings();
SparkDataset dataset = new SparkDataset(spark.read().parquet("folder_path"));
bindings.put("myDataset", dataset);

engine.getContext().setBindings(bindings, ScriptContext.ENGINE_SCOPE);
engine.put("$vtl.engine.processing_engine_names", "spark");
engine.put("$vtl.spark.session", spark);

String script = "res := myDataset[filter var3 > 6];";

try {
    engine.eval(script);
} catch (ScriptException e) {
    e.printStackTrace();
}

Bindings outputBindings = engine.getContext().getBindings(ScriptContext.ENGINE_SCOPE);
SparkDataset result = (SparkDataset) outputBindings.get("res");
// Spark evaluates lazily: building an InMemoryDataset forces the computation
// and loads the result in memory (performance warning!)
InMemoryDataset imResult = new InMemoryDataset(
        result.getDataPoints(),
        result.getDataStructure()
);
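
The example above assumes that the `vtl` engine is found. `ScriptEngineManager.getEngineByName` returns `null` when no engine is registered under the requested name, which would typically happen if the Trevas jars are missing from the classpath. A minimal sketch of a guard, using a hypothetical `EngineLookup` helper that is not part of Trevas:

```java
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

// Hypothetical helper (not part of Trevas): fails fast when the engine is not registered.
final class EngineLookup {

    static ScriptEngine requireEngine(String name) {
        ScriptEngine engine = new ScriptEngineManager().getEngineByName(name);
        if (engine == null) {
            // getEngineByName returns null rather than throwing when nothing matches
            throw new IllegalStateException(
                    "No script engine registered under the name '" + name + "'; check the classpath");
        }
        return engine;
    }
}
```

With this helper, the first line of the example could become `ScriptEngine engine = EngineLookup.requireEngine("vtl");`, so a missing dependency is reported at lookup time instead of as a `NullPointerException` later on.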

Distributed execution

Whether in static or dynamic mode, distributed execution requires that the executors instantiated by the master be able to resolve the VTL processing.

It is thus necessary to provide the Trevas jars to the executors via the spark.jars option of the SparkConf object:

SparkSession.Builder sparkBuilder = SparkSession.builder();
SparkConf conf = new SparkConf();
// Ship the Trevas jars to the executors
conf.set("spark.jars", String.join(",",
        "/vtl-spark.jar",
        "/vtl-model.jar",
        "/vtl-parser.jar",
        "/vtl-engine.jar"
));
sparkBuilder.config(conf);
...
SparkSession spark = sparkBuilder.getOrCreate();
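
When the jars live in a single directory, the `spark.jars` value can be derived from that directory instead of being written by hand. The sketch below assumes a hypothetical `TrevasJars` helper (not part of Trevas) and unversioned jar file names matching the four modules listed above. Real file names may carry a version suffix and would need adjusting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Trevas): builds the comma-separated spark.jars value.
final class TrevasJars {

    // Module names taken from the snippet above; file names are assumed to be <module>.jar
    static final List<String> MODULES =
            Arrays.asList("vtl-spark", "vtl-model", "vtl-parser", "vtl-engine");

    static String sparkJarsValue(String directory) {
        // Make sure the directory ends with exactly one separator before appending file names
        String base = directory.endsWith("/") ? directory : directory + "/";
        return MODULES.stream()
                .map(module -> base + module + ".jar")
                .collect(Collectors.joining(","));
    }
}
```

Calling `TrevasJars.sparkJarsValue("/")` yields the same value as the hand-written list above, and the result can be passed to `conf.set("spark.jars", ...)`.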

Execution in a Kubernetes cluster

Many configuration options are detailed in the official Spark documentation.

Among these, one option is particularly important: the Docker image that will allow executors to resolve VTL processing.

A custom image is available here.

SparkSession.Builder sparkBuilder = SparkSession.builder();
SparkConf conf = new SparkConf();
// Docker image used by the executors to resolve VTL processing
conf.set("spark.kubernetes.container.image", "inseefrlab/spark-hadoop:trevas-0.4.7-spark-3.2.1-hadoop-3.3.1-postgresql-42.3.3-postgis-2021.1.0");
conf.set("spark.kubernetes.container.pullPolicy", "IfNotPresent");
sparkBuilder.config(conf);
sparkBuilder.master("k8s://...");
...
SparkSession spark = sparkBuilder.getOrCreate();