mirror of
https://github.com/cmur2/kube-stale-resources.git
synced 2024-12-22 12:54:25 +01:00
181 lines
7.0 KiB
Python
181 lines
7.0 KiB
Python
#!/usr/bin/env python
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
|
|
from typing import (IO, List, Tuple)
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
# Identifies a namespaced Kubernetes resource as a (namespace, apiVersion, kind, name) tuple.
K8sResourceIdentifier = Tuple[str, str, str, str]
|
|
|
|
# HTTP headers passed along with every GET request sent to the Kubernetes apiserver.
# NOTE(review): Content-Type describes a request body; for body-less GET requests an
# "Accept" header is presumably what was intended - confirm before changing.
HEADERS = {"Content-Type": "application/json"}
|
|
|
|
# Regexes matched against compact 'namespace:apiVersion:kind:name' identifiers of live resources.
# Live resources matching any entry are never reported as stale.
# Literal dots are escaped so that e.g. 'kube-root-ca.crt' does not match 'kube-root-caXcrt'.
BLACKLIST_REGEXS = [
    # Kubernetes inherent blacklist (should apply to every k8s cluster out there)
    r'^.*:apps/v1:ControllerRevision:.*$',
    r'^.*:apps/v1:ReplicaSet:.*$',
    r'^.*:batch/v1:Job:.*-\d{10,}$',  # jobs created by cron jobs with unix timestamp suffix
    r'^.*:events\.k8s\.io/v1:Event:.*$',
    r'^.*:metrics\.k8s\.io/v1beta1:PodMetrics:.*$',
    # CM with CA bundle to verify kube-apiserver connections,
    # see https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.20.md#introducing-rootcaconfigmap
    r'^.*:v1:ConfigMap:kube-root-ca\.crt$',
    r'^.*:v1:Endpoints:.*$',
    r'^.*:.*:EndpointSlice:.*$',
    r'^.*:v1:Event:.*$',
    r'^.*:v1:Pod:.*$',
    r'^.*:v1:Secret:.*-token-\S{5}$',  # secrets with token for service accounts
    r'^.*:v1:ServiceAccount:default$',
    r'^default:v1:Service:kubernetes$',
    r'^kube-node-lease:.*$',
    r'^kube-public:.*$',
    r'^kube-system:.*$',

    # GKE specific parts (should apply to every GKE-managed k8s cluster)
    # '^.*:v1:ResourceQuota:gke-resource-quotas$',
    # '^default:v1:LimitRange:limits$',
]
|
|
|
|
|
|
def _get_json(url: str) -> dict:
    """
    Performs a GET request against url and returns the decoded JSON body.

    Raises requests.HTTPError for non-2xx responses so that failures are reported with the
    HTTP status instead of a KeyError on the error payload further down.
    """
    response = requests.get(url, headers=HEADERS, timeout=10)
    response.raise_for_status()
    return response.json()


def _list_namespaced_resources(base_url: str, group_version: str) -> List[K8sResourceIdentifier]:
    """
    Returns identifiers of all live objects of every listable, namespaced resource type
    that the API group version served at base_url offers.
    """
    result = []

    for api_resource in _get_json(base_url)['resources']:
        if not ('list' in api_resource['verbs'] and api_resource['namespaced']):
            continue

        if group_version == 'extensions/v1beta1' and api_resource['kind'] != 'Ingress':
            # everything else in extensions/v1beta1 should be migrated to the preferred version
            # except ingresses, see https://kubernetes.io/blog/2019/07/18/api-deprecations-in-1-16/
            continue

        for item in _get_json(base_url + '/' + api_resource['name'])['items']:
            result.append(
                (item['metadata']['namespace'], group_version, api_resource['kind'], item['metadata']['name']))

    return result


def get_live_namespaced_resources(url: str) -> List[K8sResourceIdentifier]:
    """
    Returns list of Kubernetes resource identifiers of namespaced resources out of the live cluster reachable at url.

    Resources of the legacy API group come first, followed by the resources of the preferred
    version of every named API group. Raises requests.HTTPError if the apiserver answers any
    request with an error status.
    """

    result = []

    # merges https://kubernetes.io/docs/reference/using-api/#api-groups

    # legacy API group
    for api_version in _get_json(url + '/api')['versions']:
        result.extend(_list_namespaced_resources(url + '/api/' + api_version, api_version))

    # named API groups
    for api_group in _get_json(url + '/apis')['groups']:
        group_version = api_group['preferredVersion']['groupVersion']
        result.extend(_list_namespaced_resources(url + '/apis/' + group_version, group_version))

    return result
