API reference¶
Python interface for the LynxKite Remote API.
Installation:
pip install lynxkite-client
The default LynxKite connection parameters can be configured through the following environment variables:
LYNXKITE_ADDRESS=https://lynxkite.example.com/
LYNXKITE_USERNAME=user@company
LYNXKITE_PASSWORD=my_password
LYNXKITE_PUBLIC_SSL_CERT=/tmp/lynxkite.crt
Example usage:
import lynx.kite
lk = lynx.kite.LynxKite()
lk.createExampleGraph().sql('select * from graph_attributes').df()
- class lynx.kite.AtomicBox(box_catalog: BoxCatalog, lk: LynxKite, operation: str, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)¶
Bases:
Box
An
AtomicBox
is aBox
that can not be further decomposed. It corresponds to a single frontend operation.- box_id_base() str ¶
The base of the box_id, which is used when we save a workspace, containing this box.
- name()¶
Either the name in the box catalog or the name under which the box is saved.
- class lynx.kite.Box(box_catalog: BoxCatalog, lk: LynxKite, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)¶
Bases:
object
Represents a box in a workspace segment.
It can store workspace segments, connected to its input plugs.
- box_id_base() str ¶
The base of the box_id, which is used when we save a workspace, containing this box.
- is_box(operation: str) bool ¶
Checks if the box is the operation box.
- name() str ¶
Either the name in the box catalog or the name under which the box is saved.
- register(side_effect_collector: SideEffectCollector)¶
- to_json(id_resolver: Callable[[Box], str], workspace_root: str, subworkspace_path: str) SerializedBox ¶
Creates the json representation of a box in a workspace.
The inputs have to be connected, and all the attributes have to be defined when we call this.
- trigger() None ¶
Triggers this box.
Can be used on triggerable boxes like saveToSnapshot and export boxes.
- class lynx.kite.BoxCatalog(boxes: Dict[str, SimpleNamespace])¶
Bases:
object
Stores box metadata.
Offers utility functions to query box metadata information.
- box_names() List[str] ¶
- inputs(name: str) List[str] ¶
- operation_id(name: str) str ¶
- outputs(name: str) List[str] ¶
- class lynx.kite.BoxPath(base: Box, stack: List[CustomBox] = [])¶
Bases:
object
Represents a box (which can be inside (nested) custom boxes). It can be used for example to trigger boxes inside custom boxes.
stack[i+1]
is always a box contained by the workspace referred by the custom boxstack[i]
andbase
is a box contained bystack[-1]
.- add_box_as_base(new_base: Box) BoxPath ¶
Takes a box inside the current base as the new base and puts the current base on the top of the stack.
- static dependencies(bps: Collection[BoxPath]) Dict[BoxPath, Set[BoxPath]] ¶
Returns the dependencies between the given boxes.
- dependency_representative() BoxPath ¶
Returns the path of the box that should be used in dependency calculations.
For most boxes (like SQL1) this is themselves. Some boxes (like Compute inputs) have no outputs or rarely have boxes consuming their outputs. For these boxes we use their inputs for the purposes of dependency calculations. A full example:
[Create example graph] -> [Compute inputs] | v [SQL1] -> [Compute inputs]
Only the Compute boxes are triggerable here, but there is no explicit dependency between them. You could trigger the second Compute box even if the first Compute box did not exist. But we want to order the Compute boxes in the intuitive way: as if the second one depended on the first. So we use their inputs as their representatives in the dependency calculation.
- snatch() Box ¶
Returns a box that is accessible from outside and whose output is the same as the that of the box referred by this BoxPath.
- to_dict()¶
Returns a (human readable) dict representation of this object.
- to_string_id(outer_ws) str ¶
Can be used in automation, to generate unique task ids.
stack[0] is supposed to be in the outer_ws workspace.
- class lynx.kite.CustomBox(box_catalog: BoxCatalog, lk: LynxKite, workspace: Workspace, inputs: Dict[str, State], parameters: Dict[str, Any], manual_box_id: str | None = None)¶
Bases:
Box
A
CustomBox
is aBox
composed of multiple other boxes.- box_id_base()¶
The base of the box_id, which is used when we save a workspace, containing this box.
- name()¶
Either the name in the box catalog or the name under which the box is saved.
- class lynx.kite.DataFrameSender(lk)¶
Bases:
object
Class to send some dataframe to LynxKite in Parquet format. We do this by saving the dataframe to a local Parquet file and then upload it to LynxKite.
- save_dataframe_to_local_file(df, tmp_dir) str ¶
Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.
- send(df)¶
Sends the local Parquet file to LynxKite
- class lynx.kite.ExternalComputationBox(box_catalog: BoxCatalog, lk: LynxKite, inputs: List[State], parameters: Dict[str, Any], fn: Callable, args: BoundArguments, manual_box_id: str | None = None)¶
Bases:
SingleOutputAtomicBox
A box that runs external computation when triggered. Use it via
@external
.
- class lynx.kite.InputTable(lk, lk_table: _DownloadableLKTable)¶
Bases:
object
Input tables for external computations (
@external
) are translated to these objects.- pandas()¶
Returns a Pandas DataFrame.
- spark(spark: SparkSession)¶
Takes a SparkSession as the argument and returns the table as a Spark DataFrame.
- exception lynx.kite.LynxException(error)¶
Bases:
Exception
Raised when LynxKite indicates that an error has occurred while processing a command.
- class lynx.kite.LynxKite(username: str | None = None, password: str | None = None, address: str | None = None, certfile: str | None = None, oauth_token: str | None = None, signed_token: str | None = None, box_catalog: BoxCatalog | None = None, spark: SparkSession | None = None)¶
Bases:
object
A connection to a LynxKite instance.
Some LynxKite API methods take a connection argument which can be used to communicate with multiple LynxKite instances from the same session. If no arguments to the constructor are provided, then a connection is created using the following environment variables:
LYNXKITE_ADDRESS
,LYNXKITE_USERNAME
,LYNXKITE_PASSWORD
,LYNXKITE_PUBLIC_SSL_CERT
,LYNXKITE_OAUTH_TOKEN
,LYNXKITE_SIGNED_TOKEN
.It can also be used without a running LynxKite instance. In this case pass in a SparkSession as
spark
. This class will start a temporary LynxKite instance automatically. The SparkSession must be configured with the LynxKite jar. There are two ways to do this:pyspark --jars lynxkite-X.X.X.jar my_script.py
Or within your Python code:
spark = SparkSession.builder.config('spark.jars', 'lynxkite-X.X.X.jar').getOrCreate()
- address() str ¶
- box_catalog() BoxCatalog ¶
- certfile() str | None ¶
- change_acl(file: str, readACL: str, writeACL: str)¶
Sets the read and write access control list for a path in LynxKite.
- clean_file_system()¶
Deletes the data files which are not referenced anymore.
- create_dir(path: str, privacy: str = 'public-read')¶
- download_file(path: str) bytes ¶
- empty_cleaner_trash()¶
Empties the cleaner trash.
- export_box(outputs: Dict[Tuple[str, str], SimpleNamespace], box_id: str) SimpleNamespace ¶
Equivalent to triggering the export. Returns the exportResult output.
- export_operation_names() List[str] ¶
When we use an export operation instead of an export box, the export will be triggered automatically.
- fetch_states(boxes: List[SerializedBox], parameters: Dict = {}) Dict[Tuple[str, str], SimpleNamespace] ¶
- fetch_workspace_output_states(ws: Workspace, save_under_root: str | None = None) Dict[Tuple[str, str], SimpleNamespace] ¶
- from_df(df, index=True)¶
Converts a pandas dataframe to a LynxKite table.
Set index to False if you don’t want to include the index. Warning: the index setting feature only works with pandas>=0.24.0
- from_spark(df, *, write_parquet=False)¶
Converts a Spark DataFrame to a LynxKite table.
Set “write_parquet” to pass in the DataFrame by writing it as a Parquet file. This is slower, but works even if LynxKite and PySpark are in separate Spark sessions.
- get_data_files_status()¶
Returns the amount of space used by LynxKite data, various cleaning methods and the amount of space they can free up.
- get_directory_entry(path: str)¶
Returns details about a LynxKite path. The returned object has the following fields:
exists
,isWorkspace
,isSnapshot
,isDirectory
- get_export_result(state: str) SimpleNamespace ¶
- get_graph(state: str, path: str = '') SimpleNamespace ¶
- get_graph_attribute(guid: str) SimpleNamespace ¶
- get_parquet_metadata(path: str)¶
Reads the metadata of a parquet file and returns the number of rows.
- get_prefixed_path(path: str)¶
Resolves a path on a distributed file system. The path has to be specified using LynxKite’s prefixed path syntax. (E.g.
DATA$/my_file.csv
.)The returned object has an
exists
and aresolved
attribute.resolved
is a string containing the absolute path.
- get_table_data(state: str, limit: int = -1) SimpleNamespace ¶
- get_workspace(path: str, stack: List[str] = []) SimpleNamespace ¶
- get_workspace_boxes(path: str, stack: List[str] = []) List[SimpleNamespace] ¶
- home() str ¶
- import_box(boxes: List[SerializedBox], box_id: str) List[SerializedBox] ¶
Equivalent to clicking the import button for an import box. Returns the updated boxes.
- import_operation_names() List[str] ¶
When we use an import operation instead of an import box, “run import” will be triggered automatically.
- list_dir(dir: str = '') List[SimpleNamespace] ¶
List the objects in a directory, with their names, types, notes, and other optional data about graphs.
- move_to_cleaner_trash(method: str)¶
Moves LynxKite data files specified by the cleaning
method
into the cleaner trash. The possible values ofmethod
are defined in the result of get_data_files_status.
- oauth_token() str | None ¶
- operation_names() List[str] ¶
- password() str | None ¶
- remove_name(name: str, force: bool = False)¶
Removes an object named
name
.
- save_snapshot(path: str, stateId: str)¶
- save_workspace(path: str, boxes: List[SerializedBox], overwrite: bool = True)¶
- set_cleaner_min_age(days: float)¶
- set_executors(count)¶
- signed_token() str | None ¶
- sql(sql: str, *args, **kwargs) Box ¶
Shorthand for sql1, sql2, …, sql10 boxes.
Use with positional arguments to go with the default naming, like:
lk.sql('select name from one join two', state1, state2)
Or pass the states in keyword arguments to use custom input names:
lk.sql('select count(*) from people', people=state1)
In either case you can pass in extra keyword arguments which will be passed on to the SQL boxes.
- trigger_box(workspace_name: str, box_id: str, custom_box_stack: List[str] = [])¶
Triggers the computation of all the GUIDs in the box which is in the saved workspace named
workspace_name
and hasboxID=box_id
. If custom_box_stack is not empty, it specifies the sequence of custom boxes which leads to the workspace which contains the box with the given box_id.
- upload(data: bytes, name: str | None = None)¶
Uploads a file that can then be used in import methods.
prefixed_path = lk.upload(‘id,namen1,Bob’)
- uploadCSVNow(data: bytes, name: str | None = None)¶
Uploads CSV data and returns a table state.
- uploadParquetNow(data: bytes, name: str | None = None)¶
Uploads Parquet file and returns a table state.
- username() str ¶
- workspace(name: str | None = None, parameters: List[WorkspaceParameter] = [], inputs: List[str] | None = None) Callable[[Callable[[...], Dict[str, State]]], Workspace] ¶
- workspace_with_side_effects(name: str | None = None, parameters: List[WorkspaceParameter] = [], inputs: List[str] | None = None) Callable[[Callable[[...], Dict[str, State]]], Workspace] ¶
- class lynx.kite.ManagedLynxKite(spark, **kwargs)¶
Bases:
object
Takes care of starting and stopping LynxKite in a SparkSession.
- assert_jar_is_loaded(spark)¶
- getDataFrame(guid)¶
- importDataFrame(df)¶
- set_env(**kwargs)¶
- start()¶
Returns when LynxKite is ready to be used.
- stop()¶
- class lynx.kite.PandasDataFrameSender(lk)¶
Bases:
DataFrameSender
- save_dataframe_to_local_file(df, path) str ¶
Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.
- class lynx.kite.ParametricParameter(parametric_expr: str)¶
Bases:
object
Represents a parametric parameter value. The short alias pp is defined for this type.
- class lynx.kite.Placeholder(value=None)¶
Bases:
object
Universal placeholder. Use it whenever you need to hold a place.
- class lynx.kite.SideEffectCollector¶
Bases:
object
- AUTO: SideEffectCollector = <lynx.kite.SideEffectCollector object>¶
- compute(state, id_override=None)¶
- class lynx.kite.SingleOutputAtomicBox(box_catalog: BoxCatalog, lk: LynxKite, operation: str, inputs: Dict[str, State], parameters: Dict[str, Any], output_name: str, manual_box_id: str | None = None)¶
-
An
AtomicBox
with a single output. This makes chaining multiple operations after each other possible.
- class lynx.kite.SingleOutputCustomBox(box_catalog: BoxCatalog, lk: LynxKite, workspace: Workspace, inputs: Dict[str, State], parameters: Dict[str, Any], output_name: str, manual_box_id: str | None = None)¶
-
An
CustomBox
with a single output. This makes chaining multiple operations after each other possible.
- class lynx.kite.SparkDataFrameSender(lk)¶
Bases:
DataFrameSender
- save_dataframe_to_local_file(df, target_dir) str ¶
Saves the dataframe as a single Parquet file in tmpdir Returns the actual path of the binary that was written.
- class lynx.kite.State(box: Box, output_plug_name: str)¶
Bases:
object
Represents a named output plug of a box.
It can recursively store the boxes which are connected to the input plugs of the box of this state.
- columns()¶
Returns a list of columns if this state is a table.
- compute() None ¶
Triggers the computation of this state.
Uses a temporary folder to save a temporary workspace for this computation.
- df(limit: int = -1)¶
Returns a Pandas DataFrame if this state is a table.
- property edge_attributes: SingleOutputAtomicBox¶
Returns the edge attributes as a table.
- property edges: SingleOutputAtomicBox¶
Returns the edges as a table.
- get_graph() SimpleNamespace ¶
Returns the graph metadata if this state is a graph.
- get_progress()¶
Returns progress info about the state.
- get_table_data(limit: int = -1) SimpleNamespace ¶
Returns the “raw” table data if this state is a table.
- property graph_attributes: SingleOutputAtomicBox¶
Returns the graph attributes as a table.
- operation_names()¶
- persist() SingleOutputAtomicBox ¶
Same as
x.sql('select * from input', persist='yes')
.
- run_export() str ¶
Triggers the export if this state is an
exportResult
.Returns the prefixed path of the exported file. This method is deprecated, only used in tests, where we need the export path.
- save_snapshot(path: str) None ¶
Save this state as a snapshot under path.
- save_to_sequence(tss, date: datetime) None ¶
Save this state to the
tss
TableSnapshotSequence withdate
as the date of the snapshot.
- segmentation(name: str) SingleOutputAtomicBox ¶
Returns the named segmentation as a base project.
Example usage:
` graph = lk.createExampleGraph().findConnectedComponents(name='seg1') segmentation = graph.segmentation('seg1') `
- spark(spark: SparkSession | None = None)¶
Returns the table as a Spark DataFrame. The “spark” parameter must be set if LynxKite was not started from Python. In this case the DataFrame is read from a Parquet file.
- sql(sql: str, **kwargs) SingleOutputAtomicBox ¶
- property vertices: SingleOutputAtomicBox¶
Returns the vertices as a table.
- class lynx.kite.Workspace(terminal_boxes: List[Box] = [], name: str | None = None, custom_box_id_base: str | None = None, side_effect_paths: List[BoxPath] = [], input_boxes: List[AtomicBox] = [], ws_parameters: List[WorkspaceParameter] = [])¶
Bases:
object
Immutable class representing a LynxKite workspace.
- find(box_id_base: str) BoxPath ¶
Returns the BoxPath for the box nested in the workspace whose box_id_base is the given string.
Raises an error if there is not exactly one such a box.
- find_all(box_id_base: str) List[BoxPath] ¶
Returns the BoxPaths for all boxes nested in the workspace whose box_id_base is the given string.
- has_date_parameter() bool ¶
- has_local_date_parameter() bool ¶
- safename() str ¶
- save(saved_under_folder: str) str ¶
- terminal_box_ids() List[str] ¶
- to_json(workspace_root: str, subworkspace_path: str) List[SerializedBox] ¶
- trigger(box_to_trigger: BoxPath)¶
Triggers one side effect.
Assumes the workspace is not saved, so saves it under a random folder.
- trigger_all_side_effects()¶
Triggers all side effects.
Also saves the workspace under a temporary folder.
- class lynx.kite.WorkspaceParameter(name: str, kind: str, default_value: str = '')¶
Bases:
object
Represents a workspace parameter declaration.
- to_json() Dict[str, str] ¶
- lynx.kite.dedent(s: str | ParametricParameter) str | ParametricParameter ¶
Format sql queries to look nicer on the UI.
- lynx.kite.escape(s: str | ParametricParameter) str | ParametricParameter ¶
Sanitizes a string for injecting into a generated SQL query.
- lynx.kite.external(fn: Callable)¶
Decorator for executing computation outside of LynxKite in a LynxKite workflow.
Returns a custom box that internally exports the input tables and runs the external computation on them when it is triggered. The output of this custom box is the result of the external computation as a LynxKite table.
Example:
@external def titled_names(table, default): df = table.pandas() df['gender'] = df.gender.fillna(default) df['titled'] = np.where(df.gender == 'Female', 'Ms ' + df.name, 'Mr ' + df.name) return df t = titled_names(lk.createExampleGraph(), 'Female') t.trigger() # Causes the function to run. print(t.df())
Table state parameters of the function call are exported into a Parquet file. You can read them manually, or via the methods of the
InputTable
objects that are passed to your function in their place:filename
is the Hadoop path for the file. (I.e.file:/something
orhdfs:///something
.)pandas()
loads the Parquet file into a Pandas DataFrame.spark()
loads the Parquet file into a PySpark DataFrame.lk()
loads the Parquet file into a LynxKite state.
Your function can return one of the following types:
pandas.DataFrame
to be written to a Parquet file and imported in LynxKite.spark.DataFrame
to be written to a Parquet file and imported in LynxKite.A string that is the LynxKite prefixed path to a Parquet file that is your output.
A LynxKite table state to be written to a Parquet file and imported in LynxKite.
Why use LynxKite states in external computations? This allows the use of LynxKite as in a notebook environment. You can access the actual data and write code that is conditional on the data.
- lynx.kite.map_args(signature: Signature, bound: BoundArguments, map_fn: Callable[[str, Any], Any]) BoundArguments ¶
- lynx.kite.pp¶
alias of
ParametricParameter
- lynx.kite.random_filename() str ¶
- lynx.kite.serialize_deps(deps: Dict[Any, Set[Any]]) List[Any] ¶
Returns the keys of
deps
in an execution order that respects the dependencies.
- lynx.kite.subworkspace(fn: Callable)¶
Allows using the decorated function as a LynxKite custom box.
Example use:
@subworkspace def my_func(input1, other_arg1): return input1.sql1(sql=f'select {other_arg1} from vertices') df = my_func(lk.createExampleGraph(), 'name').df()
To make a multi-output box, return a dictionary instead of a state.
my_func()
can have any number of positional or keyword arguments. The arguments carrying states will be turned into the inputs of the custom box.Use
@ws_param
to take workspace parameters.To define a custom box with side-effects, take an argument with a default value of SideEffectCollector.AUTO. A fresh SideEffectCollector() will be provided each time the function is called.
@subworkspace def my_func(input1, sec=SideEffectCollector.AUTO): input1.saveAsSnapshot('x').register(sec) my_func(lk.createExampleGraph()).trigger()
- lynx.kite.text(name: str, default: str = '') WorkspaceParameter ¶
Helper function to make it easy to define a text kind ws parameter.
- lynx.kite.to_simple_dicts(obj)¶
Converts a SimpleNamespace structure (as returned from LynxKite requests) into simple dicts.
- lynx.kite.ws_name(name: str)¶
Specifies the name of the wrapped subworkspace.
Example use:
@ws_name('My nice workspace') @subworkspace def my_func(input1): return input1.sql1(sql='select * from vertices') my_func(lk.createExampleGraph())
- lynx.kite.ws_param(name: str, default: str = '', description: str = '')¶
Adds a workspace parameter to the wrapped subworkspace.
Example use:
@ws_param('p1') @ws_param('p2', default='x', description='The second column.') @subworkspace def my_func(input1): return input1.sql1(sql=pp('select $p1, $p2 from vertices')) my_func(lk.createExampleGraph(), p1='age', p2='name')
Often you just want to pass your workspace parameter along unchanged. You could just write
box(param=pp('$ws_param'))
. But if you take a keyword argument with the same name as a workspace parameter, this is even easier:@ws_params('p1') @subworkspace def my_func(input1, p1): return input1.sql1(sql=p1) # Equivalent to sql=pp('$p1').