
flytekitplugins.spark.task

Directory

Classes

Class Description
Databricks Deprecated.
DatabricksV2 Use this to configure a Databricks task.
PysparkFunctionTask Actual Plugin that transforms the local python code for execution within a spark context.
Spark Use this to configure a SparkContext for your task.

Methods

Method Description
new_spark_session() Optionally creates a new spark session and returns it.

Variables

Property Type Description
PRIMARY_CONTAINER_DEFAULT_NAME str

Methods

new_spark_session()

def new_spark_session(
    name: str,
    conf: typing.Dict[str, str],
)

Optionally creates a new spark session and returns it. In cluster mode (running on hosted Flyte), this will disregard the spark conf passed in.

This method is safe to call from any other method; that is one reason this code fragment is duplicated with pre-execute. For example, in a notebook scenario we might want to call it from a separate kernel.

Parameter Type Description
name str
conf typing.Dict[str, str]
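
Example - creating a session locally (a minimal sketch, assuming pyspark is installed and the code runs outside of cluster mode, e.g. from a notebook kernel; the name and conf values are illustrative)::

from flytekitplugins.spark.task import new_spark_session

# Outside of hosted Flyte, the passed conf is applied to a local session.
sess = new_spark_session(
    name="interactive-analysis",
    conf={"spark.driver.memory": "1g"},
)
df = sess.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.show()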

flytekitplugins.spark.task.Databricks

Deprecated. Use DatabricksV2 instead.

class Databricks(
    spark_conf: typing.Optional[typing.Dict[str, str]],
    hadoop_conf: typing.Optional[typing.Dict[str, str]],
    executor_path: typing.Optional[str],
    applications_path: typing.Optional[str],
    driver_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
    executor_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
    databricks_conf: typing.Optional[typing.Dict[str, typing.Union[str, dict]]],
    databricks_instance: typing.Optional[str],
)
Parameter Type Description
spark_conf typing.Optional[typing.Dict[str, str]]
hadoop_conf typing.Optional[typing.Dict[str, str]]
executor_path typing.Optional[str]
applications_path typing.Optional[str]
driver_pod typing.Optional[flytekit.core.pod_template.PodTemplate]
executor_pod typing.Optional[flytekit.core.pod_template.PodTemplate]
databricks_conf typing.Optional[typing.Dict[str, typing.Union[str, dict]]]
databricks_instance typing.Optional[str]

flytekitplugins.spark.task.DatabricksV2

Use this to configure a Databricks task. Tasks marked with this will automatically execute natively on the Databricks platform as a distributed Spark execution.

Supports both classic compute (clusters) and serverless compute.

Attributes:

  • databricks_conf (Optional[Dict[str, Union[str, dict]]]): Databricks job configuration compliant with API version 2.1, supporting 2.0 use cases. For the configuration structure, visit: https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure. For updates in API 2.1, refer to: https://docs.databricks.com/en/workflows/jobs/jobs-api-updates.html
  • databricks_instance (Optional[str]): Domain name of your deployment. Use the form <account>.cloud.databricks.com.
  • databricks_service_credential_provider (Optional[str]): Provider name for Databricks Service Credentials for S3 access. Falls back to the FLYTE_DATABRICKS_SERVICE_CREDENTIAL_PROVIDER env var.
  • databricks_token_secret (Optional[str]): Custom name for the K8s secret containing the Databricks token. Defaults to ‘databricks-token’ if not specified.
  • notebook_path (Optional[str]): Path to Databricks notebook (e.g., “/Users/[email protected]/notebook”).
  • notebook_base_parameters (Optional[Dict[str, str]]): Parameters to pass to the notebook.

Compute Modes: The connector auto-detects the compute mode based on the databricks_conf contents:

1. Classic Compute (existing cluster):
    Provide `existing_cluster_id` in databricks_conf.

2. Classic Compute (new cluster):
    Provide `new_cluster` configuration in databricks_conf.

3. Serverless Compute (pre-configured environment):
    Provide `environment_key` referencing a pre-configured environment in Databricks.
    Do not include `existing_cluster_id` or `new_cluster`.

4. Serverless Compute (inline environment spec):
    Provide `environments` array with environment specifications.
    Optionally include `environment_key` to specify which environment to use.
    Do not include `existing_cluster_id` or `new_cluster`.

Example - Classic Compute with new cluster::

DatabricksV2(
    databricks_conf={
        "run_name": "my-spark-job",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m5.xlarge",
            "num_workers": 2,
        },
    },
    databricks_instance="my-workspace.cloud.databricks.com",
)

Example - Serverless Compute with pre-configured environment::

DatabricksV2(
    databricks_conf={
        "run_name": "my-serverless-job",
        "environment_key": "my-preconfigured-env",
    },
    databricks_instance="my-workspace.cloud.databricks.com",
)

Example - Serverless Compute with inline environment spec::

DatabricksV2(
    databricks_conf={
        "run_name": "my-serverless-job",
        "environment_key": "default",
        "environments": [{
            "environment_key": "default",
            "spec": {
                "client": "1",
                "dependencies": ["pandas==2.0.0", "numpy==1.24.0"],
            }
        }],
    },
    databricks_instance="my-workspace.cloud.databricks.com",
)

Note: Serverless compute has certain limitations compared to classic compute:

  • Only Python and SQL are supported (no Scala or R)
  • Only Spark Connect APIs are supported (no RDD APIs)
  • Must use Unity Catalog for external data sources
  • No support for compute-scoped init scripts or libraries

For full details, see: https://docs.databricks.com/en/compute/serverless/limitations.html

Serverless Entrypoint: Both classic and serverless use the same flytetools repo for their entrypoints. Classic uses flytekitplugins/databricks/entrypoint.py and serverless uses flytekitplugins/databricks/entrypoint_serverless.py. No additional configuration needed.

To override the default, provide ``git_source`` and ``python_file`` in ``databricks_conf``.
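
Example - overriding the default entrypoint (a minimal sketch; the repository URL, branch, and file path are illustrative, and the nested ``git_source`` fields follow the Databricks Jobs API)::

DatabricksV2(
    databricks_conf={
        "run_name": "my-custom-entrypoint-job",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m5.xlarge",
            "num_workers": 2,
        },
        "git_source": {
            "git_url": "https://github.com/my-org/my-entrypoints",
            "git_provider": "gitHub",
            "git_branch": "main",
        },
        "python_file": "path/to/custom_entrypoint.py",
    },
    databricks_instance="my-workspace.cloud.databricks.com",
)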

AWS Credentials for Serverless: Databricks serverless does not provide AWS credentials via instance metadata. To access S3 (for Flyte data), configure a Databricks Service Credential.

The provider name is resolved in this order:
1. ``databricks_service_credential_provider`` in the task config (per-task override)
2. ``FLYTE_DATABRICKS_SERVICE_CREDENTIAL_PROVIDER`` environment variable on the connector (default for all tasks)

The entrypoint will use this to obtain AWS credentials via:
dbutils.credentials.getServiceCredentialsProvider(provider_name)
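
Example - setting the provider per task (a minimal sketch; the provider name and environment key are illustrative)::

DatabricksV2(
    databricks_conf={
        "run_name": "my-serverless-job",
        "environment_key": "my-preconfigured-env",
    },
    databricks_instance="my-workspace.cloud.databricks.com",
    databricks_service_credential_provider="my-s3-credential",
)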

Notebook Support: To run a Databricks notebook instead of a Python file, set notebook_path. Parameters can be passed via notebook_base_parameters.

Example - Running a notebook::

    DatabricksV2(
        databricks_conf={
            "run_name": "my-notebook-job",
            "new_cluster": {...},
        },
        databricks_instance="my-workspace.cloud.databricks.com",
        notebook_path="/Users/[email protected]/my-notebook",
        notebook_base_parameters={"param1": "value1"},
    )
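
Example - attaching the config to a task (a minimal sketch, assuming the standard flytekit @task decorator, flytekitplugins-spark installed, and a Spark-enabled container image; the task body and names are illustrative)::

import flytekit
from flytekit import task
from flytekitplugins.spark import DatabricksV2

@task(
    task_config=DatabricksV2(
        spark_conf={"spark.driver.memory": "1000M"},
        databricks_conf={
            "run_name": "my-spark-job",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m5.xlarge",
                "num_workers": 2,
            },
        },
        databricks_instance="my-workspace.cloud.databricks.com",
    ),
)
def count_rows(uri: str) -> int:
    # pre_execute sets up the SparkSession before inputs are converted,
    # so it is available from the execution context here.
    sess = flytekit.current_context().spark_session
    return sess.read.parquet(uri).count()
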
class DatabricksV2(
    spark_conf: typing.Optional[typing.Dict[str, str]],
    hadoop_conf: typing.Optional[typing.Dict[str, str]],
    executor_path: typing.Optional[str],
    applications_path: typing.Optional[str],
    driver_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
    executor_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
    databricks_conf: typing.Optional[typing.Dict[str, typing.Union[str, dict]]],
    databricks_instance: typing.Optional[str],
    databricks_service_credential_provider: typing.Optional[str],
    databricks_token_secret: typing.Optional[str],
    notebook_path: typing.Optional[str],
    notebook_base_parameters: typing.Optional[typing.Dict[str, str]],
)
Parameter Type Description
spark_conf typing.Optional[typing.Dict[str, str]]
hadoop_conf typing.Optional[typing.Dict[str, str]]
executor_path typing.Optional[str]
applications_path typing.Optional[str]
driver_pod typing.Optional[flytekit.core.pod_template.PodTemplate]
executor_pod typing.Optional[flytekit.core.pod_template.PodTemplate]
databricks_conf typing.Optional[typing.Dict[str, typing.Union[str, dict]]]
databricks_instance typing.Optional[str]
databricks_service_credential_provider typing.Optional[str]
databricks_token_secret typing.Optional[str]
notebook_path typing.Optional[str]
notebook_base_parameters typing.Optional[typing.Dict[str, str]]

flytekitplugins.spark.task.PysparkFunctionTask

Actual plugin that transforms the local Python code for execution within a Spark context.

class PysparkFunctionTask(
    task_config: flytekitplugins.spark.task.Spark,
    task_function: typing.Callable,
    container_image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType],
    kwargs,
)
Parameter Type Description
task_config flytekitplugins.spark.task.Spark
task_function typing.Callable
container_image typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType]
kwargs **kwargs

Properties

Property Type Description
container_image None
deck_fields None If not empty, this task will output deck html file for the specified decks
disable_deck None If true, this task will not output deck html file
docs None
enable_deck None If true, this task will output deck html file
environment None Any environment variables that are supplied during the execution of the task.
execution_mode None
instantiated_in None
interface None
lhs None
location None
metadata None
name None Returns the name of the task.
node_dependency_hints None
python_interface None Returns this task’s python interface.
resources None
security_context None
task_config None Returns the user-specified task config which is used for plugin-specific handling of the task.
task_function None
task_resolver None
task_type None
task_type_version None

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition.
compile_into_workflow() In the case of dynamic workflows, this function will produce a workflow definition at execution time which will then be executed.
connector_signal_handler()
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
dynamic_execute() By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte literal wrappers.
execute() This method will be invoked to execute the task.
find_lhs()
get_command() Returns the command which should be used in the container definition for the serialized version of this task.
get_config() Returns the task config as a serializable dictionary.
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary.
get_default_command() Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte.
get_image() Update the image spec based on fast registration usage, and return a string representing the image.
get_input_types() Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() Returns the python type for an input variable by name.
get_type_for_output_var() Returns the python type for the specified output variable by name.
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute.
local_execution_mode()
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs.
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted.
reset_command_fn() Resets the command which should be used in the container definition of this task to the default arguments.
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution.
set_command_fn() By default, the task will run on the Flyte platform using the pyflyte-execute command.
set_resolver() By default, flytekit uses the DefaultTaskResolver to resolve the task.
to_k8s_pod() Convert the podTemplate to K8sPod.

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Generates a node that encapsulates this task in a workflow definition.

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

compile_into_workflow()

def compile_into_workflow(
    ctx: FlyteContext,
    task_function: Callable,
    kwargs,
) -> Union[_dynamic_job.DynamicJobSpec, _literal_models.LiteralMap]

In the case of dynamic workflows, this function will produce a workflow definition at execution time which will then proceed to be executed.

Parameter Type Description
ctx FlyteContext
task_function Callable
kwargs **kwargs

connector_signal_handler()

def connector_signal_handler(
    resource_meta: flytekit.extend.backend.base_connector.ResourceMeta,
    signum: int,
    frame: frame,
) -> typing.Any
Parameter Type Description
resource_meta flytekit.extend.backend.base_connector.ResourceMeta
signum int
frame frame

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • Literal Map is returned when the task returns one or more outputs in the declaration. Individual outputs may be none.
  • DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

dynamic_execute()

def dynamic_execute(
    task_function: Callable,
    kwargs,
) -> Any

By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte literal wrappers so that the kwargs we are working with here are now Python native literal values. This function is also expected to return Python native literal values.

Since the user code within a dynamic task constitutes a workflow, we have to first compile the workflow, and then execute that workflow.

When running for real in production, the task would stop after the compilation step, and then create a file representing that newly generated workflow, instead of executing it.

Parameter Type Description
task_function Callable
kwargs **kwargs

execute()

def execute(
    kwargs,
) -> typing.Any

This method will be invoked to execute the task. If you do decide to override this method you must also handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator.

Parameter Type Description
kwargs **kwargs

find_lhs()

def find_lhs()

get_command()

def get_command(
    settings: SerializationSettings,
) -> List[str]

Returns the command which should be used in the container definition for the serialized version of this task registered on a hosted Flyte platform.

Parameter Type Description
settings SerializationSettings

get_config()

def get_config(
    settings: SerializationSettings,
) -> Optional[Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom config defined for this task.

Parameter Type Description
settings SerializationSettings

get_container()

def get_container(
    settings: SerializationSettings,
) -> _task_model.Container

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Dict[str, typing.Any]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_default_command()

def get_default_command(
    settings: SerializationSettings,
) -> List[str]

Returns the default pyflyte-execute command used to run this on hosted Flyte platforms.

Parameter Type Description
settings SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: SerializationSettings,
) -> Optional[tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type Description
settings SerializationSettings

get_image()

def get_image(
    settings: SerializationSettings,
) -> str

Update the image spec based on fast registration usage, and return a string representing the image.

Parameter Type Description
settings SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: SerializationSettings,
) -> _task_model.K8sPod

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

Parameter Type Description
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

Parameter Type Description
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task’s outputs. If not overridden, this function is a no-op.

Parameter Type Description
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters] The modified user params created during the pre_execute step.
rval typing.Any

pre_execute()

def pre_execute(
    user_params: flytekit.core.context_manager.ExecutionParameters,
) -> flytekit.core.context_manager.ExecutionParameters

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type Description
user_params flytekit.core.context_manager.ExecutionParameters

reset_command_fn()

def reset_command_fn()

Resets the command which should be used in the container definition of this task to the default arguments. This is useful when the command line is overridden at serialization time.

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

set_command_fn()

def set_command_fn(
    get_command_fn: Optional[Callable[[SerializationSettings], List[str]]],
)

By default, the task will run on the Flyte platform using the pyflyte-execute command. However, it can be useful to update the command with which the task is serialized for specific cases like running map tasks (“pyflyte-map-execute”) or for fast-executed tasks.

Parameter Type Description
get_command_fn Optional[Callable[[SerializationSettings], List[str]]]

set_resolver()

def set_resolver(
    resolver: TaskResolverMixin,
)

By default, flytekit uses the DefaultTaskResolver to resolve the task. This method allows the user to set a custom task resolver. It can be useful to override the task resolver for specific cases like running tasks in the jupyter notebook.

Parameter Type Description
resolver TaskResolverMixin

to_k8s_pod()

def to_k8s_pod(
    pod_template: typing.Optional[flytekit.core.pod_template.PodTemplate],
) -> typing.Optional[flytekit.models.task.K8sPod]

Convert the PodTemplate to a K8sPod.

Parameter Type Description
pod_template typing.Optional[flytekit.core.pod_template.PodTemplate]

flytekitplugins.spark.task.Spark

Use this to configure a SparkContext for your task. Tasks marked with this will automatically execute natively on K8s as a distributed Spark execution.

Attributes:

  • spark_conf (Optional[Dict[str, str]]): Spark configuration dictionary.
  • hadoop_conf (Optional[Dict[str, str]]): Hadoop configuration dictionary.
  • executor_path (Optional[str]): Path to the Python binary for PySpark execution.
  • applications_path (Optional[str]): Path to the main application file.
  • driver_pod (Optional[PodTemplate]): The pod template for the Spark driver pod.
  • executor_pod (Optional[PodTemplate]): The pod template for the Spark executor pod.
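
Example - configuring a Spark task (a minimal sketch, assuming the standard flytekit @task decorator, flytekitplugins-spark installed, and a Spark-enabled container image; the conf values and task body are illustrative)::

import random

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        spark_conf={
            "spark.executor.instances": "2",
            "spark.executor.memory": "1g",
        },
    ),
)
def estimate_pi(partitions: int) -> float:
    # pre_execute sets up the SparkSession before inputs are converted,
    # so it is available from the execution context here.
    sess = flytekit.current_context().spark_session
    n = 100000 * partitions

    def inside(_: int) -> int:
        x, y = random.random(), random.random()
        return 1 if x * x + y * y <= 1 else 0

    count = sess.sparkContext.parallelize(range(n), partitions).map(inside).sum()
    return 4.0 * count / n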

class Spark(
    spark_conf: typing.Optional[typing.Dict[str, str]],
    hadoop_conf: typing.Optional[typing.Dict[str, str]],
    executor_path: typing.Optional[str],
    applications_path: typing.Optional[str],
    driver_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
    executor_pod: typing.Optional[flytekit.core.pod_template.PodTemplate],
)
Parameter Type Description
spark_conf typing.Optional[typing.Dict[str, str]]
hadoop_conf typing.Optional[typing.Dict[str, str]]
executor_path typing.Optional[str]
applications_path typing.Optional[str]
driver_pod typing.Optional[flytekit.core.pod_template.PodTemplate]
executor_pod typing.Optional[flytekit.core.pod_template.PodTemplate]