feat: implement batch 3 temporal functions (make_date, make_timestamp, make_timestamp_ltz, last_day, next_day) #6672
Conversation
Greptile Summary
This PR adds five temporal functions (make_date, make_timestamp, make_timestamp_ltz, last_day, next_day).
Confidence Score: 4/5
Not safe to merge as-is — the fractional-seconds rounding bug silently produces nulls for valid inputs in both timestamp constructors. One P1 defect: rounding frac_micros to exactly 1_000_000 causes from_hms_micro_opt to return None, so the row becomes null rather than the expected timestamp. This is a present, reproducible data-correctness bug. All other findings are P2 (style, a rare edge-case panic, duplication).
Important Files Changed: src/daft-functions-temporal/src/date_construction.rs — both MakeTimestamp::call and MakeTimestampLtz::call need the frac_micros carry fix.
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    PY["Python API\ndaft.functions.datetime"] --> EXPR["Expression._call_builtin_scalar_fn"]
    SQL["SQL Layer\ndaft-sql/modules/temporal.rs"] --> EXPR
    EXPR --> UDF["Rust UDFs\ndaft-functions-temporal"]
    UDF --> MD["MakeDate\n→ DataType::Date"]
    UDF --> MT["MakeTimestamp\n→ Timestamp(μs, tz?)"]
    UDF --> MLZ["MakeTimestampLtz\n→ Timestamp(μs, UTC)"]
    UDF --> LD["LastDay\n→ DataType::Date"]
    UDF --> ND["NextDay\n→ DataType::Date"]
    MT -->|"timezone=None"| TsLocal["Naive timestamp"]
    MT -->|"timezone=Some(tz)"| TsMeta["Timestamp with tz metadata"]
    MLZ -->|"timezone=None"| UTC["Treat as UTC"]
    MLZ -->|"timezone=Some(tz)"| CONV["Convert from src tz → UTC\n(chrono-tz)"]
```
Reviews (1) — last reviewed commit: "feat: implement batch 3 temporal functio..."
```rust
let whole_secs = s as u32;
let frac_micros = ((s - whole_secs as f64) * 1_000_000.0).round() as u32;
let date = NaiveDate::from_ymd_opt(y, mo as u32, d as u32)?;
let time = chrono::NaiveTime::from_hms_micro_opt(
    h as u32,
    mi as u32,
    whole_secs,
    frac_micros,
)?;
```
Fractional seconds rounding can produce spurious null
When s is very close to a whole second (e.g. s = 0.9999999), frac_micros is computed as 1_000_000 after rounding. NaiveTime::from_hms_micro_opt requires micro < 1_000_000, so it returns None and the row becomes null even though the input is valid. The fix is to carry the overflow into whole_secs:
```diff
 let whole_secs = s as u32;
-let frac_micros = ((s - whole_secs as f64) * 1_000_000.0).round() as u32;
+let frac_micros_raw = ((s - whole_secs as f64) * 1_000_000.0).round() as u32;
+let (whole_secs, frac_micros) = if frac_micros_raw >= 1_000_000 {
+    (whole_secs + 1, 0)
+} else {
+    (whole_secs, frac_micros_raw)
+};
 let date = NaiveDate::from_ymd_opt(y, mo as u32, d as u32)?;
 let time = chrono::NaiveTime::from_hms_micro_opt(
     h as u32,
     mi as u32,
     whole_secs,
     frac_micros,
 )?;
```
The same issue exists in MakeTimestampLtz::call at the corresponding lines.
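The split-and-carry behavior is easy to check in isolation. A minimal Python sketch of the same arithmetic (the function name is illustrative, not part of the PR):

```python
def split_seconds(s: float) -> tuple[int, int]:
    """Split fractional seconds into (whole_secs, frac_micros), carrying a
    rounded-up 1_000_000 into the whole-second part instead of passing an
    out-of-range microsecond value downstream."""
    whole = int(s)
    frac = round((s - whole) * 1_000_000)
    if frac >= 1_000_000:  # e.g. s = 0.9999999 rounds up to 1_000_000
        whole += 1
        frac = 0
    return whole, frac

print(split_seconds(0.9999999))  # (1, 0) — without the carry this would be (0, 1000000)
print(split_seconds(1.5))        # (1, 500000)
```

Without the carry, `(0, 1_000_000)` is exactly the pair that makes `from_hms_micro_opt` return `None`.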
valid edge case, will fix
```rust
let last = if m == 12 {
    NaiveDate::from_ymd_opt(y + 1, 1, 1)
} else {
    NaiveDate::from_ymd_opt(y, m + 1, 1)
}
.unwrap()
    - chrono::Duration::days(1);
date_to_days(last)
```
Potential panic on extreme year values in last_day
When the input date has month 12 and the year is at or near the maximum representable year for NaiveDate (chrono caps around year 262142), NaiveDate::from_ymd_opt(y + 1, 1, 1) returns None and the subsequent .unwrap() panics rather than returning null. Replacing .unwrap() with ? inside the closure's Option-returning context would be safer:
```diff
-let last = if m == 12 {
+let first_of_next = if m == 12 {
     NaiveDate::from_ymd_opt(y + 1, 1, 1)
 } else {
     NaiveDate::from_ymd_opt(y, m + 1, 1)
-}
-.unwrap()
-    - chrono::Duration::days(1);
+}?;
+let last = first_of_next - chrono::Duration::days(1);
 date_to_days(last)
```
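The same first-of-next-month-minus-one-day idea can be sketched in Python, propagating a null instead of panicking (here `datetime`'s `ValueError` on an out-of-range year stands in for chrono's `None`):

```python
from datetime import date, timedelta

def last_day(d):
    """Last day of d's month: first day of the next month minus one day.
    Returns None (i.e. a null row) instead of raising when the year rolls
    past the representable range."""
    try:
        first_of_next = (date(d.year + 1, 1, 1) if d.month == 12
                         else date(d.year, d.month + 1, 1))
    except ValueError:  # year + 1 exceeds date.max
        return None
    return first_of_next - timedelta(days=1)

print(last_day(date(2024, 2, 10)))   # 2024-02-29 (leap year)
print(last_day(date(2023, 12, 5)))   # 2023-12-31
print(last_day(date(9999, 12, 1)))   # None — overflow becomes null, not a panic
```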
good point, will change
```python
"log",
"log1p",
"log2",
"make_date",
"make_timestamp",
"make_timestamp_ltz",
"log10",
```
__all__ entries out of alphabetical order
"make_date", "make_timestamp", and "make_timestamp_ltz" are inserted between "log2" and "log10", breaking the alphabetical ordering of the list. They should appear after "lstrip" and before "map_get" ("make_" sorts before "map_" since 'k' < 'p').
```diff
 "log",
 "log1p",
 "log2",
-"make_date",
-"make_timestamp",
-"make_timestamp_ltz",
 "log10",
 "lower",
 "lpad",
 "lstrip",
+"make_date",
+"make_timestamp",
+"make_timestamp_ltz",
 "map_get",
 "max",
```
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
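For what it's worth, plain string comparison settles the placement (the sample list below is illustrative, excerpted from the names discussed above):

```python
# 'k' < 'p', so the make_* names sort before "map_get" under Python's
# default string ordering.
names = ["map_get", "make_timestamp_ltz", "make_timestamp",
         "make_date", "max", "lstrip"]
print(sorted(names))
# ['lstrip', 'make_date', 'make_timestamp', 'make_timestamp_ltz', 'map_get', 'max']
```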
```rust
use std::sync::Arc;

use arrow_array::{Date32Array, TimestampMicrosecondArray};
use chrono::NaiveDate;
use daft_core::datatypes::TimeUnit;
use daft_dsl::functions::prelude::*;

const UNIX_EPOCH: NaiveDate = match NaiveDate::from_ymd_opt(1970, 1, 1) {
    Some(d) => d,
    None => unreachable!(),
};
```
Doesn't seem worth doing, honestly. Maybe if other function implementations used it, I'd do it.
```rust
// --- MakeTimestamp ---

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct MakeTimestamp;

#[derive(FunctionArgs)]
struct MakeTimestampArgs<T> {
    year: T,
    month: T,
    day: T,
    hour: T,
    minute: T,
    second: T,
    #[arg(optional)]
    timezone: Option<String>,
}

#[typetag::serde]
impl ScalarUDF for MakeTimestamp {
    fn name(&self) -> &'static str {
        "make_timestamp"
    }

    fn call(
        &self,
        inputs: FunctionArgs<Series>,
        _ctx: &daft_dsl::functions::scalar::EvalContext,
    ) -> DaftResult<Series> {
        let MakeTimestampArgs {
            year,
            month,
            day,
            hour,
            minute,
            second,
            timezone,
        } = inputs.try_into()?;

        let year_i32 = year.cast(&DataType::Int32)?;
        let month_i32 = month.cast(&DataType::Int32)?;
        let day_i32 = day.cast(&DataType::Int32)?;
        let hour_i32 = hour.cast(&DataType::Int32)?;
        let minute_i32 = minute.cast(&DataType::Int32)?;
        let second_f64 = second.cast(&DataType::Float64)?;

        let year_arr = year_i32.i32()?;
        let month_arr = month_i32.i32()?;
        let day_arr = day_i32.i32()?;
        let hour_arr = hour_i32.i32()?;
        let minute_arr = minute_i32.i32()?;
        let second_arr = second_f64.f64()?;

        let field_name = year_arr.name().to_string();

        let values: Vec<Option<i64>> = year_arr
            .into_iter()
            .zip(month_arr.into_iter())
            .zip(day_arr.into_iter())
            .zip(hour_arr.into_iter())
            .zip(minute_arr.into_iter())
            .zip(second_arr.into_iter())
            .map(|(((((y, mo), d), h), mi), s)| match (y, mo, d, h, mi, s) {
                (Some(y), Some(mo), Some(d), Some(h), Some(mi), Some(s)) => {
                    let whole_secs = s as u32;
                    let frac_micros = ((s - whole_secs as f64) * 1_000_000.0).round() as u32;
                    let date = NaiveDate::from_ymd_opt(y, mo as u32, d as u32)?;
                    let time = chrono::NaiveTime::from_hms_micro_opt(
                        h as u32,
                        mi as u32,
                        whole_secs,
                        frac_micros,
                    )?;
                    let dt = chrono::NaiveDateTime::new(date, time);
                    let epoch = chrono::NaiveDateTime::new(UNIX_EPOCH, chrono::NaiveTime::MIN);
                    dt.signed_duration_since(epoch).num_microseconds()
                }
                _ => None,
            })
            .collect();

        let arrow_arr: arrow_array::ArrayRef = Arc::new(TimestampMicrosecondArray::from(values));
        Series::from_arrow(
            Arc::new(Field::new(
                field_name,
                DataType::Timestamp(TimeUnit::Microseconds, timezone),
            )),
            arrow_arr,
        )
    }

    fn get_return_field(
        &self,
        inputs: FunctionArgs<ExprRef>,
        schema: &Schema,
    ) -> DaftResult<Field> {
        let MakeTimestampArgs {
            year,
            month,
            day,
            hour,
            minute,
            second,
            timezone,
        } = inputs.try_into()?;
        let year_field = year.to_field(schema)?;
        let month_field = month.to_field(schema)?;
        let day_field = day.to_field(schema)?;
        let hour_field = hour.to_field(schema)?;
        let minute_field = minute.to_field(schema)?;
        let second_field = second.to_field(schema)?;
        ensure!(
            year_field.dtype.is_integer(),
            TypeError: "Expected integer for year, got {}",
            year_field.dtype
        );
        ensure!(
            month_field.dtype.is_integer(),
            TypeError: "Expected integer for month, got {}",
            month_field.dtype
        );
        ensure!(
            day_field.dtype.is_integer(),
            TypeError: "Expected integer for day, got {}",
            day_field.dtype
        );
        ensure!(
            hour_field.dtype.is_integer(),
            TypeError: "Expected integer for hour, got {}",
            hour_field.dtype
        );
        ensure!(
            minute_field.dtype.is_integer(),
            TypeError: "Expected integer for minute, got {}",
            minute_field.dtype
        );
        ensure!(
            second_field.dtype.is_numeric(),
            TypeError: "Expected numeric for second, got {}",
            second_field.dtype
        );
        Ok(Field::new(
            year_field.name,
            DataType::Timestamp(TimeUnit::Microseconds, timezone),
        ))
    }
}

// --- MakeTimestampLtz ---
```
Heavy code duplication between MakeTimestamp and MakeTimestampLtz
The call implementations of MakeTimestamp and MakeTimestampLtz are nearly identical — the only differences are the timezone handling logic and the output dtype. Per the team's convention, prefer a single parametrized function with an ltz: bool or mode flag rather than two separate structs. This also halves the surface area for the rounding bug noted above.
Rule Used: Prefer single parametrized functions over multiple...
Learnt From: Eventual-Inc/Daft#5207
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
following pattern from other files
Force-pushed from 9199450 to f7d4230
```rust
)?;
let dt = chrono::NaiveDateTime::new(date, time);
let epoch = chrono::NaiveDateTime::new(UNIX_EPOCH, chrono::NaiveTime::MIN);
dt.signed_duration_since(epoch).num_microseconds()
```
I think there is a bug here — we need to use the timezone when returning this dt, right? Something like: tz.from_local_datetime(&naive_dt).single()?.timestamp_micros().
Can we add a test with a non-UTC timestamp?
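For reference, the semantics the comment is asking for can be sketched with stdlib zoneinfo: interpret the naive components in the source timezone, then convert to UTC (the values below match the US/Eastern case in the test plan):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Interpret the naive components in the given timezone, then convert to
# UTC — rather than treating the components as already being UTC.
naive = datetime(2021, 1, 1, 12, 0, 0)
eastern = naive.replace(tzinfo=ZoneInfo("US/Eastern"))
print(eastern.astimezone(timezone.utc))  # 2021-01-01 17:00:00+00:00 (EST is UTC-5)
```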
```rust
    common_error::DaftError::ValueError(
        "next_day requires a day_of_week argument".to_string(),
    )
})?;
```
You have this marked as optional above, right? Should make this consistent between both:

```rust
#[derive(FunctionArgs)]
struct NextDayArgs<T> {
    input: T,
    #[arg(optional)]
    day_of_week: Option<String>,
}
```

```rust
let year_i32 = year.cast(&DataType::Int32)?;
let month_i32 = month.cast(&DataType::Int32)?;
let day_i32 = day.cast(&DataType::Int32)?;
```
ooc is there a reason we cast to i32? why not just cast to u32 here?
```python
def test_make_timestamp_with_timezone() -> None:
    df = daft.from_pydict({"y": [2021], "m": [1], "d": [1], "h": [12], "mi": [0], "s": [0.0]})
    df = df.with_column(
        "ts", make_timestamp(col("y"), col("m"), col("d"), col("h"), col("mi"), col("s"), timezone="UTC")
    )
    result = df.to_pydict()
    assert result["ts"] == [datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc)]
```
I think adding a test with a different timezone might catch the bug I mentioned earlier
```python
for dow in ["Monday", "Friday", "Sunday"]:
    tmp = df.with_column("next_d", next_day(col("dt"), dow))
    results[dow] = tmp.to_pydict()["next_d"]
```
maybe use different short forms in this test since we support them
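As a reference for what those next_day tests assert, a chrono-free sketch of the semantics — the next occurrence strictly after the input date (helper and names are illustrative):

```python
from datetime import date, timedelta

DAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

def next_day(d, day_of_week):
    """Next occurrence of day_of_week strictly after d: asking for Friday
    on a Friday returns the following week's Friday."""
    target = DAYS.index(day_of_week)
    delta = (target - d.weekday() - 1) % 7 + 1  # always 1..7 days ahead
    return d + timedelta(days=delta)

# 2021-01-01 was a Friday:
print(next_day(date(2021, 1, 1), "Monday"))  # 2021-01-04
print(next_day(date(2021, 1, 1), "Friday"))  # 2021-01-08
print(next_day(date(2021, 1, 1), "Sunday"))  # 2021-01-03
```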
Summary
Implements 5 new temporal functions for PySpark parity (issue #3798):

- make_date(year, month, day): constructs a Date from integer components; invalid dates return null
- make_timestamp(year, month, day, hour, minute, second, [timezone]): constructs a Timestamp with optional timezone metadata; supports fractional seconds
- make_timestamp_ltz(year, month, day, hour, minute, second, [timezone]): constructs a UTC Timestamp, optionally interpreting components in a source timezone and converting to UTC
- last_day(date): returns the last day of the month for the given date
- next_day(date, day_of_week): returns the next occurrence of the specified weekday after the given date

All functions have Rust UDFs, SQL wrappers, Python wrappers, and tests.
Test plan
- test_make_date: valid dates (2021-01-15, 2020-02-29 leap year, 2000-12-31)
- test_make_date_invalid: invalid dates (Feb 30, month 13) return null
- test_make_timestamp: basic timestamp construction
- test_make_timestamp_with_timezone: verify timezone metadata on dtype
- test_make_timestamp_ltz: verify UTC dtype output
- test_make_timestamp_ltz_with_timezone: US/Eastern -> UTC conversion (12:00 EST = 17:00 UTC)
- test_last_day: Jan 31, Feb 28 (non-leap), Feb 29 (leap), Apr 30
- test_next_day: next Monday/Friday/Sunday from a Friday
- test_date_construction_sql: SQL integration (make_date, last_day, next_day)