Pipes

Generic

scanpipe.pipes.make_codebase_resource(project, location, save=True, **extra_fields)

Create a CodebaseResource instance in the database for the given project.

The provided location is the absolute path of this resource. It must be rooted in project.codebase_path as only the relative path within the project codebase/ directory is stored in the database.

Extra fields can be provided as keyword arguments to this function call:

make_codebase_resource(
    project=project,
    location=resource.location,
    rootfs_path=resource.path,
    tag=layer_tag,
)

In this example, rootfs_path is an optional path relative to a rootfs root within an Image/VM filesystem context, e.g. “/var/log/file.log”.

All paths use the POSIX separators.

If a CodebaseResource already exists in the project with the same path, the error raised on save() is not stored in the database and the creation is skipped.

scanpipe.pipes.get_resource_codebase_root(project, resource_path)

Return “to” or “from” depending on the resource location in the codebase.
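
The following is a minimal sketch of that lookup, not the scanpipe implementation. It assumes the codebase root is the first path segment below the project codebase directory. The helper name, the codebase_path argument and the empty-string fallback are all hypothetical.

```python
from pathlib import PurePosixPath


def get_codebase_root_sketch(codebase_path, resource_path):
    """Return "to" or "from" for a path located under codebase_path."""
    # relative_to() raises ValueError when resource_path is outside codebase_path.
    relative_path = PurePosixPath(resource_path).relative_to(codebase_path)
    first_part = relative_path.parts[0] if relative_path.parts else ""
    # Assumption: any other location yields an empty string.
    return first_part if first_part in ("to", "from") else ""


print(get_codebase_root_sketch("/work/codebase", "/work/codebase/to/app.jar"))
```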

scanpipe.pipes.yield_resources_from_codebase(project)

Yield CodebaseResource instances, including their info data, ready to be inserted in the database using save() or bulk_create().

scanpipe.pipes.collect_and_create_codebase_resources(project, batch_size=5000)

Collect and create codebase resources including the “to/” and “from/” context using the resource tag field.

The default batch_size can be overridden, although the benefits of a value greater than 5000 objects are usually not significant.

scanpipe.pipes.update_or_create_resource(project, resource_data)

Get, update or create a CodebaseResource then return it.

scanpipe.pipes.update_or_create_package(project, package_data, codebase_resources=None, is_virtual=False)

Get, update or create a DiscoveredPackage then return it. Use the project and package_data mapping to look up and create the DiscoveredPackage using its Package URL and package_uid as a unique key. The package can be associated with codebase_resources by providing a list or queryset of resources.

scanpipe.pipes.create_local_files_package(project, defaults, codebase_resources=None)

Create a local-files package using provided defaults data.

scanpipe.pipes.ignore_dependency_scope(project, dependency_data)

Return True if the dependency should be ignored, i.e.: not created. The ignored scopes are defined on the project ignored_dependency_scopes setting field.

scanpipe.pipes.update_or_create_dependency(project, dependency_data, for_package=None, resolved_to_package=None, datafile_resource=None, datasource_id=None, strip_datafile_path_root=False)

Get, update or create a DiscoveredDependency then return it. Use the project and dependency_data mapping to look up and create the DiscoveredDependency using its dependency_uid and for_package_uid as a unique key.

If strip_datafile_path_root is True, then DiscoveredDependency.create_from_data() will strip the root path segment from the datafile_path of dependency_data before looking up the corresponding CodebaseResource for datafile_path. This is used in the case where Dependency data is imported from a scancode-toolkit scan, where the root path segments are not stripped for datafile_path. If the dependency is resolved and a resolved package is created, we have the corresponding package_uid at resolved_to.

scanpipe.pipes.get_dependencies(project, dependency_data)

Given a dependency_data mapping, get a list of DiscoveredDependency objects for that project with similar dependency data.

scanpipe.pipes.get_or_create_relation(project, relation_data)

Get or create a CodebaseRelation then return it. Support for update is not useful as there are no fields on the model that could be updated.

scanpipe.pipes.normalize_path(path)

Return a normalized path from a path string.

scanpipe.pipes.strip_root(location)

Return the provided location without the root directory.
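
As an illustration only, the behavior can be sketched as below. This is not the scanpipe implementation: the helper name is hypothetical, and it assumes POSIX-style paths where the root is the first path segment.

```python
def strip_root_sketch(location):
    """Return location without its first path segment."""
    # Drop leading and trailing separators, then discard the first segment.
    segments = str(location).strip("/").split("/")
    return "/".join(segments[1:])


print(strip_root_sketch("root/dir/file.txt"))  # dir/file.txt
```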

scanpipe.pipes.filename_now(sep='-')

Return the current date and time in ISO format, suitable for use in a filename.
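
One way to build such a value is sketched below, assuming the timestamp is truncated to seconds and that colons are replaced by the separator. The helper name and these formatting choices are assumptions, not the documented behavior.

```python
from datetime import datetime


def filename_now_sketch(sep="-"):
    """Return a filename-friendly timestamp such as 2024-01-31-12-05-59."""
    # isoformat() accepts a one-character separator between date and time.
    now = datetime.now().isoformat(sep=sep, timespec="seconds")
    # Colons are not allowed in filenames on some systems.
    return now.replace(":", sep)


print(filename_now_sketch())
```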

scanpipe.pipes.count_group_by(queryset, field_name)

Return a summary of all existing values for the provided field_name on the queryset, including the count of each entry, as a dictionary.
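
The shape of the returned dictionary can be illustrated without a database. The sketch below uses a plain list of dicts as a stand-in for the queryset; the helper name is hypothetical and the real function operates on a Django queryset.

```python
from collections import Counter


def count_group_by_sketch(rows, field_name):
    """Return a {value: count} dictionary for field_name across rows."""
    return dict(Counter(row.get(field_name) for row in rows))


rows = [{"status": "scanned"}, {"status": "ignored"}, {"status": "scanned"}]
print(count_group_by_sketch(rows, "status"))  # {'scanned': 2, 'ignored': 1}
```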

scanpipe.pipes.get_bin_executable(filename)

Return the location of the filename executable binary.

scanpipe.pipes.get_text_str_diff_ratio(str_a, str_b)

Return a similarity ratio as a float between 0 and 1 by comparing the text content of the str_a and str_b.

Return None if either of the two strings is empty.
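
A ratio of this kind can be computed with the standard library. The sketch below assumes a line-by-line comparison with difflib.SequenceMatcher; the helper name and the choice of matcher are assumptions rather than the documented implementation.

```python
import difflib


def text_diff_ratio_sketch(str_a, str_b):
    """Return a similarity ratio between 0 and 1, or None for empty input."""
    if not (str_a and str_b):
        return None
    # Compare the two texts as sequences of lines.
    matcher = difflib.SequenceMatcher(a=str_a.splitlines(), b=str_b.splitlines())
    return matcher.ratio()


print(text_diff_ratio_sketch("a\nb\nc", "a\nb\nd"))
```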

scanpipe.pipes.get_resource_diff_ratio(resource_a, resource_b)

Return a similarity ratio as a float between 0 and 1 by comparing the text content of the CodebaseResource resource_a and resource_b.

Return None if either of the two resources is not readable as text.

scanpipe.pipes.poll_until_success(check, sleep=10, **kwargs)

Given a function check, which returns the status of a run, return True when the run instance has completed successfully.

Return False when the run instance has failed, stopped, or gone stale.

The arguments for check need to be provided as keyword arguments to this function.
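
The polling loop described above can be sketched as follows. The status strings are hypothetical placeholders, since the actual status values are not listed in this section, and the helper name is illustrative.

```python
import time

# Hypothetical status values used only for this sketch.
SUCCESS = "success"
FAILED_STATUSES = {"failure", "stopped", "stale"}


def poll_until_success_sketch(check, sleep=10, **kwargs):
    """Call check(**kwargs) until it reports a final status."""
    while True:
        status = check(**kwargs)
        if status == SUCCESS:
            return True
        if status in FAILED_STATUSES:
            return False
        # Any other status is treated as "still running".
        time.sleep(sleep)


statuses = iter(["running", "running", "success"])
print(poll_until_success_sketch(lambda run_id: next(statuses), sleep=0, run_id=1))
```

Passing the check arguments as keyword arguments keeps the polling helper independent of the signature of check.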

ClamAV

scanpipe.pipes.clamav.scan_for_virus(project)

Run a ClamAV scan to detect virus infection. Create one Project error message per found virus and store the detection data on the related codebase resource extra_data field.

Codebase

scanpipe.pipes.codebase.get_resource_fields(resource, fields)

Return a mapping of fields from fields and values from resource.

scanpipe.pipes.codebase.get_resource_tree(resource, fields, codebase=None, seen_resources={})

Return a tree as a dictionary structure starting from the provided resource.

The following classes are supported for the input resource object:
  • scanpipe.models.CodebaseResource

  • commoncode.resource.Resource

The data included for each child is controlled with the fields argument.

The codebase is only required in the context of a commoncode Resource input.

seen_resources is used when get_resource_tree() is used in the context of get_codebase_tree(). We keep track of child Resources we visit in seen_resources, so we don’t visit them again in get_codebase_tree().

scanpipe.pipes.codebase.get_codebase_tree(codebase, fields)

Return a tree as a dictionary structure starting from the root resources of the provided codebase.

The following classes are supported for the input codebase object:
  • scanpipe.pipes.codebase.ProjectCodebase

  • commoncode.resource.Codebase

  • commoncode.resource.VirtualCodebase

The data included for each child is controlled with the fields argument.

scanpipe.pipes.codebase.get_basic_virtual_codebase(resources_qs)

Return a VirtualCodebase created from CodebaseResources in resources_qs.

The only Resource fields that are populated are path, sha1, size, and is_file. This is intended for use with scanpipe.pipes.matchcode.fingerprint_codebase_directories.

class scanpipe.pipes.codebase.ProjectCodebase(project)

Represents the codebase of a project stored in the database. A Codebase is a tree of Resources.

__init__(project)

Compliance

scanpipe.pipes.compliance.flag_compliance_files(project)

Flag compliance files status for the provided project.

scanpipe.pipes.compliance.analyze_compliance_licenses(project)

Scan compliance licenses status for the provided project.

CycloneDX

scanpipe.pipes.cyclonedx.resolve_license(license)

Return license expression/id/name from license item.

scanpipe.pipes.cyclonedx.get_declared_licenses(licenses)

Return resolved license from list of LicenseChoice.

scanpipe.pipes.cyclonedx.get_checksums(component)

Return dict of all the checksums from a component.

scanpipe.pipes.cyclonedx.get_external_references(component)

Return dict of reference urls from list of component.external_references.

scanpipe.pipes.cyclonedx.get_properties_data(component)

Return the properties as dict, extracted from component.properties.

scanpipe.pipes.cyclonedx.validate_document(document)

Check the validity of this CycloneDX document.

The validator is loaded from the document specVersion property.

scanpipe.pipes.cyclonedx.is_cyclonedx_bom(input_location)

Return True if the file at input_location is a CycloneDX BOM.

scanpipe.pipes.cyclonedx.cyclonedx_component_to_package_data(cdx_component, dependencies=None)

Return package_data from CycloneDX component.

scanpipe.pipes.cyclonedx.get_components(bom)

Return components from CycloneDX BOM except for the metadata.component.

scanpipe.pipes.cyclonedx.delete_ignored_root_properties(cyclonedx_document_json)

Remove root properties from the CycloneDX document that are irrelevant when loading SBOM component data as packages.

This function aims to maximize compatibility by excluding unsupported SPEC definitions while utilizing the cyclonedx-python-lib library.

The data contained in these properties is unnecessary for loading components from the SBOM and can be safely disregarded.

https://github.com/CycloneDX/cyclonedx-python-lib/issues/578

scanpipe.pipes.cyclonedx.cleanup_components_properties(cyclonedx_document_json)

Remove entries for which no values are set, such as {"name": ""} or "licenses":[{}].

Also remove the properties that are not used in the context of loading packages from SBOM and that may be unsupported by the cyclonedx-python-lib library.

Classes like cyclonedx.model.contact.OrganizationalEntity raise a NoPropertiesProvidedException although this is not enforced in the spec.

See https://github.com/CycloneDX/cyclonedx-python-lib/issues/600

scanpipe.pipes.cyclonedx.get_bom_instance_from_file(input_location)

Return a Bom instance from the input_location CycloneDX document file.

scanpipe.pipes.cyclonedx.resolve_cyclonedx_packages(input_location)

Resolve the packages from the input_location CycloneDX document file.

Deploy to develop

scanpipe.pipes.d2d.get_inputs(project)

Locate the from and to input files in project inputs/ directory. The input source can be flagged using a “from-” / “to-” prefix in the filename or by adding a “#from” / “#to” fragment at the end of the download URL.
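
The two flagging conventions can be illustrated with a small helper. This is a sketch under assumptions: the function name, its arguments and the None fallback are hypothetical, and the real function works on project input files rather than plain strings.

```python
from urllib.parse import urlparse


def guess_input_side(filename="", download_url=""):
    """Return "from" or "to" based on a filename prefix or a URL fragment."""
    for side in ("from", "to"):
        if filename.startswith(f"{side}-"):
            return side
    # The fragment is the part of the URL after "#".
    fragment = urlparse(download_url).fragment
    if fragment in ("from", "to"):
        return fragment
    return None


print(guess_input_side(filename="from-source.zip"))
print(guess_input_side(filename="app.jar", download_url="https://example.com/app.jar#to"))
```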

scanpipe.pipes.d2d.get_extracted_path(resource)

Return the -extract/ extracted path of provided resource.

scanpipe.pipes.d2d.get_extracted_subpath(path)

Return the path segments located after the last -extract/ segment.
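
A minimal sketch of this behavior is shown below. The helper name is hypothetical, and returning the path unchanged when no -extract/ segment is present is an assumption.

```python
def get_extracted_subpath_sketch(path):
    """Return what follows the last "-extract/" segment of path."""
    return path.split("-extract/")[-1]


print(get_extracted_subpath_sketch("to/app.war-extract/lib.jar-extract/org/App.class"))
```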

scanpipe.pipes.d2d.get_best_path_matches(to_resource, matches)

Return the best matches for the provided to_resource.

scanpipe.pipes.d2d.get_from_files_for_scanning(resources)

Return resources on the “from/” side that have been mapped to the “to/” side, but are not mapped using ABOUT files.

scanpipe.pipes.d2d.map_checksum(project, checksum_field, logger=None)

Map using checksum.

scanpipe.pipes.d2d.map_java_to_class(project, logger=None)

Map to/ compiled Java .class(es) to from/ .java source using Java fully qualified paths and indexing from/ .java files.

scanpipe.pipes.d2d.get_indexable_qualified_java_paths_from_values(resource_values)

Yield tuples of (resource id, fully-qualified Java path) for indexable classes from a list of resource_data tuples of “from/” side of the project codebase.

These resource_data input tuples are in the form:

(resource.id, resource.name, resource.extra_data)

And the output tuples look like this example:

(123, "org/apache/commons/LoggerImpl.java")

scanpipe.pipes.d2d.get_indexable_qualified_java_paths(from_resources_dot_java)

Yield tuples of (resource id, fully-qualified Java class name) for indexable classes from the “from/” side of the project codebase using the “java_package” Resource.extra_data.

scanpipe.pipes.d2d.find_java_packages(project, logger=None)

Collect the Java packages of Java source files for a project.

Multiprocessing is enabled by default on this pipe; the number of processes can be controlled through the SCANCODEIO_PROCESSES setting.

Note: we use the same API as the ScanCode scans by design.

scanpipe.pipes.d2d.scan_for_java_package(location, with_threading=True)

Run a Java package scan on provided location.

Return a dict of scan results and a list of errors.

scanpipe.pipes.d2d.save_java_package_scan_results(codebase_resource, scan_results, scan_errors)

Save the resource Java package scan results in the database as Resource.extra_data. Create project errors if any occurred during the scan.

scanpipe.pipes.d2d.map_jar_to_source(project, logger=None)

Map .jar files to their related source directory.

scanpipe.pipes.d2d.map_path(project, logger=None)

Map using path suffix similarities.

scanpipe.pipes.d2d.get_project_resources_qs(project, resources)

Return a queryset of CodebaseResources from project containing the CodebaseResources from resources. If a CodebaseResource in resources is an archive or directory, then its descendants are also included in the queryset.

Return None if resources is empty or None.

scanpipe.pipes.d2d.create_package_from_purldb_data(project, resources, package_data, status)

Create a DiscoveredPackage instance from PurlDB package_data.

Return a tuple, containing the created DiscoveredPackage and the number of CodebaseResources matched to PurlDB that are part of that DiscoveredPackage.

scanpipe.pipes.d2d.match_purldb_package(project, resources_by_sha1, enhance_package_data=True, **kwargs)

Given a mapping of lists of CodebaseResources by their sha1 values, resources_by_sha1, send those sha1 values to purldb packages API endpoint, process the matched Package data, then return the number of CodebaseResources that were matched to a Package.

scanpipe.pipes.d2d.match_purldb_resource(project, resources_by_sha1, package_data_by_purldb_urls=None, **kwargs)

Given a mapping of lists of CodebaseResources by their sha1 values, resources_by_sha1, send those sha1 values to purldb resources API endpoint, process the matched Package data, then return the number of CodebaseResources that were matched to a Package.

package_data_by_purldb_urls is a mapping of package data by their purldb package instance URLs. This is intended to be used as a cache, to avoid retrieving package data we retrieved before.

scanpipe.pipes.d2d.match_purldb_directory(project, resource)

Match a single directory resource in the PurlDB.

scanpipe.pipes.d2d.match_sha1s_to_purldb(project, resources_by_sha1, matcher_func, package_data_by_purldb_urls)

Process resources_by_sha1 with matcher_func and return a 3-tuple containing an empty defaultdict(list), the number of matches and the number of sha1s sent to purldb.

scanpipe.pipes.d2d.match_purldb_resources(project, extensions, matcher_func, chunk_size=1000, logger=None)

Match against PurlDB selecting codebase resources using provided package_extensions for archive type files, and resource_extensions.

Match requests are sent off in batches of 1000 SHA1s. This number is set using chunk_size.

scanpipe.pipes.d2d.match_purldb_directories(project, logger=None)

Match against PurlDB selecting codebase directories.

scanpipe.pipes.d2d.map_javascript(project, logger=None)

Map packed or minified JavaScript, TypeScript, CSS and SCSS files to their source.

class scanpipe.pipes.d2d.AboutFileIndexes(regex_by_about_path: dict, ignore_regex_by_about_path: dict, about_resources_by_path: dict, about_pkgdata_by_path: dict, mapped_resources_by_aboutpath: dict)

About file indexes are used to create packages from About files and map the resources described in them to the respective packages created, using regex path patterns and other About file data.

classmethod create_indexes(project, from_about_files, logger=None)

Return an ABOUT file index, containing path pattern mappings, package data, and resources, created from from_about_files, the About file resources.

get_matched_about_path(to_resource)

Map to_resource using the About file index. Return the path string of the About file it was mapped to, or None if it was not mapped or was ignored.

map_deployed_to_devel_using_about(to_resources)

Return mapped resources which are mapped using the path patterns in About file indexes. Resources are mapped for each About file in the index, and their status is updated accordingly.

get_about_file_companions(about_path)

Given an about_path path string to an About file, get CodebaseResource objects for the companion license and notice files.

create_about_packages_relations(project)

Create packages from the About file package data when the About file has mapped resources in the to/ codebase, and create the mappings between each created package and its mapped resources.

__init__(regex_by_about_path: dict, ignore_regex_by_about_path: dict, about_resources_by_path: dict, about_pkgdata_by_path: dict, mapped_resources_by_aboutpath: dict) → None

scanpipe.pipes.d2d.map_about_files(project, logger=None)

Map from/ .ABOUT files to their related to/ resources.

scanpipe.pipes.d2d.map_javascript_post_purldb_match(project, logger=None)

Map minified JavaScript files based on existing PurlDB matches.

scanpipe.pipes.d2d.map_javascript_path(project, logger=None)

Map JavaScript files based on path.

scanpipe.pipes.d2d.map_javascript_colocation(project, logger=None)

Map JavaScript files based on neighborhood file mapping.

scanpipe.pipes.d2d.flag_processed_archives(project)

Flag package archives as processed if they meet the following criteria:

  1. They have no assigned status.

  2. They are identified as package archives.

  3. All resources inside the corresponding archive ‘-extract’ directory have an assigned status.

This function iterates through the package archives in the project and checks whether all resources within their associated ‘-extract’ directory have statuses. If so, it updates the status of the package archive to “archive-processed”.

scanpipe.pipes.d2d.map_thirdparty_npm_packages(project, logger=None)

Map third-party packages using package.json metadata.

Return from-side resource files that have one or more relations with to-side resources that are not part of a package. Only resources with a detected_license_expression value are returned.

scanpipe.pipes.d2d.create_local_files_packages(project)

Create local-files packages for codebase resources not part of a package.

Resources are grouped by license_expression within a local-files package.

scanpipe.pipes.d2d.match_resources_with_no_java_source(project, logger=None)

Match resources with the no-java-source status to PurlDB. If no match is found, update the status to requires-review.

scanpipe.pipes.d2d.match_unmapped_resources(project, matched_extensions=None, logger=None)

Match resources with an empty status to PurlDB. If unmatched, update the status to requires-review.

scanpipe.pipes.d2d.flag_undeployed_resources(project)

Update status for undeployed files.

scanpipe.pipes.d2d.scan_unmapped_to_files(project, logger=None)

Scan unmapped/matched to/ files for copyrights, licenses, emails, and urls and update the status to requires-review.

scanpipe.pipes.d2d.flag_deployed_from_resources_with_missing_license(project, doc_extensions=None)

Update the status for deployed from files with missing license.

Scan the legal files with empty status and update status to REVIEW_DANGLING_LEGAL_FILE.

Save the legal resource scan results with REVIEW_DANGLING_LEGAL_FILE status in the database. Create project errors if any occurred during the scan.

scanpipe.pipes.d2d.flag_whitespace_files(project)

Flag whitespace files with a size less than or equal to 100 bytes as ignored.

scanpipe.pipes.d2d.match_purldb_resources_post_process(project, logger=None)

Choose the best package for PurlDB matched resources.

scanpipe.pipes.d2d.map_paths_resource(to_resource, from_resources, from_resources_index, map_types, logger=None)

Map paths found in the to_resource extra_data to paths of the from_resources CodebaseResource queryset using the precomputed from_resources_index path index.

scanpipe.pipes.d2d.process_paths_in_binary(to_resource, from_resources, from_resources_index, map_type, paths_in_binary)

Process the list of paths in binary and yield either:
  • a tuple of (unique key for a relationship, CodebaseRelation object)

  • a path, if it was not mapped

scanpipe.pipes.d2d.count_path_segments(path)

Return the number of path segments in a POSIX path string.

scanpipe.pipes.d2d.sort_matched_from_resources(matched_from_resources)

Return the sorted list of matched_from_resources based on path length and path.

scanpipe.pipes.d2d.is_invalid_match(match, matched_path_length)

Check if the match is invalid based on the matched_path_length and the number of resource IDs.

scanpipe.pipes.d2d.map_elfs(project, logger=None)

Map ELF binaries to their sources in project.

scanpipe.pipes.d2d.get_elf_file_dwarf_paths(location)

Retrieve dwarf paths for ELF files.

scanpipe.pipes.d2d.get_go_file_paths(location)

Retrieve Go file paths.

scanpipe.pipes.d2d.map_go_paths(project, logger=None)

Map Go binaries to their source in project.

Docker

scanpipe.pipes.docker.get_tarballs_from_inputs(project)

Return the tarballs from the project input/ work directory. Supported file extensions: .tar, .tar.gz, .tgz.

scanpipe.pipes.docker.extract_images_from_inputs(project)

Collect all the tarballs from the project input/ work directory, extract each tarball to the tmp/ work directory, and collect the images.

Return the images and an errors list of error messages that may have happened during the extraction.

scanpipe.pipes.docker.extract_image_from_tarball(input_tarball, extract_target, verify=True)

Extract images from an input_tarball to an extract_target directory Path object and collect the extracted images.

Return the images and an errors list of error messages that may have happened during the extraction.

scanpipe.pipes.docker.extract_layers_from_images(project, images)

Extract all layers from the provided images into the project codebase work directory.

Return an errors list of error messages that may occur during the extraction.

scanpipe.pipes.docker.extract_layers_from_images_to_base_path(base_path, images)

Extract all layers from the provided images into the base_path work directory.

Return an errors list of error messages that may occur during the extraction.

scanpipe.pipes.docker.get_image_data(image, layer_path_segments=2)

Return a mapping of image-related data given an image. Keep only layer_path_segments trailing layer location segments (or keep the locations unmodified if layer_path_segments is 0).

scanpipe.pipes.docker.get_layer_tag(image_id, layer_id, layer_index, id_length=6)

Return a “tag” crafted from the provided image_id, layer_id, and layer_index. The purpose of this tag is to be short, clear and sortable.

For instance, given an image with an id:

785df58b6b3e120f59bce6cd10169a0c58b8837b24f382e27593e2eea011a0d8

and two layers from bottom to top as:

0690c89adf3e8c306d4ced085fc16d1d104dcfddd6dc637e141fa78be242a707
7a1d89d2653e8e4aa9011fd95034a4857109d6636f2ad32df470a196e5dd1585

we would get these two tags:

img-785df5-layer-01-0690c8
img-785df5-layer-02-7a1d89
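
The tag format shown in this example can be reproduced with a short sketch. The helper name is hypothetical, and the zero-padded two-digit layer index is inferred from the sample tags above rather than from the implementation.

```python
def get_layer_tag_sketch(image_id, layer_id, layer_index, id_length=6):
    """Return a short, sortable tag built from truncated ids."""
    short_image_id = image_id[:id_length]
    short_layer_id = layer_id[:id_length]
    # The index is zero-padded so that tags sort in layer order.
    return f"img-{short_image_id}-layer-{layer_index:02}-{short_layer_id}"


image_id = "785df58b6b3e120f59bce6cd10169a0c58b8837b24f382e27593e2eea011a0d8"
layer_id = "0690c89adf3e8c306d4ced085fc16d1d104dcfddd6dc637e141fa78be242a707"
print(get_layer_tag_sketch(image_id, layer_id, 1))  # img-785df5-layer-01-0690c8
```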

scanpipe.pipes.docker.create_codebase_resources(project, image)

Create the CodebaseResource for an image in a project.

scanpipe.pipes.docker.create_system_package(project, purl, package, layer, layer_tag)

Create system package and related resources.

scanpipe.pipes.docker.scan_image_for_system_packages(project, image)

Given a project and an image, scan the image layer by layer for installed system packages and create a DiscoveredPackage for each.

Then for each installed DiscoveredPackage file, check if it exists as a CodebaseResource. If it exists, relate that CodebaseResource to its DiscoveredPackage; otherwise, keep that as a missing file.

scanpipe.pipes.docker.flag_whiteout_codebase_resources(project)

Tag overlayfs/AUFS whiteout special files CodebaseResource as “ignored-whiteout”. See https://github.com/opencontainers/image-spec/blob/master/layer.md#whiteouts for details.

class scanpipe.pipes.docker.Layer(layer_tag, created_by, layer_id, image_id, created, size, author, comment, archive_location)
archive_location

Alias for field number 8

author

Alias for field number 6

comment

Alias for field number 7

created

Alias for field number 4

created_by

Alias for field number 1

image_id

Alias for field number 3

layer_id

Alias for field number 2

layer_tag

Alias for field number 0

size

Alias for field number 5

scanpipe.pipes.docker.get_layers_data(project)

Get list of structured layers data from project extra_data field.

ELF

scanpipe.pipes.elf.collect_dwarf_source_path_references(resource)

Collect and store the DWARF debug paths of the provided ELF resource.

Fetch

scanpipe.pipes.fetch.run_command_safely(command_args)

Execute the external commands following security best practices.

This function is using the subprocess.run function which simplifies running external commands. It provides a safer and more straightforward API compared to older methods like subprocess.Popen.

WARNING: Please note that the --option=value syntax is required for args entries, and not the --option value format.

  • This does not use the Shell (shell=False) to prevent injection vulnerabilities.

  • The command should be provided as a list of command_args arguments.

  • Only full paths to executable commands should be provided to avoid any ambiguity.

WARNING: If you’re incorporating user input into the command, make sure to sanitize and validate the input to prevent any malicious commands from being executed.

Raise a SubprocessError if the exit code was non-zero.
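
The points above can be sketched with subprocess.run as follows. This is an illustration, not the scanpipe code: the helper name is hypothetical, and returning the captured standard output is an assumption since the return value is not documented here.

```python
import subprocess
import sys


def run_command_safely_sketch(command_args):
    """Run command_args without a shell and return its standard output."""
    completed = subprocess.run(
        command_args,  # a list of arguments, never a single shell string
        shell=False,  # no shell, so no shell injection through arguments
        capture_output=True,
        text=True,
    )
    if completed.returncode:
        raise subprocess.SubprocessError(completed.stderr.strip())
    return completed.stdout


# sys.executable is the full path to the current Python interpreter.
print(run_command_safely_sketch([sys.executable, "-c", "print('ok')"]))
```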

scanpipe.pipes.fetch.get_request_session(uri)

Return a Requests session setup with authentication and headers.

scanpipe.pipes.fetch.fetch_http(uri, to=None)

Download a given uri in a temporary directory and return the directory’s path.

exception scanpipe.pipes.fetch.FetchDockerImageError

scanpipe.pipes.fetch.get_docker_image_platform(docker_url)

Return a platform mapping of a docker reference. If there is more than one, return the first one by default.

scanpipe.pipes.fetch.fetch_docker_image(docker_url, to=None)

Fetch a docker image from the provided Docker image docker_url, using the “docker://reference” URL syntax. Return a Download object.

Docker references are documented here: https://github.com/containers/skopeo/blob/0faf16017/docs/skopeo.1.md#image-names

scanpipe.pipes.fetch.fetch_git_repo(url, to=None)

Fetch provided git url as a clone and return a Download object.

scanpipe.pipes.fetch.get_fetcher(url)

Return the fetcher function based on the provided url scheme.

scanpipe.pipes.fetch.fetch_url(url)

Fetch the provided url and return the result as a Download object.

scanpipe.pipes.fetch.fetch_urls(urls)

Fetch the provided urls list. The urls can also be provided as a string containing one URL per line. Return the fetched URLs as Download objects and a list of errors.

scanpipe.pipes.fetch.check_urls_availability(urls)

Check the accessibility of a list of URLs.

Flag

scanpipe.pipes.flag.flag_empty_files(project)

Flag empty files as ignored.

scanpipe.pipes.flag.flag_ignored_directories(project)

Flag directories as ignored.

scanpipe.pipes.flag.flag_ignored_patterns(project, patterns)

Flag codebase resources as ignored based on the provided list of patterns.

scanpipe.pipes.flag.analyze_scanned_files(project)

Set the status for CodebaseResource to unknown or no license.

scanpipe.pipes.flag.flag_not_analyzed_codebase_resources(project)

Flag codebase resource as not-analyzed.

scanpipe.pipes.flag.flag_mapped_resources(project)

Flag all codebase resources that were mapped during the d2d pipeline.

Input

scanpipe.pipes.input.copy_input(input_location, dest_path)

Copy the input_location (file or directory) to the dest_path.

scanpipe.pipes.input.copy_inputs(input_locations, dest_path)

Copy the provided input_locations to the dest_path.

scanpipe.pipes.input.move_input(input_location, dest_path)

Move the provided input_location to the dest_path.

scanpipe.pipes.input.move_inputs(inputs, dest_path)

Move the provided inputs to the dest_path.

scanpipe.pipes.input.get_tool_name_from_scan_headers(scan_data)

Return the tool_name of the first header in the provided scan_data.

scanpipe.pipes.input.is_archive(location)

Return True if the file at location is an archive.

scanpipe.pipes.input.load_inventory_from_toolkit_scan(project, input_location)

Create packages, dependencies, and resources loaded from the ScanCode-toolkit scan results located at input_location.

scanpipe.pipes.input.load_inventory_from_scanpipe(project, scan_data)

Create packages, dependencies, resources, and relations loaded from a ScanCode.io JSON output provided as scan_data.

scanpipe.pipes.input.get_worksheet_data(worksheet)

Return the data from provided worksheet as a list of dict.

scanpipe.pipes.input.clean_xlsx_field_value(model_class, field_name, value)

Clean the value for compatibility with the database model_class.

scanpipe.pipes.input.clean_xlsx_data_to_model_data(model_class, xlsx_data)

Clean the xlsx_data for compatibility with the database model_class.

scanpipe.pipes.input.load_inventory_from_xlsx(project, input_location)

Create packages, dependencies, resources, and relations loaded from XLSX file located at input_location.

JS

scanpipe.pipes.js.is_source_mapping_in_minified(resource, map_file_name)

Return True if a string contains a source mapping in its last 5 lines.
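
The check can be illustrated on plain text. The sketch below assumes the conventional sourceMappingURL comment emitted by minifiers; the helper name and its text argument are hypothetical, whereas the documented function takes a resource.

```python
def has_source_mapping_sketch(text, map_file_name, tail_lines=5):
    """Return True if one of the last tail_lines lines references map_file_name."""
    source_mapping = f"sourceMappingURL={map_file_name}"
    last_lines = text.splitlines()[-tail_lines:]
    return any(source_mapping in line for line in last_lines)


minified = "var a=1;\n" * 10 + "//# sourceMappingURL=app.js.map\n"
print(has_source_mapping_sketch(minified, "app.js.map"))  # True
```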

scanpipe.pipes.js.sha1(content)

Calculate the SHA-1 hash of a string.
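
With the standard library, this can be sketched as below. The UTF-8 encoding and the hexadecimal digest output are assumptions, and the helper name is hypothetical.

```python
import hashlib


def sha1_sketch(content):
    """Return the hexadecimal SHA-1 digest of a text string."""
    # hashlib works on bytes, so the string is encoded first.
    return hashlib.sha1(content.encode("utf-8")).hexdigest()


print(sha1_sketch("abc"))  # a9993e364706816aba3e25717850c26c9cd0d89d
```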

scanpipe.pipes.js.source_content_sha1_list(map_file)

Return list containing sha1 of sourcesContent.

scanpipe.pipes.js.load_json_from_file(location)

Return the deserialized json content from location.

scanpipe.pipes.js.get_map_sources(map_file)

Return source paths from a map file.

scanpipe.pipes.js.get_map_sources_content(map_file)

Return sources contents from a map file.
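
For context, a source map is a JSON document whose "sources" and "sourcesContent" entries hold the original paths and contents. The sketch below reads both from an already parsed mapping; the helper names and the sample document are made up for illustration, and the documented functions take a map file instead.

```python
import json

# A tiny, made-up source map used only for this illustration.
SOURCE_MAP_TEXT = """
{
  "version": 3,
  "file": "app.min.js",
  "sources": ["src/a.js", "src/b.js"],
  "sourcesContent": ["const a = 1;", "const b = 2;"],
  "mappings": ""
}
"""


def get_sources_sketch(map_data):
    """Return the list of original source paths."""
    return map_data.get("sources", [])


def get_sources_content_sketch(map_data):
    """Return the list of original source contents."""
    return map_data.get("sourcesContent", [])


map_data = json.loads(SOURCE_MAP_TEXT)
print(get_sources_sketch(map_data))  # ['src/a.js', 'src/b.js']
```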

scanpipe.pipes.js.get_minified_resource(map_resource, minified_resources)

Return the corresponding minified_resource given a map_resource Resource object and a minified_resources query set of minified JS Resource. Return None if it cannot be found.

scanpipe.pipes.js.get_js_map_basename_and_extension(filename)

Return a 2-tuple of (basename, extension) for a JavaScript/TypeScript related file. Return None otherwise.

JVM

Support for JVM-specific file formats such as .class and .java files.

scanpipe.pipes.jvm.get_java_package(location, java_extensions=('.java',), **kwargs)

Return a Java package as a mapping with a single “java_package” key, or None from the .java source code file at location.

Only look at files with an extension in the java_extensions tuple.

Note: this is the same API as a ScanCode Toolkit API scanner function by design.

scanpipe.pipes.jvm.find_java_package(lines)

Return a mapping of {'java_package': <value>} or None from an iterable of text lines.

For example:

>>> lines = ["   package    foo.back ;  # dsasdasdasdasdasda.asdasdasd"]
>>> assert find_java_package(lines) == {"java_package": "foo.back"}
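
A regular-expression based sketch that satisfies the example above is shown here. The pattern and the helper name are assumptions, not the expression used by scanpipe.

```python
import re

# Hypothetical pattern: "package", a dotted name, optional spaces, then ";".
JAVA_PACKAGE_RE = re.compile(r"^\s*package\s+([\w.]+)\s*;")


def find_java_package_sketch(lines):
    """Return {"java_package": name} for the first package statement found."""
    for line in lines:
        match = JAVA_PACKAGE_RE.match(line)
        if match:
            return {"java_package": match.group(1)}
    return None


print(find_java_package_sketch(["   package    foo.back ;  # trailing text"]))
```
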
scanpipe.pipes.jvm.get_normalized_java_path(path)

Return a normalized .java file path for the provided .class file path string. Account for inner classes in that their .java file name is the name of their outer class.

For example:

>>> get_normalized_java_path("foo/org/common/Bar$inner.class")
'foo/org/common/Bar.java'
>>> get_normalized_java_path("foo/org/common/Bar.class")
'foo/org/common/Bar.java'
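
The inner-class handling can be sketched as follows, assuming "$" separates the outer class name from the inner class name in .class file names. The helper name is hypothetical, and returning other paths unchanged is an assumption.

```python
def get_normalized_java_path_sketch(path):
    """Return the .java path matching a .class path, ignoring inner classes."""
    if not path.endswith(".class"):
        # Assumption: paths that are not .class files are returned unchanged.
        return path
    stem = path[: -len(".class")]
    directory, sep, class_name = stem.rpartition("/")
    # "Bar$inner" is compiled from Bar.java, so keep only the outer class name.
    outer_class = class_name.split("$")[0]
    return f"{directory}{sep}{outer_class}.java"


print(get_normalized_java_path_sketch("foo/org/common/Bar$inner.class"))
```
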
scanpipe.pipes.jvm.get_fully_qualified_java_path(java_package, filename)

Return a fully qualified java path of a .java filename in a java_package string. Note that we use “/” as path separators.

For example:

>>> get_fully_qualified_java_path("org.common" , "Bar.java")
'org/common/Bar.java'

MatchCode

exception scanpipe.pipes.matchcode.MatchCodeIOException

scanpipe.pipes.matchcode.is_configured()

Return True if the required MatchCode.io settings have been set.

scanpipe.pipes.matchcode.is_available()

Return True if the configured MatchCode.io server is available.

scanpipe.pipes.matchcode.request_get(url, payload=None, timeout=60)

Wrap the HTTP request calls on the API.

scanpipe.pipes.matchcode.save_directory_fingerprints(project, virtual_codebase, to_codebase_only=False)

Save directory fingerprints from directory Resources in virtual_codebase to the directory CodebaseResources from project that have the same path.

If to_codebase_only is True, then we are only saving the directory fingerprints for directories from the to/ codebase of a d2d project.

scanpipe.pipes.matchcode.fingerprint_codebase_directories(project, to_codebase_only=False)

Compute directory fingerprints for the directories from project.

These directory fingerprints are used for matching purposes on matchcode.

If to_codebase_only is True, only the directories from the to/ codebase are computed.

scanpipe.pipes.matchcode.fingerprint_codebase_resource(location, with_threading=True, **kwargs)

Compute fingerprints for the resource at location using the scancode-toolkit direct API.

Return a dictionary of scan results and a list of errors.

scanpipe.pipes.matchcode.save_resource_fingerprints(resource, scan_results, scan_errors=None)

Save computed fingerprints from scan_results to resource.extra_data. Create project errors if any occurred during the scan.

scanpipe.pipes.matchcode.fingerprint_codebase_resources(project, resource_qs=None, progress_logger=None, to_codebase_only=False)

Compute fingerprints for the resources from project.

These resource fingerprints are used for matching purposes on matchcode.

Multiprocessing is enabled by default on this pipe; the number of processes can be controlled through the SCANCODEIO_PROCESSES setting.

If to_codebase_only is True, only the resources from the to/ codebase are computed.

scanpipe.pipes.matchcode.send_project_json_to_matchcode(project, timeout=60, api_url=None)

Given a project, create a JSON scan of the project CodebaseResources and send it to MatchCode.io for matching. Return a tuple of two strings: the URL of the match run and the URL of the match results.

scanpipe.pipes.matchcode.get_run_url_status(run_url, **kwargs)

Given a run_url, which is a URL to a ScanCode.io Project run, return its status, otherwise return None.

scanpipe.pipes.matchcode.poll_run_url_status(run_url, sleep=10)

Given a URL to a scancode.io run instance, run_url, return True when the run instance has completed successfully.

Raise a MatchCodeIOException when the run instance has failed, stopped, or gone stale.
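The polling behaviour described above can be pictured as a simple loop. The sketch below is an assumption-laden illustration rather than the scanpipe code: get_status is a hypothetical callable standing in for the HTTP status lookup, RunFailedSketch stands in for MatchCodeIOException, and the status strings are assumed values.

```python
import time


class RunFailedSketch(Exception):
    """Stand-in for MatchCodeIOException in this sketch."""


def poll_status_sketch(get_status, sleep=10):
    """Return True once get_status() reports success; raise on a failed run."""
    # Assumed status names; the real service may use different values.
    failure_statuses = ("failure", "stopped", "stale")
    while True:
        status = get_status()
        if status == "success":
            return True
        if status in failure_statuses:
            raise RunFailedSketch(f"Run ended with status: {status}")
        # Wait before asking for the status again.
        time.sleep(sleep)


statuses = iter(["running", "running", "success"])
print(poll_status_sketch(lambda: next(statuses), sleep=0))  # True
```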

scanpipe.pipes.matchcode.create_match_results_url(match_url)

Given the match_url for a project running the matchcode matching pipeline, return the match results URL from match_url.

scanpipe.pipes.matchcode.get_match_results(match_url)

Given the match_url for a project running the matchcode matching pipeline, return the match results.

scanpipe.pipes.matchcode.map_match_results(match_results)

Given match_results, which is a mapping of ScanCode.io codebase results, return a defaultdict(list) where the keys are the package_uid of matched packages and the value is a list containing the paths of Resources associated with the package_uid.
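The grouping described above might look like the following sketch. The layout of match_results is an assumption here: a "files" list whose entries carry a "path" and a "for_packages" list of package_uid values. The name map_match_results_sketch is hypothetical.

```python
from collections import defaultdict


def map_match_results_sketch(match_results):
    """Group resource paths by the package_uid they are associated with."""
    resource_paths_by_package_uid = defaultdict(list)
    # Assumed layout: each file entry lists the packages it belongs to.
    for resource in match_results.get("files", []):
        for package_uid in resource.get("for_packages", []):
            resource_paths_by_package_uid[package_uid].append(resource["path"])
    return resource_paths_by_package_uid


sample_results = {
    "files": [
        {"path": "lib/a.py", "for_packages": ["pkg:pypi/foo@1.0?uuid=1"]},
        {"path": "lib/b.py", "for_packages": ["pkg:pypi/foo@1.0?uuid=1"]},
        {"path": "README", "for_packages": []},
    ]
}
print(dict(map_match_results_sketch(sample_results)))
```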

scanpipe.pipes.matchcode.create_packages_from_match_results(project, match_results)

Given match_results, which is a mapping of ScanCode.io codebase results, use the Package data from it to create DiscoveredPackages for project and associate the proper Resources of project to the DiscoveredPackages.

Output

scanpipe.pipes.output.safe_filename(filename)

Convert the provided filename to a safe filename.

scanpipe.pipes.output.get_queryset(project, model_name)

Return a consistent QuerySet for all supported outputs (json, xlsx, csv, …)

scanpipe.pipes.output.queryset_to_csv_file(queryset, fieldnames, output_file)

Output csv content generated from the provided queryset objects to the output_file. The fields to be included as columns and their order are controlled by the fieldnames list.

scanpipe.pipes.output.queryset_to_csv_stream(queryset, fieldnames, output_stream)

Output csv content generated from the provided queryset objects to the output_stream. The fields to be included as columns and their order are controlled by the fieldnames list.
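To illustrate how a fieldnames list controls both the columns and their order, here is a minimal sketch built on the standard csv module. It uses plain dicts in place of queryset objects, and the name rows_to_csv_stream_sketch is hypothetical.

```python
import csv
import io


def rows_to_csv_stream_sketch(rows, fieldnames, output_stream):
    """Write rows (dicts) as CSV, keeping only fieldnames, in that order."""
    # extrasaction="ignore" drops keys that are not listed in fieldnames.
    writer = csv.DictWriter(output_stream, fieldnames, extrasaction="ignore")
    writer.writeheader()
    for row in rows:
        writer.writerow(row)


rows = [
    {"path": "a.py", "size": 10, "md5": "x"},
    {"path": "b.py", "size": 20, "md5": "y"},
]
stream = io.StringIO()
rows_to_csv_stream_sketch(rows, ["size", "path"], stream)
print(stream.getvalue())
```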

scanpipe.pipes.output.to_csv(project)

Generate output for the provided project in csv format. Since the csv format does not support multiple tabs, one file is created per object type. The output files are created in the project output/ directory. Return a list of paths of the generated output files.

scanpipe.pipes.output.to_json(project)

Generate output for the provided project in JSON format. The output file is created in the project output/ directory. Return the path of the generated output file.

scanpipe.pipes.output.queryset_to_xlsx_worksheet(queryset, workbook, exclude_fields=())

Add a new worksheet to the workbook xlsxwriter.Workbook using the queryset. The queryset “model_name” is used as a name for the “worksheet”. Exclude fields listed in the exclude_fields sequence of field names.

Add an extra trailing “xlsx_errors” column with conversion error messages if any. Return the number of conversion errors.

scanpipe.pipes.output.to_xlsx(project)

Generate output for the provided project in XLSX format. The output file is created in the project “output/” directory. Return the path of the generated output file.

Note that each XLSX worksheet contains an extra “xlsx_errors” column with possible error messages for a row when the data converted to XLSX exceeds the limits of what can be stored in a cell.

scanpipe.pipes.output.to_spdx(project, include_files=False)

Generate output for the provided project in SPDX document format. The output file is created in the project “output/” directory. Return the path of the generated output file.

scanpipe.pipes.output.get_cyclonedx_bom(project)

Return a CycloneDX Bom object filled with provided project data. See https://cyclonedx.org/use-cases/#dependency-graph

scanpipe.pipes.output.sort_bom_with_schema_ordering(bom_as_dict, schema_version)

Sort the bom_as_dict using the ordering from the schema_version.

scanpipe.pipes.output.to_cyclonedx(project, version='1.6')

Generate output for the provided project in CycloneDX BOM format. The output file is created in the project “output/” directory. Return the path of the generated output file.

scanpipe.pipes.output.render_template(template_string, context)

Render a Django template_string using the context dict.

scanpipe.pipes.output.render_template_file(template_location, context)

Render a Django template at template_location using the context dict.

scanpipe.pipes.output.get_attribution_template(project)

Return a custom attribution template if provided or the default one.

scanpipe.pipes.output.make_unknown_license_object(license_symbol)

Return a License object suitable for the provided license_symbol, that is representing a license key unknown by the current toolkit licensed index.

scanpipe.pipes.output.get_package_expression_symbols(parsed_expression)

Return the list of license_symbols contained in the parsed_expression. Since unknown license keys are missing a License set in the wrapped attribute, a special “unknown” License object is injected.

scanpipe.pipes.output.get_package_data_for_attribution(package, licensing)

Convert the package instance into a dictionary of values usable during attribution generation.

scanpipe.pipes.output.get_unique_licenses(packages)

Return a list of unique License symbol objects preserving ordering. Return an empty list if the packages do not have licenses.

Replace with the following one-liner once this toolkit issue is fixed: https://github.com/aboutcode-org/scancode-toolkit/issues/3425

licenses = set(license for package in packages for license in package["licenses"])
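An order-preserving de-duplication of this kind can be sketched as follows. Plain strings stand in for License symbol objects, and the name unique_licenses_sketch is hypothetical; this is an illustration, not the scanpipe code.

```python
def unique_licenses_sketch(packages):
    """Return the unique licenses across packages, in first-seen order."""
    seen = set()
    unique = []
    for package in packages:
        for license_key in package.get("licenses", []):
            # Keep only the first occurrence so the original order is preserved.
            if license_key not in seen:
                seen.add(license_key)
                unique.append(license_key)
    return unique


packages = [
    {"licenses": ["mit", "apache-2.0"]},
    {"licenses": ["apache-2.0", "bsd-new"]},
    {"licenses": []},
]
print(unique_licenses_sketch(packages))  # ['mit', 'apache-2.0', 'bsd-new']
```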

scanpipe.pipes.output.to_attribution(project)

Generate attribution for the provided project. The output file is created in the project “output/” directory. Return the path of the generated output file.

A custom template can be provided at the codebase/.scancode/templates/attribution.html location.

The model instances are converted into data dicts to prevent any data leak, as the attribution template is customizable.

PathMap

class scanpipe.pipes.pathmap.Match(matched_path_length, resource_ids)
matched_path_length: int

Alias for field number 0

resource_ids: list

Alias for field number 1

scanpipe.pipes.pathmap.find_paths(path, index)

Return a Match for the longest paths matched in the index automaton for a POSIX path string. Return None if no matching path is found.

scanpipe.pipes.pathmap.build_index(resource_id_and_paths, with_subpaths=True)

Return an index built from a resource_id_and_paths iterable of tuples of (resource_id int, resource_path string).

If with_subpaths is True, index all suffixes of the paths; otherwise index and match only each complete path.

For example, for the path “samples/JGroups/src/RouterStub.java”, the suffixes are:

samples/JGroups/src/RouterStub.java
JGroups/src/RouterStub.java
src/RouterStub.java
RouterStub.java
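The suffixes listed above can be generated by dropping leading path segments one at a time. A minimal sketch, using the hypothetical name path_suffixes_sketch:

```python
def path_suffixes_sketch(path):
    """Return every suffix of a POSIX path, longest first."""
    segments = path.strip("/").split("/")
    # Drop one more leading segment at each step.
    return ["/".join(segments[start:]) for start in range(len(segments))]


for suffix in path_suffixes_sketch("samples/JGroups/src/RouterStub.java"):
    print(suffix)
```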

scanpipe.pipes.pathmap.add_path(resource_id, segments, segments_count, index)

Add the resource_id path represented by its list of reversed path segments with segments_count segments to the index automaton.

scanpipe.pipes.pathmap.add_subpaths(resource_id, segments, segments_count, index)

Add all the resource_id subpaths “suffixes” of the resource path as represented by its list of reversed path segments with segments_count segments to the index automaton.

scanpipe.pipes.pathmap.get_reversed_path_segments(path)

Return reversed segments list given a POSIX path string. We reverse based on path segments separated by a “/”.

Note that the input path is assumed to be normalized, not relative, and not containing any double slash.

For example:

>>> assert get_reversed_path_segments("a/b/c.js") == ["c.js", "b", "a"]

scanpipe.pipes.pathmap.convert_segments_to_path(segments)

Return a path string that is suitable for indexing or matching given a segments sequence of path segment strings. The resulting reversed path is prefixed and suffixed by a “/” irrespective of whether the original path is a file or directory and had such prefix or suffix.

For example:

>>> assert convert_segments_to_path(["c.js", "b", "a"]) == "/c.js/b/a/"
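Taken together, reversing the segments and joining them back produces the key form used for indexing and matching. The two helpers below are a hedged sketch of that round trip; the names reversed_segments_sketch and segments_to_path_sketch are hypothetical and this is not the library code.

```python
def reversed_segments_sketch(path):
    """Split a POSIX path on "/" and return its segments in reverse order."""
    return list(reversed(path.strip("/").split("/")))


def segments_to_path_sketch(segments):
    """Join segments into a path that always starts and ends with "/"."""
    return "/" + "/".join(segments) + "/"


segments = reversed_segments_sketch("a/b/c.js")
print(segments)  # ['c.js', 'b', 'a']
print(segments_to_path_sketch(segments))  # /c.js/b/a/
```

Wrapping every key in "/" means a match can only start and end on whole path segments, so "c.js" does not match inside "abc.js".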

PurlDB

exception scanpipe.pipes.purldb.PurlDBException
scanpipe.pipes.purldb.is_configured()

Return True if the required PurlDB settings have been set.

scanpipe.pipes.purldb.is_available()

Return True if the configured PurlDB server is available.

scanpipe.pipes.purldb.check_service_availability(*args)

Check if the PurlDB service is configured and available.

scanpipe.pipes.purldb.request_get(url, payload=None, timeout=60, raise_on_error=False)

Wrap the HTTP request calls on the API.

scanpipe.pipes.purldb.collect_response_results(response, data, timeout=60)

Return all results from a purldb API response.

scanpipe.pipes.purldb.match_packages(sha1_list, enhance_package_data=False, timeout=60, api_url=None)

Match a list of SHA1 in the PurlDB for package-type files.

If enhance_package_data is True, then purldb will enhance Package data for matched Packages, if possible.

scanpipe.pipes.purldb.match_resources(sha1_list, timeout=60, api_url=None)

Match a list of SHA1 in the PurlDB for resource files.

scanpipe.pipes.purldb.match_directory(fingerprint, timeout=60, api_url=None)

Match directory content fingerprint in the PurlDB for a single directory resource.

scanpipe.pipes.purldb.submit_purls(packages, timeout=60, api_url=None)

Submit a list of dicts to PurlDB for indexing, where each dict has either a resolved PURL (a PURL with a version) or a version-less PURL along with a vers range.

scanpipe.pipes.purldb.feed_purldb(packages, chunk_size, logger=<bound method Logger.info of <Logger scanpipe.pipes.purldb (INFO)>>)

Feed PurlDB with list of PURLs for indexing.

scanpipe.pipes.purldb.get_unique_resolved_purls(project)

Return PURLs from project’s resolved DiscoveredDependencies.

scanpipe.pipes.purldb.get_unique_unresolved_purls(project)

Return PURLs from project’s unresolved DiscoveredDependencies.

scanpipe.pipes.purldb.populate_purldb_with_discovered_packages(project, logger=<bound method Logger.info of <Logger scanpipe.pipes.purldb (INFO)>>)

Add DiscoveredPackage to PurlDB.

scanpipe.pipes.purldb.populate_purldb_with_discovered_dependencies(project, logger=<bound method Logger.info of <Logger scanpipe.pipes.purldb (INFO)>>)

Add DiscoveredDependency to PurlDB.

scanpipe.pipes.purldb.find_packages(payload)

Get Packages using provided payload filters on the PurlDB package list.

scanpipe.pipes.purldb.get_packages_for_purl(package_url)

Get Package details entries providing a package_url.

scanpipe.pipes.purldb.get_next_download_url(timeout=60, api_url=None)

Return the ScannableURI UUID, download URL, and pipelines for the next Package to be scanned from PurlDB.

Return None if the request was not successful.

scanpipe.pipes.purldb.update_status(scannable_uri_uuid, status, scan_log='', timeout=60, api_url=None)

Update the status of a ScannableURI on a PurlDB scan queue.

scanpipe.pipes.purldb.create_project_name(download_url, scannable_uri_uuid)

Create a project name from download_url and scannable_uri_uuid.

scanpipe.pipes.purldb.check_project_run_statuses(project, logger=None)

If any of the runs of this Project has failed, stopped, or gone stale, update the status of the Scannable URI associated with this Project to failed and send back a log of the failed runs.

scanpipe.pipes.purldb.get_run_status(run, **kwargs)

Refresh the values of run and return its status.

scanpipe.pipes.purldb.enrich_package(package)

Enrich the provided package with the PurlDB data.

scanpipe.pipes.purldb.enrich_discovered_packages(project, logger=<bound method Logger.info of <Logger scanpipe.pipes.purldb (INFO)>>)

Enrich all project discovered packages with the PurlDB data.

Resolve

scanpipe.pipes.resolve.resolve_manifest_resources(resource, package_registry)

Get package data from resource.

scanpipe.pipes.resolve.get_packages(project, package_registry, manifest_resources, model=None)

Get package data from package manifests/lockfiles/SBOMs or get package data for resolved packages from package requirements.

scanpipe.pipes.resolve.create_packages_and_dependencies(project, packages, resolved=False)

Create DiscoveredPackage and DiscoveredDependency objects for packages detected in a package manifest, lockfile or SBOM.

If resolved, create packages out of resolved dependencies, otherwise create dependencies.

scanpipe.pipes.resolve.create_dependencies_from_packages_extra_data(project)

Create Dependency objects from the Package extra_data values. The Package instances need to be saved first in the database before creating the Dependency objects. The dependencies declared in the SBOM are stored on the Package.extra_data field and resolved as Dependency objects in this function.

scanpipe.pipes.resolve.get_packages_from_manifest(input_location, package_registry=None)

Resolve packages or get packages data from a package manifest file/ lockfile/SBOM at input_location.

scanpipe.pipes.resolve.get_manifest_resources(project)

Get all resources in the codebase which are package manifests.

scanpipe.pipes.resolve.resolve_pypi_packages(input_location)

Resolve the PyPI packages from the input_location requirements file.

scanpipe.pipes.resolve.resolve_about_package(input_location)

Resolve the package from the input_location .ABOUT file.

scanpipe.pipes.resolve.populate_license_notice_fields_about(package_data, about_data)

Populate package_data with license and notice attributes from about_data.

scanpipe.pipes.resolve.resolve_about_packages(input_location)

Wrap resolve_about_package to return a list as expected by the InspectManifest pipeline.

scanpipe.pipes.resolve.convert_spdx_expression(license_expression_spdx)

Return a ScanCode license expression from an SPDX license_expression_spdx string.

scanpipe.pipes.resolve.resolve_spdx_packages(input_location)

Resolve the packages from the input_location SPDX document file.

scanpipe.pipes.resolve.get_default_package_type(input_location)

Return the package type associated with the provided input_location. This type is used to get the related handler that knows how to process the input.

scanpipe.pipes.resolve.set_license_expression(package_data)

Set the license expression from a detected license dict/str in provided package_data.

scanpipe.pipes.resolve.get_manifest_headers(resource)

Extract headers from a manifest file based on its package type.

scanpipe.pipes.resolve.extract_headers(input_location, extract_fields)

Read a file from the given location and extract the specified fields.

RootFS

exception scanpipe.pipes.rootfs.DistroNotFound
exception scanpipe.pipes.rootfs.DistroNotSupported
class scanpipe.pipes.rootfs.RootFs(location, distro=None)

A root filesystem.

classmethod from_project_codebase(project)

Return RootFs objects collected from the project’s “codebase” directory. Each directory in the input/ is considered as the root of a root filesystem.

get_resources(with_dir=False)

Return a Resource for each file in this rootfs.

get_installed_packages(packages_getter)

Return tuples of (package_url, package) for installed packages found in this rootfs layer using the packages_getter function or callable.

The packages_getter() function should:

  • Accept a first argument string that is the root directory of the filesystem of this rootfs

  • Return tuples of (package_url, package), where package_url is a package_url string that uniquely identifies a package and package is an object that represents a package (typically a scancode-toolkit packagedcode.models.Package class or some nested mapping with the same structure).

The packages_getter function would typically query the system packages database, such as an RPM database or similar, to collect the list of installed system packages.
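A callable that satisfies the contract above could look like this sketch. It yields hard-coded data instead of querying a real package database, and the name packages_getter_sketch, the dict keys, and the sample packages are all hypothetical.

```python
def packages_getter_sketch(root_dir):
    """Yield (package_url, package) tuples for packages found under root_dir."""
    # A real getter would read the package database located under root_dir;
    # this sketch uses fixed sample data instead.
    installed = [
        {"type": "deb", "namespace": "debian", "name": "bash", "version": "5.1"},
        {"type": "deb", "namespace": "debian", "name": "curl", "version": "7.74"},
    ]
    for package in installed:
        package_url = (
            f"pkg:{package['type']}/{package['namespace']}/"
            f"{package['name']}@{package['version']}"
        )
        yield package_url, package


for package_url, package in packages_getter_sketch("/tmp/rootfs"):
    print(package_url, package["name"])
```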

__init__(location, distro=None) None

Method generated by attrs for class RootFs.

scanpipe.pipes.rootfs.get_resources(location, with_dir=False)

Return the Resources found in location, the root directory of a rootfs.

scanpipe.pipes.rootfs.create_codebase_resources(project, rootfs)

Create the CodebaseResource for a rootfs in project.

scanpipe.pipes.rootfs.has_hash_diff(install_file, codebase_resource)

Return True if one of the available hashes on both install_file and codebase_resource, by hash type, is different. For example, Alpine uses SHA1 while Debian uses MD5; we prefer the strongest hash that is present.
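One way to picture this comparison is to walk the hash types from strongest to weakest and compare only those present on both objects. The sketch below is an approximation under assumptions: the attribute names in HASH_TYPES and the name has_hash_diff_sketch are hypothetical, and SimpleNamespace objects stand in for the real install file and resource.

```python
from types import SimpleNamespace

# Assumed attribute names, ordered from strongest to weakest.
HASH_TYPES = ("sha512", "sha256", "sha1", "md5")


def has_hash_diff_sketch(install_file, codebase_resource):
    """Return True if a hash type available on both objects has different values."""
    for hash_type in HASH_TYPES:
        install_hash = getattr(install_file, hash_type, None)
        resource_hash = getattr(codebase_resource, hash_type, None)
        # Only compare hash types that are available on both sides.
        if install_hash and resource_hash and install_hash != resource_hash:
            return True
    return False


install_file = SimpleNamespace(sha1="aaa", md5=None)
resource = SimpleNamespace(sha1="bbb", md5="ccc")
print(has_hash_diff_sketch(install_file, resource))  # True
```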

scanpipe.pipes.rootfs.package_getter(root_dir, **kwargs)

Return installed package objects.

scanpipe.pipes.rootfs.scan_rootfs_for_system_packages(project, rootfs)

Given a project Project and a rootfs RootFs, scan the rootfs for installed system packages, and create a DiscoveredPackage for each.

Then for each installed DiscoveredPackage file, check if it exists as a CodebaseResource. If it exists, relate that CodebaseResource to its DiscoveredPackage; otherwise, keep it as a missing file.

scanpipe.pipes.rootfs.get_resource_with_md5(project, status)

Return a queryset of CodebaseResource from a project that has a status, a non-empty size, and md5.

scanpipe.pipes.rootfs.match_not_analyzed(project, reference_status='system-package', not_analyzed_status='not-analyzed')

Given a project Project:

  1. Build an MD5 index of files assigned to a package that has a status of reference_status.

  2. Attempt to match resources with status not_analyzed_status to that index.

  3. Relate each matched CodebaseResource to the matching DiscoveredPackage and set its status.
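The three steps above can be sketched with plain dicts in place of database models. Everything here is illustrative: the name match_by_md5_sketch, the dict keys, and the choice to set matched files to the reference status are assumptions, not the scanpipe implementation.

```python
def match_by_md5_sketch(reference_files, not_analyzed_files, status="system-package"):
    """Relate not-analyzed files to packages by MD5; return the matched paths."""
    # Step 1: index the reference files by MD5.
    md5_index = {
        entry["md5"]: entry["package"] for entry in reference_files if entry.get("md5")
    }
    matched_paths = []
    for entry in not_analyzed_files:
        # Step 2: look up each not-analyzed file in the index.
        package = md5_index.get(entry.get("md5"))
        if package:
            # Step 3: relate the file to the package and update its status.
            entry["package"] = package
            entry["status"] = status
            matched_paths.append(entry["path"])
    return matched_paths


reference_files = [{"path": "/usr/bin/curl", "md5": "m1", "package": "curl"}]
not_analyzed_files = [
    {"path": "/opt/copy/curl", "md5": "m1", "status": "not-analyzed"},
    {"path": "/opt/other", "md5": "m2", "status": "not-analyzed"},
]
print(match_by_md5_sketch(reference_files, not_analyzed_files))  # ['/opt/copy/curl']
```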

scanpipe.pipes.rootfs.flag_uninteresting_codebase_resources(project)

Flag any files that do not belong to any system package and determine, using a few heuristics, whether each one is:

  • A temp file

  • Generated

  • A log file of sorts (such as var)

scanpipe.pipes.rootfs.flag_ignorable_codebase_resources(project)

Flag codebase resource using the glob patterns from commoncode.ignore of ignorable files/directories, if their paths match an ignorable pattern.

scanpipe.pipes.rootfs.flag_data_files_with_no_clues(project)

Flag CodebaseResources that have a file type of data and no detected clues to be uninteresting.

scanpipe.pipes.rootfs.flag_media_files_as_uninteresting(project)

Flag CodebaseResources that are media files to be uninteresting.

scanpipe.pipes.rootfs.get_rootfs_data(root_fs)

Return a mapping of rootfs-related data given a root_fs.

ScanCode

scanpipe.pipes.scancode.logger = <Logger scanpipe.pipes (INFO)>

Utilities to deal with ScanCode toolkit features and objects.

exception scanpipe.pipes.scancode.InsufficientResourcesError
scanpipe.pipes.scancode.get_max_workers(keep_available)

Return the SCANCODEIO_PROCESSES value if it is defined in the settings, or return a default value based on the number of available CPUs, minus the provided keep_available value.

On operating systems where the multiprocessing start method is not “fork” but, for example, “spawn” (such as macOS), multiprocessing and threading are disabled by default by returning -1 as max_workers.
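The decision logic described above might be sketched like this. It is not the scanpipe code: configured_processes is a hypothetical parameter standing in for the SCANCODEIO_PROCESSES setting, and the floor of one worker is an assumption of this sketch.

```python
import multiprocessing
import os


def max_workers_sketch(keep_available, configured_processes=None):
    """Return a worker count following the rules described above."""
    # An explicitly configured value always wins.
    if configured_processes is not None:
        return configured_processes
    # Without "fork", multiprocessing and threading are disabled by default.
    if multiprocessing.get_start_method() != "fork":
        return -1
    cpu_count = os.cpu_count() or 1
    # Assumption of this sketch: never go below one worker.
    return max(cpu_count - keep_available, 1)


print(max_workers_sketch(keep_available=1, configured_processes=4))  # 4
print(max_workers_sketch(keep_available=1))
```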

scanpipe.pipes.scancode.extract_archive(location, target)

Extract a single archive or compressed file at location to the target directory.

Return a dict of extraction errors, keyed by the resource location.

Wrapper of the extractcode.api.extract_archive function.

scanpipe.pipes.scancode.extract_archives(location, recurse=False)

Extract all archives at location and return errors.

Archives and compressed files are extracted in a new directory named “<file_name>-extract” created in the same directory as each extracted archive.

If recurse is True, extract nested archives-in-archives recursively.

Return a dict of extraction errors, keyed by the resource location.

Wrapper of the extractcode.api.extract_archives function.

scanpipe.pipes.scancode.get_resource_info(location)

Return a mapping suitable for the creation of a new CodebaseResource.

scanpipe.pipes.scancode.scan_file(location, with_threading=True, min_license_score=0, **kwargs)

Run a license, copyright, email, and url scan on a provided location, using the scancode-toolkit direct API.

Return a dictionary of scan results and a list of errors.

scanpipe.pipes.scancode.scan_for_package_data(location, with_threading=True, package_only=False, **kwargs)

Run a package scan on provided location using the scancode-toolkit direct API.

Return a dict of scan results and a list of errors.

scanpipe.pipes.scancode.save_scan_file_results(codebase_resource, scan_results, scan_errors)

Save the resource scan file results in the database. Create project errors if any occurred during the scan.

scanpipe.pipes.scancode.save_scan_package_results(codebase_resource, scan_results, scan_errors)

Save the resource scan package results in the database. Create project errors if any occurred during the scan.

scanpipe.pipes.scancode.scan_resources(resource_qs, scan_func, save_func, scan_func_kwargs=None, progress_logger=None)

Run the scan_func on the codebase resources of the provided resource_qs. The save_func is called to save the results.

Multiprocessing is enabled by default on this pipe; the number of processes can be controlled through the SCANCODEIO_PROCESSES setting. Multiprocessing can be disabled using SCANCODEIO_PROCESSES=0, and threading can also be disabled using SCANCODEIO_PROCESSES=-1.

The codebase resources QuerySet is processed in chunks of 2000 results at a time, which can significantly reduce memory usage.
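The memory benefit of chunking comes from holding only one batch in memory at a time. A generic sketch of the idea, using a plain iterable rather than a Django QuerySet and the hypothetical name iter_chunks_sketch:

```python
from itertools import islice


def iter_chunks_sketch(iterable, chunk_size=2000):
    """Yield lists of at most chunk_size items from iterable."""
    iterator = iter(iterable)
    # islice pulls the next chunk_size items; an empty list ends the loop.
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


sizes = [len(chunk) for chunk in iter_chunks_sketch(range(5000))]
print(sizes)  # [2000, 2000, 1000]
```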

Note that all database related actions are executed in this main process as the database connection does not always fork nicely in the pool processes.

scanpipe.pipes.scancode.scan_for_files(project, resource_qs=None, progress_logger=None)

Run a license, copyright, email, and url scan on files without a status for a project.

Multiprocessing is enabled by default on this pipe; the number of processes can be controlled through the SCANCODEIO_PROCESSES setting.

scanpipe.pipes.scancode.scan_for_application_packages(project, assemble=True, package_only=False, resource_qs=None, progress_logger=None)

Run a package scan on resources without a status for a project, and add them in their respective package_data attribute. Then create DiscoveredPackage and DiscoveredDependency instances from the detected package data optionally. If the assemble argument is set to True, DiscoveredPackage and DiscoveredDependency instances are created and added to the project by assembling resource level package_data, and resources which belong in the DiscoveredPackage instance, are assigned to that package.

Multiprocessing is enabled by default on this pipe; the number of processes can be controlled through the SCANCODEIO_PROCESSES setting.

scanpipe.pipes.scancode.add_resource_to_package(package_uid, resource, project)

Relate a DiscoveredPackage to resource from project using package_uid.

Add a ProjectMessage when the DiscoveredPackage could not be fetched using the provided package_uid.

scanpipe.pipes.scancode.assemble_packages(project)

Create instances of DiscoveredPackage and DiscoveredDependency for project from the parsed package data present in the CodebaseResources of project, using the respective package handlers for each package manifest type.

scanpipe.pipes.scancode.process_package_data(project, static_resolve=False)

Create instances of DiscoveredPackage and DiscoveredDependency for project from the parsed package data present in the CodebaseResources of project.

Here, package assembly through package handlers is not performed; instead, package/dependency objects are created directly from package data.

scanpipe.pipes.scancode.create_packages_and_dependencies_from_mapping(project, resource, package_mapping, find_package=False, process_resolved=False)

Create or update packages and dependencies from a package_mapping, for a respective resource and project.

If find_package is True, find the package with the respective purl data, instead of trying to create it. If process_resolved is True, also create packages and dependency relations from the resolved packages of dependencies of this package_mapping.

scanpipe.pipes.scancode.resolve_dependencies(project)

Match and merge resolved dependencies to create a dependency graph of direct dependency relations between resolved packages.

scanpipe.pipes.scancode.update_packages_and_dependencies(project, dependencies, package, resource, datasource_id, process_resolved=True)

Create DiscoveredPackage and DiscoveredDependency objects from a package_data dependencies, and also from nested resolved packages and dependencies if present.

If process_resolved is True, also create packages and dependency relations from the resolved packages of dependencies.

scanpipe.pipes.scancode.match_and_resolve_dependencies(project)

From a project with both direct dependency relationships (contains only the parent package and the requirement) and indirect dependency relationships like in lockfiles (this contains the resolved package and the requirement), match and update dependencies to contain the full dependency graph.

scanpipe.pipes.scancode.get_packages_with_purl_from_resources(project)

Yield Dependency or PackageData objects created from detected package_data in all the project resources. Both Dependency and PackageData objects have the purl attribute with a valid purl.

scanpipe.pipes.scancode.get_pretty_params(args)

Format provided args for the pretty_params run_scan argument.

scanpipe.pipes.scancode.run_scan(location, output_file, run_scan_args)

Scan the location content and write the results into an output_file.

scanpipe.pipes.scancode.get_virtual_codebase(project, input_location)

Return a ScanCode virtual codebase built from the JSON scan file located at the input_location.

scanpipe.pipes.scancode.create_codebase_resources(project, scanned_codebase)

Save the resources of a ScanCode scanned_codebase scancode.resource.Codebase object to the database as a CodebaseResource of the project. This function can be used to extend an existing project Codebase with new CodebaseResource objects, as the existing objects (based on the path) will be skipped.

scanpipe.pipes.scancode.create_discovered_packages(project, scanned_codebase)

Save the packages of a ScanCode scanned_codebase scancode.resource.Codebase object to the database as a DiscoveredPackage of project.

scanpipe.pipes.scancode.create_discovered_dependencies(project, scanned_codebase, strip_datafile_path_root=False)

Save the dependencies of a ScanCode scanned_codebase scancode.resource.Codebase object to the database as a DiscoveredDependency of project.

If strip_datafile_path_root is True, then DiscoveredDependency.create_from_data() will strip the root path segment from the datafile_path of dependency_data before looking up the corresponding CodebaseResource for datafile_path. This is used in the case where Dependency data is imported from a scancode-toolkit scan, where the root path segments are not stripped for datafile_path.

scanpipe.pipes.scancode.set_codebase_resource_for_package(codebase_resource, discovered_package)

Assign the discovered_package to the codebase_resource and set its status to “application-package”.

scanpipe.pipes.scancode.get_license_matches_grouped(project)

Return a dictionary of all license_matches of a given project grouped by resource.detected_license_expression.

scanpipe.pipes.scancode.make_results_summary(project, scan_results_location)

Extract selected sections of the Scan results, such as the summary license_clarity_score, and license_matches related data. The key_files are also collected and injected in the summary output.

SPDX

scanpipe.pipes.spdx.SPDX_SCHEMA_URL = 'https://github.com/spdx/spdx-spec/raw/development/v2.3.1/schemas/spdx-schema.json'

Generate SPDX Documents. Spec documentation: https://spdx.github.io/spdx-spec/v2.3/

Usage:

import pathlib
from scanpipe.pipes import spdx

creation_info = spdx.CreationInfo(
    person_name="John Doe",
    person_email="john@starship.space",
    organization_name="Starship",
    tool="SPDXCode-1.0",
)

package1 = spdx.Package(
    spdx_id="SPDXRef-package1",
    name="lxml",
    version="3.3.5",
    license_concluded="LicenseRef-1",
    checksums=[
        spdx.Checksum(
            algorithm="SHA1", value="10c72b88de4c5f3095ebe20b4d8afbedb32b8f"
        ),
        spdx.Checksum(algorithm="MD5", value="56770c1a2df6e0dc51c491f0a5b9d865"),
    ],
    external_refs=[
        spdx.ExternalRef(
            category="PACKAGE-MANAGER",
            type="purl",
            locator="pkg:pypi/lxml@3.3.5",
        ),
    ]
)

document = spdx.Document(
    name="Document name",
    namespace="https://[CreatorWebsite]/[pathToSpdx]/[DocumentName]-[UUID]",
    creation_info=creation_info,
    packages=[package1],
    extracted_licenses=[
        spdx.ExtractedLicensingInfo(
            license_id="LicenseRef-1",
            extracted_text="License Text",
            name="License 1",
            see_alsos=["https://license1.text"],
        ),
    ],
    comment="This document was created using SPDXCode-1.0",
)

# Display document content:
print(document.as_json())

# Validate document
schema = pathlib.Path(spdx.SPDX_JSON_SCHEMA_LOCATION).read_text()
document.validate(schema)

# Write document to a file:
with open("document_name.spdx.json", "w") as f:
    f.write(document.as_json())
class scanpipe.pipes.spdx.CreationInfo(person_name: str = '', organization_name: str = '', tool: str = '', person_email: str = '', organization_email: str = '', license_list_version: str = '3.20', comment: str = '', created: str = <factory>)

One instance is required for each SPDX file produced. It provides the necessary information for forward and backward compatibility for processing tools.

created: str

Identify when the SPDX document was originally created. The date is to be specified according to combined date and time in UTC format as specified in ISO 8601 standard. Format: YYYY-MM-DDThh:mm:ssZ

as_dict()

Return the data as a serializable dict.

get_creators_spdx()

Return the creators list from related field values.

static get_creators_dict(creators_data)

Return the creators dict from SPDX data.

__init__(person_name: str = '', organization_name: str = '', tool: str = '', person_email: str = '', organization_email: str = '', license_list_version: str = '3.20', comment: str = '', created: str = <factory>) None
class scanpipe.pipes.spdx.Checksum(algorithm: str, value: str)

The checksum provides a mechanism that can be used to verify that the contents of a File or Package have not changed.

as_dict()

Return the data as a serializable dict.

__init__(algorithm: str, value: str) None
class scanpipe.pipes.spdx.ExternalRef(category: str, type: str, locator: str, comment: str = '')

An External Reference allows a Package to reference an external source of additional information, metadata, enumerations, asset identifiers, or downloadable content believed to be relevant to the Package.

as_dict()

Return the data as a serializable dict.

__init__(category: str, type: str, locator: str, comment: str = '') None
class scanpipe.pipes.spdx.ExtractedLicensingInfo(license_id: str, extracted_text: str, name: str = '', comment: str = '', see_alsos: ~typing.List[str] = <factory>)

An ExtractedLicensingInfo represents a license or licensing notice that was found in a package, file or snippet. Any license text that is recognized as a license may be represented as a License rather than an ExtractedLicensingInfo.

as_dict()

Return the data as a serializable dict.

__init__(license_id: str, extracted_text: str, name: str = '', comment: str = '', see_alsos: ~typing.List[str] = <factory>) None
class scanpipe.pipes.spdx.Package(spdx_id: str, name: str, download_location: str = 'NOASSERTION', license_declared: str = 'NOASSERTION', license_concluded: str = 'NOASSERTION', copyright_text: str = 'NOASSERTION', files_analyzed: bool = False, version: str = '', supplier: str = '', originator: str = '', homepage: str = '', filename: str = '', description: str = '', summary: str = '', source_info: str = '', release_date: str = '', built_date: str = '', valid_until_date: str = '', primary_package_purpose: str = '', comment: str = '', license_comments: str = '', checksums: ~typing.List[~scanpipe.pipes.spdx.Checksum] = <factory>, external_refs: ~typing.List[~scanpipe.pipes.spdx.ExternalRef] = <factory>, attribution_texts: ~typing.List[str] = <factory>)

Packages referenced in the SPDX document.

as_dict()

Return the data as a serializable dict.

static date_to_iso(date_str)

Convert a provided date_str to the SPDX format: YYYY-MM-DDThh:mm:ssZ.
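The conversion described above can be sketched with the standard library alone. This is a minimal illustration, not the Package.date_to_iso implementation: the helper name to_spdx_date is hypothetical, and it assumes the input is an ISO 8601 date or datetime string that datetime.fromisoformat() can parse, with naive values treated as UTC.

```python
from datetime import datetime, timezone


def to_spdx_date(date_str):
    """Convert an ISO 8601 date or datetime string to YYYY-MM-DDThh:mm:ssZ."""
    if not date_str:
        return None
    # Older Python versions of fromisoformat() reject a trailing "Z".
    if date_str.endswith("Z"):
        date_str = date_str[:-1]
    parsed = datetime.fromisoformat(date_str)
    # Timezone-aware values are converted to UTC.
    # Assumption: naive values are already expressed in UTC.
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


print(to_spdx_date("2021-03-04"))            # 2021-03-04T00:00:00Z
print(to_spdx_date("2021-03-04T10:20:30Z"))  # 2021-03-04T10:20:30Z
```

A date without a time component is padded to midnight, so the result always matches the full SPDX pattern.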

__init__(spdx_id: str, name: str, download_location: str = 'NOASSERTION', license_declared: str = 'NOASSERTION', license_concluded: str = 'NOASSERTION', copyright_text: str = 'NOASSERTION', files_analyzed: bool = False, version: str = '', supplier: str = '', originator: str = '', homepage: str = '', filename: str = '', description: str = '', summary: str = '', source_info: str = '', release_date: str = '', built_date: str = '', valid_until_date: str = '', primary_package_purpose: str = '', comment: str = '', license_comments: str = '', checksums: ~typing.List[~scanpipe.pipes.spdx.Checksum] = <factory>, external_refs: ~typing.List[~scanpipe.pipes.spdx.ExternalRef] = <factory>, attribution_texts: ~typing.List[str] = <factory>) None
class scanpipe.pipes.spdx.File(spdx_id: str, name: str, checksums: ~typing.List[~scanpipe.pipes.spdx.Checksum] = <factory>, license_concluded: str = 'NOASSERTION', copyright_text: str = 'NOASSERTION', license_in_files: ~typing.List[str] = <factory>, contributors: ~typing.List[str] = <factory>, notice_text: str = '', types: ~typing.List[str] = <factory>, attribution_texts: ~typing.List[str] = <factory>, comment: str = '', license_comments: str = '')

Files referenced in the SPDX document.

as_dict()

Return the data as a serializable dict.

__init__(spdx_id: str, name: str, checksums: ~typing.List[~scanpipe.pipes.spdx.Checksum] = <factory>, license_concluded: str = 'NOASSERTION', copyright_text: str = 'NOASSERTION', license_in_files: ~typing.List[str] = <factory>, contributors: ~typing.List[str] = <factory>, notice_text: str = '', types: ~typing.List[str] = <factory>, attribution_texts: ~typing.List[str] = <factory>, comment: str = '', license_comments: str = '') None
class scanpipe.pipes.spdx.Relationship(spdx_id: str, related_spdx_id: str, relationship: str, comment: str = '')

Represent the relationship between two SPDX elements. For example, you can represent a relationship between two different Files, between a Package and a File, between two Packages, or between one SPDXDocument and another SPDXDocument.

as_dict()

Return the SPDX relationship as a serializable dict.

__init__(spdx_id: str, related_spdx_id: str, relationship: str, comment: str = '') None
class scanpipe.pipes.spdx.Document(name: str, namespace: str, creation_info: ~scanpipe.pipes.spdx.CreationInfo, packages: ~typing.List[~scanpipe.pipes.spdx.Package], spdx_id: str = 'SPDXRef-DOCUMENT', version: str = '2.3', data_license: str = 'CC0-1.0', comment: str = '', files: ~typing.List[~scanpipe.pipes.spdx.File] = <factory>, extracted_licenses: ~typing.List[~scanpipe.pipes.spdx.ExtractedLicensingInfo] = <factory>, relationships: ~typing.List[~scanpipe.pipes.spdx.Relationship] = <factory>)

Collection of section instances each of which contains information about software organized using the SPDX format.

as_dict()

Return the SPDX document as a serializable dict.

as_json(indent=2)

Return the SPDX document as serialized JSON.

static safe_document_name(name)

Convert provided name to a safe SPDX document name.

validate(schema)

Check the validity of this SPDX document.

__init__(name: str, namespace: str, creation_info: ~scanpipe.pipes.spdx.CreationInfo, packages: ~typing.List[~scanpipe.pipes.spdx.Package], spdx_id: str = 'SPDXRef-DOCUMENT', version: str = '2.3', data_license: str = 'CC0-1.0', comment: str = '', files: ~typing.List[~scanpipe.pipes.spdx.File] = <factory>, extracted_licenses: ~typing.List[~scanpipe.pipes.spdx.ExtractedLicensingInfo] = <factory>, relationships: ~typing.List[~scanpipe.pipes.spdx.Relationship] = <factory>) None
scanpipe.pipes.spdx.validate_document(document, schema=PosixPath('/home/docs/checkouts/readthedocs.org/user_builds/scancodeio/checkouts/latest/scanpipe/pipes/schemas/spdx-schema-2.3.json'))

SPDX document validation. Requires the jsonschema library.

scanpipe.pipes.spdx.is_spdx_document(input_location)

Return True if the file at input_location is an SPDX document.
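One way such a check could work is sketched below. It is an assumption, not the actual is_spdx_document logic: the helper name looks_like_spdx_document is hypothetical, and the test used here (a JSON object with a top-level spdxVersion key, which the SPDX 2.x JSON format requires) may differ from what the library inspects.

```python
import json
import tempfile
from pathlib import Path


def looks_like_spdx_document(input_location):
    """Return True if the file parses as a JSON object with a spdxVersion value."""
    try:
        data = json.loads(Path(input_location).read_text())
    except (OSError, ValueError):
        # Missing or unreadable file, or content that is not valid JSON.
        return False
    return isinstance(data, dict) and bool(data.get("spdxVersion"))


with tempfile.TemporaryDirectory() as tmp:
    spdx_file = Path(tmp) / "sbom.spdx.json"
    spdx_file.write_text(json.dumps({"spdxVersion": "SPDX-2.3"}))
    other_file = Path(tmp) / "notes.txt"
    other_file.write_text("not json")
    print(looks_like_spdx_document(spdx_file))   # True
    print(looks_like_spdx_document(other_file))  # False
```

Returning False on any read or parse failure keeps the check usable as a quick filter over arbitrary input files.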

Symbols

exception scanpipe.pipes.symbols.UniversalCtagsNotFound

scanpipe.pipes.symbols.collect_and_store_resource_symbols_ctags(project, logger=None)

Collect symbols from codebase files using Ctags and store them in the extra data field.

scanpipe.pipes.symbols.collect_and_store_pygments_symbols_and_strings(project, logger=None)

Collect symbols, strings and comments from codebase files using pygments and store them in the extra data field.

scanpipe.pipes.symbols.collect_and_store_tree_sitter_symbols_and_strings(project, logger=None)

Collect symbols from codebase files using tree-sitter and store them in the extra data field.

VulnerableCode

scanpipe.pipes.vulnerablecode.is_configured()

Return True if the required VulnerableCode settings have been set.

scanpipe.pipes.vulnerablecode.is_available()

Return True if the configured VulnerableCode server is available.

scanpipe.pipes.vulnerablecode.chunked(iterable, chunk_size)

Break an iterable into lists of chunk_size length.

>>> list(chunked([1, 2, 3, 4, 5], 2))
[[1, 2], [3, 4], [5]]
>>> list(chunked([1, 2, 3, 4, 5], 3))
[[1, 2, 3], [4, 5]]
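
The behavior shown in the doctest above can be reproduced with itertools.islice. The following is a sketch under that assumption, not necessarily how chunked is implemented; the name chunked_sketch and the sample PURLs are made up for illustration.

```python
from itertools import islice


def chunked_sketch(iterable, chunk_size):
    """Yield successive lists of at most chunk_size items from iterable."""
    iterator = iter(iterable)
    while True:
        # islice() consumes up to chunk_size items from the shared iterator.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Made-up PURLs, batched two at a time.
purls = [f"pkg:pypi/example-{index}@1.0" for index in range(5)]
for batch in chunked_sketch(purls, 2):
    print(len(batch), batch[0])
```

Because the helper pulls from a single iterator, it also works on generators and querysets without loading every item into memory first.
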
scanpipe.pipes.vulnerablecode.get_purls(packages)

Return the PURLs for the given list of packages.

scanpipe.pipes.vulnerablecode.request_get(url, payload=None, timeout=None)

Wrap the HTTP request calls on the API.

scanpipe.pipes.vulnerablecode.get_vulnerabilities_by_purl(purl, timeout=None, api_url=None)

Get the list of vulnerabilities for the provided package purl.

scanpipe.pipes.vulnerablecode.get_vulnerabilities_by_cpe(cpe, timeout=None, api_url=None)

Get the list of vulnerabilities for the provided package or component cpe.

scanpipe.pipes.vulnerablecode.bulk_search_by_purl(purls, timeout=None, api_url=None)

Bulk search of vulnerabilities using the provided list of purls.

scanpipe.pipes.vulnerablecode.bulk_search_by_cpes(cpes, timeout=None, api_url=None)

Bulk search of vulnerabilities using the provided list of cpes.

scanpipe.pipes.vulnerablecode.filter_vulnerabilities(vulnerabilities, ignore_set)

Filter out vulnerabilities based on a list of ignored IDs and aliases.
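The filtering rule can be illustrated as follows. This is a sketch rather than the library code: the function name filter_vulnerabilities_sketch is hypothetical, the entry keys "vulnerability_id" and "aliases" are assumptions about the data shape, and the identifiers in the sample are made up.

```python
def filter_vulnerabilities_sketch(vulnerabilities, ignore_set):
    """Drop entries whose ID or any alias appears in ignore_set."""
    kept = []
    for vulnerability in vulnerabilities:
        # Assumed keys: "vulnerability_id" (str) and "aliases" (list of str).
        identifiers = {vulnerability.get("vulnerability_id")}
        identifiers.update(vulnerability.get("aliases", []))
        # Keep the entry only when none of its identifiers is ignored.
        if identifiers.isdisjoint(ignore_set):
            kept.append(vulnerability)
    return kept


# Made-up sample data for illustration only.
sample = [
    {"vulnerability_id": "VCID-aaaa", "aliases": ["CVE-0000-0001"]},
    {"vulnerability_id": "VCID-bbbb", "aliases": ["CVE-0000-0002"]},
    {"vulnerability_id": "VCID-cccc", "aliases": []},
]
remaining = filter_vulnerabilities_sketch(sample, {"CVE-0000-0001", "VCID-cccc"})
print([entry["vulnerability_id"] for entry in remaining])  # ['VCID-bbbb']
```

Matching on both the ID and the aliases means one ignore entry is enough whichever identifier a project prefers to track.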

scanpipe.pipes.vulnerablecode.fetch_vulnerabilities(packages, chunk_size=1000, logger=<bound method Logger.info of <Logger scanpipe.pipes.vulnerablecode (INFO)>>, ignore_set=None)

Fetch and store vulnerabilities for each of the provided packages. The PURLs are used for the lookups, in batches of chunk_size per request.

Windows

scanpipe.pipes.windows.package_getter(root_dir, **kwargs)

Return installed package objects.

scanpipe.pipes.windows.flag_uninteresting_windows_codebase_resources(project)

Flag known uninteresting files as uninteresting.

scanpipe.pipes.windows.flag_installed_package_files(project, root_dir_pattern, package, q_objects=None)

For all CodebaseResources from project whose rootfs_path starts with root_dir_pattern, add package to the discovered_packages of each CodebaseResource and set the status.

scanpipe.pipes.windows.flag_known_software(project)

Find Windows software in project by checking CodebaseResources to see if their rootfs_path is under a known software root directory. If there are CodebaseResources that are under a known software root directory, a DiscoveredPackage is created for that software package and all files under that software package’s root directory are considered installed files for that package.

Currently, we are only checking for Python and OpenJDK in Windows Docker image layers.

If a version number cannot be determined for an installed software Package, then a version number of “nv” will be set.

scanpipe.pipes.windows.flag_program_files(project)

Report all subdirectories of Program Files and Program Files (x86) as Packages.

If a Package is detected in this manner, then we will attempt to determine the version from the path. If a version cannot be determined, a version of “nv” will be set for the Package.