feast_spark package

Subpackages

Submodules

feast_spark.cli module

feast_spark.client module

class feast_spark.client.Client(feast_client: Client)[source]

Bases: object

property config: Config
property feature_store: Client
get_health_metrics(project: str, table_names: List[str]) Dict[str, List[str]][source]
get_historical_features(feature_refs: List[str], entity_source: Union[DataFrame, FileSource, BigQuerySource], output_location: Optional[str] = None) RetrievalJob[source]

Launch a historical feature retrieval job.

Parameters
  • feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.

  • entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]) –

    Source for the entity rows. If entity_source is a Pandas DataFrame, the dataframe will be staged so that it is accessible to the Spark workers: if one of the feature tables’ sources is in BigQuery, the entities will be uploaded to BigQuery; otherwise they will be uploaded to remote file storage (derived from the configured staging location). The dataframe is also assumed to contain a column named event_timestamp of type datetime without timezone information.

    The user needs to make sure that the source (or the staging location, if entity_source is a Pandas DataFrame) is accessible from the Spark cluster that will be used for the retrieval job.

  • output_location – Specifies the path in a bucket to write the exported feature data files

Returns

Returns a retrieval job object that can be used to monitor retrieval progress asynchronously, and can be used to materialize the results.

Examples

>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> feature_retrieval_job = feast_spark.Client(feast_client).get_historical_features(
...     feature_refs, entity_source)
>>> output_file_uri = feature_retrieval_job.get_output_file_uri()
>>> output_file_uri
"gs://some-bucket/output/"
get_historical_features_df(feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource])[source]

Launch a historical feature retrieval job.

Parameters
  • feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.

  • entity_source (Union[FileSource, BigQuerySource]) – Source for the entity rows. The user needs to make sure that the source is accessible from the Spark cluster that will be used for the retrieval job.

Returns

Returns the historical feature retrieval result in the form of a Spark DataFrame.

Examples

>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> df = feast_spark.Client(feast_client).get_historical_features_df(
...     feature_refs, entity_source)
get_job_by_id(job_id: str) SparkJob[source]
list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]

List ingestion jobs currently running in Feast.

Parameters
  • include_terminated – Flag to include terminated jobs or not

  • project – Optionally specify the project to use as filter when retrieving jobs

  • table_name – Optionally specify name of feature table to use as filter when retrieving jobs

Returns

List of SparkJob ingestion jobs.
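
For illustration, a sketch of listing active ingestion jobs filtered by project and feature table; the "default" project and "bookings" table names are placeholders:

>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> client = feast_spark.Client(feast_client)
>>> jobs = client.list_jobs(
...     include_terminated=False, project="default", table_name="bookings")
>>> for job in jobs:
...     print(job.get_id(), job.get_status())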

property metrics_redis: Redis
schedule_offline_to_online_ingestion(feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]

Launch Scheduled Ingestion Job from Batch Source to Online Store for given feature table

Parameters
  • feature_table – FeatureTable that will be ingested into the online store

  • ingestion_timespan – Number of days of data that will be ingested per job. The boundaries on which to filter the source are [end of day of execution date - ingestion_timespan (days), end of day of execution date)

  • cron_schedule – Cron schedule expression

Returns: Spark Job Proxy object
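
A sketch, assuming a feature table named "bookings" has already been registered and can be fetched via the Feast client; the daily cron expression and one-day timespan are illustrative:

>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_table = feast_client.get_feature_table("bookings")
>>> feast_spark.Client(feast_client).schedule_offline_to_online_ingestion(
...     feature_table, ingestion_timespan=1, cron_schedule="0 0 * * *")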

start_offline_to_online_ingestion(feature_table: FeatureTable, start: datetime, end: datetime) SparkJob[source]

Launch Ingestion Job from Batch Source to Online Store for given feature table

Parameters
  • feature_table – FeatureTable that will be ingested into the online store

  • start – lower datetime boundary on which to filter the source

  • end – upper datetime boundary on which to filter the source

Returns: Spark Job Proxy object
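
A sketch under the same assumption of a registered "bookings" feature table; the one-day window below is purely illustrative:

>>> import feast
>>> import feast_spark
>>> from datetime import datetime
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_table = feast_client.get_feature_table("bookings")
>>> job = feast_spark.Client(feast_client).start_offline_to_online_ingestion(
...     feature_table, start=datetime(2021, 1, 1), end=datetime(2021, 1, 2))
>>> status = job.get_status()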

start_stream_to_online_ingestion(feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, project: Optional[str] = None) SparkJob[source]
unschedule_offline_to_online_ingestion(feature_table: FeatureTable, project=None)[source]
feast_spark.client.stage_entities_to_bq_with_partition(entity_source: DataFrame, project: str, dataset: str) BigQuerySource[source]

Stores the given (entity) dataframe as a new table in BigQuery. The table name is generated from the current time, and the table will expire in 1 day. Returns a BigQuerySource referencing the created table.
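
For illustration, a sketch of calling this helper directly; the GCP project and dataset names are placeholders, and BigQuery credentials are assumed to be configured in the environment:

>>> import pandas as pd
>>> from datetime import datetime
>>> from feast_spark.client import stage_entities_to_bq_with_partition
>>> entity_df = pd.DataFrame({
...     "customer_id": [1001, 1002],
...     "event_timestamp": [datetime(2021, 1, 1), datetime(2021, 1, 2)],
... })
>>> bq_source = stage_entities_to_bq_with_partition(
...     entity_df, project="my-gcp-project", dataset="feast_staging")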

feast_spark.constants module

class feast_spark.constants.ConfigOptions[source]

Bases: object

BIGTABLE_INSTANCE: Optional[str] = 'bigtable_instance'

BigTable Instance ID

BIGTABLE_PROJECT: Optional[str] = 'bigtable_project'

BigTable Project ID

BQ_STAGING_DATASET: Optional[str] = 'bq_staging_dataset'

BigQuery dataset used to stage the entities during historical feature retrieval. If not set, the BigQuery dataset of the feature table batch source will be used instead.

BQ_STAGING_PROJECT: Optional[str] = 'bq_staging_project'

GCP project of the BigQuery dataset used to stage the entities during historical feature retrieval. If not set, the GCP project of the feature table batch source will be used instead.

CASSANDRA_HOST: Optional[str] = 'cassandra_host'

Cassandra host. Can be a comma separated string

CASSANDRA_PORT: Optional[str] = 'cassandra_port'

Cassandra port

CHECKPOINT_PATH: str = 'checkpoint_path'

Ingestion Job Checkpoint Location. Format same as for DeadLetter path

DATAPROC_CLUSTER_NAME: Optional[str] = 'dataproc_cluster_name'

Dataproc cluster to run Feast Spark Jobs in

DATAPROC_EXECUTOR_CORES = 'dataproc_executor_cores'

No. of executor cores for Dataproc cluster

DATAPROC_EXECUTOR_INSTANCES = 'dataproc_executor_instances'

No. of executor instances for Dataproc cluster

DATAPROC_EXECUTOR_MEMORY = 'dataproc_executor_memory'

Amount of executor memory for Dataproc cluster

DATAPROC_PROJECT: Optional[str] = 'dataproc_project'

Project of Dataproc cluster

DATAPROC_REGION: Optional[str] = 'dataproc_region'

Region of Dataproc cluster

DEADLETTER_PATH: str = 'deadletter_path'

Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER.

Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/

EMR_CLUSTER_ID: Optional[str] = 'emr_cluster_id'

EMR cluster to run Feast Spark Jobs in

EMR_CLUSTER_TEMPLATE_PATH: Optional[str] = 'emr_cluster_template_path'

Template path of EMR cluster

EMR_LOG_LOCATION: Optional[str] = 'emr_log_location'

Log path of EMR cluster

EMR_REGION: Optional[str] = 'emr_region'

Region of EMR cluster

HISTORICAL_FEATURE_OUTPUT_FORMAT: str = 'historical_feature_output_format'

File format of historical retrieval features

HISTORICAL_FEATURE_OUTPUT_LOCATION: Optional[str] = 'historical_feature_output_location'

File location of historical retrieval features

INGESTION_DROP_INVALID_ROWS: str = 'ingestion_drop_invalid_rows'

If set to true, rows that do not pass custom validation (see feast.contrib.validation) won’t be saved to the online store

JOB_SERVICE_ENABLE_CONTROL_LOOP: str = 'job_service_enable_control_loop'

Enable or disable control loop for Feast Job Service

JOB_SERVICE_ENABLE_SSL: str = 'job_service_enable_ssl'

Enable or disable TLS/SSL to Feast Job Service

JOB_SERVICE_PAUSE_BETWEEN_JOBS: str = 'job_service_pause_between_jobs'

Pause in seconds between starting new jobs in Control Loop

JOB_SERVICE_PROMETHEUS_METRIC_PORT: int = 'job_service_prometheus_metric_port'

Port on which the Prometheus metric server will run

JOB_SERVICE_RETRY_FAILED_JOBS: str = 'job_service_retry_failed_jobs'

If set to True, the Control Loop will try to restart failed streaming jobs

JOB_SERVICE_SERVER_SSL_CERT: str = 'job_service_server_ssl_cert'

Path to certificate(s) to secure connection to Feast Job Service

JOB_SERVICE_URL: Optional[str] = 'job_service_url'

Default Feast Job Service URL

LOCK_EXPIRY: Optional[str] = 'lock_expiry'

TTL for locks for job management

LOCK_MGR_REDIS_HOST: Optional[str] = 'lock_mgr_redis_host'

Host of the Redis instance which stores locks for job management

LOCK_MGR_REDIS_PORT: Optional[str] = 'lock_mgr_redis_port'

Port of the Redis instance which stores locks for job management

REDIS_HOST: Optional[str] = 'redis_host'

Default Redis host

REDIS_PASSWORD: Optional[str] = 'redis_password'

Redis credentials

REDIS_PORT: Optional[str] = 'redis_port'

Default Redis port

REDIS_SSL: Optional[str] = 'redis_ssl'

Enable or disable TLS/SSL to Redis

S3_ENDPOINT_URL: Optional[str] = 's3_endpoint_url'

Endpoint URL for S3 storage_client

SPARK_BQ_MATERIALIZATION_DATASET: Optional[str] = 'spark_bq_materialization_dataset'

The dataset ID where the materialized view of a BigQuerySource will be created. By default, the same dataset where the view is located is used.

SPARK_BQ_MATERIALIZATION_PROJECT: Optional[str] = 'spark_bq_materialization_project'

The project ID where the materialized view of a BigQuerySource will be created. By default, the same project where the view is located is used.

SPARK_HOME: Optional[str] = 'spark_home'

Directory where Spark is installed

SPARK_INGESTION_JAR: str = 'spark_ingestion_jar'

Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER.

Eg. “dataproc” (http and gs), “emr” (http and s3), “standalone” (http and file)

SPARK_K8S_BATCH_INGESTION_TEMPLATE_PATH: Optional[str] = 'spark_k8s_batch_ingestion_template_path'
SPARK_K8S_HISTORICAL_RETRIEVAL_TEMPLATE_PATH: Optional[str] = 'spark_k8s_historical_retrieval_template_path'
SPARK_K8S_JOB_TEMPLATE_PATH = 'spark_k8s_job_template_path'
SPARK_K8S_NAMESPACE = 'spark_k8s_namespace'
SPARK_K8S_STREAM_INGESTION_TEMPLATE_PATH: Optional[str] = 'spark_k8s_stream_ingestion_template_path'
SPARK_K8S_USE_INCLUSTER_CONFIG = 'spark_k8s_use_incluster_config'
SPARK_LAUNCHER: Optional[str] = 'spark_launcher'

Spark Job launcher. The choice of storage is connected to the choice of SPARK_LAUNCHER.

Options: “standalone”, “dataproc”, “emr”

SPARK_METRICS_REDIS_HOST: Optional[str] = 'spark_metrics_redis_host'

Default host of the Redis instance which stores Spark ingestion job metrics

SPARK_METRICS_REDIS_PORT: Optional[str] = 'spark_metrics_redis_port'

Default port of the Redis instance which stores Spark ingestion job metrics

SPARK_STAGING_LOCATION: Optional[str] = 'spark_staging_location'

Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER.

Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/

SPARK_STANDALONE_MASTER: str = 'spark_standalone_master'

Spark resource manager master url

SPARK_STREAMING_TRIGGERING_INTERVAL: Optional[str] = 'spark_streaming_triggering_interval'

If set, the streaming ingestion job will consume incoming rows not continuously but periodically, at the configured interval (in seconds). This may help control the number of write requests to the storage.

STATSD_ENABLED: str = 'statsd_enabled'

Enable or disable StatsD

STATSD_HOST: Optional[str] = 'statsd_host'

Default StatsD host

STATSD_PORT: Optional[str] = 'statsd_port'

Default StatsD port

STENCIL_TOKEN: str = 'stencil_token'

Bearer token used for authentication with Stencil Server

STENCIL_URL: str = 'stencil_url'

ProtoRegistry Address (currently only Stencil Server is supported as registry) https://github.com/odpf/stencil

WHITELISTED_FEATURE_TABLES_PATH: Optional[str] = 'whitelisted_feature_tables_path'

File path to a whitelist containing all the feature tables allowed for ingestion. Each line in the file should be in the format of <project>:<feature table>

WHITELISTED_JOB_TYPES: Optional[str] = 'whitelisted_job_types'

Whitelisted Feast Job Types

WHITELISTED_PROJECTS: Optional[str] = 'whitelisted_projects'

Whitelisted Feast projects

defaults()[source]
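
These attributes are the names of configuration option keys rather than values; they are typically supplied through the Feast client configuration (for example as feast.Client keyword arguments or the corresponding environment variables). A hedged sketch with placeholder Dataproc settings, assuming the client accepts these option keys as keyword arguments:

>>> import feast
>>> import feast_spark
>>> feast_client = feast.Client(
...     core_url="localhost:6565",
...     spark_launcher="dataproc",
...     dataproc_cluster_name="my-cluster",
...     dataproc_project="my-gcp-project",
...     dataproc_region="us-central1",
...     spark_staging_location="gs://some-bucket/staging/",
... )
>>> client = feast_spark.Client(feast_client)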

feast_spark.job_service module

class feast_spark.job_service.HealthServicerImpl[source]

Bases: HealthServicer

Check(request, context)[source]

Missing associated documentation comment in .proto file.

class feast_spark.job_service.JobServiceServicer(client: Client)[source]

Bases: JobServiceServicer

CancelJob(request, context)[source]

Stop a single job

GetHealthMetrics(request, context)[source]

Return ingestion jobs health metrics

GetHistoricalFeatures(request: GetHistoricalFeaturesRequest, context)[source]

Produce a training dataset, return a job id that will provide a file reference

GetJob(request, context)[source]

Get details of a single job

ListJobs(request, context)[source]

List all types of jobs

ScheduleOfflineToOnlineIngestionJob(request: ScheduleOfflineToOnlineIngestionJobRequest, context)[source]

Schedule job to ingest data from offline store into online store periodically

StartOfflineToOnlineIngestionJob(request: StartOfflineToOnlineIngestionJobRequest, context)[source]

Start job to ingest data from offline store into online store

StartStreamToOnlineIngestionJob(request: StartStreamToOnlineIngestionJobRequest, context)[source]

Start job to ingest data from stream into online store

UnscheduleOfflineToOnlineIngestionJob(request: UnscheduleOfflineToOnlineIngestionJobRequest, context)[source]

Unschedule job to ingest data from offline store into online store

is_whitelisted(project: str)[source]
class feast_spark.job_service.LoggingInterceptor[source]

Bases: ServerInterceptor

intercept_service(continuation, handler_call_details)[source]

Intercepts incoming RPCs before handing them over to a handler.

Parameters
  • continuation – A function that takes a HandlerCallDetails and proceeds to invoke the next interceptor in the chain, if any, or the RPC handler lookup logic, with the call details passed as an argument, and returns an RpcMethodHandler instance if the RPC is considered serviced, or None otherwise.

  • handler_call_details – A HandlerCallDetails describing the RPC.

Returns

An RpcMethodHandler with which the RPC may be serviced if the interceptor chooses to service this RPC, or None otherwise.

feast_spark.job_service.ensure_stream_ingestion_jobs(client: Client, all_projects: bool)[source]

Ensures all required stream ingestion jobs are running and cleans up the unnecessary jobs.

More concretely, it will determine:
  • which stream ingestion jobs are running
  • which stream ingestion jobs should be running

and it will perform two kinds of operations:
  • cancel all running jobs that should not be running
  • start all non-existent jobs that should be running

Parameters

all_projects (bool) – If true, runs the check for all projects. Otherwise only checks the client’s current project.
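
For illustration, a minimal sketch of invoking this check once for all projects (normally it is driven by the control loop rather than called directly):

>>> import feast
>>> import feast_spark
>>> from feast_spark.job_service import ensure_stream_ingestion_jobs
>>> client = feast_spark.Client(feast.Client(core_url="localhost:6565"))
>>> ensure_stream_ingestion_jobs(client, all_projects=True)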

feast_spark.job_service.start_control_loop() None[source]

Starts control loop that continuously ensures that correct jobs are being run.

Currently this affects only the stream ingestion jobs. Please refer to ensure_stream_ingestion_jobs for full documentation on how the check works.

feast_spark.job_service.start_job_service() None[source]

Start Feast Job Service

feast_spark.job_service.start_prometheus_serving(port: int = 8080) None[source]

Initialize Prometheus metric server

feast_spark.remote_job module

class feast_spark.remote_job.RemoteBatchIngestionJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, feature_table: str, start_time: datetime, log_uri: Optional[str])[source]

Bases: RemoteJobMixin, BatchIngestionJob

Batch ingestion job result.

get_feature_table() str[source]

Get the feature table name associated with this job. Returns an empty string if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

class feast_spark.remote_job.RemoteJobMixin(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, start_time: datetime, log_uri: Optional[str])[source]

Bases: object

cancel()[source]
get_error_message() str[source]
get_id() str[source]
get_log_uri() Optional[str][source]
get_start_time() datetime[source]
get_status() SparkJobStatus[source]
wait_termination(timeout_sec=None)[source]
class feast_spark.remote_job.RemoteRetrievalJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, output_file_uri: str, start_time: datetime, log_uri: Optional[str])[source]

Bases: RemoteJobMixin, RetrievalJob

Historical feature retrieval job result, with the job being run remotely by the job service

get_output_file_uri(timeout_sec=None, block=True)[source]

Get the output file URI of the result file. This method will block until the job succeeds, or until the job fails or does not complete within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait until the job is done. If “timeout_sec” is exceeded or if the job fails, an exception will be raised.

  • block (bool) – If false, don’t block until the job is done. If block=True, timeout parameter is ignored.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

File URI of the result file.

Return type

str
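
For illustration, a non-blocking sketch that fetches the URI immediately and checks the job status separately; feature_retrieval_job is assumed to be the RemoteRetrievalJob returned by get_historical_features in the client examples above:

>>> output_file_uri = feature_retrieval_job.get_output_file_uri(block=False)
>>> status = feature_retrieval_job.get_status()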

class feast_spark.remote_job.RemoteStreamIngestionJob(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job_id: str, feature_table: str, start_time: datetime, log_uri: Optional[str])[source]

Bases: RemoteJobMixin, StreamIngestionJob

Stream ingestion job result.

get_feature_table() str[source]

Get the feature table name associated with this job. Returns None if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same hash when retrieving the job from Spark.

Returns

The hash for this streaming ingestion job

Return type

str

feast_spark.remote_job.get_remote_job_from_proto(service: JobServiceStub, grpc_extra_param_provider: Callable[[], Dict[str, Any]], job: Job) SparkJob[source]

Get the remote job python object from Job proto.

Parameters
  • service (JobServiceStub) – Reference to Job Service

  • grpc_extra_param_provider (GrpcExtraParamProvider) – Callable for providing extra parameters to grpc requests

  • job (JobProto) – Proto object describing the Job

Returns

A remote job object for the given job

Return type

(SparkJob)

Module contents

class feast_spark.Client(feast_client: Client)[source]

Bases: object

property config: Config
property feature_store: Client
get_health_metrics(project: str, table_names: List[str]) Dict[str, List[str]][source]
get_historical_features(feature_refs: List[str], entity_source: Union[DataFrame, FileSource, BigQuerySource], output_location: Optional[str] = None) RetrievalJob[source]

Launch a historical feature retrieval job.

Parameters
  • feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.

  • entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]) –

    Source for the entity rows. If entity_source is a Pandas DataFrame, the dataframe will be staged so that it is accessible to the Spark workers: if one of the feature tables’ sources is in BigQuery, the entities will be uploaded to BigQuery; otherwise they will be uploaded to remote file storage (derived from the configured staging location). The dataframe is also assumed to contain a column named event_timestamp of type datetime without timezone information.

    The user needs to make sure that the source (or the staging location, if entity_source is a Pandas DataFrame) is accessible from the Spark cluster that will be used for the retrieval job.

  • output_location – Specifies the path in a bucket to write the exported feature data files

Returns

Returns a retrieval job object that can be used to monitor retrieval progress asynchronously, and can be used to materialize the results.

Examples

>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> feature_retrieval_job = feast_spark.Client(feast_client).get_historical_features(
...     feature_refs, entity_source)
>>> output_file_uri = feature_retrieval_job.get_output_file_uri()
>>> output_file_uri
"gs://some-bucket/output/"
get_historical_features_df(feature_refs: List[str], entity_source: Union[FileSource, BigQuerySource])[source]

Launch a historical feature retrieval job.

Parameters
  • feature_refs – List of feature references that will be returned for each entity. Each feature reference should have the following format: “feature_table:feature”, where “feature_table” and “feature” refer to the feature table and feature names respectively.

  • entity_source (Union[FileSource, BigQuerySource]) – Source for the entity rows. The user needs to make sure that the source is accessible from the Spark cluster that will be used for the retrieval job.

Returns

Returns the historical feature retrieval result in the form of a Spark DataFrame.

Examples

>>> import feast
>>> import feast_spark
>>> from feast import FileSource
>>> from feast.data_format import ParquetFormat
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> feast_client = feast.Client(core_url="localhost:6565")
>>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
>>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
>>> df = feast_spark.Client(feast_client).get_historical_features_df(
...     feature_refs, entity_source)
get_job_by_id(job_id: str) SparkJob[source]
list_jobs(include_terminated: bool, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]

List ingestion jobs currently running in Feast.

Parameters
  • include_terminated – Flag to include terminated jobs or not

  • project – Optionally specify the project to use as filter when retrieving jobs

  • table_name – Optionally specify name of feature table to use as filter when retrieving jobs

Returns

List of SparkJob ingestion jobs.

property metrics_redis: Redis
schedule_offline_to_online_ingestion(feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]

Launch Scheduled Ingestion Job from Batch Source to Online Store for given feature table

Parameters
  • feature_table – FeatureTable that will be ingested into the online store

  • ingestion_timespan – Number of days of data that will be ingested per job. The boundaries on which to filter the source are [end of day of execution date - ingestion_timespan (days), end of day of execution date)

  • cron_schedule – Cron schedule expression

Returns: Spark Job Proxy object

start_offline_to_online_ingestion(feature_table: FeatureTable, start: datetime, end: datetime) SparkJob[source]

Launch Ingestion Job from Batch Source to Online Store for given feature table

Parameters
  • feature_table – FeatureTable that will be ingested into the online store

  • start – lower datetime boundary on which to filter the source

  • end – upper datetime boundary on which to filter the source

Returns: Spark Job Proxy object

start_stream_to_online_ingestion(feature_table: FeatureTable, extra_jars: Optional[List[str]] = None, project: Optional[str] = None) SparkJob[source]
unschedule_offline_to_online_ingestion(feature_table: FeatureTable, project=None)[source]