WindProbe
Sensors
The WindProbe measures:
- 3-axis wind velocity (X, Y, Z), in meters per second (m/s)
- Temperature, in degrees Celsius (°C)
- Relative static pressure, in pascals (Pa)
- Atmospheric pressure, in hectopascals (hPa)
Zeroing
The probe sensors may carry a small bias, which a zeroing procedure corrects. Place the probe in a known environment (e.g. no wind, no movement), then call the SDK's zero_windprobe() method, as in the example below.
import os
import threading

from dotenv import load_dotenv
from windsuite_sdk import WindsuiteSDK

# Read SERVER_IP_ADDRESS from a local .env file, if one exists.
load_dotenv()
SERVER_IP_ADDRESS = os.getenv("SERVER_IP_ADDRESS", default="localhost")
stop_event = threading.Event()


def main() -> None:
    base_url = f"http://{SERVER_IP_ADDRESS}"
    print(f"Connecting to WindSuite server at {base_url}")
    sdk = WindsuiteSDK(base_url=base_url)
    sdk.start_communication()
    main_loop_hz = 25
    try:
        # Zero the probe once, while it is at rest in still air.
        success = sdk.zero_windprobe()
        if success:
            print("Windprobe zeroed successfully.")
        while not stop_event.wait(timeout=(1.0 / main_loop_hz)):
            # ! DO WHATEVER
            pass
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_event.set()
    finally:
        # Always stop the fans and release the SDK on exit.
        sdk.fan_controller.set_intensity(0).apply()
        sdk.cleanup()
        print("SDK stopped")


if __name__ == "__main__":
    main()
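The source does not describe how the SDK computes the zero offset internally. As a generic illustration only, zeroing can be thought of as averaging a few readings taken at rest and subtracting that average from later readings. The function names `estimate_bias` and `apply_zero` and the sample values below are hypothetical and are not part of the WindSuite SDK.

```python
from statistics import fmean

Vec3 = tuple[float, float, float]


def estimate_bias(samples: list[Vec3]) -> Vec3:
    """Average per-axis readings taken while the probe is at rest in still air."""
    if not samples:
        raise ValueError("at least one sample is required")
    xs, ys, zs = zip(*samples)
    return (fmean(xs), fmean(ys), fmean(zs))


def apply_zero(reading: Vec3, bias: Vec3) -> Vec3:
    """Subtract the estimated bias from a later reading, axis by axis."""
    return tuple(r - b for r, b in zip(reading, bias))


# Hypothetical readings (m/s) recorded with no wind and no movement.
still_air = [(0.12, -0.05, 0.30), (0.10, -0.03, 0.28), (0.11, -0.04, 0.32)]
bias = estimate_bias(still_air)

# A later (hypothetical) reading taken in the wind field.
corrected = apply_zero((3.40, -0.58, 15.18), bias)
print(tuple(round(c, 2) for c in corrected))
```

Averaging several samples rather than using a single one keeps sensor noise from being baked into the offset.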
Correction
The probe can be tracked with motion capture systems (e.g. OptiTrack, Vicon) to produce spatial wind measurements.
WindShaper reference frame
The WindProbe and the WindShaper are both tracked.
Wind measurements can therefore be expressed in the WindShaper reference frame, meaning that the wind vectors are given relative to the WindShaper's position and orientation.
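As a rough sketch of what expressing a wind vector in the WindShaper referential (reference frame) involves, the example below rotates a vector measured in the probe frame into the WindShaper frame. It assumes the tracking system gives each body's orientation as a 3x3 rotation matrix that maps the body's local axes into a shared tracking frame. That convention, the helper names, and the poses are assumptions made for illustration, not the SDK's documented behavior. Because wind velocity is a free vector, only the orientations matter here, not the positions.

```python
import numpy as np


def rotation_about_z(yaw_rad: float) -> np.ndarray:
    """Rotation matrix for a yaw of yaw_rad radians about the Z axis."""
    c, s = np.cos(yaw_rad), np.sin(yaw_rad)
    return np.array([[c, -s, 0.0], [s, c, 0.0], [0.0, 0.0, 1.0]])


def probe_to_windshaper(
    wind_probe: np.ndarray,
    r_world_probe: np.ndarray,
    r_world_windshaper: np.ndarray,
) -> np.ndarray:
    """Re-express a wind vector from the probe frame in the WindShaper frame."""
    # Probe frame -> shared tracking frame.
    wind_world = r_world_probe @ wind_probe
    # Tracking frame -> WindShaper frame (the inverse of a rotation is its transpose).
    return r_world_windshaper.T @ wind_world


# Hypothetical poses: probe yawed by 90 degrees, WindShaper aligned with the tracking frame.
r_world_probe = rotation_about_z(np.pi / 2)
r_world_windshaper = np.eye(3)

wind_probe = np.array([1.0, 0.0, 0.0])  # 1 m/s along the probe's X axis
wind_ws = probe_to_windshaper(wind_probe, r_world_probe, r_world_windshaper)
print(np.round(wind_ws, 3))
```

With these poses, wind along the probe's X axis ends up along the WindShaper's Y axis.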
Movement Induced Airflow Correction
When the WindProbe is moved through the wind field, its own motion creates additional airflow around the probe, which introduces measurement errors.
These errors are corrected using the tracking system, by subtracting the probe's movement-induced airflow from the measured wind.
Users still have access to:
- The raw WindProbe data
- The data in the WindShaper reference frame, without movement correction
- The movement-corrected data in the WindShaper reference frame
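The movement correction described above can be sketched as follows, under simple assumptions: the probe's velocity is estimated from two successive tracked positions, the airflow induced by that motion is taken to be the opposite of the probe's velocity, and both vectors are expressed in the same frame. The function names and numbers are hypothetical; the SDK's actual filtering and timing are not documented in this section.

```python
import numpy as np


def velocity_from_positions(p_prev_mm, p_curr_mm, dt_s: float) -> np.ndarray:
    """Finite-difference probe velocity in m/s from two tracked positions in mm."""
    delta_m = (np.asarray(p_curr_mm, dtype=float) - np.asarray(p_prev_mm, dtype=float)) / 1000.0
    return delta_m / dt_s


def correct_for_probe_motion(measured_wind, probe_velocity) -> np.ndarray:
    """Subtract the movement-induced airflow from the measured wind (same frame, m/s)."""
    # A probe moving at velocity v sees an extra apparent airflow of -v.
    induced_airflow = -np.asarray(probe_velocity, dtype=float)
    return np.asarray(measured_wind, dtype=float) - induced_airflow


# Hypothetical case: the probe moves 20 mm along +Z in 40 ms, i.e. 0.5 m/s along +Z.
probe_velocity = velocity_from_positions((70.0, 441.0, 300.0), (70.0, 441.0, 320.0), 0.04)

measured = np.array([0.0, 0.0, 14.5])  # what the moving probe reads
corrected = correct_for_probe_motion(measured, probe_velocity)
print(probe_velocity, corrected)
```

In this case the probe travels with the flow, so it under-reads the wind, and the correction adds the probe's own speed back.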
Callbacks
To access the WindProbe data, register a callback function.
The SDK calls this function each time a new WindProbe measurement is available and passes the new data as its argument.
WindProbe Data callback
=== Wind Probe Data ===
Timestamp: 1770394125.7061703s
Wind Velocity (probe ref): (3.29, -0.54, 14.88) m/s
Wind Velocity (windshaper ref): (3.29, -0.54, 14.88) m/s
Wind Velocity (windshaper ref - Corrected): (3.29, -0.54, 14.88) m/s
Temperature: 20.38°C
Atmospheric Pressure: 945.13 hPa
Static Pressure: 0.41 Pa
import os
import threading

from dotenv import load_dotenv
from windsuite_sdk import WindProbeData, WindsuiteSDK

load_dotenv()
SERVER_IP_ADDRESS = os.getenv("SERVER_IP_ADDRESS", default="localhost")


# THIS IS THE CALLBACK DEFINITION
def on_windprobe_data(data: WindProbeData) -> None:
    print(f"Timestamp: {data.timestamp_s}s")
    print(
        f"Wind Velocity (probe ref): ({data.wind_velocity_mps_probe_ref.x:.2f}, "
        f"{data.wind_velocity_mps_probe_ref.y:5.2f}, "
        f"{data.wind_velocity_mps_probe_ref.z:5.2f}) m/s"
    )
    print(
        f"Wind Velocity (windshaper ref): ({data.wind_velocity_mps_windshaper_ref.x:.2f}, "
        f"{data.wind_velocity_mps_windshaper_ref.y:6.2f}, "
        f"{data.wind_velocity_mps_windshaper_ref.z:6.2f}) m/s"
    )
    print(
        f"Wind Velocity (windshaper ref - Corrected): ({data.wind_velocity_mps_windshaper_ref_corrected.x:.2f}, "
        f"{data.wind_velocity_mps_windshaper_ref_corrected.y:6.2f}, "
        f"{data.wind_velocity_mps_windshaper_ref_corrected.z:6.2f}) m/s"
    )
    print(f"Temperature: {data.temperature_celcius:.2f}°C")
    print(f"Atmospheric Pressure: {data.atmospheric_pressure_hpascal:.2f} hPa")
    print(f"Static Pressure: {data.static_pressure_pascal:.2f} Pa")
    print()


stop_event = threading.Event()


def main() -> None:
    """Main function to run the WindSuite SDK example."""
    base_url = f"http://{SERVER_IP_ADDRESS}"
    print(f"Connecting to WindSuite server at {base_url}")
    sdk = WindsuiteSDK(base_url=base_url)
    # Register the callback before starting communication.
    sdk.register_windprobe_callback(callback=on_windprobe_data)
    sdk.start_communication()
    try:
        freq_hz = 25
        while not stop_event.wait(timeout=(1.0 / freq_hz)):
            # ! DO WHATEVER
            pass
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_event.set()
    finally:
        sdk.fan_controller.set_intensity(0).apply()
        sdk.cleanup()
        print("SDK stopped")


if __name__ == "__main__":
    main()
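When the main loop needs the measurements, rather than just printing them, one common pattern is to keep the callback short and hand the data over through a thread-safe queue. The sketch below assumes the SDK may invoke the callback from a background thread, which this section does not state. `FakeSample` is a hypothetical stand-in for `WindProbeData` so that the example runs without the SDK or a server.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class FakeSample:
    """Hypothetical stand-in for WindProbeData (only two fields are modeled)."""

    timestamp_s: float
    temperature_celcius: float


# Bounded queue shared between the callback and the main loop.
samples: "queue.Queue[FakeSample]" = queue.Queue(maxsize=1000)


def on_windprobe_data(data: FakeSample) -> None:
    """Callback: enqueue the sample and return immediately."""
    try:
        samples.put_nowait(data)
    except queue.Full:
        pass  # queue is full: drop the incoming sample instead of blocking the caller


def drain() -> list[FakeSample]:
    """Return every sample received since the last call (meant for the main loop)."""
    drained = []
    while True:
        try:
            drained.append(samples.get_nowait())
        except queue.Empty:
            return drained


def fake_sdk_thread() -> None:
    """Simulate the SDK delivering three samples from another thread."""
    for i in range(3):
        on_windprobe_data(FakeSample(timestamp_s=float(i), temperature_celcius=20.0 + i))


producer = threading.Thread(target=fake_sdk_thread)
producer.start()
producer.join()

batch = drain()
print(len(batch), batch[-1].temperature_celcius)
```

With the real SDK, the same `on_windprobe_data` body would be passed to `register_windprobe_callback`, and `drain()` would replace the `# ! DO WHATEVER` placeholder in the main loop.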
Tracked Probe Data callback
For convenience, the SDK also provides a callback that always delivers the best WindProbe data available, together with its associated position and orientation.
Selection Priority:
- Movement & Orientation Corrected - Most accurate (when stable tracking is available)
- Orientation Corrected Only - Good accuracy (when tracking is partial/unstable)
- Raw Data - Basic fallback (when no tracking is available)
This ensures you always get the best possible wind measurements while gracefully handling tracking system limitations.
Velocity (m/s): (-0.00, 3.57, 16.69) mag: 17.06
Position (mm): (70.2, 441.0, 300.0)
import os
import threading

import numpy as np
from dotenv import load_dotenv
from windsuite_sdk import TrackedProbeData, WindsuiteSDK

load_dotenv()
SERVER_IP_ADDRESS = os.getenv("SERVER_IP_ADDRESS", default="172.16.80.203")


def on_tracked_probe_data(data: TrackedProbeData) -> None:
    """
    Callback for tracked wind probe data
    """
    # Best available wind velocity, as selected by the SDK.
    x_probe = data.wind_velocity_mps.x
    y_probe = data.wind_velocity_mps.y
    z_probe = data.wind_velocity_mps.z
    mag_probe = np.sqrt(x_probe**2 + y_probe**2 + z_probe**2)
    # Probe position reported by the tracking system.
    x_position = data.position_data_mm.x
    y_position = data.position_data_mm.y
    z_position = data.position_data_mm.z
    print(
        f"Received data - Velocity (m/s): ({x_probe:.2f}, {y_probe:.2f}, {z_probe:.2f}), Magnitude: {mag_probe:.2f}"
    )
    print(f"Position (mm): ({x_position:.1f}, {y_position:.1f}, {z_position:.1f})")


stop_event = threading.Event()


def main() -> None:
    """Main function to run the WindSuite SDK tracked probe example."""
    base_url = f"http://{SERVER_IP_ADDRESS}"
    print(f"Connecting to WindSuite server at {base_url}")
    sdk = WindsuiteSDK(base_url=base_url)
    sdk.register_tracked_probe_callback(callback=on_tracked_probe_data)
    sdk.start_communication()
    print("Listening for wind probe data. Press Ctrl+C to stop...")
    try:
        while not stop_event.wait(timeout=0.1):
            pass
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_event.set()
    finally:
        sdk.cleanup()
        print("SDK stopped")


if __name__ == "__main__":
    main()
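The selection priority listed for the tracked probe callback amounts to a simple fallback chain. The sketch below only illustrates that ordering. The `Candidates` container, the `Source` enum, and the use of `None` to mean "not available" are hypothetical. How the SDK decides that tracking is stable, partial, or missing is not described in this section.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

Vec3 = tuple[float, float, float]


class Source(Enum):
    MOVEMENT_AND_ORIENTATION_CORRECTED = 1
    ORIENTATION_CORRECTED_ONLY = 2
    RAW = 3


@dataclass
class Candidates:
    """Wind vectors that may be available for one measurement (None = unavailable)."""

    raw: Vec3
    orientation_corrected: Optional[Vec3] = None
    fully_corrected: Optional[Vec3] = None


def select_best(c: Candidates) -> tuple[Source, Vec3]:
    """Pick the most accurate available vector, falling back step by step."""
    if c.fully_corrected is not None:
        return Source.MOVEMENT_AND_ORIENTATION_CORRECTED, c.fully_corrected
    if c.orientation_corrected is not None:
        return Source.ORIENTATION_CORRECTED_ONLY, c.orientation_corrected
    return Source.RAW, c.raw


# Hypothetical measurement with partial tracking: only orientation correction is available.
partial = Candidates(raw=(3.29, -0.54, 14.88), orientation_corrected=(0.0, 3.57, 16.69))
source, wind = select_best(partial)
print(source.name, wind)
```

Returning the source alongside the vector lets downstream code record which level of correction each sample received.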
