Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/ni/datastore/data/_data_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
ReadConditionValueRequest,
ReadMeasurementValueRequest,
)
from ni.protobuf.types.precision_timestamp_conversion import (
hightime_datetime_to_protobuf,
)
from ni_grpc_extensions.channelpool import GrpcChannelPool

from ni.datastore.data._grpc_conversion import (
convert_read_condition_response_from_protobuf,
convert_read_measurement_response_from_protobuf,
get_publish_measurement_batch_timestamps,
get_publish_measurement_timestamp,
populate_publish_condition_batch_request_values,
populate_publish_condition_request_value,
Expand Down Expand Up @@ -347,7 +345,6 @@ def publish_measurement_batch(
publish_request = PublishMeasurementBatchRequest(
name=name,
step_id=step_id,
timestamps=[hightime_datetime_to_protobuf(ts) for ts in timestamps],
outcomes=[outcome.to_protobuf() for outcome in outcomes],
error_information=(
[ei.to_protobuf() for ei in (error_information or [])] if error_information else []
Expand All @@ -358,6 +355,9 @@ def publish_measurement_batch(
notes=notes,
)
populate_publish_measurement_batch_request_values(publish_request, values)
publish_request.timestamps.extend(
get_publish_measurement_batch_timestamps(publish_request, timestamps)
)
publish_response = self._get_data_store_client().publish_measurement_batch(publish_request)
return publish_response.measurement_ids

Expand Down
46 changes: 46 additions & 0 deletions src/ni/datastore/data/_grpc_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,49 @@ def get_publish_measurement_timestamp(
if no_client_timestamp_provided:
publish_time = waveform_t0
return publish_time


def get_publish_measurement_batch_timestamps(
publish_request: PublishMeasurementBatchRequest,
client_provided_timestamps: Iterable[ht.datetime],
) -> list[PrecisionTimestamp]:
"""Determine the correct timestamps to use for publishing a measurement batch."""
client_timestamps = [hightime_datetime_to_protobuf(ts) for ts in client_provided_timestamps]

waveform_t0s: list[PrecisionTimestamp] = []
value_case = publish_request.WhichOneof("values")
if value_case == "double_analog_waveform_values":
waveform_t0s = [w.t0 for w in publish_request.double_analog_waveform_values.waveforms]
elif value_case == "i16_analog_waveform_values":
waveform_t0s = [w.t0 for w in publish_request.i16_analog_waveform_values.waveforms]
elif value_case == "double_complex_waveform_values":
waveform_t0s = [w.t0 for w in publish_request.double_complex_waveform_values.waveforms]
elif value_case == "i16_complex_waveform_values":
waveform_t0s = [w.t0 for w in publish_request.i16_complex_waveform_values.waveforms]
elif value_case == "digital_waveform_values":
waveform_t0s = [w.t0 for w in publish_request.digital_waveform_values.waveforms]

# Determining count here accounts for the case where the user passes in less
# timestamps than the number of waveforms in the batch. In that case, we will
# backfill the "missing" timestamps with waveform t0 or "now".
# TODO: If the user passes in more timestamps than the number of waveforms,
# we'll end up passing that same larger number of timestamps into the publish
# request. Is this OK? Is the server going to get confused about this?
count = max(len(client_timestamps), len(waveform_t0s))
now = hightime_datetime_to_protobuf(ht.datetime.now(std_datetime.timezone.utc))
default_t0 = PrecisionTimestamp()

publish_times: list[PrecisionTimestamp] = []
for i in range(count):
if i < len(client_timestamps):
publish_times.append(client_timestamps[i])
else:
t0 = waveform_t0s[i] if i < len(waveform_t0s) else None
# If an initialized waveform t0 value is present and no client timestamp was
# provided, use the waveform t0 as the measurement start time.
if t0 is not None and t0 != default_t0:
publish_times.append(t0)
else:
publish_times.append(now)

return publish_times
97 changes: 97 additions & 0 deletions tests/unit/data/test_publish_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,33 @@ def test___publish_analog_waveform_data_without_timestamp_parameter___uses_wavef
assert request.timestamp == hightime_datetime_to_protobuf(timestamp)


def test___batch_publish_analog_waveform_data_without_timestamp_parameter___uses_waveform_t0s(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
) -> None:
timestamp = datetime.now(tz=std_datetime.timezone.utc)
waveform_values = [1.0, 2.0, 3.0]
analog_waveforms = [

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these tests use more than one waveform for completeness?

AnalogWaveform(
sample_count=len(waveform_values),
raw_data=np.array(waveform_values, dtype=np.float64),
timing=Timing.create_with_regular_interval(timedelta(seconds=1), timestamp),
),
]
expected_response = PublishMeasurementBatchResponse(measurement_ids=["response_id"])
mocked_data_store_service_client.publish_measurement_batch.return_value = expected_response

measurement_ids = data_store_client.publish_measurement_batch(
"name", analog_waveforms, "step_id"
)

args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args
request = cast(PublishMeasurementBatchRequest, args[0])
assert next(iter(measurement_ids)) == "response_id"
timestamp_proto = hightime_datetime_to_protobuf(timestamp)
assert request.timestamps == [timestamp_proto]


def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
Expand All @@ -225,6 +252,29 @@ def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter(
assert request.timestamp == hightime_datetime_to_protobuf(timestamp)


def test___batch_publish_analog_waveform_data_without_t0___uses_timestamps_parameter(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
) -> None:
timestamp = datetime.now(tz=std_datetime.timezone.utc)
analog_waveforms = [AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float)]
publish_measurement_batch_response = PublishMeasurementBatchResponse(
measurement_ids=["response_id"]
)
mocked_data_store_service_client.publish_measurement_batch.return_value = (
publish_measurement_batch_response
)

measurement_ids = data_store_client.publish_measurement_batch(
"name", analog_waveforms, "step_id", [timestamp]
)

args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args
request = cast(PublishMeasurementBatchRequest, args[0])
assert measurement_ids == ["response_id"]
assert request.timestamps == [hightime_datetime_to_protobuf(timestamp)]


def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___uses_provided_timestamp(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
Expand All @@ -251,6 +301,34 @@ def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___us
assert request.timestamp == hightime_datetime_to_protobuf(mismatched_timestamp)


def test___batch_publish_analog_waveform_data_with_mismatched_timestamp_parameter___uses_provided_timestamps(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
) -> None:
timestamp = datetime.now(tz=std_datetime.timezone.utc)
waveform_values = [1.0, 2.0, 3.0]
analog_waveforms = [
AnalogWaveform(
sample_count=len(waveform_values),
raw_data=np.array(waveform_values, dtype=np.float64),
timing=Timing.create_with_regular_interval(timedelta(seconds=1), timestamp),
)
]
mismatched_timestamp = timestamp + timedelta(seconds=1)
mocked_data_store_service_client.publish_measurement_batch.return_value = (
PublishMeasurementBatchResponse(measurement_ids=["response_id"])
)

measurement_ids = data_store_client.publish_measurement_batch(
"name", analog_waveforms, "step_id", [mismatched_timestamp]
)

args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args
request = cast(PublishMeasurementBatchRequest, args[0])
assert measurement_ids == ["response_id"]
assert request.timestamps == [hightime_datetime_to_protobuf(mismatched_timestamp)]


def test___publish_analog_waveform_data_without_t0_or_timestamp___uses_now(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
Expand All @@ -270,6 +348,25 @@ def test___publish_analog_waveform_data_without_t0_or_timestamp___uses_now(
assert request.timestamp == hightime_datetime_to_protobuf(now)


def test___batch_publish_analog_waveform_data_without_t0_or_timestamp___uses_now(
data_store_client: DataStoreClient,
mocked_data_store_service_client: NonCallableMock,
) -> None:
now = datetime.now(tz=std_datetime.timezone.utc)
analog_waveforms = [AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float)]
mocked_data_store_service_client.publish_measurement_batch.return_value = (
PublishMeasurementBatchResponse(measurement_ids=["response_id"])
)

with unittest.mock.patch("ni.datastore.data._grpc_conversion.ht.datetime") as mock_ht_datetime:
mock_ht_datetime.now.return_value = now
data_store_client.publish_measurement_batch("name", analog_waveforms, "step_id")

args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args
request = cast(PublishMeasurementBatchRequest, args[0])
assert request.timestamps == [hightime_datetime_to_protobuf(now)]


def test___none___publish_measurement___raises_type_error(
data_store_client: DataStoreClient,
) -> None:
Expand Down
Loading