|
|
|
|
|
|
def get_target_namespaced_resources(stream: IO) -> List[K8sResourceIdentifier]:
    """
    Parses all YAML documents from stream and returns the Kubernetes resource identifiers
    of those that declare a namespace in their metadata.

    Empty documents and documents without a metadata namespace are skipped.
    """
    # parse the whole stream up front so YAML errors surface before any document is inspected
    documents = list(yaml.load_all(stream, Loader=yaml.SafeLoader))

    return [
        (doc['metadata']['namespace'], doc['apiVersion'], doc['kind'], doc['metadata']['name'])
        for doc in documents
        if doc and 'namespace' in doc['metadata']
    ]
|
|
|
|
|
|
def get_compact_resource_identifiers(tuples: List[K8sResourceIdentifier]) -> List[str]:
    """
    Returns one compact, sortable 'namespace:apiVersion:kind:name' string
    for each Kubernetes resource identifier in tuples, in the same order.
    """
    compact = []
    for ns, version, resource_kind, resource_name in tuples:
        compact.append(':'.join((ns, version, resource_kind, resource_name)))
    return compact
|
|
|
|
|
|
def main():
    """
    Command line entry point: reports live namespaced resources that are missing from the target manifests.

    Reads the target state from a manifests file (or stdin), retrieves the live state from the
    Kubernetes apiserver, drops live resources matching the built-in or user-supplied blacklist
    and prints the remaining live resources that are not part of the target state.

    Exits with the number of stale resources, capped at 255 so that large counts cannot wrap
    around to a misleading exit status (e.g. 256 stale resources being reported as 0).
    """
    parser = argparse.ArgumentParser(description='Utility to detect k8s configuration drift.')

    parser.add_argument('-f',
                        dest='target_manifests_file',
                        required=True,
                        help='File path (or - for stdin) to read Kubernetes manifests from for target state.')
    parser.add_argument('--url',
                        dest='k8s_apiserver_url',
                        default='http://localhost:8001',
                        help='URL of Kubernetes apiserver to retrieve live state.')
    parser.add_argument('--blacklist',
                        dest='blacklist_file',
                        help='File path to read blacklist regex entries from that will be used to filter live state.')

    args = parser.parse_args()

    # copy so that the module-level default blacklist is never mutated
    blacklist_regexs: List[str] = list(BLACKLIST_REGEXS)

    if args.blacklist_file:
        print(f'Reading blacklist file {args.blacklist_file}...')
        with open(args.blacklist_file, 'r', encoding='utf-8') as f:
            # every non-blank line of the file is one regex entry
            blacklist_regexs += [line for line in f.read().split('\n') if not re.match(r'^\s*$', line)]

    print('Retrieving target state...')
    if args.target_manifests_file == '-':
        target_tuples = get_target_namespaced_resources(sys.stdin)
    else:
        with open(args.target_manifests_file, 'r', encoding='utf-8') as f:
            target_tuples = get_target_namespaced_resources(f)

    print(f'Retrieving live state from {args.k8s_apiserver_url}...')
    raw_live_strings = get_compact_resource_identifiers(get_live_namespaced_resources(args.k8s_apiserver_url))

    # build the combined blacklist pattern once instead of re-joining it for every live resource
    blacklist_pattern = re.compile('|'.join(blacklist_regexs))
    live_strings = [s for s in raw_live_strings if not blacklist_pattern.match(s)]

    # live resources without a counterpart in the target state, computed once and reused below
    stale = sorted(set(live_strings) - set(get_compact_resource_identifiers(target_tuples)))

    # configmaps whose name carries a 10 character [a-z0-9] suffix are reported separately
    dynamic_configmap_pattern = re.compile(r'^.*:v1:ConfigMap:.*-[a-z0-9]{10}')
    dynamic_configmaps = [x for x in stale if dynamic_configmap_pattern.match(x)]
    other_resources = [x for x in stale if not dynamic_configmap_pattern.match(x)]

    print('Live dynamic configmaps that are not in target (stale):')
    for x in dynamic_configmaps:
        print(' ' + x)
    print("..", len(dynamic_configmaps), "entries")

    print()
    print('Live resources w/o dynamic configmaps that are not in target (stale):')
    for x in other_resources:
        print(' ' + x)
    print("..", len(other_resources), "entries")

    # exit statuses are truncated to 8 bits on POSIX systems, so cap the count to keep it non-zero
    sys.exit(min(len(stale), 255))
|
|
|
|
|
|
# Run the command line interface only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|