-
Notifications
You must be signed in to change notification settings - Fork 486
Expand file tree
/
Copy pathbottomless_migrate.rs
More file actions
194 lines (169 loc) · 6.95 KB
/
bottomless_migrate.rs
File metadata and controls
194 lines (169 loc) · 6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use std::path::{Path, PathBuf};
use std::sync::Arc;
use libsql_sys::ffi::Sqlite3DbHeader;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_wal::io::StdIO;
use libsql_wal::registry::WalRegistry;
use libsql_wal::replication::injector::Injector;
use libsql_wal::segment::{Frame, FrameHeader};
use libsql_wal::storage::NoStorage;
use tempfile::TempDir;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use zerocopy::{FromBytes, FromZeroes};
use libsql_sys::wal::either::Either as EitherWAL;
use crate::namespace::broadcasters::BroadcasterRegistry;
use crate::namespace::configurator::{
BaseNamespaceConfig, NamespaceConfigurators, PrimaryConfig, PrimaryConfigurator,
};
use crate::namespace::meta_store::{MetaStore, MetaStoreHandle};
use crate::namespace::NamespaceStore;
/// Migrates every namespace known to `meta_store` from bottomless to libsql-wal.
///
/// The process for migrating from bottomless to libsql wal is simple:
/// 1) iterate over all namespaces, and make sure that they are up to date with bottomless by
///    loading them
/// 2) with a dummy registry, in a temp directory, with no storage, and no checkpointer, inject
///    all the pages from the original db into a new temp db
/// 3) when all namespaces have been successfully migrated, swap the freshly built `dbs`
///    directory in place of the original one
///
/// If a `_dbs` directory is found under the base path when this function starts, a previous
/// attempt is considered to have failed: `_dbs` is restored as `dbs` and the migration is
/// attempted again from scratch.
///
/// # Errors
///
/// Returns an error if any filesystem operation fails, if the temporary registry or namespace
/// store cannot be created, or if the migration of any single namespace fails. The first
/// failure aborts the whole migration.
pub async fn bottomless_migrate(
    meta_store: MetaStore,
    base_config: BaseNamespaceConfig,
    primary_config: PrimaryConfig,
) -> anyhow::Result<()> {
    let base_dbs_dir = base_config.base_path.join("dbs");
    let base_dbs_dir_tmp = base_config.base_path.join("_dbs");

    // the previous migration failed. The _dbs is still present, but the wals is not. In this case
    // we delete the current dbs if it exists and replace it with _dbs, and attempt migration again
    if base_dbs_dir_tmp.try_exists()? {
        // `dbs` is legitimately missing when the crash happened right after it was renamed to
        // `_dbs`; a missing directory must not prevent the recovery.
        match tokio::fs::remove_dir_all(&base_dbs_dir).await {
            Ok(()) => (),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
            Err(e) => return Err(e.into()),
        }
        tokio::fs::rename(&base_dbs_dir_tmp, &base_dbs_dir).await?;
    }

    tracing::info!("attempting bottomless migration to libsql-wal");

    // The staging directory lives inside the base path so that it shares a filesystem with the
    // final `dbs` location: the swap below relies on `rename`, which does not work across
    // mount points.
    let tmp = TempDir::new_in(&base_config.base_path)?;
    tokio::fs::create_dir_all(tmp.path().join("dbs")).await?;

    let configs_stream = meta_store.namespaces();
    tokio::pin!(configs_stream);

    let (sender, mut rcv) = tokio::sync::mpsc::channel(1);
    // we are not checkpointing anything, but we want to drain the receiver
    tokio::spawn(async move {
        loop {
            match rcv.recv().await {
                Some(libsql_wal::checkpointer::CheckpointMessage::Shutdown) | None => break,
                Some(_) => (),
            }
        }
    });

    let tmp_registry = Arc::new(WalRegistry::new(NoStorage.into(), sender)?);

    // The namespaces are loaded with the regular sqlite3 wal manager.
    let mut configurators = NamespaceConfigurators::default();
    let make_wal_manager = Arc::new(|| EitherWAL::A(Sqlite3WalManager::default()));
    let primary_configurator =
        PrimaryConfigurator::new(base_config.clone(), primary_config, make_wal_manager);
    configurators.with_primary(primary_configurator);

    // Store handed to the namespace setup; it is built with default (empty) configurators.
    let dummy_store = NamespaceStore::new(
        false,
        false,
        1000,
        meta_store.clone(),
        NamespaceConfigurators::default(),
        crate::database::DatabaseKind::Primary,
    )
    .await?;

    // Namespaces are migrated one at a time; the first failure aborts the migration.
    while let Some(config) = configs_stream.next().await {
        migrate_one(
            &configurators,
            config,
            dummy_store.clone(),
            tmp.path(),
            tmp_registry.clone(),
            &base_config.base_path,
        )
        .await?;
    }

    tmp_registry.shutdown().await?;

    // unix prevents atomically replacing a non-empty directory with mv, so we first rename dbs
    // to _dbs, then move the new dbs in place, then remove the old dbs (_dbs).
    // when we check for a pending migration, we verify if _dbs exists. If it exists, and wals
    // doesn't exist, then we restore it, otherwise, we delete it.
    tokio::fs::rename(&base_dbs_dir, &base_dbs_dir_tmp).await?;
    tokio::fs::rename(tmp.path().join("dbs"), &base_dbs_dir).await?;
    tokio::fs::remove_dir_all(&base_dbs_dir_tmp).await?;

    Ok(())
}
/// Migrates a single namespace: loads it so that it is restored to its latest version, then
/// replays every page of its database file as a frame into a fresh libsql-wal database located
/// under `tmp/dbs/<namespace>/data`.
///
/// this may not be the most efficient method to perform a migration, but it has the advantage of
/// being atomic. when all namespaces are migrated, the caller renames the folders from the tmp
/// directory. If we don't find a wals folder in the db directory, we'll just
/// attempt migrating again, because:
/// - either the migration didn't happen
/// - a crash happened before we could swap the directories
///
/// # Errors
///
/// Returns an error if the namespace cannot be set up, if the original database file cannot be
/// opened or read in full, if the first page does not contain a database header, or if opening,
/// injecting into, or sealing the new database fails.
#[tracing::instrument(skip_all, fields(namespace = config.namespace().as_str()))]
async fn migrate_one(
    configurators: &NamespaceConfigurators,
    config: MetaStoreHandle,
    dummy_store: NamespaceStore,
    tmp: &Path,
    tmp_registry: Arc<WalRegistry<StdIO, NoStorage>>,
    base_path: &Path,
) -> anyhow::Result<()> {
    let broadcasters = BroadcasterRegistry::default();
    // TODO: check if we already have a backup for this db from storage
    tracing::info!("started namespace migration");
    // we load the namespace ensuring it's restored to the latest version
    configurators
        .configure_primary()?
        .setup(
            config.clone(),
            crate::namespace::RestoreOption::Latest,
            config.namespace(),
            // don't care about reset
            Box::new(|_| ()),
            // don't care about attach
            Arc::new(|_| Ok(PathBuf::new().into())),
            dummy_store.clone(),
            broadcasters.handle(config.namespace().clone()),
        )
        .await?;

    // Destination of the migrated database, inside the staging directory.
    let db_dir = tmp.join("dbs").join(config.namespace().as_str());
    tokio::fs::create_dir_all(&db_dir).await?;
    let db_path = db_dir.join("data");

    let namespace = config.namespace().clone();
    // Opening the database is a blocking call; a failure of the blocking task itself is reported
    // as an error instead of panicking.
    let shared = tokio::task::spawn_blocking({
        let registry = tmp_registry.clone();
        move || registry.open(&db_path, &namespace.into())
    })
    .await??;

    // NOTE(review): the second argument is presumably the injector's buffer capacity — confirm.
    let mut injector = Injector::new(shared.clone(), 10)?;

    let orig_db_path = base_path
        .join("dbs")
        .join(config.namespace().as_str())
        .join("data");
    let mut orig_db_file = tokio::fs::File::open(orig_db_path).await?;

    // The page count is only known once the first page (which holds the header) has been read.
    let mut db_size = usize::MAX;
    let mut current = 0;
    while current < db_size {
        let mut frame: Box<Frame> = Frame::new_box_zeroed();
        orig_db_file.read_exact(frame.data_mut()).await?;

        if current == 0 {
            let header = Sqlite3DbHeader::read_from_prefix(frame.data()).ok_or_else(|| {
                anyhow::anyhow!("first page is too small to contain a database header")
            })?;
            db_size = header.db_size.get() as usize;
            if db_size == 0 {
                // A zero in-header size is not valid: per the SQLite file format, the size is
                // then derived from the length of the file. A full page was just read, so this
                // yields at least one page.
                let file_len = orig_db_file.metadata().await?.len();
                db_size = (file_len / frame.data().len() as u64) as usize;
            }
        }

        // Only the last frame carries the database size (presumably marking it as the commit
        // frame); `current + 1` is compared rather than `db_size - 1` so the check cannot
        // underflow.
        let size_after = if current + 1 == db_size {
            db_size as u32
        } else {
            0
        };
        // Pages and frames are numbered from 1, in file order.
        *frame.header_mut() = FrameHeader {
            page_no: (current as u32 + 1).into(),
            size_after: size_after.into(),
            frame_no: (current as u64 + 1).into(),
        };
        injector.insert_frame(frame).await?;
        current += 1;
    }

    // The injector is dropped before the current segment is sealed.
    drop(injector);
    tokio::task::spawn_blocking(move || shared.seal_current()).await??;

    tracing::info!("sucessfull migration");
    Ok(())
}