This guide covers all functions available in the ndp-ep Python library, organized by category.
from ndp_ep import Client
# With token authentication
client = Client(base_url="http://localhost:8002", token="your-token")
# With password authentication
client = Client(base_url="http://localhost:8002")
client.get_token(username="user", password="pass")
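To avoid hard-coding credentials, the token can also be read from an environment variable before constructing the client. A minimal sketch, assuming the token has been exported as NDP_EP_TOKEN (the variable name is illustrative, not part of the library):
import os
from ndp_ep import Client
# Read the token from the environment (NDP_EP_TOKEN is an illustrative name)
token = os.environ.get("NDP_EP_TOKEN")
client = Client(base_url="http://localhost:8002", token=token)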
Check the system status and available services.
status = client.get_system_status()
print(status)
Returns:
{
'api_version': '0.4.0',
'ep_name': 'EP-DEMO',
'organization': 'ORGANIZATION-DEMO',
'backend_connected': True, # Local catalog (CKAN or other)
's3_connected': True, # MinIO/S3 storage
'kafka_enabled': True, # Kafka streaming
'jupyterlab_enabled': True, # JupyterLab integration
'is_public': True
}
Retrieve system metrics.
metrics = client.get_system_metrics()
print(metrics)
Get information about the current authenticated user.
user = client.get_user_info()
print(user)
Obtain an authentication token using username and password.
client = Client(base_url="http://localhost:8002")
client.get_token(username="admin", password="secret")
# Token is now stored in the client
Register a new general dataset.
result = client.register_general_dataset({
"name": "my-dataset",
"title": "My Dataset Title",
"owner_org": "my-organization",
"notes": "Description of the dataset"
})
print(result)
Parameters:
data: Dictionary with dataset metadata
server: 'local' (default) or 'global'
Register a URL resource pointing to external data.
result = client.register_url({
"resource_name": "external-data",
"resource_title": "External Data Source",
"owner_org": "my-organization",
"resource_url": "https://example.com/data.csv",
"file_type": "CSV",
"notes": "Data from external source"
})
print(result)
Register a resource linked to S3 storage.
result = client.register_s3_link({
"resource_name": "s3-data",
"resource_title": "S3 Stored Data",
"owner_org": "my-organization",
"s3_bucket": "my-bucket",
"s3_key": "path/to/file.nc",
"resource_s3": "my-bucket/path/to/file.nc",
"notes": "Data stored in S3"
})
print(result)
Register a Kafka topic as a streaming data source.
result = client.register_kafka_topic({
"resource_name": "sensor-stream",
"resource_title": "Real-time Sensor Data",
"owner_org": "my-organization",
"topic": "sensors.temperature",
"notes": "Live temperature readings"
})
print(result)
Search for datasets by terms.
# Simple search
results = client.search_datasets(terms=["temperature"])
# Search with specific keys
results = client.search_datasets(
terms=["utah", "satellite"],
keys=["title", "notes"],
server="local"
)
for ds in results:
print(f"{ds['name']}: {ds.get('title', 'N/A')}")
Parameters:
terms: List of search terms
keys: Optional list of fields to search in
server: 'global' (default) or 'local'
Perform advanced search with complex queries.
results = client.advanced_search({
"query": "temperature AND utah",
"filters": {
"organization": "noaa"
},
"limit": 10,
"offset": 0
})
Full update of a dataset (replaces all fields).
result = client.update_general_dataset(
dataset_id="my-dataset-id",
data={
"resource_name": "my-dataset",
"resource_title": "Updated Title",
"owner_org": "my-organization",
"notes": "Updated description"
}
)
Partial update of a dataset (only specified fields).
result = client.patch_general_dataset(
dataset_id="my-dataset-id",
data={
"notes": "Only updating the description"
}
)
Update a URL resource.
result = client.update_url_resource(
resource_id="resource-id",
data={
"resource_url": "https://new-url.com/data.csv"
}
)
Update an S3 resource.
result = client.update_s3_resource(
resource_id="resource-id",
data={
"s3_key": "new/path/to/file.nc"
}
)
Partial update of an S3 resource.
result = client.patch_s3_resource(
resource_id="resource-id",
data={
"notes": "Updated notes only"
}
)
Partial update of a specific resource within a dataset.
result = client.patch_dataset_resource(
dataset_id="dataset-id",
resource_id="resource-id",
data={
"description": "Updated resource description"
}
)
Delete a resource by its ID.
result = client.delete_resource_by_id(resource_id="resource-id")
Delete a resource by its name.
result = client.delete_resource_by_name(resource_name="my-dataset")
Delete a specific resource from a dataset.
result = client.delete_dataset_resource(
dataset_id="dataset-id",
resource_id="resource-id"
)
List all organizations.
# List all organizations
orgs = client.list_organizations()
for org in orgs:
    print(org)
# Filter by name
orgs = client.list_organizations(name="noaa")
# From local server
orgs = client.list_organizations(server="local")
Register a new organization.
result = client.register_organization({
"name": "my-org",
"title": "My Organization",
"description": "Organization description"
})
Delete an organization.
result = client.delete_organization(organization_name="my-org")
Register a new service.
result = client.register_service({
"name": "data-processing-api",
"title": "Data Processing API",
"url": "https://api.example.com",
"description": "API for processing datasets"
})
Full update of a service.
result = client.update_service(
service_id="service-id",
data={
"name": "data-processing-api",
"title": "Updated API Title",
"url": "https://new-api.example.com"
}
)
Partial update of a service.
result = client.patch_service(
service_id="service-id",
data={
"description": "Updated description only"
}
)
List all available S3 buckets.
result = client.list_buckets()
for bucket in result['buckets']:
print(f"{bucket['name']} - Created: {bucket.get('creation_date')}")
Create a new S3 bucket.
result = client.create_bucket("my-new-bucket")
print(result)
Get information about a specific bucket.
info = client.get_bucket_info("my-bucket")
print(info)
Delete an S3 bucket (must be empty).
result = client.delete_bucket("my-bucket")
List objects in a bucket.
# List all objects
result = client.list_objects("my-bucket")
for obj in result['objects']:
print(f"{obj['key']} - {obj['size']} bytes")
# List with prefix filter
result = client.list_objects("my-bucket", prefix="data/2024/")
Upload an object to S3.
# Upload from bytes
with open("local_file.csv", "rb") as f:
    data = f.read()
result = client.upload_object(
bucket_name="my-bucket",
object_key="path/to/file.csv",
file_data=data,
content_type="text/csv"
)
print(result)
Download an object from S3.
data = client.download_object(
bucket_name="my-bucket",
object_key="path/to/file.csv"
)
# Save to local file
with open("downloaded_file.csv", "wb") as f:
    f.write(data)
Get metadata for an S3 object.
metadata = client.get_object_metadata(
bucket_name="my-bucket",
object_key="path/to/file.csv"
)
print(metadata)
Delete an object from S3.
result = client.delete_object(
bucket_name="my-bucket",
object_key="path/to/file.csv"
)
Generate a temporary URL for downloading an object.
result = client.generate_presigned_download_url(
bucket_name="my-bucket",
object_key="path/to/file.csv",
expiration=3600 # 1 hour in seconds
)
print(f"Download URL: {result['url']}")
Generate a temporary URL for uploading an object.
result = client.generate_presigned_upload_url(
bucket_name="my-bucket",
object_key="uploads/new-file.csv",
expiration=3600
)
print(f"Upload URL: {result['url']}")
# Use the URL to upload directly
import requests
with open("local_file.csv", "rb") as f:
    requests.put(result['url'], data=f)
Get Kafka connection details.
kafka = client.get_kafka_details()
print(f"Host: {kafka['kafka_host']}")
print(f"Port: {kafka['kafka_port']}")
Register a Kafka topic (see Dataset Registration).
Update a Kafka topic resource.
result = client.update_kafka_topic(
dataset_id="topic-id",
data={
"topic": "new.topic.name",
"notes": "Updated topic"
}
)
Pelican is a federated data distribution system. These functions allow interaction with Pelican namespaces.
List available Pelican federations.
federations = client.list_federations()
print(federations)
Browse files in a Pelican namespace.
# Browse a directory
contents = client.browse_pelican(
path="/osg-htc/public/",
federation="osdf"
)
for item in contents:
    print(item)
# With detailed information
contents = client.browse_pelican(
path="/osg-htc/public/",
federation="osdf",
detail=True
)
Get metadata for a file without downloading.
info = client.get_pelican_info(
path="/osg-htc/public/data/file.csv",
federation="osdf"
)
print(f"Size: {info['size']}")
print(f"Modified: {info['modified']}")
Download a file from Pelican.
# Download entire file
data = client.download_pelican(
path="/osg-htc/public/data/file.csv",
federation="osdf"
)
# Stream large files
for chunk in client.download_pelican(
    path="/osg-htc/public/data/large_file.nc",
    federation="osdf",
    stream=True
):
    # Handle each chunk as it arrives, e.g. write it to a local file
    process(chunk)
Import a Pelican file as a resource in the local catalog.
result = client.import_pelican_metadata(
pelican_url="pelican://osg-htc/public/data/file.csv",
package_id="my-dataset-id",
resource_name="pelican-data",
resource_description="Data imported from Pelican"
)
Get JupyterLab connection details.
jupyter = client.get_jupyter_details()
print(f"URL: {jupyter['jupyter_url']}")
All methods may raise exceptions on errors. Use try/except for robust code:
try:
    result = client.create_bucket("my-bucket")
    print(f"Created: {result}")
except Exception as e:
    print(f"Error: {e}")
Check that the required services are available before using them:
status = client.get_system_status()
if not status.get('backend_connected'):
    raise RuntimeError("Local catalog not available")
if not status.get('s3_connected'):
    raise RuntimeError("S3 storage not available")
# Proceed with operations...
Complete workflow: upload a file to S3 and register it as a dataset resource.
# 1. Create bucket
client.create_bucket("data-bucket")
# 2. Upload file
with open("data.csv", "rb") as f:
client.upload_object(
bucket_name="data-bucket",
object_key="datasets/data.csv",
file_data=f.read(),
content_type="text/csv"
)
# 3. Register in catalog
client.register_s3_link({
"resource_name": "my-csv-data",
"resource_title": "My CSV Dataset",
"owner_org": "my-org",
"s3_bucket": "data-bucket",
"s3_key": "datasets/data.csv"
})
Workflow: search the catalog and download matching S3 data.
# Search for datasets
results = client.search_datasets(terms=["temperature"])
# Get first result with S3 data
for ds in results:
    if 's3_bucket' in ds and 's3_key' in ds:
        data = client.download_object(
            bucket_name=ds['s3_bucket'],
            object_key=ds['s3_key']
        )
        break
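If the downloaded object is a CSV file, the bytes can be loaded straight into a DataFrame; a sketch assuming pandas is installed and the object holds CSV data:
import io
import pandas as pd
# Parse the downloaded bytes as CSV (assumes the object is a CSV file)
df = pd.read_csv(io.BytesIO(data))
print(df.head())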