@@ -3,6 +3,7 @@ import { z } from "zod";
33import { ClickhouseClient } from "./client/client.js" ;
44import {
55 TASK_RUN_INDEX ,
6+ composeTaskRunVersion ,
67 getChildRunStatusCounts ,
78 getTaskRunsQueryBuilder ,
89 insertRawTaskRunPayloadsCompactArrays ,
@@ -887,4 +888,313 @@ describe("Task Runs V2", () => {
887888 ) ;
888889 }
889890 ) ;
891+
892+ clickhouseTest (
893+ "should collapse the same run from two producers to one latest-snapshot row" ,
894+ async ( { clickhouseContainer } ) => {
895+ const client = new ClickhouseClient ( {
896+ name : "test" ,
897+ url : clickhouseContainer . getConnectionUrl ( ) ,
898+ } ) ;
899+ const insert = insertTaskRunsCompactArrays ( client , { async_insert : 0 } ) ;
900+
901+ const createdAt = new Date ( "2025-04-30 16:34:04.312" ) . getTime ( ) ;
902+
903+ const base : TaskRunInsertArray = [
904+ "cm9kddfcs01zqdy88ld9mmrli" ,
905+ "cm8zs78wb0002dy616dg75tv3" ,
906+ "cm9kddfbz01zpdy88t9dstecu" ,
907+ "cma45oli70002qrdy47w0j4n7" ,
908+ createdAt ,
909+ createdAt ,
910+ "PENDING" ,
911+ "PRODUCTION" ,
912+ "run_cma45oli70002qrdy47w0j4n7" ,
913+ 1 ,
914+ "V2" ,
915+ "retry-task" ,
916+ "task/retry-task" ,
917+ "" ,
918+ "" ,
919+ null ,
920+ null ,
921+ null ,
922+ null ,
923+ createdAt ,
924+ null ,
925+ 0 ,
926+ 0 ,
927+ 0 ,
928+ { data : null } ,
929+ { data : null } ,
930+ "" ,
931+ [ ] ,
932+ "" ,
933+ "" ,
934+ "" ,
935+ "" ,
936+ "" ,
937+ "" ,
938+ 0 ,
939+ "span" ,
940+ "trace" ,
941+ "" ,
942+ "" ,
943+ "" ,
944+ "" ,
945+ true ,
946+ "1" ,
947+ 0 ,
948+ "" ,
949+ [ ] ,
950+ "" ,
951+ "" ,
952+ "" ,
953+ null ,
954+ "" ,
955+ "" ,
956+ "" ,
957+ null ,
958+ ] ;
959+
960+ const rdsSnapshot : TaskRunInsertArray = [ ...base ] ;
961+ rdsSnapshot [ TASK_RUN_INDEX . status ] = "PENDING" ;
962+ rdsSnapshot [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
963+ originGeneration : 0 ,
964+ lsnVersion : 9_000_000_000n ,
965+ } ) . toString ( ) ;
966+
967+ const psSnapshot : TaskRunInsertArray = [ ...base ] ;
968+ psSnapshot [ TASK_RUN_INDEX . status ] = "COMPLETED_SUCCESSFULLY" ;
969+ psSnapshot [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
970+ originGeneration : 1 ,
971+ lsnVersion : 10n ,
972+ } ) . toString ( ) ;
973+
974+ const [ insertError ] = await insert ( [ rdsSnapshot , psSnapshot ] ) ;
975+ expect ( insertError ) . toBeNull ( ) ;
976+
977+ const query = client . query ( {
978+ name : "q" ,
979+ query :
980+ "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL" ,
981+ schema : z . object ( { run_id : z . string ( ) , status : z . string ( ) , total : z . number ( ) . int ( ) } ) ,
982+ } ) ;
983+ const [ queryError , result ] = await query ( { } ) ;
984+ expect ( queryError ) . toBeNull ( ) ;
985+ expect ( result ) . toHaveLength ( 1 ) ;
986+ expect ( result ?. [ 0 ] ) . toEqual (
987+ expect . objectContaining ( {
988+ run_id : "cma45oli70002qrdy47w0j4n7" ,
989+ status : "COMPLETED_SUCCESSFULLY" ,
990+ } )
991+ ) ;
992+ }
993+ ) ;
994+
995+ clickhouseTest (
996+ "should keep the latest intra-producer snapshot (same generation, ascending LSN)" ,
997+ async ( { clickhouseContainer } ) => {
998+ const client = new ClickhouseClient ( {
999+ name : "test" ,
1000+ url : clickhouseContainer . getConnectionUrl ( ) ,
1001+ } ) ;
1002+ const insert = insertTaskRunsCompactArrays ( client , { async_insert : 0 } ) ;
1003+
1004+ const createdAt = new Date ( "2025-04-30 16:34:04.312" ) . getTime ( ) ;
1005+
1006+ const base : TaskRunInsertArray = [
1007+ "cm9kddfcs01zqdy88ld9mmrli" ,
1008+ "cm8zs78wb0002dy616dg75tv3" ,
1009+ "cm9kddfbz01zpdy88t9dstecu" ,
1010+ "cma45oli70002qrdy47w0j4n7" ,
1011+ createdAt ,
1012+ createdAt ,
1013+ "PENDING" ,
1014+ "PRODUCTION" ,
1015+ "run_cma45oli70002qrdy47w0j4n7" ,
1016+ 1 ,
1017+ "V2" ,
1018+ "retry-task" ,
1019+ "task/retry-task" ,
1020+ "" ,
1021+ "" ,
1022+ null ,
1023+ null ,
1024+ null ,
1025+ null ,
1026+ createdAt ,
1027+ null ,
1028+ 0 ,
1029+ 0 ,
1030+ 0 ,
1031+ { data : null } ,
1032+ { data : null } ,
1033+ "" ,
1034+ [ ] ,
1035+ "" ,
1036+ "" ,
1037+ "" ,
1038+ "" ,
1039+ "" ,
1040+ "" ,
1041+ 0 ,
1042+ "span" ,
1043+ "trace" ,
1044+ "" ,
1045+ "" ,
1046+ "" ,
1047+ "" ,
1048+ true ,
1049+ "1" ,
1050+ 0 ,
1051+ "" ,
1052+ [ ] ,
1053+ "" ,
1054+ "" ,
1055+ "" ,
1056+ null ,
1057+ "" ,
1058+ "" ,
1059+ "" ,
1060+ null ,
1061+ ] ;
1062+
1063+ const earlier : TaskRunInsertArray = [ ...base ] ;
1064+ earlier [ TASK_RUN_INDEX . status ] = "EXECUTING" ;
1065+ earlier [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
1066+ originGeneration : 1 ,
1067+ lsnVersion : 10n ,
1068+ } ) . toString ( ) ;
1069+
1070+ const later : TaskRunInsertArray = [ ...base ] ;
1071+ later [ TASK_RUN_INDEX . status ] = "COMPLETED_SUCCESSFULLY" ;
1072+ later [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
1073+ originGeneration : 1 ,
1074+ lsnVersion : 20n ,
1075+ } ) . toString ( ) ;
1076+
1077+ const [ insertError ] = await insert ( [ earlier , later ] ) ;
1078+ expect ( insertError ) . toBeNull ( ) ;
1079+
1080+ const query = client . query ( {
1081+ name : "q" ,
1082+ query :
1083+ "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL" ,
1084+ schema : z . object ( { run_id : z . string ( ) , status : z . string ( ) , total : z . number ( ) . int ( ) } ) ,
1085+ } ) ;
1086+ const [ queryError , result ] = await query ( { } ) ;
1087+ expect ( queryError ) . toBeNull ( ) ;
1088+ expect ( result ) . toHaveLength ( 1 ) ;
1089+ expect ( result ?. [ 0 ] ) . toEqual (
1090+ expect . objectContaining ( {
1091+ run_id : "cma45oli70002qrdy47w0j4n7" ,
1092+ status : "COMPLETED_SUCCESSFULLY" ,
1093+ } )
1094+ ) ;
1095+ }
1096+ ) ;
1097+
1098+ clickhouseTest (
1099+ "should collapse to the same winner regardless of insert order" ,
1100+ async ( { clickhouseContainer } ) => {
1101+ const client = new ClickhouseClient ( {
1102+ name : "test" ,
1103+ url : clickhouseContainer . getConnectionUrl ( ) ,
1104+ } ) ;
1105+ const insert = insertTaskRunsCompactArrays ( client , { async_insert : 0 } ) ;
1106+
1107+ const createdAt = new Date ( "2025-04-30 16:34:04.312" ) . getTime ( ) ;
1108+
1109+ const base : TaskRunInsertArray = [
1110+ "cm9kddfcs01zqdy88ld9mmrli" ,
1111+ "cm8zs78wb0002dy616dg75tv3" ,
1112+ "cm9kddfbz01zpdy88t9dstecu" ,
1113+ "cma45oli70002qrdy47w0j4n7" ,
1114+ createdAt ,
1115+ createdAt ,
1116+ "PENDING" ,
1117+ "PRODUCTION" ,
1118+ "run_cma45oli70002qrdy47w0j4n7" ,
1119+ 1 ,
1120+ "V2" ,
1121+ "retry-task" ,
1122+ "task/retry-task" ,
1123+ "" ,
1124+ "" ,
1125+ null ,
1126+ null ,
1127+ null ,
1128+ null ,
1129+ createdAt ,
1130+ null ,
1131+ 0 ,
1132+ 0 ,
1133+ 0 ,
1134+ { data : null } ,
1135+ { data : null } ,
1136+ "" ,
1137+ [ ] ,
1138+ "" ,
1139+ "" ,
1140+ "" ,
1141+ "" ,
1142+ "" ,
1143+ "" ,
1144+ 0 ,
1145+ "span" ,
1146+ "trace" ,
1147+ "" ,
1148+ "" ,
1149+ "" ,
1150+ "" ,
1151+ true ,
1152+ "1" ,
1153+ 0 ,
1154+ "" ,
1155+ [ ] ,
1156+ "" ,
1157+ "" ,
1158+ "" ,
1159+ null ,
1160+ "" ,
1161+ "" ,
1162+ "" ,
1163+ null ,
1164+ ] ;
1165+
1166+ const rdsSnapshot : TaskRunInsertArray = [ ...base ] ;
1167+ rdsSnapshot [ TASK_RUN_INDEX . status ] = "PENDING" ;
1168+ rdsSnapshot [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
1169+ originGeneration : 0 ,
1170+ lsnVersion : 9_000_000_000n ,
1171+ } ) . toString ( ) ;
1172+
1173+ const psSnapshot : TaskRunInsertArray = [ ...base ] ;
1174+ psSnapshot [ TASK_RUN_INDEX . status ] = "COMPLETED_SUCCESSFULLY" ;
1175+ psSnapshot [ TASK_RUN_INDEX . _version ] = composeTaskRunVersion ( {
1176+ originGeneration : 1 ,
1177+ lsnVersion : 10n ,
1178+ } ) . toString ( ) ;
1179+
1180+ const [ insertError ] = await insert ( [ psSnapshot , rdsSnapshot ] ) ;
1181+ expect ( insertError ) . toBeNull ( ) ;
1182+
1183+ const query = client . query ( {
1184+ name : "q" ,
1185+ query :
1186+ "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL" ,
1187+ schema : z . object ( { run_id : z . string ( ) , status : z . string ( ) , total : z . number ( ) . int ( ) } ) ,
1188+ } ) ;
1189+ const [ queryError , result ] = await query ( { } ) ;
1190+ expect ( queryError ) . toBeNull ( ) ;
1191+ expect ( result ) . toHaveLength ( 1 ) ;
1192+ expect ( result ?. [ 0 ] ) . toEqual (
1193+ expect . objectContaining ( {
1194+ run_id : "cma45oli70002qrdy47w0j4n7" ,
1195+ status : "COMPLETED_SUCCESSFULLY" ,
1196+ } )
1197+ ) ;
1198+ }
1199+ ) ;
8901200} ) ;
0 commit comments