feast_spark.pyspark package

Subpackages

Submodules

feast_spark.pyspark.abc module

class feast_spark.pyspark.abc.BatchIngestionJob[source]

Bases: SparkJob

Container for the ingestion job result

abstract get_feature_table() str[source]

Get the feature table name associated with this job. Returns an empty string if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

class feast_spark.pyspark.abc.BatchIngestionJobParameters(feature_table: Dict, source: Dict, start: datetime, end: datetime, jar: str, redis_host: Optional[str], redis_port: Optional[int], redis_password: Optional[str], redis_ssl: Optional[bool], bigtable_project: Optional[str], bigtable_instance: Optional[str], cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None)[source]

Bases: IngestionJobParameters

get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_job_type() SparkJobType[source]

Getter for job type.

Returns

Job type enum.

Return type

SparkJobType

get_name() str[source]

Getter for job name.

Returns

Job name.

Return type

str

class feast_spark.pyspark.abc.IngestionJobParameters(feature_table: Dict, source: Dict, jar: str, redis_host: Optional[str] = None, redis_port: Optional[int] = None, redis_password: Optional[str] = None, redis_ssl: Optional[bool] = None, bigtable_project: Optional[str] = None, bigtable_instance: Optional[str] = None, cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None, drop_invalid_rows: bool = False)[source]

Bases: SparkJobParameters

get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_class_name() Optional[str][source]

Getter for the main class name, if applicable.

Returns

Java class path, e.g. feast.ingestion.IngestionJob.

Return type

Optional[str]

get_feature_table_name() str[source]

get_main_file_path() str[source]

Getter for the path to the jar or Python file.

Returns

Full path to file.

Return type

str

get_project() str[source]

class feast_spark.pyspark.abc.JobLauncher[source]

Bases: ABC

Submits Spark jobs to a Spark cluster. The abstract methods below cover historical feature retrieval, batch and scheduled batch ingestion, and stream ingestion jobs.

abstract get_job_by_id(job_id: str) SparkJob[source]

abstract historical_feature_retrieval(retrieval_job_params: RetrievalJobParameters) RetrievalJob[source]

Submits a historical feature retrieval job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

Wrapper around the remote job that returns the file URI of the result file.

Return type

RetrievalJob

abstract list_jobs(include_terminated: bool, project: Optional[str], table_name: Optional[str]) List[SparkJob][source]

abstract offline_to_online_ingestion(ingestion_job_params: BatchIngestionJobParameters) BatchIngestionJob[source]

Submits a batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

BatchIngestionJob

abstract schedule_offline_to_online_ingestion(ingestion_job_params: ScheduledBatchIngestionJobParameters)[source]

Submits a scheduled batch ingestion job to a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

Wrapper around the remote job that can be used to check when the job has completed.

Return type

ScheduledBatchIngestionJob

abstract start_stream_to_online_ingestion(ingestion_job_params: StreamIngestionJobParameters) StreamIngestionJob[source]

Starts a stream ingestion job on a Spark cluster.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

Wrapper around the remote job.

Return type

StreamIngestionJob

abstract unschedule_offline_to_online_ingestion(project: str, feature_table: str)[source]

Unschedule a scheduled batch ingestion job.
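The submission methods above are documented as raising SparkJobFailure. A minimal sketch of handling that at the call site follows. It does not import feast_spark: SparkJobFailure here is a local stand-in for the documented exception, and FailingLauncher and try_submit are hypothetical names used only for illustration.

```python
from typing import Any, Optional, Tuple


class SparkJobFailure(Exception):
    """Local stand-in for feast_spark.pyspark.abc.SparkJobFailure."""


class FailingLauncher:
    """Hypothetical launcher whose batch ingestion submission always fails."""

    def offline_to_online_ingestion(self, ingestion_job_params: Any) -> Any:
        raise SparkJobFailure("cluster unreachable")


def try_submit(launcher: Any, params: Any) -> Tuple[Optional[Any], Optional[str]]:
    """Submit a batch ingestion job; return (job, None) or (None, error message)."""
    try:
        return launcher.offline_to_online_ingestion(params), None
    except SparkJobFailure as exc:
        # Submission failure, execution error and timeout all surface as this exception.
        return None, str(exc)


job, error = try_submit(FailingLauncher(), params=object())
print(job, error)
```

Returning the error message instead of re-raising is only one possible design; callers that cannot proceed without the job would normally let the exception propagate.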

class feast_spark.pyspark.abc.RetrievalJob[source]

Bases: SparkJob

Container for the historical feature retrieval job result

abstract get_output_file_uri(timeout_sec=None, block=True)[source]

Get the URI of the result file. By default, this method blocks until the job succeeds, and raises an exception if the job fails or does not finish within the timeout.

Parameters
  • timeout_sec (int) – Maximum number of seconds to wait for the job to finish. If “timeout_sec” is exceeded or the job fails, an exception is raised.

  • block (bool) – If False, return without waiting for the job to finish; the timeout parameter is then ignored.

Raises

SparkJobFailure – The Spark job submission failed, encountered an error during execution, or timed out.

Returns

File URI of the result file.

Return type

str
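The interplay of timeout_sec and block can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions, not the behaviour of any particular launcher: FakeRetrievalJob and its finishes_after_sec argument are hypothetical, SparkJobFailure is a local stand-in, and the sketch assumes that a non-blocking call returns immediately without consulting the timeout.

```python
import time
from typing import Optional


class SparkJobFailure(Exception):
    """Local stand-in for feast_spark.pyspark.abc.SparkJobFailure."""


class FakeRetrievalJob:
    """Hypothetical job that 'finishes' a fixed number of seconds after creation."""

    def __init__(self, output_uri: str, finishes_after_sec: float) -> None:
        self._output_uri = output_uri
        self._done_at = time.monotonic() + finishes_after_sec

    def get_output_file_uri(
        self, timeout_sec: Optional[float] = None, block: bool = True
    ) -> str:
        if not block:
            # Non-blocking: return at once; the result file may not exist yet.
            return self._output_uri
        deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
        while time.monotonic() < self._done_at:
            if deadline is not None and time.monotonic() >= deadline:
                raise SparkJobFailure("Timed out waiting for the job to finish")
            time.sleep(0.01)
        return self._output_uri


job = FakeRetrievalJob("file:///tmp/out", finishes_after_sec=0.05)
print(job.get_output_file_uri(timeout_sec=1))
```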

class feast_spark.pyspark.abc.RetrievalJobParameters(project: str, feature_tables: List[Dict], feature_tables_sources: List[Dict], entity_source: Dict, destination: Dict, extra_packages: Optional[List[str]] = None, checkpoint_path: Optional[str] = None)[source]

Bases: SparkJobParameters

get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_destination_path() str[source]

get_extra_packages() List[str][source]

Getter for extra Maven packages to be included on the driver and executor classpath, if applicable.

Returns

List of Maven packages.

Return type

List[str]

get_job_type() SparkJobType[source]

Getter for job type.

Returns

Job type enum.

Return type

SparkJobType

get_main_file_path() str[source]

Getter for the path to the jar or Python file.

Returns

Full path to file.

Return type

str

get_name() str[source]

Getter for job name.

Returns

Job name.

Return type

str

get_project() str[source]

class feast_spark.pyspark.abc.ScheduledBatchIngestionJobParameters(feature_table: Dict, source: Dict, ingestion_timespan: int, cron_schedule: str, jar: str, redis_host: Optional[str], redis_port: Optional[int], redis_password: Optional[str], redis_ssl: Optional[bool], bigtable_project: Optional[str], bigtable_instance: Optional[str], cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, stencil_url: Optional[str] = None)[source]

Bases: IngestionJobParameters

get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_job_schedule() str[source]

get_job_type() SparkJobType[source]

Getter for job type.

Returns

Job type enum.

Return type

SparkJobType

get_name() str[source]

Getter for job name.

Returns

Job name.

Return type

str

class feast_spark.pyspark.abc.SparkJob[source]

Bases: ABC

Base class for all spark jobs

abstract cancel()[source]

Manually terminate job

get_error_message() Optional[str][source]

Get Spark job error message, if applicable.

abstract get_id() str[source]

Getter for the job id. The job id must be unique for each spark job submission.

Returns

Job id.

Return type

str

get_log_uri() Optional[str][source]

Get path to Spark job log, if applicable.

abstract get_start_time() datetime[source]

Get job start time.

abstract get_status() SparkJobStatus[source]

Job Status retrieval

Returns

Job status

Return type

SparkJobStatus

exception feast_spark.pyspark.abc.SparkJobFailure[source]

Bases: Exception

Job submission failed, encountered an error during execution, or timed out.

class feast_spark.pyspark.abc.SparkJobParameters[source]

Bases: ABC

abstract get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_class_name() Optional[str][source]

Getter for the main class name, if applicable.

Returns

Java class path, e.g. feast.ingestion.IngestionJob.

Return type

Optional[str]

get_extra_packages() List[str][source]

Getter for extra Maven packages to be included on the driver and executor classpath, if applicable.

Returns

List of Maven packages.

Return type

List[str]

abstract get_job_type() SparkJobType[source]

Getter for job type.

Returns

Job type enum.

Return type

SparkJobType

abstract get_main_file_path() str[source]

Getter for the path to the jar or Python file.

Returns

Full path to file.

Return type

str

abstract get_name() str[source]

Getter for job name.

Returns

Job name.

Return type

str
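A concrete parameters class has to implement the four abstract getters listed for SparkJobParameters. The sketch below shows one way this might look. It does not import feast_spark: SparkJobType and SparkJobParameters are local stand-ins that mirror only the documented names, DemoIngestionParameters is hypothetical, and the "--feature-table" flag and the job name format are assumptions made for illustration.

```python
import json
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, List


class SparkJobType(Enum):
    # Values copied from the SparkJobType enumeration documented in this module.
    HISTORICAL_RETRIEVAL = 0
    BATCH_INGESTION = 1
    STREAM_INGESTION = 2
    SCHEDULED_BATCH_INGESTION = 3


class SparkJobParameters(ABC):
    """Local stand-in that declares only the documented abstract getters."""

    @abstractmethod
    def get_name(self) -> str: ...

    @abstractmethod
    def get_job_type(self) -> SparkJobType: ...

    @abstractmethod
    def get_main_file_path(self) -> str: ...

    @abstractmethod
    def get_arguments(self) -> List[str]: ...


class DemoIngestionParameters(SparkJobParameters):
    """Hypothetical parameters class; not part of feast_spark."""

    def __init__(self, feature_table: Dict, source: Dict, jar: str) -> None:
        self._feature_table = feature_table
        self._source = source
        self._jar = jar

    def get_name(self) -> str:
        return f"demo-ingestion-{self._feature_table['name']}"

    def get_job_type(self) -> SparkJobType:
        return SparkJobType.BATCH_INGESTION

    def get_main_file_path(self) -> str:
        return self._jar

    def get_arguments(self) -> List[str]:
        # Each dict is passed as one JSON string following its flag.
        return [
            "--source", json.dumps(self._source),
            "--feature-table", json.dumps(self._feature_table),  # assumed flag name
        ]


params = DemoIngestionParameters(
    {"name": "driver_stats"}, {"kafka": {"topic": "t"}}, "/opt/jars/ingestion.jar"
)
print(params.get_arguments())
```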

class feast_spark.pyspark.abc.SparkJobStatus(value)[source]

Bases: Enum

An enumeration.

COMPLETED = 3
FAILED = 2
IN_PROGRESS = 1
STARTING = 0
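A common use of these status values is polling a job until it stops changing state. The following is a minimal sketch, assuming that FAILED and COMPLETED are the terminal states. The enum is a local copy of the documented values; wait_for_terminal_status and ScriptedJob are hypothetical helpers, not part of feast_spark.

```python
import time
from enum import Enum
from typing import Iterable


class SparkJobStatus(Enum):
    # Local copy of the documented enumeration values.
    STARTING = 0
    IN_PROGRESS = 1
    FAILED = 2
    COMPLETED = 3


# Assumption: a job in one of these states will not change state again.
TERMINAL_STATUSES = {SparkJobStatus.FAILED, SparkJobStatus.COMPLETED}


def wait_for_terminal_status(job, timeout_sec: float = 60.0,
                             poll_interval_sec: float = 1.0) -> SparkJobStatus:
    """Poll job.get_status() until a terminal status is seen or the timeout expires."""
    deadline = time.monotonic() + timeout_sec
    while True:
        status = job.get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job still {status.name} after {timeout_sec} seconds")
        time.sleep(poll_interval_sec)


class ScriptedJob:
    """Hypothetical stand-in whose status advances on every get_status() call."""

    def __init__(self, statuses: Iterable[SparkJobStatus]) -> None:
        self._statuses = list(statuses)

    def get_status(self) -> SparkJobStatus:
        # Keep returning the last status once the script is exhausted.
        if len(self._statuses) > 1:
            return self._statuses.pop(0)
        return self._statuses[0]


job = ScriptedJob([SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS,
                   SparkJobStatus.COMPLETED])
print(wait_for_terminal_status(job, timeout_sec=5, poll_interval_sec=0.01))
```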
class feast_spark.pyspark.abc.SparkJobType(value)[source]

Bases: Enum

An enumeration.

BATCH_INGESTION = 1
HISTORICAL_RETRIEVAL = 0
SCHEDULED_BATCH_INGESTION = 3
STREAM_INGESTION = 2
to_pascal_case()[source]
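to_pascal_case() is listed without a description. Judging by its name, it presumably converts a member name such as BATCH_INGESTION to PascalCase; the sketch below shows one way to do that on a local copy of the enum. The method body is an assumption, not the library's source.

```python
from enum import Enum


class SparkJobType(Enum):
    # Local copy of the documented enumeration values.
    HISTORICAL_RETRIEVAL = 0
    BATCH_INGESTION = 1
    STREAM_INGESTION = 2
    SCHEDULED_BATCH_INGESTION = 3

    def to_pascal_case(self) -> str:
        # Assumed behaviour: split the member name on "_" and capitalize each part.
        return "".join(part.capitalize() for part in self.name.split("_"))


print(SparkJobType.BATCH_INGESTION.to_pascal_case())
```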
class feast_spark.pyspark.abc.StreamIngestionJob[source]

Bases: SparkJob

Container for the streaming ingestion job result

abstract get_feature_table() str[source]

Get the feature table name associated with this job. Returns None if the feature table cannot be determined, for example when the job was created by an earlier version of Feast.

Returns

Feature table name

Return type

str

get_hash() str[source]

Gets the consistent hash of this stream ingestion job.

The hash needs to be persisted at the data processing layer, so that we can get the same hash when retrieving the job from Spark.

Returns

The hash for this streaming ingestion job

Return type

str
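A hash is "consistent" in this sense when the same job definition always produces the same value, regardless of process or dictionary key order. One way to get that property is to hash a canonical JSON serialization. The sketch below is an assumption about how such a hash could be computed: stable_job_hash, the choice of inputs, and the use of SHA-256 are all hypothetical and not taken from feast_spark.

```python
import hashlib
import json
from typing import Dict, List


def stable_job_hash(source: Dict, feature_table: Dict, jars: List[str]) -> str:
    """Return a deterministic hex digest for a job definition."""
    payload = json.dumps(
        {"source": source, "feature_table": feature_table, "jars": sorted(jars)},
        sort_keys=True,  # canonical key order, so equal dicts serialize identically
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = stable_job_hash({"topic": "t", "servers": "k:9092"}, {"name": "driver_stats"}, ["a.jar"])
second = stable_job_hash({"servers": "k:9092", "topic": "t"}, {"name": "driver_stats"}, ["a.jar"])
print(first == second)
```

Python's built-in hash() would not work here, because string hashing is randomized per process by default.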

class feast_spark.pyspark.abc.StreamIngestionJobParameters(feature_table: Dict, source: Dict, jar: str, extra_jars: Optional[List[str]] = None, redis_host: Optional[str] = None, redis_port: Optional[int] = None, redis_password: Optional[str] = None, redis_ssl: Optional[bool] = None, bigtable_project: Optional[str] = None, bigtable_instance: Optional[str] = None, cassandra_host: Optional[str] = None, cassandra_port: Optional[int] = None, statsd_host: Optional[str] = None, statsd_port: Optional[int] = None, deadletter_path: Optional[str] = None, checkpoint_path: Optional[str] = None, stencil_url: Optional[str] = None, drop_invalid_rows: bool = False, triggering_interval: Optional[int] = None)[source]

Bases: IngestionJobParameters

get_arguments() List[str][source]

Getter for job arguments, e.g. ["--source", '{"kafka":…}', …].

Returns

List of arguments.

Return type

List[str]

get_extra_jar_paths() List[str][source]

get_job_hash() str[source]

get_job_type() SparkJobType[source]

Getter for job type.

Returns

Job type enum.

Return type

SparkJobType

get_name() str[source]

Getter for job name.

Returns

Job name.

Return type

str

feast_spark.pyspark.historical_feature_retrieval_job module

feast_spark.pyspark.launcher module

feast_spark.pyspark.launcher.get_health_metrics(client: Client, project: str, table_names: List[str]) Dict[str, List[str]][source]

feast_spark.pyspark.launcher.get_job_by_id(job_id: str, client: Client) SparkJob[source]

feast_spark.pyspark.launcher.get_stream_to_online_ingestion_params(client: Client, project: str, feature_table: FeatureTable, extra_jars: List[str]) StreamIngestionJobParameters[source]

feast_spark.pyspark.launcher.list_jobs(include_terminated: bool, client: Client, project: Optional[str] = None, table_name: Optional[str] = None) List[SparkJob][source]

feast_spark.pyspark.launcher.resolve_launcher(config: Config) JobLauncher[source]

feast_spark.pyspark.launcher.schedule_offline_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, ingestion_timespan: int, cron_schedule: str)[source]

feast_spark.pyspark.launcher.stage_dataframe(df, event_timestamp_column: str, config: Config) FileSource[source]

Helper function to upload a pandas dataframe in parquet format to a temporary location (under SPARK_STAGING_LOCATION) and return it wrapped in a FileSource.

Parameters
  • event_timestamp_column (str) – the name of the timestamp column in the dataframe.

  • config (Config) – feast config.

feast_spark.pyspark.launcher.start_historical_feature_retrieval_job(client: Client, project: str, entity_source: Union[FileSource, BigQuerySource], feature_tables: List[FeatureTable], output_format: str, output_path: str) RetrievalJob[source]

feast_spark.pyspark.launcher.start_historical_feature_retrieval_spark_session(client: Client, project: str, entity_source: Union[FileSource, BigQuerySource], feature_tables: List[FeatureTable])[source]

feast_spark.pyspark.launcher.start_offline_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, start: datetime, end: datetime) BatchIngestionJob[source]

feast_spark.pyspark.launcher.start_stream_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable, extra_jars: List[str]) StreamIngestionJob[source]

feast_spark.pyspark.launcher.table_reference_from_string(table_ref: str)[source]

Parses reference string with format “{project}:{dataset}.{table}” into bigquery.TableReference
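The “{project}:{dataset}.{table}” format can be split without the BigQuery client library. The sketch below returns a plain named tuple rather than a bigquery.TableReference; TableRef and parse_table_ref are hypothetical names, and the validation rules are an assumption.

```python
from typing import NamedTuple


class TableRef(NamedTuple):
    """Hypothetical plain container; the documented function returns bigquery.TableReference."""
    project: str
    dataset: str
    table: str


def parse_table_ref(table_ref: str) -> TableRef:
    """Split '{project}:{dataset}.{table}' into its three components."""
    project, colon, rest = table_ref.partition(":")
    dataset, dot, table = rest.partition(".")
    if not (colon and dot and project and dataset and table):
        raise ValueError(
            f"Expected '{{project}}:{{dataset}}.{{table}}', got {table_ref!r}"
        )
    return TableRef(project, dataset, table)


print(parse_table_ref("my-project:my_dataset.my_table"))
```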

feast_spark.pyspark.launcher.unschedule_offline_to_online_ingestion(client: Client, project: str, feature_table: FeatureTable)[source]

Module contents