API reference

Python interface for the LynxKite Remote API.

Installation:

pip install lynxkite-client

The default LynxKite connection parameters can be configured through the following environment variables:

LYNXKITE_ADDRESS=https://lynxkite.example.com/
LYNXKITE_USERNAME=user@company
LYNXKITE_PASSWORD=my_password
LYNXKITE_PUBLIC_SSL_CERT=/tmp/lynxkite.crt

Example usage:

import lynx.kite
lk = lynx.kite.LynxKite()
lk.createExampleGraph().sql('select * from graph_attributes').df()
class lynx.kite.AtomicBox(box_catalog: BoxCatalog, lk: LynxKite, operation: str, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)

Bases: Box

An AtomicBox is a Box that can not be further decomposed. It corresponds to a single frontend operation.

box_id_base() str

The base of the box_id, which is used when we save a workspace, containing this box.

name()

Either the name in the box catalog or the name under which the box is saved.

class lynx.kite.Box(box_catalog: BoxCatalog, lk: LynxKite, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)

Bases: object

Represents a box in a workspace segment.

It can store workspace segments, connected to its input plugs.

box_id_base() str

The base of the box_id, which is used when we save a workspace, containing this box.

is_box(operation: str) bool

Checks if the box is the operation box.

name() str

Either the name in the box catalog or the name under which the box is saved.

register(side_effect_collector: SideEffectCollector)
to_json(id_resolver: Callable[[Box], str], workspace_root: str, subworkspace_path: str) SerializedBox

Creates the json representation of a box in a workspace.

The inputs have to be connected, and all the attributes have to be defined when we call this.

trigger() None

Triggers this box.

Can be used on triggerable boxes like saveToSnapshot and export boxes.

class lynx.kite.BoxCatalog(boxes: Dict[str, SimpleNamespace])

Bases: object

Stores box metadata.

Offers utility functions to query box metadata information.

box_names() List[str]
inputs(name: str) List[str]
operation_id(name: str) str
outputs(name: str) List[str]
class lynx.kite.BoxPath(base: Box, stack: List[CustomBox] = [])

Bases: object

Represents a box (which can be inside (nested) custom boxes). It can be used for example to trigger boxes inside custom boxes.

stack[i+1] is always a box contained by the workspace referred by the custom box stack[i] and base is a box contained by stack[-1].

add_box_as_base(new_base: Box) BoxPath

Takes a box inside the current base as the new base and puts the current base on the top of the stack.

add_box_as_prefix(box: CustomBox) BoxPath
static dependencies(bps: Collection[BoxPath]) Dict[BoxPath, Set[BoxPath]]

Returns the dependencies between the given boxes.

dependency_representative() BoxPath

Returns the path of the box that should be used in dependency calculations.

For most boxes (like SQL1) this is themselves. Some boxes (like Compute inputs) have no outputs or rarely have boxes consuming their outputs. For these boxes we use their inputs for the purposes of dependency calculations. A full example:

[Create example graph] -> [Compute inputs]
  |
  v
[SQL1]                 -> [Compute inputs]

Only the Compute boxes are triggerable here, but there is no explicit dependency between them. You could trigger the second Compute box even if the first Compute box did not exist. But we want to order the Compute boxes in the intuitive way: as if the second one depended on the first. So we use their inputs as their representatives in the dependency calculation.

parent(input_name: str) BoxPath
parents() List[BoxPath]
snatch() Box

Returns a box that is accessible from outside and whose output is the same as the that of the box referred by this BoxPath.

stack_and_base() List[Box]
to_dict()

Returns a (human readable) dict representation of this object.

to_string_id(outer_ws) str

Can be used in automation, to generate unique task ids.

stack[0] is supposed to be in the outer_ws workspace.

class lynx.kite.CustomBox(box_catalog: BoxCatalog, lk: LynxKite, workspace: Workspace, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)

Bases: Box

A CustomBox is a Box composed of multiple other boxes.

box_id_base()

The base of the box_id, which is used when we save a workspace, containing this box.

name()

Either the name in the box catalog or the name under which the box is saved.

snatch(box: Box) Box

Takes a box that is inside the workspace referred to by this custom box and returns an equivalent box that is accessible from outside.

