Fonctionnalités supplémentaires du métastore BigQuery
.Pour personnaliser la configuration de votre métastore BigQuery, vous pouvez utiliser les fonctionnalités supplémentaires suivantes:
- Procédures Apache Spark Iceberg
- Option de filtrage pour les tables non compatibles
- Forcer une connexion BigQuery
- Règles de contrôle des accès pour les tables Iceberg du métastore BigQuery
Utiliser des procédures Spark Iceberg
Pour utiliser les procédures Spark Iceberg, vous devez inclure les extensions SQL Iceberg dans votre configuration Spark. Par exemple, vous pouvez créer une procédure pour revenir à un état précédent.
Utiliser Spark-SQL interactif pour effectuer un rollback vers un état antérieur
Vous pouvez utiliser une procédure Spark Iceberg pour créer, modifier et rétablir l'état précédent d'une table. Exemple :
Créez une table Spark:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue qui fait référence à votre table Spark.PROJECT_ID
: ID du projet Google Cloud.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
Remplacez les éléments suivants :
NAMESPACE_NAME
: nom de l'espace de noms qui fait référence à votre table Spark.TABLE_NAME
: nom de table qui fait référence à votre table Spark.
La sortie contient des informations sur la configuration de la table:
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
Modifiez à nouveau la table, puis restaurez-la à l'instantané
1659239298328512231
créé précédemment:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
Remplacez les éléments suivants :
SNAPSHOT_ID
: ID de l'instantané auquel vous effectuez la restauration.
Le résultat ressemble à ce qui suit :
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
Filtrer les tables non compatibles à partir des fonctions de liste de tables
Lorsque vous utilisez Spark SQL avec le catalogue Metastore BigQuery, la commande SHOW TABLES
affiche toutes les tables de l'espace de noms spécifié, même celles qui ne sont pas compatibles avec Spark.
Pour n'afficher que les tables compatibles, activez l'option filter_unsupported_tables
:
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue Spark à utiliser.PROJECT_ID
: ID du projet Google Cloud à utiliser.LOCATION
: emplacement des ressources BigQuery.WAREHOUSE_DIRECTORY
: dossier Cloud Storage à utiliser comme entrepôt de données.
Définir un forçage de connexion BigQuery
Vous pouvez utiliser des connexions BigQuery pour accéder aux données stockées en dehors de BigQuery, par exemple dans Cloud Storage.
Pour définir un forçage de connexion BigQuery qui permet d'accéder à un bucket Cloud Storage, procédez comme suit:
Dans votre projet BigQuery, créez une connexion à votre ressource Cloud Storage. Cette connexion définit la façon dont BigQuery accède à vos données.
Attribuez le rôle
roles/bigquery.connectionUser
à l'utilisateur ou au compte de service qui accède aux données sur la connexion.Assurez-vous que la ressource de connexion partage le même emplacement que les ressources cibles dans BigQuery. Pour en savoir plus, consultez la page Gérer les connexions.
Spécifiez la connexion dans votre table Iceberg à l'aide de la propriété
bq_connection
:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
Remplacez les éléments suivants :
TABLE_NAME
: nom de la table pour votre table Spark.WAREHOUSE_DIRECTORY
: URI du bucket Cloud Storage qui stocke vos données.PROJECT_ID
: ID du projet Google Cloud à utiliser.LOCATION
: emplacement de la connexion.CONNECTION_ID
: ID de la connexion.
Définir des stratégies de contrôle des accès
Vous pouvez activer le contrôle précis des accès (FGAC) sur les tables Iceberg du métastore BigQuery en configurant des stratégies de contrôle des accès. Vous ne pouvez définir des stratégies de contrôle des accès que sur les tables qui utilisent un forçage de connexion BigQuery. Vous pouvez définir ces règles de différentes manières:
Une fois que vous avez configuré vos règles FGAC, vous pouvez interroger le tableau à partir de Spark à l'aide de l'exemple suivant:
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Remplacez les éléments suivants :
CATALOG_NAME
: nom de votre catalogue.PROJECT_ID
: ID du projet contenant vos ressources BigQuery.LOCATION
: emplacement des ressources BigQuery.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage contenant votre entrepôt de données.MATERIALIZATION_NAMESPACE
: espace de noms dans lequel vous souhaitez stocker les résultats temporaires.DATASET_NAME
: nom de votre ensemble de données contenant la table que vous interrogez.ICEBERG_TABLE_NAME
: nom de la table que vous interrogez.
Étape suivante
- Migrer des données Dataproc Metastore vers le métastore BigQuery
- Utiliser BigQuery Metastore avec Dataproc
- Utiliser le métastore BigQuery avec Dataproc sans serveur