feast_spark package
Subpackages
Submodules
feast_spark.cli module
feast_spark.client module
- class feast_spark.client.Client(feast_client: Client)[source]
Bases:
object
- property config: Config
- property feature_store: Client
- get_historical_features(feature_refs: List[str], entity_source: Union[DataFrame, FileSource, BigQuerySource], output_location: Optional[str] = None) RetrievalJob [source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]) –
Source for the entity rows. If entity_source is a Pandas DataFrame, the dataframe will be staged so that it is accessible to the Spark workers. If one of the feature tables’ sources is in BigQuery, the entities will be uploaded to BigQuery; otherwise, they will be uploaded to remote file storage (derived from the configured staging location). It is also assumed that the column event_timestamp is present in the dataframe and is of type datetime without timezone information.
The user needs to make sure that the source (or the staging location, if entity_source is a Pandas DataFrame) is accessible from the Spark cluster that will be used for the retrieval job.
output_location – Specifies the path in a bucket to write the exported feature data files
- Returns
Returns a retrieval job object that can be used to monitor retrieval progress asynchronously, and can be used to materialize the results.
Examples
>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from datetime import datetime
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> feature_retrieval_job = feast_spark.Client(feast_client).get_historical_features(
...     feature_refs, entity_source)
>>> feature_retrieval_job.get_output_file_uri()
"gs://some-bucket/output/"
- get_historical_features_df(feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource])[source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[FileSource, BigQuerySource]) – Source for the entity rows. The user needs to make sure that the source is accessible from the Spark cluster that will be used for the retrieval job.
- Returns
Returns the historical feature retrieval result in the form of a Spark dataframe.
Examples
>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from datetime import datetime
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> df = feast_spark.Client(feast_client).get_historical_features_df(
...     feature_refs, entity_source)
- list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob] [source]
List ingestion jobs currently running in Feast.
- Parameters
include_terminated – Flag to include terminated jobs or not
project – Optionally specify the project to use as filter when retrieving jobs
table_name – Optionally specify name of feature table to use as filter when retrieving jobs
- Returns
List of SparkJob ingestion jobs.
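Example
A minimal sketch, assuming a Feast Core at localhost:6565 and that the returned jobs expose get_id() and get_status() per the SparkJob interface (see feast_spark.remote_job below):
>>> import feast
>>> import feast_spark
>>> client = feast_spark.Client(feast.Client(core_url="localhost:6565"))
>>> # Include terminated jobs, filtered by project and feature table
>>> jobs = client.list_jobs(
...     include_terminated=True, project="default", table_name="bookings")
>>> for job in jobs:
...     print(job.get_id(), job.get_status())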
- property metrics_redis: Redis
- schedule_offline_to_online_ingestion(feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]
Launch a scheduled ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
ingestion_timespan – Number of days of data that will be ingested per job. The boundaries used to filter the source are [end of day of execution date - ingestion_timespan (days), end of day of execution date)
cron_schedule – Cron schedule expression
- Returns
Spark Job Proxy object
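Example
A hedged sketch of scheduling a daily ingestion; the feature table lookup via feast.Client.get_feature_table and the table name "bookings" are illustrative assumptions:
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> client = feast_spark.Client(feast_client)
>>> feature_table = feast_client.get_feature_table("bookings")  # assumed to exist
>>> # Every day at 00:30, ingest the last day of data
>>> client.schedule_offline_to_online_ingestion(
...     feature_table, ingestion_timespan=1, cron_schedule="30 0 * * *")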
- start_offline_to_online_ingestion(feature_table: FeatureTable, start: datetime, end: datetime) SparkJob [source]
Launch an ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
start – Lower datetime boundary used to filter the source
end – Upper datetime boundary used to filter the source
- Returns
Spark Job Proxy object
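Example
A short sketch of a one-off backfill, reusing the client and feature table from the scheduling example above; the date range is illustrative:
>>> from datetime import datetime
>>> # Ingest one week of batch data into the online store
>>> job = client.start_offline_to_online_ingestion(
...     feature_table, start=datetime(2021, 1, 1), end=datetime(2021, 1, 8))
>>> job.get_status()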
- feast_spark.client.stage_entities_to_bq_with_partition(entity_source: DataFrame, project: str, dataset: str) BigQuerySource [source]
Stores the given entity dataframe as a new table in BigQuery. The table name is generated based on the current time, and the table will expire in 1 day. Returns a BigQuerySource with a reference to the created table.
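Example
A hedged sketch of staging an entity dataframe; the GCP project and dataset names are placeholders:
>>> import pandas as pd
>>> from feast_spark.client import stage_entities_to_bq_with_partition
>>> entity_df = pd.DataFrame({
...     "customer_id": [1001, 1002],
...     "event_timestamp": pd.to_datetime(["2021-01-01", "2021-01-02"]),
... })
>>> # The returned BigQuerySource can be used as an entity_source
>>> entity_source = stage_entities_to_bq_with_partition(
...     entity_df, project="my-gcp-project", dataset="feast_staging")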
feast_spark.constants module
- class feast_spark.constants.ConfigOptions[source]
Bases:
object
- BIGTABLE_INSTANCE: Optional[str] = 'bigtable_instance'
BigTable Instance ID
- BIGTABLE_PROJECT: Optional[str] = 'bigtable_project'
BigTable Project ID
- BQ_STAGING_DATASET: Optional[str] = 'bq_staging_dataset'
BigQuery dataset used to stage the entities during historical feature retrieval. If not set, the dataset of the feature table batch source will be used instead.
- BQ_STAGING_PROJECT: Optional[str] = 'bq_staging_project'
GCP project of the BigQuery dataset used to stage the entities during historical feature retrieval. If not set, the GCP project of the feature table batch source will be used instead.
- CASSANDRA_HOST: Optional[str] = 'cassandra_host'
Cassandra host. Can be a comma-separated string
- CASSANDRA_PORT: Optional[str] = 'cassandra_port'
Cassandra port
- CHECKPOINT_PATH: str = 'checkpoint_path'
Ingestion Job Checkpoint Location. Format same as for DeadLetter path
- DATAPROC_CLUSTER_NAME: Optional[str] = 'dataproc_cluster_name'
Dataproc cluster to run Feast Spark Jobs in
- DATAPROC_EXECUTOR_CORES = 'dataproc_executor_cores'
No. of executor cores for Dataproc cluster
- DATAPROC_EXECUTOR_INSTANCES = 'dataproc_executor_instances'
No. of executor instances for Dataproc cluster
- DATAPROC_EXECUTOR_MEMORY = 'dataproc_executor_memory'
Amount of executor memory for Dataproc cluster
- DATAPROC_PROJECT: Optional[str] = 'dataproc_project'
Project of Dataproc cluster
- DATAPROC_REGION: Optional[str] = 'dataproc_region'
Region of Dataproc cluster
- DEADLETTER_PATH: str = 'deadletter_path'
Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER.
Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
- EMR_CLUSTER_ID: Optional[str] = 'emr_cluster_id'
EMR cluster to run Feast Spark Jobs in
- EMR_CLUSTER_TEMPLATE_PATH: Optional[str] = 'emr_cluster_template_path'
Template path of EMR cluster
- EMR_LOG_LOCATION: Optional[str] = 'emr_log_location'
Log path of EMR cluster
- EMR_REGION: Optional[str] = 'emr_region'
Region of EMR cluster
- HISTORICAL_FEATURE_OUTPUT_FORMAT: str = 'historical_feature_output_format'
File format of historical retrieval features
- HISTORICAL_FEATURE_OUTPUT_LOCATION: Optional[str] = 'historical_feature_output_location'
File location of historical retrieval features
- INGESTION_DROP_INVALID_ROWS: str = 'ingestion_drop_invalid_rows'
If set to true, rows that do not pass custom validation (see feast.contrib.validation) won’t be saved to the online store
- JOB_SERVICE_ENABLE_CONTROL_LOOP: str = 'job_service_enable_control_loop'
Enable or disable control loop for Feast Job Service
- JOB_SERVICE_ENABLE_SSL: str = 'job_service_enable_ssl'
Enable or disable TLS/SSL to Feast Job Service
- JOB_SERVICE_PAUSE_BETWEEN_JOBS: str = 'job_service_pause_between_jobs'
Pause in seconds between starting new jobs in Control Loop
- JOB_SERVICE_PROMETHEUS_METRIC_PORT: int = 'job_service_prometheus_metric_port'
Port on which the Prometheus metric server will run
- JOB_SERVICE_RETRY_FAILED_JOBS: str = 'job_service_retry_failed_jobs'
If set to True, Control Loop will try to restart failed streaming jobs
- JOB_SERVICE_SERVER_SSL_CERT: str = 'job_service_server_ssl_cert'
Path to certificate(s) to secure connection to Feast Job Service
- JOB_SERVICE_URL: Optional[str] = 'job_service_url'
Default Feast Job Service URL
- LOCK_EXPIRY: Optional[str] = 'lock_expiry'
TTL for locks for job management
- LOCK_MGR_REDIS_HOST: Optional[str] = 'lock_mgr_redis_host'
Host to Redis Instance which stores locks for job management
- LOCK_MGR_REDIS_PORT: Optional[str] = 'lock_mgr_redis_port'
Port to Redis Instance which stores locks for job management
- REDIS_HOST: Optional[str] = 'redis_host'
Default Redis host
- REDIS_PASSWORD: Optional[str] = 'redis_password'
Redis credentials
- REDIS_PORT: Optional[str] = 'redis_port'
Default Redis port
- REDIS_SSL: Optional[str] = 'redis_ssl'
Enable or disable TLS/SSL to Redis
- S3_ENDPOINT_URL: Optional[str] = 's3_endpoint_url'
Endpoint URL for S3 storage_client
- SPARK_BQ_MATERIALIZATION_DATASET: Optional[str] = 'spark_bq_materialization_dataset'
The dataset ID where the materialized view of a BigQuerySource will be created. By default, the same dataset where the view is located is used.
- SPARK_BQ_MATERIALIZATION_PROJECT: Optional[str] = 'spark_bq_materialization_project'
The project ID where the materialized view of a BigQuerySource will be created. By default, the same project where the view is located is used.
- SPARK_HOME: Optional[str] = 'spark_home'
Directory where Spark is installed
- SPARK_INGESTION_JAR: str = 'spark_ingestion_jar'
Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER.
Eg. “dataproc” (http and gs), “emr” (http and s3), “standalone” (http and file)
- SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH: Optional[str] = 'spark_k8s_batch_ingestion_template_path'
- SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = 'spark_k8s_historical_retrieval_template_path'
- SPARK_K8S_JOB_TEMPLATE_PATH = 'spark_k8s_job_template_path'
- SPARK_K8S_NAMESPACE = 'spark_k8s_namespace'
- SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH: Optional[str] = 'spark_k8s_stream_ingestion_template_path'
- SPARK_K8S_USE_INCLUSTER_CONFIG = 'spark_k8s_use_incluster_config'
- SPARK_LAUNCHER: Optional[str] = 'spark_launcher'
Spark Job launcher. The choice of storage is connected to the choice of SPARK_LAUNCHER.
Options: “standalone”, “dataproc”, “emr”
- SPARK_METRICS_REDIS_HOST: Optional[str] = 'spark_metrics_redis_host'
Host of the Redis instance that stores Spark ingestion job metrics
- SPARK_METRICS_REDIS_PORT: Optional[str] = 'spark_metrics_redis_port'
Port of the Redis instance that stores Spark ingestion job metrics
- SPARK_STAGING_LOCATION: Optional[str] = 'spark_staging_location'
Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER.
Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
- SPARK_STANDALONE_MASTER: str = 'spark_standalone_master'
Spark resource manager master URL
- SPARK_STREAMING_TRIGGERING_INTERVAL: Optional[str] = 'spark_streaming_triggering_interval'
If set, the streaming ingestion job will consume incoming rows not continuously but periodically, at the configured interval (in seconds). This may help to control the number of write requests to storage
- STATSD_ENABLED: str = 'statsd_enabled'
Enable or disable StatsD
- STATSD_HOST: Optional[str] = 'statsd_host'
Default StatsD host
- STATSD_PORT: Optional[str] = 'statsd_port'
Default StatsD port
- STENCIL_TOKEN: str = 'stencil_token'
Bearer token used for authentication with Stencil Server
- STENCIL_URL: str = 'stencil_url'
ProtoRegistry Address (currently only Stencil Server is supported as registry) https://github.com/odpf/stencil
- WHITELISTED_FEATURE_TABLES_PATH: Optional[str] = 'whitelisted_feature_tables_path'
File path to a whitelist containing all the feature tables allowed for ingestion. Each line in the file should be in the format of <project>:<feature table>
- WHITELISTED_JOB_TYPES: Optional[str] = 'whitelisted_job_types'
Whitelisted Feast Job Types
- WHITELISTED_PROJECTS: Optional[str] = 'whitelisted_projects'
Whitelisted Feast projects
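Example
These options are plain string keys in the Feast client configuration. As a sketch of a Dataproc setup (assuming feast.Client accepts config options as keyword arguments; all values below are placeholders):
>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(
...     core_url="localhost:6565",
...     spark_launcher="dataproc",
...     dataproc_cluster_name="feast-cluster",
...     dataproc_project="my-gcp-project",
...     dataproc_region="us-central1",
...     spark_staging_location="gs://some-bucket/staging/",
... )
>>> client = feast_spark.Client(feast_client)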
feast_spark.job_service module
- class feast_spark.job_service.HealthServicerImpl[source]
Bases:
HealthServicer
- class feast_spark.job_service.JobServiceServicer(client: Client)[source]
Bases:
JobServiceServicer
- GetHistoricalFeatures(request: GetHistoricalFeaturesRequest, context)[source]
Produce a training dataset, return a job id that will provide a file reference
- ScheduleOfflineToOnlineIngestionJob(request: ScheduleOfflineToOnlineIngestionJobRequest, context)[source]
Schedule job to ingest data from offline store into online store periodically
- StartOfflineToOnlineIngestionJob(request: StartOfflineToOnlineIngestionJobRequest, context)[source]
Start job to ingest data from offline store into online store
- StartStreamToOnlineIngestionJob(request: StartStreamToOnlineIngestionJobRequest, context)[source]
Start job to ingest data from stream into online store
- UnscheduleOfflineToOnlineIngestionJob(request: UnscheduleOfflineToOnlineIngestionJobRequest, context)[source]
Unschedule job to ingest data from offline store into online store
- class feast_spark.job_service.LoggingInterceptor[source]
Bases:
ServerInterceptor
- intercept_service(continuation, handler_call_details)[source]
Intercepts incoming RPCs before handing them over to a handler.
- Parameters
continuation – A function that takes a HandlerCallDetails and proceeds to invoke the next interceptor in the chain, if any, or the RPC handler lookup logic, with the call details passed as an argument, and returns an RpcMethodHandler instance if the RPC is considered serviced, or None otherwise.
handler_call_details – A HandlerCallDetails describing the RPC.
- Returns
An RpcMethodHandler with which the RPC may be serviced if the interceptor chooses to service this RPC, or None otherwise.
- feast_spark.job_service.ensure_stream_ingestion_jobs(client: Client, all_projects: bool)[source]
Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs.
More concretely, it will determine:
- which stream ingestion jobs are running
- which stream ingestion jobs should be running
and it will perform two kinds of operations:
- cancel all running jobs that should not be running
- start all non-existent jobs that should be running
- Parameters
all_projects (bool) – If true, runs the check for all projects. Otherwise, only checks the client’s current project.
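Example
A minimal sketch of invoking the reconciliation pass directly (the job service control loop, when enabled, runs this periodically):
>>> import feast
>>> import feast_spark
>>> from feast_spark.job_service import ensure_stream_ingestion_jobs
>>> client = feast_spark.Client(feast.Client(core_url="localhost:6565"))
>>> # Reconcile stream ingestion jobs across all projects
>>> ensure_stream_ingestion_jobs(client, all_projects=True)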
feast_spark.remote_job module
- class feast_spark.remote_job.RemoteBatchIngestionJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, feature_table: str, start_time: datetime, log_uri: Optional[str])[source]
Bases:
RemoteJobMixin, BatchIngestionJob
Batch ingestion job result.
- class feast_spark.remote_job.RemoteJobMixin(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, start_time: datetime, log_uri: Optional[str])[source]
Bases:
object
- get_status() SparkJobStatus [source]
- class feast_spark.remote_job.RemoteRetrievalJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, output_file_uri: str, start_time: datetime, log_uri: Optional[str])[source]
Bases:
RemoteJobMixin, RetrievalJob
Historical feature retrieval job result, with the job being run remotely by the job service.
- get_output_file_uri(timeout_sec=None, block=True)[source]
Get the output file URI of the result file. This method will block until the job succeeds, or raise an exception if the job does not execute successfully within the timeout.
- Parameters
timeout_sec (int) – Maximum number of seconds to wait until the job is done. If “timeout_sec” is exceeded or if the job fails, an exception will be raised.
block (bool) – If false, don’t block until the job is done. If block=True, timeout parameter is ignored.
- Raises
SparkJobFailure – The Spark job submission failed, an error was encountered during execution, or the job timed out.
- Returns
File URI of the result file.
- Return type
str
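Example
A usage sketch, assuming retrieval_job is a RemoteRetrievalJob returned by get_historical_features:
>>> # Wait for the job to finish (at most 10 minutes) and fetch the result location
>>> uri = retrieval_job.get_output_file_uri(timeout_sec=600)
>>> # Or return immediately without waiting for completion
>>> uri = retrieval_job.get_output_file_uri(block=False)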
- class feast_spark.remote_job.RemoteStreamIngestionJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, feature_table: str, start_time: datetime, log_uri: Optional[str])[source]
Bases:
RemoteJobMixin, StreamIngestionJob
Stream ingestion job result.
- feast_spark.remote_job.get_remote_job_from_proto(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job: Job) SparkJob [source]
Get the remote job Python object from a Job proto.
- Parameters
service (JobServiceStub) – Reference to Job Service
grpc_extra_param_provider (GrpcExtraParamProvider) – Callable for providing extra parameters to grpc requests
job (JobProto) – Proto object describing the Job
- Returns
A remote job object for the given job
- Return type
(SparkJob)
Module contents
- class feast_spark.Client(feast_client: Client)[source]
Bases:
object
- property config: Config
- property feature_store: Client
- get_historical_features(feature_refs: List[str], entity_source: Union[DataFrame, FileSource, BigQuerySource], output_location: Optional[str] = None) RetrievalJob [source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]) –
Source for the entity rows. If entity_source is a Pandas DataFrame, the dataframe will be staged so that it is accessible to the Spark workers. If one of the feature tables’ sources is in BigQuery, the entities will be uploaded to BigQuery; otherwise, they will be uploaded to remote file storage (derived from the configured staging location). It is also assumed that the column event_timestamp is present in the dataframe and is of type datetime without timezone information.
The user needs to make sure that the source (or the staging location, if entity_source is a Pandas DataFrame) is accessible from the Spark cluster that will be used for the retrieval job.
output_location – Specifies the path in a bucket to write the exported feature data files
- Returns
Returns a retrieval job object that can be used to monitor retrieval progress asynchronously, and can be used to materialize the results.
Examples
>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from datetime import datetime
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> feature_retrieval_job = feast_spark.Client(feast_client).get_historical_features(
...     feature_refs, entity_source)
>>> feature_retrieval_job.get_output_file_uri()
"gs://some-bucket/output/"
- get_historical_features_df(feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource])[source]
Launch a historical feature retrieval job.
- Parameters
feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.
entity_source (Union[FileSource, BigQuerySource]) – Source for the entity rows. The user needs to make sure that the source is accessible from the Spark cluster that will be used for the retrieval job.
- Returns
Returns the historical feature retrieval result in the form of a Spark dataframe.
Examples
>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from datetime import datetime
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> df = feast_spark.Client(feast_client).get_historical_features_df(
...     feature_refs, entity_source)
- list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob] [source]
List ingestion jobs currently running in Feast.
- Parameters
include_terminated – Flag to include terminated jobs or not
project – Optionally specify the project to use as filter when retrieving jobs
table_name – Optionally specify name of feature table to use as filter when retrieving jobs
- Returns
List of SparkJob ingestion jobs.
- property metrics_redis: Redis
- schedule_offline_to_online_ingestion(feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]
Launch a scheduled ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
ingestion_timespan – Number of days of data that will be ingested per job. The boundaries used to filter the source are [end of day of execution date - ingestion_timespan (days), end of day of execution date)
cron_schedule – Cron schedule expression
- Returns
Spark Job Proxy object
- start_offline_to_online_ingestion(feature_table: FeatureTable, start: datetime, end: datetime) SparkJob [source]
Launch an ingestion job from the batch source to the online store for the given feature table.
- Parameters
feature_table – FeatureTable that will be ingested into the online store
start – Lower datetime boundary used to filter the source
end – Upper datetime boundary used to filter the source
- Returns
Spark Job Proxy object