class lynx.kite.DataFrameSender(lk)

Bases: object

Class to send some dataframe to LynxKite in Parquet format. We do this by saving the dataframe to a local Parquet file and then upload it to LynxKite.

save_dataframe_to_local_file(df, tmp_dir) str

Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.

send(df)

Sends the local Parquet file to LynxKite

class lynx.kite.ExternalComputationBox(box_catalog: BoxCatalog, lk: LynxKite, inputs: List[State], parameters: Dict[str, Any], fn: Callable, args: BoundArguments, manual_box_id: str | None = None)

Bases: SingleOutputAtomicBox

A box that runs external computation when triggered. Use it via @external.

class lynx.kite.InputTable(lk, lk_table: _DownloadableLKTable)

Bases: object

Input tables for external computations (@external) are translated to these objects.

lk() State

Returns a LynxKite State.

pandas()

Returns a Pandas DataFrame.

spark(spark: SparkSession)

Takes a SparkSession as the argument and returns the table as a Spark DataFrame.

exception lynx.kite.LynxException(error)

Bases: Exception

Raised when LynxKite indicates that an error has occurred while processing a command.

class lynx.kite.LynxKite(username: str | None = None, password: str | None = None, address: str | None = None, certfile: str | None = None, oauth_token: str | None = None, signed_token: str | None = None, box_catalog: BoxCatalog | None = None, spark: SparkSession | None = None)

Bases: object

A connection to a LynxKite instance.

Some LynxKite API methods take a connection argument which can be used to communicate with multiple LynxKite instances from the same session. If no arguments to the constructor are provided, then a connection is created using the following environment variables: LYNXKITE_ADDRESS, LYNXKITE_USERNAME, LYNXKITE_PASSWORD, LYNXKITE_PUBLIC_SSL_CERT, LYNXKITE_OAUTH_TOKEN, LYNXKITE_SIGNED_TOKEN.

It can also be used without a running LynxKite instance. In this case pass in a SparkSession as spark. This class will start a temporary LynxKite instance automatically. The SparkSession must be configured with the LynxKite jar. There are two ways to do this:

pyspark --jars lynxkite-X.X.X.jar my_script.py

Or within your Python code:

spark = SparkSession.builder.config('spark.jars', 'lynxkite-X.X.X.jar').getOrCreate()
address() str
box_catalog() BoxCatalog
certfile() str | None
change_acl(file: str, readACL: str, writeACL: str)

Sets the read and write access control list for a path in LynxKite.

clean_file_system()

Deletes the data files which are not referenced anymore.

create_dir(path: str, privacy: str = 'public-read')
download_file(path: str) bytes
empty_cleaner_trash()

Empties the cleaner trash.

export_box(outputs: Dict[Tuple[str, str], SimpleNamespace], box_id: str) SimpleNamespace

Equivalent to triggering the export. Returns the exportResult output.

export_operation_names() List[str]

When we use an export operation instead of an export box, the export will be triggered automatically.

fetch_states(boxes: List[SerializedBox], parameters: Dict = {}) Dict[Tuple[str, str], SimpleNamespace]
fetch_workspace_output_states(ws: Workspace, save_under_root: str | None = None) Dict[Tuple[str, str], SimpleNamespace]
from_df(df, index=True)

Converts a pandas dataframe to a LynxKite table.

Set index to False if you don’t want to include the index. Warning: the index setting feature only works with pandas>=0.24.0

from_spark(df, *, write_parquet=False)

Converts a Spark DataFrame to a LynxKite table.

Set “write_parquet” to pass in the DataFrame by writing it as a Parquet file. This is slower, but works even if LynxKite and PySpark are in separate Spark sessions.

get_data_files_status()

Returns the amount of space used by LynxKite data, various cleaning methods and the amount of space they can free up.

get_directory_entry(path: str)

Returns details about a LynxKite path. The returned object has the following fields: exists, isWorkspace, isSnapshot, isDirectory

get_export_result(state: str) SimpleNamespace
get_graph(state: str, path: str = '') SimpleNamespace
get_graph_attribute(guid: str) SimpleNamespace
get_parquet_metadata(path: str)

Reads the metadata of a parquet file and returns the number of rows.

get_prefixed_path(path: str)

