kube-stale-resources/kube-stale-resources.py

181 lines
7.0 KiB
Python

#!/usr/bin/env python
import argparse
import re
import sys
from typing import (IO, List, Tuple)
import requests
import yaml
K8sResourceIdentifier = Tuple[str, str, str, str]
HEADERS = {"Content-Type": "application/json"}
BLACKLIST_REGEXS = [
# Kubernetes inherent blacklist (should apply to every k8s cluster out there)
r'^.*:apps/v1:ControllerRevision:.*$',
r'^.*:apps/v1:ReplicaSet:.*$',
r'^.*:batch/v1:Job:.*-\d{10,}$', # jobs created by cron jobs with unix timestamp suffix
r'^.*:events.k8s.io/v1:Event:.*$',
r'^.*:metrics.k8s.io/v1beta1:PodMetrics:.*$',
# CM with CA bundle to verify kube-apiserver connections,
# see https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.20.md#introducing-rootcaconfigmap
r'^.*:v1:ConfigMap:kube-root-ca.crt$',
r'^.*:v1:Endpoints:.*$',
r'^.*:.*:EndpointSlice:.*$',
r'^.*:v1:Event:.*$',
r'^.*:v1:Pod:.*$',
r'^.*:v1:Secret:.*-token-\S{5}$', # secrets with token for service accounts
r'^.*:v1:ServiceAccount:default$',
r'^default:v1:Service:kubernetes$',
r'^kube-node-lease:.*$',
r'^kube-public:.*$',
r'^kube-system:.*$',
# GKE specific parts (should apply to every GKE-managed k8s cluster)
# '^.*:v1:ResourceQuota:gke-resource-quotas$',
# '^default:v1:LimitRange:limits$,
]
def get_live_namespaced_resources(url: str) -> List[K8sResourceIdentifier]:
"""
Returns list of Kubernetes resource identifiers of namespaced resources out of the live cluster reachable at url.
"""
result = []
# merges https://kubernetes.io/docs/reference/using-api/#api-groups
# legacy API group
apiVersions = requests.get(url + '/api', headers=HEADERS, timeout=10).json()['versions']
for apiVersion in apiVersions:
apiResources = requests.get(url + '/api/' + apiVersion, headers=HEADERS, timeout=10).json()['resources']
for apiResource in apiResources:
if not ('list' in apiResource['verbs'] and apiResource['namespaced']):
continue
items = requests.get(url + '/api/' + apiVersion + '/' + apiResource['name'], headers=HEADERS,
timeout=10).json()['items']
for item in items:
result.append(
(item['metadata']['namespace'], apiVersion, apiResource['kind'], item['metadata']['name']))
# named API groups
apiGroups = requests.get(url + '/apis', headers=HEADERS, timeout=10).json()['groups']
for apiGroup in apiGroups:
apiResources = requests.get(url + '/apis/' + apiGroup['preferredVersion']['groupVersion'],
headers=HEADERS,
timeout=10).json()['resources']
for apiResource in apiResources:
if not ('list' in apiResource['verbs'] and apiResource['namespaced']):
continue
if apiGroup['preferredVersion']['groupVersion'] == 'extensions/v1beta1' and apiResource['kind'] != 'Ingress':
# everything else in extensions/v1beta1 should be migrated to the preferred version
# except ingresses, see https://kubernetes.io/blog/2019/07/18/api-deprecations-in-1-16/
continue
items = requests.get(url + '/apis/' + apiGroup['preferredVersion']['groupVersion'] + '/' +
apiResource['name'],
headers=HEADERS,
timeout=10).json()['items']
for item in items:
result.append((item['metadata']['namespace'], apiGroup['preferredVersion']['groupVersion'],
apiResource['kind'], item['metadata']['name']))
return result
def get_target_namespaced_resources(stream: IO) -> List[K8sResourceIdentifier]:
"""
Returns list of Kubernetes resource identifiers of namespaced resources out of the target stream.
"""
result = []
target_documents = list(yaml.load_all(stream, Loader=yaml.SafeLoader))
for document in target_documents:
if not document:
continue
if not 'namespace' in document['metadata']:
continue
result.append(
(document['metadata']['namespace'], document['apiVersion'], document['kind'], document['metadata']['name']))
return result
def get_compact_resource_identifiers(tuples: List[K8sResourceIdentifier]) -> List[str]:
"""
Returns a compact, sortable string for a Kubernetes resource identifier.
"""
return [namespace + ':' + apiVersion + ':' + kind + ':' + name for namespace, apiVersion, kind, name in tuples]
def main():
parser = argparse.ArgumentParser(description='Utility to detect k8s configuration drift.')
parser.add_argument('-f',
dest='target_manifests_file',
required=True,
help='File path (or - for stdin) to read Kubernetes manifests from for target state.')
parser.add_argument('--url',
dest='k8s_apiserver_url',
default='http://localhost:8001',
help='URL of Kubernetes apiserver to retrieve live state.')
parser.add_argument('--blacklist',
dest='blacklist_file',
help='File path to read blacklist regex entries from that will be used to filter live state.')
args = parser.parse_args()
blacklist_regexs: List[str] = []
blacklist_regexs += BLACKLIST_REGEXS
if args.blacklist_file:
print(f'Reading blacklist file {args.blacklist_file}...')
with open(args.blacklist_file, 'r', encoding='utf-8') as f:
blacklist_regexs += list(filter(lambda x: not re.match(r'^\s*$', x), f.read().split('\n')))
print('Retrieving target state...')
if args.target_manifests_file == '-':
target_tuples = get_target_namespaced_resources(sys.stdin)
else:
with open(args.target_manifests_file, 'r', encoding='utf-8') as f:
target_tuples = get_target_namespaced_resources(f)
print(f'Retrieving live state from {args.k8s_apiserver_url}...')
raw_live_strings = get_compact_resource_identifiers(get_live_namespaced_resources(args.k8s_apiserver_url))
live_strings = list(filter(lambda s: not re.match('|'.join(blacklist_regexs), s), raw_live_strings))
starget = set(get_compact_resource_identifiers(target_tuples))
slive = set(live_strings)
print('Live dynamic configmaps that are not in target (stale):')
counter = 0
for x in sorted(list(slive - starget)):
if re.match('^.*:v1:ConfigMap:.*-[a-z0-9]{10}', x):
counter += 1
print(' ' + x)
print("..", counter, "entries")
print()
print('Live resources w/o dynamic configmaps that are not in target (stale):')
counter = 0
for x in sorted(list(slive - starget)):
if not re.match('^.*:v1:ConfigMap:.*-[a-z0-9]{10}', x):
counter += 1
print(' ' + x)
print("..", counter, "entries")
sys.exit(len(list(slive - starget)))
if __name__ == "__main__":
main()