Write Data

Use the Python client to write to a Synnax cluster.

Synnax supports multiple methods for writing data to a cluster. We can write directly to a channel, fetch a range and set its data, or leverage writers for writing large volumes of data.

Writes in Synnax are more complicated than reads, and, as such, we recommend checking out our concepts page to learn more about the best practices for writing data to Synnax. The rules of writes are especially important.

Writing to a Channel

Writing to a channel requires us to write timestamps to its index channel before we write the data. We’ll create the following channels to use as examples:

import synnax as sy

# Create the index
timestamps = client.channels.create(
    name="timestamps",
    data_type=sy.DataType.TIMESTAMP,
    is_index=True
)

# Create the temperature channel
my_precise_tc = client.channels.create(
    name="my_precise_tc",
    data_type=sy.DataType.FLOAT32,
    index=timestamps.key
)

# Create the pressure channel
my_precise_tc = client.channels.create(
    name="my_precise_pt",
    data_type=sy.DataType.FLOAT32,
    index=timestamps.key
)

We’ll make sure to write timestamps to the index before we write to the data channel:

from datetime import datetime

# Our temperature data.
temperatures = [55, 55.1, 55.7, 57.2, 58.1]
start = sy.TimeStamp.now()
times = [
    start,
    start + 1 * sy.TimeSpan.HOUR,
    start + 2 * sy.TimeSpan.HOUR,
    start + 3 * sy.TimeSpan.HOUR,
    start + 4 * sy.TimeSpan.HOUR,
]

# Write the timestamps to the index
timestamps.write(start, times)

# Write the data to the channel
my_precise_tc.write(start, temperatures)

Notice how we align the two arrays using the common start timestamp. This tells Synnax that the first sample in the temperatures array is associated with the first timestamp in the timestamps array.

Synnax will raise a ValidationError if the index channel does not contain a corresponding timestamp for every sample in the data channel. After all, it wouldn’t make sense to have a temperature reading without an associated timestamp.

Writing to a Range

Writing to a range takes away the burden of needing to correctly align the data from different channels.

We’ll create the following range as an example:

import synnax as sy

# Create the range
burst_test = client.ranges.create(
    name="burst_test",
    time_range=sy.TimeRange(
        start=sy.TimeStamp.now(),
        end=sy.TimeStamp.now() + 1 * sy.TimeSpan.HOUR
    )
)

Then, we’ll write to the range using the write method:

temperatures = [55, 55.1, 55.7, 57.2, 58.1, 58.9, 59.1, 59.2, 59.3]
pressures = [100, 100.1, 100.7, 102.2, 103.1, 103.9, 104.1, 104.2, 104.3]

# This call to write will assume that the timestamp of the first sample is
# the start of the range.
burst_test.write({
    "my_precise_tc": temperatures,
    "my_precise_pt": pressures,
})

Using a Writer

While the above methods are great for writing static, existing data, it’s common to write data in a streaming fashion as it’s acquired. This is especially useful for use in control sequences and live data processing. The Writer class is designed for this use case (and is actually used under the hood by the other methods).

To keep things intuitive, the writer maintains a file-like interface that is similar to Python’s built-in file objects. There are a few key differences, the most important being that writers are governed by a transaction. If you’d like to learn more about transactions and how writes work in Synnax, check out the concepts page.

We’ll create the following channels to use as examples:

import synnax as sy

# Create the index
timestamps = client.channels.create(
    name="timestamps",
    data_type=sy.DataType.TIMESTAMP,
    is_index=True
)

# Create the temperature channel
my_precise_tc = client.channels.create(
    name="my_precise_tc",
    data_type=sy.DataType.FLOAT32,
    index=timestamps
)

To open the writer, we use the open_writer method on the client and provide a starting timestamp for the first sample and a list of channels we’d like to write to:

import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)
    writer.commit()

This example will write 100 samples to the my_precise_tc channel, each spaced roughly 0.1 seconds apart, and will commit all writes when finished.

It’s typical to write and commit millions of samples over the course of hours or days, intermittently calling commit to ensure that the data is persisted to the cluster.

We recommend using writers within a context manager. This ensures that a writer is properly closed after use, ensuring that resources have been freed and sockets are closed.

If you can’t use a context manager, make sure you call writer.close() when you’re done using it.

Auto-Commit

You can also configure a writer to automatically commit written data after each write, making it immediately available for read access. To do this, set the enable_auto_commit argument to True when opening the writer:

import time

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)

Write Authorities

Writers support dynamic control handoff. Multiple writers can be opened on a channel at the same time, but only one writer is allowed to write to the channel. To determine which writer has control, an authority from 0 to 255 is assigned to each writer (or, optionally, each channel in the writer). The writer with the highest authority will be allowed to write. If two writers have the same authority, the writer that opened first will be allowed to write. For more information, see the concepts page on writers.

By default, writers are opened with an authority of ABSOLUTE i.e. 255. This means that no other writers can write to the channel as long as the writer is open.

Opening a writer with the same authority on all channels

To open a writer with the same authority on all channels, you can pass the authority argument with an integer.

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    authority=100,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)

Opening a writer with different authorities on each channel

To open a writer with different authorities on each channel, you can pass the authority argument with a list of integers. This list must be the same length as the number of channels in the writer.

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    authority=[100, 200],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        time.sleep(0.1)

Adjusting write authorities after open

To change the authority of a writer during operation, just call set_authority. This method accepts a dictionary with the channel name as the key and the new authority as the value.

# Set the authority on all channels
writer.set_authority(200)
# Set the authority on just a few channels
writer.set_authority({
    "timestamps": 150,
    "my_precise_tc": 250,
})

Persistence/Streaming Mode

By default, writers are opened in stream + persist mode. To change the mode of a writer, specify the enum value of the mode argument when opening the writer. This can be persist, stream, or persist_stream.

For example, to open a writer that only persists data:

import synnax as sy

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    mode="persist"
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })

Common Pitfalls

There are several common pitfalls to avoid when writing data to a Synnax cluster. These are important to avoid as they can lead to performance degradation and/or control issues.

Using Many Individual Write Calls Instead of a Writer

When writing large volumes of data in a streaming fashion (or in batches), it’s important to use a writer instead of making individual write calls to a channel. Calls to write on a channel use an entirely new transaction for each call - constantly creating, committing, and closing transactions has a dramatic impact on performance. So, don’t do this:

time = client.channels.retrieve("timestamps")
my_tc = client.channels.retrieve("my_precise_tc")
for i in range(100):
    # This is a very bad idea
    ts = sy.TimeStamp.now()
    time.write(ts, ts)
    my_tc.write(ts, i)

This is also a bad idea:

# open and close a transaction for every write
for i in range(100):
    with client.open_writer(
        start=sy.TimeStamp.now(),
        channels=["timestamps", "my_precise_tc"],
        enable_auto_commit=True,
    ) as writer:
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })

Instead, repeatedly call write on a single writer:

# This is dramatically more efficient
with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })

Calling Commit on Every Write

If you’re not using auto-commit, it’s important to call commit on the writer periodically to ensure that the data is persisted to the cluster. However, calling commit on every write is a bad idea. This is because commit requires a round-trip to the cluster to ensure that the data is persisted. This can be very slow if you’re writing a lot of data. If you’re writing a lot of data, commit every few seconds or turn on auto-commit.

This is a bad idea:

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })
        writer.commit()

Instead, use auto-commit:

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["timestamps", "my_precise_tc"],
    enable_auto_commit=True,
) as writer:
    for i in range(100):
        writer.write({
            "timestamps": sy.TimeStamp.now(),
            "my_precise_tc": i,
        })