import re
import os
import tempfile
import shutil
from contextlib import contextmanager
from crl.devutils.devpiindex import DevpiIndex
from crl.devutils._tmpindex import _TmpIndex
from crl.devutils.utils import get_randomstring
__copyright__ = 'Copyright (C) 2019, Nokia'
[docs]class DevpiHandler(object):
"""
A handler for handling all devpi related actions.
Args:
run: Reference to a function capable of running shell commands.
packagehandler: An instance of PackageHandler object
credentials_file: User login information in a file.
"""
_index_re = re.compile('^(http[s]*://[^/]+)/([^/]+/[^/]+)[/]*$')
_user_re = re.compile('^(http[s]*://[^/]+)/([^:]+):')
_user_index_re = re.compile('^([^/]+)/([^/]+)*$')
def __init__(self,
run,
packagehandler,
credentials_file=None):
self.run = run
self.packagehandler = packagehandler
self._index = None
self.credentials_file = credentials_file
self._clientdir = None
self._username = None
@property
def index(self):
"""
Currently used PyPI index.
"""
return self._index
[docs] def set_index(self, index):
"""
Set the given index as the current index.
Args:
index: Index, specified as http[s]://host/user/index.
Raises:
MalformedURL: If index URL is in wrong format.
"""
self._index = index
self._validate_index_url_format(index)
def _validate_index_url_format(self, index):
match = self._index_re.match(index)
if not match or len(match.groups()) < 2:
raise MalformedURL(
"URL '{index}' not in form"
" 'http[s]://host/user/indexname'".format(
index=index))
def _get_short_url(self, index):
short_name = index
try:
self._validate_index_url_format(index)
short_name = self._index_re.match(index).groups()[1]
except MalformedURL:
self._validate_user_index_url_format(index)
return short_name
def _get_long_url(self, index):
try:
self._validate_index_url_format(index)
long_name = index
except MalformedURL:
self._validate_user_index_url_format(index)
long_name = "{url}/{userindex}".format(
url=self.url, userindex=index)
return long_name
def _validate_user_index_url_format(self, index):
match = self._user_index_re.match(index)
if not match or len(match.groups()) < 2:
raise MalformedURL(
"URL '{index}' not in form"
" 'user/indexname'".format(
index=index))
def set_credentials_file(self, credentials_file):
self.credentials_file = credentials_file
[docs] def publish(self, index):
"""
Publish a version of package from current index to given index.
Args:
index: PYPI Index to publish to.
"""
with self._session():
self._publish(index)
[docs] def test(self, test_index=None):
"""
Test and upload results and docs to the given index.
If no index is specified, uses a temporary index.
Args:
test_index: Index name to use for testing, specified as NAME.
"""
with self._session():
self._test_via_tmpindex(test_index)
[docs] def create_index(self, name, baseindex, otherbase=None,
credentials_file=None):
"""
Create a new index for the current user with given bases.
Args:
name: Name of the index to create, specified as NAME.
baseindex: URL of index from which the new index will inherit.
otherbase: Name of other indices (specified as USER/NAME) to use.
credentials_file: File containing the users credentials.
"""
def _combined(base, otherbase=None):
base = base.split()
if otherbase:
base.extend(otherbase.split(','))
return ','.join(base)
self.set_index(baseindex)
if credentials_file:
self.set_credentials_file(credentials_file)
with self._session():
index = DevpiIndex(run=self.run,
baseindex=_combined(self.userindex, otherbase),
baseurl=self.url,
index_name=name,
username=self.username,
clientarg=self._clientarg)
if not index.index_exists():
index.create_index()
index.use_index()
[docs] def delete_index(self, index, credentials_file=None):
"""
Delete the given index.
Args:
index: Index to delete, specified as http[s]://host/user/index.
"""
self.set_index(index)
if credentials_file:
self.set_credentials_file(credentials_file)
with self._session():
indexname = self.userindex.split('/')[-1]
index = DevpiIndex(run=self.run,
baseindex=self.userindex,
baseurl=self.url,
index_name=indexname,
username=self.username,
clientarg=self._clientarg)
if index.index_exists():
index.delete_index()
@contextmanager
def _session(self):
try:
self._login()
yield None
finally:
self._logoff()
@property
def _clientarg(self):
return (' --clientdir {clientdir}'.format(clientdir=self.clientdir)
if self.credentials_file else '')
@property
def clientdir(self):
if not self._clientdir and self.credentials_file:
self._clientdir = self._get_tmpdir()
return self._clientdir
def _get_tmpdir(self):
return os.path.join(
tempfile.gettempdir(),
'{username}_{randomstring}'.format(
username=self.username,
randomstring=get_randomstring(length=10)))
@property
def username(self):
"""
Currently logged in username. If no user is logged in,
uses credentials from a file.
"""
if self.credentials_file:
self._username = self._get_credentials()[0]
else:
self._username = self._get_login_user()
return self._username
@property
def _password(self):
return self._get_credentials()[1].rstrip('\n')
def _get_credentials(self):
with open(self.credentials_file) as creds:
return creds.read().split(':', 1)
def _get_login_user(self):
return self._user_re.match(self.run('devpi user').stdout).groups()[1]
@property
def url(self):
"""
The base URL of the PYPI server, specified as http[s]://host
"""
return self._index_re.match(self.index).groups()[0]
@property
def userindex(self):
"""
Current user index, specified as user/indexname
"""
return self._index_re.match(self.index).groups()[1]
def _pip_search_pkg(self, pkgname, index):
return self.run('pip search {pkg_name} -i {index}'.format(
pkg_name=pkgname,
index=index if index.endswith('/') else index + '/'),
ignore_codes=[23]).stdout.split('\n')
[docs] def latest_pypi_version(self, pkg_name=None, pypi_index=None):
"""
Returns latest version of a package from the given index.
Args:
pkg_name: Name of the package, if different from the current package.
pypi_index: Name of the index, if different from the current index.
"""
version = ''
pkgname = pkg_name if pkg_name else self.packagehandler.name
index = self._get_long_url(pypi_index) if pypi_index else self.index
results = self._pip_search_pkg(pkgname, index)
for result in results:
pkg_details = result.split()
if pkg_details and pkgname in pkg_details[0].split('/'):
version = re.search(r'\((.*?)\)', pkg_details[1]).group(1)
return version
def _login(self):
self.run('devpi use {url}{clientarg}'.format(
url=self.url, clientarg=self._clientarg))
if self.credentials_file:
self.run(
['devpi', 'login', self.username, '--password', self._password,
'--clientdir', self.clientdir],
replaced_output='devpi login {username}'.format(
username=self.username),
shell=False)
def _logoff(self):
if self.credentials_file:
self.run('devpi logoff{clientarg}'.format(
clientarg=self._clientarg))
shutil.rmtree(self.clientdir)
def _publish(self, index):
short_index_name = self._get_short_url(index)
indexname = self.userindex.split('/')[1]
pkgspec = '{name}=={version}'.format(
name=self.packagehandler.name, version=self.packagehandler.version)
index = DevpiIndex(run=self.run,
baseindex=self.userindex,
baseurl=self.url,
index_name=indexname,
username=self.username,
clientarg=self._clientarg)
index.use_index()
index.push(pkgspec, short_index_name)
def _test_via_tmpindex(self, index):
with _TmpIndex(run=self.run,
packagehandler=self.packagehandler,
baseindex=self.userindex,
baseurl=self.url,
index_name=index,
username=self.username,
clientarg=self._clientarg) as tmpindex:
tmpindex.test()