Save data while fetching

This commit is contained in:
Antonio J. Delgado 2025-02-25 11:35:06 +02:00
parent 73fd484eb2
commit 4d57e74d41

View file

@ -67,36 +67,71 @@ class GithubPackages:
self.data = self._read_cached_data()
self._get_github_rate_limit()
if self.data['orgs_with_assets']:
for organization in self.data['orgs_with_assets']:
self._process_orgs_with_assets()
self._get_organizations()
for organization in self.data['organizations']:
self.data['organizations'].append(organization)
try:
self._process_organization(organization)
if (
except Exception as error:
self._log.error(
"Error processing organization. Organization: %s. Error: %s",
organization,
error
)
def _get_organizations(self):
if ( # There are orgs in cache and didn't expire?
'organizations' in self.data and
'organizations_last_update' in self.data and
self.data['organizations_last_update'] + self.config['max_cache_age'] < time.time()
):
organizations = self.data['organizations']
):
self._log.debug(
"A total of %s organizations existing in cache",
len(organizations)
len(self.data['organizations'])
)
else:
elif ( # There are full orgs in cache and didn't expire?
'full_organization' in self.data and
'full_organization_last_update' in self.data and
self.data['full_organizations_last_update'] + self.config['max_cache_age'] < time.time()
):
for organization in self.data['full_organizations']:
if 'login' in organization:
self.data['organizations'].append(organization['login'])
elif 'message' in organization:
self._log.debug(
"Incomplete list of organizations. %s",
organization
)
else:
                self._log.error(
                    "Organization is not a dictionary? %s",
                    organization,
                )
self.data.pop('full_organizations')
self._save_cached_data(self.data)
# sys.exit(2)
else: # Otherwise fetch orgs from Github
self._log.debug(
"Getting all (or up to %s) organizations...",
self.config['max_organizations'],
)
full_organizations = self._get_paginated_url(
if not self._get_paginated_url(
'https://api.github.com/organizations',
'full_organizations',
self.config['max_organizations'],
)
organizations = []
):
self._log.error(
"Error fetching organizations"
)
self._log.debug(
"Obtained %s organizations",
len(full_organizations)
len(self.data['full_organizations'])
)
for organization in full_organizations:
for organization in self.data['full_organizations']:
if 'login' in organization:
organizations.append(organization['login'])
self.data['organizations'].append(organization['login'])
elif 'message' in organization:
self._log.debug(
"Incomplete list of organizations. %s",
@ -110,20 +145,15 @@ class GithubPackages:
# sys.exit(2)
self._log.debug(
"A total of %s organizations fetched",
len(organizations)
len(self.data['organizations'])
)
self.data['organizations_last_update'] = time.time()
self._save_cached_data(self.data)
for organization in organizations:
self.data['organizations'].append(organization)
try:
def _process_orgs_with_assets(self):
if self.data['orgs_with_assets']:
for organization in self.data['orgs_with_assets']:
self._process_organization(organization)
except Exception as error:
self._log.error(
"Error processing organization. Organization: %s. Error: %s",
organization,
error
)
def close(self, error=0):
'''Close class and save data'''
@ -147,17 +177,21 @@ class GithubPackages:
self.config['max_repos_per_org'],
organization
)
repositories = self._get_paginated_url(
if not self._get_paginated_url(
f"https://api.github.com/orgs/{organization}/repos",
'repositories',
self.config['max_repos_per_org']
)
):
self._log.error(
"Error fetching repositories"
)
self._log.debug(
"A total of %s repositories fetched",
len(repositories)
len(self.data['repositories'])
)
for repository in repositories:
for repository in self.data['repositories']:
try:
self._process_repository(repository)
self._process_repository(self.data['repository'])
except Exception as error:
self._log.error(
"Error processing repository '%s'. %s",
@ -170,16 +204,20 @@ class GithubPackages:
"Getting latest release of the repo '%s'...",
repository['name']
)
latest_release = self._get_paginated_url(
if not self._get_paginated_url(
f"{repository['url']}/releases/latest",
'latest_release',
            max_items=1 # It should be only one 'latest'
)
):
self._log.error(
"Error fetching latest release"
)
organization = repository['owner']
if (
'status' not in latest_release or
latest_release['status'] != '404'
'status' not in self.data['latest_release'] or
self.data['latest_release']['status'] != '404'
) and (
'assets' in latest_release
'assets' in self.data['latest_release']
):
# self._log.debug(
# "Latest release: %s",
@ -190,7 +228,7 @@ class GithubPackages:
# "A total of %s assets",
# len(self.data['assets'])
# )
if len(latest_release['assets']) > 0:
if len(self.data['latest_release']['assets']) > 0:
if organization['login'] not in self.data['orgs_with_assets']:
self.data['orgs_with_assets'].append(organization['login'])
if organization['login'] in self.data['orgs_without_assets']:
@ -203,7 +241,7 @@ class GithubPackages:
organization['login']
)
self.data['orgs_without_assets'].append(organization['login'])
for asset in latest_release['assets']:
for asset in self.data['latest_release']['assets']:
self._process_asset(asset)
self._save_cached_data(self.data)
@ -288,7 +326,7 @@ class GithubPackages:
}
self._new_request(result.headers)
def _get_paginated_url(self, url, max_items, payload=None):
def _get_paginated_url(self, url, data_field, max_items, payload=None):
if payload is None:
payload = self._default_payload
result = self.session.get(url, params=payload)
@ -320,6 +358,9 @@ class GithubPackages:
"Got a status field in the response. Response: %s",
items
)
self.data[data_field] = items
self.data[f"{data_field}_last_update"] = time.time()
self._save_cached_data(self.data)
page = 0
next_link = self._get_next_pagination_link(result.headers)
while next_link and len(items) < max_items:
@ -333,14 +374,17 @@ class GithubPackages:
result_data = result.json()
if 'status' not in result_data:
items += list(result_data)
self.data[data_field] += items
self.data[f"{data_field}_last_update"] = time.time()
self._save_cached_data(self.data)
else:
self._log.warning(
"Didn't obtain a successful response. Stopping fetching pages. %s",
result_data
)
return items
return True
next_link = self._get_next_pagination_link(result.headers)
return items
return True
def _get_next_pagination_link(self, headers):
if 'link' in headers: