Add PubSub and BigTable support for Central hosted network controllers #2526
Conversation
… ztcontroller feature flag
…stance for the whole library, add init/shutdown functions for it exposed to C
match other things already written
Tests currently need to be run with --test-threads=1; the instances of the pubsub emulator seem to stomp on each other without that.
…s in new CentralDB class. This allows us to interchangeably use different listeners (pgsql, redis, pubsub) depending on configuration values passed into the constructor.
PubSub allows us to do schema validation; however, it only allows one top-level message at a time. Move the other sub-message declarations under the main message declaration so that we can enable schema validation on the pubsub stream directly.
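A hypothetical sketch of the nesting described above (message and field names are illustrative, not taken from the PR). Pub/Sub schema validation binds a topic to a single root message type, so formerly top-level sub-messages become nested declarations under that root:

```proto
// Illustrative only: names do not come from the PR's actual .proto files.
syntax = "proto3";

message MemberStatus {
  // Formerly top-level declarations, moved inside the single root message
  // so the topic's schema can reference one type.
  message NodeInfo {
    string node_id = 1;
    string version = 2;
  }
  message CheckIn {
    int64 timestamp = 1;
  }

  NodeInfo node = 1;
  CheckIn last_check_in = 2;
}
```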
Muuuuch easier to use external dependencies now. Also tried out conan and vcpkg: ran into dependency issues when solving for packages to install with conan, and vcpkg is just obtuse as all hell to install and not easy to integrate.
* Postgres Direct
* Redis
* BigTable
OK, not sure why the GitHub Action is failing now, but I'll look at it tomorrow.
Required to get the list of configs for a particular org that the controller has. Named it `linked_id` rather than `org_id` since we don't know what it will be linked to in CV2
Publish CTL_NONCE_UPDATE to PubSub when nonces are created or reused in getSSOAuthInfo(), with the network's frontend as a message attribute so only the correct CV frontend receives it. Listen for ZT1_AUTH_UPDATE messages and update sso_expiry.authentication_expiry_time accordingly, with a network existence check before applying.

- Add sso_send_topic/sso_recv_topic to PubSubConfig
- Add PubSubWriter::publishSSONonceUpdate() with frontend param
- Add PubSubSSOListener class for inbound auth updates
- Rename CV1_AUTH_UPDATE to ZT1_AUTH_UPDATE in sso.proto
- Fix pre-existing connection pool leak in getSSOAuthInfo() catch block
Allow controllers to advertise which central version (cv1, cv2, or all) they are assigned to handle via a new configurable field. The value is persisted to the database on each heartbeat and validated at startup against the DB CHECK constraint.
Now handled in CV1 via pubsub integration when a new member comes through.
std::set<std::string> networksUpdated;
uint64_t updateCount = 0;
for (const auto& entry : _pending) {
`_pending` here should be `toWrite`.
RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::Redis> redis, std::string controller_id)
    : _redis(redis)
    , _mode(REDIS_MODE_STANDALONE)
{
}

RedisStatusWriter::RedisStatusWriter(std::shared_ptr<sw::redis::RedisCluster> cluster, std::string controller_id)
    : _cluster(cluster)
    , _mode(REDIS_MODE_CLUSTER)
{
}
These constructors don't set `_controller_id`.
    throw std::runtime_error("controller config required");
}

if (_path.length() > 9 && (_path.substr(0, 9) != "postgres:")) {
This check isn't quite strict enough: it lets shorter, non-postgres strings through. You can just flip the size check:
if (_path.length() < 10 || (_path.substr(0, 9) != "postgres:"))
}
case LISTENER_MODE_PUBSUB:

nlohmann::json oldMember;
nlohmann::json newMember = config;
if (! isNewMember) {
    oldMember = _getNetworkMember(w, networkId, memberId);
Above was a w.commit(), after which any queries on w should be raising exceptions, since the transaction is complete. I'm wondering if this is a code path that has never been hit in testing (i.e. pubsub, null/controller change source, and it's not a new member).
        frontend);
}

w.commit();
Similar to elsewhere, this commit will be "invalidating" the handle (since the transaction is complete). That would mean a call such as on line 541 shouldn't properly work. Is this a path that isn't occurring or is my understanding of libpqxx's transaction handling obsolete?
{
    fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
    _queueItem qitem;
    while (_commitQueue.get(qitem) & (_run == 1)) {
This should be `&&` instead of `&`.
// if (memberId == "a10dccea52" && networkId == "8056c2e21c24673d") {
//     fprintf(stderr, "invalid authinfo for grant's machine\n");
//     info.version = 1;
//     return info;
// }
// fprintf(stderr, "CentralDB::updateMemberOnLoad: %s-%s\n", networkId.c_str(), memberId.c_str());
// void create_bigtable_table(std::string project_id, std::string instance_id)
// {
//     auto bigtableAdminClient =
//         bigtable_admin::BigtableTableAdminClient(bigtable_admin::MakeBigtableTableAdminConnection());
//
//     std::string table_id = "member_status";
//     std::string table_name = "projects/" + project_id + "/instances/" + instance_id + "/tables/" + table_id;
//
//     // Check if the table exists
//     auto table = bigtableAdminClient.GetTable(table_name);
//     if (! table.ok()) {
//         if (table.status().code() == google::cloud::StatusCode::kNotFound) {
//             google::bigtable::admin::v2::Table table_config;
//             table_config.set_name(table_id);
//             auto families = table_config.mutable_column_families();
//             // Define column families
//             // Column family "node_info" with max 1 version
//             // google::bigtable::admin::v2::ColumnFamily* node_info = table_config.add_column_families();
//             // Column family "check_in" with max 1 version
//
//             auto create_result = bigtableAdminClient.CreateTable(
//                 "projects/" + project_id + "/instances/" + instance_id, table_id, table_config);
//
//             if (! create_result.ok()) {
//                 fprintf(
//                     stderr, "Failed to create Bigtable table member_status: %s\n",
//                     create_result.status().message().c_str());
//                 throw std::runtime_error("Failed to create Bigtable table");
//             }
//             fprintf(stderr, "Created Bigtable table: member_status\n");
//         }
//         else {
//             fprintf(stderr, "Failed to get Bigtable table member_status: %s\n", table.status().message().c_str());
//             throw std::runtime_error("Failed to get Bigtable table");
//         }
//     }
// }
Is this leftover code, or intended to stay as documentation?
The 10-second session.cancel() loop raced with in-flight acks: when cancel fired while the GCP client was processing messages, acks were lost before reaching the server. With message ordering enabled, an unacked message blocks all subsequent messages on that ordering key, causing silent stalls with no error output. Two fixes:

- Replace the cancel/reconnect timer with a blocking session.get(), storing the session future so the destructor can cancel on shutdown.
- Always ack messages even when onNotification fails; permanent errors (bad protobuf, missing fields) will never succeed on retry and would otherwise poison the ordering key indefinitely.
This is a big one that goes along with other internal upcoming changes.
NotificationListener interface, and adds a third method for message passing via GCP PubSub. Which one to use is configurable via local.conf controller settings. This is likely more useful for Central controller builds than end-user builds for the time being.