Skip to content

Commit 59e4d0e

Browse files
committed
Replay existing statuses back onto a repository.
1 parent f6b0303 commit 59e4d0e

1 file changed

Lines changed: 167 additions & 0 deletions

File tree

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#!/usr/bin/env python3
2+
3+
"""Replay secret scanning alert status for a GitHub repository, organization or Enterprise, based on a provided file of previous statuses."""
4+
5+
import sys
6+
import argparse
7+
import re
8+
import logging
9+
import datetime
10+
import json
11+
from typing import Generator
12+
from collections import defaultdict
13+
from defusedcsv import csv # type: ignore
14+
from githubapi import GitHub, parse_date
15+
from list_secret_scanning_alerts import list_secret_scanning_alerts
16+
17+
18+
LOG = logging.getLogger(__name__)
19+
20+
21+
def existing_results_by_secret(reader: csv.DictReader) -> dict:
22+
"""Index results by secret and type for easy lookup."""
23+
24+
existing_results: dict = {}
25+
26+
for result in reader:
27+
repo = result["repo"]
28+
secret_type = result["secret_type"]
29+
secret = result["secret"]
30+
31+
existing_results[repo] = (
32+
{} if repo not in existing_results else existing_results[repo]
33+
)
34+
existing_results[repo][secret] = (
35+
{} if secret not in existing_results[repo] else existing_results[repo][secret]
36+
)
37+
existing_results[repo][secret][secret_type] = result
38+
39+
return existing_results
40+
41+
42+
def change_state(hostname, result: dict, res: dict) -> None:
43+
"""Change the state of the alert to match the existing result using the GitHub API to update the alert."""
44+
g = GitHub(hostname=hostname)
45+
46+
repo_name = result["repo"]
47+
48+
state_update = {
49+
"state": res["state"],
50+
"resolution": res["resolution"],
51+
"resolution_comment": res["resolution_comment"],
52+
}
53+
54+
alert_number = result["url"].split("/")[-1]
55+
56+
LOG.debug(f"Changing state of alert {repo_name}/{alert_number} to {state_update}")
57+
58+
g.query_once(
59+
"repo",
60+
repo_name,
61+
f"/secret-scanning/alerts/{alert_number}",
62+
data=state_update,
63+
method="PATCH",
64+
)
65+
66+
return
67+
68+
69+
def add_args(parser: argparse.ArgumentParser) -> None:
70+
"""Add command-line arguments to the parser."""
71+
parser.add_argument(
72+
"name", type=str, help="Name of the repo/org/Enterprise to query"
73+
)
74+
parser.add_argument(
75+
"--scope",
76+
type=str,
77+
default="org",
78+
choices=["ent", "org", "repo"],
79+
required=False,
80+
help="Scope of the query",
81+
)
82+
parser.add_argument(
83+
"--state",
84+
"-s",
85+
type=str,
86+
choices=["open", "resolved"],
87+
required=False,
88+
help="State of the alerts to query",
89+
)
90+
parser.add_argument(
91+
"--since",
92+
"-S",
93+
type=str,
94+
required=False,
95+
help="Only show alerts created after this date/time - ISO 8601 format, e.g. 2024-10-08 or 2024-10-08T12:00; or Nd format, e.g. 7d for 7 days ago",
96+
)
97+
parser.add_argument(
98+
"--json", action="store_true", help="Output in JSON format (otherwise CSV)"
99+
)
100+
parser.add_argument(
101+
"--quote-all", "-q", action="store_true", help="Quote all fields in CSV output"
102+
)
103+
parser.add_argument(
104+
"--hostname",
105+
type=str,
106+
default="github.com",
107+
required=False,
108+
help="GitHub Enterprise hostname (defaults to github.com)",
109+
)
110+
parser.add_argument(
111+
"--debug", "-d", action="store_true", help="Enable debug logging"
112+
)
113+
114+
115+
def main() -> None:
116+
"""CLI entrypoint."""
117+
parser = argparse.ArgumentParser(description=__doc__)
118+
add_args(parser)
119+
args = parser.parse_args()
120+
121+
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
122+
123+
since = parse_date(args.since)
124+
125+
LOG.debug("Since: %s (%s) [%s]", since, args.since, type(since))
126+
127+
scope = "repo" if "/" in args.name and args.scope != "repo" else args.scope
128+
name = args.name
129+
state = args.state
130+
hostname = args.hostname
131+
132+
if not GitHub.check_name(args.name, scope):
133+
raise ValueError("Invalid name: %s for %s", args.name, scope)
134+
135+
reader = csv.DictReader(sys.stdin)
136+
137+
if args.debug:
138+
reader = csv.DictReader(sys.stdin)
139+
140+
existing_results = existing_results_by_secret(reader)
141+
142+
LOG.debug(existing_results)
143+
144+
results = list_secret_scanning_alerts(name, scope, hostname, state=state, since=since)
145+
146+
for result in results:
147+
repo = result["repo"]
148+
secret = result["secret"]
149+
secret_type = result["secret_type"]
150+
151+
LOG.debug(f"{repo}, {secret}, {secret_type}")
152+
153+
try:
154+
res = existing_results[repo][secret][secret_type]
155+
LOG.warning(f"Found existing alert: {res}")
156+
except KeyError:
157+
continue
158+
159+
if res["state"] != result["state"]:
160+
LOG.warning(f"State mismatch: {res['state']} != {result['state']}")
161+
162+
if result["state"] != "pattern_edited":
163+
change_state(hostname, result, res)
164+
165+
166+
if __name__ == "__main__":
167+
main()

0 commit comments

Comments
 (0)