Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
H
heroku-buildpack-python
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
1
Issues
1
List
Board
Labels
Milestones
JIRA
JIRA
Merge Requests
0
Merge Requests
0
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Open sidebar
Python-Dev
heroku-buildpack-python
Commits
2f42fdb2
Commit
2f42fdb2
authored
Mar 23, 2012
by
Kenneth Reitz
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
basic tests
parent
b16261c1
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
253 additions
and
0 deletions
+253
-0
envoy.py
test/distutils/envoy.py
+210
-0
setup.py
test/distutils/setup.py
+43
-0
setup.py
test/no-requirements/setup.py
+0
-0
Gemfile
test/not-python/Gemfile
+0
-0
No files found.
test/distutils/envoy.py
0 → 100644
View file @
2f42fdb2
# -*- coding: utf-8 -*-
"""
envoy.core
~~~~~~~~~~
This module provides envoy awesomeness.
Copyright 2012, Kenneth Reitz.
MIT Licensed.
"""
import
os
import
shlex
import
subprocess
import
threading
__version__
=
'0.0.2'
__license__
=
'MIT'
__author__
=
'Kenneth Reitz'
class
Command
(
object
):
def
__init__
(
self
,
cmd
):
self
.
cmd
=
cmd
self
.
process
=
None
self
.
out
=
None
self
.
err
=
None
self
.
returncode
=
None
self
.
data
=
None
def
run
(
self
,
data
,
timeout
,
env
):
self
.
data
=
data
environ
=
dict
(
os
.
environ
)
.
update
(
env
or
{})
def
target
():
self
.
process
=
subprocess
.
Popen
(
self
.
cmd
,
universal_newlines
=
True
,
shell
=
False
,
env
=
environ
,
stdin
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
bufsize
=
0
,
)
self
.
out
,
self
.
err
=
self
.
process
.
communicate
(
self
.
data
)
thread
=
threading
.
Thread
(
target
=
target
)
thread
.
start
()
thread
.
join
(
timeout
)
if
thread
.
is_alive
():
self
.
process
.
terminate
()
thread
.
join
()
self
.
returncode
=
self
.
process
.
returncode
return
self
.
out
,
self
.
err
class
ConnectedCommand
(
object
):
def
__init__
(
self
,
process
=
None
,
std_in
=
None
,
std_out
=
None
,
std_err
=
None
):
self
.
_process
=
process
self
.
std_in
=
std_in
self
.
std_out
=
std_out
self
.
std_err
=
std_out
def
__enter__
(
self
):
return
self
def
__exit__
(
self
,
type
,
value
,
traceback
):
self
.
kill
()
@
property
def
status_code
(
self
):
"""The status code of the process.
If the code is None, assume that it's still running.
"""
if
self
.
_status_code
is
not
None
:
return
self
.
_status_code
# investigate
return
None
@
property
def
pid
(
self
):
"""The process' PID."""
return
self
.
_process
.
pid
def
kill
(
self
):
"""Kills the process."""
return
self
.
_process
.
kill
()
def
expect
(
self
,
bytes
,
stream
=
None
):
"""Block until given bytes appear in the stream."""
if
stream
is
None
:
stream
=
self
.
std_out
pass
def
send
(
self
,
end
=
'
\n
'
):
"""Sends a line to std_in."""
#TODO: Y U LINE BUFFER
pass
def
block
(
self
):
"""Blocks until command finishes. Returns Response instance."""
self
.
_status_code
=
self
.
_process
.
wait
()
class
Response
(
object
):
"""A command's response"""
def
__init__
(
self
,
process
=
None
):
super
(
Response
,
self
)
.
__init__
()
self
.
_process
=
process
self
.
command
=
None
self
.
std_err
=
None
self
.
std_out
=
None
self
.
status_code
=
None
self
.
history
=
[]
def
__repr__
(
self
):
if
len
(
self
.
command
):
return
'<Response [{0}]>'
.
format
(
self
.
command
[
0
])
else
:
return
'<Response>'
def
expand_args
(
command
):
"""Parses command strings and returns a Popen-ready list."""
# Prepare arguments.
if
isinstance
(
command
,
basestring
):
splitter
=
shlex
.
shlex
(
command
,
posix
=
True
)
splitter
.
whitespace
=
'|'
splitter
.
whitespace_split
=
True
command
=
[]
while
True
:
token
=
splitter
.
get_token
()
if
token
:
command
.
append
(
token
)
else
:
break
command
=
map
(
shlex
.
split
,
command
)
return
command
def
run
(
command
,
data
=
None
,
timeout
=
None
,
env
=
None
):
"""Executes a given commmand and returns Response.
Blocks until process is complete, or timeout is reached.
"""
command
=
expand_args
(
command
)
history
=
[]
for
c
in
command
:
if
len
(
history
):
# due to broken pipe problems pass only first 10MB
data
=
history
[
-
1
]
.
std_out
[
0
:
10
*
1024
]
cmd
=
Command
(
c
)
out
,
err
=
cmd
.
run
(
data
,
timeout
,
env
)
r
=
Response
(
process
=
cmd
)
r
.
command
=
c
r
.
std_out
=
out
r
.
std_err
=
err
r
.
status_code
=
cmd
.
returncode
history
.
append
(
r
)
r
=
history
.
pop
()
r
.
history
=
history
return
r
def
connect
(
command
,
data
=
None
,
env
=
None
):
"""Spawns a new process from the given command."""
# TODO: support piped commands
command_str
=
expand_args
(
command
)
.
pop
()
environ
=
dict
(
os
.
environ
)
.
update
(
env
or
{})
process
=
subprocess
.
Popen
(
command_str
,
universal_newlines
=
True
,
shell
=
False
,
env
=
environ
,
stdin
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
bufsize
=
0
,
)
return
ConnectedCommand
(
process
=
process
)
test/distutils/setup.py
0 → 100644
View file @
2f42fdb2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
os
import
sys
import
envoy
try
:
from
setuptools
import
setup
except
ImportError
:
from
distutils.core
import
setup
if
sys
.
argv
[
-
1
]
==
"publish"
:
os
.
system
(
"python setup.py sdist upload"
)
sys
.
exit
()
required
=
[]
setup
(
name
=
'envoy'
,
version
=
envoy
.
__version__
,
description
=
'Simple API for running external processes.'
,
author
=
'Kenneth Reitz'
,
author_email
=
'me@kennethreitz.com'
,
url
=
'https://github.com/kennethreitz/envoy'
,
py_modules
=
[
'envoy'
],
install_requires
=
required
,
license
=
'MIT'
,
classifiers
=
(
'Development Status :: 5 - Production/Stable'
,
'Intended Audience :: Developers'
,
'Natural Language :: English'
,
'License :: OSI Approved :: MIT License'
,
'Programming Language :: Python'
,
'Programming Language :: Python :: 2.5'
,
'Programming Language :: Python :: 2.6'
,
'Programming Language :: Python :: 2.7'
,
# 'Programming Language :: Python :: 3.0',
# 'Programming Language :: Python :: 3.1',
),
)
test/no-requirements/setup.py
0 → 100644
View file @
2f42fdb2
test/not-python/Gemfile
0 → 100644
View file @
2f42fdb2
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment