[e455b7]: / Allura / allura / tests / model / test_artifact.py  Maximize  Restore  History

Download this file

314 lines (273 with data), 11.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Model tests for artifact
"""
import re
from datetime import datetime
from tg import tmpl_context as c, app_globals as g
from mock import patch, Mock
import pytest
from ming.odm.odmsession import ThreadLocalODMSession
from ming.odm import Mapper
from bson import ObjectId
from webob import Request
import allura
from allura import model as M
from allura.lib import helpers as h
from allura.lib import security
from allura.tests import decorators as td
from allura.websetup.schema import REGISTRY
from alluratest.controller import setup_basic_test, setup_unit_test
from forgewiki import model as WM
class Checkmessage(M.Message):
class __mongometa__:
name = 'checkmessage'
def url(self):
return ''
def __init__(self, **kw):
super().__init__(**kw)
if self.slug is not None and self.full_slug is None:
self.full_slug = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') + ':' + self.slug
Mapper.compile_all()
class TestArtifact:
def setup_method(self):
setup_basic_test()
setup_unit_test()
self.setup_with_tools()
def teardown_class(cls):
ThreadLocalODMSession.close_all()
@td.with_wiki
def setup_with_tools(self):
h.set_context('test', 'wiki', neighborhood='Projects')
Checkmessage.query.remove({})
WM.Page.query.remove({})
WM.PageHistory.query.remove({})
M.Shortlink.query.remove({})
c.user = M.User.query.get(username='test-admin')
Checkmessage.project = c.project
Checkmessage.app_config = c.app.config
def test_artifact(self):
pg = WM.Page(title='TestPage1')
assert pg.project == c.project
assert pg.project_id == c.project._id
assert pg.app.config == c.app.config
assert pg.app_config == c.app.config
u = M.User.query.get(username='test-user')
pr = M.ProjectRole.by_user(u, upsert=True)
ThreadLocalODMSession.flush_all()
REGISTRY.register(allura.credentials, allura.lib.security.Credentials())
assert not security.has_access(pg, 'delete', u)
pg.acl.append(M.ACE.allow(pr._id, 'delete'))
ThreadLocalODMSession.flush_all()
assert security.has_access(pg, 'delete', u)
pg.acl.pop()
ThreadLocalODMSession.flush_all()
assert not security.has_access(pg, 'delete', u)
def test_artifact_index(self):
pg = WM.Page(title='TestPage1')
idx = pg.index()
assert 'title' in idx
assert 'url_s' in idx
assert 'project_id_s' in idx
assert 'mount_point_s' in idx
assert 'type_s' in idx
assert 'id' in idx
assert idx['id'] == pg.index_id()
assert 'text' in idx
assert 'TestPage' in pg.shorthand_id()
assert pg.link_text() == pg.shorthand_id()
def test_artifactlink(self):
pg = WM.Page(title='TestPage2')
q_shortlink = M.Shortlink.query.find(dict(
project_id=c.project._id,
app_config_id=c.app.config._id,
link=pg.shorthand_id()))
assert q_shortlink.count() == 0
ThreadLocalODMSession.flush_all()
M.MonQTask.run_ready()
ThreadLocalODMSession.flush_all()
assert q_shortlink.count() == 1
assert M.Shortlink.lookup('[TestPage2]')
assert M.Shortlink.lookup('[wiki:TestPage2]')
assert M.Shortlink.lookup('[test:wiki:TestPage2]')
assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]')
assert not M.Shortlink.lookup('[Wiki:TestPage2]')
assert not M.Shortlink.lookup('[TestPage2_no_such_page]')
c.project.uninstall_app('wiki')
assert not M.Shortlink.lookup('[wiki:TestPage2]')
assert q_shortlink.count() == 0
def test_gen_messageid(self):
assert re.match(r'[0-9a-zA-Z]*.wiki@test.p.localhost',
h.gen_message_id())
def test_gen_messageid_with_id_set(self):
oid = ObjectId()
assert re.match(r'%s.wiki@test.p.localhost' %
str(oid), h.gen_message_id(oid))
def test_artifact_messageid(self):
p = WM.Page(title='T')
assert re.match(r'%s.wiki@test.p.localhost' %
str(p._id), p.message_id())
@patch('allura.tasks.mail_tasks.sendmail', Mock())
def test_versioning(self):
pg = WM.Page(title='TestPage3')
with patch('allura.model.artifact.request', Request.blank('/', remote_addr='1.1.1.1')):
pg.commit()
ThreadLocalODMSession.flush_all()
pg.text = 'Here is some text'
pg.commit()
ThreadLocalODMSession.flush_all()
ss = pg.get_version(1)
assert ss.author.logged_ip == '1.1.1.1'
assert ss.index()['is_history_b']
assert ss.shorthand_id() == pg.shorthand_id() + '#1'
assert ss.title == pg.title
assert ss.text != pg.text
ss = pg.get_version(-1)
assert ss.index()['is_history_b']
assert ss.shorthand_id() == pg.shorthand_id() + '#2'
assert ss.title == pg.title
assert ss.text == pg.text
with pytest.raises(IndexError):
pg.get_version(42)
pg.revert(1)
pg.commit()
ThreadLocalODMSession.flush_all()
assert ss.text != pg.text
assert pg.history().count() == 3
_id = pg._id
M.MonQTask.run_ready()
assert g.solr.search('id:' + pg.index_id()).hits == 1
ph = WM.PageHistory.query.find({'artifact_id': _id, 'version': 2}).first()
assert ph
ph_index = ph.index_id()
solr_hist = g.solr.search('id:' + ph_index)
if not solr_hist: # sometimes history doesn't get added to solr
g.solr.add([ph.solarize()])
g.solr.commit()
assert g.solr.search('id:' + ph_index).hits == 1
pg.delete()
ThreadLocalODMSession.flush_all()
M.MonQTask.run_ready()
assert WM.PageHistory.query.find({'artifact_id': _id}).count() == 0
# history should be deleted from solr
assert g.solr.search('id:' + ph_index).hits == 0
def test_messages_unknown_lookup(self):
from bson import ObjectId
m = Checkmessage()
m.author_id = ObjectId() # something new
assert isinstance(m.author(), M.User), type(m.author())
assert m.author() == M.User.anonymous()
@patch('allura.model.artifact.datetime')
def test_last_updated(self, _datetime):
c.project.last_updated = datetime(2014, 1, 1)
_datetime.utcnow.return_value = datetime(2014, 1, 2)
WM.Page(title='TestPage1')
ThreadLocalODMSession.flush_all()
assert c.project.last_updated == datetime(2014, 1, 2)
@patch('allura.model.artifact.datetime')
def test_last_updated_disabled(self, _datetime):
c.project.last_updated = datetime(2014, 1, 1)
_datetime.utcnow.return_value = datetime(2014, 1, 2)
try:
M.artifact_orm_session._get().skip_last_updated = True
WM.Page(title='TestPage1')
ThreadLocalODMSession.flush_all()
assert c.project.last_updated == datetime(2014, 1, 1)
finally:
M.artifact_orm_session._get().skip_last_updated = False
def test_get_discussion_thread_dupe(self):
artif = WM.Page(title='TestSomeArtifact')
thr1 = artif.get_discussion_thread()[0]
thr1.post('thr1-post1')
thr1.post('thr1-post2')
thr2 = M.Thread.new(ref_id=thr1.ref_id)
thr2.post('thr2-post1')
thr2.post('thr2-post2')
thr2.post('thr2-post3')
thr3 = M.Thread.new(ref_id=thr1.ref_id)
thr3.post('thr3-post1')
thr4 = M.Thread.new(ref_id=thr1.ref_id)
thread_q = M.Thread.query.find(dict(ref_id=artif.index_id()))
assert thread_q.count() == 4
thread = artif.get_discussion_thread()[0] # force cleanup
threads = thread_q.all()
assert len(threads) == 1
assert len(thread.posts) == 6
# normal thread deletion propagates to children, make sure that doesn't happen
assert not any(p.deleted for p in thread.posts)
def test_snapshot_clear_user_data(self):
s = M.Snapshot(author={'username': 'johndoe',
'display_name': 'John Doe',
'logged_ip': '1.2.3.4'})
s.clear_user_data()
assert s.author == {'username': '',
'display_name': '',
'logged_ip': None,
'id': None}
def test_snapshot_from_username(self):
s = M.Snapshot(author={'username': 'johndoe',
'display_name': 'John Doe',
'logged_ip': '1.2.3.4'})
s = M.Snapshot(author={'username': 'johnsmith',
'display_name': 'John Doe',
'logged_ip': '1.2.3.4'})
ThreadLocalODMSession.flush_all()
assert len(M.Snapshot.from_username('johndoe')) == 1
def test_feed_clear_user_data(self):
f = M.Feed(author_name='John Doe',
author_link='/u/johndoe/',
title='Something')
f.clear_user_data()
assert f.author_name == ''
assert f.author_link == ''
assert f.title == 'Something'
f = M.Feed(author_name='John Doe',
author_link='/u/johndoe/',
title='Home Page modified by John Doe')
f.clear_user_data()
assert f.author_name == ''
assert f.author_link == ''
assert f.title == 'Home Page modified by <REDACTED>'
def test_feed_from_username(self):
M.Feed(author_name='John Doe',
author_link='/u/johndoe/',
title='Something')
M.Feed(author_name='John Smith',
author_link='/u/johnsmith/',
title='Something')
ThreadLocalODMSession.flush_all()
assert len(M.Feed.from_username('johndoe')) == 1
def test_subscribed(self):
pg = WM.Page(title='TestPage4a')
assert pg.subscribed(include_parents=True) # tool is subscribed to admins by default
assert not pg.subscribed(include_parents=False)
def test_subscribed_no_tool_sub(self):
pg = WM.Page(title='TestPage4b')
M.Mailbox.unsubscribe(user_id=c.user._id,
project_id=c.project._id,
app_config_id=c.app.config._id)
pg.subscribe()
assert pg.subscribed(include_parents=True)
assert pg.subscribed(include_parents=False)
def test_not_subscribed(self):
pg = WM.Page(title='TestPage4c')
M.Mailbox.unsubscribe(user_id=c.user._id,
project_id=c.project._id,
app_config_id=c.app.config._id)
assert not pg.subscribed(include_parents=True)
assert not pg.subscribed(include_parents=False)