跳到主要内容

Spark 模式

SparkDataset

Les datasets SparkDataset permettent de représenter les tables statistiques dans une application Java utilisant Trevas en mode Spark.

Importer le module Spark de Trevas

<dependency>
<groupId>fr.insee.trevas</groupId>
<artifactId>vtl-spark</artifactId>
<version>1.7.0</version>
</dependency>

Session Spark

Afin d'exécuter du VTL via Trevas en mode Spark, il faut instancier une session Spark.

La session peut être :

  • locale (exécution sur le serveur Java)
  • statique (exécution sur une instance Spark préalablement installée sur un serveur)
  • dynamique (exécution dynamique au sein d'un cluster Kubernetes)
SparkSession spark = SparkSession.builder().master("local").getOrCreate();

Exemple

ScriptEngine engine = new ScriptEngineManager().getEngineByName("vtl");

Bindings bindings = new SimpleBindings();
SparkDataset dataset = new SparkDataset(spark.read().parquet("folder_path"));
bindings.put("myDataset", dataset);

engine.getContext().setBindings(bindings, ScriptContext.ENGINE_SCOPE);
engine.put("$vtl.engine.processing_engine_names", "spark");
engine.put("$vtl.spark.session", spark);

String script = "res := myDataset[filter var3 > 6];";

try {
engine.eval(script);
} catch (ScriptException e) {
e.printStackTrace();
}

Bindings outputBindings = engine.getContext().getBindings(ScriptContext.ENGINE_SCOPE);
SparkDataset result = (SparkDataset) outputBindings.get("res");
// Ensure direct resolution because of spark lazy mechanism (performance warning!)
InMemoryDataset imResult = new InMemoryDataset(
result.getDataPoints(),
result.getDataStructure()
);

Exécution distribuée

Que ce soit en mode statique ou dynamique, l'éxecution distribuée des traitements nécessite que les exécuteurs instanciés par le master soient en capacité de résoudre les traitements VTL.

Il faut alors fournir les jar Trevas aux exécuteurs via l'options spark.jars de l'objet SparkConf :

SparkSession.Builder sparkBuilder = SparkSession.builder()
SparkConf conf = new SparkConf();
conf.set("spark.jars", String.join(",",
"/vtl-spark.jar",
"/vtl-model.jar",
"/vtl-parser.jar",
"/vtl-engine.jar",
));
sparkBuilder.config(conf);
...
SparkSession spark = sparkBuilder.getOrCreate();

Exécution dans un cluster Kubernetes

De nombreuses options sont détaillées dans la documentation officielle

Parmi celles-ci, une option est particulièrement importante : l'image Docker qui permettra au exécuteurs de résoudre les traitements VTL.

Une image à façon est disponible ici.

SparkSession.Builder sparkBuilder = SparkSession.builder()
SparkConf conf = new SparkConf();
conf.set("spark.kubernetes.container.image", "inseefrlab/spark-hadoop:trevas-0.4.7-spark-3.2.1-hadoop-3.3.1-postgresql-42.3.3-postgis-2021.1.0");
conf.set("spark.kubernetes.container.pullPolicy", "IfNotPresent");
sparkBuilder.config(conf);
sparkBuilder.master("k8s://...")
...
SparkSession spark = sparkBuilder.getOrCreate();