Feast Spark SDK documentation
Client
- class feast_spark.client.Client(feast_client: feast.client.Client)[source]
Bases: object
- property config: feast.config.Config
- property feature_store: feast.client.Client
- get_historical_features(feature_refs: List[str], entity_source: Union[pandas.core.frame.DataFrame, feast.data_source.FileSource, feast.data_source.BigQuerySource], output_location: Optional[str] = None) → feast_spark.pyspark.abc.RetrievalJob [source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]) –
Source for the entity rows. If entity_source is a pandas DataFrame, the dataframe will be staged so that it is accessible to the Spark workers: if one of the feature tables’ sources is in BigQuery, the entities will be uploaded to BigQuery; otherwise they will be uploaded to remote file storage (derived from the configured staging location). It is also assumed that the column event_timestamp is present in the dataframe and is of type datetime without timezone information.
The user needs to make sure that the source (or the staging location, if entity_source is a pandas DataFrame) is accessible from the Spark cluster that will be used for the retrieval job.
output_location – Specifies the path in a bucket to write the exported feature data files
- Returns
Returns a retrieval job object that can be used to monitor retrieval progress asynchronously and to materialize the results.
Examples
>>> import feast
>>> import feast_spark
>>> from feast.data_format import ParquetFormat
>>> from feast.data_source import FileSource
>>> from datetime import datetime
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> feature_retrieval_job = feast_spark.Client(feast_client).get_historical_features(
...     feature_refs, entity_source)
>>> output_file_uri = feature_retrieval_job.get_output_file_uri()
"gs://some-bucket/output/
- get_historical_features_df(feature_refs: List[str], entity_source: Union[feast.data_source.FileSource, feast.data_source.BigQuerySource])[source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[FileSource, BigQuerySource]) – Source for the entity rows. The user needs to make sure that the source is accessible from the Spark cluster that will be used for the retrieval job.
- Returns
Returns the historical feature retrieval result as a Spark DataFrame.
Examples
>>> import feast
>>> import feast_spark
>>> from feast.data_format import ParquetFormat
>>> from feast.data_source import FileSource
>>> from datetime import datetime
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> df = feast_spark.Client(feast_client).get_historical_features_df(
...     feature_refs, entity_source)
- get_job_by_id(job_id: str) → feast_spark.pyspark.abc.SparkJob [source]
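Example
A minimal sketch; the job ID shown is hypothetical (in practice it comes from a previously launched job), and get_status() is assumed to be available on the returned SparkJob:
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> client = feast_spark.Client(feast_client)
>>> job = client.get_job_by_id("some-job-id")  # hypothetical job ID
>>> job.get_status()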
- list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) → List[feast_spark.pyspark.abc.SparkJob] [source]
List ingestion jobs currently running in Feast.
- Parameters
include_terminated – Whether to include terminated jobs
project – Optionally specify the project to use as a filter when retrieving jobs
table_name – Optionally specify the name of the feature table to use as a filter when retrieving jobs
- Returns
List of SparkJob ingestion jobs.
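Example
A minimal sketch, assuming a feature table named "bookings" is registered and that get_id() and get_status() are available on the returned SparkJob objects:
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> client = feast_spark.Client(feast_client)
>>> jobs = client.list_jobs(include_terminated=False, table_name="bookings")
>>> for job in jobs:
...     print(job.get_id(), job.get_status())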
- property metrics_redis: redis.client.Redis
- schedule_offline_to_online_ingestion(feature_table: feast.feature_table.FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]
Launch a scheduled ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
ingestion_timespan – Number of days of data that will be ingested per job. The boundaries used to filter the source are [end of day of execution date - ingestion_timespan (days), end of day of execution date).
cron_schedule – Cron schedule expression
- Returns
Spark job proxy object
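Example
A minimal sketch, assuming a feature table named "bookings" is already registered; the cron expression here schedules a daily ingestion of one day of data:
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_table = feast_client.get_feature_table("bookings")
>>> feast_spark.Client(feast_client).schedule_offline_to_online_ingestion(
...     feature_table, ingestion_timespan=1, cron_schedule="0 0 * * *")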
- start_offline_to_online_ingestion(feature_table: feast.feature_table.FeatureTable, start: datetime.datetime, end: datetime.datetime) → feast_spark.pyspark.abc.SparkJob [source]
Launch an ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
start – Lower datetime boundary used to filter the source
end – Upper datetime boundary used to filter the source
- Returns
Spark job proxy object
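Example
A minimal sketch, assuming a feature table named "bookings" is already registered; the time window is illustrative, and get_status() is assumed from the SparkJob interface:
>>> import feast
>>> import feast_spark
>>> from datetime import datetime
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_table = feast_client.get_feature_table("bookings")
>>> job = feast_spark.Client(feast_client).start_offline_to_online_ingestion(
...     feature_table, datetime(2021, 1, 1), datetime(2021, 1, 2))
>>> job.get_status()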
- start_stream_to_online_ingestion(feature_table: feast.feature_table.FeatureTable, extra_jars: Optional[List[str]] = None, project: Optional[str] = None) → feast_spark.pyspark.abc.SparkJob [source]
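Example
A minimal sketch, assuming a feature table named "bookings" with a configured stream source is already registered; get_id() is assumed from the SparkJob interface:
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_table = feast_client.get_feature_table("bookings")
>>> job = feast_spark.Client(feast_client).start_stream_to_online_ingestion(feature_table)
>>> job.get_id()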