Compare commits

...

14 Commits
main ... wip

Author SHA1 Message Date
MasterofJOKers ead0cf51de Move Comment fetching into click callback
2 years ago
MasterofJOKers f0b455326e Add comment creation, editing and deletion
2 years ago
MasterofJOKers 9bcd668171 Add client to debug command
2 years ago
MasterofJOKers 68b1142cd7 Add a filter for watched issues
2 years ago
MasterofJOKers 6596be6c1e Let the user query issues without default project
2 years ago
MasterofJOKers 6c1667046b Change meaning of --jql in issues command
2 years ago
MasterofJOKers 0ce3932e9c wip
2 years ago
MasterofJOKers e7e286fbf3 Support not having [filters] in config
2 years ago
MasterofJOKers 6e2d639034 Support NOT in --status of issues
2 years ago
MasterofJOKers 471855beb6 Fix multiple=True for components in issues
2 years ago
MasterofJOKers 57afd1c82b Add implementation for issue show
2 years ago
MasterofJOKers 6594f515f3 wip
2 years ago
MasterofJOKers 14a9af7e51 wip
2 years ago
MasterofJOKers d4491cf7ab WIP
2 years ago

@ -0,0 +1,539 @@
import configparser
import functools
import os
from pathlib import Path
from textwrap import dedent, indent, shorten, wrap
from typing import Any, Callable, Dict
import click
import editor
CONFIG_PATH = Path(os.environ.get('XDG_CONFIG_HOME', '~/.config')).expanduser() / 'jiracli' / 'config'
def get_config():
config = configparser.ConfigParser()
config.read(CONFIG_PATH)
if not config.has_section('server'):
raise ValueError(f"Config in {CONFIG_PATH} does not contain a section [server]")
return config
def ipython():
import IPython
IPython.embed()
@click.group()
@click.pass_context
def main(ctx):
from jira import JIRA
ctx.ensure_object(dict)
config = get_config()
ctx.obj['config'] = config
server_config = config['server']
skip_cert_verify = server_config.getboolean('skip_cert_verify', False)
if skip_cert_verify:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jira_kwargs = dict(
options={
'server': server_config['host'],
'verify': not skip_cert_verify
},
timeout=server_config.getint('timeout', 2),
max_retries=server_config.getint('max_retries', 2)
)
if 'password' in server_config:
if 'user' not in server_config:
raise ValueError('If password is set in [server], user needs to be set, too.')
jira_kwargs['basic_auth'] = (server_config['user'], server_config['password'])
elif 'token' in server_config:
jira_kwargs['token_auth'] = server_config['token']
else:
raise ValueError("Need either user and password or token in [server] for login.")
ctx.obj['client'] = JIRA(**jira_kwargs)
def pass_client(fn):
@functools.wraps(fn)
@click.pass_obj
def wrapped(obj, *args, **kwargs):
client = obj['client']
return fn(client, *args, **kwargs)
return wrapped
def pass_config(fn):
@functools.wraps(fn)
@click.pass_obj
def wrapped(obj, *args, **kwargs):
config = obj['config']
return fn(config, *args, **kwargs)
return wrapped
@main.command()
@pass_client
def version(client):
print('Version {}.{}.{}-{}p{}'.format(*client.sys_version_info))
@main.command()
@click.option('--project', help='Project to filter for. Default: take from config [filters]/default_project')
@click.option('--component', 'components', multiple=True, help='Filter for issues with *any* of the given components')
@click.option('--status', multiple=True, help='Filter for issues with *any* of the given statuses. '
'! at the beginning means NOT.')
@click.option('--label', 'labels', multiple=True, help='Filter for issues with all the given labels')
@click.option('--text', 'texts', multiple=True, help='Filter for issues containing all the given strings')
@click.option('--flagged/--not-flagged', is_flag=True, default=None, help='Filter issues being flagged or not flagged')
@click.option('--watching/--not-watching', is_flag=True, default=None, help='Filter issues I watch or do not watch')
@click.option('--order', help='Sort by this field')
@click.option('--asc/--desc', 'asc', help='Sort ascending/descending. Default: descending')
@click.option('--jql', help='Provide your own JQL query string to include in the search via AND.')
@pass_client
@pass_config
def issues(config, client, project, components, status, labels, texts, flagged, watching, order, asc, jql):
if project is None and config.has_section('filters') and config.get('filters', 'default_project'):
project = config['filters']['default_project']
# TODO add filtering by created since
# TODO add filtering by updated
# TODO support pagination I guess? i.e. a limit
filters = []
if jql:
filters.append(jql)
if texts:
texts_jql = ' AND '.join([f'text ~ "{text}"' for text in texts])
filters.append(f"( {texts_jql} )")
if components:
component_list = ', '.join([f'"{c}"' for c in components])
components_jql = f"component in ({component_list})"
filters.append(components_jql)
if status:
status_list = ', '.join([f'"{s}"' for s in status if not s.startswith('!')])
if status_list:
status_jql = f"status IN ({status_list})"
filters.append(status_jql)
status_list = ', '.join([f'"{s[1:]}"' for s in status if s.startswith('!')])
if status_list:
status_jql = f"status NOT IN ({status_list})"
filters.append(status_jql)
if labels:
labels_jql = ' AND '.join([f'labels = "{label}"' for label in labels])
filters.append(f"( {labels_jql} )")
if flagged is not None:
if flagged:
flagged_jql = 'flagged IS NOT EMPTY'
else:
flagged_jql = 'flagged IS EMPTY'
filters.append(flagged_jql)
if watching is not None:
if watching:
watching_jql = 'watcher = currentUser()'
else:
watching_jql = 'watcher != currentUser()'
filters.append(watching_jql)
if project:
project_jql = f"project = {project}"
filters.append(project_jql)
jql = ' AND '.join(filters)
if order:
jql += f" ORDER BY {order} {'ASC' if asc else 'DESC'}"
print(f"Searching with query: {jql}")
issues = client.search_issues(jql)
for issue in issues:
# TODO tabulate?
print(f"{issue.key:>8} - {issue.fields.status} - {issue.fields.created} - {shorten(issue.fields.summary, 72)}")
def get_more_attr(obj, fields):
fields = fields.split('.')
for field in fields:
obj = getattr(obj, field)
if not obj:
break
return obj
def comment_to_dict(comment):
fields = ['created', 'author.displayName', 'body']
return {f: get_more_attr(comment, f) for f in fields}
def issue_to_yaml(issue):
import yaml
info_fields = ['creator.displayName', 'reporter.displayName', 'created']
editable_fields = ['summary', 'description', 'status.name', 'priority.name', 'labels',
'issuelinks', 'components', 'assignee.displayName']
d = {
'info': {f: get_more_attr(issue.fields, f) for f in info_fields},
'editable': {f: get_more_attr(issue.fields, f) for f in editable_fields},
'comments': [comment_to_dict(c) for c in issue.fields.comment.comments]
}
return yaml.dump(d)
# @main.command()
# @click.argument('issue_id')
# def issue(issue_id):
# try:
# issue = client.issue(issue_id)
# except JIRAError as e:
# print(f'Problem retrieving issue: {e.text}')
# return
#
# # import IPython
# # IPython.embed()
# original_content = issue_to_yaml(issue)
# print(original_content)
# # new_content = editor.edit(contents=original_content).decode('utf-8')
# # print(dir(issue))
@main.command()
def dump():
try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper
import ruamel.yaml
# from ruamel.yaml import YAML
import yaml
def str_presenter(dumper, data):
"""configures yaml for dumping multiline strings
Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data"""
if data.count('\n') > 0: # check for multiline string
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
# yaml.add_representer(str, str_presenter)
# yaml.representer.SafeRepresenter.add_representer(str, str_presenter) # to use with safe_dum
def yaml_multiline_string_pipe(dumper, data):
text_list = [line for line in data.splitlines()]
fixed_data = "\n".join(text_list)
if len(text_list) > 1:
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="|")
return dumper.represent_scalar('tag:yaml.org,2002:str', fixed_data)
# yaml.add_representer(str, yaml_multiline_string_pipe)
print(yaml.dump({"multiline": "First line \nSecond line\nThird line"},
allow_unicode=True, Dumper=Dumper, default_style="|"))
import sys
ruamel.yaml.dump({"multiline": "First line \nSecond line\nThirdline"}, sys.stdout, default_style='|')
@main.group(invoke_without_command=True)
@click.argument('issue_id')
@pass_client
@click.pass_obj
def issue(obj: Dict[str, Any], client, issue_id: str):
from jira.exceptions import JIRAError
try:
issue = client.issue(issue_id)
except JIRAError as e:
raise click.ClickException(f"Problem retrieving {issue_id}: {e.text}")
obj['issue'] = issue
obj['issue_id'] = issue_id
def pass_issue(fn: Callable) -> Callable:
@functools.wraps(fn)
@click.pass_obj
def wrapped(obj, *args, **kwargs):
issue = obj['issue']
return fn(issue, *args, **kwargs)
return wrapped
@issue.group()
def link():
pass
def _format_links(client, issue):
lines = []
issuelinks = getattr(issue.fields, 'issuelinks', [])
if issuelinks:
lines.append("Issue Links")
# TODO group by type?
for issuelink in issuelinks:
other_issue = getattr(issuelink, 'inwardIssue', getattr(issuelink, 'outwardIssue', None))
summary = indent('\n'.join(wrap(other_issue.fields.summary, width=72)), prefix=' ' * 4)
lines.append(f" {issuelink.type} - {other_issue.key}\n{summary}")
remote_links = client.remote_links(issue.id)
if remote_links:
if lines:
lines.append('')
lines.append("Remote Links:")
for link in remote_links:
# TODO tabulate?
lines.append(f" {link.object.title} => {link.object.url}")
return '\n'.join(lines)
@link.command('list')
@pass_issue
@pass_client
def link_list(client, issue):
print(_format_links(client, issue))
def prompt(description: str, *, multiline_ok: bool = True, initial_content: str = '', empty_ok: bool = False) -> str:
divider_line = '--- Everything after this line will be removed'
contents = f"{initial_content}\n{divider_line}\n\n{description}"
contents = editor.edit(contents=contents).decode('utf-8')
if not contents:
raise click.ClickException('Empty content. Cannot continue.')
contents_lines = contents.splitlines()
for i, line in enumerate(reversed(contents_lines), start=1):
if line == divider_line:
break
else:
raise click.ClickException('Could not find divider line. Please keep the footer intact.')
contents_lines = contents_lines[:-i]
if not multiline_ok and len(contents_lines) > 1:
# TODO build a retry into this?
raise click.ClickException('Input needs to be a single line only')
if not empty_ok and not any(contents_lines):
raise click.ClickException('Empty content. Cannot continue.')
return '\n'.join(contents_lines)
@link.command('create')
@click.argument('URL', required=False)
@click.argument('TITLE', required=False)
@pass_issue
@pass_client
def link_create(client, issue, url: str, title: str):
if url is None:
url = prompt(f"Input the URL for the link to add to {issue.key}", multiline_ok=False)
if title is None:
title = prompt(f"Input a single line of title for {url}. This will be added to {issue.key}.",
multiline_ok=False)
client.add_simple_link(issue.key, {'url': url, 'title': title})
@link.command('issue')
@pass_issue
@pass_client
def link_issue(client, issue):
"""Link to another issue"""
raise NotImplementedError
@issue.command('open')
@pass_issue
def issue_open(issue):
import subprocess
subprocess.call(['xdg-open', issue.permalink()])
@issue.group()
def label():
pass
@label.command('list')
@pass_issue
def label_list(issue):
"""List all labels on the issue"""
print('\n'.join(issue.fields.labels))
@label.command('edit')
@pass_issue
def label_edit(issue):
"""Edit the labels of the issue"""
import jira
original_labels = issue.fields.labels
# TODO add a configurable list of "often used" labels into the description
labels = set(prompt('Add or remove labels by adding them line by line or removing one',
initial_content='\n'.join(original_labels),
empty_ok=True).splitlines())
original_labels = set(original_labels)
if labels == original_labels:
print(f"Not updating {issue.key} as the labels are the same.")
return
changes = []
added_labels = labels - original_labels
for label in added_labels:
changes.append({'add': label})
removed_labels = original_labels - labels
for label in removed_labels:
changes.append({'remove': label})
# inspired by jira.Issue.add_field_value
jira.resources.Resource.update(issue, fields={"update": {"labels": changes}})
@issue.command()
@pass_issue
@pass_client
def debug(client, issue):
import IPython
IPython.embed()
@issue.command()
@pass_issue
@pass_client
def show(client, issue):
"""Show a lot of information about the issue"""
print(f"{issue.key} on {issue.fields.status} as {issue.fields.issuetype} assigned to {issue.fields.assignee}")
if getattr(issue.fields, 'parent', None):
parent = issue.fields.parent
print(f" parent: {parent.key} on {parent.fields.status} as {parent.fields.issuetype}")
summary = indent('\n'.join(wrap(parent.fields.summary, width=72)), prefix=' ' * 4)
print(f"{summary}")
# TODO handle customfields in a good manner, e.g. let the user enable
# showing them by a custom name or their own. use cache for that.
print(indent(dedent(f"""\
created: {issue.fields.created}
components: {', '.join(c.name for c in issue.fields.components)}
flagged: {bool(getattr(issue.fields, 'customfield_13216', False))}
labels: {', '.join(issue.fields.labels)}
priority: {issue.fields.priority}
reporter: {issue.fields.reporter}"""),
' ' * 2))
def _header(s):
# print(f"\n {s}\n {'+' * len(s)}")
print(f"\n +++ {s} +++")
summary = indent('\n'.join(wrap(issue.fields.summary, replace_whitespace=False)), prefix=' ' * 2)
_header('summary')
print(summary)
description = indent('\n'.join(wrap(issue.fields.description or '', replace_whitespace=False)), prefix=' ' * 2)
_header('description')
print(description)
_header('attachments')
for i, attachment in enumerate(issue.fields.attachment):
print(f" * {attachment.filename} by {attachment.author}")
print(f" {attachment.content}")
print(f" {attachment.mimeType} with {attachment.size} B")
_header('links')
links = _format_links(client, issue)
if links:
print(indent(links, prefix=' ' * 2))
_header('comments')
for i, comment in enumerate(issue.fields.comment.comments):
if i != 0:
print('')
print(f" {comment.created} - {comment.author} - {comment.id}")
body = indent('\n'.join(wrap(comment.body, replace_whitespace=False)), ' ' * 4)
print(body)
print('')
@issue.group()
def comment():
pass
@comment.command('create')
@pass_issue
@pass_client
def comment_create(client, issue):
"""Create a comment on the issue"""
# TODO add some help text for the syntax in the description
# TODO add all issue info to the description, so we can autocomplete in the
# editor
text = prompt(f"Comment text to add as new comment to {issue.key}")
client.add_comment(issue.key, text)
def get_comment(ctx, param, comment_id):
"""click callback returning a Comment for the comment_id value given by the user"""
from jira.exceptions import JIRAError
issue = ctx.obj['issue']
if not comment_id:
from clintermission import cli_select_item
options = []
for comment in issue.fields.comment.comments:
# TODO make this roughly terminal width or add a shorten() method
# with … at the end
body = comment.body.replace('\n', ' ')[:64]
options.append((f"{comment.created} - {comment.author} - {body}", comment.id))
_, comment_id = cli_select_item(options)
client = ctx.obj['client']
try:
return client.comment(issue.key, comment_id, expand=['body'])
except JIRAError as e:
raise click.ClickException(f"Problem retrieving {comment_id} for issue {issue.key}: {e.text}")
@comment.command('edit')
@click.argument('comment', required=False, callback=get_comment)
@pass_issue
@pass_client
def comment_edit(client, issue, comment):
"""Edit the text of an issues comment"""
text = prompt(f"Comment text to set as new body for comment {comment.id} on {issue.key}",
initial_content=comment.body)
if text == comment.body:
print("No changes. Aborting.")
return
comment.update(body=text)
@comment.command('delete')
@click.argument('comment', required=False, callback=get_comment)
@pass_issue
@pass_client
def comment_delete(client, issue, comment):
# TODO print formatted comment for verification
click.confirm("Do you really want to delete this ^ ?", abort=True)
comment.delete()

@ -0,0 +1,25 @@
[metadata]
name = jiracli
version = 0.1
description = JIRA command line interface
[options]
packages = find:
install_requires =
click
clintermission
python-editor
jira
pyaml
[options.entry_points]
console_scripts =
jiracli = jiracli.cli:main
[options.packages.find]
exclude=env
[flake8]
max-line-length = 120
exclude = .git,__pycache__,*.egg-info,*lib/python*
ignore = E241,E741,W503,W504

@ -0,0 +1,4 @@
import setuptools
setuptools.setup()
Loading…
Cancel
Save