Source code for feast_spark.remote_job

import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from feast.core.JobService_pb2 import CancelJobRequest, GetJobRequest
from feast.core.JobService_pb2 import Job as JobProto
from feast.core.JobService_pb2 import JobStatus, JobType
from feast.core.JobService_pb2_grpc import JobServiceStub
from feast_spark.pyspark.abc import (
    BatchIngestionJob,
    RetrievalJob,
    SparkJob,
    SparkJobFailure,
    SparkJobStatus,
    StreamIngestionJob,
)

GrpcExtraParamProvider = Callable[[], Dict[str, Any]]
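
# A GrpcExtraParamProvider returns extra keyword arguments that are splatted
# into every gRPC stub call below (GetJob, CancelJob). A minimal sketch,
# assuming the standard grpc call kwargs `timeout` and `metadata`; the token
# value is a hypothetical placeholder:
#
#   def auth_param_provider() -> Dict[str, Any]:
#       return {
#           "timeout": 10,  # per-call deadline in seconds
#           "metadata": [("authorization", "Bearer <token>")],
#       }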


class RemoteJobMixin:
    def __init__(
        self,
        service: JobServiceStub,
        grpc_extra_param_provider: GrpcExtraParamProvider,
        job_id: str,
        start_time: datetime,
        log_uri: Optional[str],
    ):
        """
        Args:
            service: job service gRPC stub
            grpc_extra_param_provider: callable returning extra keyword
                arguments for every gRPC call
            job_id: job reference
            start_time: time when the job was started
            log_uri: URI of the job logs, if available
        """
        self._job_id = job_id
        self._service = service
        self._grpc_extra_param_provider = grpc_extra_param_provider
        self._start_time = start_time
        self._log_uri = log_uri

    def get_id(self) -> str:
        return self._job_id

    def get_status(self) -> SparkJobStatus:
        response = self._service.GetJob(
            GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
        )
        if response.job.status == JobStatus.JOB_STATUS_RUNNING:
            return SparkJobStatus.IN_PROGRESS
        elif response.job.status == JobStatus.JOB_STATUS_PENDING:
            return SparkJobStatus.STARTING
        elif response.job.status == JobStatus.JOB_STATUS_DONE:
            return SparkJobStatus.COMPLETED
        elif response.job.status == JobStatus.JOB_STATUS_ERROR:
            return SparkJobStatus.FAILED
        else:
            # We should never get here: the job service only reports the
            # four statuses mapped above.
            raise Exception(f"Invalid remote job state {response.job.status}")

    def get_start_time(self) -> datetime:
        return self._start_time

    def cancel(self):
        self._service.CancelJob(
            CancelJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
        )

    def _wait_for_job_status(
        self, goal_status: List[SparkJobStatus], timeout_seconds=90
    ) -> SparkJobStatus:
        """Poll the job service once a second until the job reaches one of
        ``goal_status``, raising TimeoutError after ``timeout_seconds``."""
        start_time = time.time()
        while time.time() < (start_time + timeout_seconds):
            status = self.get_status()
            if status in goal_status:
                return status
            time.sleep(1.0)
        raise TimeoutError("Timed out waiting for job status")

    def get_log_uri(self) -> Optional[str]:
        return self._log_uri

    def get_error_message(self) -> str:
        job = self._service.GetJob(
            GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
        ).job
        return job.error_message

    def wait_termination(self, timeout_sec=None):
        status = self._wait_for_job_status(
            goal_status=[SparkJobStatus.COMPLETED, SparkJobStatus.FAILED],
            timeout_seconds=timeout_sec or 600,
        )
        if status != SparkJobStatus.COMPLETED:
            raise SparkJobFailure(
                f"Spark job failed; Reason: {self.get_error_message()}"
            )
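
# Usage sketch for the mixin's polling API (a hedged illustration: `job` stands
# for any of the concrete Remote*Job classes below, and the timeout value is
# arbitrary):
#
#   status = job.get_status()
#   if status in (SparkJobStatus.STARTING, SparkJobStatus.IN_PROGRESS):
#       job.wait_termination(timeout_sec=300)  # raises SparkJobFailure on error
#   print(job.get_id(), job.get_log_uri())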

class RemoteRetrievalJob(RemoteJobMixin, RetrievalJob):
    """
    Historical feature retrieval job result, with the job being run remotely by
    the job service.
    """

    def __init__(
        self,
        service: JobServiceStub,
        grpc_extra_param_provider: GrpcExtraParamProvider,
        job_id: str,
        output_file_uri: str,
        start_time: datetime,
        log_uri: Optional[str],
    ):
        """
        This is the job object representing the historical retrieval job.

        Args:
            output_file_uri (str): URI of the historical feature retrieval
                job output file.
        """
        super().__init__(
            service, grpc_extra_param_provider, job_id, start_time, log_uri
        )
        self._output_file_uri = output_file_uri

    def get_output_file_uri(self, timeout_sec=None, block=True):
        # In non-blocking mode, return nothing until the job has completed.
        if not block and self.get_status() != SparkJobStatus.COMPLETED:
            return None
        self.wait_termination(timeout_sec)
        return self._output_file_uri
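
# Blocking vs. non-blocking retrieval of the output location; a sketch assuming
# `retrieval_job` is a RemoteRetrievalJob instance:
#
#   uri = retrieval_job.get_output_file_uri(timeout_sec=600)   # waits for the job
#   maybe_uri = retrieval_job.get_output_file_uri(block=False) # None until COMPLETED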

class RemoteBatchIngestionJob(RemoteJobMixin, BatchIngestionJob):
    """
    Batch ingestion job result.
    """

    def __init__(
        self,
        service: JobServiceStub,
        grpc_extra_param_provider: GrpcExtraParamProvider,
        job_id: str,
        feature_table: str,
        start_time: datetime,
        log_uri: Optional[str],
    ):
        super().__init__(
            service, grpc_extra_param_provider, job_id, start_time, log_uri
        )
        self._feature_table = feature_table

    def get_feature_table(self) -> str:
        return self._feature_table

class RemoteStreamIngestionJob(RemoteJobMixin, StreamIngestionJob):
    """
    Stream ingestion job result.
    """

    def __init__(
        self,
        service: JobServiceStub,
        grpc_extra_param_provider: GrpcExtraParamProvider,
        job_id: str,
        feature_table: str,
        start_time: datetime,
        log_uri: Optional[str],
    ):
        super().__init__(
            service, grpc_extra_param_provider, job_id, start_time, log_uri
        )
        self._feature_table = feature_table

    def get_hash(self) -> str:
        response = self._service.GetJob(
            GetJobRequest(job_id=self._job_id), **self._grpc_extra_param_provider()
        )
        return response.job.hash

    def get_feature_table(self) -> str:
        return self._feature_table

def get_remote_job_from_proto(
    service: JobServiceStub,
    grpc_extra_param_provider: GrpcExtraParamProvider,
    job: JobProto,
) -> SparkJob:
    """Get the remote job Python object from the Job proto.

    Args:
        service (JobServiceStub): reference to the job service
        grpc_extra_param_provider (GrpcExtraParamProvider): callable providing
            extra parameters for gRPC requests
        job (JobProto): proto object describing the job

    Returns:
        (SparkJob): a remote job object for the given job
    """
    if job.type == JobType.RETRIEVAL_JOB:
        return RemoteRetrievalJob(
            service,
            grpc_extra_param_provider,
            job.id,
            job.retrieval.output_location,
            job.start_time.ToDatetime(),
            job.log_uri,
        )
    elif job.type == JobType.BATCH_INGESTION_JOB:
        return RemoteBatchIngestionJob(
            service,
            grpc_extra_param_provider,
            job.id,
            job.batch_ingestion.table_name,
            job.start_time.ToDatetime(),
            job.log_uri,
        )
    elif job.type == JobType.STREAM_INGESTION_JOB:
        return RemoteStreamIngestionJob(
            service,
            grpc_extra_param_provider,
            job.id,
            job.stream_ingestion.table_name,
            job.start_time.ToDatetime(),
            job.log_uri,
        )
    else:
        raise ValueError(
            f"Invalid Job Type {job.type}, has to be one of "
            f"{(JobType.RETRIEVAL_JOB, JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB)}"
        )
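
# End-to-end sketch of turning a Job proto into a client-side handle. This is
# illustrative only: the channel target and job id are placeholders, while
# JobServiceStub, GetJobRequest, and get_remote_job_from_proto come from this
# module's imports:
#
#   import grpc
#
#   channel = grpc.insecure_channel("localhost:6568")
#   stub = JobServiceStub(channel)
#   response = stub.GetJob(GetJobRequest(job_id="<job-id>"))
#   job = get_remote_job_from_proto(stub, lambda: {}, response.job)
#   job.wait_termination(timeout_sec=600)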