Skip to content

Commit aacb41a

Browse files
author
mezzeddine
committed
feat: add global OpenSearch index prefix support (#8489)
1 parent 4c2a3eb commit aacb41a

File tree

7 files changed

+73
-11
lines changed

7 files changed

+73
-11
lines changed

docs/source/AdministratorGuide/ExternalsSupport/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a
5555
- ``ca_certs`` (default:``None``)
5656
- ``client_key`` (default:``None``)
5757
- ``client_cert`` (default:``None``)
58+
- ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes.
5859

5960

6061
to the location::

docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ For example::
5050
{
5151
User = test
5252
Password = password
53+
IndexPrefix = mydirac-
5354
}
5455
}
5556

5657

57-
The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`:
58+
The following global option can be set in `Systems/NoSQLDatabases`:
5859

59-
*IndexPrefix*: Prefix used to prepend to indexes created in the ES instance.
60+
*IndexPrefix*: Prefix prepended to all indexes created in the OpenSearch instance.
6061

6162
For each managed monitoring type, the Period (how often a new index is created)
6263
can be defined with::

src/DIRAC/ConfigurationSystem/Client/Utilities.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,13 @@ def _getCACerts(cs_path):
532532
if ca_certs:
533533
parameters["ca_certs"] = ca_certs
534534

535+
# Global index prefix for all OpenSearch databases
536+
result = gConfig.getOption("/Systems/NoSQLDatabases/IndexPrefix")
537+
if result["OK"]:
538+
parameters["IndexPrefix"] = str(result["Value"]).strip().lower()
539+
else:
540+
parameters["IndexPrefix"] = ""
541+
535542
# Check optional parameters: Host, Port, SSL
536543
result = gConfig.getOption(cs_path + "/Host")
537544
if not result["OK"]:

src/DIRAC/Core/Base/ElasticDB.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None):
3434
self.__ca_certs = dbParameters.get("ca_certs", None)
3535
self.__client_key = dbParameters.get("client_key", None)
3636
self.__client_cert = dbParameters.get("client_cert", None)
37+
self.__globalIndexPrefix = dbParameters.get("IndexPrefix", "")
3738

3839
super().__init__(
3940
host=self._dbHost,
4041
port=self._dbPort,
4142
user=self.__user,
4243
password=self.__dbPassword,
4344
indexPrefix=indexPrefix,
45+
globalIndexPrefix=self.__globalIndexPrefix,
4446
useSSL=self.__useSSL,
4547
useCRT=self.__useCRT,
4648
ca_certs=self.__ca_certs,

src/DIRAC/Core/Utilities/ElasticSearchDB.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def __init__(
103103
user=None,
104104
password=None,
105105
indexPrefix="",
106+
globalIndexPrefix="",
106107
useSSL=True,
107108
useCRT=False,
108109
ca_certs=None,
@@ -117,6 +118,7 @@ def __init__(
117118
:param str user: user name to access the db
118119
:param str password: if the db is password protected we need to provide a password
119120
:param str indexPrefix: it is the indexPrefix used to get all indexes
121+
:param str globalIndexPrefix: prefix prepended to all index names and patterns
120122
:param bool useSSL: We can disable using secure connection. By default we use secure connection.
121123
:param bool useCRT: Use certificates.
122124
:param str ca_certs: CA certificates bundle.
@@ -125,6 +127,7 @@ def __init__(
125127
"""
126128

127129
self._connected = False
130+
self._globalIndexPrefix = (globalIndexPrefix or "").strip().lower()
128131
if user and password:
129132
sLog.debug("Specified username and password")
130133
password = urlparse.quote_plus(password)
@@ -191,6 +194,40 @@ def __init__(
191194
except ElasticConnectionError as e:
192195
sLog.error(repr(e))
193196

197+
def _withGlobalPrefix(self, indexName):
    """Return ``indexName`` with the global index prefix applied.

    :param indexName: an index name or pattern, a comma-separated string of
                      them, or a list/tuple of such values
    :returns: the prefixed value. Lists and tuples come back as a list, and
              anything that is neither a string nor a list/tuple is returned
              unchanged. When no global prefix is set, the input is returned
              untouched.
    """
    globalPrefix = self._globalIndexPrefix
    if not globalPrefix:
        return indexName

    if isinstance(indexName, (list, tuple)):
        return [self._withGlobalPrefix(entry) for entry in indexName]

    if not isinstance(indexName, str):
        return indexName

    def _prefixOne(rawToken):
        """Prefix one comma-separated token, keeping a leading ``-`` in front."""
        name = rawToken.strip()
        if not name:
            # Empty tokens are kept (stripped) so the separators are preserved.
            return name

        isExclusion = name.startswith("-")
        if isExclusion:
            name = name[1:]

        # "_all" is turned into a wildcard so that it ends up as "<prefix>*".
        if name == "_all":
            name = "*"

        # Already-prefixed names are left alone, so applying the prefix twice
        # gives the same result as applying it once.
        if not name.startswith(globalPrefix):
            name = f"{globalPrefix}{name}"

        return f"-{name}" if isExclusion else name

    return ",".join(_prefixOne(rawToken) for rawToken in indexName.split(","))
230+
194231
@ifConnected
195232
def addIndexTemplate(
196233
self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None
@@ -204,6 +241,7 @@ def addIndexTemplate(
204241
"""
205242
if settings is None:
206243
settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}}
244+
index_patterns = [self._withGlobalPrefix(pattern) for pattern in index_patterns]
207245
body = {
208246
"index_patterns": index_patterns,
209247
"priority": priority,
@@ -225,6 +263,7 @@ def query(self, index: str, query):
225263
:param dict query: It is the query in OpenSearch DSL language
226264
227265
"""
266+
index = self._withGlobalPrefix(index)
228267
try:
229268
esDSLQueryResult = self.client.search(index=index, body=query)
230269
return S_OK(esDSLQueryResult)
@@ -247,6 +286,7 @@ def update(self, index: str, query=None, updateByQuery: bool = True, docID: str
247286
if not index or not query:
248287
return S_ERROR("Missing index or query")
249288

289+
index = self._withGlobalPrefix(index)
250290
try:
251291
if updateByQuery:
252292
esDSLQueryResult = self.client.update_by_query(index=index, body=query)
@@ -263,6 +303,7 @@ def getDoc(self, index: str, docID: str) -> dict:
263303
:param index: name of the index
264304
:param docID: document ID
265305
"""
306+
index = self._withGlobalPrefix(index)
266307
sLog.debug(f"Retrieving document {docID} in index {index}")
267308
try:
268309
return S_OK(self.client.get(index, docID)["_source"])
@@ -280,7 +321,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
280321
:param docIDs: document IDs
281322
"""
282323
sLog.debug(f"Retrieving documents {docIDs}")
283-
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
324+
docs = [{"_index": self._withGlobalPrefix(indexFunc(docID, vo)), "_id": docID} for docID in docIDs]
284325
try:
285326
response = self.client.mget({"docs": docs})
286327
except RequestError as re:
@@ -298,6 +339,7 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
298339
:param body: The request definition requires either `script` or
299340
partial `doc`
300341
"""
342+
index = self._withGlobalPrefix(index)
301343
sLog.debug(f"Updating document {docID} in index {index}")
302344
try:
303345
self.client.update(index, docID, body)
@@ -317,6 +359,7 @@ def deleteDoc(self, index: str, docID: str):
317359
:param index: name of the index
318360
:param docID: document ID
319361
"""
362+
index = self._withGlobalPrefix(index)
320363
sLog.debug(f"Deleting document {docID} in index {index}")
321364
try:
322365
return S_OK(self.client.delete(index, docID))
@@ -333,6 +376,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
333376
:param index: name of the index
334377
:param docID: document ID
335378
"""
379+
index = self._withGlobalPrefix(index)
336380
sLog.debug(f"Checking if document {docID} in index {index} exists")
337381
return self.client.exists(index, docID)
338382

@@ -341,6 +385,7 @@ def _Search(self, indexname):
341385
"""
342386
it returns the object which can be used for retrieving certain values from the DB
343387
"""
388+
indexname = self._withGlobalPrefix(indexname)
344389
return Search(using=self.client, index=indexname)
345390

346391
def _Q(self, name_or_query="match", **params):
@@ -363,6 +408,7 @@ def getIndexes(self, indexName=None):
363408
"""
364409
if not indexName:
365410
indexName = ""
411+
indexName = self._withGlobalPrefix(indexName)
366412
sLog.debug(f"Getting indices alias of {indexName}")
367413
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
368414
return list(self.client.indices.get_alias(f"{indexName}*"))
@@ -376,6 +422,7 @@ def getDocTypes(self, indexName):
376422
:return: S_OK or S_ERROR
377423
"""
378424
result = []
425+
indexName = self._withGlobalPrefix(indexName)
379426
try:
380427
sLog.debug("Getting mappings for ", indexName)
381428
result = self.client.indices.get_mapping(indexName)
@@ -407,6 +454,7 @@ def existingIndex(self, indexName):
407454
:param str indexName: the name of the index
408455
:returns: S_OK/S_ERROR if the request is successful
409456
"""
457+
indexName = self._withGlobalPrefix(indexName)
410458
sLog.debug(f"Checking existance of index {indexName}")
411459
try:
412460
return S_OK(self.client.indices.exists(indexName))
@@ -428,6 +476,7 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
428476
else:
429477
sLog.warn("The period is not provided, so using non-periodic indexes names")
430478
fullIndex = indexPrefix
479+
fullIndex = self._withGlobalPrefix(fullIndex)
431480

432481
try:
433482
if not mapping:
@@ -444,6 +493,7 @@ def deleteIndex(self, indexName):
444493
"""
445494
:param str indexName: the name of the index to be deleted...
446495
"""
496+
indexName = self._withGlobalPrefix(indexName)
447497
sLog.info("Deleting index", indexName)
448498
try:
449499
retVal = self.client.indices.delete(indexName)
@@ -474,6 +524,7 @@ def index(self, indexName, body=None, docID=None, op_type="index"):
474524
if not indexName or not body:
475525
return S_ERROR("Missing index or body")
476526

527+
indexName = self._withGlobalPrefix(indexName)
477528
try:
478529
res = self.client.index(index=indexName, body=body, id=docID, params={"op_type": op_type})
479530
except (RequestError, TransportError) as e:
@@ -505,6 +556,7 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
505556
indexName = self.generateFullIndexName(indexPrefix, period)
506557
else:
507558
indexName = indexPrefix
559+
indexName = self._withGlobalPrefix(indexName)
508560
sLog.debug(f"Bulk indexing into {indexName} of {len(data)}")
509561

510562
res = self.existingIndex(indexName)
@@ -534,6 +586,7 @@ def getUniqueValue(self, indexName, key, orderBy=False):
534586
:param dict orderBy: it is a dictionary in case we want to order the result {key:'desc'} or {key:'asc'}
535587
:returns: a list of unique value for a certain key from the dictionary.
536588
"""
589+
indexName = self._withGlobalPrefix(indexName)
537590

538591
query = self._Search(indexName)
539592

@@ -592,6 +645,7 @@ def deleteByQuery(self, indexName, query):
592645
:param str indexName: the name of the index
593646
:param str query: the JSON-formatted query for which we want to issue the delete
594647
"""
648+
indexName = self._withGlobalPrefix(indexName)
595649
try:
596650
self.client.delete_by_query(index=indexName, body=query)
597651
except Exception as inst:

src/DIRAC/Core/scripts/install_full.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,7 @@ LocalInstallation
142142
Password = <password>
143143
Host = <host>
144144
Port = <port>
145+
# Optional global prefix prepended to all DIRAC-created OpenSearch indexes
146+
# IndexPrefix = <prefix>
145147
}
146148
}

src/DIRAC/MonitoringSystem/DB/MonitoringDB.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
44
**Configuration Parameters**:
55
6-
The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`
7-
8-
* *IndexPrefix*: Prefix used to prepend to indexes created in the OpenSearch instance.
6+
The global OpenSearch index prefix can be set in
7+
`Systems/NoSQLDatabases/IndexPrefix`.
98
109
For each managed monitoring type, the Period (how often a new index is created)
1110
can be defined with::
@@ -31,8 +30,6 @@
3130
import time
3231

3332
from DIRAC import S_ERROR, S_OK
34-
from DIRAC.ConfigurationSystem.Client.Config import gConfig
35-
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
3633
from DIRAC.Core.Base.ElasticDB import ElasticDB
3734
from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader
3835

@@ -45,10 +42,8 @@ def __init__(self, name="Monitoring/MonitoringDB"):
4542
"""Standard constructor"""
4643

4744
try:
48-
section = getDatabaseSection("Monitoring/MonitoringDB")
49-
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", "").lower()
5045
# Connecting to the ES cluster
51-
super().__init__(fullName=name, indexPrefix=indexPrefix)
46+
super().__init__(fullName=name)
5247
except RuntimeError as ex:
5348
self.log.error("Can't connect to MonitoringDB", repr(ex))
5449
raise ex

0 commit comments

Comments
 (0)