Resolves a path on a distributed file system. The path has to be specified using LynxKite’s prefixed path syntax. (E.g. DATA$/my_file.csv.)

The returned object has an exists and a resolved attribute. resolved is a string containing the absolute path.

get_state_id(state: State) str
get_table_data(state: str, limit: int = -1) SimpleNamespace
get_workspace(path: str, stack: List[str] = []) SimpleNamespace
get_workspace_boxes(path: str, stack: List[str] = []) List[SimpleNamespace]
home() str
import_box(boxes: List[SerializedBox], box_id: str) List[SerializedBox]

Equivalent to clicking the import button for an import box. Returns the updated boxes.

import_operation_names() List[str]

When we use an import operation instead of an import box, “run import” will be triggered automatically.

list_dir(dir: str = '') List[SimpleNamespace]

List the objects in a directory, with their names, types, notes, and other optional data about graphs.

move_to_cleaner_trash(method: str)

Moves LynxKite data files specified by the cleaning method into the cleaner trash. The possible values of method are defined in the result of get_data_files_status.

oauth_token() str | None
operation_names() List[str]
password() str | None
recursively_collect_customboxes(ws: Workspace, path) Set[Tuple[str, CustomBox]]
remove_name(name: str, force: bool = False)

Removes an object named name.

save_snapshot(path: str, stateId: str)
save_workspace(path: str, boxes: List[SerializedBox], overwrite: bool = True)
save_workspace_recursively(ws: Workspace, save_under_root: str | None = None) Tuple[str, str]
set_cleaner_min_age(days: float)
set_executors(count)
signed_token() str | None
sql(sql: str, *args, **kwargs) Box

Shorthand for sql1, sql2, …, sql10 boxes.

Use with positional arguments to go with the default naming, like:

lk.sql('select name from one join two', state1, state2)

Or pass the states in keyword arguments to use custom input names:

lk.sql('select count(*) from people', people=state1)

In either case you can pass in extra keyword arguments which will be passed on to the SQL boxes.

trigger_box(workspace_name: str, box_id: str, custom_box_stack: List[str] = [])

Triggers the computation of all the GUIDs in the box which is in the saved workspace named workspace_name and has boxID=box_id. If custom_box_stack is not empty, it specifies the sequence of custom boxes which leads to the workspace which contains the box with the given box_id.

upload(data: bytes, name: str | None = None)

Uploads a file that can then be used in import methods.

prefixed_path = lk.upload(‘id,namen1,Bob’)

uploadCSVNow(data: bytes, name: str | None = None)

Uploads CSV data and returns a table state.

uploadParquetNow(data: bytes, name: str | None = None)

Uploads Parquet file and returns a table state.

username() str
workspace(name: str | None = None, parameters: List[WorkspaceParameter] = [], inputs: List[str] | None = None) Callable[[Callable[[...], Dict[str, State]]], Workspace]
workspace_with_side_effects(name: str | None = None, parameters: List[WorkspaceParameter] = [], inputs: List[str] | None = None) Callable[[Callable[[...], Dict[str, State]]], Workspace]
class lynx.kite.ManagedLynxKite(spark, **kwargs)

Bases: object

Takes care of starting and stopping LynxKite in a SparkSession.

assert_jar_is_loaded(spark)
getDataFrame(guid)
importDataFrame(df)
set_env(**kwargs)
start()

Returns when LynxKite is ready to be used.

stop()
class lynx.kite.PandasDataFrameSender(lk)

Bases: DataFrameSender

save_dataframe_to_local_file(df, path) str

Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.

class lynx.kite.ParametricParameter(parametric_expr: str)

Bases: object

Represents a parametric parameter value. The short alias pp is defined for this type.

class lynx.kite.Placeholder(value=None)

Bases: object

Universal placeholder. Use it whenever you need to hold a place.

class lynx.kite.SideEffectCollector

Bases: object

AUTO: SideEffectCollector = <lynx.kite.SideEffectCollector object>
add_box(box: Box) None
all_triggerables() Iterable[BoxPath]
compute(state, id_override=None)
class lynx.kite.SingleOutputAtomicBox(box_catalog: BoxCatalog, lk: LynxKite, operation: str, inputs: Dict[str, State], parameters: Dict[str, Any], output_name: str, manual_box_id: str | None = None)

Bases: AtomicBox, State

An AtomicBox with a single output. This makes chaining multiple operations after each other possible.

class lynx.kite.SingleOutputCustomBox(box_catalog: BoxCatalog, lk: LynxKite, workspace: Workspace, inputs: Dict[str, State], parameters: Dict[str, Any], output_name: str, manual_box_id: str | None = None)

Bases: CustomBox, State

An CustomBox with a single output. This makes chaining multiple operations after each other possible.

class lynx.kite.SparkDataFrameSender(lk)

Bases: DataFrameSender

save_dataframe_to_local_file(df, target_dir) str

Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.

class lynx.kite.State(box: Box, output_plug_name: str)

Bases: object

Represents a named output plug of a box.

It can recursively store the boxes which are connected to the input plugs of the box of this state.

columns()

Returns a list of columns if this state is a table.

compute() None

Triggers the computation of this state.

Uses a temporary folder to save a temporary workspace for this computation.

df(limit: int = -1)

Returns a Pandas DataFrame if this state is a table.

property edge_attributes: SingleOutputAtomicBox

Returns the edge attributes as a table.

property edges: SingleOutputAtomicBox

Returns the edges as a table.

get_graph() SimpleNamespace

Returns the graph metadata if this state is a graph.

get_progress()

Returns progress info about the state.

get_table_data(limit: int = -1) SimpleNamespace

Returns the “raw” table data if this state is a table.

property graph_attributes: SingleOutputAtomicBox

Returns the graph attributes as a table.

operation_names()
persist() SingleOutputAtomicBox

Same as x.sql('select * from input', persist='yes').

run_export() str

Triggers the export if this state is an exportResult.

Returns the prefixed path of the exported file. This method is deprecated, only used in tests, where we need the export path.

save_snapshot(path: str) None

Save this state as a snapshot under path.

save_to_sequence(tss, date: datetime) None

Save this state to the tss TableSnapshotSequence with date as the date of the snapshot.

segmentation(name: str) SingleOutputAtomicBox

Returns the named segmentation as a base project.

Example usage: ` graph = lk.createExampleGraph().findConnectedComponents(name='seg1') segmentation = graph.segmentation('seg1') `

spark(spark: SparkSession | None = None)

Returns the table as a Spark DataFrame. The “spark” parameter must be set if LynxKite was not started from Python. In this case the DataFrame is read from a Parquet file.

sql(sql: str, **kwargs) SingleOutputAtomicBox
property vertices: SingleOutputAtomicBox

Returns the vertices as a table.

class lynx.kite.Workspace(terminal_boxes: List[Box] = [], name: str | None = None, custom_box_id_base: str | None = None, side_effect_paths: List[BoxPath] = [], input_boxes: List[AtomicBox] = [], ws_parameters: List[WorkspaceParameter] = [])

Bases: object

Immutable class representing a LynxKite workspace.

custom_boxes() List[CustomBox]
find(box_id_base: str) BoxPath

Returns the BoxPath for the box nested in the workspace whose box_id_base is the given string.

Raises an error if there is not exactly one such a box.

find_all(box_id_base: str) List[BoxPath]

Returns the BoxPaths for all boxes nested in the workspace whose box_id_base is the given string.

has_date_parameter() bool
has_local_date_parameter() bool
id_of(box: Box) str
safename() str
save(saved_under_folder: str) str
side_effect_paths() List[BoxPath]
terminal_box_ids() List[str]
to_json(workspace_root: str, subworkspace_path: str) List[SerializedBox]
trigger(box_to_trigger: BoxPath)

Triggers one side effect.

Assumes the workspace is not saved, so saves it under a random folder.

trigger_all_side_effects()

Triggers all side effects.

Also saves the workspace under a temporary folder.

trigger_saved(box_to_trigger: BoxPath, saved_under_folder: str)

Triggers one side effect.

Assumes the workspace is saved under saved_under_root.

class lynx.kite.WorkspaceParameter(name: str, kind: str, default_value: str = '')

Bases: object

Represents a workspace parameter declaration.

to_json() Dict[str, str]
lynx.kite.dedent(s: str | ParametricParameter) str | ParametricParameter

Format sql queries to look nicer on the UI.

lynx.kite.escape(s: str | ParametricParameter) str | ParametricParameter

Sanitizes a string for injecting into a generated SQL query.

lynx.kite.external(fn: Callable)

Decorator for executing computation outside of LynxKite in a LynxKite workflow.

Returns a custom box that internally exports the input tables and runs the external computation on them when it is triggered. The output of this custom box is the result of the external computation as a LynxKite table.

Example:

@external
def titled_names(table, default):
  df = table.pandas()
  df['gender'] = df.gender.fillna(default)
  df['titled'] = np.where(df.gender == 'Female', 'Ms ' + df.name, 'Mr ' + df.name)
  return df

t = titled_names(lk.createExampleGraph(), 'Female')
t.trigger() # Causes the function to run.
print(t.df())

Table state parameters of the function call are exported into a Parquet file. You can read them manually, or via the methods of the InputTable objects that are passed to your function in their place:

  • filename is the Hadoop path for the file. (I.e. file:/something or hdfs:///something.)

  • pandas() loads the Parquet file into a Pandas DataFrame.

  • spark() loads the Parquet file into a PySpark DataFrame.

  • lk() loads the Parquet file into a LynxKite state.

Your function can return one of the following types:

  • pandas.DataFrame to be written to a Parquet file and imported in LynxKite.

  • spark.DataFrame to be written to a Parquet file and imported in LynxKite.

  • A string that is the LynxKite prefixed path to a Parquet file that is your output.

  • A LynxKite table state to be written to a Parquet file and imported in LynxKite.

Why use LynxKite states in external computations? This allows the use of LynxKite as in a notebook environment. You can access the actual data and write code that is conditional on the data.

lynx.kite.map_args(signature: Signature, bound: BoundArguments, map_fn: Callable[[str, Any], Any]) BoundArguments
lynx.kite.pp

alias of ParametricParameter

lynx.kite.random_filename() str
lynx.kite.serialize_deps(deps: Dict[Any, Set[Any]]) List[Any]

Returns the keys of deps in an execution order that respects the dependencies.

lynx.kite.subworkspace(fn: Callable)

Allows using the decorated function as a LynxKite custom box.

Example use:

@subworkspace
def my_func(input1, other_arg1):
  return input1.sql1(sql=f'select {other_arg1} from vertices')

df = my_func(lk.createExampleGraph(), 'name').df()

To make a multi-output box, return a dictionary instead of a state.

my_func() can have any number of positional or keyword arguments. The arguments carrying states will be turned into the inputs of the custom box.

Use @ws_param to take workspace parameters.

To define a custom box with side-effects, take an argument with a default value of SideEffectCollector.AUTO. A fresh SideEffectCollector() will be provided each time the function is called.

@subworkspace
def my_func(input1, sec=SideEffectCollector.AUTO):
  input1.saveAsSnapshot('x').register(sec)

my_func(lk.createExampleGraph()).trigger()
lynx.kite.text(name: str, default: str = '') WorkspaceParameter

Helper function to make it easy to define a text kind ws parameter.

lynx.kite.to_simple_dicts(obj)

Converts a SimpleNamespace structure (as returned from LynxKite requests) into simple dicts.

lynx.kite.ws_name(name: str)

Specifies the name of the wrapped subworkspace.

Example use:

@ws_name('My nice workspace')
@subworkspace
def my_func(input1):
  return input1.sql1(sql='select * from vertices')

my_func(lk.createExampleGraph())
lynx.kite.ws_param(name: str, default: str = '', description: str = '')

Adds a workspace parameter to the wrapped subworkspace.

Example use:

@ws_param('p1')
@ws_param('p2', default='x', description='The second column.')
@subworkspace
def my_func(input1):
  return input1.sql1(sql=pp('select $p1, $p2 from vertices'))

my_func(lk.createExampleGraph(), p1='age', p2='name')

Often you just want to pass your workspace parameter along unchanged. You could just write box(param=pp('$ws_param')). But if you take a keyword argument with the same name as a workspace parameter, this is even easier:

@ws_params('p1')
@subworkspace
def my_func(input1, p1):
  return input1.sql1(sql=p1)  # Equivalent to sql=pp('$p1').