Mode Spark
SparkDataset
Les datasets SparkDataset
permettent de représenter les tables statistiques dans une application Java utilisant Trevas en mode Spark.
Importer le module Spark de Trevas
<dependency>
<groupId>fr.insee.trevas</groupId>
<artifactId>vtl-spark</artifactId>
<version>1.7.0</version>
</dependency>
Session Spark
Afin d'exécuter du VTL via Trevas en mode Spark, il faut instancier une session Spark.
La session peut être :
- locale (exécution sur le serveur Java)
- statique (exécution sur une instance Spark préalablement installée sur un serveur)
- dynamique (exécution dynamique au sein d'un cluster Kubernetes)
SparkSession spark = SparkSession.builder().master("local").getOrCreate();
Exemple
ScriptEngine engine = new ScriptEngineManager().getEngineByName("vtl");
Bindings bindings = new SimpleBindings();
SparkDataset dataset = new SparkDataset(spark.read().parquet("folder_path"));
bindings.put("myDataset", dataset);
engine.getContext().setBindings(bindings, ScriptContext.ENGINE_SCOPE);
engine.put("$vtl.engine.processing_engine_names", "spark");
engine.put("$vtl.spark.session", spark);
String script = "res := myDataset[filter var3 > 6];";
try {
engine.eval(script);
} catch (ScriptException e) {
e.printStackTrace();
}
Bindings outputBindings = engine.getContext().getBindings(ScriptContext.ENGINE_SCOPE);
SparkDataset result = (SparkDataset) outputBindings.get("res");
// Ensure direct resolution because of spark lazy mechanism (performance warning!)
InMemoryDataset imResult = new InMemoryDataset(
result.getDataPoints(),
result.getDataStructure()
);
Exécution distribuée
Que ce soit en mode statique ou dynamique, l'éxecution distribuée des traitements nécessite que les exécuteurs instanciés par le master soient en capacité de résoudre les traitements VTL.
Il faut alors fournir les jar Trevas aux exécuteurs via l'options spark.jars
de l'objet SparkConf
:
SparkSession.Builder sparkBuilder = SparkSession.builder()
SparkConf conf = new SparkConf();
conf.set("spark.jars", String.join(",",
"/vtl-spark.jar",
"/vtl-model.jar",
"/vtl-parser.jar",
"/vtl-engine.jar",
));
sparkBuilder.config(conf);
...
SparkSession spark = sparkBuilder.getOrCreate();
Exécution dans un cluster Kubernetes
De nombreuses options sont détaillées dans la documentation officielle
Parmi celles-ci, une option est particulièrement importante : l'image Docker qui permettra au exécuteurs de résoudre les traitements VTL.
Une image à façon est disponible ici.
SparkSession.Builder sparkBuilder = SparkSession.builder()
SparkConf conf = new SparkConf();
conf.set("spark.kubernetes.container.image", "inseefrlab/spark-hadoop:trevas-0.4.7-spark-3.2.1-hadoop-3.3.1-postgresql-42.3.3-postgis-2021.1.0");
conf.set("spark.kubernetes.container.pullPolicy", "IfNotPresent");
sparkBuilder.config(conf);
sparkBuilder.master("k8s://...")
...
SparkSession spark = sparkBuilder.getOrCreate();